生产者消费者模型和线程同步问题

文章目录

  • 线程同步概念
  • 生产者消费者模型
  • 条件变量
    • 使用条件变量
    • 唤醒条件变量
  • 阻塞队列

线程同步概念

互斥能保证安全,但是仅有安全不够,同步可以更高效的使用资源

生产者消费者模型

下面就基于生产者消费者来深入线程同步等概念:
在这里插入图片描述在这里插入图片描述如何理解生产消费者模型:
以函数调用为例:
在这里插入图片描述
在这里插入图片描述

两个线程之间要进行信息交互就需要引入一段内存空间(交易场所)
线程a将数据放入缓冲区(交易场所),线程b从缓冲区进行读取
这样线程a完成数据存放后就继续做自己的事情,线程b去读取数据
这样就能很好的实现多执行流之间的执行解耦
特点:很好的提高了处理数据的能力
支持忙闲不均

条件变量

条件变量:为了不让消费者的每次消费为无效消费.
所以对于生产者,在每次完成自己的任务之后对条件做出改变,当条件的变量达到一定条件后,消费者才进行有效消费
无效消费过程: 消费前(加锁)----尝试消费(无效消费)—消费结束(解锁)

条件变量的目的:
1.不做无效的锁申请
2.假设消费者很多,让他们有执行顺序
相当于条件变量给各个线程在调度他之前给一个提醒

条件变量本质是数据:可以理解为:

在这里插入图片描述

使用条件变量

认识接口
在这里插入图片描述
与互斥锁的创建和使用非常相似

pthread_cond_destroy();//创建布局条件变量要后要进行销毁
pthread_cond_init();//对局部的条件变量进行初始化
pthread_cond_t;//关键字 创建布局变量
全局就要提供PTHREAD_COND_INITIALIZER的宏来进行初始化

条件变量创建的前提是有线程安全,所以条件变量的接口和互斥锁的接口大致类似.

条件创建了还要有一个接口来等待条件成立:

在这里插入图片描述

pthread_cond_wait();//等待条件成立,参数为条件变量和互斥锁
上述所有的参数返回值都是在成功时返回0
失败返回错误原因

唤醒条件变量

在这里插入图片描述

pthread_cond_signal();//唤醒指定的条件变量,并唤醒一个线程
pthread_cond_broadcast();//是条件变量成立,并唤醒所有的线程

在没有条件变量的时候,打印信息如图:

在这里插入图片描述
可以看到线程的调度是不确定的,我们想让这个线程按照我们想要的顺序(如:一次Thread-1,Thread-2,Thread-3,这样)进行打印,那么就需要用到条件变量.
代码:

#include <iostream>
#include <unistd.h>
#include <string>
#include <pthread.h>//创建条件变量和互斥锁
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;void *ThreadRoutine(void* args)
{std::string name = static_cast<const char *>(args);while(true){sleep(1);pthread_mutex_lock(&mutex);pthread_cond_wait(&cond, &mutex);std::cout << "I am a new Thread, my name is " << name << std::endl;pthread_mutex_unlock(&mutex);}
}
int main()
{pthread_t t1, t2, t3;pthread_create(&t1, nullptr, ThreadRoutine, (void*)"Thread-1");pthread_create(&t2, nullptr, ThreadRoutine, (void*)"Thread-2");pthread_create(&t3, nullptr, ThreadRoutine, (void*)"Thread-3");while(true){sleep(1);pthread_cond_signal(&cond);}pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);return 0;
}

我们让代码按照t1, t2, t3的线程顺序来执行子线程的任务
在这里插入图片描述
换一个顺序验证也是如此
如果没有条件变量,这个也是按照顺序打印,不过是批次进行,和CPU时钟机制有关,所有使用条件变量更好

使用pthread_cond_broadcast();//唤醒全部线程
就跟加锁机制一样,不过每次是各个线程只执行一次后就会等待,并不想无锁那样批次打印

在这里插入图片描述

单纯的互斥能保证线程的安全, 但不一定合理或者高效.
pthread_cond_wait();//
在等待的时候,会释放这把锁(等待是在临界区内,释放锁是为了资源高效利用,再次加锁是不允许在有锁的临界区内有无锁的线程存在)

再被唤醒的时候,又会再次加锁
当被唤醒的时候,重新申请也是需要参与锁的竞争的(未解决这个问题, 看下main阻塞队列部分的讲解)

阻塞队列

这个队列只有为空,为满两种状态

为空:消费线程不能再消费
为满:生产线程不能在生产

这个场景也满足上述说明的所需的321原则
(3种关系,生产–生产,消费–消费
2中角色:生产者,消费者
1个环境(这个阻塞队列就是一个临界区))
单生产,单消费:
基于队列实现,阻塞队列的操作(消费者生产者实例):

