使用消息中间件无非就是异步,解耦,削峰。 现在最热门就是Kafka了。
本文将本地kafka+springboot服务搭建起来
首先下载kafka和zookeeper kafka_2.12-1.1.0 下载地址:http://mirror.bit.edu.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz
zookeeper-3.4.12 下载地址:http://mirror.bit.edu.cn/apache/zookeeper/
我使用的版本是 kafka_2.12-2.5.0.tgz zookeeper-3.4.14.tar.gz
解压安装: 一、zookeeper部署
1、解压zookeeper 到 E:\study\kafka\zookeeper-3.4.12 目录
2、打开:E:\study\kafka\zookeeper-3.4.12\conf,把zoo_sample.cfg重命名成zoo.cfg ,打开zoo.cfg文件,找到如下信息并做修改
dataDir=E:\study\kafka\data\logs\zookeeper dataLogDir=E:\study\kafka\data\logs\zookeeper3、添加zookeeper环境变量
ZOOKEEPER_HOME:E:\study\kafka\zookeeper-3.4.12
Path中追加 ;%ZOOKEEPER_HOME%\bin;
4、运行zookeeper
看到上面的信息标识zookeeper已经顺利完成部署。
二 、kafka部署
1、解压kafka 到 E:\study\kafka\kafka_2.12-1.1.0 目录
2、打开E:\study\kafka\kafka_2.12-1.1.0\server.properties
3、找到log.dirs配置项,修改为log.dirs=E:\study\kafka\data\logs\kafka
4、使用PowerShell窗口(如果使用cmd,命令启动kafka会报命令太长,语法不正确),cd 进入E:\study\kafka\kafka_2.12-1.1.0\bin\windows 目录
5、输入命令,
.\kafka-server-start.bat ../../config/server.properties如果看到如下信息提示,则表示kafka部署成功
现在的kafka已经集成了zookeeper,但是还是另外下载了zookeeper,主要是自带的这个默认配置是单机版的,一般来说生产环境肯定是要做集群来保证高可用,如果直接改自带的这个配置也不是不可以,但万一改错了把 Kafka 弄坏了咋整?再一个是一般来说 ZK 集群和 Kafka 集群应该分离才对,但二进制包里它俩耦合在一起了,为了部署 ZK 集群连带着拷贝了 Kafka 目录有点儿多余(费点儿心思单独拷出来也不是不行……),所以索性不去动它,另起一套。
kafka的server.properties配置文件默认已经配置了zookeeper地址zookeeper.connect=localhost:2181
三:springboot集成kafka
1.pom.xml文件引入相关jar包
<!--kafka--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.5.RELEASE</version> </dependency>这里springboot是2.1.1.RELEASE版本,kafka-clients是2.1.0和spring-kafka是2.2.5.RELEASE
这里是有版本关联的: spring官方描述的spring-kafka的版本和kafka-clients的版本对应关系
2:首先定义一个bean用来发送消息的载体
import lombok.Data; import lombok.experimental.Accessors; /** * @Author 18011618 * @Description 定义用户发送的日志数据 * @Date 14:42 2018/7/20 * @Modify By */ @Data @Accessors(chain = true) public class UserLog { private String username; private String userid; private String state; }3:创建消息生产者
import com.alibaba.fastjson.JSON; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** * @author [jiangbohao] * @Date: 2020/7/4 */ @Component public class UserLogProducer { @Autowired private KafkaTemplate kafkaTemplate; /** * 发送数据 * * @param userid */ public void sendLog(String userid) { UserLog userLog = new UserLog(); userLog.setUsername("jhp"); userLog.setUserid(userid); userLog.setState("0"); System.err.println("发送用户日志数据:" + userLog); kafkaTemplate.send("user-log", JSON.toJSONString(userLog)); } }消息的发送直接使用KafkaTemplate模板即可,都封装好了,直接使用
4:创建消息消费者
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.Optional; /** * @author [jiangbohao] * @Date: 2020/7/4 */ @Component public class UserLogConsumer { @KafkaListener(topics = {"user-log"}) public void consumer(ConsumerRecord<?,?> consumerRecord){ //判断是否为null Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value()); System.out.println(">>>>>>>>>> record =" + kafkaMessage); if(kafkaMessage.isPresent()){ //得到Optional实例中的值 Object message = kafkaMessage.get(); System.err.println("消费消息:"+message); } } }消费机制是通过监听器实现的,直接使用这个注解接口,它可以根据指定的条件进行消息的监听:
5:写一个测试类,项目启动的时候运行, @PostConstruct注解的方法在项目启动的时候执行这个方法,也可以理解为在spring容器启动的时候执行,可作为一些数据的常规化加载,比如数据字典之类的。同时项目启动后执行还可以实现CommandLineRunner接口
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * @author [jiangbohao] * @Date: 2020/7/4 */ @Component public class Test { @Autowired private UserLogProducer userLogProducer; @PostConstruct public void init(){ for (int i = 0; i < 10; i++) { //调用消息发送类中的消息发送方法 userLogProducer.sendLog(String.valueOf(i)); } } }配置文件:
spring: #kafka kafka: bootstrap-servers: localhost:9092 # 指定kafka 代理地址,可以多个 #=============== provider ======================= producer: retries: 0 #发送错误后,消息重发次数 batch-size: 16384 #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 buffer-memory: 33554432 # 设置生产者内存缓冲区的大小。 # 指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 #=============== consumer ======================= # 指定默认消费者group id consumer: group-id: user-log-group # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录) # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录 auto-offset-reset: earliest # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: true # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D auto-commit-interval: 100 # 指定消息key和消息体的编解码方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer6:控制台输出如下
