同步、异步无障碍:Python异步装饰器指南

一、引言

Python异步开发已经非常流行了,一些主流的组件像MySQL、Redis、RabbitMQ等都提供了异步的客户端,再处理耗时的时候不会堵塞住主线程,不但可以提高并发能力,也能减少多线程带来的cpu上下文切换以及内存资源消耗。但在业务开发的时候一些第三方库没有异步的处理方式,例如OSS、CV、其他第三方提供的SDK以及自己封装的函数有耗时等,此时还是需要借助线程来加速,再异步中就不会堵塞主线程,因此封装一个异步装饰器可以更好的处理异步,让代码更简洁。

二、功能分析

  1. 支持同步函数使用线程加速

  2. 异、同步函数需支持 await 语法等待返回结果

  3. 异、同步函数需支持后台任务,无需等待

同步函数使用线程加速

同步函数使用线程,这还是挺简单的使用,内置库的 threading.Thread 就可以实现

import time
import threadingdef task1(name):print(f"Hello {name}")time.sleep(1)print(f"Completed {name}")t1 = threading.Thread(target=task1, args=("hui",))
t2 = threading.Thread(target=task1, args=("wang",))t1.start()
t2.start()t1.join()
t2.join()>>> out
Hello hui
Hello wang
Completed hui
Completed wang
  • start()方法用于启动线程执行函数。
  • join()方法用于等待线程执行结束。

但这样直接开线程的方式比较暴力,也不太好管理,因此可以想到线程池,进行线程复用与管理。Python内置的 concurrent.futures 模块提供了线程池和进程池的实现与封装。

import time
from concurrent.futures import ThreadPoolExecutordef task2(name):print(f"Hello {name}")time.sleep(1)return f"Completed {name}"with ThreadPoolExecutor(max_workers=2) as executor:future1 = executor.submit(task2, "hui")future2 = executor.submit(task2, "zack")print("ret1", future1.result())
print("ret2", future2.result())>>> out
Hello hui
Hello zack
ret1 Completed hui
ret2 Completed zack

异、同步函数需支持 await 语法

异、同步函数需支持 await 语法等待返回结果,异步函数本身就支持 await语法,这里主要是实现同步函数支持

await 语法,在python中可以await语法的对象有如下几大类:

  1. 协程对象(coroutine):定义了__await__方法的对象,异步框架中的协程函数都是此类型。
  2. 任务对象(Task):封装了协程的对象, 如 asyncio 中的 Task, trio中的Task。
  3. Future对象:表示异步操作结果的对象, 如 concurrent.futures.Future。
  4. 协程装饰器封装的对象:一些装饰器可以将普通函数或对象包装成可await的对象,如@asyncio.coroutine。

综上,凡是实现了__await__魔术方法的对象或者封装了协程/任务的对象,都可以被await,await会自动把对象交给事件循环运行,等待其完成。

常见的可await对象包括协程函数、任务对象、Future、被@coroutine装饰的函数等,这可以使异步代码更简洁。await对象可以暂停当前协程,等待异步操作完成后再继续执行。

import asyncioasync def coro_demo():print("await coroutine demo")async def task_demo():print("await task demo")async def coro():print("in coro task")# 创建 Task 对象task = asyncio.create_task(coro())await taskasync def future_demo():print("await future demo")future = asyncio.Future()await future# 这个装饰器已经过时
@asyncio.coroutine
def coro_decorated_demo():print("await decorated function demo")async def main():await coro_demo()await task_demo()await future_demo()await coro_decorated_demo()if __name__ == '__main__':asyncio.run(main())>>> out 
DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" insteaddef coro_decorated_demo():await coroutine demo
await task demo
in coro task
await future demo

这个 @asyncio.coroutine 协程装饰器已经过时了,都是使用 async、await 语法替代。

下面是实现 __await__ 方法的demo

import asyncioclass AsyncDownloader:def __init__(self, url):self.url = urlself.download_ret = Nonedef __await__(self):print(f'Starting download of {self.url}')loop = asyncio.get_event_loop()future = loop.run_in_executor(None, self.download)yield from future.__await__()return selfdef download(self):print(f'Downloading {self.url}...')# 模拟下载过程import timetime.sleep(2)self.download_ret = f'{self.url} downloaded ok'async def main():print('Creating downloader...')downloader = AsyncDownloader('https://www.ithui.top/file.zip')print('Waiting for download...')downloader_obj = await downloaderprint(f'Download result: {downloader_obj.download_ret}')if __name__ == '__main__':asyncio.run(main())>>> out
Creating downloader...
Waiting for download...
Starting download of https://www.ithui.top/file.zip
Downloading https://www.ithui.top/file.zip...
Download result: https://www.ithui.top/file.zip downloaded ok    

