由浅入深走进Python异步编程【asyncio上层api】(含代码实例讲解 || create_task,gather,wait,wait_for)

写在前面

从底层到第三方库,全面讲解python的异步编程。这节讲述的是asyncio实现异步的上层api,详细了解需要配合上下一节观看哦。纯干货,无概念,代码实例讲解。

本系列有6章左右,点击头像或者专栏查看更多内容,陆续更新,欢迎关注。

部分资料来源及参考链接:
https://www.bilibili.com/video/BV1Li4y1j7RY/
https://docs.python.org/zh-cn/3.7/library/asyncio-eventloop.html

直接await协程

考虑下面一段代码

import time
import asyncioasync def one_data():print('正在暂停1号靓仔')await asyncio.sleep(3)#沉睡3秒print('正在恢复1号靓仔')async def two_data():print('正在暂停2号靓仔')await asyncio.sleep(1)#沉睡1秒print('正在恢复2号靓仔')'事件循环判断两个协程的独立的,变成了同步状态'
async def get_data():await one_data()#等待3秒await two_data()#等待1秒start_time = time.time()#程序启动时间asyncio.run(get_data())print('程序总耗时:{}'.format(time.time() - start_time))

在这段代码中,我们仅使用了协程(coroutine)这个概念,单单使用协程是无法完成异步编程的。因为它不能被调度,虽然都写成了协程的形式,但是不能同时加入一个事件循环,变成了两个协程,分别完成,又变回同步了。

上述代码的结果就是4s多一点。表明了协程不能被直接调度,局限性很高,没有什么方法。所以一定要引入Task的概念,实现多任务并发从而达到异步的效果。

task与协程混用

考虑以下代码

import time
import asyncioasync def one_data():print('正在暂停1号靓仔')await asyncio.sleep(3)#沉睡3秒print('正在恢复1号靓仔')async def two_data():print('正在暂停2号靓仔')await asyncio.sleep(1)#沉睡1秒print('正在恢复2号靓仔')async def one_event_loop():print('\n***我是第1个event_loop***')loop = asyncio.get_event_loop()#获取当前事件循环task = loop.create_task(one_data())#创建Taskawait two_data()#等待1秒await task#等待3秒start_time = time.time()
asyncio.run(one_event_loop())
print('耗时:{}'.format(time.time() - start_time))

执行结果为:
在这里插入图片描述
这表明,是可以进行混用的,但是只有协程在task前面,才能被识别到,进行并发。如果先等待task,就无法识别成功了,会创造出独立的协程空间。

但是一般情况下不会混用,了解一下

create_task

获取事件循环并创建,是python3.5及之前的写法,现在可以直接使用create_task进行创建,在3.11的版本下,源码是这样的:

def create_task(coro, *, name=None, context=None):"""Schedule the execution of a coroutine object in a spawn task.Return a Task object."""loop = events.get_running_loop()if context is None:# Use legacy API if context is not neededtask = loop.create_task(coro)else:task = loop.create_task(coro, context=context)_set_task_name(task, name)return task

关键的get_running_loop()官方释义是这样的:
链接:https://docs.python.org/zh-cn/3.7/library/asyncio-eventloop.html#asyncio.get_running_loop

返回当前 OS 线程中正在运行的事件循环。

如果没有正在运行的事件循环则会引发 RuntimeError。 此函数只能由协程或回调来调用。

那么,这两个写法有区别吗?

await asyncio.create_task(one_data())
await asyncio.create_task(two_data())

task1 = asyncio.create_task(one_data())
task2 = asyncio.create_task(two_data())await task1
await task2

答案当然是有的。

前者的写法调用create_task的时候返回task,然后迅速await,task开始运行,已经有了自己的上下文。下面再次运行时,上下文仍然是None,又会重新创建Task。无法完成并发需求。

后者的写法,第一次使用create_task后,没有开始执行等待,仍然存在于线程中,第二次使用时,就可以识别出同一上下文,加入同一事件循环。开始await之后,就可以达到并发效果,先后顺序也没有什么关系,因为都在一个事件循环中。

