爬取目标
https://spa2.scrape.center/
本节工作
遍历每页列表,获取每部电影详情页的 URL
爬取每部电影的详情页, 提取电影名称, 评分,类别,封面,简介等信息
将爬取的数据保存为 JSON数据
准备工作
安装好 python (最低 3.6)
安装好 Pyppeteer 并能成功运行实例
爬取列表页
import logginglogging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')INDEX_URL = 'https://spa2.scrape.center/page/{page}' TIMEOUT = 10 TOTAL_PAGE = 10 WINDOW_WIDTH, WINDOW_HEIGHT = 1366, 768 HEADLESS = False
这里导入了必要的包,定义了日志的配置和几个变量
INDEX_URL: 列表页的 URL 后面的page 是动态的,翻页用
TIMEOUT :加载超时的最大时间 秒
TOTAL_PAGE : 爬取的总页数
WINDOW_WIDTH,WINDOW_HEIGHT: 浏览器的大小
HEADLESS ; 浏览器是否为无头模式,默认是 True, 这里是 False,
初始化 Pyppeteer, 设置窗口大小
from pyppeteer import launchbrowser, tab = None, Noneasync def init():global browser, tabbrowser = await launch(headless=HEADLESS, args=['--disable-infobars', f'--window-size={WINDOW_WIDTH},{WINDOW_HEIGHT}'])tab = await browser.newPage()await tab.setViewport({'width': WINDOW_WIDTH, 'height': WINDOW_HEIGHT})
这里先声明了 browser 变量和 tab 变量, 前者代表 Pyppeteer 所用的浏览器对象,后者代表新建的页面选项卡。 这两项都被设置为了全局变量,能够方便其他方法调用
然后定义了一个 init 方法,该方法调用了 Pyppeteer 的 launch 方法, 并且给 headless 参数传入了 HEADLESS , 将 Pyppeteer 设置为非无头模式还通过 args 参数指定了隐藏提示条河设置浏览器窗口宽高
接下来定义一个通用的爬取方法
from pyppeteer.errors import TimeoutErrorasync def scrape_page(url, selector):logging.info('scraping %s', url)try:await tab.goto(url)await tab.waitForSelector(selector, options={'timeout': TIMEOUT * 1000})except TimeoutError:logging.error('error occurred while scraping %s', url, exc_info=True)
这里定义了 scrape_page 方法,它接收两个参数, 一个是 url , 代表爬取页面的 url ,使用 goto 方法调用此URL 即可访问对应页面, 另一个是 selector ,即等待渲染出的节点对应的 CSS 选择器,此外,我们调用了 waitForSelector 方法, 传入 selector , 并通过 options 指定了最长等待时间
运行时,会首先访问传入的 URL 对应的页面,然后等待某个和选择器匹配的节点加载出来,最长等待 10 秒,如果10秒内加载出来, 就接着往下执行,否则抛出异常,并输出错误日志
下面实现爬取列表页的方法
async def scrape_index(page):url = INDEX_URL.format(page=page)await scrape_page(url, '.item .name')
这里定义了一个 scrape_index 方法,它接收参数 page ,代表爬取的页面的页码,方法中我们先通过 INDEX_URL 构造出了列表页的 URL ,然后调用 scrape_page 方法并将构造出 URL 传入其中,同时传入选择器
我们传入的选择器是 .item .name 是列表页的电影名称, 意味着电影名称加载出来就代表页面加载成功了
我们再定义一个分析列表页的方法,用来提取详情页的 URL
async def parse_index():return await tab.querySelectorAllEval('.item .name', 'nodes => nodes.map(node => node.href)')
这里我们调用了 querySlectorAllEval 方法, 它接收两个参数, 一个是 selector , 代表选择器, 另一个是 pageFunction , 代表要执行的 JavaScript 方法。 这个方法的作用是找出和选择器匹配的节点,然后根据 pageFunction 定义的逻辑从这些节点中抽取中对应的结果并返回。
我们给参数 selector 传入了电影名称,由于和选择器相匹配的节点有多个,所以给 pageFunction 参数输入的 JavaScript 方法就是 nodes , 其返回值是调用 map 方法得到的 node ,然后调用 node 的 href 属性得到超链接。 这样 querySelectorAllEval 的返回结果就是当前列表页所有电影的详情页 URL组成的列表
接下来我们串联刚刚实现的方法
import asyncioasync def main():await init()try:for page in range(1, TOTAL_PAGE + 1 ):await scrape_index(page)detail_urls = await parse_index()logging.info('detail_urls %s', detail_urls)finally:await browser.close() if __name__ == '__main__':asyncio.run(main())
部分输出结果
2024-07-29 11:31:52,966 - INFO: scraping https://spa2.scrape.center/page/8
2024-07-29 11:31:53,634 - INFO: detail_urls ['https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI3MQ==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI3Mg==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI3Mw==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI3NA==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI3NQ==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI3Ng==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI3Nw==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI3OA==',
这里定义了 main 方法,其中首先调用了 init 方法, 然后遍历所有页码,调用 scrape_index 方法爬取了每一页列表页, 接着调用 parse_index 方法,从列表页提取每个详情页的 URL 最后输出
爬取详情页
先定义一个爬取详情页的方法
async def scrape_detail(url):await scrape_page(url, 'h2')
这里很简单,直接调用 scrape_page 方法,传入详情页的 URL 和选择器即可, 这里的选择器我们传入了 h2 , 代表电影名称, 运行的话, Pyppeteer 已经成功加载出详情页了
下一步就是提取详情页的信息了
async def parse_detail():url = tab.urlname = await tab.querySelectorEval('h2', 'node => node.innerText')categories = await tab.querySelectorAllEval('.categories button span', 'nodes => nodes.map(node => node.innerText)')cover = await tab.querySelectorEval('.cover', 'node => node.src')score = await tab.querySelectorEval('.score', 'node => node.innerText')drama = await tab.querySelectorEval('.drama p', 'node => node.innerText')return {'url': url,'name': name,'categories': categories,'cover': cover,'score': score,'drama': drama}
这里我们定义了 parse_detail 方法, 提取了 URL ,名称,类别,封面, 分数,简介等内容。
最后将提取结果汇总成一个字典并返回
接下来,在 main 方法中添加对 scrape_detail 方法和 parse_detail 方法的调用
import asyncioasync def main():await init()try:for page in range(1, TOTAL_PAGE + 1 ):await scrape_index(page)detail_urls = await parse_index()for detail_url in detail_urls:await scrape_detail(detail_url)detail_data = await parse_detail()logging.info('data %s', detail_data)finally:await browser.close() if __name__ == '__main__':asyncio.run(main())
2024-07-29 11:50:49,704 - INFO: scraping https://spa2.scrape.center/page/1
2024-07-29 11:50:51,151 - INFO: scraping https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWIx
2024-07-29 11:50:53,454 - INFO: data {'url': 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWIx', 'name': '霸王别姬 - Farewell My Concubine', 'categories': ['剧情', '爱情'], 'cover': 'https://p0.meituan.net/movie/ce4da3e03e655b5b88ed31b5cd7896cf62472.jpg@464w_644h_1e_1c', 'score': '9.5', 'drama': '影片借一出《霸王别姬》的京戏,牵扯出三个人之间一段随时代风云变幻的爱恨情仇。段小楼(张丰毅 饰)与程蝶衣(张国荣 饰)是一对打小一起长大的师兄弟,两人一个演生,一个饰旦,一向配合天衣无缝,尤其一出《霸王别姬》,更是誉满京城,为此,两人约定合演一辈子《霸王别姬》。但两人对戏剧与人生关系的理解有本质不同,段小楼深知戏非人生,程蝶衣则是人戏不分。段小楼在认为该成家立业之时迎娶了名妓菊仙(巩俐 饰),致使程蝶衣认定菊仙是可耻的第三者,使段小楼做了叛徒,自此,三人围绕一出《霸王别姬》生出的爱恨情仇战开始随着时代风云的变迁不断升级,终酿成悲剧。'}
这里看到,我们已经提取了想要的信息
数据存储
数据存储格式为 JSON 文件
import json from os import makedirs from os.path import existsRESULTS_DIR = 'results1'exists(RESULTS_DIR) or makedirs(RESULTS_DIR) async def save_data(data):name = data.get('name')data_path = f'{RESULTS_DIR}/{name}.json'json.dump(data, open(data_path, 'w', encoding='utf-8'), ensure_ascii=False, indent=2)
然后在 main 方法中添加 save_data 方法的调用
import asyncioasync def main():await init()try:for page in range(1, TOTAL_PAGE + 1 ):await scrape_index(page)detail_urls = await parse_index()for detail_url in detail_urls:await scrape_detail(detail_url)detail_data = await parse_detail()await save_data(detail_data)logging.info('data %s', detail_data)finally:await browser.close() if __name__ == '__main__':asyncio.run(main())
问题排查
代码运行过程中, 可能由于 Pyppeteer 本身实现方面的问题,因此在连续运行 20 秒之后,报错
Pyppeteer.errors.NetworkError: Protocol Error (Runtime.evaluate): Session closed.
问题的解决方法是修改源码, 问题描述详见 : https://github.com/miyakogi/pyppeteer/issues/178
我这里没遇到,应该是在后面的版本修复了
无头模式
如果需要无头模式,将最开始的
HEADLES = False 改成 HEADLES = True 就可以了