kafka+zookeeper消息队列

    技术2022-07-10  145

    软件包提取码:u3s1 kafka: 起初是做采集日志的,和zookeeper一起才能做消息队列,可持久化。

    kafka broker(server): 消息中间件处理的节点 一个Kafka节点就是一个broker(server)topic = vhost -类消息 对消息进行分类主题-个类型-个主题topic可以有多个partition = queue消息的载体 partition也可以有多个partition是 一个有序的队列offset 偏移量 对分区的内容进行标识 先进先出不可逆消息的序列号,有了offset这个偏移量就能将数据写入到append.log中保存,两天的时间之后就会被删除,不论消息是否处理成功都会删除produce消息的生产者consumer 消息的消费者 consumer订阅消息的 每条消息只能有一的消费者

    Kafka的优点:

    kafka中append对消息的追加保证消息有先来后到的顺序理论上写入成功则成功,不会丢失数据,会在物理机上生成一个append.log文件分布式 容灾容量大,相对于其他消息列队,kafka的容量取决于硬盘的容量

    Kafka分布式

    leader(工作者)follow(备份)kafka集群的所有节点会去zookeeper上面注册信息 只有主能注册成功 生成临时节点信息,当主宕机从们会再次去注册找到- 个主这里只有leader在 工作从是负责备份的,为了保证高可用所以都会搭建集群。

    Zookeeper: 键值对 用来存放meta信息(原始数据,最底层的数据)。还有Watch发现机制

    broker node registry:broker注册节点,会生成znode的临时节点保存信息broker topic registry:当一个zookeeper启动时会注册自己持有的topic和partition信息consumer and consumer group:主要用来做负载均衡consumer id registry:每个consumer都有唯一的id号,用来标记消费者信息consumer offset tracking:用来跟踪每个concumer消费的最大的offsetpartition owner registry:用来标记partiton被哪个consumer消费的 主机1192.168.1.1主机2192.168.1.2主机3192.168.1.3

    搭建zookeeper集群

    #主机1 [root@CentOS1 ~]# java -version #需要java环境 openjdk version "1.8.0_102" OpenJDK Runtime Environment (build 1.8.0_102-b14) OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode) [root@CentOS1 ~]# tar zxf zookeeper-3.3.6.tar.gz [root@CentOS1 ~]# mv zookeeper-3.3.6 /usr/local/zookeeper [root@CentOS1 ~]# cd /usr/local/zookeeper/ [root@CentOS1 zookeeper]# cd conf/ [root@CentOS1 conf]# cp zoo_sample.cfg zoo.cfg #复制默认的配置文件名字必须值zoo.cfg [root@CentOS1 conf]# vim zoo.cfg 2 tickTime=2000 #zookeeper各个节点发送心跳包的时间 毫秒 5 initLimit=10 # 8 syncLimit=5 #修改为 10 dataDir=/usr/local/zookeeper/data #数据存放位置 #添加以下内容 11 dataLogDir=/usr/local/zookeeper/datalog #日志存放位置 14 server.1=192.168.1.1:2888:3888 15 server.2=192.168.1.2:2888:3888 16 server.3=192.168.1.3:2888:3888 [root@CentOS1 conf]# cd /usr/local/zookeeper/ [root@CentOS1 zookeeper]# mkdir data [root@CentOS1 zookeeper]# mkdir datalog [root@CentOS1 zookeeper]# cd data [root@CentOS1 data]# echo "1" > myid [root@CentOS1 data]# cat myid 1 [root@CentOS1 data]# scp -r /usr/local/zookeeper/ root@192.168.1.2:/usr/local/zookeeper [root@CentOS1 data]# scp -r /usr/local/zookeeper/ root@192.168.1.3:/usr/local/zookeeper #主机2 [root@CentOS2 ~]# echo "2" >/usr/local/zookeeper/data/myid [root@CentOS2 ~]# cat /usr/local/zookeeper/data/myid 2 #主机3 [root@Centos3 ~]# echo "3" >/usr/local/zookeeper/data/myid [root@Centos3 ~]# cat /usr/local/zookeeper/data/myid 3 #三台主机都要启动服务 cd /usr/local/zookeeper/bin/ ./zkServer.sh start #查看zookeeper的状态 #主机1 [root@CentOS1 bin]# ./zkServer.sh status JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Mode: follower #第一个follwer #主机2 [root@CentOS2 bin]# ./zkServer.sh status JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Mode: leader #只能有一个leader并且是随机的 #主机3 [root@Centos3 bin]# ./zkServer.sh status JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Mode: follower #第二个follower

    安装Kafka

    #主机1 [root@CentOS1 ~]# tar zxf kafka_2.11-1.0.1.tgz [root@CentOS1 ~]# mv kafka_2.11-1.0.1 /usr/local/kafka [root@CentOS1 ~]# cd /usr/local/kafka/config/ [root@CentOS1 config]# vim server.properties #更改为以下内容 21 broker.id=1 #节点标识 31 listeners=PLAINTEXT://192.168.1.1:9092 #监听的端口 60 log.dirs=/usr/local/kafka/data #数据存放位置 103 log.retention.hours=168 #分区文件中消息保存的时间 单位s 104 message.max.bytes=1024000 #消息传递的最大 默认字节 105 default.replication.factor=2 #备份节点默认的个数 106 relica.fecth.max.bytes=1024000 #复制之间的最大 单位字节 126 zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 #连接zookeeper的IP和端口 127 num.rtition=1 #每个topic默认的分区个数 [root@CentOS1 config]# cd /usr/local/kafka/ [root@CentOS1 kafka]# mkdir data [root@CentOS1 kafka]# scp -r /usr/local/kafka/ root@192.168.1.2:/usr/local/kafka/ [root@CentOS1 kafka]# scp -r /usr/local/kafka/ root@192.168.1.3:/usr/local/kafka/ #主机2 [root@CentOS2 bin]# vim /usr/local/kafka/config/server.properties 21 broker.id=2 #节点标识 31 listeners=PLAINTEXT://192.168.1.2:9092 #监听端口 #主机3 [root@CentOS3 bin]# vim /usr/local/kafka/config/server.properties 21 broker.id=3 #节点标识 31 listeners=PLAINTEXT://192.168.1.3:9092 #监听端口 #三台主机启动Kafka服务 cd /usr/local/kafka/bin/ ./kafka-server-start.sh -daemon ../config/server.properties netstat -anput | grep 9092

    验证:

    #主机1 [root@CentOS1 bin]# ./kafka-topics.sh --create --zookeeper 192.168.1.1:2181 --partitions 1 --replication-factor 2 --topic logs #参数释义; --create 创建 --zookeeper 192.1 68.2.100:2181 使用的zookeeper --partitions 1分区 --replication-factor2 备份 --topic logs 名字 #主机2 #模拟生产者 [root@CentOS2 bin]# ./kafka-console-producer.sh --broker-list 192.168.1.2:9092 --topic logs #主机1 模拟消费者 [root@CentOS1 bin]# ./kafka-console-consumer.sh --zookeeper 192.168.1.2:2181 --topic logs --from-beginning #去主机2发送消息,然后去主机1去查看
    Processed: 0.018, SQL: 9