小总结

基于上述内容,可以得到:

  1. 协程不可以调度,不能达到并发效果。
  2. Task与协程混用,必须将协程写在Task之前,可以达到并发效果,但不推荐。
  3. Task调度需要保证在同一事件循环中才可完成并发。

ensure_future

或许你会问,为什么不讲future呢?

future类属于基类,会暴露很多api ,但是,在这里也可以了解一下。使用ensure_future就可以创建了。

同时这个方法也可以返回Task,在函数内部有一个判断,如果传入的是coroutine,task就返回task,传入的是future就会返回future。

如果想深入了解:
https://docs.python.org/zh-cn/3.7/library/asyncio-future.html#asyncio.ensure_future

gather

官方解释在这里:https://docs.python.org/zh-cn/3.7/library/asyncio-task.html#asyncio.gather

它是这样说的:

并发 运行 aws 序列中的 可等待对象。
如果 aws 中的某个可等待对象为协程,它将自动作为一个任务加入日程。
(aws即awaitable)
如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。

那么向这个方法传入task,coroutine都是可以的。这个方法产生的返回值就是一个有序列表的形式。考虑一下代码:

import time
import asyncioasync def one_data():print('正在暂停1号靓仔')await asyncio.sleep(3)print('正在恢复1号靓仔')return '我是1号靓仔'async def two_data():print('正在暂停2号靓仔')await asyncio.sleep(1)print('正在恢复2号靓仔')return '我是2号靓仔'async def get_data():print('\n***我是第1个gather***')# print(await asyncio.gather(one_data(), two_data()))'使用Task写法'task1 = asyncio.create_task(one_data())task2 = asyncio.create_task(two_data())print(await asyncio.gather(task1, task2))start_time = time.time()asyncio.run(get_data())print('耗时:{}'.format(time.time() - start_time))

运行结果:
在这里插入图片描述
这说明,顺利产生了并发,进入了同一个事件循环,返回值会以有序列表存储,顺序为传入aws的顺序。

gather当然也提供了错误处理的方式。在常规情况下,为停止模式,产生错误后,会立刻发送异常信号并在gather中传播,代码终止。例如这样:
在这里插入图片描述

awaitable asyncio.gather(*aws, return_exceptions=False)
如果 return_exceptions 为 True,异常会和成功的结果一样处理,并聚合至结果列表。

更改示例代码为这样:

import time
import asyncioasync def one_data():print('正在暂停1号靓仔')await asyncio.sleep(3)print('正在恢复1号靓仔')return '我是1号靓仔'async def two_data():print('正在暂停2号靓仔')await asyncio.sleep(1)1/0print('正在恢复2号靓仔')return '我是2号靓仔'async def get_data():print('\n***我是第1个gather***')# print(await asyncio.gather(one_data(), two_data()))'使用Task写法'task1 = asyncio.create_task(one_data())task2 = asyncio.create_task(two_data())print(await asyncio.gather(task1, task2,return_exceptions=True))start_time = time.time()asyncio.run(get_data())print('耗时:{}'.format(time.time() - start_time))

执行结果:
在这里插入图片描述

捕获gather异常值及任务取消

任务取消,使用task.cancel(),可以主动发出取消信号,引发异常。异常名为CancelledError。同样可以根据设置return_exceptions=True来选择对异常进行停止还是忽略。

大概就像这样:

task1 = asyncio.create_task(one_data())
task2 = asyncio.create_task(two_data())
task1.cancel()#取消print(await asyncio.gather(task1, task2,return_exceptions=True))

执行结果:
在这里插入图片描述
推荐写法为try-except结构:

try:asyncio.run(get_data())
except asyncio.CancelledError as e:print('\n异常值:{}'.format(e))print('触发了CancelledError异常')

3.11下的新版写法:TaskGroup

