官方仓库地址:https://github.com/miyakogi/pyppeteer
官方文档地址:API Reference — Pyppeteer 0.0.25 documentation
Selenium环境的相关配置比较繁琐,此外,有的网站会对selenium和webdriver进行识别和反爬,因此在这里介绍一下它的替代产品Pyppeteer。
Pyppeteer 就是依赖于 Chromium 这个浏览器来运行的。如果第一次运行的时候,Chromium 浏览器没有安装,那么程序会帮我们自动安装和配置,就免去了繁琐的环境配置等工作。另外 Pyppeteer 是基于 Python 的新特性 async 实现的,所以它的一些执行也支持异步操作,效率相对于 Selenium也有所提高。
安装
pip3 install pyppeteer
测试代码:
import asyncio
from pyppeteer import launchasync def main():# 创建浏览器对象browser = await launch(headless=False,args=['--disable-infobars'])# 打开新的标签页page = await browser.newPage()#设置视图大小await page.setViewport({'width':1366,'height':768})#设置UserAgentawait page.setUserAgent('Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36')# 访问页面response = await page.goto('https://www.baidu.com')#获取status、headers、urlprint(response.status)print(response.headers)print(response.url)#获取当前页标题print(await page.title())#获取当前页内容print(await page.content()) #文本类型# print(await response.text())#cookie操作print(await page.cookies()) #获取cookie,[{'name':xx,'value':xxx...},...]# page.deleteCookie() 删除cookie# page.setCookie() 设置cookie#定位元素#1、只定位一个元素(css选择器)# element = await page.querySelector('#s-top-left > a')#2、css选择器elements = await page.querySelectorAll('#s-top-left > a:nth-child(2n)')#3、xpath# elements = await page.xpath('//div[@id="s-top-left"]/a')for element in elements:print(await (await element.getProperty('textContent')).jsonValue()) #获取文本内容print(await (await element.getProperty('href')).jsonValue())#获取href属性#模拟输入和点击await page.type('#kw','中国',{'delay':1000}) #模拟输入,输入时间:1000 msawait asyncio.sleep(2)await page.click('#su') #模拟点击,也可以先定位元素,然后await element.click()await asyncio.sleep(2)#执行js,滚动页面到底部await page.evaluate('window.scrollTo(0,document.body.scrollHeight);')#截图await page.screenshot({'path':'baidu.png'})await asyncio.sleep(5)await browser.close() #关闭浏览器asyncio.get_event_loop().run_until_complete(main())
主要功能介绍
1.打开浏览器
调用 launch 方法即可,相关参数介绍:
ignoreHTTPSErrors (bool): 是否要忽略 HTTPS 的错误,默认是 False。
headless (bool): 是否启用无界面模式,默认为 True。如果 devtools 这个参数是 True 的话,那么该参数就会被设置为 False。
executablePath (str): 可执行文件的路径,如果指定之后就不需要使用默认的 Chromium 了,可以指定为已有的 Chrome 或 Chromium。
args (List[str]): 在执行过程中可以传入的额外参数。
slowMo (int|float): 设置这个参数可以延迟pyppeteer的操作,单位是毫秒.
userDataDir (str): 即用户数据文件夹,即可以保留一些个性化配置和操作记录。
devtools (bool): 是否为每一个页面自动开启调试工具,默认是 False。如果为 True,那么headless参数会被强制设置为 False。
2.关闭提示条:”Chrome 正受到自动测试软件的控制”
browser = await launch(headless=False, args=['--disable-infobars'])
3.代理设置
proxy = 'http://具体代理'
browser = await launch(headless=False, args=['--disable-infobars', f'--proxy-server={proxy}'])
4.开启无痕模式
# 创建浏览器对象
browser = await launch(headless=False, args=['--disable-infobars'])
# 开启无痕模式
context = await browser.createIncognitoBrowserContext()
# 打开新的标签页
page = await context.newPage()
5.设置窗口大小
width, height = 1366, 768
await page.setViewport({'width': width, 'height': height})
6.设置UserAgent
await page.setUserAgent('xxx')
7.执行JS脚本:调用page.evaluate()方法
await page.evaluate('window.scrollTo(0,document.body.scrollHeight);')
#滚动页面到底部
8.规避webdriver检测
import asyncio
from pyppeteer import launchasync def main():browser = await launch(headless=False, args=['--disable-infobars'])page = await browser.newPage()await page.goto('https://login.taobao.com/member/login.jhtml?redirectURL=https://www.taobao.com/')await page.evaluateOnNewDocument('''() =>{ Object.defineProperties(navigator,{ 'webdriver':{ get: () => false } }) }''')await page.evaluateOnNewDocument('''() =>{ Object.defineProperties(navigator,{ 'languages':{ get: () => ['en-US', 'en'] } }) }''')await page.evaluateOnNewDocument('''() =>{ Object.defineProperties(navigator,{ 'plugins':{ get: () => [1, 2, 3, 4, 5] } }) }''')await asyncio.sleep(100)asyncio.get_event_loop().run_until_complete(main())
9.多个page页面选项卡操作
# 新建选项卡1
page1 = await browser.newPage()
await page1.goto('https://www.baidu.com/')
await asyncio.sleep(2)
# 新建选项卡2
page2 = await browser.newPage()
await page2.goto('https://www.zhihu.com/')
# 查看所有选项卡
pages = await browser.pages() # 含第一个空白页,总共3页
await pages[1].bringToFront() # 切换到第2页,即百度
10.模拟输入和点击
await page.type(selector, text, {"delay":100}) #模拟输入,输入每个字符的间隔时间100 ms
await asyncio.sleep(2)
await page.click(selector) #模拟点击
await asyncio.sleep(2)
11.鼠标移动和按下松开操作
await page.hover(selector) #鼠标移动到某个元素上
await page.mouse.down() #按下鼠标
await page.mouse.move(2000, 0, {'delay': random.randint(1000, 2000)}) #移动鼠标
await page.mouse.up() #松开鼠标
12.定位元素、获取元素文本内容和属性值
page.querySelector(selector)#只匹配第一个元素
element = await page.querySelector('#s-top-left > a')print(await (await element.getProperty('textContent')).jsonValue()) #获取文本内容
print(await (await element.getProperty('href')).jsonValue())#获取href属性
page.querySelectorAll(selector)#css选择器
elements = await page.querySelectorAll('#s-top-left > a:nth-child(2n)')
for element in elements:print(await (await element.getProperty('textContent')).jsonValue()) #获取文本内容print(await (await element.getProperty('href')).jsonValue())#获取href属性
page.xpath(expression)#xpath
elements = await page.xpath('//div[@id="s-top-left"]/a')
for element in elements:print(await (await element.getProperty('textContent')).jsonValue()) #获取文本内容print(await (await element.getProperty('href')).jsonValue())#获取href属性
page.jeval(selector,pageFunction)#定位元素,并调用js函数去执行
print(await page.Jeval('#s-top-left > a:first-child','node => node.textContent') ) #获取文本内容
print(await page.Jeval('#s-top-left > a:first-child','node => node.href') ) #获取href属性
13.延时等待,通过各种等待方法,可以控制页面的加载情况
page.waitForSelector() # 等待符合Selector的节点加载出来,否则直到超时
page.waitForXPath() # 等待符合Xpath的节点加载出来
page.waitForFunction() # 等待某个JS方法执行完毕并返回结果
page.waitFor() # 通用等待方式,如果是数字,则表示等待具体时间(毫秒),其它也可以是Selector、Xpath、Function字符串
page.waitForRequest() # 等待请求出现(url或者函数)
page.waitForResponse() # 等待响应内容出现(url或者函数)
page.waitForNavigation() # 等待页面跳转,如果没加载出来就报错,比如前面使用await page.click('某个链接'),后面使用该等待
14.请求拦截器,对请求进行过滤等操作
可以通过page.setRequestInterception(True)来开启拦截器,然后自定义拦截规则
对Request的拦截有3个固定的常用方法:
Request.continue_()
不传参数,则保持请求
传入参数overrides,则跳转,该参数是一个字典{'url': 'xx', 'method': '', 'postData': '', 'headers': ''}
Request.abort()
停止请求,可以起过滤作用(比如不显示图片)
Request.respond({"body": "响应内容"})
用给定的响应内容完成请求(比如替换JS文件内容时)
import asyncio
from pyppeteer import launchasync def main():browser = await launch(headless=False, args=['--disable-infobars'])browser = await browser.createIncognitoBrowserContext()page = await browser.newPage()# 开启请求拦截器await page.setRequestInterception(True)# 设置请求拦截器page.on('request', lambda req: asyncio.ensure_future(intercept_request(req)))# 设置响应拦截器page.on('response', lambda response: asyncio.ensure_future(intercept_response(response)))await page.goto('https://www.baidu.com/')print(await page.title())await asyncio.sleep(3)await page.goto("https://spa6.scrape.center/")await asyncio.sleep(3)await browser.close()# 请求拦截器
async def intercept_request(req):url = req.urlif url == 'https://fanyi.baidu.com/':# 用给定内容响应请求await req.respond({'status': 200, 'body': 'welcome to new page'})elif url == 'https://www.baidu.com/img/PCtm_d9c8750bed0b3c7d089fa7d55720d6cf.png':# 停止请求print('已过滤该图片')await req.abort()elif url == 'https://www.qq.com/':# 跳转请求await req.continue_({'url': 'https://www.tencent.com/zh-cn/', 'method': 'GET'})else:# 保持请求await req.continue_()# 响应拦截器
async def intercept_response(response):if response.status == 200 and response.url == 'https://www.baidu.com/':text = await response.text()print(text)if '/api/movie' in response.url and response.status == 200:json_data = await response.json()print(json_data)asyncio.get_event_loop().run_until_complete(main())
15.针对frame操作
page.frames获取页面中的所有frames列表,对于每一个frame操作,和page操作一致
page.mainFrame获取当前页面的主frame
frame_list = page.frames #获取所有frame#获取当前页面的标题,下面3个效果一样
print(await frame_list[0].title())
print(await page.mainFrame.title())
print(await page.title())
实战案例
1.爬取京东商城
import requests
from bs4 import BeautifulSoup
from pyppeteer import launch
import asynciodef screen_size():"""使用tkinter获取屏幕大小"""import tkintertk = tkinter.Tk()width = tk.winfo_screenwidth()height = tk.winfo_screenheight()tk.quit()return width, heightasync def main(url):# browser = await launch({'headless': False, 'args': ['--no-sandbox'], })browser = await launch({'args': ['--no-sandbox'], })page = await browser.newPage()width, height = screen_size()await page.setViewport(viewport={"width": width, "height": height})await page.setJavaScriptEnabled(enabled=True)await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299')await page.goto(url)# await asyncio.sleep(2)await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')await asyncio.sleep(1)# content = await page.content()li_list = await page.xpath('//*[@id="J_goodsList"]/ul/li')# print(li_list)item_list = []for li in li_list:a = await li.xpath('.//div[@class="p-img"]/a')detail_url = await (await a[0].getProperty("href")).jsonValue()promo_words = await (await a[0].getProperty("title")).jsonValue()a_ = await li.xpath('.//div[@class="p-commit"]/strong/a')p_commit = await (await a_[0].getProperty("textContent")).jsonValue()i = await li.xpath('./div/div[3]/strong/i')price = await (await i[0].getProperty("textContent")).jsonValue()em = await li.xpath('./div/div[4]/a/em')title = await (await em[0].getProperty("textContent")).jsonValue()item = {"title": title,"detail_url": detail_url,"promo_words": promo_words,'p_commit': p_commit,'price': price}item_list.append(item)# print(item)# break# print(content)await page_close(browser)return item_listasync def page_close(browser):for _page in await browser.pages():await _page.close()await browser.close()msg = "手机"
url = "https://search.jd.com/Search?keyword={}&enc=utf-8&qrst=1&rt=1&stop=1&vt=2&wq={}&cid2=653&cid3=655&page={}"task_list = []
for i in range(1, 6):page = i * 2 - 1url = url.format(msg, msg, page)task_list.append(main(url))loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*task_list))
# print(results, len(results))
for i in results:print(i, len(i))# soup = BeautifulSoup(content, 'lxml')
# div = soup.find('div', id='J_goodsList')
# for i, li in enumerate(div.find_all('li', class_='gl-item')):
# if li.select('.p-img a'):
# print(li.select('.p-img a')[0]['href'], i)
# print(li.select('.p-price i')[0].get_text(), i)
# print(li.select('.p-name em')[0].text, i)
# else:
# print("#" * 200)
# print(li)
2.爬取淘宝网
import asyncio
import time
from pyppeteer.launcher import launch
from alifunc import mouse_slide, input_time_random
from exe_js import js1, js3, js4, js5def screen_size():"""使用tkinter获取屏幕大小"""import tkintertk = tkinter.Tk()width = tk.winfo_screenwidth()height = tk.winfo_screenheight()tk.quit()return width, heightasync def main(username, pwd, url):browser = await launch({'headless': False, 'args': ['--no-sandbox'], }, userDataDir='./userdata',args=['--window-size=1366,768'])page = await browser.newPage()width, height = screen_size()await page.setViewport(viewport={"width": width, "height": height})await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299')await page.goto(url)await page.evaluate(js1)await page.evaluate(js3)await page.evaluate(js4)await page.evaluate(js5)pwd_login = await page.querySelector('.J_Quick2Static')# print(await (await pwd_login.getProperty('textContent')).jsonValue())await pwd_login.click()await page.type('#TPL_username_1', username, {'delay': input_time_random() - 50})await page.type('#TPL_password_1', pwd, {'delay': input_time_random()})await page.screenshot({'path': './headless-test-result.png'})time.sleep(2)slider = await page.Jeval('#nocaptcha', 'node => node.style') # 是否有滑块if slider:print('出现滑块情况判定')await page.screenshot({'path': './headless-login-slide.png'})flag = await mouse_slide(page=page)if flag:print(page.url)await page.keyboard.press('Enter')await get_cookie(page)else:await page.keyboard.press('Enter')await page.waitFor(20)await page.waitForNavigation()try:global errorerror = await page.Jeval('.error', 'node => node.textContent')except Exception as e:error = Noneprint(e, "错啦")finally:if error:print('确保账户安全重新入输入')else:print(page.url)# 可继续网页跳转 已经携带 cookie# await get_search(page)await get_cookie(page)await page_close(browser)async def page_close(browser):for _page in await browser.pages():await _page.close()await browser.close()async def get_search(page):# https://s.taobao.com/search?q={查询的条件}&p4ppushleft=1%2C48&s={每页 44 条 第一页 0 第二页 44}&sort=sale-descawait page.goto("https://s.taobao.com/search?q=气球")await asyncio.sleep(5)# print(await page.content())# 获取登录后cookie
async def get_cookie(page):res = await page.content()cookies_list = await page.cookies()cookies = ''for cookie in cookies_list:str_cookie = '{0}={1};'str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))cookies += str_cookieprint(cookies)# 将cookie 放入 cookie 池 以便多次请求 封账号 利用cookie 对搜索内容进行爬取return cookiesif __name__ == '__main__':username = 'username'pwd = 'password'url = "https://login.taobao.com/member/login.jhtml?spm=a21bo.2017.754894437.1.5af911d9qqVAb1&f=top&redirectURL=https%3A%2F%2Fwww.taobao.com%2F"loop = asyncio.get_event_loop()loop.run_until_complete(main(username, pwd, url))
3.针对iframe 的操作:page.frames 获取所有的 iframe 列表 需要判断操作的是哪一个 iframe 跟操作 page 一样操作。
from pyppeteer import launch
import asyncioasync def main(url):w = await launch({'headless': False, 'args': ['--no-sandbox'], })page = await w.newPage()await page.setViewport({"width": 1366, 'height': 800})await page.goto(url)try:await asyncio.sleep(1)frame = page.framesprint(frame) # 需要找到是哪一个 frametitle = await frame[1].title()print(title)await asyncio.sleep(1)login = await frame[1].querySelector('#switcher_plogin')print(login)await login.click()await asyncio.sleep(20)except Exception as e:print(e, "EEEEEEEEE")for _page in await w.pages():await _page.close()await w.close()asyncio.get_event_loop().run_until_complete(main("https://i.qq.com/?rd=1"))
# asyncio.get_event_loop().run_until_complete(main("https://www.gushici.com/"))