102、Python并发编程:Queue与生产者消费者模型实现解耦、协作

引言

在实际业务场景中,很多时候在处理复杂任务的时候,会拆分上下游各个环节,形成一个类似于流水线的处理方式。上游类似于生产者,下游要依赖上游的输出进行工作,类似于消费者。但是,很多时候,上游和下游由于工作的复杂程度、环境、性能等,处理的速率不是太匹配。为了保证上下游各自按照自己的速率运转,从而保证最高的处理性能,通常的做法是引入一个中间件,比如缓存、消息队列。从而实现速率的匹配,以及对不同繁忙程度的任务进行削峰填谷,实现处理性能的平滑效果。

本文通过一个简化的生产者消费者模型,以及Queue的使用,来展现关于上述不同环节工作的协同与解耦的需求的实现。

本文的主要内容有:

1、生产者消费者模型

2、基于deque实现生产者消费者模型

3、基于Queue实现生产者消费者模型

4、deque与Queue的简单比较

生产者消费者模型

生产者消费者模型是一种常见的并发编程设计模式,适用于多个生产者和消费者之间进行任务或数据的共享与传递,从而实现解耦与高效协作。

该模型的核心思想是将生产数据的线程(生产者)与消费数据的线程(消费者)通过一个缓冲区(一般是线程安全的队列)进行连接。

实际工作中,有不少适用于生产者消费者模型的典型场景,简单列举如下:

1、任务调度系统

任务调度器充当生产者,将触发了待执行的任务放到队列中,多个工作线程充当消费者,从队列中取出任务进行执行。

2、日志处理系统

日志生成的线程充当生产者,将日志记录放入队列中,日志处理线程从队列中获取日志,进行相应的存储、分析等。

3、数据库连接池

客户端作为生产者请求数据查询操作,数据库连接池中的线程作为消费者接收客户端的请求,进行相应的查询处理,并返回结果。

4、流媒体服务器

视频编码器将视频帧数据放入队列中,流媒体服务器从队列中取出帧数据进行传输。

可以看到,涉及到复杂任务的拆解、解耦,同时需要进行上下游协同的需求场景,都很适合使用生产者消费者模型来实现。

下面通过deque和Queue分别来尝试模拟如下生产者消费模型的简单实现:

1、有2个工人,负责产品的生产,产品生产完成后放到仓库中。

2、有3个销售人员,负责产品的销售,销售的产品从仓库中出库,经过物流给到客户手中。

3、仓库是有成本的,所已存储空间是有限的,加入仓库最多存10个产品。

基于deque实现生产者消费者模型

首先我们通过Python中的deque这个集合类型,来尝试实现上面的生产者消费者模型的模拟需求。

直接看代码:

from collections import deque
from threading import Lock, Thread
import timeclass Worker(Thread):def __init__(self, warehouse, lock):super().__init__()self.warehouse = warehouseself.lock = lockdef run(self):while True:with lock:print(f'准备生产产品,当前库存:{len(self.warehouse)}')self.warehouse.append('这是个产品')print('生产完成')time.sleep(0.5)class Sales(Thread):def __init__(self, warehouse, lock):super().__init__()self.warehouse = warehouseself.lock = lockdef run(self):while True:with lock:try:print(f'成功开单,准备出库,当前库存:{len(self.warehouse)}')self.warehouse.popleft()print('销售完成')except IndexError as e:print('暂时没有可销库存')time.sleep(0.5)if __name__ == '__main__':warehouse = deque(maxlen=10)lock = Lock()for i in range(2):Worker(warehouse, lock).start()for i in range(3):Sales(warehouse, lock).start()

对上面的代码简要说明一下:

1、由于是多线程并发,而deque是非线程安全的,所以需要用到锁。

2、deque是非阻塞的,在队列满和空的时候,会有相应的异常处理逻辑。

程序的执行结果如下:

ad427574687417caf0a8e47a44413b44.jpeg

基于Queue实现生产者消费者模型

接下来看一下,通过本文的主角Queue来实现生产者消费者模型。

直接看代码:

from threading import Thread
from queue import Queue
import timeclass Worker(Thread):def __init__(self, warehouse):super().__init__()self.warehouse = warehousedef run(self):while True:print(f'准备生产产品,当前库存:{self.warehouse.qsize()}')self.warehouse.put('这是个产品')print('生产完成')time.sleep(0.2)class Sales(Thread):def __init__(self, warehouse):super().__init__()self.warehouse = warehousedef run(self):while True:print(f'成功开单,准备出库,当前库存:{self.warehouse.qsize()}')self.warehouse.get()print('销售完成')time.sleep(0.2)if __name__ == '__main__':warehouse = Queue(maxsize=10)for i in range(2):Worker(warehouse).start()for i in range(3):Sales(warehouse).start()

从代码量来看,稍微简洁一些,我们不需要额外引入锁了,也不需要进行异常判断。也能实现同样的执行效果。

