[并发与并行] python如何构建并发管道处理多阶段任务?

文章目录

  • 1. 背景&目标
  • 2. show me the code
  • 3.知识点
  • 4. 总结

1. 背景&目标

背景:一个任务可分为多个阶段(各个阶段非CPU密集型任务,而是属于IO密集型任务),希望每个阶段能够交给各自的线程去执行。
目标:构建支持多并发的稳定的程序高效处理上述问题的程序,要求能够灵活设置并发。

2. show me the code

假设任务分为三个阶段,分别是download、resize和upload,代码采用管道将三个阶段进行拼接。
在这里插入图片描述
三个阶段的处理简化为三个函数,处理结果通过Queue传到下个阶段,各个阶段可以创建不同的线程去消费Queue中的数据,直到所有数据处理完成。代码如下:

from threading import Thread
from queue import Queue
import timemy_queue = Queue()class ClosableQueue(Queue):SENTINEL = object()def close(self):self.put(self.SENTINEL)def __iter__(self):while True:item = self.get()try:if item is self.SENTINEL:# 退出机制returnyield itemfinally:self.task_done()class Worker(Thread):def __init__(self, func, in_queue, out_queue):super().__init__()self.func = funcself.in_queue = in_queueself.out_queue = out_queuedef run(self):# 确保线程退出的机制:在生产者-消费者模型中,通常需要使用特殊标志(如 SENTINEL)通知消费者线程结束循环。for item in self.in_queue:result = self.func(item)self.out_queue.put(result)def download(obj):print(f'[download] id= {id(obj)}')time.sleep(0.1)return objdef resize(obj):print(f'[resize] id= {id(obj)}')time.sleep(0.01)return objdef upload(obj):print(f'[upload] id= {id(obj)}')time.sleep(1)return objdef start_threads(count, *args):threads = [Worker(*args) for _ in range(count)]for thread in threads:thread.start()return threadsdef stop_threads(closable_queue, threads):# close次数根据threads次数来,保障每个每个线程都能正确关闭for _ in threads:closable_queue.close()# 阻塞调用线程,直到队列中的所有任务都被处理完成# 每次向队列中添加一个任务(通过 .put()),队列内部的任务计数器会增加 1。# 每次调用 .task_done(),任务计数器会减少 1。# .join() 方法会一直阻塞,直到任务计数器降为 0,也就是队列中的所有任务都被标记为完成(通过调用 .task_done())closable_queue.join()for thread in threads:thread.join()if __name__ == '__main__':download_queue = ClosableQueue()resize_queue = ClosableQueue()upload_queue = ClosableQueue()done_queue = ClosableQueue()download_threads = start_threads(3, download, download_queue, resize_queue)resize_threads = start_threads(4, resize, resize_queue, upload_queue)upload_threads = start_threads(5, upload, upload_queue, done_queue)obj = object()for _ in range(1000):download_queue.put(obj)stop_threads(download_queue, download_threads)stop_threads(resize_queue, resize_threads)stop_threads(upload_queue, upload_threads)print(done_queue.qsize(), 'items fininished')

3.知识点

上述代码涉及到几个知识点,挺有意思的:

  1. 当我们想用queue来传递数据时,头疼的点在于:
    ①下游任务该怎么判断上游生产了数据呢?轮巡有点不优雅,可能会造成性能影响。
    ②上游任务啥时候告诉下游数据生产完毕了呢?可以通过插入一个特殊的数据告诉下游生产完毕了。
    ③队列该设置多大呢?如果下游数据消费不过来,上游一直生产数据插入到队列,容易oom。
    ④怎么判断中间队列的数据消费完毕了呢?即如何优雅地结束程序。

