Kafka in Practice with PHP

    Tech · 2023-12-30

    Reposted from: https://segmentfault.com/a/1190000015765348

    Introduction

    Kafka is a high-throughput distributed publish-subscribe messaging system.

    Kafka roles you should know

    producer: the message producer.
    consumer: the message consumer.
    topic: messages are recorded under topics; Kafka categorizes message feeds, and each category of messages is called a topic.
    broker: Kafka runs as a cluster of one or more servers, each called a broker. Consumers subscribe to one or more topics and pull data from brokers, thereby consuming the published messages.

    The classic model

    The number of partitions in a topic should not be smaller than the number of consumers in a group; in other words, the number of consumers must not exceed the number of partitions, otherwise the extra consumers sit idle.
    A single partition of a topic can be consumed simultaneously by one consumer from each of several different consumer groups.
    Within one consumer group, a single partition can only be consumed by one consumer.
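    The assignment rules above can be sketched with a small simulation (Python, purely illustrative; this is a model of the idea, not the PHP client or the broker's actual assignor): partitions are spread over a group's consumers, and any consumer beyond the partition count ends up with nothing.

```python
def assign_partitions(partitions, consumers):
    """Round-robin-style assignment: each partition goes to exactly one
    consumer in the group; extra consumers get an empty assignment."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# A topic with 2 partitions and a group of 3 consumers: one consumer sits idle.
print(assign_partitions([0, 1], ["c1", "c2", "c3"]))
# → {'c1': [0], 'c2': [1], 'c3': []}
```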

    Common parameters explained

    request.required.acks

    The Kafka producer's acknowledgement has 3 mechanisms, selected by setting different values of request.required.acks in the producer config when initializing the producer.

    0: the producer does not wait for any acknowledgement from the broker and goes straight on to the next message (or batch). This option gives the lowest latency but the weakest durability guarantee: some data is lost when a server fails, e.g. if the leader is dead but the producer doesn't know it, the messages it sends never reach a broker.

    1: the producer sends the next message only after the leader has received the data and acknowledged it. This option gives better durability, since the client waits for the server to confirm the request succeeded; only a message written to a leader that dies before replicating it is lost.

    -1: a send only counts as complete once the follower replicas have acknowledged receipt of the data. This option gives the best durability: we guarantee no message is lost as long as at least one in-sync replica stays alive.

    Across the three mechanisms, performance decreases in turn (producer throughput drops) while data durability increases in turn.
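    As a rough illustration of the tradeoff (Python, a simulation of the semantics only, not real client code): suppose the producer has sent 10 messages, the leader has received 7, and the replicas have caught up to 5 at the moment the leader dies. Only the replicated messages survive a failover, so the gap between what the producer believes was delivered and what actually survives shrinks as acks gets stricter.

```python
def confirmed(sent, on_leader, replicated, acks):
    """How many messages the producer considers successfully delivered
    under each ack mode (a simplification of the real protocol)."""
    if acks == 0:       # fire-and-forget: everything "succeeded"
        return sent
    if acks == 1:       # leader ack: whatever the leader has accepted
        return on_leader
    return replicated   # acks=-1: only fully replicated messages

# Leader dies with 10 sent, 7 received by the leader, 5 replicated:
for acks in (0, 1, -1):
    survived = 5  # only replicated messages outlive the leader
    print(acks, confirmed(10, 7, 5, acks), "silently lost:",
          confirmed(10, 7, 5, acks) - survived)
```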

    auto.offset.reset

    earliest: automatically reset the offset to the earliest offset.
    latest: automatically reset the offset to the latest offset (the default).
    none: if the consumer group has no previous offset, throw an exception to the consumer.
    Any other value: throw an exception to the consumer (invalid parameter).
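    The policy can be sketched as follows (Python, an illustrative model rather than broker code): the reset policy only matters when the group has no committed offset for a partition.

```python
def starting_offset(committed, earliest, latest, policy):
    """Pick the first offset to consume for a partition.

    committed: the group's committed offset, or None if it has none.
    earliest/latest: the partition's oldest and newest offsets.
    """
    if committed is not None:
        return committed          # a committed offset always wins
    if policy == "earliest":
        return earliest
    if policy == "latest":
        return latest
    # "none" (or anything else): raise instead of guessing
    raise ValueError("no previous offset for this consumer group")

print(starting_offset(None, 0, 42, "earliest"))  # → 0
print(starting_offset(None, 0, 42, "latest"))    # → 42
print(starting_offset(7, 0, 42, "latest"))       # → 7
```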

    Installing Kafka and a quick test

    Install Kafka (no real installation needed; just unpack)

    Official download page: http://kafka.apache.org/downloads

    # wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz
    tar -xzf kafka_2.12-1.1.1.tgz
    cd kafka_2.12-1.1.1

    Start the Kafka server

    ZooKeeper must be started first.

    -daemon starts it in background daemon mode.

    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties

    Test with the Kafka command-line clients

    Create a topic named test with 2 partitions

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
    Created topic "test".

    List all topics

    bin/kafka-topics.sh --list --zookeeper localhost:2181
    test

    Show topic details

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    Topic:test  PartitionCount:2  ReplicationFactor:1  Configs:
        Topic: test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
        Topic: test  Partition: 1  Leader: 0  Replicas: 0  Isr: 0

    Start a producer (type messages)

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    [waits for your input; type once the > prompt appears]

    i am a new msg ! i am a good msg ?

    Start a consumer (wait for messages)

    Note the --from-beginning flag here: with it, the consumer reads from the start every time. Try running with and without it to see the difference.

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    [waiting for messages]
    i am a new msg !
    i am a good msg ?

    Install the Kafka PHP extension

    First install the librdkafka library.

    git clone https://github.com/edenhill/librdkafka.git
    cd librdkafka/
    ./configure
    make
    sudo make install

    git clone https://github.com/arnaud-lb/php-rdkafka.git
    cd php-rdkafka
    phpize
    ./configure
    make all -j 5
    sudo make install

    vim [php]/php.ini
    extension=rdkafka.so

    PHP code in practice

    Producer

    <?php
    $conf = new RdKafka\Conf();
    $conf->setDrMsgCb(function ($kafka, $message) {
        file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
    });
    $conf->setErrorCb(function ($kafka, $err, $reason) {
        file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
    });

    $rk = new RdKafka\Producer($conf);
    $rk->setLogLevel(LOG_DEBUG);
    $rk->addBrokers("127.0.0.1");

    $cf = new RdKafka\TopicConf();
    // -1: wait until all brokers confirm replication; 1: the leader confirms; 0: no confirmation.
    // With 0 the delivery callback gets no offset back; with 1 and -1 an offset is returned.
    // We can use this mechanism to confirm message production, though not 100% reliably,
    // since the Kafka server could still die mid-way.
    $cf->set('request.required.acks', 0);
    $topic = $rk->newTopic("test", $cf);

    $option = 'qkl';
    for ($i = 0; $i < 20; $i++) {
        // RD_KAFKA_PARTITION_UA lets librdkafka pick the partition automatically
        // $option (the message key) is optional
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
    }

    $len = $rk->getOutQLen();
    while ($len > 0) {
        $len = $rk->getOutQLen();
        var_dump($len);
        $rk->poll(50);
    }

    Run the producer

    php producer.php

    output

    int(20)
    int(20)
    int(20)
    int(20)
    int(0)

    # Check the consumer shell you started above; it should print the messages
    qkl . 0
    qkl . 1
    qkl . 2
    qkl . 3
    qkl . 4
    qkl . 5
    qkl . 6
    qkl . 7
    qkl . 8
    qkl . 9
    qkl . 10
    qkl . 11
    qkl . 12
    qkl . 13
    qkl . 14
    qkl . 15
    qkl . 16
    qkl . 17
    qkl . 18
    qkl . 19

    Low-level consumer

    <?php
    $conf = new RdKafka\Conf();
    $conf->setDrMsgCb(function ($kafka, $message) {
        file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND);
    });
    $conf->setErrorCb(function ($kafka, $err, $reason) {
        file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
    });

    // Set the consumer group
    $conf->set('group.id', 'myConsumerGroup');

    $rk = new RdKafka\Consumer($conf);
    $rk->addBrokers("127.0.0.1");

    $topicConf = new RdKafka\TopicConf();
    $topicConf->set('request.required.acks', 1);
    // Auto-commit within auto.commit.interval.ms; recommended to leave disabled
    //$topicConf->set('auto.commit.enable', 1);
    $topicConf->set('auto.commit.enable', 0);
    $topicConf->set('auto.commit.interval.ms', 100);

    // Store offsets in a file
    //$topicConf->set('offset.store.method', 'file');
    //$topicConf->set('offset.store.path', __DIR__);
    // Store offsets on the broker
    $topicConf->set('offset.store.method', 'broker');

    // smallest: roughly "consume from the beginning", equivalent to earliest above
    // largest: "consume only new messages", equivalent to latest above
    //$topicConf->set('auto.offset.reset', 'smallest');

    $topic = $rk->newTopic("test", $topicConf);

    // Argument 1: the partition to consume (partition 0 here)
    // RD_KAFKA_OFFSET_BEGINNING: consume from the beginning
    // RD_KAFKA_OFFSET_STORED: resume from the last stored offset
    // RD_KAFKA_OFFSET_END: start at the end (only new messages)
    $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
    //$topic->consumeStart(0, RD_KAFKA_OFFSET_END);
    //$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

    while (true) {
        // Argument 1: the partition to consume, partition 0 here
        // Argument 2: how long to block waiting, in milliseconds
        $message = $topic->consume(0, 12 * 1000);
        if (is_null($message)) {
            sleep(1);
            echo "No more messages\n";
            continue;
        }
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
                break;
        }
    }

    High-level consumer

    <?php
    /**
     * Created by PhpStorm.
     * User: qkl
     * Date: 2018/8/22
     * Time: 17:58
     */
    $conf = new \RdKafka\Conf();

    function rebalance(\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                echo "Assign: ";
                var_dump($partitions);
                $kafka->assign($partitions);
                // $kafka->assign([new RdKafka\TopicPartition("qkl01", 0, 0)]);
                break;
            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                echo "Revoke: ";
                var_dump($partitions);
                $kafka->assign(NULL);
                break;
            default:
                throw new \Exception($err);
        }
    }

    // Set a rebalance callback to log partition assignments (optional)
    $conf->setRebalanceCb(function(\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
        rebalance($kafka, $err, $partitions);
    });

    // Configure the group.id. All consumers with the same group.id will consume
    // different partitions.
    $conf->set('group.id', 'test-110-g100');

    // Initial list of Kafka brokers
    $conf->set('metadata.broker.list', '192.168.216.122');

    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('request.required.acks', -1);
    // Auto-commit within auto.commit.interval.ms; recommended to leave disabled
    $topicConf->set('auto.commit.enable', 0);
    $topicConf->set('auto.commit.interval.ms', 100);

    // Store offsets in a file
    $topicConf->set('offset.store.method', 'file');
    $topicConf->set('offset.store.path', __DIR__);
    // Store offsets on the broker
    // $topicConf->set('offset.store.method', 'broker');

    // Set where to start consuming messages when there is no initial offset in
    // offset store or the desired offset is out of range.
    // 'smallest': start from the beginning
    $topicConf->set('auto.offset.reset', 'smallest');

    // Set the configuration to use for subscribed/assigned topics
    $conf->setDefaultTopicConf($topicConf);

    $consumer = new \RdKafka\KafkaConsumer($conf);
    //$KafkaConsumerTopic = $consumer->newTopic('qkl01', $topicConf);

    // Subscribe to topic 'qkl01'
    $consumer->subscribe(['qkl01']);

    echo "Waiting for partition assignment... (may take some time when\n";
    echo "quickly re-joining the group after leaving it.)\n";

    while (true) {
        $message = $consumer->consume(120 * 1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                // $consumer->commit($message);
                // $KafkaConsumerTopic->offsetStore(0, 20);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
                break;
        }
    }

    Special notes on consumer groups

    Note in particular: only the consumer group set by a high-level consumer is recorded by the Kafka server; the group set by a low-level consumer is not recorded by the server.

    For details on inspecting consumer-group information, see the sections later in this article.

    Viewing server metadata (topic/partition/broker)

    <?php
    $conf = new RdKafka\Conf();
    $conf->setDrMsgCb(function ($kafka, $message) {
        file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);
    });
    $conf->setErrorCb(function ($kafka, $err, $reason) {
        printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
    });
    $conf->set('group.id', 'myConsumerGroup');

    $rk = new RdKafka\Consumer($conf);
    $rk->addBrokers("127.0.0.1");

    $allInfo = $rk->metadata(true, NULL, 60e3);
    $topics = $allInfo->getTopics();

    echo rd_kafka_offset_tail(100);
    echo "--";
    echo count($topics);
    echo "--";

    foreach ($topics as $topic) {
        $topicName = $topic->getTopic();
        if ($topicName == "__consumer_offsets") {
            continue;
        }
        $partitions = $topic->getPartitions();
        foreach ($partitions as $partition) {
            $topPartition = new RdKafka\TopicPartition($topicName, $partition->getId());
            echo "current topic: " . $topPartition->getTopic() . " - " . $partition->getId() . " - ";
            echo "offset: " . $topPartition->getOffset() . PHP_EOL;
        }
    }

    If you need to produce and consume remotely

    vim config/server.properties
    advertised.listeners=PLAINTEXT://ip:9092

    Set ip to your Kafka host's external IP.

    Practice with the built-in commands

    The Kafka features we will practice:

    Creating topics; producing messages; consuming messages; topic details; listing consumer groups; getting a consumer group's offsets.

    The built-in commands

    They live in the bin directory of the Kafka installation.

    * marks the scripts we will use.

    connect-distributed.sh        kafka-log-dirs.sh                    kafka-streams-application-reset.sh
    connect-standalone.sh         kafka-mirror-maker.sh                kafka-topics.sh*
    kafka-acls.sh                 kafka-preferred-replica-election.sh  kafka-verifiable-consumer.sh
    kafka-broker-api-versions.sh  kafka-producer-perf-test.sh          kafka-verifiable-producer.sh
    kafka-configs.sh              kafka-reassign-partitions.sh         trogdor.sh
    kafka-console-consumer.sh*    kafka-replay-log-producer.sh         windows
    kafka-console-producer.sh*    kafka-replica-verification.sh        zookeeper-security-migration.sh
    kafka-consumer-groups.sh*     kafka-run-class.sh                   zookeeper-server-start.sh
    kafka-consumer-perf-test.sh   kafka-server-start.sh                zookeeper-server-stop.sh
    kafka-delegation-tokens.sh    kafka-server-stop.sh                 zookeeper-shell.sh*
    kafka-delete-records.sh       kafka-simple-consumer-shell.sh

    Creating topics (kafka-topics.sh)

    Create the test topic with 1 partition and 1 replica. The replication factor can be understood as the minimum number of brokers holding a copy, and must be >= 1.

    --zookeeper localhost:2181 (2181 is ZooKeeper's default port)

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    Create the test02 topic with 2 partitions and 1 replica.

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test02

    List all topics

    bin/kafka-topics.sh --list --zookeeper localhost:2181
    __consumer_offsets
    test
    test02

    Note that __consumer_offsets is a topic Kafka creates automatically to store consumer offsets; we can ignore it for now.
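    Conceptually, that topic behaves like a key-value store of committed offsets keyed by (group, topic, partition). A tiny in-memory model (Python, purely illustrative; the real topic is a compacted Kafka log, not a dict):

```python
class OffsetStore:
    """In-memory model of what __consumer_offsets keeps per group."""

    def __init__(self):
        self._offsets = {}

    def commit(self, group, topic, partition, offset):
        # A later commit for the same (group, topic, partition) overwrites
        # the earlier one, like a newer record in the compacted topic.
        self._offsets[(group, topic, partition)] = offset

    def fetch(self, group, topic, partition):
        # None plays the role of "no committed offset"
        # (that is when auto.offset.reset applies).
        return self._offsets.get((group, topic, partition))

store = OffsetStore()
store.commit("test-g1", "test", 0, 9)
print(store.fetch("test-g1", "test", 0))      # → 9
print(store.fetch("other-group", "test", 0))  # → None
```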

    Show a specific topic's details

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    Topic:test  PartitionCount:1  ReplicationFactor:1  Configs:
        Topic: test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test02
    Topic:test02  PartitionCount:2  ReplicationFactor:1  Configs:
        Topic: test02  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
        Topic: test02  Partition: 1  Leader: 0  Replicas: 0  Isr: 0

    From the output above we can see that the first line shows the topic's overall information, and the indented lines below it show per-partition information.

    Consumer (kafka-console-consumer.sh)

    Start a consumer in a consumer group. We need a dedicated shell terminal here, since the process blocks waiting for output.

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

    [waiting for output]

    Producer (kafka-console-producer.sh)

    We need another shell terminal here, since the process waits for input.

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    Wait for the > prompt to appear, then type our messages:

    msg01
    msg02
    msg03

    # Watch the consumer terminal above: our messages appear automatically
    msg01
    msg02
    msg03

    List consumer groups

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
    Note: This will not show information about old Zookeeper-based consumers.
    console-consumer-25379
    console-consumer-73410
    console-consumer-27127
    console-consumer-61887
    console-consumer-61324

    Now start another consumer and list the groups again (we don't know which group id the most recent consumer was given).

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
    Note: This will not show information about old Zookeeper-based consumers.
    console-consumer-25379
    console-consumer-73410
    console-consumer-27127
    console-consumer-39416  # the id of our newly started consumer group; we'll use it below
    console-consumer-61887
    console-consumer-61324

    Show a consumer group's details

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-39416
    Note: This will not show information about old Zookeeper-based consumers.

    TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST        CLIENT-ID
    test   0          9               9               0    consumer-1-94afec29-5042-4108-8619-ba94812f10a8  /127.0.0.1  consumer-1

    Inspect the offline console-consumer-25379 group

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-25379
    Note: This will not show information about old Zookeeper-based consumers.
    Consumer group 'console-consumer-25379' has no active members.

    TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
    test   0          5               9               4    -            -     -

    Inspect the offline console-consumer-27127 group

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-27127
    Note: This will not show information about old Zookeeper-based consumers.
    Consumer group 'console-consumer-27127' has no active members.

    TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
    test   0          6               9               3    -            -     -

    We can see that each run creates a new group with a single consumer, which is inconvenient for testing multiple consumers in one group.

    Start multiple consumers in one consumer group

    cp config/consumer.properties config/consumer_g1.properties
    vim config/consumer_g1.properties

    Change the group name:

    group.id=test-consumer-group => group.id=test-g1

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer_g1.properties
    msg01
    msg02
    msg03

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer_g1.properties

    No output (the test topic has only one partition, and a partition can only be consumed by one consumer in a given group, so this second consumer sits idle).

    List consumer groups

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
    Note: This will not show information about old Zookeeper-based consumers.
    console-consumer-25379
    console-consumer-73410
    console-consumer-27127
    console-consumer-39416
    test-g1
    console-consumer-61887
    console-consumer-61324

    Inspect the test-g1 group

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-g1
    Note: This will not show information about old Zookeeper-based consumers.

    TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST        CLIENT-ID
    test   0          9               9               0    consumer-1-43922b0c-34e0-47fe-b597-984c9e6a2884  /127.0.0.1  consumer-1

    Next let's start 2 consumers on test02 and see what happens.

    Start 2 more consumers in the test-g1 group for the test02 topic:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test02 --from-beginning --consumer.config config/consumer_g1.properties
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test02 --from-beginning --consumer.config config/consumer_g1.properties

    Inspect the test-g1 group

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-g1
    Note: This will not show information about old Zookeeper-based consumers.

    TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST        CLIENT-ID
    test02  0          0               0               0    consumer-1-90ccc960-557a-46ab-a799-58c35ee670d8  /127.0.0.1  consumer-1
    test02  1          0               0               0    consumer-1-c6d79321-8212-4594-8a11-353f684c54fc  /127.0.0.1  consumer-1
    test    0          9               9               0    consumer-1-7a2706f7-a206-4c29-ae1f-3726ad21af96  /127.0.0.1  consumer-1

    Now produce a message to topic test, then produce several more to test02: you will find that the 2 test02 consumers share the messages in a roughly load-balanced way.
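    The balancing comes from the producer spreading messages over test02's two partitions while each partition is owned by one of the two consumers in the group. A keyed-partitioning sketch (Python, illustrative only; the console producer without keys actually distributes messages round-robin rather than by key hash):

```python
import zlib

def pick_partition(key, num_partitions):
    """Keyed partitioning: the same key always maps to the same partition,
    so each consumer's share of the keys stays stable."""
    return zlib.crc32(key.encode()) % num_partitions

msgs = ["msg%02d" % i for i in range(6)]
parts = [pick_partition(m, 2) for m in msgs]
# Each message deterministically lands on partition 0 or 1; the consumer
# owning that partition is the one that prints it.
print(list(zip(msgs, parts)))
```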

    More consumer-group info commands

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
    Note: This will not show information about old Zookeeper-based consumers.

    CONSUMER-ID                                      HOST        CLIENT-ID   #PARTITIONS  ASSIGNMENT
    consumer-1-b8eeb8bc-6aa6-439b-84d8-0fcbed4a2899  /127.0.0.1  consumer-1  1            test02(1)
    consumer-1-7109f789-a5cf-4862-94d0-976146dbc769  /127.0.0.1  consumer-1  1            test(0)
    consumer-1-90ccc960-557a-46ab-a799-58c35ee670d8  /127.0.0.1  consumer-1  1            test02(0)

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state
    Note: This will not show information about old Zookeeper-based consumers.

    COORDINATOR (ID)    ASSIGNMENT-STRATEGY  STATE   #MEMBERS
    localhost:9092 (0)  range                Stable  3

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group console-consumer-25379

    ZooKeeper (zookeeper-shell.sh)

    Connect

    bin/zookeeper-shell.sh 127.0.0.1:2181
    Connecting to 127.0.0.1:2181
    Welcome to ZooKeeper!
    JLine support is disabled

    WATCHER::

    WatchedEvent state:SyncConnected type:None path:null

    Common commands

    ls /
    [cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]