Commonly used crawler code snippets:
1 Converting a timestamp to a formatted time string:
import time
# Convert a 10-digit (seconds) timestamp into a readable string
timeStamp = 1557502800
otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timeStamp))
print(otherStyleTime)
import datetime
# The current date formatted as a string (the result is of type str)
print(datetime.datetime.now().strftime("%Y-%m-%d"))
print(type(datetime.datetime.now().strftime("%Y-%m-%d")))
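The reverse direction, from a formatted string back to a 10-digit timestamp, comes up just as often; a minimal sketch using only the standard library:
import time
time_str = "2019-05-10 23:40:00"
# strptime parses the string, mktime turns the struct_time into seconds since the epoch
timeStamp = int(time.mktime(time.strptime(time_str, "%Y-%m-%d %H:%M:%S")))
print(timeStamp)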
2 Executing JS code from Python:
import execjs
# Read the JS source and compile it so its functions can be called from Python
with open('data.js', 'r', encoding='utf-8') as f:
    js1 = f.read()
ecjs = execjs.compile(js1)
data1 = ecjs.call("latNe2", 55.025635)
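Note that execjs is installed as the PyExecJS package (pip install PyExecJS) and needs a JS runtime such as Node.js on the machine; data.js and latNe2 above are project-specific. A quick sanity check of the runtime:
import execjs
# Prints the detected JS runtime and the result of a trivial expression
print(execjs.get().name)
print(execjs.eval("1 + 2"))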
3 A launcher file for running a Scrapy spider:
from scrapy.cmdline import execute
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
execute(["scrapy", "crawl", "ship_info", "-a", "ship_name=TIANLIGONG007"])
4 Passing arguments into a Scrapy spider:
def __init__(self, ship_name=None, *args, **kwargs):
    super(ShipInfoSpider, self).__init__(*args, **kwargs)
    print("Argument passed into the spider:", ship_name)
    self.start_urls = ["http://searchv3.shipxy.com/shipdata/search3.ashx?f=srch&kw={}".format(ship_name)]
5 An extensions.py extension that stops a scrapy_redis spider automatically after it has been idle for half an hour:
import logging
import time
from scrapy import signals
from scrapy.exceptions import NotConfigured

logger = logging.getLogger(__name__)

class RedisSpiderSmartIdleClosedExensions(object):

    def __init__(self, idle_number, crawler):
        self.crawler = crawler
        self.idle_number = idle_number
        self.idle_list = []
        self.idle_count = 0

    @classmethod
    def from_crawler(cls, crawler):
        # Only enable the extension when MYEXT_ENABLED is set in settings.py
        if not crawler.settings.getbool('MYEXT_ENABLED'):
            raise NotConfigured
        # Number of consecutive idle signals before shutdown; the idle signal fires
        # roughly every 5 seconds, so 360 signals is about half an hour
        idle_number = crawler.settings.getint('IDLE_NUMBER', 360)
        ext = cls(idle_number, crawler)
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(ext.spider_idle, signal=signals.spider_idle)
        return ext

    def spider_opened(self, spider):
        logger.info("opened spider %s, continuous idle limit: %d", spider.name, self.idle_number)

    def spider_closed(self, spider):
        logger.info("closed spider %s, idle count %d, continuous idle count %d",
                    spider.name, self.idle_count, len(self.idle_list))

    def spider_idle(self, spider):
        self.idle_count += 1
        self.idle_list.append(time.time())
        idle_list_len = len(self.idle_list)
        # More than 6 seconds between two idle signals means the spider did real work
        # in between, so the continuous-idle window is reset
        if idle_list_len > 2 and self.idle_list[-1] - self.idle_list[-2] > 6:
            self.idle_list = [self.idle_list[-1]]
        elif idle_list_len > self.idle_number:
            logger.info('\n continuous idle count exceeded %d times'
                        '\n idle shutdown condition met, closing the spider'
                        '\n idle start time: %s, close spider time: %s',
                        self.idle_number, self.idle_list[0], self.idle_list[-1])
            self.crawler.engine.close_spider(spider, 'closespider_pagecount')

Register the extension in settings.py:
EXTENSIONS = {
    'lincese.extensions.RedisSpiderSmartIdleClosedExensions': 500,
}
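The extension also reads two settings that are not shown above; a minimal sketch of the matching settings.py entries (the half-hour figure assumes the idle signal fires about every 5 seconds):
# settings.py, next to the EXTENSIONS entry above
MYEXT_ENABLED = True   # turn the idle-shutdown extension on
IDLE_NUMBER = 360      # 360 consecutive idle signals is roughly 30 minutes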
6 Computing the total number of pages:
total_num = 2001  # total number of records, at 100 records per page
if total_num % 100 == 0:
    max_page = total_num // 100
else:
    max_page = total_num // 100 + 1
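The same page count can be computed in one line with a ceiling division:
import math
max_page = math.ceil(total_num / 100)  # e.g. 2001 records -> 21 pages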
7 Adding a proxy in middleware.py:
import datetime
import random
import time
from scrapy import signals
import base64
import sys

PY3 = sys.version_info[0] >= 3

def base64ify(bytes_or_str):
    # Encode the "user:password" pair for the Proxy-Authorization header
    if PY3 and isinstance(bytes_or_str, str):
        input_bytes = bytes_or_str.encode('utf8')
    else:
        input_bytes = bytes_or_str
    output_bytes = base64.urlsafe_b64encode(input_bytes)
    if PY3:
        return output_bytes.decode('ascii')
    else:
        return output_bytes

class ProxyMiddleware(object):
    # Downloader-middleware wrapper added here so the snippet is self-contained;
    # the original notes only showed the process_request method.

    def process_request(self, request, spider):
        proxyHost = "u4585.5.tn.16yun.cn"
        proxyPort = "6441"
        proxyUser = "16AHMXTY"
        proxyPass = "606070"
        request.meta['proxy'] = "http://{0}:{1}".format(proxyHost, proxyPort)
        encoded_user_pass = base64ify(proxyUser + ":" + proxyPass)
        request.headers['Proxy-Authorization'] = 'Basic ' + encoded_user_pass
        # Tunnel id used by the proxy provider (commonly controls per-request IP rotation)
        tunnel = random.randint(1, 10000)
        request.headers['Proxy-Tunnel'] = str(tunnel)
        print("------proxy set---{}---".format(random.random()))
        return None
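For the middleware to run it has to be registered in the project settings; a minimal sketch, assuming the module path myproject.middlewares and the ProxyMiddleware class name used above:
# settings.py -- the module path is an assumption, adjust it to the real project layout
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.ProxyMiddleware': 543,
}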
8 When the body of a POST request is JSON data:
formdata2 = '{"seDate":["",""],"stock":["%s"],"channelCode":["fixed_disc"],"bigCategoryId":["%s"],"pageSize":30,"pageNum":%s}' % (stock_code, cate, str(page))
url2 = "http://www.szse.cn/api/disc/announcement/annList?random="+str(random.random())
headers2 = {
"Content-Type": "application/json",
"Host": "www.szse.cn",
"Origin": "http://www.szse.cn",
"Proxy-Connection": "keep-alive",
"Referer": "http://www.szse.cn/disclosure/listed/fixed/index.html",
"User-Agent": "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36",
"X-Request-Type": "ajax",
"X-Requested-With": "XMLHttpRequest"
}
yield scrapy.Request(url=url2,
method="POST",
headers=headers2,
body=formdata2,
)
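Building the same body with json.dumps instead of % formatting avoids quoting and escaping mistakes; an equivalent sketch using the stock_code, cate and page variables from the snippet above:
import json
# Equivalent request body built from a dict
formdata2 = json.dumps({
    "seDate": ["", ""],
    "stock": [stock_code],
    "channelCode": ["fixed_disc"],
    "bigCategoryId": [cate],
    "pageSize": 30,
    "pageNum": int(page),
})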
9 A pyppeteer error:
Q: File "/usr/local/lib/python3.6/dist-packages/pyppeteer/launcher.py", line 225, in get_ws_endpoint
raise BrowserError('Browser closed unexpectedly:\n')
pyppeteer.errors.BrowserError: Browser closed unexpectedly:
A: the bundled Chromium cannot start because system libraries it depends on are missing; install them:
centos:
ubuntu:
apt-get update
apt --fix-broken install
apt-get upgrade
apt-get install gconf-service libasound2 libatk1.0-0 libatk-bridge2.0-0 libc6 libcairo2 libcups2 libdbus-1-3 libexpat1 libfontconfig1 libgcc1 libgconf-2-4 libgdk-pixbuf2.0-0 libglib2.0-0 libgtk-3-0 libnspr4 libpango-1.0-0 libpangocairo-1.0-0 libstdc++6 libx11-6 libx11-xcb1 libxcb1 libxcomposite1 libxcursor1 libxdamage1 libxext6 libxfixes3 libxi6 libxrandr2 libxrender1 libxss1 libxtst6 ca-certificates fonts-liberation libappindicator1 libnss3 lsb-release xdg-utils wget --fix-missing
apt-get install -y libnss3
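Once the libraries are installed, a minimal launch check (headless and --no-sandbox, as in the example in section 11) confirms that pyppeteer can actually start Chromium:
import asyncio
from pyppeteer import launch

async def check_launch():
    # If this completes without BrowserError, the missing-library problem is fixed
    browser = await launch({'headless': True, 'args': ['--no-sandbox']})
    page = await browser.newPage()
    await page.goto("https://example.com")
    print(await page.title())
    await browser.close()

asyncio.get_event_loop().run_until_complete(check_launch())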
10 Generating 10-digit and 13-digit timestamps:
import time
int(round(time.time() * 1000))  # 13-digit timestamp (milliseconds)
int(time.time())                # 10-digit timestamp (seconds)
11 pyppeteer code examples:
import datetime
import json
import re
import pymysql
from PIL import Image
import asyncio
from chaojiying import Chaojiying
import time
from pyppeteer import launch
from aip import AipOcr
""" 你的 APPID AK SK """
APP_ID = 'XXXX'
API_KEY = '####'
SECRET_KEY = '@@@@@'
class ShiXinPyppeteer(object):
captchaId = ""
pic_str = ""
def __init__(self):
self.headers = {
"Cookie": "JSESSIONID=07402EC2C3A9250340A6AB38D6C75A1C; _gscu_15322769=888379770s984g60; _gscbrs_15322769=1; Hm_lvt_d59e2ad63d3a37c53453b996cb7f8d4e=1588837978; SESSION=2b347422-1b5a-44b7-896c-1ece475a31ae; Hm_lpvt_d59e2ad63d3a37c53453b996cb7f8d4e=1588922225; _gscs_15322769=t88916915iq0wxw60|pv:22",
"User-Agent": "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36",
}
self.chaojiying = Chaojiying(CHAOJIYING_USERNAME, CHAOJIYING_PASSWORD, CHAOJIYING_SOFT_ID)
self.client = AipOcr(APP_ID, API_KEY, SECRET_KEY)
def __del__(self):
pass
async def get_capture_text_captchaId(self):
"""本方法用于获取验证码 & 验证码对应的id,
TODO 验证码存活时间不长,只有1,2分钟,所以数据量较多的情况下需要不断的调用识别;
"""
browser = await launch({'headless': True,'args': ['--no-sandbox', '--window-size=1920,1080','--disable-infobars']})
page = await browser.newPage()
await page.setViewport({"width": 1920, "height": 1080})
url = "http://zxgk.court.gov.cn/shixin/"
await page.goto(url)
await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
' Chrome/74.0.3729.169 Safari/537.36')
await page.evaluate(
'''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }''')
await page.waitFor(500)
await page.type("#pCardNum", "52010319750211041X", {"delay": 30})
retry_count = 0
while True:
try:
retry_count += 1
print("---第【{}】次尝试识别----".format(retry_count))
await page.screenshot(path="captures/shixin_page.jpg")
await self.get_capt("captures/shixin_page.jpg")
await page.waitFor(100)
captchaId_list = await page.xpath('//input[@id="captchaId"]')
self.captchaId = await (await captchaId_list[0].getProperty("value")).jsonValue()
if retry_count > 1:
self.pic_str = await self.get_capt_text()
else:
self.pic_str = "firs"
await page.evaluate('document.querySelector("#yzm").value=""')
await page.type("#yzm", self.pic_str, {"delay": 100})
await page.waitFor(100)
await page.screenshot(path="captures/shixin_check.jpg")
search_button = await page.xpath('//button[@class="btn btn-zxgk btn-block "]')
await page.waitFor(100)
await search_button[0].click()
await page.waitFor(100)
success = await page.xpath('//div[@class="col-sm-2 alert alert-success mysetalert"]')
await page.waitFor(50)
if success or retry_count > 10:
break
except Exception as e:
print("ERROR:", e)
print("------------------------------识别通过了------------------------------")
print("验证码文本:", self.pic_str, "验证码id:", self.captchaId)
await page.waitFor(500)
cap_dic = {"pic_str": self.pic_str, "captchaId": self.captchaId}
with open("captures/shixin_capture.json", "w", encoding='utf8') as wf:
json.dump(cap_dic, wf)
wf.close()
await page.close()
await browser.close()
async def get_capt(self, pic_name):
"""传入一个图片,剪切出指定范围的图片"""
try:
img = Image.open(pic_name)
region1 = img.crop((880, 787, 974, 819))
region1.save("captures/shixin_capt.jpg")
except Exception as e:
print(e)
async def get_capt_text(self):
"""使用超级鹰识别验证码"""
with open("captures/shixin_capt.jpg", "rb") as f:
image = f.read()
result = self.chaojiying.post_pic(image, CHAOJIYING_KIND)
print("超级鹰识别结果:", result)
pic_str = result.get("pic_str")
result["s"] = "shixin"
result["ts"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
del result["pic_id"]
chaojiying_log_path = "chaojiying.log"
with open(chaojiying_log_path, "a") as af:
json.dump(result, af)
af.write("\n")
if not pic_str:
pic_str = "error"
return pic_str
async def get_content_by_baidu(self, filePath):
"""通过百度接口识别验证码:传入图片路径"""
with open(filePath, 'rb') as fp:
image = fp.read()
Generalresult = self.client.basicGeneral(image)
print("通用一般:", Generalresult)
cap_text2 = ""
try:
Generalwords = Generalresult.get("words_result")[0]
for w2 in Generalwords.get("words").strip():
if re.match("[a-zA-Z0-9]", w2):
cap_text2 += w2
print("Ai一般精度接口识别结果:", cap_text2, " |字符个数:", len(cap_text2))
if len(cap_text2) == 4:
return cap_text2
else:
return "not4"
except:
return "eror"
if __name__ == '__main__':
conn = pymysql.connect(host="mysql.test.syf.com", port=3306, database="shengye_data_center",
user="shengye_data_center_test", password="KmdIN8Ftw8tB", charset="utf8",
cursorclass=pymysql.cursors.DictCursor
)
cs = conn.cursor()
sql = "SELECT DISTINCT warrantor,IDCard_num from t_shixin_list;"
cs.execute(sql)
war_list = cs.fetchall()
print("担保人个数:", len(war_list))
request_count = len(war_list) // 30
for i in range(request_count):
sxp = ShiXinPyppeteer()
asyncio.get_event_loop().run_until_complete(sxp.get_capture_text_captchaId())
time.sleep(77)
"""
使用Pyppeteer库进行浏览器点击验证码;
"""
import random
from PIL import Image
import asyncio
import time
import os
from pyppeteer import launch
from pyquery import PyQuery as pq
import requests
class TestPyppeteer(object):
def __init__(self):
pass
def __del__(self):
pass
async def run(self):
"""真正的业务方法"""
browser = await launch(headless=False, args=['--disable-infobars', '--window-size=1920,1080'])
page = await browser.newPage()
await page.setViewport({"width": 1920, "height": 1080})
url = "https://www.qichacha.com/user_login"
await page.goto(url)
await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
' Chrome/74.0.3729.169 Safari/537.36')
await page.evaluate(
'''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }''')
normal_login = await page.xpath('//*[@id="normalLogin"]')
await normal_login[0].click()
await page.waitFor(1000)
username = "13178699629"
password = "sy2020"
await page.type("#nameNormal",username,{"delay":100})
await page.waitFor(1000)
await page.type("#pwdNormal", password, {"delay": 50})
await page.waitFor(100)
while True:
await self.try_validation(page)
await page.screenshot(path="qcc_login.jpg")
await page.waitFor(1000)
error_elements = await page.xpath('//*[@class="errloading"]')
if error_elements:
print("滑动失败,")
a_ele = await page.xpath('//a[text()="刷新"]')
print(a_ele)
await page.waitFor(1000)
await a_ele[0].click()
else:
print("滑动成功....")
break
await page.waitFor(100)
login_elements = await page.xpath('//button[@class="btn btn-primary btn-block m-t-md login-btn"]')
await login_elements[0].click()
await page.waitFor(100)
await self.get_cookie(page)
await page.waitFor(10000)
await page.close()
await browser.close()
async def try_validation(self, page, distance=308):
distance2 = random.randint(10,20)
distance1 = distance - distance2
print("distance2:",distance2)
btn_position = await page.evaluate('''
() =>{
return {
x: document.querySelector('#nc_1_n1z').getBoundingClientRect().x,
y: document.querySelector('#nc_1_n1z').getBoundingClientRect().y,
width: document.querySelector('#nc_1_n1z').getBoundingClientRect().width,
height: document.querySelector('#nc_1_n1z').getBoundingClientRect().height
}}
''')
x = btn_position['x'] + btn_position['width'] / 2
y = btn_position['y'] + btn_position['height'] / 2
print(btn_position)
print("起始位置:", x, y,"优化成先加速再匀速再减速。。")
await page.mouse.move(x, y)
await page.mouse.down()
await page.waitFor(20)
step1 = random.randint(25, 35)
print("step1:", step1)
await page.mouse.move(x + distance1, y, {'steps': step1})
await page.waitFor(30)
step2 = random.randint(15, 25)
print("step2:",step2)
await page.mouse.move(x + distance1 + distance2, y, {'steps':step2 })
await page.waitFor(800)
await page.mouse.up()
async def try_validation2(self, page, distance=308):
await page.waitFor(1000)
btn_position = await page.evaluate('''
() =>{
return {
x: document.querySelector('#nc_1_n1z').getBoundingClientRect().x,
y: document.querySelector('#nc_1_n1z').getBoundingClientRect().y,
width: document.querySelector('#nc_1_n1z').getBoundingClientRect().width,
height: document.querySelector('#nc_1_n1z').getBoundingClientRect().height
}}
''')
x = btn_position['x'] + btn_position['width'] / 2
y = btn_position['y'] + btn_position['height'] / 2
print(btn_position)
print("起始位置:", x, y,"优化成先加速再匀速再减速。。")
await page.mouse.move(x, y)
await page.mouse.down()
track_list = await self.get_track(distance=distance)
for ids, track in enumerate(track_list):
await page.mouse.move(x + track, y)
x += track
await page.waitFor(random.randint(15,30))
await page.mouse.up()
async def get_track(self, distance):
track = []
current = 0
mid = distance * 3/ 4
t = random.randint(2, 3) / 10
v = 0
while current < distance:
if current < mid:
a = 2
else:
a = -3
v0 = v
v = v0 + a * t
move = v0 * t + 1 / 2 * a * t * t
current += move
track.append(round(move))
print("轨迹:", track)
return track
async def get_pic(self, pic_name):
"""传入一个文件,剪切出指定范围的图片"""
print("正在截取图片", pic_name)
ids = (pic_name.split(".")[0]).split("/")[1]
print(ids)
try:
img = Image.open(pic_name)
img_size = img.size
h = img_size[1]
w = img_size[0]
print(w, h)
region1 = img.crop((792, 284, 1111, 500))
region1.save("captures/capt.jpg")
except Exception as e:
print(e)
async def get_cookie(self, page):
res = await page.content()
cookies_list = await page.cookies()
cookies = ''
for cookie in cookies_list:
str_cookie = '{0}={1};'
str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))
cookies += str_cookie
print(cookies)
if __name__ == '__main__':
filename = "captures/pypp_jzsc.jpg"
tp = TestPyppeteer()
asyncio.get_event_loop().run_until_complete(tp.run())
12 Deploying spiders with scrapyd:
Install scrapyd: pip install scrapyd
Start scrapyd in the background: nohup scrapyd &
0 Write the Scrapy spider
1 Edit the scrapy.cfg file in the Scrapy project:
[deploy:shipxy_info]   --- name the deploy target yourself
url = http://10.10.6.103:6800/
project = shipxy   --- the Scrapy project name
2 Push the spider code to the server
3 Run the following command inside the Scrapy project directory (e.g. scrapyd-deploy shipxy_info -p shipxy):
scrapyd-deploy {deploy} -p {project}
4 Open http://10.10.6.103:6800/ in a browser to view the scrapyd page;
1. Get daemon status: http://127.0.0.1:6800/daemonstatus.json
2. List projects: http://127.0.0.1:6800/listprojects.json
3. List the spiders published under a project: http://127.0.0.1:6800/listspiders.json?project=myproject
4. List the published versions of a project: http://127.0.0.1:6800/listversions.json?project=myproject
5. Get the run status of jobs: http://127.0.0.1:6800/listjobs.json?project=myproject (see the status-check sketch after the start_spider example below)
6. Start a spider on the server:
http://localhost:6800/schedule.json
(POST, data={"project": myproject, "spider": myspider})
6.1 Note: to start a spider through scrapyd and pass arguments into it, add the parameters to the request body (one way to run crawls on demand):
(POST, data={"project": myproject, "spider": myspider, "param_name": "param_value"})
def __init__(self, ship_name=None, *args, **kwargs):
    super(ShipInfoSpider, self).__init__(*args, **kwargs)
    logging.info("Argument passed into the spider: {}".format(ship_name))
    self.ship_name = ship_name
    self.start_urls = ["http://searchv3.shipxy.com/shipdata/search3.ashx?f=srch&kw={}".format(ship_name)]
import requests

def start_spider():
    url = "http://10.10.6.103:6800/schedule.json"
    ship_name = "MARJORIE ANN"
    data = {
        "project": "shipxy",
        "spider": "ship_info",
        "ship_name": ship_name
    }
    res = requests.post(url=url, data=data)
    print(res.content.decode())
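A matching sketch for item 5 above: poll listjobs.json to see whether the scheduled job is pending, running or finished (the field names follow scrapyd's standard listjobs response):
import requests

def check_jobs():
    url = "http://10.10.6.103:6800/listjobs.json?project=shipxy"
    res = requests.get(url).json()
    # scrapyd groups jobs into pending / running / finished lists
    for state in ("pending", "running", "finished"):
        for job in res.get(state, []):
            print(state, job.get("spider"), job.get("id"))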
13 Monitoring spiders with SpiderKeeper:
0 Make sure the project has already been deployed to scrapyd.
1 Install SpiderKeeper: pip install spiderkeeper
2 Start SpiderKeeper in the background: nohup spiderkeeper &
3 Open localhost:5000 in a browser to reach the login page (username/password: admin/admin).
4 Build the project egg for scrapyd: scrapyd-deploy --build-egg output.egg
Everything else is done through the web UI.