上述代码利用Queue类的特性比较优雅地解决了上述的几个问题:
① Queue非常优雅,设置size之后,如果size满了,put方法会阻塞,直到数据被消费了才可以往里面添加数据。当queue为空,get方法会阻塞,直到有数据进来。
② 如代码中SENTINEL对象,如果调用close方法,就往队列中插入一个哨兵对象,告诉下游,上游数据生产完毕了。
③ 代码中没有设置队列大小,但是Queue支持设置。
④ Queue中.join() 方法会一直阻塞,直到任务计数器降为 0,也就是队列中的所有任务都被标记为完成。
其中,任务计数器的原理为:

  • 每次向队列中添加一个任务(通过 .put()),队列内部的任务计数器会增加 1。
  • 每次调用 .task_done(),任务计数器会减少 1。

注意看,改写__iter__方法时,每次获取一个元素都调用了一次task_done()方法,即告诉任务计数器需要-1了。

4. 总结

这段代码还是比较精髓,其中关于队列Queue的用法,关于threads的用法和线程中共享数据的用法值得学习。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/891066.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

07 基于OpenAMP的核间通信方案

引言 ZYNQ7020有两个CPU核心,这两个核心可以采用SMP或AMP方式进行调度,当采用AMP方式进行调度时核0和核1可以运行不同的操作系统,如核0运行Linux系统,提供有些复杂的用户交互工作,核1运行实时操作系统,对设…

7.若依参数设置、通知公告、日志管理

参数设置 对系统中的参数进行动态维护。 关闭验证码校验功能 打开页面注册功能 需要修改前端页面代码 通知公告 促进组织内部信息传递 若依只提供了一个半成品,只实现了管理员可以添加通知公告。 日志管理 追踪用户行为和系统运行状况。 登录日志 和操作日志…

基于Docker+模拟器的Appium自动化测试(二)

模拟器的设置 打开“夜神模拟器”的系统设置,切换到“手机与网络”页,选中网络设置下的“开启网络连接”和“开启网络桥接模式”复选框,而后选择“静态IP”单选框,在IP地址中输入“192.168.0.105”,网关等内容不再赘述…

大数据技术-Hadoop(三)Mapreduce的介绍与使用

目录 一、概念和定义 二、WordCount案例 1、WordCountMapper 2、WordCountReducer 3、WordCountDriver 三、序列化 1、为什么序列化 2、为什么不用Java的序列化 3、Hadoop序列化特点: 4、自定义bean对象实现序列化接口(Writable) 4…

【数据仓库】SparkSQL数仓实践

文章目录 集成hive metastoreSQL测试spark-sql 语法SQL执行流程两种数仓架构的选择hive on spark数仓配置经验 spark-sql没有元数据管理功能,只有sql 到RDD的解释翻译功能,所以需要和hive的metastore服务集成在一起使用。 集成hive metastore 在spark安…

基本算法——回归

本节将通过分析能源效率数据集(Tsanas和Xifara,2012)学习基本的回归算法。我们将基 于建筑的结构特点(比如表面、墙体与屋顶面积、高度、紧凑度)研究它们的加热与冷却负载要 求。研究者使用一个模拟器设计了12种不…

V-Express - 一款针对人像视频生成的开源软件

V-Express是腾讯AI Lab开发的一款针对人像视频生成的开源软件。它旨在通过条件性丢弃(Conditional Dropout)技术,实现渐进式训练,以改善使用单一图像生成人像视频时的控制信号平衡问题。 在生成过程中,不同的控制信号&…

Java与SQL Server数据库连接的实践与要点

本文还有配套的精品资源,点击获取 简介:Java和SQL Server数据库交互是企业级应用开发中的重要环节。本文详细探讨了使用Java通过JDBC连接到SQL Server数据库的过程,包括加载驱动、建立连接、执行SQL语句、处理异常、资源管理、事务处理和连…

学习记录—正则表达式-基本语法

正则表达式简介-《菜鸟教程》 正则表达式是一种用于匹配和操作文本的强大工具,它是由一系列字符和特殊字符组成的模式,用于描述要匹配的文本模式。 正则表达式可以在文本中查找、替换、提取和验证特定的模式。 本期内容将介绍普通字符,特殊…

企业安装加密软件有什么好处?