伪唤醒:
在这里插入图片描述

在这份代码中,将来如果因为productor慢不满足生产, 多个线程在一个阻塞队列中等待,而有一个Push达到(有一个生产刚产出), 假设此时的代码是将全部线程都唤醒,那么除了第一个线程得到条件变量的满足和锁的满足,其他线程会在条件变量下的等待转化为竞争锁等待的情况, 假设此时若第一个线程完成pop且unlock速度快,那么这时后续的线程会在得到锁之后直接对空队列进行Pop操作,这是就会出现错误,这个状态就是伪唤醒(条件不满足,但是线程被唤醒了)

(虽然上述只是假设, 但是cpu的运行速度很快, 我们不防会有这样的情况发生)

所以修改Pop内的if(IsFull)代码和Push内的(IsEmpty)代码,还可以使用之前的锁封装的代码

在这里插入图片描述
此时的消费者生产者代码:

BlockQueue.hpp

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"int defaultcap = 5; // for testtemplate <class T>
class BlockQueue
{
public:BlockQueue(int cap = defaultcap): _capacity(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_p_cond, nullptr);pthread_cond_init(&_c_cond, nullptr);}bool IsEmpty(){return _q.size() == 0;//查看队列状态}bool IsFull(){return _q.size() == _capacity;//此时为满,}bool Pop(T *out){LockGuard lg(&_mutex);while(IsEmpty()){//为空, 进行等待pthread_cond_wait(&_c_cond, &_mutex);}*out = _q.front();_q.pop();//可以生产//可增加水准线进行响应的操作pthread_cond_signal(&_p_cond);//pthread_mutex_unlock(&_mutex);return true;}bool Push(const T &in){// 当前变量进行加锁LockGuard lg(&_mutex);//pthread_mutex_lock(&_mutex);while(IsFull()){// 为满,进行阻塞等待pthread_cond_wait(&_p_cond, &_mutex);}// 进行生产等待_q.push(in);//可以进行消费pthread_cond_signal(&_c_cond);//pthread_mutex_unlock(&_mutex);return true;}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_p_cond);pthread_cond_destroy(&_c_cond);}private:std::queue<T> _q;int _capacity; // 为空时,不能再消费,为满时,不能再生产,状态是capacity与size进行比较pthread_mutex_t _mutex;pthread_cond_t _p_cond;pthread_cond_t _c_cond;
};

main.cc

#include "BlockQueue.hpp"
#include <pthread.h>
#include <time.h>
#include <sys/types.h>
#include <unistd.h>void *productor(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){int data = rand() % 10 + 1;//[1,10];bq->Push(data);std::cout << "consumer data: " << data << std::endl;sleep(1);}
}
void *consumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){int data = 0;bq->Pop(&data);std::cout << "product data: " << data << std::endl;}
}
int main()
{srand((uint64_t)time(nullptr) ^ getpid() ^ pthread_self());BlockQueue<int> * bq = new BlockQueue<int>();pthread_t c, p;//两个线程pthread_create(&p, nullptr, productor, bq);pthread_create(&c, nullptr, consumer, bq);pthread_join(p, nullptr);pthread_join(c, nullptr);return 0;
}

利用生产者消费者模型实现分派任务的操作()
在此基础上构建一个任务类型:

Task.hpp

#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
enum //设置退出码
{ok = 0,div_zero,mod_zero,unknow
};
const std::string opts = "+-*/%)[()]"; //设置随机运算
class Task
{
public:Task()//无参构造函数用于生成无参临时对象,如果有参构造是全缺省那么可以不用写这个无参构造函数{}Task(int x, int y, char op):data_x(x), data_y(y), opt(op), result(0), code(ok){}void Run()//任务主题内容{switch(opt){case '+':result = data_x + data_y;break;case '-':result = data_x - data_y;break;case '*':result = data_x * data_y;break;case '/':{if(data_y == 0){code = div_zero;}else{result = data_x / data_y;}break;}case '%':{if(data_y == 0){code = mod_zero;}else{result = data_x % data_y;}break;}default:code = unknow;break;}}void operator()(){Run();}~Task(){}//打印任务,用于更清晰的认识std::string print_task(){std::string s;s = std::to_string(data_x) + opt + std::to_string(data_y) + "=?\n";return s;}std::string print_result(){std::string s;s = std::to_string(data_x) + opt + std::to_string(data_y) + "=" + std::to_string(result) + "[" + std::to_string(code) + "]" + "\n";return s;}
private:int data_x;int data_y;char opt;int result;int code;
};

