1. A clumsy example of calling a coroutine from inside multiprocessing, `pool.apply_async(asyncio.get_event_loop().run_until_complete(Url().save_url_to_redis()), (i,))`, and what it shows about the relationship between multiprocessing and coroutines:
Calling the coroutine from multiprocessing this way runs the coroutine function once per process, so here url_count=20000 is pushed into redis three times and the total number of records ends up being url_count=20000 multiplied by the number of processes.

Summary: with multiprocessing, do not generate the data to be stored or written inside the worker function itself; use redis as a queue to distribute the tasks, otherwise every worker repeats the same work (correct patterns are sketched after the script below).

Coroutines: the same principle applies. Put the parameters into redis and let each coroutine (or process) `rpop` one parameter and consume it.

The test script:

```python
# -*- coding: utf-8 -*-
"""
@Time   : 2020/6/30 15:54
@Author : LinXiao
@Description :
"""
# ------------------------------
import asyncio
import multiprocessing
import timeit
from pprint import pprint

import aioredis
from loguru import logger
from redis import Redis


class Url():
    def __init__(self):
        self.do_counts = 20
        # self.session_counts = 2
        # self.url_count = 25

    async def redis_connect(self):
        self.host = "127.0.0.1"
        self.port = "6379"
        # self.password = ""
        self.db = 6
        try:
            self.redis = await aioredis.create_redis_pool(
                (self.host, self.port),
                # password=self.password,
                db=self.db
            )
            logger.info("redis connected successfully")
            return True
        except Exception as e:
            logger.error(f"redis connection error: {e.args}")
            raise e

    async def close(self):
        self.redis.close()
        await self.redis.wait_closed()

    async def save_url_to_redis_single(self):
        # single coroutine: pushes all url_count pages by itself
        await self.redis_connect()
        url_count = 20000
        for i in range(1, url_count + 1):
            url = f"https://explorer-web.api.btc.com/v1/eth/txns/0?page={i}&size=150"
            await self.redis.lpush("redis_connect_urls", url)
            logger.info(f'push {i} to redis')
        await self.close()

    async def save_url_to_redis_multi(self, n):
        # multiple coroutines: every coroutine pushes the same url_count pages again
        await self.redis_connect()
        url_count = 20000
        for i in range(1, url_count + 1):
            url = f"https://explorer-web.api.btc.com/v1/eth/txns/0?page={i}&size=150"
            await self.redis.lpush("redis_connect_urls", url)
            logger.info(f"task No.{n} page {i} to redis")
        await self.close()

    async def multiasyico_test(self, n):
        # dummy coroutine used to show that gather() runs every task in full
        for i in range(3):
            pprint(i)
        print("-" * 110)

    async def start(self):
        await self.redis_connect()
        asy_count = 3  # number of coroutines
        tasks = [self.multiasyico_test(n + 1) for n in range(asy_count)]
        await asyncio.gather(*tasks)
        await self.close()

    def main(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.start())  # multiple coroutines
        # loop.run_until_complete(self.save_url_to_redis_single())
        loop.close()


def save_url_to_redis_2():
    # plain synchronous version, single writer
    redis = Redis(db=9)
    url_count = 20000
    for i in range(1, url_count + 1):
        url = f"https://explorer-web.api.btc.com/v1/eth/txns/0?page={i}&size=150"
        redis.lpush("redis_connect_urls", url)
        logger.info(f'push {i} to redis')


def multiPro_test():
    for i in range(10):
        pprint(i)
    print("-" * 110)


if __name__ == '__main__':
    # With 3 processes, url_count=20000 is pushed 3 times: the total is url_count
    # multiplied by the number of processes. Distribute tasks through a redis
    # queue instead of generating them inside each worker.
    start = timeit.default_timer()

    # # single coroutine
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(Url().save_url_to_redis_single())
    # loop.close()

    # multiple coroutines
    Url().main()  # 3.6690529 seconds

    # multiprocessing
    # process_count = 3
    # pool = multiprocessing.Pool(process_count)
    # for i in range(process_count):
    #     # pool.apply_async(asyncio.get_event_loop().run_until_complete(Url().save_url_to_redis()), (i,))  # "multiprocessing calling a coroutine": repeats the coroutine once per iteration
    #     pool.apply_async(multiPro_test(), (i,))  # multiprocessing calling an ordinary function
    # pool.close()
    # pool.join()

    # Multiple coroutines and multiple processes written this way multiply the data
    # pushed to redis by the number of coroutines/processes!!!!

    end = timeit.default_timer()
    print('Running time: %s Seconds' % (end - start))
```
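One subtlety worth spelling out: `multiprocessing.Pool.apply_async` expects a callable plus an argument tuple, so `pool.apply_async(multiPro_test(), (i,))` or `pool.apply_async(loop.run_until_complete(coro), (i,))` actually executes the work immediately in the parent process and hands the pool only the return value. Below is a minimal sketch, not the original script, of handing a coroutine to a process pool correctly; `push_pages` is a hypothetical placeholder for a coroutine such as `save_url_to_redis_multi`, and each child process starts its own event loop with `asyncio.run`.

```python
import asyncio
import multiprocessing


async def push_pages(task_no):
    # placeholder coroutine; in the real script this would push URLs to redis
    await asyncio.sleep(0.1)
    print(f"task No.{task_no} done")


def worker(task_no):
    # runs inside the child process; every process owns its own event loop
    asyncio.run(push_pages(task_no))


if __name__ == '__main__':
    process_count = 3
    pool = multiprocessing.Pool(process_count)
    for i in range(process_count):
        pool.apply_async(worker, (i,))  # pass the function itself, not worker(i)
    pool.close()
    pool.join()
```

An event loop cannot be shared across processes, so creating one per child (here via `asyncio.run`) is the usual pattern; note that this alone still triples the writes if every worker regenerates the full URL list.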
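The fix the summary recommends, as a minimal sketch assuming a local redis on db 6 and the same `redis_connect_urls` key as the script above: a single producer pushes every URL exactly once, and each process (or coroutine) `rpop`s its own tasks from the shared queue, so the total stays at url_count no matter how many workers consume it.

```python
import multiprocessing

from redis import Redis

URL_COUNT = 20000
QUEUE_KEY = "redis_connect_urls"


def produce():
    # run once, before any workers start: the queue then holds exactly URL_COUNT items
    redis = Redis(db=6)
    for i in range(1, URL_COUNT + 1):
        redis.lpush(QUEUE_KEY, f"https://explorer-web.api.btc.com/v1/eth/txns/0?page={i}&size=150")


def consume(n):
    # each process pops its own tasks instead of regenerating the full list
    redis = Redis(db=6)
    while True:
        url = redis.rpop(QUEUE_KEY)
        if url is None:  # queue drained, nothing left to do
            break
        print(f"process {n} got {url.decode()}")


if __name__ == '__main__':
    produce()
    process_count = 3
    pool = multiprocessing.Pool(process_count)
    for i in range(process_count):
        pool.apply_async(consume, (i,))
    pool.close()
    pool.join()
```

The same idea applies to coroutines: replace the blocking `Redis` client with the `aioredis` equivalents and let each coroutine pop from the shared list instead of building the 20000 URLs itself.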