gather已经是较为上层的api,在3.11的版本下,有了更加简洁和优雅的写法。在前面gather的使用中,需要先create_task,再放入 gather,再await。TaskGroup可以把这几个过程再次整合。示例是这样的:

async def main():async with asyncio.TaskGroup() as tg:task1 = tg.create_task(some_coro(...))task2 = tg.create_task(another_coro(...))print("Both tasks have completed now.")

可以快速而优雅的完成并发。但是需要较高版本,这里不过多介绍,可以看下面的官方链接

官方链接:
https://docs.python.org/zh-cn/3.11/library/asyncio-task.html#asyncio.TaskGroup

wait

官方链接:
https://docs.python.org/zh-cn/3.7/library/asyncio-task.html#asyncio.wait
在3.11官方文档的解释是这样的:
在这里插入图片描述
注意注意:如果你现在使用的是3.11的版本,现在已经不能直接传入协程对象,那么你需要先转换为Task对象。

显然,通过这个方法,你可以对并发的退出条件进行颗粒化控制。不会局限于完成所有可等待任务后退出。阅读可知,通过return_when来对退出条件进行控制,默认为ALL_COMPLETED

代码示例:

async def one_data():print('正在暂停1号靓仔')await asyncio.sleep(3)print('正在恢复1号靓仔')return '我是1号靓仔'async def two_data():print('正在暂停2号靓仔')await asyncio.sleep(1)print('正在恢复2号靓仔')return '我是2号靓仔'async def get_data():print('\n***我是第1个wait***')list_data = [asyncio.create_task(i) for i in (one_data(), two_data())]done, pending = await asyncio.wait(list_data,return_when = asyncio.FIRST_COMPLETED)# done, pending = await asyncio.wait(list_data,return_when = asyncio.FIRST_EXCEPTION)print('\n我完成了什么:{}'.format(done))print('\n我没有完成什么:{}'.format(pending)start_time = time.time()
asyncio.run(get_data())
print('\n耗时:{}'.format(time.time() - start_time))

运行结果:
在这里插入图片描述
可以看到,可以设置首次完成任务即可退出,所以耗时仅一秒,Task有finished,pending状态。

设置为FIRST_EXCEPTION,会在第一次出现错误的时候停止,可以先设置一个错误代码。这里不再演示,如果你感兴趣,可以动手试试。

读取wait返回值

显然地,在上述示例中返回的done,pendingset类型,就像这样:
在这里插入图片描述
直接for循环就可以读取:

for i in done:#读取全部完成的任务print('\n我完成了什么:{}'.format(i.result()))for i in pending:#读取全部没有完成的任务print('\n我没有完成什么:{}'.format(i))

wait_for

下面是3.11的wait_for官方解释,仍然是等待aw对象完成,这个方法可以从时间维度上对协程进行控制。
在这里插入图片描述
注意下面的版本更迭提示。

shield

shield方法用于保护任务不被取消,官方文档是这样说的,仍然有版本更迭,注意区分
在这里插入图片描述
虽然可以保护取消,但是在并发的时候,asyncio的run方法会获取新的事件循环并指定为当前事件循环,设定保护后可能会带来冲突。例如设定一个等待三秒的任务同时设有保护(相当于加了一个壳),还有一个等待两秒的任务,取消第一个任务并使用gather进行并发。最后的结果会丢失第一个任务。这是因为外侧的任务完成后,无法识别到里面的任务为同一个事件循环,无法进入并发。

可以参考以下代码:

import asyncioasync def one_data():print('正在暂停1号靓仔')await asyncio.sleep(1)print('正在恢复1号靓仔')return '我是1号靓仔'async def two_data():print('正在暂停2号靓仔')await asyncio.sleep(2)print('正在恢复2号靓仔')return '我是2号靓仔'async def mainx():task1 = asyncio.shield(one_data())task2 = asyncio.create_task(two_data())task1.cancel()shield = asyncio.gather(task1, task2,return_exceptions = True)print(await shield)asyncio.run(mainx())

