在 Python 异步协程中使用同步队列

在 Python 异步协程中使用同步队列

  使用 Python asyncio 进行异步编程时,如果需要在协程间交互数据可以使用异步队列 asyncio.Queue。但 asyncio.Queue 不是线程安全的,如果需要在不同线程间的异步程序之间或者不同线程的异步程序和同步程序间交换数据,就需要使用 queue 模块中的 Queue 这个队列,因为它是线程安全的。如果队列数据的消费者是 asyncio 异步协程,使用 queue.Queue 时,就必须注意,以免引起协程的阻塞。下面,我们用程序实例来说明这个问题。
  先看一个简单的并发协程程序:

import asyncio
import queueclass TestAsync:def __init__(self):self.running_flag = Falseself.reader = Noneself.writer = Noneasync def function(self, display_string):n = 0while self.running_flag:print(f"{display_string} count = {n}")n += 1await asyncio.sleep(4)async def stop_tasks(self):await asyncio.sleep(20)self.running_flag = Falseprint("tasks stopped!")async def start(self):self.running_flag = Truetsk1 = asyncio.create_task(self.function("task 1"))tsk2 = asyncio.create_task(self.function("task 2"))tsk3 = asyncio.create_task(self.stop_tasks())await asyncio.gather(tsk1, tsk2, tsk3)if __name__ == "__main__":cl = TestAsync()asyncio.run(cl.start())

  这个程序被封装成一个类,入口是异步函数 start,它启动三个并发的异步任务:其中两个是从异步函数 function 生成的,它们各自每隔 4 秒打印一次参数中的任务字符串和执行的次数;另一个是终止函数,启动后 20 秒通过将成员变量 running_flag 置成 False 来终止上述两个任务,自己也随后退出。下面是这个程序在 Pycharm 中运行后打印的结果:

task 1 count = 0
task 2 count = 0
task 1 count = 1
task 2 count = 1
task 1 count = 2
task 2 count = 2
task 1 count = 3
task 2 count = 3
task 1 count = 4
task 2 count = 4
tasks stopped!进程已结束,退出代码0

  现在,我们想在另一个线程中通过发送队列消息的方式停止这个程序,于是我们在程序中增加一个全局变量(队列)stop_queue = queue.Queue(),并修改 stop_tasks,收到来自这个队列的消息后就设置 running_flag 为 False,停止所有任务。
  此外,我们编写了一个同步函数 stop_async_tasks,这个函数执行 20 秒后向 stop_queue 发送一条消息。
  最后,用启动上述异步任务的程序(run_async_tasks) 和 stop_async_tasks 生成两个线程,并发执行这两个线程,预期得到与上面程序执行的同样结果。
  下面是修改后的程序:

import asyncio
import threading
import queue
import time# 增加的全局变量
stop_queue = queue.Queue()class TestAsync:def __init__(self):self.running_flag = Falseself.reader = Noneself.writer = Noneasync def function(self, display_string):n = 0while self.running_flag:print(f"{display_string} count = {n}")n += 1await asyncio.sleep(4)# 这个函数做了修改,从队列中收到消息后设置running_flag, 退出程序。async def stop_tasks(self):p = stop_queue.get()self.running_flag = Falseprint(f"message {p} received, tasks stopped!")async def start(self):self.running_flag = Truetsk1 = asyncio.create_task(self.function("task 1"))tsk2 = asyncio.create_task(self.function("task 2"))tsk3 = asyncio.create_task(self.stop_tasks())await asyncio.gather(tsk1, tsk2, tsk3)# 启动任务线程使用的函数
def run_async_tasks():cl = TestAsync()asyncio.run(cl.start())# 停止任务线程使用的函数
def stop_async_tasks():time.sleep(20)stop_queue.put("stop")if __name__ == "__main__":# 启动任务的线程t1 = threading.Thread(target=run_async_tasks)t1.daemon = Truet1.start()# 停止任务的线程t2 = threading.Thread(target=stop_async_tasks)t2.daemon = Truet2.start()t1.join()

  执行这段程序,打印结果如下:

