aiohttp 异步爬虫实战

案例介绍

本次为我们要爬取一个数量相对大一点的网站, 链接为:

https://spa5.scrape.center/

这是一个图书网站,整个网站包含数千本图书信息,网站数据是 JavaScript 渲染而得,数据可以通过 Ajax 接口获取, 并且节后没有设置任何反爬措施和加密参数。另外,这个网站的数据量相对多一些适合使用异步爬取

目标:

使用 aiohttp 爬取全站图书数据

将数据通过异步方式保存到 MongoDB中

准备工作

开始前准备工作如下

安装好了 python 最低为 3.6

了解了 Ajax 爬取的一些基本原理和模拟方法

了解了 异步爬虫的基本原理和 asyncio 库的基本用法

了解了 aiohttp 库的基本用法

安装并成功运行了 MongoDB 数据库, 而且安装了异步爬虫库 motor

关于最后一条,要实现 MongoDB 异步存储, 离不开异步实现的 MongoDB 存储库 motor,

pip install motor

页面分析

列表页的 Ajax 请求接口格式为 https://spa5.scrape.center/api/book/?limit=18&offset={offset}.其中limit 的值为每一页包含多少本书; offset 的值为每一页的偏移量,计算公式为 offset = limit * (page-1), 如第 1 页 的offset 值 为0 第二页的 offset 值为 18 , 以此类推

在列表页 Ajax 接口返回的数据里,results 字段包含当前页里 18 本图书信息,其中每本书里都含有一 id 字段, 这个 id 就是图书本身的 id 可以用来进一步请求详情页

详情页的 Ajax 接口格式为 https://spa5.scrape.center/api/book/{id}。 其中 id 即为详情页对应的图书 id ,可以从来列表页 Ajax 接口的返回结果中获取此内容

实现思路

其实,一个完美的异步爬虫应该能够充分的利用资源进行全速爬取,其实思路就是维护一个动态变化的爬取队列,没产生一个新的 task ,就将其放入爬取队列中,,有专门的爬取消费者从此队列中获取 task 并执行,能做到最大并发量的前提下充分利用等待时间进行额外的爬取处理

但上面的实现思路整体比较繁琐,需要设计爬取队列,回调函数,消费者机制等等,需要实现的功能比较多。由于我们刚刚接触 aiohttp 的基本用法,本次也是主要了解 aiohttp 的实战应用,因此这里稍微将爬取案例网站的实现过程简化一下

我们将爬取逻辑拆分成两部分,第一部分为爬取列表页,第二部分为爬取详情页。因为异步的关键点在于并发执行,所以可以将爬取拆分成如下两个阶段

第一阶段是异步爬取所有列表页,我们可以将所有爬取列表页的任务集合在一起,并将其声明为有 task 组成的列表,进行异步爬取

第二阶段是拿到上一步列表页的所有内容解析,将所有图书的 id 信息组合成所有详情页爬取任务集合,并将其声明为 task 组成的列表, 进行异步爬取, 同时爬取结果也以异步方式存储到 MongoDB 里面

因为这两个阶段在拆分之后需要串行执行,所以可能无法达到协程的最佳调度方式和资源利用情况,但也差不了很多。这个实现思路比较简单清晰,代码实现起来也比较容易,能够为我们快速的了解 aiohttp 的基本用法

基本配置

首先配置一些基本变量,引入一些基本的库

import asyncio
import aiohttp
import logginglogging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')INDEX_URL = 'https://spa5.scrape.center/api/book/?limit=18&offset={offset}'
DETALL_URL = 'https://spa5.scrape.center/api/book/{id}'
PAGE_SIZE = 18
PAGE_NUMBER = 100
CONCURRENCY = 5

这里我们导入了 asyncio , aiohttp , logging 这3个库, 然后定义了 logging 的基本配置,接着定义了 URL , 爬取页码数量 PAGE_NUMBER , 并发量 CONCURRENCY 等信息

爬取列表页

第一阶段爬取列表页,先定义一个通用的爬取方法

async def scrape_api(url):async with semaphore:try:logging.info(f'Scraping {url}')async with session.get(url) as response:return await response.json()except aiohttp.ClientError:logging.error(f'Error occurred while scraping {url}', exc_info=True)

