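Here I summarize the consumer-related configuration and APIs involved in integrating spring-kafka with Spring Boot.
There is a bit more to cover here than on the Producer side.
Integration configuration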
    server:
      port: 9000
    spring:
      kafka:
        bootstrap-servers: 192.168.1.74:9092
        consumer:
          group-id: group_id
          # commit offsets manually
          enable-auto-commit: false
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          properties:
            session.timeout.ms: 60000
        listener:
          type: batch
          log-container-config: false
          concurrency: 3
          # commit offsets manually
          ack-mode: manual_immediate

With Spring Kafka, configuring a message listener is very simple: a single annotation is enough to start listening.
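Although only one annotation is needed, you still have to pay attention to how messages are deserialized and to how the listener processes them.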
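To make the deserialization point concrete, here is a minimal sketch of the plain Kafka client properties that the consumer settings above correspond to. It uses only `java.util.Properties`; the class name `ConsumerProps` and the method `consumerProperties()` are made up for illustration, and in a real application Spring Boot builds these properties for you from the YAML.

```java
import java.util.Properties;

class ConsumerProps {

    // Hypothetical helper: the standard Kafka consumer property names
    // behind the spring.kafka.consumer.* keys used in the YAML above.
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.1.74:9092");
        props.setProperty("group.id", "group_id");
        // Auto commit is off, so offsets are committed by the listener.
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto.offset.reset", "earliest");
        // Both key and value arrive as String, so listeners can take a String payload.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("session.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        consumerProperties().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

If the value deserializer were changed (for example to a JSON deserializer), the payload type of the listener methods would have to change with it.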
Example 1

    @KafkaListener(topics = "users", groupId = "group_id")
    public void consume(String message, Acknowledgment acknowledgment) throws IOException {
        try {
            logger.info(String.format("#### -> Consumed message -> %s", message));
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            // Commit the offset manually, whether or not processing succeeded
            acknowledgment.acknowledge();
        }
    }

Example 2
    import com.common.Bar2;
    import com.common.Foo2;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaHandler;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    /**
     * Defines a consumer group and maps it to several topics.
     *
     * @author Gary Russell
     * @since 5.1
     */
    @Component
    @KafkaListener(id = "multiGroup", topics = {"foos", "bars", "test"})
    public class MultiMethods {

        private final Logger logger = LoggerFactory.getLogger(MultiMethods.class);

        // Handles payloads of type Foo2
        @KafkaHandler
        public void foo(Foo2 foo) {
            System.out.println("Received: " + foo);
        }

        // Handles payloads of type Bar2
        @KafkaHandler
        public void bar(Bar2 bar) {
            System.out.println("Received: " + bar);
        }

        // Fallback for payloads that no other handler accepts
        @KafkaHandler(isDefault = true)
        public void unknown(Object object) {
            System.out.println("Received unknown: " + object);
        }

        @KafkaHandler
        public void tests(ConsumerRecord<?, ?> record) {
            logger.info("Received -> key :{} ", record.key());
        }
    }
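The idea behind Example 2 is that one listener class receives messages from several topics and picks a handler method according to the payload type, falling back to a default handler when nothing matches. The sketch below illustrates that idea in plain Java, without Spring. It is not how spring-kafka implements `@KafkaHandler`; `TypeDispatcher`, `register` and `dispatch` are hypothetical names, and as a simplification the first registered matching handler wins.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

class TypeDispatcher {

    // Handlers keyed by the payload type they accept, in registration order.
    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();
    // Plays the role of the handler marked isDefault = true.
    private final Function<Object, String> fallback;

    TypeDispatcher(Function<Object, String> fallback) {
        this.fallback = fallback;
    }

    <T> TypeDispatcher register(Class<T> type, Function<T, String> handler) {
        // Cast is safe because dispatch() only calls this after isInstance() succeeds.
        handlers.put(type, payload -> handler.apply(type.cast(payload)));
        return this;
    }

    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        return fallback.apply(payload);
    }

    public static void main(String[] args) {
        TypeDispatcher dispatcher = new TypeDispatcher(o -> "Received unknown: " + o)
                .register(String.class, s -> "Received text: " + s)
                .register(Integer.class, i -> "Received number: " + i);

        System.out.println(dispatcher.dispatch("hello")); // Received text: hello
        System.out.println(dispatcher.dispatch(42));      // Received number: 42
        System.out.println(dispatcher.dispatch(3.14));    // Received unknown: 3.14
    }
}
```

In the real listener, the payload types that reach the handlers depend on the configured value deserializer, so `Foo2` and `Bar2` handlers are only selected if the deserializer actually produces those types.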