基于阻塞队列的生产者消费者模型

目录

生产者消费者模型

生产者消费者模型是什么?

生产者消费者模型优点

基于阻塞队列的生产者消费者模型


生产者消费者模型

前面我们学习了生产者多线程,以及多线程的控制,下面我们看一下多线程中最常见的一个应用——生产者消费者模型。

生产者消费者模型是什么?

生产者消费者模式是一个在编码中常用的设计,这样做不仅可以提高效率,还可以解耦!

那么我们介绍一下生产者消费者模型,我们使用生活中的例子:

现在有有一批顾客,他们想要买火腿肠,还有一批供应商是专门生产火腿肠的。

那么顾客一般去买火腿肠去哪里买呢?当然是去超市!

而供应商也因为生产的东西太多,不可能去专门零售,所以一般都是之间给超市供货。

而此时顾客就是消费者,供应商就是生产者,但是这里还出现了一个超市,而超市就是交易场所。

那么此时这三者之间有什么关系呢?

首先我们需要分析清楚里面有几种关系:

  1. 生产者——生产者

  2. 消费者——消费者

  3. 生产者——消费者

以上就有着三种关系,那么着三种关系之间有什么联系呢?

如果是两个生产者,那么我想要给这一家超市供货,你也想要着一家超市供货,那么此时这两家供货商就是竞争关系,那么在多线程这里叫什么呢?互斥!因为一次只能由一家供货商供货。

那么如果是两个消费者呢?假设现在超市就剩下一根火腿肠了,那么你也想要,我也想要,此时这根火腿只能由一个人买走,那么此时这两个消费者之间的关系就是竞争。

那么如果是生产者和消费者呢?如果供货商现在正在往超市送货,那么此时你可以进来吗?不可以,所以此时消费者和生产者就有竞争关系(互斥),那么假设现在超市没有火腿了,那么要怎么办呢?是不是需要生产者赶快生产火腿,是的!那么此时需要怎么办呢?需要消费者通知生产者!那么如果此时超市的火腿已经放满了呢?超市已经有火腿了,但是现在起顾客还不知道超市已经上货了,那么此时是不是需要生产者通知消费者超市的货物更新了。所以此时生产者和消费者是什么关系呢——同步!

所i总结一下他们之间的关系:

  1. 生产者——生产者 关系:互斥

  2. 消费者——消费者 关系:互斥

  3. 生产者——消费者 关系:同步与互斥

生产者消费者模型优点

上面说了什么是生产者消费者模型,那么下面看一下为什么要有生产者消费者模型以及他们的有优点。

生产者消费者模型的优点:

  1. 提高效率

  2. 解耦

为什么说生产者消费者模型可以提高效率呢?

还是以上面为例,当我们在生产商品的时候,假设现在顾客不需要了,但是生产者还可以生产吗?可以!因为可以生产出来放到超市,所以也就类似于有缓冲区,可以多存储一些火腿,那么假设现在顾客忽然要很多火腿那么如果没有超市之前的缓存的话,那么就可能满足不了顾客,所以这是提高效率的一个点!

还有就是,如果现在顾客要很多火腿,那么现在一家供应商已经满足不了了,那么是可以超市可以联系多家供应商一起供应火腿呢?是的所以此时生产火腿就是并发的生产,并不是只有一家在生产火腿,那么如果现在火腿超市已经块放满了,所以需要顾客消费火腿,那么此时一个顾客是消费不了这么多火腿的,那么此时是不是可以有很多顾客同时消费火腿,而消费火腿也是并发的,所以这是提高效率的一个打点。

上面还提到了解耦,那么如何做到解耦呢?

上面生产者生产的火腿首先放到了超市,所以生产者并不关心消费者,生产者只需要生产就好了,此时生产者是不知道消费者的存在的,那么此时消费者也是在超市购买火腿,他并不知道是生产者供应的,所以此时消费者也并不关心生产者,此时就实现了生产者和消费者之间的解耦。

基于阻塞队列的生产者消费者模型

下面我们就编写一个基于阻塞队列的生产者消费者模型!

说一下我们想要怎么样编写:

  1. 我想要一批生产者生产数据,然后放到阻塞队列里面。

  2. 然后又一批消费者消费数据,从阻塞队列里面拿出来,然后进行消费。

  3. 所以我们需要一个阻塞队列。

  4. 这个阻塞队列是有一定的大小,如果当数据放满了阻塞队列,那么就不能让生产者继续生产了。

  5. 如果当阻塞队列里面的没有数据了,那么就停止消费者对数据的消费,此时让生产者来生产。

  6. 当生产满了后,就需要通知消费者来消费。

