Python:从头创建 Asyncio (2)

引言

现在,asyncio 已成为 Python 社区中的热门话题,并且名副其实——它提供了一种非常出色的处理 I/O 密集型程序的方法!在我探索 asyncio 的过程中,我起初并不太明白它的工作原理。但随着深入学习,我意识到 asyncio 实际上是在 Python 生成器的基础上增加了一层非常便利的封装。

本文[1]中,我将展示如何仅用 Python 生成器来构建一个 asyncio 的简化模型。接着,我会演示如何利用 await 魔法方法,将示例代码改写为使用 async 和 await 关键字。最终,我会将我的简化版本替换为官方的 asyncio 库。通过这个过程,我相信你将对 asyncio 的神奇之处有一个更深入的理解。

Sleeping

如果我们沿用之前示例中的代码,我们可以通过 yield from 的应用,为我们的任务嵌入子生成器。例如,我在这里引入了一个休眠生成器,它会在指定的时间到达之前暂停任务的执行。这种机制之所以有效,是因为 sleep 函数会连续产生 yield,直到经过了设定的秒数,然后它将跳出 while 循环。

由于 sleep 函数中没有其他 yield 语句,这将引发一个 StopIteration 异常,这个异常告诉 yield from 语句在任务函数中跳过当前的生成器,继续执行下一行代码。

import time

def sleep(seconds):
   start_time = time.time()
   while time.time() - start_time < seconds:
       yield

def task1():
   while True:
       print('Task 1')
       yield from sleep(1)

def task2():
   while True:
       print('Task 2')
       yield from sleep(5)

event_loop = [task1(), task2()]

while True:
   for task in event_loop:
       next(task)

输出:

Task 1
Task 2
Task 1
Task 1
Task 1
Task 1
Task 2
Task 1
…

Yield to Await

我们现在可以将之前的代码示例,通过应用 _await__ 魔术方法和 async 关键字,从使用 yield 转变为使用 await。如果一个类定义了 _await__ 方法,我们就可以在该类的实例前加上 await 关键字来调用这个方法。在 asyncio 框架中,你通常通过调用如 asyncio.create_task 这样的函数来处理 Task 对象。这些 Task 对象是从 asyncio 的 Future 对象派生而来的,而 Future 对象定义了 _await__ 方法。我们还可以在协程前使用 await,协程是在函数定义时加上 async 关键字生成的对象。协程和生成器函数类似,它们的执行都能够被挂起和恢复。

你可以将 await 关键字理解为 yield from 的一个变体,它附带了一些额外的验证规则。因此,当你在代码中写 await object 时,你实际上是在指示从 "object" 类的实例中调用 _await__ 方法,或者 "object" 本身可能就是另一个协程(类似于子生成器)。

实际上,你甚至可以查看 Asyncio 的源代码,发现 Future 对象中的 _await__ 方法在调用时,如果未来(或任务)尚未完成,它基本上只是执行了 yield 操作。

alt

要将我们在上一节中编写的代码转移到使用 async 和 await,我们首先需要创建自己的 Task 类,因为函数不能具有 await dunder 方法。下面是我想出的一个简单版本:

from queue import Queue


event_loop = Queue()

class Task():
    def __init__(self, generator):
        self.iter = generator
        self.finished = False

    def done(self):
        return self.finished

    def __await__(self):
        while not self.finished:
            yield self


def create_task(generator):
    task = Task(generator)
    event_loop.put(task)

    return task

这一次,我们改用队列而非 Python 列表来构建事件循环,这样做更合理,因为我们希望添加或移除任务的操作能够快速完成,即在常数时间内完成。

在我们的 Task 类中,我们将生成器对象保存在 self.iter 属性中,并设置 self.finished 属性为 False,用以跟踪生成器是否已经运行完毕(当生成器引发 StopIteration 异常时,表示其运行结束)。Task 对象还定义了一个 await 魔术方法,这个方法将持续地将控制权交还给事件循环,直到任务完成。完成 Task 对象的创建后,我们使用 create_task 辅助函数将它加入到事件循环中,这将安排它按计划执行。

接下来,我们将构建事件循环管理器,它负责驱动任务的执行。

