asyncio和 aiohttp

文章目录

      • asyncio和 aiohttp
      • 3.8版本+ 特性
      • aiohttp
      • 案例
      • 优化方案

asyncio和 aiohttp

asyncio即Asynchronous I/O是python一个用来处理并发(concurrent)事件的包,是很多python异步架构的基础,多用于处理高并发网络请求方面的问题。

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。

asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。

asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

import asyncioasync def task(i):print(f"task {i} start")await asyncio.sleep(1)print(f"task {i} end")# 创建事件循环对象
loop = asyncio.get_event_loop()
# 直接将协程对象加入时间循环中
tasks = [task(1), task(2)]
# asyncio.wait:将协程任务进行收集,功能类似后面的asyncio.gather
# run_until_complete阻塞调用,直到协程全部运行结束才返回
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

task: 任务,对协程对象的进一步封装,包含任务的各个状态;asyncio.Task是Future的一个子类,用于实现协作式多任务的库,且Task对象不能用户手动实例化,通过下面2个函数loop.create_task() 或 asyncio.ensure_future()创建。

import asyncio, timeasync def work(i, n):  # 使用async关键字定义异步函数print('任务{}等待: {}秒'.format(i, n))await asyncio.sleep(n)  # 休眠一段时间print('任务{}在{}秒后返回结束运行'.format(i, n))return i + nstart_time = time.time()  # 开始时间tasks = [asyncio.ensure_future(work(1, 1)),asyncio.ensure_future(work(2, 2)),asyncio.ensure_future(work(3, 3))]loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()print('运行时间: ', time.time() - start_time)
for task in tasks:print('任务执行结果: ', task.result())

3.8版本+ 特性

async.run() 运行协程
async.create_task()创建task
async.gather()获取返回值

import asyncio, timeasync def work(i, n):  # 使用async关键字定义异步函数print('任务{}等待: {}秒'.format(i, n))await asyncio.sleep(n)  # 休眠一段时间print('任务{}在{}秒后返回结束运行'.format(i, n))return i + ntasks = []
async def main():global taskstasks = [asyncio.create_task(work(1, 1)),asyncio.create_task(work(2, 2)),asyncio.create_task(work(3, 3))]await asyncio.wait(tasks) # 阻塞start_time = time.time()  # 开始时间
asyncio.run(main())
print('运行时间: ', time.time() - start_time)
for task in tasks:print('任务执行结果: ', task.result())

asyncio.create_task() 函数在 Python 3.7 中被加入。

asyncio.gather方法

# 用gather()收集返回值import asyncio, timeasync def work(i, n):  # 使用async关键字定义异步函数print('任务{}等待: {}秒'.format(i, n))await asyncio.sleep(n)  # 休眠一段时间print('任务{}在{}秒后返回结束运行'.format(i, n))return i + nasync def main():tasks = [asyncio.create_task(work(1, 1)),asyncio.create_task(work(2, 2)),asyncio.create_task(work(3, 3))]# 将task作为参数传入gather,等异步任务都结束后返回结果列表response = await asyncio.gather(tasks[0], tasks[1], tasks[2])print("异步任务结果:", response)start_time = time.time()  # 开始时间asyncio.run(main())print('运行时间: ', time.time() - start_time)

aiohttp

爬虫最重要的模块requests,但它是阻塞式的发起请求,每次请求发起后需阻塞等待其返回响应,不能做其他的事情。本文要介绍的aiohttp可以理解成是和requests对应Python异步网络请求库,它是基于 asyncio 的异步模块,可用于实现异步爬虫,有点就是更快于 requests 的同步爬虫。安装方式,pip install aiohttp。

aiohttp是一个为Python提供异步HTTP 客户端/服务端编程,基于asyncio的异步库。asyncio可以实现单线程并发IO操作,其实现了TCP、UDP、SSL等协议,aiohttp就是基于asyncio实现的http框架, 使用方式如下。

import aiohttp
import asyncioasync def main():async with aiohttp.ClientSession() as session:async with session.get("http://httpbin.org/headers") as response:print(await response.text())asyncio.run(main())

案例

