102、Python并发编程:Queue与生产者消费者模型实现解耦、协作

引言

在实际业务场景中,很多时候在处理复杂任务的时候,会拆分上下游各个环节,形成一个类似于流水线的处理方式。上游类似于生产者,下游要依赖上游的输出进行工作,类似于消费者。但是,很多时候,上游和下游由于工作的复杂程度、环境、性能等,处理的速率不是太匹配。为了保证上下游各自按照自己的速率运转,从而保证最高的处理性能,通常的做法是引入一个中间件,比如缓存、消息队列。从而实现速率的匹配,以及对不同繁忙程度的任务进行削峰填谷,实现处理性能的平滑效果。

本文通过一个简化的生产者消费者模型,以及Queue的使用,来展现关于上述不同环节工作的协同与解耦的需求的实现。

本文的主要内容有:

1、生产者消费者模型

2、基于deque实现生产者消费者模型

3、基于Queue实现生产者消费者模型

4、deque与Queue的简单比较

生产者消费者模型

生产者消费者模型是一种常见的并发编程设计模式,适用于多个生产者和消费者之间进行任务或数据的共享与传递,从而实现解耦与高效协作。

该模型的核心思想是将生产数据的线程(生产者)与消费数据的线程(消费者)通过一个缓冲区(一般是线程安全的队列)进行连接。

实际工作中,有不少适用于生产者消费者模型的典型场景,简单列举如下:

1、任务调度系统

任务调度器充当生产者,将触发了待执行的任务放到队列中,多个工作线程充当消费者,从队列中取出任务进行执行。

2、日志处理系统

日志生成的线程充当生产者,将日志记录放入队列中,日志处理线程从队列中获取日志,进行相应的存储、分析等。

3、数据库连接池

客户端作为生产者请求数据查询操作,数据库连接池中的线程作为消费者接收客户端的请求,进行相应的查询处理,并返回结果。

4、流媒体服务器

视频编码器将视频帧数据放入队列中,流媒体服务器从队列中取出帧数据进行传输。

可以看到,涉及到复杂任务的拆解、解耦,同时需要进行上下游协同的需求场景,都很适合使用生产者消费者模型来实现。

下面通过deque和Queue分别来尝试模拟如下生产者消费模型的简单实现:

1、有2个工人,负责产品的生产,产品生产完成后放到仓库中。

2、有3个销售人员,负责产品的销售,销售的产品从仓库中出库,经过物流给到客户手中。

3、仓库是有成本的,所已存储空间是有限的,加入仓库最多存10个产品。

基于deque实现生产者消费者模型

首先我们通过Python中的deque这个集合类型,来尝试实现上面的生产者消费者模型的模拟需求。

直接看代码:

from collections import deque
from threading import Lock, Thread
import timeclass Worker(Thread):def __init__(self, warehouse, lock):super().__init__()self.warehouse = warehouseself.lock = lockdef run(self):while True:with lock:print(f'准备生产产品,当前库存:{len(self.warehouse)}')self.warehouse.append('这是个产品')print('生产完成')time.sleep(0.5)class Sales(Thread):def __init__(self, warehouse, lock):super().__init__()self.warehouse = warehouseself.lock = lockdef run(self):while True:with lock:try:print(f'成功开单,准备出库,当前库存:{len(self.warehouse)}')self.warehouse.popleft()print('销售完成')except IndexError as e:print('暂时没有可销库存')time.sleep(0.5)if __name__ == '__main__':warehouse = deque(maxlen=10)lock = Lock()for i in range(2):Worker(warehouse, lock).start()for i in range(3):Sales(warehouse, lock).start()

对上面的代码简要说明一下:

1、由于是多线程并发,而deque是非线程安全的,所以需要用到锁。

2、deque是非阻塞的,在队列满和空的时候,会有相应的异常处理逻辑。

程序的执行结果如下:

ad427574687417caf0a8e47a44413b44.jpeg

基于Queue实现生产者消费者模型

接下来看一下,通过本文的主角Queue来实现生产者消费者模型。

直接看代码:

from threading import Thread
from queue import Queue
import timeclass Worker(Thread):def __init__(self, warehouse):super().__init__()self.warehouse = warehousedef run(self):while True:print(f'准备生产产品,当前库存:{self.warehouse.qsize()}')self.warehouse.put('这是个产品')print('生产完成')time.sleep(0.2)class Sales(Thread):def __init__(self, warehouse):super().__init__()self.warehouse = warehousedef run(self):while True:print(f'成功开单,准备出库,当前库存:{self.warehouse.qsize()}')self.warehouse.get()print('销售完成')time.sleep(0.2)if __name__ == '__main__':warehouse = Queue(maxsize=10)for i in range(2):Worker(warehouse).start()for i in range(3):Sales(warehouse).start()

