初识kafka及单节点的安装

    技术2026-04-11  7

    初识kafka

    消息系统分类消息系统使用场景简介单节点安装

    消息系统分类

    1.Peer-to-Peer(点对点) 一般都基于Pull或Polling接受消息;发送到队列中的消息被一个而且仅仅一个接受者所接受,即使有多个接收者在同一个队列中侦听消息;既支持异步“即发即弃”的消息传送方式,也支持同步请求/应答传送方式。 2.发布/订阅 发布一个消息主题消息,可被多个订阅者所接收;发布/订阅即可基于Push(推送)消费数据,也可基于pull(拉取)或者polling(轮询)消费数据;解耦能力比P2P模型强。

    消息系统使用场景

    1.解耦:各系统之间通过消息系统这个统一的接口交换数据,无须了解彼此的存在 2.冗余:部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险 3.扩展:消息系统是统一的数据接口,各系统可独立扩展 4.峰值处理能力:消息系统可顶住峰值流量,业务系统可根据处理能力从消息系统中获取并处理对应量的请求 5.可恢复性:系统中部分组件失效不会影响整个系统,它回复后仍然可从消息系统中获取并处理数据 6.异步通信:在不需要立即处理请求的场景下,可以将请求放入消息系统,合适的时候在处理

    简介

    Apache Kafka是一个分布式流处理平台,版本号:scala版本号+kafka版本号 流处理平台特性 1.发布和订阅流式记录 2.存储流式记录,有较好的容错性 3.可以在流式记录产生时就进行处理 适用场景 1.构造实时流数据管道,它可以在系统或应用之间可靠的获取数据 2.构建实时流式应用程序,对这些流数据进行转换或者影响 概念 1.kafka作为一个集群,运行在一台或多台服务器上 2.kafka通过topic对存储的流数据进行分类 3.每条记录包含一个key,一个value和一个timestamp(时间戳)

    核心 1.The producer API允许一个应用程序订阅一个或多个topic,并且对发布给他们的流式数据进行处理 2.The Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。 3.The Streams API允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换 4.The Connector API 允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。 架构图 Topic和日志 1.Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。 2.对于每一个topic, Kafka集群都会维持一个分区日志

    每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。分区中的每一个记录都会分配一个id号来表示顺序,我们称之为offset,offset用来唯一的标识分区中每一条记录 Kafka 集群保留所有发布的记录—无论他们是否已被消费—并通过一个可配置的参数——保留期限来控制. 举个例子, 如果保留策略设置为2天,一条记录发布后两天内,可以随时被消费,两天过后这条记录会被抛弃并释放磁盘空间。Kafka的性能和数据大小无关,所以长时间存储数据没有什么问题 事实上,在每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置.偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据;也可以跳过最近的记录,从"现在"开始消费 分布式 1.日志的分区partition (分布)在Kafka集群的服务器上。每个服务器在处理数据和请求时,共享这些分区。每一个分区都会在已配置的服务器上进行备份,确保容错性. 2.每个分区都有一台 server 作为 “leader”,零台或者多台server作为 follwers 。leader server 处理一切对 partition (分区)的读写请求,而follwers只需被动的同步leader上的数据。当leader宕机了,followers 中的一台服务器会自动成为新的 leader。每台 server 都会成为某些分区的 leader 和某些分区的 follower,因此集群的负载是平衡的。

    单节点安装

    1.因为node1,2,3安装了zk集群,所以在node4进行单节点运行 2.上传kafka_2.11-2.0.0.zip到node4的usr/local下,解压unzip kafka_2.11-2.0.0.zip,如果解压不了,可能是你没有安装unzip,使用这个命令安装yum install -y unzip zip,重新解压成功后,重命名:mv kafka_2.11-2.0.0 kafka211200 3.配置环境变量: vi /etc/profile

    export KAFKA_HOME=/usr/local/kafka211200 export PATH=$PATH:${CATALINA_BASE}/bin:$KAFKA_HOME/bin

    退出保存

    source /etc/profile

    3.配置kafka的server.properties文件 cd kafka211200/config/

    vi server.properties

    只需要保证如下配置

    broker.id=0 log.dirs=/opt/kafka-data zookeeper.connect=node1:2181,node2:2181,node3:2181

    如果你的node4安装了zk,则可以

    zookeeper.connect=localhost:2181

    jps 4.启动 启动一个其他节点的zk服务器,然后就可以启动kafka服务器了

    bin/kafka-server-start.sh config/server.properties &

    检测以上服务是否启动:

    lsof -i:9092 lsof -i:2181

    安装 lsof 命令

    yum install lsof

    创建一个topic

    bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --replication-factor 1 --partitions 1 --topic test 创建一个名为“test”的topic,它有一个分区和一个副本 bin/kafka-topics.sh --list --zookeeper node1:2181,node2:2181,node3:2181 运行list(列表)命令来查看所有的topic bin/kafka-topics.sh --describe --zookeeper node1:2181,node2:2181,node3:2181 --topic test 运行list(列表)命令来查看这个topic

    4.发送一些消息

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 运行 producer,然后在控制台输入一些消息以发送到服务器

    5。启动一个 consumer( 注意这里 旧版本用法 0.1 以上已经不能用 zookeeper了)

    bin/kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic test --from-beginning 命令行consumer(消费者),将消息转储到标准输出

    6.将上述命令在不同的终端中运行,那么现在就可以将消息输入到生产者终端中,并将它们在消费终端中显示出来 7.关闭服务:

    sh kafka-server-stop.sh

    第一次发博客,如有不对,请指正,下次将安装kafka集群,因为版本兼容性原因,建议kafka211200使用zk336的版本

    Processed: 0.010, SQL: 9