import asyncio
import os
import aiohttp
import time
from utils.aiorequests import aiorequest
from lxml import etreeheaders = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
}url = "https://www.pkdoutu.com/photo/list/"base_url = "https://www.xr02.vip/"async def get_home_url():async with aiohttp.ClientSession() as session:async with session.get(url, headers=headers, ssl=False) as resp:res = await resp.content.read()selector = etree.HTML(res)urls = selector.xpath('//ul/li[@class="i_list list_n2"]/a/@href')return map(lambda i: base_url+i, urls)async def get_page_url(urls):async with aiohttp.ClientSession() as session:async with session.get(urls, headers=headers, ssl=False) as resp:res = await resp.content.read()selector = etree.HTML(res)page_urls = selector.xpath('//div[@class="page"]/a/@href')return map(lambda i: base_url+i, set(page_urls))async def get_img_url(urls):async with aiohttp.ClientSession() as session:async with session.get(urls, headers=headers, ssl=False) as resp:res = await resp.content.read()selector = etree.HTML(res)name = selector.xpath("//h1/text()")[0].replace("[XiuRen秀人网]",'')img_urls = selector.xpath('//p/img/@src')return name, map(lambda i: base_url+i, img_urls)async def download_img(urls, base_name):name = os.path.basename(urls)name = base_name + '_' + nametry:async with aiohttp.ClientSession() as session:async with session.get(urls, headers=headers, ssl=False) as resp:res = await resp.content.read()with open(f"./imgs/{name}","wb") as f:f.write(res)print(f"url: {urls} 下载成功,存储文件为{name}")except:print(f"url: {urls} 下载失败")return "success"async def main():tasks_1 = [asyncio.create_task(get_page_url(i)) for i in await get_home_url()]result_1 = await asyncio.gather(*tasks_1)result_list = []for i in result_1: result_list.extend(list(i))tasks_2 = [asyncio.create_task(get_img_url(i)) for i in result_list]result_2 = await asyncio.gather(*tasks_2)tasks_3 = []for name, img_url in result_2:tasks_3.extend(asyncio.create_task(download_img(url, name)) for url in img_url)await asyncio.gather(*tasks_3)if __name__ == '__main__':if not os.path.isdir("./imgs"):os.mkdir("./imgs")start = time.time()asyncio.run(main())print(time.time()-start)

通过这个案例,可以看到一个问题,那就是 aiohttp的使用,每次都需要写一堆重复代码,并且整个代码结构看起来复杂,作为一个高级开发,必须要会做的就是减少代码重复编写,要将其模块化,封装起来

优化方案

aiorequest.py

class AioRequest:async def request(self, method: str, url: str, data: Union[Dict, bytes, None] = None, **kwargs: Any) -> Any:async with aiohttp.ClientSession() as session:async with session.request(method, url, ssl=False, data=data, **kwargs) as response:if response.status != 200:raise Exception(f"{method.upper()} request failed with status {response.status}")# return await handler(await response.content.read()# return 这里必须带上await,但不支持 await ClientResponse 对象直接返回 必须要处理响应数据# 根据内容类型处理响应体content_type = response.headers.get('Content-Type')if content_type and ('image' in content_type or 'video' in content_type):return await response.read()  # 返回图片或视频的二进制数据elif 'application/json' in content_type:return await response.json()  # 假设响应是JSON格式else:return await response.text()  # 读取文本内容async def get(self, url: str, **kwargs: Any):return await self.request("GET", url, **kwargs)async def post(self, url: str, data: Union[Dict, bytes], **kwargs: Any):return await self.request("POST", url, data=data, **kwargs)# 处理大文件async def save_binary_content(self, url: str, file_path: str, headers: Dict[str, str] = None, **kwargs: Any):async with aiohttp.ClientSession(headers=headers) as session:async with session.get(url, ssl=False, **kwargs) as response:if response.status != 200:raise Exception(f"GET request failed with status {response.status}")with open(file_path, 'wb') as f:while True:chunk = await response.content.read(1024)  # 每次读取1024字节if not chunk:breakf.write(chunk)aiorequest = AioRequest()  # 减少对象的重复创建消耗内存

使用aiorequest 后,代码就简洁明了多了,

