基于blockqueue的生产和消费模型

线程篇下讲的是基于阻塞队列的生产者消费者模型。在学习这个之前我们先了解一些其他概念:

同步:在保证数据安全的条件下,让线程按某种特定的顺序依次访问临界资源。

通过上一节的代码我们实现了一个多线程抢票的程序,但结果显示的是一个线程在疯狂的抢票,这就导致了其他线程抢占不到临界资源而导致的饥饿问题。

所以在抢代码的代码逻辑中,我们需要保证各个线程以同步的方式进行抢票。那如何实现呢?就需要用到信号量:

条件变量的接口函数:

条件变量的使用:

#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>int tickets = 1000;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *start_routine(void *args)
{std::string name = static_cast<const char *>(args);while (true){pthread_mutex_lock(&mutex);pthread_cond_wait(&cond, &mutex); // 为什么要有mutex,后面就说// 判断暂时省略std::cout << name << " -> " << tickets << std::endl;tickets--;pthread_mutex_unlock(&mutex);}
}int main()
{// 通过条件变量控制线程的执行pthread_t t[5];for (int i = 0; i < 5; i++){char *name = new char[64];snprintf(name, 64, "thread %d", i + 1);pthread_create(t+i, nullptr, start_routine, name);}while (true){sleep(1);// pthread_cond_signal(&cond);pthread_cond_broadcast(&cond);std::cout << "main thread wakeup one thread..." << std::endl;}for (int i = 0; i < 5; i++){pthread_join(t[i], nullptr);}return 0;
}

条件变量的理解:

我们用一个例子来说明:

没有条件变量就好比是面试官在一个办公室面试,只要应聘者得到门外挂着的钥匙进来就可以面试。这时一个应聘者抢到了钥匙进入了办公室,面试结束后想要将钥匙归还时感觉自己面的不好,这时自己离钥匙也是最近的,所以刚出门又拿起了钥匙继续面试。这钥匙就导致了其他应聘者拿不到而这个应聘者在疯狂的面试,很明显这不合理。

但是有了条件变量就好比是面试官在外面设置了一个等待区,面试完的人必须归还钥匙并且再次回到等待队列的最末尾重新排队,这就保证了各个应聘者都有机会面试。

 条件不满足的时候,线程必须去某些定义好的条件变量上进行等待

pthread_cond_wait第二个参数的意义(将锁传进去):该函数调用的时候,会以原子性的方式,将锁释放,并将线程挂起等待。用pthread_cond_sign将线程唤醒时,该线程会拿着你所传入的锁将代码继续向下运行。通过以上知识的铺垫让我们进入今天的正题:

生产者消费者模型

什么是生产者消费者模型呢?我们用一个形象的图来说明:

生产者消费者模型就是生产者可以将生产的数据存放在一个共享区,消费者从共享区中获得数据进行消费。由于是共享区,就要保证数据的安全性。当生产者生产一个数据时,另一个生产者不能在同一个数据上生产,不然会导致数据的不安全性。因此生产者和生产者之间是互斥关系的。同样消费者也不能同时消费同一个数据,不然可能会导致数据的不一致性,因此消费者和消费者之间是互斥关系的。如果这时一个生产者正在生产一个数据,而同时消费者也正在消费这个数据。就可能导致数据还没生产完全就已经被消费了,也会导致数据的不安全性。因此生产者和消费者之间是互斥关系,同时我们还希望生产者生产以后就有消费者来消费,消费者消费完一个就有生产者来生产,因此生产者和消费者之间是同步关系。

总结一下就是3种关系、两种角色、一个交易场所:

这一段特殊的缓冲区可以提前存放一批数据,这样消费者想消费的时候消费,生产者想什么时候生产就什么时候生产。解决了生产和消费两批线程忙闲不均的问题,是它们不具有强耦合的关系。

基于blockqueue的生产和消费模型

基于普通方式处理数据

代码实现:

//Main.cc#include <iostream>
#include <unistd.h>
#include <time.h>
#include "BlockQueue.hpp"using namespace std;void* consumer(void* args)
{BlockQueue<int>* bq =static_cast<BlockQueue<int>*>(args);while(true){int val =0;bq->pop(&val);cout<<"我是消费者,我消费了一个数字:"<<val<<endl;sleep(1);}return nullptr;
}void* productor(void* args)
{BlockQueue<int>* bq =static_cast<BlockQueue<int>*>(args);while(true){int val =rand()%10;bq->push(val);cout<<"我是生产者,我生产了一个数字:"<<val<<endl;//sleep(1);}return nullptr;
}int main()
{srand((unsigned long)time(nullptr)^getpid());BlockQueue<int>* queue =new BlockQueue<int>(5);pthread_t c,p;pthread_create(&c,nullptr,consumer,(void*)queue);pthread_create(&p,nullptr,productor,(void*)queue);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}//BlockQueue.hpp
#include <queue>
#include <pthread.h>
#include <iostream>
using namespace std;const int NUM =5;
template<class T>
class BlockQueue
{
public:BlockQueue(const int numsize =NUM):_maxsize(NUM){pthread_mutex_init(&_lock,nullptr);pthread_cond_init(&_ccond,nullptr);pthread_cond_init(&_ccond,nullptr);}void push(const T& in){pthread_mutex_lock(&_lock);while(is_Full()){//这里说明阻塞队列是满的,需要让生产者等待pthread_cond_wait(&_pcond,&_lock);}//这里说明阻塞队列至少有一个空位可以插入_queue.push(in);//唤醒消费者去消费pthread_cond_signal(&_ccond);pthread_mutex_unlock(&_lock);}void pop(T* out){pthread_mutex_lock(&_lock);while(is_Empty()){//这里说明阻塞队列是空的,需要让消费者等待pthread_cond_wait(&_ccond,&_lock);}//这里说明阻塞队列至少有一个数据*out=_queue.front();_queue.pop();//唤醒生产者生产数据pthread_cond_signal(&_pcond);pthread_mutex_unlock(&_lock);}~BlockQueue(){pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_ccond);pthread_cond_destroy(&_pcond);}private:bool is_Full(){return _queue.size()==_maxsize;}bool is_Empty(){return _queue.empty();}private:queue<T> _queue;int _maxsize;pthread_mutex_t _lock;   //保护临界资源的锁pthread_cond_t _ccond;   //消费者的条件变量pthread_cond_t _pcond;   //生产者的条件变量
};

代码细节:

基于计算器任务的Task

我们得创建一个task.hpp,里面定义一个CalTask类:

class CalTask
{
public:using func_t =std::function<int(int,int,char)>;CalTask(){}CalTask(int x,int y,char op,func_t func):_x(x),_y(y),_op(op),_callback(func){}std::string operator()(){int result =_callback(_x,_y,_op);char buffer[64];snprintf(buffer,sizeof buffer,"%d %c %d =%d",_x,_op,_y,result);return buffer;}std::string to_string(){char buffer[64];snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);return buffer;}~CalTask(){}private:int _x;int _y;char _op;func_t _callback;};

生产者任务:

void* productor(void* args)
{BlockQueue<CalTask>* bq =static_cast<BlockQueue<CalTask>*>(args);while(true){int x =rand()%10;int y =rand()%10;char op =op_str[rand()%op_str.size()];CalTask task(x,y,op,mymath);bq->push(task);cout<<"productor task:"<<task.to_string()<<endl;//sleep(1);}return nullptr;
}

消费者任务:

void* consumer(void* args)
{BlockQueue<CalTask>* bq =static_cast<BlockQueue<CalTask>*>(args);while(true){CalTask t;bq->pop(&t);string result =t();cout<<"consumer result:"<<result<<endl;sleep(1);}return nullptr;
}


基于两个阻塞队列实现计算与存储:

将计算和存储的两个队列放进同一个类中:

template<class c,class s>
class BlockQueues
{public:BlockQueue<c>* c_bq; BlockQueue<s>* s_bq;
};

main函数:

int main()
{srand((unsigned long)time(nullptr)^getpid());BlockQueues<CalTask,SaveTask> bqs;bqs.c_bq =new BlockQueue<CalTask>;bqs.s_bq =new BlockQueue<SaveTask>;//BlockQueue<CalTask>* bqs =new BlockQueue<CalTask>(5);pthread_t c,p,s;pthread_create(&c,nullptr,consumer,(void*)&bqs);pthread_create(&p,nullptr,productor,(void*)&bqs);pthread_create(&s,nullptr,saver,(void*)&bqs);pthread_join(c,nullptr);pthread_join(p,nullptr);pthread_join(s,nullptr);delete bqs.c_bq;delete bqs.s_bq;return 0;
}

存储任务:

class SaveTask
{typedef std::function<void(const std::string&)> func_t;
public:SaveTask(){}SaveTask(const std::string &message, func_t func): _message(message), _func(func){}void operator()(){_func(_message);}
private:std::string _message;func_t _func;
};void Save(const std::string &message)
{const std::string target = "./log.txt";FILE *fp = fopen(target.c_str(), "a+");if(!fp){std::cerr << "fopen error" << std::endl;return;}fputs(message.c_str(), fp);fputs("\n", fp);fclose(fp);
}

存储线程执行的任务:

void *saver(void *bqs_)
{BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->s_bq;while(true){SaveTask t;save_bq->pop(&t);t();std::cout << "save thread,保存任务完成..." << std::endl; }return nullptr;
}

代码的执行结果:

生产者与消费者模型优势总结:

这个模型的优势不在于多个线程能够并发式的对共享资源里面的数据进行访问和处理,而是多个线程能够在加载任务处理任务的时候进行并发处理。

在我们上面的代码逻辑中很简单,就是随机构造x和y构成一个任务最后放入阻塞队列中。而在实际情况中加载任务的时候没有那么简单,有时需要从网络或者数据库中加载,这就需要消耗很长的一段时间。这个模型的优势就是在于多个生产者能同时加载多个任务,随后竞争出一名生产者将任务放入共享资源(阻塞队列)中,然后在竞争出一名消费者取出任务。因此模型的优势不在于多个线程能并发的从阻塞队列中拿数据处理,而是在加载任务的时候做到并发节省时间。同理处理任务,当一个线程处理任务的同时,另一个线程仍可以从阻塞队列取出任务处理,不影响之前的进程处理任务,因此消费者处理任务的环节也做到了并发,实现了加载任务和处理任务的解耦,提高了整个程序的运行效率和速度。

到这里本章的内容就全部结束了,创作不易,希望大家多多点赞支持。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/69426.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分布式锁之redis实现

docker安装redis 拉取镜像 docker pull redis:6.2.6 查看镜像 启动容器并挂载目录 需要挂在的data和redis.conf自行创建即可 docker run --restart always -d -v /usr/local/docker/redis/redis.conf:/usr/local/etc/redis/redis.conf -v /usr/local/docker/redis/data:/dat…

使用 【jacoco】对基于 SpringBoot 和 Dubbo RPC 的项目生成测试覆盖率报告:实践+原理

基于 Dubbo RPC 的项目中有一个提供者项目backend、一个消费者项目gateway、以及注册中心nacos。本篇文章记录在windows本地对该框架的测试过程&#xff0c;以及介绍jacoco的基本原理 测试过程 官网下载安装包解压到本地&#xff0c;https://www.jacoco.org/jacoco/ 只需要用…

11. Junit

我们主要学习的是 Junit5. 1. selenium 和 Junit 之间的关系 selenium 和 Junit 之间的关系 就像 Java 和 JavaScript 之间的关系&#xff0c;也就是没有关系。 为什么学习了 selenium 还要学习 Junit 呢&#xff1f; 举个例子&#xff0c;如果 Selenium 编写的自动化测试用…

论坛系统公共组件部分

1.在Java⽬录下创建包&#xff0c;在Resources⽬录下创建⽂件夹&#xff0c;结构如下 ├─java # java⽂件区 │ └─com │ └─example │ └─demo │ ├─common # 公共类 │ ├─config # 配置…

MySQL表操作

目录 一、创建mysql表的结构 1.在mydb数据库下创建一个表格名字为stu_info&#xff0c;里面结构包含了学号和姓名的名称&#xff0c;字符型以及他的引擎为innodb 字符集为gbk 校队规则为gbk_chinese_ci 二、数据库表查看的基本用法语句 1.查看数据库表明 2.查看数据库表的结…

数字图像处理:亮度对比度-几何变换-噪声处理

文章目录 数字图像增强亮度与对比度转换几何变换图像裁剪尺寸变换图像旋转 噪声处理添加噪声处理噪声 数字图像增强 亮度与对比度转换 图像变换可分为以下两种&#xff1a; 点算子&#xff1a;基于像素变换&#xff0c;在这一类图像变换中&#xff0c;仅仅根据输入像素值计算…

【计算机网络】 ARP协议和DNS协议

文章目录 数据包在传输过程中的变化过程单播组播和广播ARP协议ARP代理免费ARP路由数据转发过程DNS协议 数据包在传输过程中的变化过程 在说ARP和DNS之前&#xff0c;我们需要知道数据包在传输过程的变化过程 从图片中可以看到&#xff0c;发送方的原数据最开始是在应用层&…

vscode使用delve调试golang程序

环境配置 delve仓库&#xff0c;含有教程&#xff1a;https://github.com/go-delve/delve golang的debugging教程&#xff1a;https://github.com/golang/vscode-go/wiki/debugging > go version go version go1.20 windows/amd64> go install github.com/go-delve/de…

华为云Stack的学习(五)

六、华为云stack服务简介 1.云服务在华为云Stack中的位置 云服务对接多个数据中心资源池层提供的资源&#xff0c;并向各种行业应用提供载体。 2.华为云Stack通用服务 2.1 云计算的服务模式 2.2 计算相关的云服务 2.3 存储相关的云服务 2.4 网络相关的云服务 3.云化案例 **…

如何取消KEIL-MDK工程中出现的CMSIS绿色图标

如何取消KEIL-MDK工程中出现的CMSIS绿色图标&#xff1f;我以前经常遇到&#xff0c;不知道怎么搞&#xff0c;好像也不影响编译结果。以前问过其他人&#xff0c;但是不知道怎么搞&#xff0c;相信很多人也遇到过。水平有限&#xff0c;表达不清楚&#xff0c;见下图&#xff…

Bootstrap的标题类(标题样式h1~h6)

Bootstrap 的标题字体大小通常遵循以下样式规则&#xff1a; h1 标题的字体大小为 2.5rem&#xff08;40像素&#xff09;。h2 标题的字体大小为 2rem&#xff08;32像素&#xff09;。h3 标题的字体大小为 1.75rem&#xff08;28像素&#xff09;。h4 标题的字体大小为 1.5re…

【trie树】CF Edu12 E

Problem - E - Codeforces 题意&#xff1a; 思路&#xff1a; 这其实是一个套路题 区间异或转化成前缀异或&#xff0c;然后枚举 i 对于每一个 i&#xff0c;ai ^ x k&#xff0c;对 x 计数 先建一棵字典树&#xff0c;然后在字典树上计数 先去对 > k 的部分计数&a…

国际版阿里云/腾讯云:弹性高性能计算E-HPC入门概述

入门概述 本文介绍E-HPC的运用流程&#xff0c;帮助您快速上手运用弹性高性能核算。 下文以创立集群&#xff0c;在集群中安装GROMACS软件并运转水分子算例进行高性能核算为例&#xff0c;介绍弹性高性能核算的运用流程&#xff0c;帮助您快速上手运用弹性高性能核算。运用流程…

易云维®医院后勤管理系统软件利用物联网智能网关帮助实现医院设备实现智能化、信息化管理

近年来&#xff0c;我国医院逐渐意识到医院设备信息化管理的重要性&#xff0c;逐步建立医院后勤管理系统软件&#xff0c;以提高信息化管理水平。该系统是利用数据库技术&#xff0c;为医院的中央空调、洁净空调、电梯、锅炉、医疗设备等建立电子档案&#xff0c;把设备监控、…

性能炸裂c++20协程+iocp/epoll,超轻量高性能异步库开发实战

前言&#xff1a; c20出来有一段时间了。其中一大功能就是终于支持协程了&#xff08;c作为行业大哥大级别的语言&#xff0c;居然到C20才开始支持协程&#xff0c;我也是无力吐槽了&#xff0c;让多少人等了多少年&#xff0c;等了多少青春&#xff09;但千呼万唤他终于还是来…

基于Matlab实现多个图像融合案例(附上源码+数据集)

图像融合是将多幅图像合成为一幅图像的过程&#xff0c;旨在融合不同图像的信息以获得更多的细节和更丰富的视觉效果。在本文中&#xff0c;我们将介绍如何使用Matlab实现图像融合。 文章目录 简单案例源码数据集下载 简单案例 首先&#xff0c;我们需要了解图像融合的两种主…

【STM32】IIC的初步使用

IIC简介 物理层 连接多个devices 它是一个支持设备的总线。“总线”指多个设备共用的信号线。在一个 I2C 通讯总线中&#xff0c;可连接多个 I2C 通讯设备&#xff0c;支持多个通讯主机及多个通讯从机。 两根线 一个 I2C 总线只使用两条总线线路&#xff0c;一条双向串行数…

Android Studio 汉化

一、汉化&#xff1a; 查看版本号&#xff0c;查看Android Studio版本&#xff0c;根据版本下载对应的汉化包。例如我的是223。 下载汉化包&#xff1a; 中文语言包下载地址 找到对应的版本 回到Android Studio 1、进入设置 2、从磁盘安装插件 3、选择下载好的包点击OK 4、…

介绍OpenCV

OpenCV是一个开源计算机视觉库&#xff0c;可用于各种任务&#xff0c;如物体识别、人脸识别、运动跟踪、图像处理和视频处理等。它最初由英特尔公司开发&#xff0c;目前由跨学科开发人员社区维护和支持。OpenCV可以在多个平台上运行&#xff0c;包括Windows、Linux、Android和…

AJAX学习笔记6 JQuery对AJAX进行封装

AJAX学习笔记5同步与异步理解_biubiubiu0706的博客-CSDN博客 AJAX请求相关的代码都是类似的&#xff0c;有很多重复的代码&#xff0c;这些重复的代码能不能不写&#xff0c;能不能封装一个工具类。要发送ajax请求的话&#xff0c;就直接调用这个工具类中的相关函数即可。 用J…