下面先想一下我们的阻塞队列里面需要有那些数据:

  1. 首先一定需要有一个队列,用来存放数据,此时这个队列需要可以存放任意类型的数据,所以需要是模板。

  2. 由于生产者消费者都会访问阻塞队列里面的各种数据,所以我们需要一把锁,来控制各种关系之间的互斥。

  3. 由于生产者需要有空来存放数据,所以需要一个条件变量,来表示是否有该资源,如果没有就阻塞生产者。

  4. 消费者又要有数据来消费,所以消费者需要在一个有数据的条件下来消费,如果没有数据,就让消费者阻塞。

  5. 所以此时还需要两个条件变量,一个 empty 是为了给生产者是否有空的条件变量。

  6. 还有一个条件变量 full 表示给消费者的一个是否有数据的条件变量。

  7. 当然我说了,阻塞队列是有大小的,所以还需要一个值表示最大的容量。

template<class T>
class blockQueue
{
public:blockQueue(size_t capacity = 5):_capacity(capacity){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}~blockQueue(){pthread_cond_destroy(&_empty);pthread_cond_destroy(&_full);pthread_mutex_destroy(&_mutex);// 释放锁}
private:queue<T> _bourse;         // 交易场所size_t _capacity;         // 最大容量pthread_mutex_t _mutex;   // 互斥锁pthread_cond_t _full;     // 是否有数据pthread_cond_t _empty;    // 是否为空
};

上面就是我们的阻塞队列的成员变量以及其构造和析构函数。

这里还是在稍微提一下构造和析构里面干了什么,这里不明白的可以看一下之前的线程的互斥与同步就知道了。

因为阻塞队列里面的锁以及条件变量不是全局的,所以需要调用 init 函数来初始化,并使用后需要 destroy 销毁,所以在构造的时候就需要调用 init 为 mutex 和 cond,析构函数里面需要调用 destroy 为 mutex 和 cond。

那么还阻塞队列还需要哪些函数呢?

由于生产者需要将数据放入到阻塞队列中,而消费者需要将数据消费,也就是拿出来,那么就当然需要一个 push 函数,和pop 函数。

下面先用一个函数说明,其实这两个函数的细节是一样的,所以了解了一个函数,那么另一个也就知道了。

下面我们想一下这两个函数需要如何设计:

  1. 当生产者想要将一个数据 push 到阻塞队列中,那么第一步并不是将数据放入到阻塞队列中,前面我们说了想访问资源,那么需要先检测资源是否存在,而对于生产者来说,资源就是空位置,许哦一需要先检测资源。

  2. 但是检测资源是否存在也是访问数据,那么访问临界资源就是要加锁的,所以首先应该加锁。

  3. 加锁后然后访问资源,那么如果资源满足的话,那么就可以继续向后执行,然后将数据 push 到阻塞队列中。

  4. 那么如果是资源不满足呢?如果不满足就什么都不做的话,那么由于生产者不知道什么时候有数据,所以需要一直循环检测,也就是一直申请锁,然后释放锁,所以如果资源不满足的话,就让生产者进行等待,然后等资源满足后唤醒需生产者,那么什么时候资源满足呢?当有空位置的时候,那么就是资源满足,所以当消费者消费数据后,那么就资源满足了,所以当消费者消费数据后,那么就可以唤醒生产者。

  5. 那么如果当生产者生产数据就,那么就可以释放锁了,那么释放锁后呢?还要做什么吗?当然还需要,假设之前没有数据,然后消费者就会进行消费,但是由于没有数据,所以消费者会进程阻塞,等待资源到来,对于消费者而言,数据就是生产者生产的数据,所以此时生产者生产数据后还需要做的一件事就是唤醒消费者。