从代码量来看,稍微简洁一些,我们不需要额外引入锁了,也不需要进行异常判断。也能实现同样的执行效果。

结合源码,来简单看一下Queue的实现:

0af274d28ec62825899b3a9ffbc33380.jpeg

从源码中,可以知道,Queue = deque + mutex + not_empty + not_full:

1、Queue内部维持了一个deque对象,用于实现基本的队列操作。

2、mutex属性持有一个互斥锁,确保多个线程访问、修改队列时的互斥性,从而保证线程安全。

3、通过两个条件变量:not_empty、not_full,实现阻塞特性,从而更简洁地控制线程何时应该等待或者被唤醒,实现生产者消费者模型的同步机制。

所以,总结来说,Queue是1个基础的双端队列作为容器,持有一个互斥锁确保线程安全,通过非空、非满条件变量实现阻塞特性。

deque与Queue的简单比较

最后,我们来简单比较一下deque和Queue,从而后面更好地根据实际需求场景进行更好的选择。

1、线程安全特性

Queue是线程安全的,deque是非线程安全的。

2、阻塞操作

Queue是支持阻塞操作的,deque是不支持阻塞操作的,可以通过循环判断,反复确认队列的状态,会对CPU带来更多的消耗。

3、最大长度支持

deque是支持最大长度的,Queue内部也是通过deque来实现的,也是支持最大长度的。

4、阻塞超时机制

deque不支持阻塞,自然也不具备阻塞超时机制,Queue是支持的。

5、使用场景

Queue适合多线程、多进程的并发场景;deueue在单线程中更适用,而且性能较好。

总结

本文通过一个上下游任务协同的需求,引入了生产者消费者模型,同时通过两种方式模拟了生产者消费者模型的实现。最后,简单比较了deque和Queue的主要区别和适用场景。

感谢您的拨冗阅读。

d532dfa7605ea11de56ae92254b3c7e3.jpeg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/58572.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Bash Shell - 获取日期、时间

1. 使用date获取日期 以下代码将date的执行结果存储在today变量中。date 是获取日期和时间的命令。 选择使用 quotes()或$ #!/bin/bashtodaydate echo $todaytoday$(date) echo $today 2. 使用 Format 输出所需日期和时间 date FORMAT 2.1 "MM-DD-YY" 形式输出…

【梯度提升专题】XGBoost、Adaboost、CatBoost预测合集:抗乳腺癌药物优化、信贷风控、比特币应用|附数据代码...

全文链接:https://tecdat.cn/?p38115 分析师:Yang Yang,Kechen Zhao 在当今科技日新月异的时代,数据的有效利用成为各领域突破发展的关键。于医疗领域,乳腺癌的高发性与严重性不容忽视,优化抗乳腺癌候选药物的筛选与特…

机器学习与AI|如何利用数据科学优化库存周转率?

对于所有零售商来说,良好的库存管理都是非常重要的。众所周知,商品如果不放在货架上就无法出售,而如果库存过多则意味着严重的财务负担。 但是做好库存管理绝非易事,它依赖于对未来需求的准确预测和确保始终有合适库存的敏捷供应链…

安卓智能对讲终端|北斗有源终端|三防对讲机|单兵终端|单北斗

在当今快速发展的通信技术时代,智能对讲手持机已成为众多行业领域中不可或缺的通讯工具。QM240T安卓智能对讲手持机,作为一款集先进技术与实用功能于一身的高端设备,凭借其卓越的性能和多样化的应用特性,正逐步引领对讲机市场的革…

uniapp-是否删除