def run(main):
    event_loop.put(Task(main))

    while not event_loop.empty():
        task = event_loop.get()
        try:
            task.iter.send(None)
        except StopIteration:
            task.finished = True
        else:
            event_loop.put(task)

你或许已经发现,我们的实现开始接近 asyncio 的实际 API。要启动事件循环,我们需要通过一个初始函数来调用 run。这个函数首先将主函数封装进 Task 对象,并加入到事件循环中。随后,while 循环会启动,并且在每次迭代中,通过队列来获取下一个待执行的任务。现在我们使用 task.iter.send(None) 替代了 next(task.iter),这在使用 async/await 关键字时显得有些奇特,但功能上是一致的。我们还需要将这个调用放在 try-except 块中,以便在抛出 StopIteration 异常时,可以将 task.finished 设置为 True;如果没有异常抛出,代码将执行 else 语句,这会把任务重新放回事件循环中,以便再次执行。

接下来,我们需要对 sleep 函数进行异步兼容改造。之前,我们通过一个带有 while 循环和单个 yield 的生成器函数来实现休眠功能。尽管我偏爱这种方法,但 await 关键字不能与生成器函数一起使用——它需要是一个定义了 await 魔术方法的对象或是一个协程函数。因此,为了解决这个问题,我将代码迁移到了另一个函数中,现在实际的 sleep 函数会创建一个任务对象并等待它完成。这个 await 调用将触发 Task 对象内的 await 方法,随后执行 yield,允许事件循环转向其他任务。当事件循环处理到新的 _sleep 任务时,它会检查时间,如果时间未到,同样会执行 yield,将控制权交还给事件循环。如果休眠的任务再次被事件循环调用,就像生成器保存其状态一样,协程仍在等待 sleep 函数返回。由于 sleep 函数还在等待 _sleep 任务完成,任务的 await 魔术方法将再次被调用,由于任务尚未结束,魔术方法中的 yield 将再次被执行。

import time

def _sleep(seconds):
    start_time = time.time()
    while time.time() - start_time < seconds:
        yield


async def sleep(seconds):
    task = create_task(_sleep(seconds))
    return await task

以下是所有代码的汇总:

from queue import Queue
import time


event_loop = Queue()


def _sleep(seconds):
    start_time = time.time()
    while time.time() - start_time < seconds:
        yield


async def sleep(seconds):
    task = create_task(_sleep(seconds))
    return await task


class Task():
    def __init__(self, generator):
        self.iter = generator
        self.finished = False

    def done(self):
        return self.finished

    def __await__(self):
        while not self.finished:
            yield self


def create_task(generator):
    task = Task(generator)
    event_loop.put(task)

    return task


def run(main):
    event_loop.put(Task(main))

    while not event_loop.empty():
        task = event_loop.get()
        try:
            task.iter.send(None)
        except StopIteration:
            task.finished = True
        else:
            event_loop.put(task)

既然我们已经成功构建了事件循环、任务创建机制和 sleep 函数,接下来我们可以引入名为 "jacobio.py" 的文件,并把之前使用 yield 语句的部分替换成 await 调用。同时,我们需要在那些使用了 await 的函数前加上 async 关键字,以表明这些函数是异步的,并且可以被其他代码等待执行。最后,我们还需要像在 asyncio 库中那样编写一个主函数,用于将任务排入事件循环的执行队列中。

import jacobio

async def task1():
    for _ in range(2):
        print('Task 1')
        await jacobio.sleep(1)

async def task2():
    for _ in range(3):
        print('Task 2')
        await jacobio.sleep(0)

async def main():
    one = jacobio.create_task(task1())
    two = jacobio.create_task(task2())

    await one
    await two
    
    print('done')


if __name__ == '__main__':
    jacobio.run(main())

输出:

Task 1
Task 2
Task 2
Task 2
Task 1
done

Await with AsyncIO

现在,我们可以从上面获取代码,并将所有出现的“jacobio”替换为“asyncio”,我们现在完全使用 asyncio 包!

import asyncio

async def task1():
    for _ in range(2):
        print('Task 1')
        await asyncio.sleep(1)

async def task2():
    for _ in range(3):
        print('Task 2')
        await asyncio.sleep(0)

async def main():
    one = asyncio.create_task(task1())
    two = asyncio.create_task(task2())

    await one
    await two
    
    print('done')