执行结果:
在这里插入图片描述
最好是保证保护的任务时长比普通任务时长短,可以保证保护。也可以修改最后的run方法。采用轮询的方法

loop = asyncio.get_event_loop()#获取当前事件循环
loop.run_until_complete(shield())

如果这里有点没理解到也没关系。后续会结合实例继续讲解。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/615685.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GaussDB技术解读系列:5分钟带您了解DRS录制回放

一、什么是DRS录制回放? DRS录制回放是将源数据库发生的真实业务流量,在目标数据库模拟执行,从而观察和检验目标数据库的功能和性能表现。录制回放主要分为录制、回放两个阶段,录制过程是从源数据库上将所需时间段内的全部SQL原语…

Spring系列学习九、Spring MVC的使用

Spring MVC的使用 一、MVC设计模式概述二、Spring MVC的工作原理三、HandlerMapping和ViewResolver四、 处理表单、文件上传和异常处理五、前端页面(View)编写1. 引入Thymeleaf模板引擎2.页面相关的示例代码3.后端处理代码编写 六、总结 本章我们将与大家…

使用RoboBrowser库实现JD.com视频链接爬虫程序

短视频已成为这个时代必不可少的内容,而这些视频内容往往散布在各大网站上。对于一些研究人员、数据分析师或者普通用户来说,获取特定网站上的视频链接是一项常见的需求。本文将介绍如何利用Python编程语言中的RoboBrowser库来编写一个爬虫程序&#xff…

AI音乐探索

好的网站推荐 AI定制背景音乐下载平台-BGM猫 网易天音 - 一站式AI音乐创作工具 - 官网 https://app.suno.ai/create/ 乐理知识 网易天音 - 一站式AI音乐创作工具 - 官网 分类探索中 婚礼类 音乐风格关键词: wedding,Canon,classical music,60 BPM,piano,h…

vue3中el-table实现表格合计行

el-table标签上加属性 show-summary :summary-method“getSummary” <el-table :data"formDate.scoreList" style"width:100%;height: 96%;" stripe show-summary:summary-method"calculateSummary" :header-cell-style"{ textAlign: ce…

Pytest自动化测试

目录 一、Pytest如何安装 二、Pytest如何编写用例 三、Pytest如何运行用例 四、Pytest如何实现参数化 五、Pytest如何跳过和标记用例 六、Pytest如何失败重执行 七、Pytest如何使用夹具 八、Pytest如何进行夹具共享 九、Pytest如何设置夹具作用域 Pytest是Python中最流…

Nvidia 推出了一款新型芯片,专为在家中运行人工智能而设计

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

PyTorch 各种池化层函数全览与用法演示

目录 torch.nn.functional子模块Pooling层详解 avg_pool1d 用法与用途 参数 注意事项 示例代码 avg_pool2d 用法与用途 参数 注意事项 示例代码 avg_pool3d 用法与用途 参数 注意事项 示例代码 max_pool1d 用法与用途 参数 注意事项 示例代码 max_pool2d…

Selenium自动化测试面试必备:高频面试题及答案整理

自动化测试已经成为现代软件测试中不可或缺的一部分。在自动化测试中&#xff0c;Selenium是最受欢迎的工具之一&#xff0c;因为它可以模拟用户与Web应用程序的交互。因此&#xff0c;对于许多测试工程师来说&#xff0c;熟练掌握Selenium框架是非常重要的。如果你正在寻找一份…

外汇天眼:Broadridge与Boring Money合作推出资产管理公司的消费者责任解决方案

Boring Money&#xff0c;一家金融数据和见解公司&#xff0c;与全球金融科技领导者Broadridge Financial Solutions, Inc. (NYSE:BR)合作&#xff0c;为资产管理公司提供了一个汇总产品分析和消费者视角的数据与见解的单一信息源&#xff0c;从而全面满足英国《消费者义务》法…

代码随想录算法训练营Day23|669. 修剪二叉搜索树、108.将有序数组转换为二叉搜索树、538.把二叉搜索树转换为累加树

