Python Coroutine 池化实现

池化介绍

在当今计算机科学和软件工程的领域中,池化技术如线程池、连接池和对象池等已经成为优化资源利用率和提高软件性能的重要工具。然而,在 Python 的协程领域,我们却很少见到类似于 ThreadPoolExecutor 的 CoroutinePoolExecutor。为什么会这样呢?

首先,Python Coroutine 的特性使得池化技术在协程中的应用相对较少。与像 Golang 这样支持有栈协程的语言不同,Python Coroutine 是无栈的,无法跨核执行,从而限制了协程池发挥多核优势的可能性。

其次,Python Coroutine 的轻量级和快速创建销毁的特性,使得频繁创建和销毁协程并不会带来显著的性能损耗。这也解释了为什么 Python 官方一直没有引入 CoroutinePoolExecutor。

然而,作为开发者,我们仍然可以在特定场景下考虑协程的池化。虽然 Python Coroutine 轻量,但在一些需要大量协程协同工作的应用中,池化技术能够提供更方便、统一的调度子协程的方式。尤其是在涉及到异步操作的同时需要控制并发数量时,协程池的优势就显而易见了。

关于 Python 官方是否会在未来引入类似于 TaskGroup 的 CoroutinePoolExecutor,这或许是一个悬而未决的问题。考虑到 Python 在异步编程方面的快速发展,我们不能排除未来可能性的存在。或许有一天,我们会看到 TaskGroup 引入一个 max_workers 的形参,以更好地支持对协程池的需求。

在实际开发中,我们也可以尝试编写自己的 CoroutinePoolExecutor,以满足特定业务场景的需求。通过合理的设计架构和对数据流的全局考虑,我们可以最大程度地发挥协程池的优势,提高系统的性能和响应速度。

在接下来的文章中,我们将探讨如何设计和实现一个简单的 CoroutinePoolExecutor,以及在实际项目中的应用场景。通过深入理解协程池的工作原理,我们或许能更好地利用这一技术,使我们的异步应用更为高效。

如何开始编写

如何开始编写 CoroutinePoolExecutor,首先我们要明确出其适用范畴、考虑到使用方式和其潜在的风险点:

  • 它并不适用于 Mult Thread + Mult Event Loop 的场景,因此它并非线程安全的。
  • 应当保持和 ThreadPoolExecutor 相同的调用方式。
  • 不同于 Mult Thread 中子线程不依赖于主线程的运行,而在 Mult Coroutine 中子协程必须依赖于主协程,因此主协程在子协程没有全部运行完毕之前不能直接 done 掉。这也解释了为什么 TaskGroup 官方实现中没有提供类似于 shutdown 之类的方法,而是只提供上下文管理的运行方式。

有了上述 3 点的考量,我们决定将 ThreadPoolExecutor 平替成 CoroutinePoolExecutor。这样的好处在于,作为学习者一方面可以了解 ThreadPoolExecutor 的内部实现机制,另一方面站在巨人肩膀上的编程借鉴往往会事半功倍,对于自我的提升也是较为明显的。

在考虑这些因素的同时,我们将继续深入研究协程池的设计和实现。通过对适用范围和使用方式的明确,我们能更好地把握 CoroutinePoolExecutor 的潜在优势,为异步应用的性能提升做出更有针对性的贡献。

具体代码实现

在这里我先贴出完整的代码实现,其中着重点已经用注释标明。

以下是 CoroutinePoolExecutor 的代码实现:

