自定义消费者拦截器
//可以在消费到消息或在提交消费位移时进行一些定制化的操作 public class ConsumerInterceptor implements org.apache.kafka.clients.consumer.ConsumerInterceptor { @Override public void configure(Map<String, ?> configs) {} /** * 提交位移之后调用 */ @Override public ConsumerRecords onConsume(ConsumerRecords records) { List<ConsumerRecord<String, String>> filterRecords = new ArrayList<> (); Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords= new HashMap<> (); Set<TopicPartition> partitions = records.partitions(); for(TopicPartition tp : partitions){ List<ConsumerRecord<String, String>> consumerRecords = records.records(tp); for(ConsumerRecord<String, String> record: consumerRecords){ if(!record.value().startsWith("test-")) { filterRecords.add(record); } } if(filterRecords.size() > 0){ newRecords.put(tp, filterRecords); } } return new ConsumerRecords<>(newRecords); } /** * poll()方法返回之前调用对消息进行定制化 */ @Override public void onCommit(Map offsets) { offsets.forEach((k,v) -> System.out.println("tp = "+ k +"\toffset = "+ v)); } @Override public void close() {} }