python kafka写入.net平台消费 延迟5秒问题解决

    技术2022-07-13  75

    测试环境,使用python的pykafka写入消息数据,.net Confluent.Kafka进行消费,中间发现,间隔一秒发送会延迟5秒做批处理。

    python代码:

    from pykafka import KafkaClient import datetime import threading import time def producer(): client = KafkaClient(hosts="172.26.1.22:9092", broker_version='2.4') topicdocu = client.topics["251z_37"] producer = **topicdocu.get_producer()** for num in range(1, 20): time.sleep(1) producer.produce(bytes(str(num), encoding='utf-8'), timestamp=datetime.datetime.now()) print(num) if __name__ == '__main__': producer()

    .net 消费代码:

    var consumerConfig = new ConsumerConfig { BootstrapServers = "172.26.1.22:9092", AutoOffsetReset = AutoOffsetReset.Earliest, GroupId = "node", EnableAutoCommit = true, }; var kafkaBuilder = new ConsumerBuilder<string, string>(consumerConfig); using (var consumer = kafkaBuilder.Build()) { CancellationTokenSource cts = new CancellationTokenSource(); consumer.Subscribe("251z_37"); while (true) { var kafkaResult = consumer.Consume(cts.Token); //var @event = JsonConvert.DeserializeObject<NodeStateMachine.Entity.NodeCustomerEntity>(kafkaResult.Value); Console.WriteLine(kafkaResult.Value + " " + DateTime.Now.ToString("HH:mm:ss")); } }

    问题效果: 可以看到,这种情况会5秒写入一次数据,消费端会五秒接收到一批数据。 问题原因:producer = topicdocu.get_producer(sync=True) 初始化时候,需要设置sync=True,需要传参为同步处理。之后就是一切正常了。

    Processed: 0.009, SQL: 9