结合源码,来简单看一下Queue的实现:

0af274d28ec62825899b3a9ffbc33380.jpeg

从源码中,可以知道,Queue = deque + mutex + not_empty + not_full:

1、Queue内部维持了一个deque对象,用于实现基本的队列操作。

2、mutex属性持有一个互斥锁,确保多个线程访问、修改队列时的互斥性,从而保证线程安全。

3、通过两个条件变量:not_empty、not_full,实现阻塞特性,从而更简洁地控制线程何时应该等待或者被唤醒,实现生产者消费者模型的同步机制。

所以,总结来说,Queue是1个基础的双端队列作为容器,持有一个互斥锁确保线程安全,通过非空、非满条件变量实现阻塞特性。

deque与Queue的简单比较

最后,我们来简单比较一下deque和Queue,从而后面更好地根据实际需求场景进行更好的选择。

1、线程安全特性

Queue是线程安全的,deque是非线程安全的。

2、阻塞操作

Queue是支持阻塞操作的,deque是不支持阻塞操作的,可以通过循环判断,反复确认队列的状态,会对CPU带来更多的消耗。

3、最大长度支持

deque是支持最大长度的,Queue内部也是通过deque来实现的,也是支持最大长度的。

4、阻塞超时机制

deque不支持阻塞,自然也不具备阻塞超时机制,Queue是支持的。

5、使用场景

Queue适合多线程、多进程的并发场景;deueue在单线程中更适用,而且性能较好。

总结

本文通过一个上下游任务协同的需求,引入了生产者消费者模型,同时通过两种方式模拟了生产者消费者模型的实现。最后,简单比较了deque和Queue的主要区别和适用场景。

感谢您的拨冗阅读。

d532dfa7605ea11de56ae92254b3c7e3.jpeg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/58572.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【梯度提升专题】XGBoost、Adaboost、CatBoost预测合集:抗乳腺癌药物优化、信贷风控、比特币应用|附数据代码...

全文链接:https://tecdat.cn/?p38115 分析师:Yang Yang,Kechen Zhao 在当今科技日新月异的时代,数据的有效利用成为各领域突破发展的关键。于医疗领域,乳腺癌的高发性与严重性不容忽视,优化抗乳腺癌候选药物的筛选与特…

机器学习与AI|如何利用数据科学优化库存周转率?

对于所有零售商来说,良好的库存管理都是非常重要的。众所周知,商品如果不放在货架上就无法出售,而如果库存过多则意味着严重的财务负担。 但是做好库存管理绝非易事,它依赖于对未来需求的准确预测和确保始终有合适库存的敏捷供应链…

安卓智能对讲终端|北斗有源终端|三防对讲机|单兵终端|单北斗

在当今快速发展的通信技术时代,智能对讲手持机已成为众多行业领域中不可或缺的通讯工具。QM240T安卓智能对讲手持机,作为一款集先进技术与实用功能于一身的高端设备,凭借其卓越的性能和多样化的应用特性,正逐步引领对讲机市场的革…

【数据集】【YOLO】【目标检测】抽烟识别数据集 6953 张,YOLO/VOC格式标注,吸烟检测!

数据集介绍 【数据集】抽烟识别数据集 6953 张,目标检测,包含YOLO/VOC格式标注。数据集中包含1种分类:“smoking”。数据集来自国内外图片网站和视频截图。检测范围园区吸烟检测、禁烟区吸烟检测、监控吸烟检测、无人机吸烟检测等。 主页私…

软件设计师-上午题-15 计算机网络(5分)

计算机网络题号一般为66-70题,分值一般为5分。 目录 1 网络设备 1.1 真题 2 协议簇 2.1 真题 3 TCP和UDP 3.1 真题 4 SMTP和POP3 4.1 真题 5 ARP 5.1 真题 6 DHCP 6.1 真题 7 URL 7.1 真题 8 浏览器 8.1 真题 9 IP地址和子网掩码 9.1 真题 10 I…

视频制作与剪辑怎么学,零基础入门视频剪辑和制作

视频制作与剪辑是一门充满创意与挑战的艺术形式,对于零基础的学习者来说,没选对软件不了解剪辑步骤,入门可能会显得有些棘手。接下来,我们将一同探讨如何开启视频剪辑与制作之旅,让新手从零基础入门,逐步迈…

[Element] el-table修改滚动条上部分的背景色

[Element] el-table修改滚动条上部分的背景色 ::v-deep .el-table__cell .gutter {background: red;}

SAP ABAP开发学习——WDA 七 使用文本与消息

目录 从数据字典读取文本 使用OTR文本 从程序中调用OTR文本 消息分类 定义消息显示位置 text类消息的使用 T100 消息的使用 OTR消息实例 消息内容修改 从数据字典读取文本 使用OTR文本 可以自己创建OTR文本 从程序中调用OTR文本 消息分类 定义消息显示位置 text类消息的…

