Kafka: Producer-Related APIs

    Technology  2023-07-20

    Kafka: Producer API

    Maven dependency

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>

    Getting-started example

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    /**
     * @author bigTree
     */
    public class ProducersOne {

        public static final String TOPIC = "producers_one";
        private static final String PRODUCER_KEY_SER = StringSerializer.class.getName();
        private static final String PRODUCER_VALUE_SER = StringSerializer.class.getName();
        private static final String BROKER_LIST = "hdp01:9092,hdp02:9092,hdp03:9092";

        /**
         * Builds the producer configuration.
         */
        public static Properties getInitProducer() {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, PRODUCER_KEY_SER);
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, PRODUCER_VALUE_SER);
            return properties;
        }

        public static void main(String[] args) {
            // Basic configuration
            Properties init = getInitProducer();
            // Create the producer; the type parameters must match the serializers above
            KafkaProducer<String, String> producer = new KafkaProducer<>(init);
            // Build the record to send (topic and value, no key)
            String message = "hello, Tom ";
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
            // Send the message
            try {
                for (int i = 0; i < 10; i++) {
                    producer.send(record);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Release resources
                producer.close();
            }
        }
    }

    Three ways to send a message

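    1. Fire-and-forget: send the message without checking whether it was delivered. Best performance, worst reliability.
    2. Asynchronous (async): the send method is itself asynchronous. Pass a Callback, which Kafka invokes when the broker responds; the callback receives either the record metadata (success) or an exception (failure).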

    // Requires org.apache.kafka.clients.producer.Callback and RecordMetadata
    try {
        producer.send(record, new Callback() {
            // RecordMetadata and Exception are mutually exclusive
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    String topic = metadata.topic();
                    int partition = metadata.partition();
                    System.out.println("topic:" + topic + ",partition:" + partition);
                }
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        producer.close();
    }

    3. Synchronous (sync): call get() on the Future object returned by send to block until the result is available.

    try {
        producer.send(record).get();
        // Option 1: set a timeout (this overload also throws TimeoutException, which must be caught)
        // producer.send(record).get(1000L, TimeUnit.MILLISECONDS);
        // Option 2: read some of the returned metadata
        // Future<RecordMetadata> future = producer.send(record);
        // RecordMetadata metadata = future.get();
        // String topic = metadata.topic();
        // int partition = metadata.partition();
        // System.out.println("topic:" + topic + ",partition:" + partition);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    } finally {
        producer.close();
    }
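The three modes differ only in what the caller does with the Future that send returns. The following is a minimal sketch of that difference that runs without a broker: it uses `CompletableFuture` from the JDK as a stand-in for the `Future<RecordMetadata>` returned by `send`. The class `SendModesSketch` and the helper `fakeSend` are hypothetical and are not part of the Kafka client; only the `java.util.concurrent` behavior shown here is real.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SendModesSketch {

    // Hypothetical stand-in for producer.send(record): returns a future that is
    // already completed with a fake "topic-partition" string, or with the error.
    static CompletableFuture<String> fakeSend(String topic, RuntimeException error) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (error != null) {
            future.completeExceptionally(error);
        } else {
            future.complete(topic + "-0");
        }
        return future;
    }

    // Synchronous style: block on get() with a timeout and report the outcome.
    static String sendSync(String topic, RuntimeException error) {
        try {
            return "ok:" + fakeSend(topic, error).get(1000L, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // get() wraps the original failure; getCause() unwraps it.
            return "failed:" + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // Asynchronous style: register a completion handler instead of blocking.
    // Exactly one of (result, failure) is non-null, like the Callback arguments.
    static String sendAsync(String topic, RuntimeException error) {
        StringBuilder outcome = new StringBuilder();
        fakeSend(topic, error).whenComplete((result, failure) -> {
            if (failure != null) {
                outcome.append("failed:").append(failure.getMessage());
            } else {
                outcome.append("ok:").append(result);
            }
        });
        // The stand-in future is already complete, so the handler has run by now.
        return outcome.toString();
    }

    public static void main(String[] args) {
        // Fire-and-forget style: the returned future is simply ignored.
        fakeSend("producers_one", null);

        System.out.println(sendAsync("producers_one", null));
        System.out.println(sendSync("producers_one", new RuntimeException("broker down")));
    }
}
```

With a real producer the callback runs later on the producer's I/O thread, so a method like `sendAsync` could not return the outcome immediately; it does so here only because the stand-in future is completed before the handler is registered.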