Getting started example
import com.test.demo.pojo.People;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.regex.Pattern;

/**
 * @author bigTree
 */
public class ConsumersOne {

    private static final String CONSUMER_KEY_SER = StringDeserializer.class.getName();
    private static final String CONSUMER_VALUE_SER = StringDeserializer.class.getName();
    private static final String BROKER_LIST = "hdp01:9092,hdp02:9092,hdp03:9092";
    private static final String GROUP_ID = "test-01";
    private static final String TOPIC = "producer_one";
    private static final Pattern PATTERN_TOPIC = Pattern.compile("producers.*");

    /**
     * Consumer initialization
     */
    public static Properties getInitConsumer(String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BROKER_LIST);
        properties.put("key.deserializer", CONSUMER_KEY_SER);
        properties.put("value.deserializer", CONSUMER_VALUE_SER);
        properties.put("group.id", groupId);
        return properties;
    }

    public static void main(String[] args) {
        // Basic configuration
        Properties init = getInitConsumer(GROUP_ID);
        // Auto-commit is periodic by default; the interval is set by auto.commit.interval.ms (default 5 seconds)
        // Disable auto-commit
        init.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Swap in the custom value deserializer (DefineDeSerializer comes from an earlier part of this series; its import is omitted here)
        init.replace("value.deserializer", DefineDeSerializer.class.getName());
        KafkaConsumer<String, People> consumer = new KafkaConsumer<>(init);

        /*
         * Three ways to subscribe:
         * 1. Subscribe to topics: subscribe()
         * 2. Subscribe by regex pattern: subscribe()
         * 3. Consume specific partitions: assign()
         * Related metadata calls:
         * 1) partitionsFor(TOPIC): fetch metadata for a topic
         * 2) assignment(): fetch the partitions assigned to this consumer
         */
        // Subscribe to a topic
        consumer.subscribe(Collections.singletonList(TOPIC));
        // Subscribe by regex
        // consumer.subscribe(PATTERN_TOPIC);
        // Consume a specific partition
        // consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0)));
        // Unsubscribe
        // consumer.unsubscribe();
        // Pause consumption of a partition
        // (note: pause() only accepts partitions currently assigned to this consumer, so in practice it is called after poll() has established an assignment)
        consumer.pause(Collections.singletonList(new TopicPartition(TOPIC, 1)));
        // Get the set of partitions previously paused
        // Set<TopicPartition> paused = consumer.paused();
        // Resume consumption
        // consumer.resume(paused);

        try {
            while (true) {
                ConsumerRecords<String, People> records = consumer.poll(Duration.ofMillis(1000));
                if (!records.isEmpty()) {
                    System.out.println("record count = " + records.count());
                    for (ConsumerRecord<String, People> record : records) {
                        String topic = record.topic();
                        int partition = record.partition();
                        People value = record.value();
                        System.out.println("topic:" + topic + "\t" + "partition:" + partition + "\t" + "value:" + value);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
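The config above swaps the value deserializer to DefineDeSerializer, which this section does not show (it presumably comes from the earlier custom serializer example). As a rough sketch only, a custom Deserializer for People could look like the following; the field names, setters, and wire format are assumptions, not the original class:

import org.apache.kafka.common.serialization.Deserializer;
import java.nio.charset.StandardCharsets;

public class DefineDeSerializer implements Deserializer<People> {
    @Override
    public People deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        // Assumes the producer wrote "name,age" as a UTF-8 string;
        // adjust to match however People was actually serialized
        String[] fields = new String(data, StandardCharsets.UTF_8).split(",");
        People people = new People();
        people.setName(fields[0]);                  // hypothetical setter
        people.setAge(Integer.parseInt(fields[1])); // hypothetical setter
        return people;
    }
}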
Partition-granularity synchronous commit

// Additional imports needed: java.util.Set, java.util.List,
// org.apache.kafka.clients.consumer.OffsetAndMetadata
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    if (!records.isEmpty()) {
        // partitions(): the partitions that returned data in this poll
        Set<TopicPartition> partitions = records.partitions();
        for (TopicPartition tp : partitions) {
            // records(tp): the records for one specific partition
            List<ConsumerRecord<String, String>> consumerRecords = records.records(tp);
            if (!consumerRecords.isEmpty()) {
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    int partition = record.partition();
                    Object value = record.value();
                    long offset = record.offset();
                    System.out.println("partition = " + partition + ", value = " + value + ", offset = " + offset);
                }
                long lastOffset = consumerRecords.get(consumerRecords.size() - 1).offset();
                // The committed offset is the position of the next record to consume,
                // so commit lastOffset + 1 (committing lastOffset itself would re-deliver
                // the last record after a restart)
                consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
}
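By contrast, the coarsest-grained synchronous commit, not shown in the original, simply calls the no-argument commitSync() after handling a whole poll batch; it blocks until the latest offsets of every partition returned by that poll are committed. A minimal sketch:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("value = " + record.value());
    }
    if (!records.isEmpty()) {
        // Blocks until the offsets for everything returned by this poll are committed
        consumer.commitSync();
    }
}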
Asynchronous commit

// Additional imports needed: java.util.Map,
// org.apache.kafka.clients.consumer.OffsetAndMetadata,
// org.apache.kafka.clients.consumer.OffsetCommitCallback
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    if (!records.isEmpty()) {
        System.out.println("record count = " + records.count());
        for (ConsumerRecord<String, String> record : records) {
            String topic = record.topic();
            int partition = record.partition();
            String value = record.value();
            System.out.println("topic:" + topic + "\t" + "partition:" + partition + "\t" + "value:" + value);
        }
        // Commit asynchronously; the callback reports success or failure
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception == null) {
                    System.out.println("offset commit successful " + offsets);
                } else {
                    System.out.println("offset commit fail!");
                }
            }
        });
    }
}
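commitAsync() does not retry a failed commit, so a common companion pattern (not in the original) is to commit asynchronously inside the loop and fall back to one synchronous commit when shutting down. A sketch, assuming a volatile boolean flag named running is flipped elsewhere to stop the loop:

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("value = " + record.value());
        }
        // Fast, non-blocking commit while processing continues
        consumer.commitAsync();
    }
} finally {
    try {
        // One final synchronous commit (with retries) before shutting down
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}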
seek() can reset the consume position within a partition

/*
 * Offset-related APIs:
 * 1) consumer.committed(topicPartition).offset()              last committed offset for the partition
 * 2) consumer.position(topicPartition)                        offset of the next record to be fetched
 * 3) partitionRecords.get(partitionRecords.size() - 1).offset() offset of the latest record consumed from the partition
 * 4) consumer.offsetsForTimes()                               for each partition, the offset and timestamp of the first message whose timestamp is >= the given timestamp
 * 5) consumer.beginningOffsets()                              earliest offset still retained in the partition
 * 6) consumer.endOffsets()                                    log end offset of the partition (offset of the last written message plus one)
 */
// Additional imports needed: java.util.Map, java.util.Set, java.util.List
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    if (!records.isEmpty()) {
        Set<TopicPartition> partitions = records.partitions();
        for (TopicPartition tp : partitions) {
            List<ConsumerRecord<String, String>> consumerRecords = records.records(tp);
            if (!consumerRecords.isEmpty()) {
                long commitOffset = consumer.committed(tp).offset();
                long recordOffset = consumerRecords.get(consumerRecords.size() - 1).offset();
                long beginOffset = consumer.beginningOffsets(Collections.singletonList(tp)).get(tp);
                long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
                // Look up the first offset at or after a timestamp (example value: one hour ago)
                Map<TopicPartition, Long> timestampsToSearch =
                        Collections.singletonMap(tp, System.currentTimeMillis() - 60 * 60 * 1000L);
                long timeOffset = consumer.offsetsForTimes(timestampsToSearch).get(tp).offset();
                System.out.println("committed = " + commitOffset + ", lastConsumed = " + recordOffset
                        + ", begin = " + beginOffset + ", end = " + endOffset + ", byTime = " + timeOffset);
            }
        }
    }
}
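The snippet above only reads offset metadata; to actually move the consume position, call seek(), seekToBeginning(), or seekToEnd() on partitions that are already assigned to the consumer. A minimal sketch, assuming the assignment has been established by a prior poll():

// seek() only works on partitions currently assigned to this consumer,
// so poll once (or use assign()) before calling it
consumer.poll(Duration.ofMillis(1000));
for (TopicPartition tp : consumer.assignment()) {
    // Jump to the earliest retained offset ...
    consumer.seekToBeginning(Collections.singletonList(tp));
    // ... or to a specific offset, e.g. one obtained from offsetsForTimes()
    // consumer.seek(tp, timeOffset);
}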