用 yield from 来迭代 future对象(符合__await__逻辑),并在结束时return self

异、同步函数需支持后台任务

异步、后台任务的好处与场景

  1. 减少主程序的等待时间

    异步函数可以通过后台任务的方式执行一些耗时操作,如IO操作、网络请求等,而主程序无需等待这些操作完成,可以继续执行其他任务,从而减少程序总体的等待时间。

  2. 提高程序响应性能

    后台任务的异步执行,可以避免主程序被长时间阻塞,从而改善程序的整体响应性能。用户无需长时间等待才能得到响应。

  3. 解决IO密集型任务阻塞问题

    对于网络、文件IO等密集型任务,使用同步执行可能会导致长时间阻塞,而异步后台任务可以很好地解决这个问题,避免资源浪费。

  4. 良好的用户体验

    后台任务的异步处理,给用户的感觉是多个任务同时在执行,实际上CPU在切换处理,这相比线性等待任务完成,可以提供更好的用户体验。

  5. 适用于不需要实时结果的任务

    邮件发送、数据批处理、文件处理等不需要用户即时等待结果的任务非常适合通过异步方式在后台完成。

在python中同异步函数实现后台任务

  • 异步函数可以通过 asyncio.create_task 方法实现后台任务

  • 同步函数可以通过线程、线程池来实现

import asyncio
import time
from threading import Thread
from concurrent.futures import ThreadPoolExecutorasync def async_bg_task():print('async bg task running')await asyncio.sleep(3)print('async bg task completed')def sync_bg_task():print('sync bg task running')time.sleep(3)print('sync bg task completed')async def main():print('Starting main program')# 异步函数的后台任务asyncio.create_task(async_bg_task())# 同步函数的后台任务# with ThreadPoolExecutor() as executor:#     executor.submit(sync_bg_task)# Thread(target=sync_bg_task).start()loop = asyncio.get_running_loop()loop.run_in_executor(executor=ThreadPoolExecutor(), func=sync_bg_task)print('Main program continues')await asyncio.sleep(5)if __name__ == '__main__':asyncio.run(main())>>> ThreadPoolExecutor out
Starting main program
sync bg task running
sync bg task completed
Main program continues
async bg task running
async bg task completed>>> Thread out
Starting main program
sync bg task running
Main program continues
async bg task running
sync bg task completed
async bg task completed>>> run_in_executor out
Starting main program
sync bg task running
Main program continues
async bg task running
async bg task completed
sync bg task completed

看输出结果可以发现在同步函数使用直接使用线程池 ThreadPoolExecutor 执行还是堵塞了主线程,然后 Thread 没有,通过 loop.run_in_executor 也不会阻塞。后面发现 是 with 语法导致的堵塞,with 的根本原因就是它会等待线程池内的所有线程任务完成并回收,所以主线程必须等同步函数结束后才能继续。一开始我还一以为是线程池使用了主线程的线程后面打印线程名称看了下不是,然后调试下就发现了with的问题。

import asyncio
import time
import threading
from concurrent.futures import ThreadPoolExecutorasync def async_bg_task():print(f"async_bg_task In thread: {threading.current_thread().name}")print('async bg task running')await asyncio.sleep(3)print('async bg task completed')def sync_bg_task(num):print(f"sync_bg_task{num} In thread: {threading.current_thread().name}")print(f'sync bg task{num} running')time.sleep(3)print(f'sync bg task{num} completed')async def main():print('Starting main program')# 异步函数的后台任务asyncio.create_task(async_bg_task())# 同步函数的后台任务thread_pool = ThreadPoolExecutor()# with thread_pool as pool:#     for i in range(5):#         pool.submit(sync_bg_task, i)for i in range(5):thread_pool.submit(sync_bg_task, i)threading.Thread(target=sync_bg_task, args=["thread"]).start()loop = asyncio.get_running_loop()loop.run_in_executor(ThreadPoolExecutor(), sync_bg_task, "loop.run_in_executor")print('Main program continues')print(f"Main program In thread: {threading.current_thread().name}")await asyncio.sleep(5)if __name__ == '__main__':asyncio.run(main())

三、具体封装实现