import os
import asyncio
import weakref
import logging
import itertoolsasync def _worker(executor_reference: "CoroutinePoolExecutor", work_queue: asyncio.Queue):try:while True:work_item = await work_queue.get()if work_item is not None:await work_item.run()del work_itemexecutor = executor_reference()if executor is not None:# Notify available coroutinesexecutor._idle_semaphore.release()del executorcontinue# Notifies the next coroutine task that it is time to exitawait work_queue.put(None)breakexcept Exception as exc:logging.critical('Exception in worker', exc_info=True)class _WorkItem:def __init__(self, future, coro):self.future = futureself.coro = coroasync def run(self):try:result = await self.coroexcept Exception as exc:self.future.set_exception(exc)else:self.future.set_result(result)class CoroutinePoolExecutor:"""Coroutine pool implemented based on ThreadPoolExecutorDifferent from ThreadPoolExecutor, because the running of sub-coroutine depends on the main coroutineSo you must use the shutdown method to wait for all subtasks and wait for them to complete execution"""# Used to assign unique thread names when coroutine_name_prefix is not supplied._counter = itertools.count().__next__def __init__(self, max_workers, coroutine_name_prefix=""):if max_workers is None:max_workers = min(32, (os.cpu_count() or 1) + 4)if max_workers <= 0:raise ValueError("max_workers must be greater than 0")self._max_workers = max_workersself._work_queue = asyncio.Queue()self._idle_semaphore = asyncio.Semaphore(0)self._coroutines = set()self._shutdown = Falseself._shutdown_lock = asyncio.Lock()self._coroutine_name_prefix = (coroutine_name_prefix or (f"{__class__.__name__}-{self._counter()}"))async def submit(self, coro):async with self._shutdown_lock:# When the executor is closed, new coroutine tasks should be rejected, otherwise it will cause the problem that the newly added tasks cannot be executed.# This is because after shutdown, all sub-coroutines will end their work# one after another. Even if there are new coroutine tasks, they will not# be reactivated.if self._shutdown:raise RuntimeError('cannot schedule new coroutine task after shutdown')f = asyncio.Future()w = _WorkItem(f,coro)await self._work_queue.put(w)await self._adjust_coroutine_count()return fasync def _adjust_coroutine_count(self):try:# 2 functions:# - When there is an idle coroutine and the semaphore is not 0, there is no need to create a new sub-coroutine.# - Prevent exceptions from modifying self._coroutines members when the for loop self._coroutines and await task in shutdown are modified# Since the Semaphore provided by asyncio does not have a timeout# parameter, you can choose to use it with wait_for.if await asyncio.wait_for(self._idle_semaphore.acquire(),0):returnexcept TimeoutError:passnum_coroutines = len(self._coroutines)if num_coroutines < self._max_workers:coroutine_name = f"{self._coroutine_name_prefix or self}_{num_coroutines}"t = asyncio.create_task(coro=_worker(weakref.ref(self),self._work_queue),name=coroutine_name)self._coroutines.add(t)async def shutdown(self, wait=True, *, cancel_futures=False):async with self._shutdown_lock:self._shutdown = Trueif cancel_futures:while True:try:work_item = self._work_queue.get_nowait()except asyncio.QueueEmpty:breakif work_item is not None:work_item.future.cancel()# None is an exit signal, given by the shutdown method, when the shutdown method is called# will notify the sub-coroutine to stop working and exit the loopawait self._work_queue.put(None)if wait:for t in self._coroutines:await tasync def __aenter__(self):return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):await self.shutdown(wait=True)return False

以下是 CoroutinePoolExecutor 的使用方式:

import asynciofrom coroutinepoolexecutor import CoroutinePoolExecutorasync def task(i):await asyncio.sleep(1)print(f"task-{i}")async def main():async with CoroutinePoolExecutor(2) as executor:for i in range(10):await executor.submit(task(i))if __name__ == "__main__":asyncio.run(main())

我们知道,在线程池中,工作线程一旦创建会不断的领取新的任务并执行,除开 shutdown() 调用,否则对于静态的线程池来讲工作线程不会自己结束。

在上述协程池代码实现中,CoroutinePoolExecutor 类包含了主要的对外调用功能的接口、内部提供了存储 task 的 Queue、工作协程自动生成 name 的计数器、保障协程的信号量锁等等。

而 _worker 函数是工作协程的运行函数,其会在工作协程启动后,不断的从 CoroutinePoolExecutor 的 Queue 中得到 _WorkItem 并由 _WorkItem 具体执行 coro task。

剩下的 _WorkItem 是一个 future 对象与 coro task 的封装器,其功能是解耦 future 对象和 coro task、并在 coro task 运行时和运行后设置 future 的结果。

对于异步循环的思考

在此 CoroutinePoolExecutor 实现后,我其实又有了一个新的思考。Python 的 EventLoop 相较于 Node.js 的 EventLoop 来说其实更加的底层,它有感的暴露了出来。

具体体现在当 Python Event Loop 启动后,如果 main coroutine 停止运行,那么所有的 subtask coroutine 也会停止运行,尤其是对于一些需要清理资源的操作、如 aiohttp 的 close session、CoroutinePoolExecutor 的 shutdown 等都会在某些情况显得无措,说的更具体点就是不知道在什么时候调用。

