kafka

    技术2026-02-08  2

    kafka 1.什么是kafka 消息队列,消息中间件 2.kafka的特点 1.解耦:   允许你独⽴的扩展或修改两边的处理过程,只要确保它们遵守同样的接⼝约束。 2.冗余:   消息队列把数据进⾏持久化直到它们已经被完全处理,通过这⼀⽅式规避了数据丢失⻛险。许多消息队 列所采⽤的"插⼊-获取-删除"范式中,在把⼀个消息从队列中删除之前,需要你的处理系统明确的指出该消 息已经被处理完毕,从⽽确保你的数据被安全的保存直到你使⽤完毕。 3.扩展性:   因为消息队列解耦了你的处理过程,所以增⼤消息⼊队和处理的频率是很容易的,只要另外增加处理过 程即可。 4.灵活性 & 峰值处理能⼒:   在访问量剧增的情况下,应⽤仍然需要继续发挥作⽤,但是这样的突发流量并不常⻅。如果为以能处理 这类峰值访问为标准来投⼊资源随时待命⽆疑是巨⼤的浪费。使⽤消息队列能够使关键组件顶住突发的访问 压⼒,⽽不会因为突发的超负荷的请求⽽完全崩溃。 5.可恢复性:   系统的⼀部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使⼀个处理 消息的进程挂掉,加⼊队列中的消息仍然可以在系统恢复后被处理。 6.顺序保证:   在⼤多使⽤场景下,数据处理的顺序都很重要。⼤部分消息队列本来就是排序的,并且能保证数据会按 照特定的顺序来处理。(Kafka 保证⼀个 Partition 内的消息的有序性) 7.缓冲:   有助于控制和优化数据流经过系统的速度,解决⽣产消息和消费消息的处理速度不⼀致的情况。 8.异步通信:   很多时候,⽤户不想也不需要⽴即处理消息。消息队列提供了异步处理机制,允许⽤户把⼀个消息放⼊ 队列,但并不⽴即处理它。想向队列中放⼊多少消息就放多少,然后在需要的时候再去处理它们。 3.kafka的架构 producer(push) broker(kafka) consumer(pull) zookeeper cluster 4.kafka的拓扑结构 5.kafka的相关概念 1.producer:   消息⽣产者,发布消息到 kafka 集群的终端或服务。 2.broker:   kafka 集群中安装Kafka的服务器。 3.topic:   每条发布到 kafka 集群的消息属于的类别,即 kafka 是⾯向 topic 的(相当于数据库中的表) 4.partition:   partition 是物理上的概念,每个 topic 包含⼀个或多个 partition。kafka 分配的单位是 partition。 5.consumer:   从 kafka 集群中消费消息的终端或服务。 6.Consumer group:   high-level consumer API 中,每个 consumer 都属于⼀个 consumer group,每条消息只能被 consumer group 中的⼀个 Consumer 消费,但可以被多个 consumer group 消费。 7.replica:   partition 的副本,保障 partition 的⾼可⽤。 8.leader:   replica 中的⼀个⻆⾊, producer 和 consumer 只跟 leader 交互。 9.follower:   replica 中的⼀个⻆⾊,从 leader 中复制数据。 10.zookeeper:   kafka 通过 zookeeper 来存储集群的 meta 信息 6.kafka的安装 安装Kafka集群 安装zookeeper 安装Kafka集群 上传Kafka安装包 解压 修改配置⽂件 cd kafka config vi servier.properties #指定broker的id broker.id=1 #数据存储的⽬录 log.dirs=/data/kafka #指定zk地址 zookeeper.connect=linux01:2181,linux02:2181,linux03:2181 //#可以删除topic的数据(⽣成环境不⽤配置) //#delete.topic.enable=true(不用配) 将配置好的kafka拷⻉的其他节点 scp -r kafka… linux02: P W D ( 大 写 ) ( l i n u x 03 : PWD(大写)(linux03: PWD()(linux03:PWD) 修改其他节点Kafka的broker.id(1->2,1->3) /opt/apps/zookeeper-3.4.13/bin/zkServer.sh start /opt/apps/zookeeper-3.4.13/bin/zkServer.sh status /opt/apps/kafka_2.12-2.4.1/bin/kafka-server-start.sh -daemon(后台) /opt/apps/kafka_2.12-2.4.1/config/server.properties(jps) 查看Kafka的topic /opt/apps/kafka_2.12-2.4.1/bin/kafka-topics.sh --list --zookeeper localhost:2181 创建topic /opt/apps/kafka_2.12-2.4.1/bin/kafka-topics.sh --zookeeper localhost:2181 --create – topic wordcount --replication-factor 3 --partitions 3 启动命令⾏⼀个⽣产者 /opt/apps/kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list linux01:9092,linux02:9092,linux03:9092 --topic wordcount 启动⼀个命令⾏消费者 /opt/apps/kafka_2.12-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --topic wordcount --from-beginning –from-beginning 消费以前产⽣的所有数据,如果不加,就是消费消费者启动后产⽣的数据 删除topic /opt/apps/kafka_2.12-2.4.1/bin/kafka-topics.sh --delete --topic wordcount --zookeeper localhost:2181 查看topic详细信息 /opt/apps/kafka_2.12-2.4.1/bin/kafka-topics.sh --zookeeper localhost:2181 --describe – topic wordcount 查看某个topic的偏移量 /opt/apps/kafka_2.12-2.4.1/bin/kafka-console-consumer.sh --topic __consumer_offsets – bootstrap-server linux01:9092,linux02:9092,linux03:9092 – formatter “kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter” – consumer.config /opt/apps/kafka_2.12-2.4.1/config/consumer.properties --from-beginning 7.kafka的scalaAPI

    package cn._51doit.kafak.clients import java.util.{Properties, UUID} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.StringSerializer object ProducerDemo { def main(args: Array[String]): Unit = { // 1 配置参数 val props = new Properties() // 连接kafka节点 props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092") //指定key序列化方式 props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") //指定value序列化方式 props.setProperty("value.serializer", classOf[StringSerializer].getName) // 两种写法都行 val topic = "wordcount" // 2 kafka的生产者 val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props) for (i <- 1026 to 1325) { // 3 封装的对象 //将数据发送到指定的分区编号 //val record = new ProducerRecord[String, String](topic, 1, null,"myvalue:"+i) //val partitionNum = i % 3 // 指定数据均匀写入4个分区中 //val record = new ProducerRecord[String, String](topic, partitionNum, null,"myvalue:"+i) //不指定分区编号,指定key, 分区编号 = key.hasacode % 4 //相同key的数据一定会到kafka的统一个分区,同一个分区中可以有多个key的数据 //val record = new ProducerRecord[String, String](topic , "test1","myvalue:"+i) //根据key的hashcode值模除以topic分区的数量,返回一个分区编号 //val record = new ProducerRecord[String, String](topic , UUID.randomUUID().toString ,"myvalue:"+i) //没有指定Key和分区,默认的策略就是轮询,将数据均匀写入多个分区中 val record = new ProducerRecord[String, String](topic,"value-" + i) // 4 发送消息 producer.send(record) } println("message send success") // 释放资源 producer.close() } } package cn._51doit.kafak.clients import java.time.Duration import java.util import java.util.Properties import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer} import org.apache.kafka.common.serialization.StringDeserializer object ConsumerDemo { def main(args: Array[String]): Unit = { // 1 配置参数 val props = new Properties() //从哪些broker消费数据 props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092") // 反序列化的参数 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.setProperty("value.deserializer",classOf[StringDeserializer].getName) // 指定group.id props.setProperty("group.id","g1025") // 指定消费的offset从哪里开始 //earliest:从头开始 --from-beginning //latest:从消费者启动之后 props.setProperty("auto.offset.reset","earliest") //[latest, earliest, none] // 是否自动提交偏移量 offset // enable.auto.commit 默认值就是true【5秒钟更新一次】,消费者定期会更新偏移量 groupid,topic,parition -> offset props.setProperty("enable.auto.commit","true") // kafka自动维护偏移量 手动维护偏移量 //enable.auto.commit 5000 // 2 消费者的实例对象 val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props) // 订阅 参数类型 java的集合 val topic: util.List[String] = java.util.Arrays.asList("wordcount") // 3 订阅主题 consumer.subscribe(topic) while (true){ // 4 拉取数据 val msgs: ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(2000)) import scala.collection.JavaConverters._ for(cr <- msgs.asScala){ //ConsumerRecord[String, String] println(cr) } } //consumer.close() } }
    Processed: 0.017, SQL: 9