import asyncio
from concurrent.futures import ThreadPoolExecutor, Executordef run_on_executor(executor: Executor = None, background: bool = False):"""异步装饰器- 支持同步函数使用 executor 加速- 异步函数和同步函数都可以使用 `await` 语法等待返回结果- 异步函数和同步函数都支持后台任务,无需等待Args:executor: 函数执行器, 装饰同步函数的时候使用background: 是否后台执行,默认FalseReturns:"""def _run_on_executor(func):@functools.wraps(func)async def async_wrapper(*args, **kwargs):if background:return asyncio.create_task(func(*args, **kwargs))else:return await func(*args, **kwargs)@functools.wraps(func)def sync_wrapper(*args, **kwargs):loop = asyncio.get_event_loop()task_func = functools.partial(func, *args, **kwargs)    # 支持关键字参数return loop.run_in_executor(executor, task_func)# 异步函数判断wrapper_func = async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapperreturn wrapper_funcreturn _run_on_executor

封装成了带参数的装饰器

  • executor: 函数执行器, 装饰同步函数的时候使用

    • 可以传递指定的线程池,默认None 根据系统cpu核心数动态创建线程的数量
  • background: 用于标识是否后台执行,默认False

    • 有点诟病同步函数的后台任务没有用到这个参数而是使用 await 语法控制,但在使用装饰器时候可以起到后台任务标识作用,别人一看有这个参数就知道是后台任务就不用细看函数业务逻辑
    • 后续再看看怎么优化,大家有没有比较好建议
  • loop.run_in_executor(executor, task_func) 方法不支持关键字参数的传递,故而采用 task_func = functools.partial(func, *args, **kwargs) ,来构造一个不带参数的函数就可以方便使用了

测试demo

import asyncio
import time
from concurrent.futures import ThreadPoolExecutorfrom py_tools.decorators.base import run_on_executor
from loguru import loggerthread_executor = ThreadPoolExecutor(max_workers=3)@run_on_executor(background=True)
async def async_func_bg_task():logger.debug("async_func_bg_task start")await asyncio.sleep(1)logger.debug("async_func_bg_task running")await asyncio.sleep(1)logger.debug("async_func_bg_task end")return "async_func_bg_task ret end"@run_on_executor()
async def async_func():logger.debug("async_func start")await asyncio.sleep(1)logger.debug("async_func running")await asyncio.sleep(1)return "async_func ret end"@run_on_executor(background=True, executor=thread_executor)
def sync_func_bg_task():logger.debug("sync_func_bg_task start")time.sleep(1)logger.debug("sync_func_bg_task running")time.sleep(1)logger.debug("sync_func_bg_task end")return "sync_func_bg_task end"@run_on_executor()
def sync_func():logger.debug("sync_func start")time.sleep(1)logger.debug("sync_func running")time.sleep(1)return "sync_func ret end"async def main():ret = await async_func()logger.debug(ret)async_bg_task = await async_func_bg_task()logger.debug(f"async bg task {async_bg_task}")logger.debug("async_func_bg_task 等待后台执行中")loop = asyncio.get_event_loop()for i in range(3):loop.create_task(async_func())ret = await sync_func()logger.debug(ret)sync_bg_task = sync_func_bg_task()logger.debug(f"sync bg task {sync_bg_task}")logger.debug("sync_func_bg_task 等待后台执行")await asyncio.sleep(10)if __name__ == '__main__':asyncio.run(main())

测试详细输出

async_func start
async_func running
async_func ret endasync bg task <Task pending name='Task-2' coro=<async_func_bg_task() running at ...
sync_func start
async_func_bg_task start
async_func start
async_func start
async_func startsync_func running
async_func_bg_task running
async_func running
async_func running
async_func runningasync_func_bg_task end
sync_func ret endsync_func_bg_task start
sync bg task <Future pending cb=[_chain_future.<locals>._call_check_cancel() at ...
sync_func_bg_task 等待后台执行
sync_func_bg_task running
sync_func_bg_task end

四、源代码

HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)

---------------------------END---------------------------

题外话

“不是只有程序员才要学编程?!”

认真查了一下招聘网站,发现它其实早已变成一项全民的基本技能了。

连国企都纷纷要求大家学Python!
在这里插入图片描述

世界飞速发展,互联网、大数据冲击着一切,各行各业对数据分析能力的要求越来越高,这便是工资差距的原因,学习编程顺应了时代的潮流。

在这个大数据时代,从来没有哪一种语言可以像Python一样,在自动化办公、爬虫、数据分析等领域都有众多应用。

更没有哪一种语言,语法如此简洁易读,消除了普通人对于“编程”这一行为的恐惧,从小学生到老奶奶都可以学会。

《2020年职场学习趋势报告》显示,在2020年最受欢迎的技能排行榜,Python排在第一。
在这里插入图片描述

它的角色类似于现在Office,成了进入职场的第一项必备技能。