目录 669. 修剪二叉搜索树 前言 思路 递归法 108.将有序数组转换为二叉搜索树 前言 递归法 538.把二叉搜索树转换为累加树 前言 递归法 总结 669. 修剪二叉搜索树 题目链接 文章链接 前言 本题承接昨天二叉搜索树的插入和删除操作题目&#xff0c;要对整棵二叉搜索树…

Ubuntu16.04升级到18.04--检查更新时出现问题--解决方法

一开始装Ubuntu时装的是16.04&#xff0c;后来装cuda时&#xff0c;发现核是4.15的&#xff0c;需要升级到18.04&#xff0c;于是输入sudo do-release-upgrade更新时&#xff0c;发现错误&#xff1a; no module named DistUpgrade google后发现帖子&#xff1a; https://ask…

移动通信系统关键技术多址接入OFDM学习(7)

1.OFDM是一种多载波传输方案&#xff0c;可以将高速串行传输转换为低速并行传输&#xff0c;增加符号持续时间&#xff0c;抗多径干扰能力强。 串行和并行有着不同的比特持续时间&#xff0c;同时拥有相同的数据速率。因此&#xff0c;虽然OFDM将串行信号转换为并行信号&#…

HTTP基本概念

HTTP&#xff08;HyperText Transfer Protocol&#xff1a;超文本传输协议&#xff09;是一种用于分布式、协作式和超媒体信息系统的应用层协议。 简单来说就是一种发布和接收 HTML 页面的方法&#xff0c;被用于在 Web 浏览器和网站服务器之间传递信息。 HTTP 默认工作在 TCP…

【数据库原理】(23)实际应用中的查询优化方法

一.基于索引的优化 索引是数据库查询优化的关键工具之一。合理地使用索引可以显著提高查询速度&#xff0c;降低全表扫描的成本。以下是建立和使用索引的一些基本原则和最佳实践。 索引的建立与使用原则 数据量规模与查询频率: 值得建立索引的表通常具有较多的记录&#xff0…

linux防火墙查看状态firewall、iptable

1、iptables防火墙 查看防火墙状态 service iptables status 停止防火墙 service iptables stop 启动防火墙 service iptables start 重启防火墙 service iptables restart 永久关闭防火墙 chkconfig iptables off 永久关闭后重启 chkconfig iptables on 开启80端…

【天龙怀旧服】攻略day5

关键字&#xff1a; 天鉴扫荡、举贤、燕子水路 1】85天鉴任务可以扫荡 在流派选择npc那里&#xff0c;花费40交子即可扫荡100点&#xff0c;可以兑换10个灵武打造图&#xff1b; 此外打造图绑定不影响做出来的灵武绑定&#xff0c;只要对应的玉不绑灵武就不绑定 2】冠绝师门…

报错java.lang.IllegalArgumentException: MALFORMED

java.lang.IllegalArgumentException: MALFORMEDat java.util.zip.ZipCoder.toString(ZipCoder.java:58)at java.util.zip.ZipInputStream.readLOC(ZipInputStream.java:300)at java.util.zip.ZipInputStream.getNextEntry(ZipInputStream.java:122)我是在解压压缩包文件的时候…

想要简化重复订单吗?不妨考虑一揽子采购订单

企业想提高采购流程效率&#xff0c;简化大批量采购是一个很好的开始。财务、会计和采购部门通过系统化订购大量物品&#xff08;如纸张、打印机墨水和墨粉、清洁用品、纸制品和其他易重复采购的消耗品&#xff09;可以节省时间和金钱。借助正确的采购订单&#xff08;PO&#…

android 重启

RescueParty 重启 Android之RescueParty机制 - 简书 01-10 16:39:15.421637 1268 1395 W RescueParty: Attempting rescue level RESET_SETTINGS_UNTRUSTED_DEFAULTS Line 90467: 01-10 16:39:15.422713 1268 1395 W RescueParty: Performing scoped reset for pack…