  6. 那么如果是对于消费者而言呢?消费者就需要消费数据,如果没有数据就进行阻塞,等待数据到来,所以消费者和生产者也是相同的。

下面看一下代码就理解了:

// 将生产的数据放入阻塞队列void push(const T &goods){// 1. 检查是否可以放入数据,但是需要访问临界资源,需要先加锁pthread_mutex_lock(&_mutex);// 2. 访问队列是否为满,如果为满的话,那么便生产不了数据,需要到空的条件下等待,只要有空了,那么便可以继续while (_bourse.size() >= _capacity)pthread_cond_wait(&_empty, &_mutex);// 3. 将商品放入阻塞队列,等待消费者消费_bourse.push(goods);// cout << "生产者[" << pthread_self() << "]生产了一个数据: " << goods << endl;// 4. 放入后,说明现在有商品了,那么便可以唤醒在 full 条件下等待的线程pthread_cond_signal(&_full);// 5. 唤醒后解锁pthread_mutex_unlock(&_mutex);}
​void pop(T &goods){// 1. 检查是否可以消费数据(阻塞队列里面有没有商品),但是需要先加锁pthread_mutex_lock(&_mutex);// 2. 判断是否可以消费while (_bourse.empty())pthread_cond_wait(&_full, &_mutex);// 3. 消费商品goods = _bourse.front();_bourse.pop();// cout << "消费者[" << pthread_self() << "]消费了一个数据: " << goods << endl;// 4. 消费商品后,有空余位置,就可以生成商品,那么唤醒生产者pthread_cond_signal(&_empty);// 5. 解锁pthread_mutex_unlock(&_mutex);}

这就是阻塞队列,那么我们是需要基于阻塞队列实现的生产者消费者模型,所以我们还需要有生产者和消费者,所以下面我们看一下 main 函数怎么写:

下面我们需要创建线程,创建线程后,我们就让主线程进行 join 就可以了:

#define THREAD_NUM 1
​
// 交易场所
blockQueue<task> market(10);
const char* oper = "+-*/%";
​
// 一直生产数据
void *product(void *args)
{cout << "一个生产者被创建[" << pthread_self() << "]" << endl;int s = strlen(oper);// cout << "s: " << s << endl;while (true){int x = rand() % 100;int y = rand() % 3;char op = oper[rand() % s];market.push({x, y, op});// cout << "生产者[" << pthread_self() << "]生产了一个数据: " << goods << endl;cout << "生产一个数据:" << x << op << y << "=?" << endl;sleep(1);}
}
​
// 一直消费数据
void *consum(void *consum)
{cout << "一个消费者被创建[" << pthread_self() << "]" << endl;while (true){task date;market.pop(date);// cout << "消费者[" << pthread_self() << "]消费了一个数据: " << goods << endl;if(date.get_code()) cout << date.get_x() << date.get_op() << date.get_y() << " 计算出错" << endl;else cout << date.get_x() << date.get_op() << date.get_y() << "=" << date() << " code: " << date.get_code() << endl;// sleep(1);}
}
​
void pthreadJoin(vector<pthread_t> &threads)
{for (pthread_t tid : threads){pthread_join(tid, nullptr); // 忽略返回值cout << "tid: " << tid << " 等待成功" << endl;}
}
​
void creatCPThread(vector<pthread_t> &threads, size_t c_num, size_t p_num)
{threads.resize(c_num + p_num);// 创建生产者for (int i = 0; i < p_num; ++i)pthread_create(&(threads[i]), nullptr, product, nullptr);// 创建消费者for (int i = 0; i < c_num; ++i)pthread_create(&(threads[i + p_num]), nullptr, consum, nullptr);
}
​
int main()
{// 数据来源srand(time(0));
​// 创建线程vector<pthread_t> threads;creatCPThread(threads, THREAD_NUM, THREAD_NUM);// joinpthreadJoin(threads);return 0;
}

这里我们使用 vector 来保存线程的 id ,方便和后面 join 线程。

这里创建 THTREAD_NUM 个生产者和消费者线程,然后分别让生产者执行 product 函数,让消费者执行 consum 函数。

然后让主线程就 join 其他线程即可。

解释 pthread_cond_wait 为什么需要 mutex!

通过这一次的代码,以及代码中的注释。

当生产者发现没有空位置的时候,是需要阻塞的,但是此时阻塞的位置是在哪里?

在临界区里面,那么此时他是抱着锁阻塞的,那么如果他就直接阻塞的话,那么此时他抱着锁,那么会发生什么?死锁,但是我们发现阻塞后米有发生死锁,那么说明了什么,在阻塞前,需要先解锁。所以 wait 函数里面一定帮我们进行了解锁,所以没有锁,那么既然需要解锁,那么需要什么变量,一定需要锁变量,所以 wait 是需要锁的。

那么在唤醒的时候是在哪里呢?在哪里阻塞就在那里唤醒,所以在 wait 唤醒,那么此时刚被唤醒是没有锁的,那么此时唤醒的位置是什么地方?还是临界区,那么此时如果唤醒在临界区里面,如果没有锁可以吗?不可以,所以在唤醒后,一定需要竞争锁,所以唤醒后还是需要竞争锁的。

还有一个问题,为什么在检测资源是否满足的时候使用 while 循环判断呢?

因为如果资源不满足的话,是需要wait的,wait是一个函数,既然是函数,那么就有可能调用失败,那么如果调用失败的话,如果底层使用的是数组迷你队列,那么此时就可能发生越界等等问题,如果使用的是 stl 那么最大容量可能就超了,也可能会直接奔溃,所以此时是有危险的,所以有可能唤醒是伪唤醒,所以这里需要循环判断,然唤醒的位置,继续在判断一次,如果满足条件的话,那么就去执行后面的代码!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/170545.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言进阶之路-运算符小怪篇

目录 一、学习目标 二、运算符详谈 算术运算符 关系运算符 逻辑运算符 位运算符 特殊运算符 条件运算符 sizeof 运算符 打怪实战 三、控制流 二路分支 多路分支 const while与 do…while循环 语法&#xff1a; for循环 break与continue goto语句&#xff08…

MySQL MHA高可用架构搭建

快捷查看指令 ctrlf 进行搜索会直接定位到需要的知识点和命令讲解&#xff08;如有不正确的地方欢迎各位小伙伴在评论区提意见&#xff0c;博主会及时修改&#xff09; MySQL MHA高可用架构搭建 MHA&#xff08;Master HA&#xff09;是一款开源的 MySQL 的高可用程序&#xf…

如何获得微软MVP徽章

要成为微软MVP&#xff0c;需要在特定领域成为专家&#xff0c;并积极参与社区&#xff0c;为其他人提供帮助和支持。以下是一些步骤可以帮助你成为MVP&#xff1a; 在特定领域成为专家&#xff1a;要成为MVP&#xff0c;需要在某个领域具有专业知识和经验。这可以通过阅读相关…

ffmpeg下载与配置环境变量

FFmpeg 是一个强大的多媒体框架&#xff0c;可以让用户处理和操纵音频和视频文件。具有易于使用的界面&#xff0c;用户可以在 Windows、Mac 或 Linux Ubuntu 系统上下载 FFmpeg 并将其提取到文件夹中。然后&#xff0c;该软件可以加入 PATH 环境变量中就可以快捷的使用软件了.…

pytorch矩阵乘法

torch.matmul torch.matmul是PyTorch中执行一般矩阵乘法的函数&#xff0c;它接受两个矩阵作为输入&#xff0c;并返回它们的乘积。它适用于任何两个矩阵&#xff0c;无论是密集矩阵还是稀疏矩阵。 import torch # 创建两个 2x2 矩阵 mat1 torch.tensor([[1, 2], [3, 4]]…

【LeeCode】27. 移除元素

给你一个数组 nums 和一个值 val&#xff0c;你需要 原地 移除所有数值等于 val 的元素&#xff0c;并返回移除后数组的新长度。 不要使用额外的数组空间&#xff0c;你必须仅使用 O(1) 额外空间并原地修改输入数组。 元素的顺序可以改变。你不需要考虑数组中超出新长度后面的…

MYSQL基础知识之【修改数据,删除数据】

文章目录 前言MySQL UPDATE 查询使用PHP脚本更新数据 MySQL DELETE 语句从命令行中删除数据使用 PHP 脚本删除数据 后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;Mysql &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术…

使用 STM32 读取和解析 NTC 热敏电阻的数值

本文介绍了如何利用 STM32 微控制器读取和解析 NTC&#xff08;Negative Temperature Coefficient&#xff09;热敏电阻的数值。首先&#xff0c;我们将简要介绍 NTC 热敏电阻的原理和特性。接下来&#xff0c;我们将详细讨论如何设计电路连接和采用合适的 STM32 外设进行数值读…

Java——包机制(package、import)

包机制是Java中管理类的重要手段。开发中我们会遇到大量同名的类&#xff0c;通过包我们很容易对解决类重名的问题&#xff0c;也可以实现对类的有效管理。包对于类&#xff0c;相当于文件夹对于文件的作用。 package 我们通过package实现对类的管理&#xff0c;package的使用…

Element-Plus 图标自动导入

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall &#x1f343; vue3-element-admin &#x1f343; youlai-boot &#x1f33a; 仓库主页&#xff1a; Gitee &#x1f4ab; Github &#x1f4ab; GitCode &#x1f496; 欢迎点赞…

迎接“全全闪”时代 XSKY星辰天合发布星海架构和星飞产品

11 月 17 日消息&#xff0c;北京市星辰天合科技股份有限公司&#xff08;简称&#xff1a;XSKY星辰天合&#xff09;在北京首钢园举办了主题为“星星之火”的 XSKY 星海全闪架构暨星飞存储发布会。 &#xff08;图注&#xff1a;XSKY星辰天合 CEO 胥昕&#xff09; XSKY星辰天…

SQL sever2008中的游标

目录 一、游标概述 二、游标的实现 三、优缺点 3.1优点&#xff1a; 3.2缺点&#xff1a; 四、游标类型 4.1静态游标 4.2动态游标 4.3只进游标 4.4键集驱动游标 4.5显示游标&#xff1a; 4.6隐式游标 五、游标基本操作 5.1声明游标 5.1.1.IS0标准语法 5.1.1.1语…

【经典小练习】输出文件路径名

文章目录 &#x1f339;问题✨思路&#x1f354;代码&#x1f6f8;读取文件&#xff0c;并把文件名保存到文件中 对指定目录下的所有 Java 文件进行编译、打包等处理&#xff1b; 查找指定目录下所有包含特定字符串的 Java 文件&#xff1b; 统计指定目录下所有 Java 文件的行数…

【数据结构实验】树(一)构建二叉查找树(BST)

文章目录 1. 引言2. 二叉查找树3. 实验内容3.1 实验题目&#xff08;一&#xff09;输入要求&#xff08;二&#xff09;输出要求 3.2 算法实现1. 数据结构2. 全局变量3. 中序遍历函数InOrder4. 二叉查找树的构建函数T5. 主函数 3.3 代码整合 4. 实验结果 1. 引言 二叉查找树&a…

【Linux】进程间通信——system V共享内存、共享内存的概念、共享内存函数、system V消息队列、信号量

文章目录 进程间通信1.system V共享内存1.1共享内存原理1.2共享内存数据结构1.3共享内存函数 2.system V消息队列2.1消息队列原理 3.system V信号量3.1信号量原理3.2进程互斥 4.共享内存的使用示例 进程间通信 1.system V共享内存 1.1共享内存原理 共享内存区是最快的IPC形式…

从零开始搭建博客网站-----源代码试部署

拿到了该项目的源码&#xff0c;先尝试是否可以成功部署&#xff0c;详细的部署视频地址 后端项目部署 先把maven配置好&#xff0c;都改成自己下载的maven地址 文件编码改成utf-8&#xff0c;防止配置文件乱码 如果maven是刚下的&#xff0c;要改一下下载包的地址&#xff0…

rabbitMq确认机制之ConfirmType

配置方式 Bean(name "connectionFactory")Primarypublic ConnectionFactory normalConnectionFactory(Value("${spring.rabbitmq.username}") String username,Value("${spring.rabbitmq.password}") String password,Value("${spring.rab…

ASP产品通过网络安全专用产品安全认证

什么是网络安全专用产品安全检测&#xff1f; 网络安全专用产品安全检测是指对网络关键设备和网络安全专用产品进行安全性评估和检测&#xff0c;以确保其符合相关标准和法规的要求&#xff0c;能够有效地抵御网络攻击和威胁。该检测由具备资格的机构进行&#xff0c;采用认证…

SELinux零知识学习三十二、SELinux策略语言之角色和用户(3)

接前一篇文章:SELinux零知识学习三十一、SELinux策略语言之角色和用户(2) 三、SELinux策略语言之类型强制 SELinux提供了一种依赖于类型强制(类型增强,TE)的基于角色的访问控制(Role-Based Access Control),角色用于组域类型和限制域类型与用户之间的关系,SELinux中…

bugku 渗透测试

场景1 查看源代码 场景2 用dirsearch扫描一下看看 ok看到登录的照应了第一个提示 进去看看 不出所料 随便试试admin/admin进去了 在基本设置里面看到falg 场景3 确实是没啥想法了 找到php在线运行 检查网络&#xff0c;我们发现这个php在线运行会写入文件 那我们是不是写…