如果你也想增强自己的竞争力,分一笔时代的红利,我的建议是,少加点班,把时间腾出来,去学一学Python。

因为,被誉为“未来十年的职场红利”的Python,赚钱、省钱、找工作、升职加薪简直无所不能!

目前,Python人才需求增速高达**174%,人才缺口高达50万,**部分领域如人工智能、大数据开发, 年薪30万都招不到人!在这里插入图片描述

感兴趣的小伙伴,赠送全套Python学习资料,包含面试题、简历资料等具体看下方。

👉CSDN大礼包🎁:全网最全《Python学习资料》免费赠送🆓!(安全链接,放心点击)

一、Python所有方向的学习路线

Python所有方向的技术点做的整理,形成各个领域的知识点汇总,它的用处就在于,你可以按照下面的知识点去找对应的学习资源,保证自己学得较为全面。

img
img

二、Python必备开发工具

工具都帮大家整理好了,安装就可直接上手!img

三、最新Python学习笔记

当我学到一定基础,有自己的理解能力的时候,会去阅读一些前辈整理的书籍或者手写的笔记资料,这些笔记详细记载了他们对一些技术点的理解,这些理解是比较独到,可以学到不一样的思路。

img

四、Python视频合集

观看全面零基础学习视频,看视频学习是最快捷也是最有效果的方式,跟着视频中老师的思路,从基础到深入,还是很容易入门的。

img

五、实战案例

纸上得来终觉浅,要学会跟着视频一起敲,要动手实操,才能将自己的所学运用到实际当中去,这时候可以搞点实战案例来学习。img

六、面试宝典

在这里插入图片描述

在这里插入图片描述

简历模板在这里插入图片描述

如有侵权,请联系删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/49457.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

回归预测 | MATLAB实现SSA-RF麻雀搜索优化算法优化随机森林算法多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现SSA-RF麻雀搜索优化算法优化随机森林算法多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现SSA-RF麻雀搜索优化算法优化随机森林算法多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09;…

2023年国赛 高教社杯数学建模思路 - 案例:退火算法

文章目录 1 退火算法原理1.1 物理背景1.2 背后的数学模型 2 退火算法实现2.1 算法流程2.2算法实现 建模资料 ## 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 退火算法原理 1.1 物理背景 在热力学上&a…

组合总和-LeetCode

给你一个无重复元素的整数数组 candidates 和一个目标整数 target &#xff0c;找出 candidates 中可以使数字和为目标数 target 的所有不同组合 &#xff0c;并以列表形式返回。你可以按 任意顺序返回这些组合。 candidates 中的同一个数字可以无限制重复被选取 。如果至少一个…

【Python】PyCharm配置外部工具

【摘要】 QT Designer配置 Designer绘制的UI文件转换成py文件 UI用到的图片资源文件转换成py文件 注意&#xff1a;使用豆瓣原安装比较快 pip install * -i http://pypi.douban.com/simple --trusted-host pypi.douban.com 1&#xff0c;File->Settings->Tools->…

手机无人直播软件,有哪些优势?

近年来&#xff0c;随着手机直播的流行和直播带货的市场越来越大&#xff0c;手机无人直播软件成为许多商家开播带货的首选。在这个领域里&#xff0c;声音人无人直播系统以其独特的优势&#xff0c;成为市场上备受瞩目的产品。接下来&#xff0c;我们将探讨手机无人直播软件给…

Office ---- excel ---- 怎么批量设置行高

解决方法&#xff1a; 调整行高即可

C语言学习系列-->【关于qsort函数的详解以及它的模拟实现】

文章目录 一、概述二、qsort函数参数介绍三、qsort实现排序3.1 qsort实现整型数组排序3.2 qsort实现结构体数组排序 四、模拟实现qsort函数 一、概述 对数组的元素进行排序 对数组中由 指向的元素进行排序&#xff0c;每个元素字节长&#xff0c;使用该函数确定顺序。 此函数使…

mongodb集群

端口192.168.115.3 192.168.115.4 1192.168.115.5 下载MongoDB软件包版本为4.2.14并安装 rpm -ih --force --nodeps *.rpm 2创建文件夹mkdir -p /opt/local/mongo-cluster/conf 3.在目录里创建配置文件cd /opt/local/mongo-cluster/conf …

vue项目中使用ts的枚举类型

vue项目中要使用ts的枚举类型需要为script标签的lang属性添加ts属性值 <script lang"ts" setup> </script > 声明枚举类型&#xff1a; //语法 /* enum 枚举名称 {可能的值 }*/ enum scenic_status {"正常" 1,"审核中","暂停…

如何撰写骨灰级博士论文?这是史上最全博士论文指导!