基于物联网设计的地下煤矿安全监测与预警

文章目录 一、前言1.1 项目介绍【1】项目开发背景【2】设计实现的功能【3】项目硬件模块组成 1.2 设计思路1.3 系统功能总结1.4 开发工具的选择【1】设备端开发【2】上位机开发 1.5 模块的技术详情介绍【1】NBIOT-BC26模块【2】MQ5传感器【4】DHT11传感器【5】红外热释电人体检…

golang分布式缓存项目 Day 1

注:该项目原作者:https://geektutu.com/post/geecache-day1.html。本文旨在记录本人做该项目时的一些疑惑解答以及部分的测试样例以便于本人复习。 LRU缓存淘汰策略 三种缓存淘汰策略 FIFO(First In, First Out)先进先出 原理&…

Pr 视频过渡:沉浸式视频 - VR 默比乌斯缩放

效果面板/视频过渡/沉浸式视频/VR 默比乌斯缩放 Video Transitions/Immersive Video/VR Mobius Zoom VR 默比乌斯缩放 VR Mobius Zoom用于 VR 视频中的缩放式场景切换,通过缩小或放大的渐变效果在两个场景之间平滑过渡。 自动 VR 属性 Auto VR Properties 默认勾选…

华为实时视频使用FLV播放RTSP流

import flvjs from ‘flv.js’; 安装flv <video style"width:100%;height:100%;" ref"videoHWRef" ></video>// src 华为rtsp流 rtsp://admin:Huaweivideo10.10.8.151:554/xxx/trackID1// url 需要后端提供视频源地址playVideo() {if (fl…

【华为机试题】光伏场地建设规划 [Python]

题目 代码 class Solution:def func(self, input_args, area_list):count 0for i in range(input_args[0] - input_args[2] 1):for j in range(input_args[1] - input_args[2] 1):count 1 if self.area_compute(area_list,i,j,input_args[2],input_args[3]) else 0print(c…

备考25年二建,最好这样选专业!

2025年的二建备考已经开始了&#xff0c;很多考生在纠结报考哪个专业&#xff0c;二级建造师各专业难度如何&#xff1f;哪个专业含金量更高&#xff1f; 今天就带大家了解一下这六大专业&#xff0c;一起来看~ ​建筑专业 考核方向&#xff1a;建筑工程技术要求、建筑工程专…

向量模型Jina Embedding: 从v1到v3论文笔记

文章目录 Jina Embedding: 从v1到v3Jina Embedding v1数据集准备训练过程 Jina Embedding v2预训练修改版BERT在文本对上微调在Hard Negatives上微调 Jina Embedding v2 双语言预训练修改版BERT在文本对上微调用多任务目标微调 Jina Embedding v3预训练在文本对上微调训练任务相…

「Mac畅玩鸿蒙与硬件28」UI互动应用篇5 - 滑动选择器实现

本篇将带你实现一个滑动选择器应用&#xff0c;用户可以通过滑动条选择不同的数值&#xff0c;并实时查看选定的值和提示。这是一个学习如何使用 Slider 组件、状态管理和动态文本更新的良好实践。 关键词 UI互动应用Slider 组件状态管理动态数值更新用户交互 一、功能说明 在…

全连接神经网络案例——手写数字识别

文章目录 1.我们导入需要的工具包2.数据加载3.数据处理4.模型构建5.模型编译6.模型训练7.模型测试8.模型保存 使⽤⼿写数字的MNIST数据集如上图所示&#xff0c;该数据集包含60,000个⽤于训练的样本和10,000个⽤于测试的样本&#xff0c;图像是固定⼤⼩(28x28像素)&#xff0c;…

2.ARM_ARM是什么

CPU工作原理 CPU与内存中的内容&#xff1a; 内存中存放了指令&#xff0c;每一个指令存放的地址不一样&#xff0c;所需的内存空间也不一样。 运算器能够进行算数运算和逻辑运算&#xff0c;这些运算在CPU中都是以运算电路的形式存在&#xff0c;一个运算功能对应一种运算电…

MetaGeneMark:宏转录组转录本基因预测

GeneMark™ download 下载 gunzip gm_key_64.gz tar -xvzf MetaGeneMark_linux_64.tar.gz #查看安装 (完整路径)/gmhmmp #解压文件里面这个比较重要 MetaGeneMark_linux_64/mgm/MetaGeneMark_v1.mod #复制gm_key文件到主路径 mv gm_key_64 .gm_key cp .gm_key /home/zhongpei…

腾讯轻量云服务器docker拉取不到镜像的问题:拉取超时

前言 也是尝试了各种解决方案之后&#xff0c;无果&#xff0c; 后来发现每个服务器提供商都有自己的镜像加速&#xff0c;且只给自家服务器使用&#xff0c;我用的腾讯云 教程 安装docker 直接上链接&#xff1a;云服务器 搭建 Docker-实践教程-文档中心-腾讯云 配置加速镜…