Python协程实现生产者和消费者模型

    技术2022-07-13  84

    我们知道python最早的协程是基于yield生成器来实现, python3.5之后出现了asyncio和async/await关键字,有了更方便的协程方式.

    yield协程

    需要知道我们需要预先激活这个协程, send方法的参数成了当前yield表达式的结果, send方法返回生成器下次yield的值.

    #!usr/bin/python # -*- coding:utf8 -*- """ python协程实现生产者和消费者模型 generator.send(value): Resumes the execution and "sends" a value into the generator function. The value argument becomes the result of the current yield expression. The send() method returns the next value yields by the generator, or raises StopIteration if the generator exits without yielding another value . """ def consumer(): r = 'test' while True: n = yield r if not n: return print('[CONSUMER] Consuming %s...' % n) r = '200 OK' def produce(c): # c.send(None) print(next(c)) n = 0 while n < 5: n = n + 1 print('[PRODUCER] Producing %s...' % n) r = c.send(n) print('[PRODUCER] Consumer return: %s' % r) c.close() c = consumer() produce(c)
    async/await实现生产者和消费者模型
    #!usr/bin/python # -*- coding:utf8 -*- import time import random import asyncio async def consumer(queue, id): while True: val = await queue.get() print('{} get a val: {}'.format(id, val)) await asyncio.sleep(1) async def producer(queue, id): for i in range(5): val = random.randint(1, 10) await queue.put(val) print('{} put a val: {}'.format(id, val)) await asyncio.sleep(1) async def main(): queue = asyncio.Queue() consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1')) consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2')) producer_1 = asyncio.create_task(producer(queue, 'producer_1')) producer_2 = asyncio.create_task(producer(queue, 'producer_2')) await asyncio.sleep(10) consumer_1.cancel() consumer_2.cancel() await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True) start = time.perf_counter() asyncio.run(main()) end = time.perf_counter() print(end - start)

    关于线程的实现方式可以参考我的一篇博客线程实现消费者和生产者模型

    Processed: 0.009, SQL: 9