When a Python script consumes from MQ too slowly and queues start to back up on the RabbitMQ server, the Python script can be found maxing out a CPU core. And if the messages in the queue depend on one another, can consuming them with concurrent Python workers preserve those dependencies?
For example:
message ID 1: OpenStack Nova sends a message: "create a VM"
message ID 2: OpenStack Cinder sends a message: "create a 10 GB volume"
message ID 3: attach the newly created volume to the VM
The scenario above is purely hypothetical, and we ignore retry, timeout, and fallback mechanisms. The point is: if the MQ tasks are not executed in order, errors can occur.
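To make the dependency concrete, here is a minimal sketch (hypothetical, not an actual OpenStack workflow; the message fields and handler are invented for illustration) of why processing message 3 before message 2 fails:

# Hypothetical handler: keeps a registry of created resources, so an
# attach request fails if the volume has not been created yet.
resources = {}

def handle(msg):
    if msg["action"] == "create_vm":
        resources["vm"] = msg["name"]
    elif msg["action"] == "create_volume":
        resources["volume"] = msg["name"]
    elif msg["action"] == "attach_volume":
        # raises KeyError if 'create_volume' has not been processed yet
        print("attach %s to %s" % (resources["volume"], resources["vm"]))

msgs = [
    {"id": 1, "action": "create_vm", "name": "vm-1"},
    {"id": 2, "action": "create_volume", "name": "vol-10g"},
    {"id": 3, "action": "attach_volume", "name": ""},
]

for m in msgs:        # in-order processing succeeds;
    handle(m)         # swapping msgs[1] and msgs[2] raises KeyError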
This post only tests the consumer side, in single-process and multi-process modes.
When developing on CentOS 7, the pika module can be installed in two ways:
yum install -y python2-pika-0.10.0-9.el7.noarch (installs the old 0.x-series pika, version 0.10.0)
pip install pika (installs pika 1.1.0, the latest release at the time of writing)
When consuming from RabbitMQ, it is common for messages to pile up on the server. With a backlog on the server side, the consumer Python process was observed to hit 100% CPU.
Note the parameter differences between versions. Old version (pika 0.x):
result = channel.queue_declare(exclusive=True)   # the queue parameter may be omitted
channel.basic_consume(callback, queue='fanout_cmdb3_mq_exchange', no_ack=False)

Consuming this way resolved the server-side backlog, and the 100% CPU usage of the Python script was not seen again.
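Pulled together, a minimal pika 0.x consumer sketch based on the calls above (the callback body, host, and credentials are placeholders, and the queue 'fanout_cmdb3_mq_exchange' is assumed to already exist on the server):

#!/usr/bin/python
# -*- coding:utf-8 -*-
# Minimal pika 0.x consumer sketch; host and credentials are placeholders.
import pika

def callback(ch, method, properties, body):
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)   # explicit consume confirmation

connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmqserver'))
channel = connection.channel()
channel.queue_declare(exclusive=True)   # 0.x allows omitting the queue name
# pika 0.x: callback first, queue passed as a keyword, manual ack via no_ack=False
channel.basic_consume(callback, queue='fanout_cmdb3_mq_exchange', no_ack=False)
channel.start_consuming()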
New version (pika 1.1.0):
result = channel.queue_declare(exclusive=True, queue='fanout_cmdb3_mq_exchange')   # the new version requires the queue name
channel.basic_consume(queue_name, callback, auto_ack=False)

Checking the pika source shows that the parameter order changed in the new version: the queue name is passed directly as the first positional argument with no keyword needed, and the no_ack parameter has been renamed to auto_ack.
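For comparison, the same consumer written against the pika 1.1.0 API (again a sketch with placeholder host and credentials):

#!/usr/bin/python
# -*- coding:utf-8 -*-
# Minimal pika 1.1.0 consumer sketch; host and credentials are placeholders.
import pika

def callback(ch, method, properties, body):
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmqserver'))
channel = connection.channel()
# pika 1.x requires the queue name when declaring
result = channel.queue_declare(exclusive=True, queue='fanout_cmdb3_mq_exchange')
queue_name = result.method.queue
# pika 1.x: queue first, callback second, no_ack renamed to auto_ack
channel.basic_consume(queue_name, callback, auto_ack=False)
channel.start_consuming()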
Purpose
The queue requires consumers to return an acknowledgement for every message. Each run of the script generates a random number of messages (up to 600) and enqueues them into RabbitMQ. Message format:
{"id": 0, "time": "2020-07-02_13:35:44"}
Reference script
#!/usr/bin/python
# -*- coding:utf-8 -*-
import pika
import json
import time
import random

parameters = pika.ConnectionParameters(host='rabbitmqserver',
                                       credentials=pika.PlainCredentials('mquser', 'mqpassword'))
connection = pika.BlockingConnection(parameters=parameters)
channel = connection.channel()

# 'win-test' is the message queue
# durable=True keeps the queue across broker restarts; a message is removed
# only after the consumer acknowledges it
channel.queue_declare(queue='win-test', durable=True)

now = time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime(time.time()))
randomNum = random.randint(1, 600)

# enqueue a random number (up to 600) of messages
for i in range(randomNum):
    data = {}
    data["id"] = i
    data["time"] = now
    message = json.dumps(data)
    # delivery_mode=2 marks the message as persistent
    channel.basic_publish(exchange='', routing_key='win-test', body=message,
                          properties=pika.BasicProperties(delivery_mode=2))
    print(message)
connection.close()
Purpose
Fetch messages from the RabbitMQ server, acknowledge each message once the script has finished processing it, and append the consumed messages one by one to a local text file so that the consumption order can be verified.
Reference script
#!/usr/bin/python
# -*- coding:utf-8 -*-
import pika
import json

def callback(ch, method, properties, body):
    jsonObject = None
    try:
        jsonObject = json.loads(body)
    except ValueError:
        print("not json info")
    if jsonObject:
        writeFile(jsonObject)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # acknowledge after processing

def writeFile(data):
    with open("/tmp/test.txt", 'a') as fileObj:
        json.dump(data, fileObj)
        fileObj.write("\n")

if __name__ == '__main__':
    parameters = pika.ConnectionParameters(host='rabbitmq-server',
                                           credentials=pika.PlainCredentials('admin', 'admin'))
    connection = pika.BlockingConnection(parameters=parameters)
    channel = connection.channel()
    # the producer publishes through the default exchange with routing_key
    # 'win-test', so declaring the same durable queue is enough; no binding
    # is needed (binding to the default exchange '' is not allowed anyway)
    channel.queue_declare(queue='win-test', durable=True)
    # no_ack=False: the server keeps the message until it is explicitly acked
    channel.basic_consume(callback, queue='win-test', no_ack=False)
    print(' [*] Waiting for logs. To exit press CTRL+C')
    channel.start_consuming()
Notes
The script works the same way as the single-process consumer, but runs 5 processes consuming concurrently, to show that messages cannot be processed in the order they sit in the queue. The single-process consumer uses the pika 0.x module and the multi-process consumer uses pika 1.1.0, so the connection setup and the basic_consume/callback calls differ.
Reference script
#!/usr/bin/python
# -*- coding:utf-8 -*-
import multiprocessing
import time
import pika
import json

def callback(ch, method, properties, body):
    print(" [x] %r received %r" % (multiprocessing.current_process(), body))
    jsonObject = None
    try:
        jsonObject = json.loads(body)
    except ValueError:
        print("not json info")
    if jsonObject:
        writeFile(jsonObject)
    ch.basic_ack(delivery_tag=method.delivery_tag)

def writeFile(data):
    with open("/tmp/test.txt", 'a') as fileObj:
        json.dump(data, fileObj)
        fileObj.write("\n")

def consume():
    credentials = pika.PlainCredentials('admin', 'admin')
    parameters = pika.ConnectionParameters('mqserver', credentials=credentials, heartbeat=5)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue='win-test', durable=True)
    # pika 1.x signature: queue first, callback second, auto_ack instead of no_ack
    channel.basic_consume('win-test', callback, auto_ack=False)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        pass

workers = 5
pool = multiprocessing.Pool(processes=workers)
for i in range(workers):
    pool.apply_async(consume)
try:
    while True:
        time.sleep(1)   # sleep instead of busy-waiting in the parent process
except KeyboardInterrupt:
    print(' [*] Exiting...')
    pool.terminate()
    pool.join()

As the output file shows, when multiple processes consume from MQ concurrently, the order in which messages are taken from the queue cannot be guaranteed:
{"id": 292, "time": "2020-07-01_17:47:57"} {"id": 297, "time": "2020-07-01_17:48:33"} {"id": 298, "time": "2020-07-01_17:48:33"} {"id": 293, "time": "2020-07-01_17:47:57"} {"id": 299, "time": "2020-07-01_17:48:33"} {"id": 294, "time": "2020-07-01_17:47:57"} {"id": 300, "time": "2020-07-01_17:48:33"} {"id": 278, "time": "2020-07-01_17:47:59"} {"id": 279, "time": "2020-07-01_17:47:59"} {"id": 301, "time": "2020-07-01_17:48:33"} {"id": 280, "time": "2020-07-01_17:47:59"} {"id": 302, "time": "2020-07-01_17:48:33"} {"id": 281, "time": "2020-07-01_17:47:59"} {"id": 303, "time": "2020-07-01_17:48:33"} {"id": 283, "time": "2020-07-01_17:50:01"} {"id": 304, "time": "2020-07-01_17:48:33"} {"id": 284, "time": "2020-07-01_17:50:01"} {"id": 305, "time": "2020-07-01_17:48:33"} {"id": 285, "time": "2020-07-01_17:50:01"}