对上述代码的修改:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <time.h>
#include <sys/types.h>
#include <unistd.h>void *productor(void *args)//生产者
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);//阻塞队列的对象while(true){int x = rand() % 11;//[0,10];int y = rand() % 11;//[0,10];char opt = opts[rand() % opts.size()];Task t(x, y, opt);std::cout << t.print_task() << std::endl;;bq->Push(t);//放入队列,队列size+1//usleep(1000);}
}
void *consumer(void *args)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);while(true){sleep(1);Task t;//1.拿到消费数据bq->Pop(&t);//2.执行任务t();//3.打印任务信息std::cout << t.print_result() << std::endl;;}
}
int main()
{srand((uint64_t)time(nullptr) ^ getpid() ^ pthread_self());//伪随机种子BlockQueue<Task> * bq = new BlockQueue<Task>();//创建一个阻塞队列pthread_t c, p;//两个线程pthread_create(&p, nullptr, productor, bq);//两个线程模拟消费者生产者模型pthread_create(&c, nullptr, consumer, bq);pthread_join(p, nullptr);pthread_join(c, nullptr);return 0;
}

针对上述代码,生产者和消费者本身就是互斥的,也就是串行执行,怎么会高效呢?

探究这个问题,首先从消费者消费后去哪里?生产在生产之前从哪来?来考虑.
生产者的数据,在产生时是花费时间,消费者消费也要花时间.
在消费者处理数据时花时间的同时生产者在某个时刻刚好将数据传给临界区,生产者只需要保证自己完成传送就可以做其他自己的事,消费者自己继续处理数据
所以高效,并发不体现在同步互斥,而是在拿数据,处理数据这里.

多线程任务下的消费者生产者模型多对多:
在这里插入图片描述将bq和线程名字一起封装可以更好的观察:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

本篇结束,下篇更精彩,关注我,带你飞~~~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/42866.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[高频 SQL 50 题(基础版)]第一千七百五十七题,可回收且低脂产品

题目&#xff1a; 表&#xff1a;Products ---------------------- | Column Name | Type | ---------------------- | product_id | int | | low_fats | enum | | recyclable | enum | ---------------------- product_id 是该表的主键&#xff08;具有唯…

SQLite 命令行客户端 + HTA 实现简易UI

SQLite 命令行客户端 HTA 实现简易UI SQLite 客户端.hta目录结构参考资料 仅用于探索可行性&#xff0c;就只实现了 SELECT。 SQLite 客户端.hta <!DOCTYPE html> <html> <head><meta http-equiv"Content-Type" content"text/html; cha…

C语言 | Leetcode C语言题解之第226题翻转二叉树

题目&#xff1a; 题解&#xff1a; struct TreeNode* invertTree(struct TreeNode* root) {if (root NULL) {return NULL;}struct TreeNode* left invertTree(root->left);struct TreeNode* right invertTree(root->right);root->left right;root->right le…

LeetCode加油站(贪心算法/暴力,分析其时间和空间复杂度)

题目描述 一.原本暴力算法 最初的想法是&#xff1a;先比较gas数组和cost数组的大小&#xff0c;找到可以作为起始点的站点(因为如果你起始点的油还不能到达下一个站点&#xff0c;就不能作为起始点)。当找到过后&#xff0c;再去依次顺序跑一圈&#xff0c;如果剩余的油为负数…

从数据仓库到数据湖(下):热门的数据湖开源框架

文章目录 一、前言二、Delta Lake三、Apache Hudi四、Apache Iceberg五、Apache Paimon六、对比七、笔者观点八、总结八、参考资料 一、前言 在上一篇从数据仓库到数据湖(上)&#xff1a;数据湖导论文章中&#xff0c;我们简单讲述了数据湖的起源、使用原因及其本质。本篇文章…

AI论文作图——如何表示模型参数冻结状态

一、LOGO &#x1f525; win10win11 ❄️ win10win11 二、注意事项&#xff1a; 根据电脑系统&#xff0c;选择对应的版本。 参考&#xff1a; 【AI论文作图】如何表示模型参数冻结状态&#xff1f;

神经网络中的激活函数

目录 一、什么是激活函数&#xff1a;二、如何选择激活函数&#xff1a;1.Sigmoid激活函数&#xff1a;2.线性激活函数&#xff1a;3.ReLU激活函数&#xff1a; 一、什么是激活函数&#xff1a; 激活函数是神经网络中的一种函数&#xff0c;它在神经元中起到了非线性映射的作用…

最新 Kubernetes 集群部署 + flannel 网络插件(保姆级教程,最新 K8S 版本)

