Kafka的基础使用

    技术2022-07-11  93

    在代码中使用Kafka其实也是非常简单的,首先肯定是需要引入相关的依赖,如下:

    <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.1.1</version> </dependency>

    其中引入依赖的版本建议和使用的Kafka保持一致,和Zookeeper一样的,这里我们也是可以通过在Kafka的安装包下的/lib目录下查看到其相关版本,如下:

    然后我们需要创建一个的主题,创建主题我们可以使用之前Kafka的安装、管理和配置中,介绍的命令行的方式,如主题名为my-topic,1副本,2分区的主题,如下: kafka-topics.bat --zookeeper localhost:2181/kafka --create --replication-factor 1 --partitions 2 --topic my-topic

    当然我们也可以不用手动创建,因为Kafka是允许自动创建主题。


    接来下其实和其它MQ类似,我们肯定是需要生产者发送消息,和消费者接受消息。这里我们就先来看一看生产者发送消息的过程,其中创建生产者对象时有三个属性必须指定。

    bootstrap.servers

    该属性指定Broker的地址清单,地址的格式为host:port 。如果是Kafka集群,清单里也不需要包含所有的Broker地址,生产者会从给定的Broker里查询其他Broker的信息。不过最少提供2个broker的信息,一旦其中一个宕机,生产者仍能连接到集群上。

    key.serializer

    生产者接口允许使用参数化类型,可以把Java对象作为键和值传Broker,但是Broker希望收到的消息的键和值都是字节数组,所以必须提供将对象序列化成字节数组的序列化器。

    key.serializer必须设置为实现org.apache.kafka.common.serialization.Serializer的接口类,Kafka的客户端默认提供了ByteArraySerializer、IntegerSerializer、StringSerializer等等,当然也可以实现自定义的序列化器。

    value.serializer

    该参数和上述key.serializer其作用及要求是一致的。

    public class MyKafkaProducer { public static void main(String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093"); //properties.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); try { ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "name", "BXS"); producer.send(record); } finally { producer.close(); } } }

    然后我们继续来看看消费者接受消息时,其必需的参数,如下:

    bootstrap.servers

    同上生产者的bootstrap.servers配置

    key.deserializer

    生产者在使用了序列化器将其对象序列化为字节数据,那么消费者在接收到消息后,肯定也是需要进行反序列化处理的,所有这里就需要谁知 生产者接口允许使用参数化类型,可以把Java对象作为键和值传Broker,但是Broker希望收到的消息的键和值都是字节数组,所以必须提供将对象序列化成字节数组的序列化器。

    key.deserializer必须设置为实现org.apache.kafka.common.serialization.Deserializer的接口类,Kafka的客户端默认提供了ByteArrayDeserializer、IntegerDeserializer、StringDeserializer等等,当然也可以实现自定义的序列化器。

    value.deserializer

    该参数和上述key.deserializer其作用及要求是一致的。

    group.id

    并非完全必需,它指定了消费者属于哪一个群组,但是创建不属于任何一个群组的消费者并没有问题。

    public class MyKafkaConsumer { public static void main(String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"); //properties.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_0"); //properties.put("group.id", "group_0"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singletonList("my-topic")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(500); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("主题:%s, 分区:%d, 偏移量:%d, key:%s, value:%s", record.topic(), record.partition(), record.offset(), record.key(), record.value())); } } } finally { consumer.close(); } } }

    Processed: 0.009, SQL: 9