if __name__ == '__main__':
    asyncio.run(main())

Asyncio 在后台执行了许多复杂的操作,但我们成功地从基础的生成器出发,一步步重建了 asyncio 的核心功能!我努力使事件循环管理器的设计尽可能简洁,尽管这仅是 asyncio 工作理念的简化版,与实际的库相比,我的实现在细节上与官方源代码的执行流程有所不同。此外,既然我们现在拥有了完整的 asyncio 库的功能,就无需为了同时等待两个任务而分别创建它们;我们完全可以使用 asyncio.gather() 这样的函数来同时管理多个任务。

Reference
[1]

Source: https://jacobpadilla.com/articles/recreating-asyncio

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/27236.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java 反射机制 -- Java 语言反射的概述、核心类与高级应用

大家好,我是栗筝i,这篇文章是我的 “栗筝i 的 Java 技术栈” 专栏的第 010 篇文章,在 “栗筝i 的 Java 技术栈” 这个专栏中我会持续为大家更新 Java 技术相关全套技术栈内容。专栏的主要目标是已经有一定 Java 开发经验,并希望进一步完善自己对整个 Java 技术体系来充实自…

GitLab教程(二):快速上手Git

文章目录 1.将远端代码克隆到本地2.修改本地代码并提交到远程仓库3.Git命令总结git clonegit statusgit addgit commitgit pushgit log 首先&#xff0c;我在Gitlab上创建了一个远程仓库&#xff0c;用于演示使用Gitlab进行版本管理的完整流程&#xff1a; 1.将远端代码克隆到本…

导出 Whisper 模型到 ONNX

前言 在语音识别领域&#xff0c;Whisper 模型因其出色的性能和灵活性备受关注。为了在更多平台和环境中部署 Whisper 模型&#xff0c;导出为 ONNX 格式是一个有效的途径。ONNX&#xff08;Open Neural Network Exchange&#xff09;是一个开放格式&#xff0c;支持不同的深度…

吴恩达2022机器学习专项课程C2W3:2.25 理解方差和偏差(诊断方差偏差正则化偏差方案搭建性能学习曲线)

目录 引言名词替代影响模型偏差和方差的因素1.多项式阶数2.正则化参数 判断是否有高偏差或高方差1.方法一&#xff1a;建立性能基准水平2.方法二&#xff1a;建立学习曲线 总结 引言 机器学习系统开发的典型流程是从一个想法开始&#xff0c;然后训练模型。初次训练的结果通常…

今日分享丨inBuilder低代码平台打印格式设计器

打印在企业日常办公中占据核心地位&#xff0c;是处理各种关键文件不可或缺的一环。无论是签署合同、报销费用、记录凭证与账表、处理回单与库存单据、开出库单据&#xff0c;还是开具发票、制作条码与标签&#xff0c;打印都发挥着至关重要的作用&#xff0c;确保企业运营的高…

rv1126-rv1109-串口显示路径不变化

串口只有#, 后来看了教程改成如下 但是没有变化,那个路径都只显示rootLonbon# 于是最后改成了这样 因为:

linux 安装sftp及使用sftp上传和下载

一、centos7 安装sftp 1.安装 OpenSSH 服务&#xff1a; sudo yum install openssh-server2.启动 SSH 服务&#xff0c;并设置为开机启动&#xff1a; sudo systemctl start sshd sudo systemctl enable sshd3.创建一个新用户&#xff0c;用于SFTP连接&#xff08;替换your_…

【C++高阶】C++继承学习手册:全面解析继承的各个方面

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ ⏩收录专栏⏪&#xff1a;C “ 登神长阶 ” &#x1f921;往期回顾&#x1f921;&#xff1a;模板进阶 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; 继承 &#x1f4d6;1. 继承的概念及定义…

MPLS提高网络服务质量的原理

MPLS&#xff08;Multiprotocol Label Switching&#xff0c;多协议标签交换&#xff09;是一种网络技术&#xff0c;它能够提高网络的服务质量&#xff08;Quality of Service&#xff0c;QoS&#xff09;以及整体性能。MPLS通过以下几种方式来提升网络服务质量&#xff1a;标…

220v转3v用多大电阻

