[Python crawler] An approach to crawling dynamically loaded data with multiprocessing


    1. For static pages: the idea is to build the URL of every page directly, which gives you one large list of URLs; then call requests.get(url) on each one, parse the content with XPath, and either store it in MongoDB or write it out as line-delimited JSON with logger.info(json.dumps(data)). A minimal sketch of this flow is given right below.
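    The sketch below illustrates that static-page routine only in outline: the site URL, the page=N pattern, and the XPath expressions are hypothetical placeholders, not taken from any real project, so adjust them to the page you are actually scraping.

import json

import requests
from loguru import logger
from lxml import etree

# Hypothetical paging pattern: change the URL template and page range to match the real site
urls = [f"https://example.com/articles?page={i}" for i in range(1, 11)]

for url in urls:
    res = requests.get(url, timeout=10)
    html = etree.HTML(res.text)
    # Hypothetical XPath expressions: adjust them to the real page structure
    titles = html.xpath('//div[@class="item"]/h2/a/text()')
    links = html.xpath('//div[@class="item"]/h2/a/@href')
    for title, link in zip(titles, links):
        # One JSON object per line, the same trick as logger.info(json.dumps(data)) mentioned above
        logger.info(json.dumps({"title": title, "link": link}, ensure_ascii=False))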

    2. For dynamically loaded pages, one approach is to analyse the data API with a packet-capture tool. I use Fiddler: open the page with Fiddler running and watch which endpoint the data is actually loaded from (the response is usually JSON), note the endpoint address, and work out the pattern in its parameters. After that it is the same old routine: build the request URL for every page, iterate over the URLs one by one, and receive the data as JSON. This is very convenient for data cleaning, because there is no HTML parsing to do at all.

    3. What to do when Fiddler suddenly stops capturing requests
    3.1 If the browser cannot open web pages while Fiddler is running, my fix is crude but effective: uninstall Fiddler and reinstall it (the installer is only about 6 MB, so this is quick). Download link: https://dl.softmgr.qq.com/original/Development/FiddlerSetup_5.0.20202.18177.exe

    4. Source code: the page URLs are first pushed into a Redis list, and a pool of worker processes then pops URLs from that list, requests each page, cleans the records, and writes them to MongoDB (or to a JSON-lines file).

import json
import multiprocessing
import os
import random
import time
from pprint import pprint

import requests
from fake_useragent import UserAgent
from loguru import logger
# The crawler is synchronous, so the synchronous pymongo client is used here
# instead of motor's AsyncIOMotorClient, whose insert_many would need to be awaited.
from pymongo import MongoClient
from redis import Redis


class ETh():

    def redis_connect(self):
        # Redis acts as the shared URL queue for the worker processes
        myredis = Redis(db=6)
        return myredis

    def get_all_url(self):
        # Push the request URL of every page (61 pages, 150 records each) into Redis
        for i in range(1, 62):
            url = f"https://explorer-web.api.btc.com/v1/eth/txns/0?page={i}&size=150"
            self.redis_connect().lpush("redis_connect_urls", url)
            print(f"pushed page {i} to redis")
        self.redis_connect().close()

    def read_page(self, url):
        # Request one page of the JSON API; the headers mimic a browser to avoid getting blocked
        headers = {
            "Host": "explorer-web.api.btc.com",
            "Connection": "keep-alive",
            "Accept": "application/json",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36",
            "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
            "Origin": "https://eth.btc.com",
            "Sec-Fetch-Site": "same-site",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Dest": "empty",
            "Referer": "https://eth.btc.com/blockinfo/0",
            "Accept-Encoding": "gzip, deflate, br",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }
        ua = UserAgent()
        headers["User-Agent"] = ua.random        # rotate the User-Agent on every request
        time.sleep(random.randint(0, 4))         # random delay to be gentle on the API
        res = requests.get(url, headers=headers)
        s = requests.session()
        s.keep_alive = False
        requests.adapters.DEFAULT_RETRIES = 5
        # The whole response body is JSON
        page_text = json.loads(res.text)         # type: dict
        data_list = page_text["data"]["list"]    # type: list
        # pprint(data_list)
        return data_list

    # Convert a time string to a Unix timestamp
    def str_to_timestamp(self, str_time):
        time_array = time.strptime(str_time, "%Y-%m-%d %H:%M:%S")
        return int(time.mktime(time_array))

    # Clean the data
    def clean_data(self, url):
        eth_datas = []
        data_list = self.read_page(url)
        block_time_str = "2015-07-30 23:26:13"
        blocktime = self.str_to_timestamp(block_time_str)
        for data in data_list:
            # A record returned by the API looks like:
            # {'amount': '800.000000000000000000', 'created_ts': 1435634773, 'fee': '0',
            #  'gas_price': '0', 'gas_used': 0, 'id': 1, 'internal_tx': [],
            #  'receiver_hash': '0x1cfcf7517f0c08459720942b647ad192aa9c8828', 'receiver_type': 0,
            #  'sender_hash': '0xGENESIS000000000000000000000000000000000', 'sender_type': 0,
            #  'status': '', 'tx_hash': 'GENESIS_1cfcf7517f0c08459720942b647ad192aa9c8828', 'tx_type': ''}
            #
            # The target document format in Mongo looks like:
            # {"_id": "0xfffffffead5f0ed224bf9ef7019599e9bf00c1a0fa726f316125570c5787f2a3",
            #  "blockHash": "0xc88ff379a6b6e85aa15fb5f0dbeb4606db191d5870920c804732a91fd9f0881f",
            #  "blockNumber": 6694829, "contractAddress": null, "cumulativeGasUsed": 4225411,
            #  "from": "0x52bc44d5378309ee2abf1539bf71de1b7d7be3b5", "gasUsed": 21000,
            #  "logs": [], "status": "0x1", "to": "0x7c20f2d7f49e5f4ade906d439d0e32240d32fc71",
            #  "transactionIndex": 68, "gas": 50000, "gasPrice": 3000000000, "input": "0x",
            #  "nonce": "0x9b602d", "value": 1000767082999010048, "blockTime": 1542083404}
            data_dic = {
                "_id": data["tx_hash"],
                "blockHash": "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
                "blockNumber": 0,
                "contractAddress": "null",
                "cumulativeGasUsed": 0,
                "gasUsed": data["gas_used"],
                "gasPrice": data["gas_price"],
                "logs": [],
                "status": data["status"],
                "value": data["amount"],
                "nonce": "0x0000000000000042",
                "from": data["sender_hash"],
                "to": data["receiver_hash"],
                "blockTime": blocktime,
            }
            # pprint(data_dic)
            eth_datas.append(data_dic)
        # pprint(len(eth_datas))  # 150
        return eth_datas

    # Save to MongoDB
    def save_to_mongo(self, eth_datas):
        db_url = "localhost"
        db_port = 27017
        db_name = "eth"
        db_collection = "eth_0"
        # Connect and pick the collection
        client = MongoClient(db_url, db_port)
        db = client[db_name][db_collection]
        db.insert_many(eth_datas)
        print("inserted %d docs" % (len(eth_datas)))

    # Save to a JSON-lines file
    def save_to_json(self, eth_datas):
        process_id = os.getpid()
        logger.add(
            # f"/backup/etc/receipt/receipts.json",
            # Include the process id in the file name so that processes do not overwrite each other
            f"D:/etc_check_data/receipt34555/receipt_{process_id}.json",
            level="INFO",
            format="{message}",
            rotation="1024 MB",
            enqueue=True,
        )
        for data in eth_datas:
            logger.info(json.dumps(data))


# Fetch all tx data of eth block 0
def get_all_eth_data(n):
    eth = ETh()
    while True:
        try:
            net = eth.redis_connect().rpop("redis_connect_urls")
            if net is None:
                # The queue is empty, this worker is done
                break
            url = bytes.decode(net)
            eth_datas = eth.clean_data(url)
            logger.debug(f"process {n + 1} is crawling page {url.split('page=')[1].split('&size')[0]}: {url}")
            # Save to Mongo
            eth.save_to_mongo(eth_datas)
            # Or save as JSON lines
            # eth.save_to_json(eth_datas)
        except Exception as e:
            logger.debug(e)


if __name__ == "__main__":
    # Generate the URLs and push them into Redis
    ETh().get_all_url()
    # Crawl and store with a pool of worker processes
    process_count = 16
    pool = multiprocessing.Pool(process_count)
    for i in range(process_count):
        pool.apply_async(get_all_eth_data, (i,))
    pool.close()
    pool.join()
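    After the pool finishes, a quick sanity check is to count what actually landed in Mongo: 61 pages at 150 records each should give at most 9,150 documents if every page returned a full batch. A minimal sketch, assuming the same eth / eth_0 database and collection names used above:

from pymongo import MongoClient

client = MongoClient("localhost", 27017)
collection = client["eth"]["eth_0"]
# Count every document in the collection
print(collection.count_documents({}))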