task 1 count = 0
task 2 count = 0
message stop received, tasks stopped!进程已结束,退出代码0

  这个结果和预期的不同,两个任务各只打印了一次,然后,时间到了20秒,程序就退出了。
  造成这个结果的原因是异步函数 stop_tasks 中,从队列中获取消息(p = stop_queue.get())的操作。虽然表面上看,它是在一个异步任务中进行的,但实际上它阻塞的不仅仅是这个异步任务,而是包括所有任务的整个线程,tsk1 和 tsk2 的执行被阻塞,直到 tsk3 收到队列消息,将 running_flag 置为 False,tsk1 和 tsk2 随即退出,后面的打印自然就无法进行了。
  解决这一问题的方法是将线程的等待转换为异步的等待,我们可以用两种方式进行这种转换。第一种是将同步等待转换为异步操作,详细情况可以参见我的另一篇博客:Python 异步程序和同步程序的交互;这里要介绍的是第二种方法,它比第一种方法更为简单。在前面第二个程序中,修改 stop_tasks 程序(其他部分均无须改动)如下:

    async def stop_tasks(self):while stop_queue.empty():await asyncio.sleep(0.2)p = stop_queue.get()self.running_flag = Falseprint(f"message {p} received, tasks stopped!")

  由于 Queue.empty() 不涉及等待,所以,这段程序就将 get 操作的等待变为异步的延时等待,直至队列中有消息才退出等待,获取消息后进行终止操作。由于 asyncio.sleep 只阻塞当前任务,所以它不会影响 tsk1 和 tsk2 的执行。
  程序运行的结果如下:

task 1 count = 0
task 2 count = 0
task 2 count = 1
task 1 count = 1
task 1 count = 2
task 2 count = 2
task 2 count = 3
task 1 count = 3
task 2 count = 4
task 1 count = 4
task 1 count = 5
task 2 count = 5
message stop received, tasks stopped!进程已结束,退出代码0

  结果与第一个程序相仿,打印 了6 次而不是 5 次的原因是线程的启动及操作需要更多的时间,由此造成了延时的误差。
  在并发的异步程序中,发生这类问题的原因往往是错用了同步程序中有关阻塞的操作,例如,在该用 asynio.sleep 的地方错用了 time.sleep,也会引起整个线程中并发协程的阻塞。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/6431.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c++实战篇(三) ——对socket通讯服务端与客户端的封装

前言 在前面几篇文章中我们介绍过一些有关于网络编程的相关知识,我们介绍了在tcp传输数据的时候发送缓冲区与接收缓冲区,我们介绍了在tcp发送与接收的过程中可能出现的分包与粘包的问题: c理论篇(一) ——浅谈tcp缓存与tcp的分包与粘包 我们介绍了在网络…

MongoDB与Elasticsearch特性及知识点对比

仅作为技术选型和面试参考。对比记忆更佳。 目录 1.数据模型 2.索引机制 3. 查询性能 4.分布式架构 5.数据一致性 6.文档大小限制 7.存储引擎 8.数据压缩 10.实时搜索 11.安全性 12.版本控制 13 事务支持 14.地理空间搜索 15.多租户支持 16 运维复杂度 17,监…

直播素材安卓情侣飞行棋v2.22 仿dofm 支持自定义模式—可用直播素材

一个情侣间增进友谊的小游戏非常好玩,适合男孩女孩之间增进感情!快和你暗恋的女孩一块玩吧,极速升温 永久免费!解锁激活码内容全部畅玩!全网最强超级给力!真人说书音频 网盘自动获取 链接:http…

重要综述!全文翻译!宫鹏教授、陈镜明教授、梁顺林教授等《Nature Climate Change》!

2013年,由宫鹏教授、陈镜明教授和梁顺林教授等联合发表了一篇《Nature Climate Change》综述文章,其主题是卫星遥感在全球变化中的作用研究。(已被引510次,来源谷歌学术)。 卫星遥感方式对于气象问题、大气、陆地和海洋…

【机器学习】集成方法---Boosting之AdaBoost

一、Boosting的介绍 1.1 集成学习的概念 1.1.1集成学习的定义 集成学习是一种通过组合多个学习器来完成学习任务的机器学习方法。它通过将多个单一模型(也称为“基学习器”或“弱学习器”)的输出结果进行集成,以获得比单一模型更好的泛化性…

【中断】【ARM64】学习总结

optee中的异常向量表解读–中断处理解读 https://mp.weixin.qq.com/s/gBsy4YDYTHGRsy2zcVr6Vg

操作系统:磁盘交换空间

什么是磁盘交换空间? 磁盘交换空间(swap space)是在磁盘上预留出来的一块区域,用作补充系统物理内存(RAM)的一种方式。当系统的物理内存不足以存储当前所有活动进程所需的数据时,操作系统会将一部分暂时不用或使用较少的内存数据…

windows ubuntu sed,awk,grep篇:13.其他 awk 命令

目录 85. 使用 printf 格式化输出 86. awk 内置数值函数 87. 随机数生成器 88. 常用字符串函数 89. GAWK/NAWK 的字符串函数 90. GAWK 字符串函数 91.处理参数(ARGC,ARGV,ARGIND) 92. OFMT 93. GAWK 内置的环境变量 94. pgawk – awk 运行分析器 95. 位操作 96.用户…