在设计一个电压转换电路&#xff0c;将220V交流&#xff08;AC&#xff09;电压转换为3V直流&#xff08;DC&#xff09;电压时&#xff0c;我们需要考虑几个关键因素&#xff0c;包括安全、效率和电路的稳定性。AH8651是一款DC-DC转换器&#xff0c;通常用于将较高的输入电压转…

如何基于 Python 快速搭建 QQ 开放平台 QQ 群官方机器人详细教程(更新中)

注册 QQ 开放平台账号 账号注册 QQ 机器人&#xff1a;一个机器人可以被添加到 群聊/频道 内对话&#xff0c;QQ 用户也可以直接跟机器人 单独对话。 开发者账号主体要求 单聊对话&#xff1a;【定向邀请】 群聊场景&#xff1a;仅支持企业主体【个人主体暂不支持】 频道场…

笔记98:按列压缩矩阵 csc_matrix 的 “含义”

1. 如何按列压缩矩阵&#xff1a; 注&#xff1a;按列压缩&#xff08;Compressed Sparse Column -- CSC&#xff09;&#xff0c;是一种使用三个特征数组就可以表示整个矩阵的方法&#xff1b; 标准二次规划问题 &#xff1a;状态量&#xff1a;矩阵&#xff1a;向量&#xff…

Linux内核驱动入门 编译环境搭建、编译内核

文章目录 前言搭建内核驱动编译环境下载交叉编译工具编译内核minicom工具使用找不到ttyUSB设备问题编译内核编译报错解决小坑编译选项说明 从零开始的驱动程序 前言 哎…有时候我都不知道自己是干啥的 说是运维吧&#xff0c;docker不会&#xff0c;k8s不会&#xff1b;说是驱…

【安卓】在安卓中使用HTTP协议的最佳实践

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 目录 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌…

领夹无线麦克风哪个品牌好?分享麦克风什么牌子的音质比较好!

​无线领夹麦克风作为现代音频技术的杰出代表&#xff0c;正逐渐改变着我们的声音表达方式。它以其独特的便携性、稳定性和高音质&#xff0c;成为了众多声音创作者的首选工具。面对市场上琳琅满目的无线领夹麦克风选项&#xff0c;找到一款贴合个人需求的设备无疑是一项挑战。…

万字长文爆肝Spring(一)

Spring_day01 今日目标 掌握Spring相关概念完成IOC/DI的入门案例编写掌握IOC的相关配置与使用掌握DI的相关配置与使用 1&#xff0c;课程介绍 对于一门新技术&#xff0c;我们需要从为什么要学、学什么以及怎么学这三个方向入手来学习。那对于Spring来说: 1.1 为什么要学? …

金融科技助力绿色金融:可持续发展新动力

随着全球气候变化和环境问题的日益严重&#xff0c;绿色金融作为推动环境保护和经济可持续发展的重要手段&#xff0c;已经受到越来越多的关注。而金融科技&#xff0c;作为科技与金融深度融合的产物&#xff0c;正以其独特的优势为绿色金融的发展注入新动力。本文将探讨金融科…

Vue 路由:一级路由,嵌套路由

1、安装路由插件,因为用的是vue2 所以路由版本要和vue2对应上&#xff0c;所有有3 yarn add vue-router3 2、在main.js里引入 import VueRouter from vue-router Vue.use(VueRouter) 3、新建文件夹 router,创建index.js 4、引入路由插件&#xff0c;并且暴露出来这个路由 5、在…

基于机器学习的C-MAPSS涡扇发动机RUL预测

美国国家航空航天局的商用模块化航空推进仿真系统&#xff08;CMAPSS&#xff09;所模拟出的涡扇发动机性能退化数据进行实验验证&#xff0c;数据中包含有风扇、涡轮、压气机等组件参数。C-MAPSS中所包含的数据集可以模拟出从海平面到42千英尺的高度&#xff0c;从0到0.9马赫的…

一键实现电脑投屏到电视机,轻松享受更大画面

在数字化的今天&#xff0c;我们常常希望在更大的屏幕上分享电脑上的内容&#xff0c;观看视频、展示演示文稿&#xff0c;或者与家人一同欣赏照片。而实现电脑屏幕投射到电视机上&#xff0c;成为了许多人追求的方便而实用的功能。本文将为您详细介绍电脑投屏到电视机的方法&a…