加密软件为企业的安全提供了很多便利,从以下几点我们看看比较重要的几个优点: 1、数据保护:企业通常拥有大量的商业机密、客户数据、技术文档等敏感信息。加密软件可以对这些信息进行加密处理,防止未经授权的人员访问。即使数据被…

京东供应链创新与实践:应用数据驱动的库存选品和调拨算法提升履约效率

2024 年度总结系列 2024 年 10 月,京东零售供应链技术团队凭借其在库存选品与调拨技术上的创新与实践,荣获运筹与管理学领域的国际顶级奖项 Daniel H. Wagner Prize。本文为您介绍获奖背后的供应链技术创新和落地应用。 00 摘要 在电商行业中&#x…

大数据技术-Hadoop(二)HDFS的介绍与使用

目录 1、HDFS简介 1.1 什么是HDFS 1.2 HDFS的优点 1.3、HDFS的架构 1.3.1、 NameNode 1.3.2、 NameNode的职责 1.3.3、DataNode 1.3.4、 DataNode的职责 1.3.5、Secondary NameNode 1.3.6、Secondary NameNode的职责 2、HDFS的工作原理 2.1、文件存储 2.2 、数据写…

在 C# 中优化 JPEG 压缩级别和文件大小

此示例可让您检查不同 JPEG 压缩级别的图像质量。使用文件菜单的打开命令加载图像文件。然后使用“JPEG 压缩指数 (CI)”组合框选择压缩级别。程序将图像保存到具有该压缩级别的临时文件中,并显示生成的图像和文件大小。 该程序的关键是以下SaveJpg方法,…

Pandas02

Pandas01: Pandas01 文章目录 内容回顾1 数据的读取和保存1.1 读写Excel文件1.2 读写CSV1.3 读写Mysql 2 DataFrame 数据查询2.1 筛选多列数据2.2 loc 和 iloc2.3 query查询方法和isin 方法 3 DataFrame增 删 改数据3.1 增加一列数据3.2 删除一行/一列数据3.3 数据去重3.4 数据…

Flink定时器

flink的定时器都是基于事件时间(event time)或事件处理时间(processing time)的变化来触发响应的。对一部分新手玩家来说,可能不清楚事件时间和事件处理时间的区别。我这里先说一下我的理解,防止下面懵逼。…

Docker中的分层(Layer)

docker中有分层的概念,如下图所示 上面是容器层(Container layer),下面是镜像层(Image layers)。 镜像层的内容是静态的,读和写的操作,都是在容器层发生,专门为容器的读…

RoboMIND:多体现基准 机器人操纵的智能规范数据

我们介绍了 RoboMIND,这是机器人操纵的多体现智能规范数据的基准,包括 4 个实施例、279 个不同任务和 61 个不同对象类别的 55k 真实世界演示轨迹。 工业机器人企业 埃斯顿自动化 | 埃夫特机器人 | 节卡机器人 | 珞石机器人 | 法奥机器人 | 非夕科技 | C…

python报错ModuleNotFoundError: No module named ‘visdom‘

在用虚拟环境跑深度学习代码时,新建的环境一般会缺少一些库,而一般解决的方法就是直接conda install,但是我在conda install visdom之后,安装是没有任何报错的,conda list里面也有visdom的信息,但是再运行代…

C语言性能优化:从基础到高级的全面指南

引言 C 语言以其高效、灵活和功能强大而著称,被广泛应用于系统编程、嵌入式开发、游戏开发等领域。然而,要写出高性能的 C 语言代码,需要对 C 语言的特性和底层硬件有深入的了解。本文将详细介绍 C 语言性能优化的背后技术,并通过…

go多版本管理工具g win安装配置

go多版本管理工具g 基本介绍仓库安装配置配置环境配置系统变量配置path变量测试使用配置完环境变量之后,打开终端进行测试使用查看 g 的环境变量配置,g env 为环境变量配置,g -v为当前版本信息查看可下载列表下载安装指定版本go,并…