Python asyncio

    技术2022-07-13  97

    多线程的局限性:

    多线程运行过程容易被打断,因此有可能出现race condition的情况线程切换本身存在一定的损耗,线程数不能无限增加

      asyncio 正如官方文档所说, asyncio-Asynchronous I/O, event loop, coroutines and tasks. This module provides infrastructure for writing single-threaded concurrent code using coroutines, multiplexing I/O.是一个异步高并发的模块

     async/await是python3.5用于定义协程的关键字,async定义一个协程, await用于挂起阻塞的异步调用接口 官方文档  Asyncio是单线程的,只有一个主线程,但是可以进行多个不同的(task),这里的任务 ,就是特殊的future对象. 这些不同的任务,被一个叫做event loop的对象所控制

    version >= 3.7

    demo1.py

    import time import asyncio async def crawl_page(url): print('crawling {}'.format(url)) sleep_time = int(url.split('_')[-1]) await asyncio.sleep(sleep_time) print('OK {}'.format(url)) async def main(urls): for url in urls: # await是同步调用, crawl_page(url)在当前的调用结束之前, 是不会触发下一次调用的 await crawl_page(url) start = time.perf_counter() asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4'])) end = time.perf_counter() print(end - start)

    demo2.py

    import time import asyncio async def crawl_page(url): print('crawling {}'.format(url)) sleep_time = int(url.split('_')[-1]) await asyncio.sleep(sleep_time) print('OK {}'.format(url)) async def main(urls): tasks = [asyncio.create_task(crawl_page(url)) for url in urls] # *tasks解包列表, 将列表变成了函数的参数 await asyncio.gather(*tasks) # for task in tasks: # await task start = time.perf_counter() asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4'])) end = time.perf_counter() print(end - start)

    demo3.py

    #!usr/bin/python # -*- coding:utf8 -*- import time import asyncio async def worker_1(): print('worker_1 start') await asyncio.sleep(1) print('worker_1 done') async def worker_2(): print('worker_2 start') await asyncio.sleep(2) print('worker_2 done') async def main(): task1 = asyncio.create_task(worker_1()) task2 = asyncio.create_task(worker_2()) print('before await') await task1 print('awaited worker_1') await task2 print('awaited worker_2') start = time.perf_counter() asyncio.run(main()) end = time.perf_counter() print(end - start)

    分析:

    1. asyncio.run(main()),程序进入 main() 函数,事件循环开启; 2. task1 和 task2 任务被创建,并进入事件循环等待运行;运行到 print,输出 'before await'; 3. await task1 执行,用户选择从当前的主任务中切出,事件调度器开始调度 worker_1; 4. worker_1 开始运行,运行 print 输出'worker_1 start',然后运行到 await asyncio.sleep(1), 从当前任务切出,事件调度器开始调度 worker_2; 5. worker_2 开始运行,运行 print 输出 'worker_2 start',然后运行 await asyncio.sleep(2) 从当前任务切出; 6. 以上所有事件的运行时间,都应该在 1ms 到 10ms 之间,甚至可能更短, 事件调度器从这个时候开始暂停调度; 7. 一秒钟后,worker_1 的 sleep 完成,事件调度器将控制权重新传给 task_1, 输出 'worker_1 done',task_1 完成任务,从事件循环中退出; 8. await task1 完成,事件调度器将控制器传给主任务,输出 'awaited worker_1', ·然后在 await task2 处继续等待; 9. 两秒钟后,worker_2 的 sleep 完成,事件调度器将控制权重新传给 task_2, 输出 'worker_2 done',task_2 完成任务,从事件循环中退出; 10. 主任务输出 'awaited worker_2',协程全任务结束,事件循环结束。

    demo4.py

    #!usr/bin/python # -*- coding:utf8 -*- import time import asyncio async def worker_1(): await asyncio.sleep(1) return 1 async def worker_2(): await asyncio.sleep(2) return 2 / 0 async def worker_3(): await asyncio.sleep(3) return 3 async def main(): task_1 = asyncio.create_task(worker_1()) task_2 = asyncio.create_task(worker_2()) task_3 = asyncio.create_task(worker_3()) await asyncio.sleep(2) task_3.cancel() res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True) print(res) start = time.perf_counter() asyncio.run(main()) end = time.perf_counter() print(end - start) # [1, ZeroDivisionError('division by zero'), CancelledError()]

    协程实现消费者生产者模型.py

    #!usr/bin/python # -*- coding:utf8 -*- """ 协程和多线程的区别: 1. 协程为单线程 2. 协程由用户决定,在哪些地方交出控制权,切换到下一个任务 """ import time import random import asyncio async def consumer(queue, id): while True: val = await queue.get() print('{} get a val: {}'.format(id, val)) await asyncio.sleep(1) async def producer(queue, id): for i in range(5): val = random.randint(1, 10) await queue.put(val) print('{} put a val: {}'.format(id, val)) await asyncio.sleep(1) async def main(): queue = asyncio.Queue() consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1')) consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2')) producer_1 = asyncio.create_task(producer(queue, 'producer_1')) producer_2 = asyncio.create_task(producer(queue, 'producer_2')) await asyncio.sleep(10) consumer_1.cancel() consumer_2.cancel() await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True) start = time.perf_counter() asyncio.run(main()) end = time.perf_counter() print(end - start)
    多进程/多线程/协程/使用场景

    如果是 I/O bound,并且 I/O 操作很慢,需要很多任务 / 线程协同实现, 那么使用 Asyncio 更合适。

    如果是 I/O bound,但是 I/O 操作很快,只需要有限数量的任务 / 线程, 那么使用多线程就可以了。

    如果是 CPU bound,则需要使用多进程来提高程序运行效率。

    Processed: 0.043, SQL: 9