这里声明了一个信号量,用来控制最大并发量

接着定义了 scrape_api 方法,接收一个参数 url , 发方法首先使用 async with 语句引入信号量作为上下文,接着调用 session 的 get 方法请求这个 url , 然后返回响应的 JSON 格式的结果。另外这里还进行了异常处理,捕获了 ClientError , 如果出现错误,会输出异常信息

然后爬取列表页,实现代码如下:

async def scrape_index(page):url = INDEX_URL.format(offset=(page-1) * PAGE_SIZE)return await scrape_api(url)

这里定义了 scrape_index 方法用于爬取列表页, 它接收一个参数 page 。随后构造了一个列表页的 URL , 将其传给 scrape_api 方法即可。 这里注意, 方法同样需要用 async  修饰, 调用的 scrape_api方法前面要加 await , 因为 scrape_api 调用之后本身会返回一个协程对象, 另外, 由于 scrape_api 的返回结果就是 JSON 格式, 因此这个结果已经我们想要爬取的信息,不需要在额外解析了

接着我们定义 main 方法 将上面的方法 串联起来使用

async def main():global sessionsession = aiohttp.ClientSession()scrape_index_tasks = [asyncio.ensure_future(scrape_index(page)) for page in range(1, PAGE_NUMBER + 1)]results = await asyncio.gather(*scrape_index_tasks)logging.info('results %s', json.dumps(results, ensure_ascii=False, indent=2))if __name__ == '__main__':asyncio.run(main())

这里首先声明了 session 对象, 即最初声明的全局变量。 这样的话就不需要在各个方法里都传递 session 了, 实现起来比较简单

接着定义了 scrape_index_tasks , 这就是用于爬取列表页的所有 task 组成的列表。然后调用 asyncio 的 gather 方法, 并将 task 列表传入其参数,将结果赋值为 results , 它是由所有 task 返回结果组成的列表

调用 main 方法, 使用事件循环启动该 main 方法对应的协程即可

