Common crawler code snippets (personal conventions)

    Tech · 2022-07-16

    Common crawler code snippets:

    1 Converting a timestamp to a formatted time string:
import time

timeStamp = 1557502800
otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timeStamp))
print(otherStyleTime)

# Get the current date (year-month-day)
import datetime
print(datetime.datetime.now().strftime("%Y-%m-%d"))
print(type(datetime.datetime.now().strftime("%Y-%m-%d")))
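    The reverse conversion (a formatted string back into a 10-digit timestamp) comes up just as often; a minimal sketch with the same time module (the exact value depends on the machine's local timezone):
import time

# parse the string into a struct_time, then convert to seconds since the epoch
ts = int(time.mktime(time.strptime("2019-05-10 17:00:00", "%Y-%m-%d %H:%M:%S")))
print(ts)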
    2 Executing JS code from Python
import execjs

with open('data.js', 'r', encoding='utf-8') as f:
    js1 = f.read()
ecjs = execjs.compile(js1)
data1 = ecjs.call("latNe2", 55.025635)
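    execjs can also compile a JS source string directly instead of reading it from a file; a minimal sketch with a stand-in function (latNe2 here is a made-up example, not the real function from the target site's data.js):
import execjs

js_src = """
function latNe2(lat) {
    return lat * 2;  // stand-in logic for illustration only
}
"""
ctx = execjs.compile(js_src)
print(ctx.call("latNe2", 55.025635))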
    3 A launcher file for running a scrapy spider:
# RunSpider.py
from scrapy.cmdline import execute
import sys
import os

sys.path.append(os.path.dirname(os.path.abspath(__file__)))
execute(["scrapy", "crawl", "ship_info", "-a", "ship_name=TIANLIGONG007"])
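    An alternative to going through scrapy.cmdline is scrapy's CrawlerProcess API, which runs the spider inside the current Python process; a sketch using the same spider name and argument as above:
# RunSpiderInProcess.py (sketch)
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())
process.crawl("ship_info", ship_name="TIANLIGONG007")  # keyword args reach the spider's __init__
process.start()  # blocks until the crawl finishes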
    4 Passing arguments into a scrapy spider:
def __init__(self, ship_name=None, *args, **kwargs):
    super(ShipInfoSpider, self).__init__(*args, **kwargs)
    print("Argument passed into the spider:", ship_name)
    self.start_urls = ["http://searchv3.shipxy.com/shipdata/search3.ashx?f=srch&kw={}".format(ship_name)]
    5 An extensions.py that automatically stops a scrapy_redis spider after about half an hour of idling
# -*- coding: utf-8 -*-
# Define here the models for your scraped Extensions
import logging
import time

from scrapy import signals
from scrapy.exceptions import NotConfigured

logger = logging.getLogger(__name__)


class RedisSpiderSmartIdleClosedExensions(object):

    def __init__(self, idle_number, crawler):
        self.crawler = crawler
        self.idle_number = idle_number
        self.idle_list = []
        self.idle_count = 0

    @classmethod
    def from_crawler(cls, crawler):
        # First check whether the extension should be enabled; bail out otherwise
        if not crawler.settings.getbool('MYEXT_ENABLED'):
            raise NotConfigured
        # Read the number of idle slots from the settings; default 360, i.e. about 30 minutes
        idle_number = crawler.settings.getint('IDLE_NUMBER', 360)
        # Instantiate the extension object
        ext = cls(idle_number, crawler)
        # Connect the extension to the signals, binding signals.spider_idle to spider_idle()
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(ext.spider_idle, signal=signals.spider_idle)
        # return the extension object
        return ext

    def spider_opened(self, spider):
        logger.info("opened spider %s redis spider Idle, Continuous idle limit: %d",
                    spider.name, self.idle_number)

    def spider_closed(self, spider):
        logger.info("closed spider %s, idle count %d , Continuous idle count %d",
                    spider.name, self.idle_count, len(self.idle_list))

    def spider_idle(self, spider):
        self.idle_count += 1                 # idle counter
        self.idle_list.append(time.time())   # record the timestamp every time spider_idle fires
        idle_list_len = len(self.idle_list)  # number of consecutive idle signals so far
        # If the gap between the current and the previous idle signal is larger than 6 seconds,
        # Redis still had keys in between, so reset the consecutive-idle window.
        if idle_list_len > 2 and self.idle_list[-1] - self.idle_list[-2] > 6:
            self.idle_list = [self.idle_list[-1]]
        elif idle_list_len > self.idle_number:
            # Close the spider once the configured number of consecutive idle signals is reached
            logger.info('\n continued idle number exceed {} Times'
                        '\n meet the idle shutdown conditions, will close the reptile operation'
                        '\n idle start time: {}, close spider time: {}'.format(self.idle_number,
                                                                               self.idle_list[0],
                                                                               self.idle_list[-1]))
            # Actually close the spider
            self.crawler.engine.close_spider(spider, 'closespider_pagecount')


# settings.py
EXTENSIONS = {
    # 'scrapy.extensions.telnet.TelnetConsole': None,
    'lincese.extensions.RedisSpiderSmartIdleClosedExensions': 500,
}
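    The extension only activates when MYEXT_ENABLED is true and it reads IDLE_NUMBER from the settings; a minimal settings.py sketch (the value 360 follows the comment above that 360 idle slots is roughly 30 minutes, assuming the idle signal fires about every 5 seconds):
# settings.py (sketch; values are illustrative)
MYEXT_ENABLED = True   # enable the idle-close extension
IDLE_NUMBER = 360      # consecutive idle signals to tolerate before closing (~30 minutes)

EXTENSIONS = {
    'lincese.extensions.RedisSpiderSmartIdleClosedExensions': 500,
}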
    6 Calculating the total number of pages:
total_num = int(2001)
if total_num % 100 == 0:  # exact multiple of the page size
    max_page = total_num // 100
else:
    max_page = total_num // 100 + 1
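    The same page count can be written as a single ceiling division, which avoids the branch; an equivalent sketch:
# (a + b - 1) // b rounds up, so this matches the if/else above
page_size = 100
max_page = (total_num + page_size - 1) // page_size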
    7 Adding a proxy in middleware.py
# middleware.py
import base64
import random
import sys

PY3 = sys.version_info[0] >= 3


def base64ify(bytes_or_str):
    if PY3 and isinstance(bytes_or_str, str):
        input_bytes = bytes_or_str.encode('utf8')
    else:
        input_bytes = bytes_or_str
    output_bytes = base64.urlsafe_b64encode(input_bytes)
    if PY3:
        return output_bytes.decode('ascii')
    else:
        return output_bytes


class ProxyDownloaderMiddleware(object):  # class name is a placeholder; use your project's downloader middleware class

    def process_request(self, request, spider):
        # Called for each request that goes through the downloader middleware.
        # Must either:
        # - return None: continue processing this request
        # - or return a Response object
        # - or return a Request object
        # - or raise IgnoreRequest: process_exception() methods of
        #   installed downloader middleware will be called
        # TODO set the proxy
        proxyHost = "u4585.5.tn.16yun.cn"
        proxyPort = "6441"
        # tunnel proxy credentials
        proxyUser = "16AHMXTY"
        proxyPass = "606070"
        request.meta['proxy'] = "http://{0}:{1}".format(proxyHost, proxyPort)
        # add the authentication header
        encoded_user_pass = base64ify(proxyUser + ":" + proxyPass)
        request.headers['Proxy-Authorization'] = 'Basic ' + encoded_user_pass
        # set the IP-switching header (as needed)
        tunnel = random.randint(1, 10000)
        request.headers['Proxy-Tunnel'] = str(tunnel)
        print("------proxy set---{}---".format(random.random()))
        return None
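    For the middleware to take effect it also has to be registered in the project settings; a minimal sketch, assuming the project package is called myproject and the placeholder class name used above:
# settings.py (sketch; module path, class name and priority are illustrative)
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.ProxyDownloaderMiddleware': 543,
}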
    8 When the body of a POST request is JSON data:
formdata2 = '{"seDate":["",""],"stock":["%s"],"channelCode":["fixed_disc"],' \
            '"bigCategoryId":["%s"],"pageSize":30,"pageNum":%s}' % (stock_code, cate, str(page))
url2 = "http://www.szse.cn/api/disc/announcement/annList?random=" + str(random.random())
headers2 = {
    "Content-Type": "application/json",
    "Host": "www.szse.cn",
    "Origin": "http://www.szse.cn",
    "Proxy-Connection": "keep-alive",
    "Referer": "http://www.szse.cn/disclosure/listed/fixed/index.html",
    "User-Agent": "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36",
    "X-Request-Type": "ajax",
    "X-Requested-With": "XMLHttpRequest"
}
yield scrapy.Request(
    url=url2,
    method="POST",
    headers=headers2,
    body=formdata2,
)
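    Building the JSON body by string interpolation breaks as soon as a value contains quotes or needs escaping; a sketch of the same body built with json.dumps (variable names follow the snippet above):
import json

payload = {
    "seDate": ["", ""],
    "stock": [stock_code],
    "channelCode": ["fixed_disc"],
    "bigCategoryId": [cate],
    "pageSize": 30,
    "pageNum": page,
}
formdata2 = json.dumps(payload)  # serialize to a JSON string for the request body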
    9 A pyppeteer error:
Q:
File "/usr/local/lib/python3.6/dist-packages/pyppeteer/launcher.py", line 225, in get_ws_endpoint
    raise BrowserError('Browser closed unexpectedly:\n')
pyppeteer.errors.BrowserError: Browser closed unexpectedly:

A: install the missing Chromium system libraries.
centos:
ubuntu:
apt-get update
apt --fix-broken install
apt-get upgrade
apt-get install gconf-service libasound2 libatk1.0-0 libatk-bridge2.0-0 libc6 libcairo2 libcups2 libdbus-1-3 libexpat1 libfontconfig1 libgcc1 libgconf-2-4 libgdk-pixbuf2.0-0 libglib2.0-0 libgtk-3-0 libnspr4 libpango-1.0-0 libpangocairo-1.0-0 libstdc++6 libx11-6 libx11-xcb1 libxcb1 libxcomposite1 libxcursor1 libxdamage1 libxext6 libxfixes3 libxi6 libxrandr2 libxrender1 libxss1 libxtst6 ca-certificates fonts-liberation libappindicator1 libnss3 lsb-release xdg-utils wget --fix-missing
apt-get install -y libnss3
    10 Generating 10-digit and 13-digit timestamps:
import time

# 13-digit (millisecond) timestamp:
int(round(time.time() * 1000))

# 10-digit (second) timestamp:
int(time.time())
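    Going the other way, a 13-digit timestamp has to be divided by 1000 before it can be parsed back into a datetime; a small sketch:
import datetime

ms = 1557502800000  # 13-digit millisecond timestamp
print(datetime.datetime.fromtimestamp(ms / 1000).strftime("%Y-%m-%d %H:%M:%S"))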
    11 pyppeteer code examples:
# 1 Recognizing an image captcha:
import asyncio
import datetime
import json
import re
import time

import pymysql
from PIL import Image
from aip import AipOcr
from pyppeteer import launch

from chaojiying import Chaojiying  # local helper module; nothing to change here

"""Your APPID / AK / SK"""
APP_ID = 'XXXX'
API_KEY = '####'
SECRET_KEY = '@@@@@'

# TODO chaojiying account (fill in your own)
CHAOJIYING_USERNAME = 'XXXX'
CHAOJIYING_PASSWORD = 'XXXX'
CHAOJIYING_SOFT_ID = 904174
CHAOJIYING_KIND = 1004  # 1-4 alphanumeric characters; one recognition costs 10 credits, about 0.01 yuan


class ShiXinPyppeteer(object):
    captchaId = ""
    pic_str = ""

    def __init__(self):
        self.headers = {
            "Cookie": "JSESSIONID=07402EC2C3A9250340A6AB38D6C75A1C; _gscu_15322769=888379770s984g60; _gscbrs_15322769=1; Hm_lvt_d59e2ad63d3a37c53453b996cb7f8d4e=1588837978; SESSION=2b347422-1b5a-44b7-896c-1ece475a31ae; Hm_lpvt_d59e2ad63d3a37c53453b996cb7f8d4e=1588922225; _gscs_15322769=t88916915iq0wxw60|pv:22",
            "User-Agent": "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36",
        }
        self.chaojiying = Chaojiying(CHAOJIYING_USERNAME, CHAOJIYING_PASSWORD, CHAOJIYING_SOFT_ID)
        self.client = AipOcr(APP_ID, API_KEY, SECRET_KEY)

    def __del__(self):
        # await page.close()
        # await browser.close()
        pass

    async def get_capture_text_captchaId(self):
        """Fetch the captcha and the captchaId that goes with it.
        TODO The captcha only stays valid for a minute or two, so with a larger data set
        it has to be re-recognized continuously."""
        browser = await launch({'headless': True,
                                'args': ['--no-sandbox', '--window-size=1920,1080', '--disable-infobars']})  # create the browser object
        page = await browser.newPage()  # open a new page
        await page.setViewport({"width": 1920, "height": 1080})
        url = "http://zxgk.court.gov.cn/shixin/"
        await page.goto(url)  # load the page
        # set the browser UA
        await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
                                ' Chrome/74.0.3729.169 Safari/537.36')
        # Defeat the window.navigator.webdriver check: report webdriver as false so the automation is not detected
        await page.evaluate(
            '''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }''')
        await page.waitFor(500)
        await page.type("#pCardNum", "52010319750211041X", {"delay": 30})  # type the sample ID number
        retry_count = 0
        while True:
            try:
                retry_count += 1
                print("---recognition attempt #{}----".format(retry_count))
                # screenshot the whole page
                await page.screenshot(path="captures/shixin_page.jpg")
                # crop the captcha out of the screenshot
                await self.get_capt("captures/shixin_page.jpg")
                await page.waitFor(100)
                # read captchaId, a required parameter for building the API call
                captchaId_list = await page.xpath('//input[@id="captchaId"]')
                self.captchaId = await (await captchaId_list[0].getProperty("value")).jsonValue()
                # print("=====captchaId===:", self.captchaId)
                if retry_count > 1:
                    self.pic_str = await self.get_capt_text()  # TODO recognize the captcha with chaojiying
                    # self.pic_str = await self.get_content_by_baidu("captures/shixin_capt.jpg")  # TODO recognize with the Baidu OCR API
                else:
                    self.pic_str = "firs"
                # clear the captcha input first
                await page.evaluate('document.querySelector("#yzm").value=""')
                await page.type("#yzm", self.pic_str, {"delay": 100})  # type the captcha
                await page.waitFor(100)
                await page.screenshot(path="captures/shixin_check.jpg")
                search_button = await page.xpath('//button[@class="btn btn-zxgk btn-block "]')
                # print("search button:", search_button)
                await page.waitFor(100)
                await search_button[0].click()
                await page.waitFor(100)
                success = await page.xpath('//div[@class="col-sm-2 alert alert-success mysetalert"]')  # TODO marker for whether "captcha correct!" appeared
                await page.waitFor(50)
                if success or retry_count > 10:
                    break
            except Exception as e:
                print("ERROR:", e)
        print("------------------------------recognition passed------------------------------")
        print("captcha text:", self.pic_str, "captcha id:", self.captchaId)
        await page.waitFor(500)
        cap_dic = {"pic_str": self.pic_str, "captchaId": self.captchaId}
        with open("captures/shixin_capture.json", "w", encoding='utf8') as wf:
            json.dump(cap_dic, wf)
        await page.close()
        await browser.close()

    async def get_capt(self, pic_name):
        """Take a screenshot file and crop out the captcha region."""
        try:
            img = Image.open(pic_name)
            # image size
            # img_size = img.size
            # h = img_size[1]  # image height
            # w = img_size[0]  # image width
            # print("page screenshot size:", w, h)  # 1920 x 1080
            region1 = img.crop((880, 787, 974, 819))  # the captcha is about 95*24 px
            region1.save("captures/shixin_capt.jpg")
        except Exception as e:
            print(e)

    async def get_capt_text(self):
        """Recognize the captcha with chaojiying."""
        with open("captures/shixin_capt.jpg", "rb") as f:
            image = f.read()
        result = self.chaojiying.post_pic(image, CHAOJIYING_KIND)  # TODO call chaojiying to recognize
        # result = {'err_no': 0, 'err_str': 'OK', 'pic_id': '3103910544000600042', 'pic_str': 'prrs', 'md5': 'e4b834f70223666e1c25104f647a33b8'}
        print("chaojiying result:", result)
        pic_str = result.get("pic_str")
        result["s"] = "shixin"
        result["ts"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        del result["pic_id"]
        chaojiying_log_path = "chaojiying.log"
        with open(chaojiying_log_path, "a") as af:
            json.dump(result, af)
            af.write("\n")
        if not pic_str:
            pic_str = "error"
        return pic_str

    async def get_content_by_baidu(self, filePath):
        """Recognize the captcha with the Baidu OCR API, given an image path."""
        with open(filePath, 'rb') as fp:
            image = fp.read()
        # Accurateresult = client.basicAccurate(image)  # TODO call the high-accuracy general OCR
        # print("high accuracy:", Accurateresult)
        # Accuratewords = Accurateresult.get("words_result")[0]
        # cap_text1 = ""
        # for w1 in Accuratewords.get("words").strip():
        #     if re.match("[a-zA-Z0-9]", w1):
        #         cap_text1 += w1
        # print("AI high-accuracy result:", cap_text1, " | characters:", len(cap_text1))
        Generalresult = self.client.basicGeneral(image)  # TODO call the standard general OCR
        print("standard OCR:", Generalresult)
        cap_text2 = ""
        try:
            Generalwords = Generalresult.get("words_result")[0]
            for w2 in Generalwords.get("words").strip():
                if re.match("[a-zA-Z0-9]", w2):
                    cap_text2 += w2
            print("AI standard-accuracy result:", cap_text2, " | characters:", len(cap_text2))
            if len(cap_text2) == 4:
                return cap_text2
            else:
                return "not4"
        except:
            return "eror"


if __name__ == '__main__':
    conn = pymysql.connect(host="mysql.test.syf.com",
                           port=3306,
                           database="shengye_data_center",
                           user="shengye_data_center_test",
                           password="KmdIN8Ftw8tB",
                           charset="utf8",
                           cursorclass=pymysql.cursors.DictCursor)
    cs = conn.cursor()
    sql = "SELECT DISTINCT warrantor,IDCard_num from t_shixin_list;"
    cs.execute(sql)
    war_list = cs.fetchall()
    print("number of warrantors:", len(war_list))
    request_count = len(war_list) // 30
    for i in range(request_count):
        sxp = ShiXinPyppeteer()
        asyncio.get_event_loop().run_until_complete(sxp.get_capture_text_captchaId())
        time.sleep(77)


# 2 Automated login (qcc as the example):
"""Use the pyppeteer library to drive the browser through the slider captcha."""
import random
from PIL import Image
import asyncio
import time
import os
from pyppeteer import launch
from pyquery import PyQuery as pq
import requests


class TestPyppeteer(object):

    def __init__(self):
        pass

    def __del__(self):
        # await self.browser.close()
        pass

    async def run(self):
        """The actual business logic."""
        # browser = await launch(headless=False, userDataDir="./userData",
        #                        args=['--disable-infobars', '--window-size=1920,1080'])  # create the browser object
        browser = await launch(headless=False, args=['--disable-infobars', '--window-size=1920,1080'])  # create the browser object
        # browser = await launch(headless=False,
        #                        userDataDir='./qccdata',
        #                        args=['--window-size=1920,1080', '--disable-infobars'])
        # headless=False gives a visible browser window
        # devtools=True opens the devtools together with the page
        # args=['--disable-infobars'] hides the "Chrome is being controlled by automated test software" banner
        # args=['--window-size=1920,1080'] sets the window size
        page = await browser.newPage()  # open a new page
        await page.setViewport({"width": 1920, "height": 1080})
        # url = "http://jzsc.mohurd.gov.cn/data/company"
        # url = "https://www.qcc.com/"
        url = "https://www.qichacha.com/user_login"
        await page.goto(url)  # load the page
        # set the browser UA
        await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
                                ' Chrome/74.0.3729.169 Safari/537.36')
        # Defeat the window.navigator.webdriver check: report webdriver as false so the automation is not detected
        await page.evaluate(
            '''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }''')
        # await asyncio.sleep(1)
        # switch to the normal (password) login tab
        normal_login = await page.xpath('//*[@id="normalLogin"]')
        await normal_login[0].click()
        await page.waitFor(1000)
        # slider verification
        # type the account and password
        username = "13178699629"
        password = "sy2020"
        await page.type("#nameNormal", username, {"delay": 100})
        await page.waitFor(1000)
        await page.type("#pwdNormal", password, {"delay": 50})
        # Detect whether the page contains the slider, by looking for the page element
        await page.waitFor(100)
        while True:
            await self.try_validation(page)
            await page.screenshot(path="qcc_login.jpg")
            await page.waitFor(1000)
            # //*[@class="errloading"]
            error_elements = await page.xpath('//*[@class="errloading"]')
            if error_elements:
                print("slide failed,")
                a_ele = await page.xpath('//a[text()="刷新"]')
                print(a_ele)
                await page.waitFor(1000)
                await a_ele[0].click()
                # await self.try_validation(page)
            else:
                print("slide succeeded....")
                break
        await page.waitFor(100)
        login_elements = await page.xpath('//button[@class="btn btn-primary btn-block m-t-md login-btn"]')
        await login_elements[0].click()
        await page.waitFor(100)
        await self.get_cookie(page)
        await page.waitFor(10000)
        await page.close()
        await browser.close()

    async def try_validation(self, page, distance=308):
        # Split the distance into two segments to imitate a human drag
        distance2 = random.randint(10, 20)
        # distance2 = 14
        distance1 = distance - distance2
        print("distance2:", distance2)
        btn_position = await page.evaluate('''
            () =>{
                return {
                    x: document.querySelector('#nc_1_n1z').getBoundingClientRect().x,
                    y: document.querySelector('#nc_1_n1z').getBoundingClientRect().y,
                    width: document.querySelector('#nc_1_n1z').getBoundingClientRect().width,
                    height: document.querySelector('#nc_1_n1z').getBoundingClientRect().height
                }}
        ''')
        x = btn_position['x'] + btn_position['width'] / 2
        y = btn_position['y'] + btn_position['height'] / 2
        print(btn_position)
        print("start position:", x, y, "could be refined into accelerate / constant speed / decelerate.")
        await page.mouse.move(x, y)
        await page.mouse.down()
        await page.waitFor(20)
        step1 = random.randint(25, 35)
        # step1 = 26
        print("step1:", step1)
        await page.mouse.move(x + distance1, y, {'steps': step1})
        await page.waitFor(30)
        step2 = random.randint(15, 25)
        # step2 = 16
        print("step2:", step2)
        await page.mouse.move(x + distance1 + distance2, y, {'steps': step2})
        await page.waitFor(800)
        await page.mouse.up()

    async def try_validation2(self, page, distance=308):
        # Move along a generated track of small steps to imitate a human drag
        await page.waitFor(1000)
        btn_position = await page.evaluate('''
            () =>{
                return {
                    x: document.querySelector('#nc_1_n1z').getBoundingClientRect().x,
                    y: document.querySelector('#nc_1_n1z').getBoundingClientRect().y,
                    width: document.querySelector('#nc_1_n1z').getBoundingClientRect().width,
                    height: document.querySelector('#nc_1_n1z').getBoundingClientRect().height
                }}
        ''')
        x = btn_position['x'] + btn_position['width'] / 2
        y = btn_position['y'] + btn_position['height'] / 2
        print(btn_position)
        print("start position:", x, y, "could be refined into accelerate / constant speed / decelerate.")
        await page.mouse.move(x, y)
        await page.mouse.down()
        track_list = await self.get_track(distance=distance)
        for ids, track in enumerate(track_list):
            await page.mouse.move(x + track, y)
            x += track
            await page.waitFor(random.randint(15, 30))
            # if ids == len(track_list) - 1:
            #     await page.mouse.down()
        await page.mouse.up()

    # slider movement track
    async def get_track(self, distance):
        track = []
        current = 0
        mid = distance * 3 / 4
        t = random.randint(2, 3) / 10
        v = 0
        while current < distance:
            if current < mid:
                a = 2
            else:
                a = -3
            v0 = v
            v = v0 + a * t
            move = v0 * t + 1 / 2 * a * t * t
            current += move
            track.append(round(move))
        print("track:", track)
        return track

    async def get_pic(self, pic_name):
        """Take a screenshot file and crop out the specified region."""
        print("cropping image", pic_name)
        ids = (pic_name.split(".")[0]).split("/")[1]
        print(ids)
        try:
            img = Image.open(pic_name)
            # image size
            img_size = img.size
            h = img_size[1]  # image height
            w = img_size[0]  # image width
            print(w, h)  # 1920 x 1080
            region1 = img.crop((792, 284, 1111, 500))
            region1.save("captures/capt.jpg")
        except Exception as e:
            print(e)

    async def get_cookie(self, page):
        res = await page.content()
        cookies_list = await page.cookies()
        # print(cookies_list)
        cookies = ''
        for cookie in cookies_list:
            str_cookie = '{0}={1};'
            str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))
            cookies += str_cookie
        print(cookies)


if __name__ == '__main__':
    filename = "captures/pypp_jzsc.jpg"
    tp = TestPyppeteer()
    asyncio.get_event_loop().run_until_complete(tp.run())
    # print(asyncio.get_event_loop().run_until_complete(tp.get_track(308)))
    12 Deploying spiders with scrapyd:
    Install scrapyd: pip install scrapyd
    Start scrapyd in the background: nohup scrapyd &
    0 Write the scrapy spider.
    1 Edit the [deploy] section in the scrapy project's scrapy.cfg file:
        [deploy:shipxy_info]   --- pick your own deploy name
        url = http://10.10.6.103:6800/
        project = shipxy       --- the scrapy project name
    2 Push the spider code to the server.
    3 From the project directory (where scrapy.cfg lives), run:
        scrapyd-deploy {deploy} -p {project}
    4 Open http://10.10.6.103:6800/ in a browser to see the scrapyd page.

    # -- Starting spiders through the scrapyd API:
    1. Daemon status:                        http://127.0.0.1:6800/daemonstatus.json
    2. List projects:                        http://127.0.0.1:6800/listprojects.json
    3. List spiders deployed in a project:   http://127.0.0.1:6800/listspiders.json?project=myproject
    4. List versions deployed for a project: http://127.0.0.1:6800/listversions.json?project=myproject
    5. Spider job status:                    http://127.0.0.1:6800/listjobs.json?project=myproject
    6. Start a spider on the server:         http://localhost:6800/schedule.json
       (POST, data={"project": myproject, "spider": myspider})
    6.1 Note: to pass arguments to a spider started through scrapyd, add them to the request body
        (handy for on-demand, real-time crawls):
        (POST, data={"project": myproject, "spider": myspider, "arg_name": "arg_value"})

# In the spider, add:
def __init__(self, ship_name=None, *args, **kwargs):
    super(ShipInfoSpider, self).__init__(*args, **kwargs)
    logging.info("Argument passed into the spider: {}".format(ship_name))
    self.ship_name = ship_name
    self.start_urls = ["http://searchv3.shipxy.com/shipdata/search3.ashx?f=srch&kw={}".format(ship_name)]

# Calling it from Python web code:
def start_spider():
    url = "http://10.10.6.103:6800/schedule.json"
    ship_name = "MARJORIE ANN"
    data = {
        "project": "shipxy",
        "spider": "ship_info",
        "ship_name": ship_name
    }
    res = requests.post(url=url, data=data)
    print(res.content.decode())
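    A small sketch of checking and cancelling jobs through the same HTTP API (host and project follow the example above; the jobid comes back in the schedule.json response):
import requests

SCRAPYD = "http://10.10.6.103:6800"

# start the spider and remember the job id
resp = requests.post(SCRAPYD + "/schedule.json",
                     data={"project": "shipxy", "spider": "ship_info",
                           "ship_name": "MARJORIE ANN"}).json()
jobid = resp["jobid"]

# check which jobs are pending / running / finished
jobs = requests.get(SCRAPYD + "/listjobs.json", params={"project": "shipxy"}).json()
print([j["id"] for j in jobs.get("running", [])])

# cancel the job if it is no longer needed
requests.post(SCRAPYD + "/cancel.json", data={"project": "shipxy", "job": jobid})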
    13 Monitoring spiders with SpiderKeeper:
    0 Make sure the project has already been deployed to scrapyd.
    1 Install SpiderKeeper: pip install spiderkeeper
    2 Start SpiderKeeper in the background: nohup spiderkeeper &
    3 Open localhost:5000 in a browser to reach the login page (admin / admin).
    4 Build the project egg for scrapyd: scrapyd-deploy --build-egg output.egg
    Everything else is done through the web UI.