对于这些问题,我们可以继承 BaseEventLoop 自己手动对 EventLoop 的功能进行扩展,如在事件循环关闭之前添加 hook function,甚至可以限制整个 EventLoop 的 max_workers 或者做成动态的可调节 coroutine 数量的 EventLoop 都行。

无论如何,只要心里有想法,就可以去将它实现 .. 学习本身就是一个不断挑战的过程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/820796.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

供应链金融AI机器学习建模实战_论文科研_企业建模定制服务

随着全球贸易的不断发展和供应链的日益复杂化&#xff0c;供应链金融作为一种新型金融工具&#xff0c;正逐渐受到企业和金融机构的关注和重视。供应链金融是指通过金融手段来优化和改进供应链中的资金流动和货物流动&#xff0c;以实现企业间的合作共赢。 供应链金融的核心是将…

实体类List重复校验

如果实体类有多个属性&#xff0c;并且你希望根据所有属性的组合来进行重复校验&#xff0c;你可以考虑以下几种方法&#xff1a; 使用集合存储已经出现过的实体对象&#xff1a; 将每个实体对象放入一个 Set 中进行重复校验。在 Set 中元素的比较可以使用自定义的 equals 方法…

springboot+vue全栈开发【3.前端篇之Vue基础语法2】

目录 前言Vue基础语法1.事件绑定指令2.条件渲染指令v-show和v-if指令v-else和v-else-if指令 3.列表渲染指令扩展&#xff1a;v-for中的key 前言 hi&#xff0c;这个系列是我自学开发的笔记&#xff0c;适合具有一定编程基础&#xff08;html、css那些基础知识要会&#xff01;…

【随笔】Git 高级篇 -- 模拟团队合作 git fetch git pull(二十九)

&#x1f48c; 所属专栏&#xff1a;【Git】 &#x1f600; 作  者&#xff1a;我是夜阑的狗&#x1f436; &#x1f680; 个人简介&#xff1a;一个正在努力学技术的CV工程师&#xff0c;专注基础和实战分享 &#xff0c;欢迎咨询&#xff01; &#x1f496; 欢迎大…

最新IntelliJ IDEA 2024.1 安装和快速配置教程

IntelliJ IDEA 2024.1 最新版如何快速入门体验?IntelliJ IDEA 2024.1 安装和配置教程 图文解说版 文章目录 IntelliJ IDEA 2024.1 最新版如何快速入门体验?IntelliJ IDEA 2024.1 安装和配置教程 图文解说版前言 第一步&#xff1a; IntelliJ IDEA 2024.1安装教程第 0 步&…

国产高性能DSP音频处理芯片 AI算法智能消原音 PTN1118方案

PTN1118植入帕特纳微AI&#xff08;SVS&#xff09;&#xff0c;实现将任意音源中人声部分消除&#xff0c;并在极大程度上保留伴奏&#xff0c;配合PTN 卡拉OK系列芯片&#xff0c;使传统音频设备更富娱乐性。 支持模拟与数字输入输出&#xff0c;数字接口支持从模式 人声消除…

Mamba 论文翻译

Mamba: 带选择性状态空间的线性时间序列模型 摘要&#xff1a; 当下&#xff0c;给大多数令人兴奋的深度学习方面的应用赋能的基础模型&#xff0c;几乎普遍是基于Transformer 架构和其核心的注意力模块。很多次二次时间复杂度的架构&#xff0c;例如&#xff0c;那些线性注意…

单细胞RNA测序(scRNA-seq)cellranger count的细胞定量和aggr整合

单细胞RNA测序(scRNA-seq)基础知识可查看以下文章: 单细胞RNA测序(scRNA-seq)工作流程入门 单细胞RNA测序(scRNA-seq)细胞分离与扩增 单细胞RNA测序(scRNA-seq)SRA数据下载及fastq-dumq数据拆分 单细胞RNA测序(scRNA-seq)Cellranger流程入门和数据质控 细胞定量…

NL2SQL进阶系列(4):ConvAI、DIN-SQL、C3-浙大、DAIL-SQL-阿里等16个业界开源应用实践详解[Text2SQL]

NL2SQL进阶系列(4)&#xff1a;ConvAI、DIN-SQL等16个业界开源应用实践详解[Text2SQL] NL2SQL基础系列(1)&#xff1a;业界顶尖排行榜、权威测评数据集及LLM大模型&#xff08;Spider vs BIRD&#xff09;全面对比优劣分析[Text2SQL、Text2DSL] NL2SQL基础系列(2)&#xff1a…

