Spring Boot 整合 Apache Kafka

    技术2022-07-11  96

    前言

    由于本文主要简介 Spring Boot 整合 Apache Kafka,因此默认读者已经成功安装并已启动 Apache Kafka 服务。

    正文

    添加依赖

    在生产者和消费者项目中添加 spring-kafka 依赖,如下:

    <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.5.3.RELEASE</version> </dependency>

    由于笔者的项目用的 Spring Boot 版本为 2.3.0.RELEASE ,因此 spring-kafka 使用 2.5.x 版本,请注意版本对应,若版本不对应可能会出现诸如 ClassNotFoundException 之类的问题。

    spring-kafka 与 Spring Boot 版本对应可在 Spring for Apache Kafka 页面查看,下图是笔者写此文章时的版本对应关系截图:

    生产者配置

    在生产者所在项目的 application.yml 文件中添加配置:

    spring: kafka: producer: # 指定 kafka 服务器信息; bootstrap-servers: 192.168.1.120:9092

    说明:

    192.168.1.120 为笔者 kafka 服务所在 IP 地址,读者需要自行替换为自己 kafka 服务所在 IP 地址;

    9092 为笔者 kafka 服务端口,读者需要自行替换为自己 kafka 服务端口;

    若要连接多台 kafka 服务,可用半角逗号 , 分割多台 kafka 服务器信息;

    发送消息:

    @Autowired private KafkaTemplate kafkaTemplate; void send() { // 发送消息到指定 topic; kafkaTemplate.send("test-simple-topic", "消息内容"); }

    亦可通过添加回调异步处理消息发送成功、失败后的逻辑,如下:

    // 发送消息到指定 topic; ListenableFuture sendResult = kafkaTemplate.send("test-simple-topic", "消息内容"); // 接收异步回调; sendResult.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>(){ @Override public void onSuccess(SendResult<Integer, String> result) { System.out.println("发送成功," + result); // TODO: 执行发送成功的操作...... } @Override public void onFailure(Throwable ex) { System.out.println("发送失败"); // TODO: 执行发送失败的操作...... } });

    消费者配置

    在消费者所在项目的 application.yml 文件中添加配置:

    spring: kafka: consumer: # 指定 kafka 服务器信息; bootstrap-servers: 192.168.1.120:9092 # 消费者所在的组; group-id: kafka-test auto-offset-reset: latest enable-auto-commit: true

    消费消息:

    @Component public class MessageHandler { @KafkaListener(topics = {"test-simple-topic"}) public void handleMessage(String message) { System.out.println("开始处理消息......"); System.out.println(message); System.out.println("消息处理完成"); } }

    说明:

    @KafkaListener 可以接收多个参数,可通过在 @KafkaListener 中指定 groupId 值以覆盖 application.yml 中的 group-id 值;

    测试

    生产消息

    启动生产者项目,调用 send() 方法;

    消费消息

    启动消费者项目,当有新消息时,能看到 handle1(String message) 方法被调用并打印如下信息:

    开始处理消息...... 1593588193582 消息处理完成

    至此,我们已经在 Spring Boot 中简单地集成了 Apache Kafka,读者亦可在 此处 查看其他用法。

    本文中的示例代码可在 此处 下载。

    参考链接

    《Spring Boot Reference Documentation》: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#boot-features-kafka

    《Spring Kafka 2.5.3.RELEASE API》:https://docs.spring.io/spring-kafka/docs/2.5.3.RELEASE/api/

    Processed: 0.011, SQL: 9