Linux搭建sqlilabs靶场

提前准备: 文章中所使用到的Linux系统:Ubantu20.4sqlilabs靶场下载地址:GitHub - Audi-1/sqli-labs: SQLI labs to test error based, Blind boolean based, Time based. 一. 安装phpstudy phpstudy安装命令:wget -O install.sh h…

python 的继承、封装和多态

1. 继承(Inheritance) 继承是面向对象编程中的一个重要概念,它允许一个类(子类)继承另一个类(父类)的属性和方法。子类可以重用父类的代码,同时也可以扩展或修改父类的行为。 常用…

托普利兹矩阵(T矩阵)及其应用(Matlab demo测试)

托普利兹矩阵(T矩阵)及其应用(Matlab demo测试) 1. 概念2. Matlab简单测试2.1 生成测试2.2 基本性质及原理2.3 性质验证 3. 其他应用总结3.1 其他性质3.2 文献阅读看到的 参考资料 1. 概念 托普利兹矩阵,简称为T型矩阵…

H3C MSTP 实验

H3C MSTP 实验 实验拓扑 ​​ 实验需求 所有交换机上创建 Vlan10,Vlan20,Vlan30 和 Vlan40所有交换机之间的端口配置为 Trunk,并放行相关 VLAN按照图示分区域配置 MSTP,并配置主备根网桥 实验步骤 VLAN基础配置(…

力扣面试150 简化路径 栈 模拟

Problem: 71. 简化路径 思路 &#x1f469;‍&#x1f3eb; 三叶题解 复杂度 时间复杂度: O ( n ) O(n) O(n) 空间复杂度: O ( n ) O(n) O(n) Code class Solution {public String simplifyPath(String path){ArrayDeque<String> d new ArrayDeque<>();…

2022 亚马逊云科技中国峰会,对话开发者论坛

目录 前言 最近整理资料发现还有一些前 2 年的内容没发出来&#xff0c;故补发记录&#xff0c;每年都有新的感悟。 开发者论坛 1. 你认为什么是开发者社区&#xff0c;如何定义一个成功的开发者社区&#xff1f; 我认为可以把开发者社区看成一个 “产品” 来对待&#xff…

【RAG 论文】GenRead:“generate-read“ 可能比 “retrieve-read“ 更有效

论文&#xff1a;Generate rather than Retrieve: Large Language Models are Strong Context Generators ⭐⭐⭐⭐ ICLR 2023 Code: github.com/wyu97/GenRead 一、论文速读 该工作发现&#xff1a;由 LLM 生成的文档中&#xff0c;往往比 retrieved documents 更可能包含正确…

C++校招八股

c类的访问权限与继承方式 公有成员在任何地方都可以被访问&#xff0c;包括类的外部和派生类。受保护成员在类的内部和派生类中可以被访问&#xff0c;但在类的外部不可访问。 私有成员只能在类的内部访问&#xff0c;包括类的成员函数和友元函数&#xff0c;不允许在类的外部…

一步一步写线程之十一线程池应用内存池

一、内存池 内存池&#xff0c;非常好理解&#xff0c;就是存储内存的一个池子&#xff08;Pool&#xff09;&#xff0c;一般来说&#xff0c;都是使用各种容器或者自己实现的类似容器的内存管理类。内存池其实就是为了解决两个主要问题&#xff0c;一个是内存反复分配回收的…

关于“泼辣”DB 你应该知道的几件事

PolarDB PolarDB for PostgreSQL&#xff08;以下简称 PolarDB&#xff09;是一款阿里云自主研发的企业级数据库产品&#xff0c;采用计算存储分离架构&#xff0c;100% 兼容 PostgreSQL。 PolarDB 的存储与计算能力均可横向扩展&#xff0c;具有高可靠、高可用、弹性扩展等企…

文件(夹)批量重命名数字、字母、日期、中文数字大写小写

首先&#xff0c;需要用到的这个工具&#xff1a; 度娘网盘 提取码&#xff1a;qwu2 蓝奏云 提取码&#xff1a;2r1z 目标是重命名下面5个文件&#xff08;也可以是文件夹等&#xff0c;任意&#xff09;&#xff0c;从大写中文数字“贰”开始 打开工具&#xff0c;找到“文…

使用机器学习确定文本的编程语言

导入必要的库 norman Python 语句&#xff1a;import <span style"color:#000000"><span style"background-color:#fbedbb"><span style"color:#0000ff">import</span> pandas <span style"color:#0000ff&quo…