博士论文的写作是博士研究生主要要完成的工作。由于存在着较高的难度&#xff0c;较长的写作周期&#xff0c;以及在创新&#xff0c;写作规范&#xff0c;实际及理论意义等方面有着比较高的要求&#xff0c;博士论文的完成一般说来是有相当难度的。一篇好的博士论文不仅是一本…

2023年中,量子计算产业现状——

2023年上半年&#xff0c;量子计算&#xff08;QC&#xff09;领域取得了一系列重要进展和突破&#xff0c;显示出量子计算技术的快速发展和商业应用的不断拓展。在iCV TAnk近期发表的一篇报告中&#xff0c;团队从制度进步、产业生态、投融资形势、总结与展望四个方面对量子计…

Vue3 中引入液晶数字字体(通常用于大屏设计)

一、下载 .ttf 字体文件到本地&#xff0c;放在 src 中的 assets 文件下 下载液晶字体 DS-Digital.ttf 二、在 css 文件中引入字体 /* src/assets/fonts/dsfont.css */ font-face {font-family: electronicFont;src: url(./DS-Digital.ttf);font-weight: normal;font-styl…

Mybatis 建立依赖失败:报错Dependency ‘mysql:mysql-connector-java:8.0.28‘ not found

Mybatis 建立依赖失败&#xff1a;报错Dependency ‘mysql:mysql-connector-java:8.0.28’ not found 解决办法&#xff1a; 写完依赖代码&#xff0c;直接重构&#xff0c;下载依赖。 图片: ![Alt](https://img-home.csdnimg.cn/images/20220524100510.png Mac 版本注意Ide…

编写Dockerfile制作Web应用系统nginx镜像,生成镜像nginx:v1.1,并推送其到私有仓库

Docker 镜像是一个特殊的文件系统&#xff0c;除了提供容器运行时所需的程序、库、资源、配置等文件外&#xff0c;还包含了一些为运行时准备的一些配置参数&#xff08;如匿名卷、环境变量、用户等&#xff09;。镜像不包含任何动态数据&#xff0c;其内容在构建之后也不会被改…

深度学习论文: WinCLIP: Zero-/Few-Shot Anomaly Classification and Segmentation

深度学习论文: WinCLIP: Zero-/Few-Shot Anomaly Classification and Segmentation WinCLIP: Zero-/Few-Shot Anomaly Classification and Segmentation PDF: https://arxiv.org/pdf/2303.14814.pdf PyTorch代码: https://github.com/shanglianlm0525/CvPytorch PyTorch代码: h…

Spring事务和事务传播机制(2)

前言&#x1f36d; ❤️❤️❤️SSM专栏更新中&#xff0c;各位大佬觉得写得不错&#xff0c;支持一下&#xff0c;感谢了&#xff01;❤️❤️❤️ Spring Spring MVC MyBatis_冷兮雪的博客-CSDN博客 在Spring框架中&#xff0c;事务管理是一种用于维护数据库操作的一致性和…

设计模式——接口隔离原则

文章目录 基本介绍应用实例应传统方法的问题和使用接口隔离原则改进 基本介绍 客户端不应该依赖它不需要的接口&#xff0c;即一个类对另一个类的依赖应该建立在最小的接口上先看一张图: 类 A 通过接口 Interface1 依赖类 B&#xff0c;类 C 通过接口 Interface1 依赖类 D&…

shell脚本语句(画矩形、三角形、乘法表和小游戏)(#^.^#)

目录 一、语句 一、条件语句 一、以用户为例演示 一、显示当前登录系统的用户信息 二、显示有多少个用户 二、单分支if 一、输入脚本 二、验证结果 三、双分支if 一、输入脚本 二、验证结果 四、多分支if 一、输入脚本 二、验证 二、循环语句 一、shell版本的循环…

空调企业只干空调,卡萨帝却干了业主想不到的事

作者 | 曾响铃 文 | 响铃说 今年入夏以来&#xff0c;随着气温的不断攀升&#xff0c;空调已经成为生活、工作中的“绝对刚需”。由此而来的则是空调产品的销量大增。 但也有不少消费者表示&#xff0c;随着产品功能的越发相似&#xff0c;价格趋同&#xff0c;使空调变得越…

Postman 如何进行参数化

前言 Postman作为一款接口测试工具&#xff0c;受到了非常多的开发工程师的拥护。 那么做为测试&#xff0c;了解Postman这款工具就成了必要的了。 这篇文章就是为了解决Postman怎么进行参数化的。 全局变量 全局变量是将这个变量设置成整个程序的都可以用&#xff0c;不用去…