import asyncio
import os
import time
from utils.aiorequests import aiorequest
from lxml import etreeheaders = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
}base_url = "https://www.xr02.vip/"img_urls_dict = dict()async def get_home_url():res  = await aiorequest.get(base_url)selector = etree.HTML(res)urls = selector.xpath('//ul/li[@class="i_list list_n2"]/a/@href')return map(lambda i: base_url+i, urls)async def get_page_url(urls):res = await aiorequest.get(urls)selector = etree.HTML(res)await get_img_url(res)page_urls = selector.xpath('//div[@class="page"]/a/@href')page_urls = list(map(lambda i: base_url + i, set(page_urls)))page_urls.remove(urls)return page_urlsasync def get_img_url(res):selector = etree.HTML(res)name = selector.xpath("//h1/text()")[0].replace("[XiuRen秀人网]",'')img_list = selector.xpath('//p/img/@src')img_list = map(lambda i: base_url+i, img_list)if name not in img_urls_dict:img_urls_dict.setdefault(name, list(img_list))else:img_urls_dict[name].extend(list(img_list))async def get_imgs_url(urls):res = await aiorequest.get(urls)await get_img_url(res)async def download_img(urls, base_name):name = os.path.basename(urls)name = base_name + '_' + nametry:res = await aiorequest.get(urls)with open(f"./imgs_2/{name}","wb") as f:f.write(res)print(f"url: {urls} 下载成功,存储文件为{name}")except:print(f"url: {urls} 下载失败")return "success"async def main():tasks_1 = [asyncio.create_task(get_page_url(i)) for i in await get_home_url()]result_1 = await asyncio.gather(*tasks_1)result_list = []for i in result_1: result_list.extend(i)tasks_2 = [asyncio.create_task(get_imgs_url(i)) for i in result_list]await asyncio.wait(tasks_2)tasks_3 = []for name, img_url in img_urls_dict.items():tasks_3.extend(asyncio.create_task(download_img(url, name)) for url in img_url)await asyncio.wait(tasks_3)if __name__ == '__main__':if not os.path.isdir("./imgs_2"):os.mkdir("./imgs_2")start = time.time()asyncio.run(main())print(time.time()-start)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/777080.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JUC并发编程之常用方法