2024-07-26 07:09:05,854 - root - INFO - Scraping https://spa5.scrape.center/api/book/?limit=18&offset=0
2024-07-26 07:09:05,917 - root - INFO - Scraping https://spa5.scrape.center/api/book/?limit=18&offset=18
2024-07-26 07:09:05,918 - root - INFO - Scraping https://spa5.scrape.center/api/book/?limit=18&offset=36
2024-07-26 07:09:05,918 - root - INFO - Scraping https://spa5.scrape.center/api/book/?limit=18&offset=54
2024-07-26 07:09:05,918 - root - INFO - Scraping https://spa5.scrape.center/api/book/?limit=18&offset=72
2024-07-26 07:09:06,639 - root - INFO - Scraping https://spa5.scrape.center/api/book/?limit=18&offset=90
2024-07-26 07:09:33,811 - root - INFO - results [
  {
    "count": 9040,
    "results": [
      {
        "id": "7952978",
        "name": "Wonder",

可以看到,这里就开始异步爬取了, 并发量是由我们控制的, 目前为  5 ,当然,也可以进一步调高这个数字,在网站能够承受的情况下, 爬取速度会进一步提升

最后 results 就是就是爬取所有列表得到的结果

爬取详情页

第二阶段就是爬取详情页,并保存数据。由于每个详情页分别对应一本书,每本书都需要一个 ID 作为唯一标识,而这个 ID 又正好存在 results 里面,所以下面我们需要将所有详情页的ID 获取出来

在 main 方法里增加 results 的解析代码

async def main():global sessionsession = aiohttp.ClientSession()scrape_index_tasks = [asyncio.ensure_future(scrape_index(page)) for page in range(1, PAGE_NUMBER + 1)]results = await asyncio.gather(*scrape_index_tasks)ids = []for result in results:if not result: continuefor item in result.get('results'):ids.append(item.get('id'))

这样 ids 就是所有书的 id 了,然后我们用所有的 id 构造所有详情页对应的 task , 进行异步爬取即可。

这里再定义两个方法,用于爬取详情页和保存数据

from motor.motor_asyncio import AsyncIOMotorClientMONGO_CONNECTION_STRING = 'mongodb://localhost:27017/'
MONGO_DB_NAME = 'books'
MONGO_COLLECTION_NAME = 'books'client = AsyncIOMotorClient(MONGO_CONNECTION_STRING)
db = client[MONGO_DB_NAME]
collection = db[MONGO_COLLECTION_NAME]async def save_data(data):logging.info(f'Saving data to {data}')if data:return await collection.update_one({'id': data.get('id')},{'$set': data},upsert=True)async def scrape_datail(id):url = DETALL_URL.format(id=id)data = await scrape_api(url)await save_data((data))

这里定义了 scrape_datail 方法用于爬取详情页数据, 并调用 save_data 方法保存数据。 save_data 方法可以将数据保存到 MongoDB 里面

这里我们用到支持异步的 MongoDB 存储库 motor 。 motor 的连接声明 和 pymongo 是类似的, 保存数据的调用方法也基本一致,不过整个换成了异步方法

接着在 main 方法里面增加对 scrape_detail 方法调用即可爬取详情页, 实现如下

async def main():global sessionsession = aiohttp.ClientSession()scrape_index_tasks = [asyncio.ensure_future(scrape_index(page)) for page in range(1, PAGE_NUMBER + 1)]results = await asyncio.gather(*scrape_index_tasks)ids = []for result in results:if not result: continuefor item in result.get('results'):ids.append(item.get('id'))scrape_detail_tasks = [asyncio.ensure_future(scrape_datail(id)) for id in ids]await asyncio.wait(scrape_detail_tasks)await session.close()if __name__ == '__main__':asyncio.run(main())

这里先声明了 scrape_detail_tasks , 这是由爬取详情页的 task 组成的列表, 接着调用了 asyncio 的 wait 方法, 并将声明的列表传其中, 调用执行此方法即可 获取详情页。当然这也可以使用 tather 方法,效果一样,只不过返回的结果略有差异,全部执行完毕后, 调用 close 方法关闭session

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/50593.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue3 使用Mock

官网: http://mockjs.com/ 安装 npm install mockjs -Dsteps1: main.js 文件引入 import /api/mock.jssteps2: src/api/mock.js import Mock from mockjs import homeApi from ./mockData/home /*** 1.拦截的路径:mock拦截了正常NetWork/网络请求,数据正常响应* 2.方法* …

乾坤: 微前端项目切换时样式闪动(从无样式变为正常样式需要等 css chunk 文件加载完成, 加载延时受网速影响)

背景: 点击基座项目页面左侧目录, 进入微前端子项目页面, 会有短暂的样式未加载效果一闪而过, 造成页面闪烁或更严重的其他样式错位问题 定位: 同事查了 qiankun git 项目的 issue: https://github.com/umijs/qiankun/issues/219 , 找到解决方案 解决: 项目 webpack 打包配…

【苍穹】完美解决由于nginx更换端口号导致无法使用Websocket

一、报错信息 进行到websocket开发的过程中,遇到了前端报错,无法连接的提示: 经过F12排查很明显是服务端和客户端并没有连接成功。这里就涉及到之前的坑,现在需要填上了。 二、报错原因和推导 应该还记得刚开苍穹的第一天配置前…

OWASP top 10之XSS和csrf

2021年top 10 A01:失效的访问控制(越权漏洞) XSS:Cross-Site Scripting 跨站脚本,在2021年top 10漏洞中被合并为注入类漏洞 属于:前端漏洞 分类:反射型、存储型、DOM型 反射型 将恶意js代…

什么是 Solidity

Solidity 是一种面向对象的用于编写智能合同的语言。 智能合同是存储在区块链中的程序。 它们指定有关数字资产传输的规则和行为。 可以使用 Solidity 为 Ethereum 区块链平台对智能合同进行编程。 智能合同包含状态和可编程逻辑。 智能合同通过事务执行函数。 因此&#xff0…

Python 中函数嵌套定义与调用

Python 中函数嵌套定义与调用 在 Python 中,函数可以在另一个函数内部定义和调用。这种技术称为嵌套函数。嵌套函数可以用来创建封装和作用域限制,帮助保持代码的组织性和模块化。 函数嵌套定义与调用 1. 函数嵌套定义 函数嵌套定义是指在一个函数内…

Java集合——Array、ArrayList、LinkedList

1. ArrayList和Array的区别 1. 大小和自动扩容 Array:创建时指定大小,大小固定。若数组被创建,其大小不能更改 ArrayList:动态数组实现,可以动态增长或缩小。在不断添加元素时,ArrayList会自动进行扩容 …

MySQL大框架总结

1.DDL,DML,DQL,DCL的区别 (由于DCL是关乎用户的,以下内容重点讲述数据库,表与数据的操作,所以对DCL不详细赘述) DDL DML DQL DCL 中文/英文 数据库定义语言 data definition language 数据库操作语言 data mani…

为什么多数大数据治理项目都是失败的?Gartner调查失败率超过90%

引言:随着数据规模的爆发式增长、数据价值的凸显以及数据治理的必要性。在政策的推动、市场需求的拉动以及技术进步的支撑下,大数据治理正成为推动企业数字化转型、提升数据价值的重要手段。企业希望通过大数据治理提升数据利用率和数据价值,…

深入理解Prompt工程

前言:因为大模型的流行,衍生出了一个小领域“Prompt工程”,不知道大家会不会跟小编一样,不就是写提示吗,这有什么难的,不过大家还是不要小瞧了Prompt工程,现在很多大模型把会“Prompt工程”作为…

19 Python常用内置函数——range()

range() 是 Python 开发中非常常用的一个内置函数。该函数返回具有惰性求值特点的 range 对象,其中包含左闭右开区间 [start, end) 内以 step 为步长的整数。 参数 start 默认为 0,step 默认为 1。 print(range(5)) print(list(range(5))) print(list(r…

2024中国大学生算法设计超级联赛(2)

🚀欢迎来到本文🚀 🍉个人简介:陈童学哦,彩笔ACMer一枚。 🏀所属专栏:杭电多校集训 本文用于记录回顾总结解题思路便于加深理解。 📢📢📢传送门 A - 鸡爪解题思…

vue zip文件下载请求封装与使用

axios封装(重点是响应拦截) 这里把响应超时时间注释是文件下载接口返回需要较长时间 import axios from axios import {ElMessageBox} from "element-plus"; import router from "/router";const service axios.create({baseURL: …

华为高品质万兆园区体验保障技术白皮书

华为高品质万兆园区体验保障技术白皮书 - 华为企业业务 re 音视频会议已经成为企业办公的核心应用系统,尤其是高层会议的质量对企业的运营效率有很大影响,相 关的体验问题也是员工投诉的重灾区。但不同于传统的网络通断类问题,体验类问题涉及的范围大、…

vue3 暴露网络地址,以及修改端口号

一般情况下这里的地址是隐藏的 这里加上 --host 可以暴露网络地址,再加上 --8080 就可以将端口号修改为8080(修改为其它的当然也可以)

[AI科普] 2024人工智能指数报告解读

1. 前言: 本周分享了,关于斯坦福以人为本人工智能研究室发布的,《2024年人工智能指数报告》的解读,主要是结合了快刀青衣哥的一些解读。在此基础上,又增加了一些国内外的AI发展现状,欧洲杯中体现的人工智能…

草图也能秒变完整画稿?三星 Galaxy Z Fold6 、Flip6硬件升级

在科技的不断进步中,智能手机行业的竞争愈发激烈,各大厂商纷纷推出创新产品以吸引消费者。 最近,三星在 Galaxy Unpacked 发布会上就带来了 Galaxy Z Fold6 和 Flip6 两款手机新品,这两款设备不仅在硬件上有所突破,更…

ubuntu在命令行输出里查找内容,dmesg

直接执行查看日志指令会出来很多页。dmesg为开机日志信息。记录了开机时硬件的过程 sudo dmesg 执行结果: 可以用竖号“|”,在前一条命令返回的内容进行查找。下图为查找bluetooth sudo dmesg |grep -i bluetooth

【SpringCloud】 微服务分布式环境下的事务问题,seata大合集

目录 微服务分布式环境下的事务问题 分布式事务 本地事务 BASE理论与强弱一致性 BASE理论 强弱一致性 常见分布式事务解决方案 - 2PC 常见分布式事务解决方案 - TCC 常见分布式事务解决方案 - 最大努力通知 常见分布式事务解决方案 - 最终一致性 Seata介绍与术语 Seata…