代码 uni.showModal({title:提示,content:确定要删除此优惠券?,success: (re) > {if(re.confirm){common.request(post,/agent/coupon/delCoupon,{id:this.list[index].id}).then(res>{if(res.code1){uni.showToast({title:res.msg})this.list.splice(index,…

【数据集】【YOLO】【目标检测】抽烟识别数据集 6953 张,YOLO/VOC格式标注,吸烟检测!

数据集介绍 【数据集】抽烟识别数据集 6953 张,目标检测,包含YOLO/VOC格式标注。数据集中包含1种分类:“smoking”。数据集来自国内外图片网站和视频截图。检测范围园区吸烟检测、禁烟区吸烟检测、监控吸烟检测、无人机吸烟检测等。 主页私…

故事121

22年的十月份,在上海工作了三年多的我回到了老家。 前端,20年二本毕业的,当时在上海看老家的招聘信息,感觉很棒,很心动。又因为公司在大裁员,刚刚好在最后一轮裁员的时候,被裁了,拿了…

软件设计师-上午题-15 计算机网络(5分)

计算机网络题号一般为66-70题,分值一般为5分。 目录 1 网络设备 1.1 真题 2 协议簇 2.1 真题 3 TCP和UDP 3.1 真题 4 SMTP和POP3 4.1 真题 5 ARP 5.1 真题 6 DHCP 6.1 真题 7 URL 7.1 真题 8 浏览器 8.1 真题 9 IP地址和子网掩码 9.1 真题 10 I…

WebSocket实现消息实时推送

文章目录 websocket介绍特点工作原理 用websocket实现实时推送引入依赖WebSocket 函数定义变量声明初始化 WebSocket 连接WebSocket 连接的初始化和事件处理连接打开事件接收消息处理连接关闭和重连机制心跳机制使用 WebSocket代码完整显示 websocket介绍 WebSocket 是一种网络…

视频制作与剪辑怎么学,零基础入门视频剪辑和制作

视频制作与剪辑是一门充满创意与挑战的艺术形式,对于零基础的学习者来说,没选对软件不了解剪辑步骤,入门可能会显得有些棘手。接下来,我们将一同探讨如何开启视频剪辑与制作之旅,让新手从零基础入门,逐步迈…

浅谈C++ MFC

一、基本介绍 C MFC(Microsoft Foundation Classes)是微软公司提供的一个C类库,用于在Windows操作系统上快速开发应用程序。MFC库封装了Win32 API的复杂性,提供了一个面向对象的框架,使得开发者可以更容易地创建GUI&am…

数据仓库之 Atlas 血缘分析:揭示数据流奥秘

Atlas血缘分析在数据仓库中的实战案例 在数据仓库领域,数据血缘分析是一个重要的环节。血缘分析通过确定数据源之间的关系,以及数据在处理过程中的变化,帮助我们更好地理解数据生成的过程,提高数据的可靠性和准确性。在这篇文章中…

[Element] el-table修改滚动条上部分的背景色

[Element] el-table修改滚动条上部分的背景色 ::v-deep .el-table__cell .gutter {background: red;}

深入理解Java反射机制

Java反射(Reflection)是Java语言提供的一种强大工具,允许程序在运行时动态地获取和操作类的信息。这一机制为Java程序带来了极大的灵活性和扩展性,使得程序可以在编译时无法确定的情况下,根据需求动态加载类、调用方法…

科技查新在医药健康领域的应用

科技查新,作为一项通过文献检索和对比分析来评判科技项目新颖性的信息咨询活动,在医药健康领域扮演着至关重要的角色。它不仅提高了医学信息资源的利用率,还强化了社会的情报意识和技术创新意识,推动了医药科研工作乃至整个社会经…

SAP ABAP开发学习——WDA 七 使用文本与消息

目录 从数据字典读取文本 使用OTR文本 从程序中调用OTR文本 消息分类 定义消息显示位置 text类消息的使用 T100 消息的使用 OTR消息实例 消息内容修改 从数据字典读取文本 使用OTR文本 可以自己创建OTR文本 从程序中调用OTR文本 消息分类 定义消息显示位置 text类消息的…

基于物联网设计的地下煤矿安全监测与预警

文章目录 一、前言1.1 项目介绍【1】项目开发背景【2】设计实现的功能【3】项目硬件模块组成 1.2 设计思路1.3 系统功能总结1.4 开发工具的选择【1】设备端开发【2】上位机开发 1.5 模块的技术详情介绍【1】NBIOT-BC26模块【2】MQ5传感器【4】DHT11传感器【5】红外热释电人体检…

golang分布式缓存项目 Day 1

注:该项目原作者:https://geektutu.com/post/geecache-day1.html。本文旨在记录本人做该项目时的一些疑惑解答以及部分的测试样例以便于本人复习。 LRU缓存淘汰策略 三种缓存淘汰策略 FIFO(First In, First Out)先进先出 原理&…

Pr 视频过渡:沉浸式视频 - VR 默比乌斯缩放

效果面板/视频过渡/沉浸式视频/VR 默比乌斯缩放 Video Transitions/Immersive Video/VR Mobius Zoom VR 默比乌斯缩放 VR Mobius Zoom用于 VR 视频中的缩放式场景切换,通过缩小或放大的渐变效果在两个场景之间平滑过渡。 自动 VR 属性 Auto VR Properties 默认勾选…

【物联网技术】ESP8266 WIFI模块在AP模式下作为TCP服务器与多个电脑/手机网络助手(TCP客户端)通信——TCP数据透传

前言:完成ESP8266 WIFI模块在AP模式下作为TCP服务器与多个电脑/手机网络助手(TCP客户端)通信——实现TCP数据透传 AP模式,通俗来说模块可以发出一个WIFI热点提供给电脑/手机连接。 TCP服务端,通俗来说就是模块/单片机作为服务器,可以接收多个客户通道的连接。 本…