sleep() public void testSleepAndYield() {Thread t1 new Thread(() -> {try {log.debug("t1-sleep...");Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}, "t1");log.debug("t1 start 前的状态&#…

数学建模智能算法

模拟退火算法 %生成初始解,求目标函数f(x)x1^2x2^28在x1^2-x2>0;-x1-x2^220约束下的最小值问题 sol_new21;%(1)解空间(初始解) sol_new12-sol_new2^2; sol_current1 sol_new1; sol_best1 sol_new1; so…

Appium设备交互API

设备交互API指的是操作设备系统中的一些固有功能,而非被测程序的功能,例如模拟来电,模拟发送短信,设置网络,切换横竖屏,APP操作,打开通知栏,录屏等。 模拟来电 make_gsm_call(phon…

Douyin视频详情数据API接口(视频详情,评论)

抖音官方并没有直接提供公开的视频详情数据采集API接口给普通用户或第三方开发者。抖音的数据采集通常受到严格的限制,以保护用户隐私和平台安全。 请求示例,API接口接入Anzexi58 如果您需要获取抖音视频详情数据,包括评论、点赞等&#xff…

软考 - 系统架构设计师 - 关系模型的完整性规则

前言 关系模型的完整性规则是一组用于确保关系数据库中数据的完整性和一致性的规则。这些规则定义了在关系数据库中如何存储、更新和查询数据,以保证数据的准确性和一致性。 详情 关系模型的完整性规则主要包括以下三类: 实体完整性规则 这是确保每个…

C++基本语法

C是如何工作的 文章目录 C是如何工作的1、新建Hello World工程1.1使用Visual Studio新建项目1.2 HelloWorld1.2.1 命名空间1.2.2 输出输出 1.3 注释1.4 函数1.4.1 使用有返回的函数1.4.2 自定义函数 1、新建Hello World工程 1.1使用Visual Studio新建项目 按照下面的图片&…

Linux 性能分析工具 perf 的使用指南

什么是perf,可以用来干什么 perf 是 Linux 内核的性能分析工具集,它可以用来监控和分析系统和应用程序的性能。perf 提供了一系列功能强大的子命令,可以帮助开发者和系统管理员: 监控 CPU 使用率:识别最消耗 CPU 的代…

x86的内存分段机制

8086 是 Intel 公司第一款 16 位处理器,诞生于 1978 年,所以说它很古老。 一.8086 的通用寄存器 8086 处理器内部共有 8 个 16 位的通用处理器,分别被命名为 AX、 BX、 CX、 DX、 SI、 DI、 BP、 SP。如下图所示。 “通用”的意思是…

利用python搭建临时文件传输服务

场景 如果想从一台服务器上传输文件又多种方法,其中常见的是利用scp进行传输,但是需要知道服务器的账号密码才能进行传输,但有时候我们并不知道账号密码,这个时候我们就可以通过python -m SimpleHTTPServer 命令进行传输文件 启…

C语言例4-14:从键盘输入小写字母转换成大写字母并输出。

代码如下&#xff1a; //从键盘输入小写字母转换成大写字母并输出。 #include<stdio.h> int main(void) {char c1,c2;printf("输入小写字母&#xff1a; \n");c1 getchar(); //从键盘输入一个字符putchar(c1);printf(",%d\n",c1);c2 c1-32; …

Git基础(23):Git分支合并实战保姆式流程

文章目录 前言准备正常分支合并1. 创建两个不冲突分支2. 将dev合并到test 冲突分支合并1. 制造分支冲突2. 冲突合并 前言 Git分支合并操作 准备 这里先在Gitee创建了一个空仓库&#xff0c;方便远程查看内容。 正常分支合并 1. 创建两个不冲突分支 &#xff08;1&#xf…

C++ 控制语句(二)

一 break continue和goto语句 1 break语句 在switch语句中&#xff0c;分隔case子句&#xff0c;跳出switch语句。 在循环语句中可以立即终止循环语句的执行。 2 continue语句 功能:在一次循环过程中,跳过continue语句以下的语句,直 接进入下一次循环操作。 3 goto语句 …

Poetry是一个现代的Python包管理工具

Poetry是一个现代的Python包管理工具&#xff0c;它旨在简化包的声明、管理和发布过程。Poetry解决了Python项目中的一些常见问题&#xff0c;如依赖管理、包版本控制以及项目的打包和发布。它被设计为一站式的解决方案&#xff0c;提供了一系列的特性来处理Python包的生命周期…

Linux文件系统和日志管理

文件系统的组成 Linux 文件系统会为每个文件分配两个数据结构&#xff1a;索引节点&#xff08;index node&#xff09; 和 目录项&#xff08;directory entry&#xff09;&#xff0c;它们主要用来记录文件的元信息和目录层次结构。 索引节点&#xff0c;也就是 inode&#…

Rustdesk客户端编译后固定密码不稳定时好时坏

环境&#xff1a; rustdesk1.19 问题描述&#xff1a; Rustdesk客户端编译后固定密码不稳定时好时坏 解决方案&#xff1a; 出现固定密码不稳定的问题可能有多种原因&#xff0c;下面是一些可能的解决方法&#xff1a; 密码强度&#xff1a;确保所设置的固定密码足够强大…

智能写作利器ChatGPT:提升论文写作效率

ChatGPT无限次数:点击直达 智能写作利器ChatGPT&#xff1a;提升论文写作效率 在当今信息爆炸的时代&#xff0c;快速高效地撰写论文对于科研工作者来说至关重要。智能写作工具ChatGPT的出现为我们提供了强大的支持&#xff0c;它不仅能够提升论文写作的效率&#xff0c;还能够…

Doris删除数据工具

文章目录 概要整体架构流程技术名词解释技术细节小结 概要 对于Doris的 Unique 模型&#xff0c;在删除数据的时候只能根据key删除&#xff0c;如果使用其他条件就会报错 整体架构流程 先获得表的key&#xff0c;然后在通过输入的条件获得key的所有值&#xff0c;最后通过key的…

王道c语言-二叉树前序、中序、后序、层次遍历

main.cpp #include "function.h"//abdhiejcfg 前序遍历深度优先遍历 abdhiejcfg void PreOrder(BiTree p) {if (p ! NULL) {printf("%c ", p->c);//等价于putchar(p->c);等价于visit函数伪代码PreOrder(p->lchild);PreOrder(p->rchild);} }//…

【数据结构】顺序表的实现——静态分配

&#x1f388;个人主页&#xff1a;豌豆射手^ &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;数据结构 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进…

【微服务篇】深入理解微服务可观测性原理(Log,Metric,Trace)

可观测性 微服务的可观测性是指通过收集、分析和监控微服务架构中各个组件的数据来理解其行为和性能的能力。这对于确保系统的健康、响应性和安全至关重要。可观测性主要从日志&#xff08;Log&#xff09;、指标&#xff08;Metric&#xff09;和追踪&#xff08;Trace&#…