拦截器接口分析
package org.apache.kafka.clients.producer; import org.apache.kafka.common.Configurable; //生产者拦截器可做消息发送前以及producer回调前的定制化需求,允许用户指定多个Interceptor按照顺序作用于一条消息从而形成一个拦截链 public interface ProducerInterceptor<K, V> extends Configurable { /** * 该方法封装于KafkaProducer.send()方法中,运行在用户主线程 * Producer确保在消息序列化前调用该方法,可以对消息进行任意操作,但慎重修改消息的topic、key和partition,会影响分区以及日志压缩 */ public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record); /** * 该方法在消息被应答之前或者发送失败时调用,并且通常都是在callback()触发之前执行,运行在IO线程中 * 代码逻辑尽量简单,否则影响消息发送效率 */ public void onAcknowledgement(RecordMetadata metadata, Exception exception); /** * This is called when interceptor is closed */ public void close(); }自定义拦截器
public class DefineInterceptor implements ProducerInterceptor { AtomicInteger success = new AtomicInteger(); AtomicInteger filter = new AtomicInteger(); @Override public ProducerRecord onSend(ProducerRecord record) { String newValue = "Interceptor-test--" + record.value(); return new ProducerRecord(record.topic(),record.partition(),record.timestamp(),record.key(),newValue); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { if (metadata == null) { filter.getAndIncrement(); }else { success.getAndIncrement(); } } /** * 生产者的producer.close触发 */ @Override public void close() { double successRate = (double) success.get() / (success.get() + filter.get()); System.out.println("消息发送成功率" + successRate*100 +"%"); } @Override public void configure(Map<String, ?> configs) {} }指定拦截器
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, DefineInterceptor.class.getName())