揭秘AI精准输出:如何构建完美的AIGC提示词?

揭秘AI精准输出&#xff1a;如何构建完美的AIGC提示词&#xff1f;&#x1f916; 文章目录 揭秘AI精准输出&#xff1a;如何构建完美的AIGC提示词&#xff1f;&#x1f916;摘要引言正文&#x1f4d8; 提示词的基本概念1. 什么是提示词&#xff1f;2. 提示词的作用 &#x1f4d…

SSH KEY 添加

mac&#xff1a; Add SSH KEY公钥 1、 先cd进.ssh文件夹&#xff0c;查看电脑中是否存在之前添加的公钥文件(id_rsa.pub、id_rsa)&#xff0c;要是存在&#xff0c;就先删除: jingchengxindeMacBook-Pro:~ jingchengxin$ cd .ssh jingchengxindeMacBook-Pro:.ssh jingchen…

PTA图论的搜索题

目录 7-1 列出连通集 题目 输入格式: 输出格式: 输入样例: 输出样例: AC代码 7-2 六度空间 题目 输入格式: 输出格式: 输入样例: 输出样例: 思路 AC代码 7-3 地下迷宫探索 题目 输入格式: 输出格式: 输入样例1: 输出样例1: 输入样例2: 输出样例2: 思路 …

基于Springboot+Vue的Java项目-免税商品优选购物商城系统开发实战(附演示视频+源码+LW)

大家好&#xff01;我是程序员一帆&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f49e;当前专栏&#xff1a;Java毕业设计 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f380; Python毕业设计 &am…

DC-3渗透测试复现

DC-3渗透测试复现 目的&#xff1a; 获取最高权限以及5个flag 过程&#xff1a; 信息打点-sql注入-反弹shell- pkexec提权&#xff08;CVE-2021-4034&#xff09; 环境&#xff1a; 攻击机&#xff1a;kali(192.168.85.136) 靶机&#xff1a;DC_3(192.168.85.133) 复现…

Pyinstaller打包为可执行.exe文件 数据路径问题、闪退问题

将要打包如下文件结构&#xff1a; --project--data.txt--main.py使用pyinstaller打包&#xff0c;要使用--add-data参数&#xff0c;将 data.txt 文件包含在生成的可执行文件中。 同时注意&#xff1a;main.py代码中的获取数据路径&#xff0c;要使用 os.path模块来构建 data…

特斯拉宣布 10%大裁员;刘强东数字人开启直播首秀丨 RTE 开发者日报 Vol.185

开发者朋友们大家好&#xff1a; 这里是 「RTE 开发者日报」 &#xff0c;每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE&#xff08;Real Time Engagement&#xff09; 领域内「有话题的 新闻 」、「有态度的 观点 」、「有意思的 数据 」、「有思考的 文…

python 面向对象(封装、继承、多态)

封装 1 封装概述 是指隐藏对象的属性和实现细节&#xff0c;仅对外提供公共访问方式。 2 封装原则 将不需要对外提供的内容都隐藏起来 把属性隐藏&#xff0c;提供公共方法对其访问。 3 封装好处 隐藏实现细节&#xff0c;提供公共的访问方式 提高了代…

目标检测——YOLO系列学习(一)YOLOv1

YOLO可以说是单阶段的目标检测方法的集大成之作&#xff0c;必学的经典论文&#xff0c;从准备面试的角度来学习一下yolo系列。 YOLOv1 1.RCNN系列回顾 RCNN系列&#xff0c;无论哪种算法&#xff0c;核心思路都是Region Proposal&#xff08;定位&#xff09; classifier&am…

链表拓展之双向链表

前言 在前面已经总结了单链表&#xff0c;有了单链表的基础会很好理解双链表的实现&#xff0c;忘记了可以跳转——>http://t.csdnimg.cn/GFPk9 接下来就由我带着各位看官来认识今天的主角吧~ 什么是双向链表 在单链表的基础上&#xff0c;它有两个方向的链接&#xff0c;一…

Java -- (part10)

一.继承 1.概述 子类继承父类,可以直接使用父类中非私有成员,子类不用写重复代码,提高了代码的复用性 2.关键字 extends 3.成员访问特点 a.成员变量 看等号左边是谁,先调用谁中的成员变量,子类没有找父类 b.成员方法 看new的是谁,先调用谁中的成员方法,子类没有找父类 …