资源列表 操作系统配置主机名IP所需插件CentOS 7.92C4Gk8s-master192.168.60.143flannel-cni-plugin、flannel、coredns、etcd、kube-apiserver、kube-controller-manager、kube-proxy、 kube-scheduler 、containerd、pause 、crictlCentOS 7.92C4Gk8s-node01192.168.60.144f…

gitee上传和下载idea项目的流程

环境&#xff1a;idea2022 一、上传项目 1、在gitee中新建一个仓库。 2、打开所要上传的项目的文件夹&#xff0c;点击Git Bash&#xff0c;生成.git文件夹。 3、在idea中打开所要上传的项目&#xff0c;在控制台的Terminal菜单中&#xff0c;输入git add . (注意&#xf…

安防综合管理/视频汇聚平台EasyCVR视频监控存储技术:高效稳定的视频数据保障方案

随着科技的飞速发展&#xff0c;视频监控已成为现代社会不可或缺的一部分。无论是城市治安、交通管理&#xff0c;还是商业安保、家庭监控&#xff0c;视频监控都发挥着至关重要的作用。而在这背后&#xff0c;视频监控存储技术则是确保监控数据得以长期保存、高效检索和可靠利…

「C++系列」C++ 修饰符类型

文章目录 一、C 修饰符类型1. 访问修饰符&#xff08;Access Modifiers&#xff09;2. 存储类修饰符&#xff08;Storage Class Specifiers&#xff09;3. 类型修饰符&#xff08;Type Modifiers&#xff09;4. 函数修饰符 二、C 修饰符类型-案例1. 访问修饰符案例2. 存储类修饰…

精讲:java之多维数组的使用

一、多维数组简介 1.为什么需要二维数组 我们看下面这个例子&#xff1f;“ 某公司2022年全年各个月份的销售额进行登记。按月份存储&#xff0c;可以使用一维数组。如果改写为按季度为单位存储怎么办呢&#xff1f; 或许现在学习了一维数组的你只能申请四个一维数组去存储每…

【福利】代码公开!咸鱼之王自动答题脚本

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 微信或QQ打开咸鱼之王小程序&#xff0c;进入答题界面&#xff0c;运行main.py。期间不要动鼠标。 可自行更改代码来适配自己的需求~ 可以按照示例图片…

深入了解线程锁的使用及锁的本质

文章目录 线程锁的本质局部锁的使用 锁的封装及演示线程饥饿问题 线程加锁本质可重入和线程安全死锁问题 根据前面内容的概述, 上述我们已经知道了在linux下关于线程封装和线程互斥,锁的相关的概念, 下面就来介绍一下关于线程锁的一些其他概念. 线程锁的本质 当这个锁是全局的…

Codeforces Round #956 (Div. 2) and ByteRace 2024

A. Array Divisibility 思路: 找出特例,发现输出 1∼&#x1d45b; 符合题意。直接输出1~n即可. 代码: #include<bits/stdc.h> using namespace std; typedef long long ll; #define N 1000005 ll dp[N], w[N], v[N], h[N]; ll dis[1005][1005]; ll a, b, c, n, m, t;…

iOS 开发技巧 - 使用本地 json 文件

前言 使用本地 json 文件的场景&#xff0c;在我们开发功能的阶段&#xff0c;服务端接口字段定义好了后&#xff0c;有些接口响应很慢&#xff0c;请求到响应可能要 几十秒甚至一分钟&#xff0c;我们需要频繁调用接口来调试功能&#xff1b;还有就是调用一些我们需要付费的三…

Ubuntu20.04下修改samba用户密码

Ubuntu20.04下修改samba用户密码 在Ubuntu系统中&#xff0c;修改samba密码通常涉及到两个方面&#xff1a;更改samba用户的密码和重置samba服务的密码数据库。以下是如何进行操作的步骤&#xff1a; 1、更改samba用户密码&#xff1a; 打开终端&#xff0c;使用以下命令更改…

vue打包terser压缩去除控制台打印和断点

情况一&#xff1a; 1、vue-cli搭建 代码压缩工具terser在vue-cli里面是自动支持的&#xff0c;所以直接在vue.config.js里面加入下面配置&#xff1a; const {defineConfig} require(vue/cli-service) module.exportsdefineConfig({transpileDependencies:true,terser:{te…

看影视学英语(假如第一季第一集)

in the hour也代表一小时吗&#xff1f;等同于in an hour&#xff1f;

activemq-CVE-2022-41678

Apache ActiveMQ Jolokia 后台远程代码执行漏洞 Apache ActiveMQ在5.16.5&#xff0c;5.17.3版本及以前&#xff0c;后台Jolokia存在一处任意文件写入导致的远程代码执行漏洞。 启动环境 admin/admin 方法一&#xff1a;利用poc 这个方法受到ActiveMQ版本的限制&#xff0c;因…