maven核心引入:
<!-- kafka应用--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>其他maven引入(方便测试):
<!--Spring Boot 测试组件--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>2.0.9.RELEASE</version> <scope>test</scope> <exclusions> <exclusion> <groupId>com.vaadin.external.google</groupId> <artifactId>android-json</artifactId> </exclusion> </exclusions> </dependency> <!-- 测试应用--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> <!-- swagger应用--> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.7.0</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.7.0</version> </dependency> <!-- mvc应用--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- lombok应用--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!-- 测试应用--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency>配置swaggerUI:
注意RequestHandlerSelectors.basePackage(“com.xdoc.template.module”),将这个包名改成自己的
@Configuration @EnableSwagger2 public class SwaggerConfig extends WebMvcConfigurerAdapter { private boolean enableSwagger=true; @Autowired private ApplicationContext applicationContext; @PostConstruct public void setObjectMapper() { ObjectMapper objectMapper = new ObjectMapper(); SimpleModule module = new SimpleModule(); objectMapper.registerModule(module); JacksonAnnotationIntrospector jacksonAnnotationIntrospector= new JacksonAnnotationIntrospector(); objectMapper.setAnnotationIntrospector(new JacksonAnnotationIntrospector() { @Override public boolean isAnnotationBundle(Annotation ann) { if (ann.annotationType() == JSONField.class) { return true; } return super.isAnnotationBundle(ann); } @Override public PropertyName findNameForSerialization(Annotated a) { PropertyName nameForSerialization = super.findNameForSerialization(a); if (nameForSerialization == null || nameForSerialization == PropertyName.USE_DEFAULT) { JSONField jsonField = _findAnnotation(a, JSONField.class); if (jsonField != null) { return PropertyName.construct(jsonField.name()); } } return nameForSerialization; } @Override public PropertyName findNameForDeserialization(Annotated a) { PropertyName nameForDeserialization = super.findNameForDeserialization(a); if (nameForDeserialization == null || nameForDeserialization == PropertyName.USE_DEFAULT) { JSONField jsonField = _findAnnotation(a, JSONField.class); if (jsonField != null) { return PropertyName.construct(jsonField.name()); } } return nameForDeserialization; } }); ObjectMapperConfigured objectMapperConfigured = new ObjectMapperConfigured(applicationContext, objectMapper); applicationContext.publishEvent(objectMapperConfigured); } @Bean public Docket api() { return new Docket(DocumentationType.SWAGGER_2).groupName("framework").enable(enableSwagger) .genericModelSubstitutes(DeferredResult.class).useDefaultResponseMessages(false) .forCodeGeneration(false).pathMapping("/").apiInfo(apiInfo()).select() .apis(RequestHandlerSelectors.basePackage("com.xdoc.template.module")).build() } private ApiInfo apiInfo() { return new ApiInfoBuilder().title("kafka测试").description("测试文档").termsOfServiceUrl("") .contact("dragon").version("1.0").build(); } }application.yml
spring: kafka: bootstrap-servers: 112.126.57.37:9092,112.126.57.37:9093 producer: # 生产者————序列化机制配置 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-deserializer: org.apache.kafka.common.serialization.StringSerializer # 生产者————消息缓存配置 batch-size: 65536 buffer-memory: 524288 listener: concurrency: 1 type: batch consumer: # 消费者————标识/事务配置 group-id: tencent-trip max-poll-records: 20 auto-commit-interval: 100 enable-auto-commit: true # 消费者————序列化器配置 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer我们首先演示一个简单的发布订阅模式,就是消费者利用监听模式获取消费信息; 消费者配置类
@Component @Slf4j public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Value("${spring.kafka.consumer.key-deserializer}") private String consumerKeyDeserializer; @Value("${spring.kafka.consumer.value-deserializer}") private String consumerValueDeserializer; @Value("${spring.kafka.consumer.max-poll-records}") private String maxPollRecords; @Value("${spring.kafka.consumer.auto-commit-interval}") private String autoCommitIntervalMs; @Value("${spring.kafka.consumer.enable-auto-commit}") private String enableAutoCommit; @Value("${spring.kafka.listener.concurrency}") private Integer concurrency; /** * 消费者—poll配置的时候会用到 */ @Bean public KafkaConsumer kafkaConsumer(){ KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfigs()); kafkaConsumer.subscribe(Collections.singletonList("test1")); return kafkaConsumer; } /** * 消费者——监听器整体配置 * @return */ @Bean public ConcurrentKafkaListenerContainerFactory ContainerFactory() { ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory(); container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); //禁止自动启动 container.setAutoStartup(false); container.setBatchListener(true); container.setConcurrency(concurrency); return container; } /** * 消费者配置生成 * @return */ private Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//连接地址 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//组标识 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumerKeyDeserializer);//序列化配置 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumerValueDeserializer);//序列化配置 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//每一批拉取的数量,监听时 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);//是否自动提交消费位移 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);//消费位移自动提交时间间隔 return props; } }生产者配置类
@Component @Slf4j public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.producer.key-serializer}") private String producerKeyDeserializer; @Value("${spring.kafka.producer.value-deserializer}") private String producerValueDeserializer; /** * 生产者——kafkaTemplate配置 * @return */ @DependsOn("producerFactory") @Bean public KafkaTemplate<Integer, String> kafkaTemplate(KafkaGlobalHandler kafkaGlobalHandler,ProducerFactory producerFactory) { KafkaTemplate template = new KafkaTemplate<String, String>(producerFactory); template.setDefaultTopic("defalutTopic"); template.setProducerListener(kafkaGlobalHandler); return template; } /** * 生产者配置工厂 * @return */ @Bean public ProducerFactory<Integer, String> producerFactory() { DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs()); return factory; } /** * 生产者者配置生成 * @return */ private Map<String, Object> producerConfigs() { Map<String, Object> producerConfigs = new HashMap<>(); producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); producerConfigs.put(ProducerConfig.RETRIES_CONFIG, 1);// 重试,0为不启用重试机制 producerConfigs.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量 producerConfigs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000); // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录 producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producerKeyDeserializer); producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,producerValueDeserializer); return producerConfigs; } }创建生产者的api接口
@Api(tags = "kafka——Api测试") @RestController public class KafkaProducerController { @Autowired KafkaTemplate kafkaTemplate; @PostMapping("/kafkaTest/sendMessage") public ResponseVO sendMessage(String message){ kafkaTemplate.send("test1", message); return ApiResult.success("成功"); } }创建监听者的处理接口
@Component public class KafkaComuserListener { @KafkaListener(topics = "testTopic") public void receiveMessage(String message){ System.out.println("testTopic:"+message); } }测试 输入swaggerUI地址:http://localhost:30002/swagger-ui.html# 发送消息: 观察控制台:
默认主题可以添加这个测试,但注意默认消费者要在template中配置好topic
@KafkaListener(topics = "defalutTopic") public void receiveDefalutTopicMessage(String message){ System.out.println("defalutTopic: " + message); System.out.println("defalutTopic: " + message); System.out.println("defalutTopic: " + message); System.out.println("defalutTopic: " + message); System.out.println("defalutTopic: " + message); System.out.println("defalutTopic: " + message); System.out.println("defalutTopic: " + message); System.out.println("defalutTopic: " + message); System.out.println("defalutTopic: " + message); System.out.println("defalutTopic: " + message); System.out.println("defalutTopic: " + message); System.out.println("defalutTopic: " + message); System.out.println("defalutTopic: " + message); }手动提交
kafka将消费位移交给消费者自己去管理那么,如何设置手动提交呢?
首先关闭消费者的自动提交功能
spring: kafka: …… consumer: …… auto-commit-interval: 100 enable-auto-commit: false ……我们利用junit测试类去测试
@RunWith(SpringRunner.class) @SpringBootTest class DemoApplicationTests { @Autowired KafkaTemplate KafkaConsumer; @Test void contextLoads() { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); if(records.count()!=0){ consumer.commitSync(); } } } }发送消息 所有api
consumer.commitSync();//同步提交位移 consumer.commitAsync();//异步提交位移 consumer.wakeup();//其他线程调用一次wakeup可以中断while Ture循环 consumer.close();//被中断的线程需要调用关闭按钮让出连接权,让另外一个consumer建立连接 consumer.seek(new TopicPartition("test1",2),2);//更改consumer的拉取位置,seek()只是指定了poll()拉取的开始位移,这并不影响在Kafka中保存的提交位移 /** * 获取某个主题的分区信息 */ List<PartitionInfo> partitionInfoList = consumer.partitionsFor("topic1"); /** * 让消费者固定分区和指定消费 * 可以遍历分区信息区绑定 */ partitionInfoList.forEach(partitionInfo->{ consumer.assign(Collections.singletonList(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()))); }); //重平衡监听 consumer.subscribe(Arrays.asList("test1"), new ConsumerRebalanceListener() { public void onPartitionsAssigned(Collection<TopicPartition> partitions) { //均衡之前 } public void onPartitionsRevoked(Collection<TopicPartition> partitions) {//均衡之后 } });重平衡监听
/** * @消费者重平衡监听器 */ consumer.subscribe(Arrays.asList("test1"), new ConsumerRebalanceListener() { @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { //均衡之前 /** * 业务还没处理完,先提交位移然后保存数据库 */ } @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) {//均衡之后 /** * 从数据库拿出未处理的消息,接着处理 */ } });NetworkClient是SpringBoot中作为消费者向kafak服务器发送的网络关键类,利用这个类我们可以看到提交前的数据,重而排查一些问题; 然后Fetcher是作为拉取者最核心的部分,我们关注一下这个方法就行;
public synchronized int sendFetches() { // Update metrics in case there was an assignment change sensors.maybeUpdateAssignment(subscriptions); Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests(); for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) { final Node fetchTarget = entry.getKey(); final FetchSessionHandler.FetchRequestData data = entry.getValue(); final FetchRequest.Builder request = FetchRequest.Builder .forConsumer(this.maxWaitMs, this.minBytes, data.toSend()) .isolationLevel(isolationLevel) .setMaxBytes(this.maxBytes) .metadata(data.metadata()) .toForget(data.toForget()) .rackId(clientRackId); if (log.isDebugEnabled()) { log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget); } RequestFuture<ClientResponse> future = client.send(fetchTarget, request); // We add the node to the set of nodes with pending fetch requests before adding the // listener because the future may have been fulfilled on another thread (e.g. during a // disconnection being handled by the heartbeat thread) which will mean the listener // will be invoked synchronously. this.nodesWithPendingFetchRequests.add(entry.getKey().id()); future.addListener(new RequestFutureListener<ClientResponse>() { @Override public void onSuccess(ClientResponse resp) { synchronized (Fetcher.this) { try { @SuppressWarnings("unchecked") FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody(); FetchSessionHandler handler = sessionHandler(fetchTarget.id()); if (handler == null) { log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.", fetchTarget.id()); return; } if (!handler.handleResponse(response)) { return; } Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet()); FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions); for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) { TopicPartition partition = entry.getKey(); FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition); if (requestData == null) { String message; if (data.metadata().isFull()) { message = MessageFormatter.arrayFormat( "Response for missing full request partition: partition={}; metadata={}", new Object[]{partition, data.metadata()}).getMessage(); } else { message = MessageFormatter.arrayFormat( "Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}", new Object[]{partition, data.metadata(), data.toSend(), data.toForget()}).getMessage(); } // Received fetch response for missing session partition throw new IllegalStateException(message); } else { long fetchOffset = requestData.fetchOffset; FetchResponse.PartitionData<Records> partitionData = entry.getValue(); log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", isolationLevel, fetchOffset, partition, partitionData); Iterator<? extends RecordBatch> batches = partitionData.records.batches().iterator(); short responseVersion = resp.requestHeader().apiVersion(); completedFetches.add(new CompletedFetch(partition, partitionData, metricAggregator, batches, fetchOffset, responseVersion)); } } sensors.fetchLatency.record(resp.requestLatencyMs()); } finally { nodesWithPendingFetchRequests.remove(fetchTarget.id()); } } } @Override public void onFailure(RuntimeException e) { synchronized (Fetcher.this) { try { FetchSessionHandler handler = sessionHandler(fetchTarget.id()); if (handler != null) { handler.handleError(e); } } finally { nodesWithPendingFetchRequests.remove(fetchTarget.id()); } } } }); } return fetchRequestMap.size(); }