【Linux】基于环形队列RingQueue的生产消费者模型

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

目录

前言

环形队列的概念及定义

POSIX信号量

RingQueue的实现方式

RingQueue.hpp的构建

Thread.hpp

Main.cc主函数的编写

Task.hpp function包装器的使用

总结



前言

世上有两种耀眼的光芒,一种是正在升起的太阳,一种是正在努力学习编程的你!一个爱学编程的人。各位看官,我衷心的希望这篇博客能对你们有所帮助,同时也希望各位看官能对我的文章给与点评,希望我们能够携手共同促进进步,在编程的道路上越走越远!


提示:以下是本篇文章正文内容,下面案例可供参考

环形队列的概念及定义

  • 环形队列采用数组模拟,用模运算来模拟环状特性

  • 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来 判断满或者空。另外也可以预留一个空的位置,作为满的状态

  • 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程

为什么判断阻塞队列为空为满时,要在我们对应的加锁和解锁之间呢?

  • 判断阻塞队列是否为满,本身就是对阻塞队列内部的成员属性做访问、做比较,如果判断在加锁和解锁的区间之外判断时,可能会出现pop和push的操作,会导致并发访问出问题,判断也就不准了。加锁是对内部资源(阻塞队列)进行整体使用的,虽然对阻塞队列保护起来了,但是对阻塞队列的使用情况,我们在锁这里看不出来,我们只能证明当前阻塞队列有人想要使用它,但是队列中的情况是不清楚的。

当队列为空的时候,生产者和消费者的下标是同一个位置;

当队列为满的时候,生产者和消费者下标是用同一个位置。

环形队列判空盘满的时候,难度有点大,因为生产者和消费者在队列为空为满时,都是用同一个位置,所以

  1. 方法一:在环形队列中引入了一个元素的计数器count,count==0时,为空;count==N时,为满;
  2. 方法二:当生产者下标加1等于消费者下标,则说明队列为满。

环形队列中共分三种情况:

生产者和消费者下标为同一个位置:

  • 队列满:当队列为满时,生产者就不能在盘子中放入数据了,否则会覆盖之前的数据;所以要访问临界资源,要让消费者先跑。
  • 队列空:当队列为空时,必须保证要生产者先跑,因为生产者和消费者为空,指向同一个位置,那么这个位置就是一个局部性的临界资源,不能让两者同时跑,否则会出现二义性(盘子和苹果),所以要保证两者互斥,其次必须生产者先跑。

通过队列为空为满这两种情况得出结论:生产者不能把消费者套一个圈;消费者不能超过生产者。

生产者和消费者的下标不是同一个位置:

  • 队列一定不为空&&队列一定不为满

生产者和消费者的下标不在同一个位置,就意味着生产者和消费者的动作,可以真正的并发。

POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

我们在电影院中买票,把票买到了,(申请信号量成功了)能证明电影院中一定有资源给我。所以当我申请信号量成功之后,我根本就不用判断这里面的资源是否满足我的条件,所以信号量是一把计数器,这个计数器是用来衡量资源数目的,只要申请成功,就一定会有对应的资源提供给你,从而有效减少内部的判断!!!

在进入临界资源之前,申请信号量时,就已经知道要的资源有还是没有了;而阻塞队列加锁那里,我们还要在对应的临界区里做判断。

定义一个信号量跟定义一个整型一样。

理解:信号量内部维护了一个计数器和队列

初始化信号量

#include <semaphore.h> 
int sem_init(sem_t *sem, int pshared, unsigned int value); 
参数: pshared:值为0表示线程间共享,非零表示进程间共享 value:信号量初始值 

销毁信号量

int sem_destroy(sem_t *sem); 

等待信号量

功能:等待信号量,会将信号量的值减1 
int sem_wait(sem_t *sem); //P() 
等待信号量:对指定信号量的计数器做--,
减之前先判断:信号量值是否大于0,大于0,继续往后走;小于0,则阻塞

发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。 
int sem_post(sem_t *sem);//V() 
发布信号量:对指定信号量的计数器做++

RingQueue的实现方式

RingQueue.hpp的构建

将和环形队列相关的控制方法进行封装,通过模板传入Thread模板之中,之后每个线程都能看到环形队列的相关方法及规则,从而更好的对所有的线程进行管理,依旧是遵循Linux中的先描述,再组织。

#pragma once
// 环形队列
#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
#include <pthread.h>// 单生产,单消费
// 多生产,多消费
// "321":
// 3: 三种关系
// a: 生产和消费互斥和同步
// b: 生产者之间:一把锁
// c: 消费者之间:一把锁
// b和c的解决方案:加锁,是因为下标只有一个
// 1. 需要几把锁?2把
// 2. 如何加锁?
template<typename T>
class RingQueue
{
private:void P(sem_t& sem){sem_wait(&sem);// -1}void V(sem_t& sem){sem_post(&sem);// +1}// 加锁和解锁的接口封装void Lock(pthread_mutex_t& mutex){pthread_mutex_lock(&mutex);}void Unlock(pthread_mutex_t& mutex){pthread_mutex_unlock(&mutex);}
public:RingQueue(int cap) : _ring_queue(cap), _cap(cap), _productor_step(0), _consumer_step(0){sem_init(&_room_sem, 0, _cap);// 0:线程间共享sem_init(&_data_sem, 0, 0);pthread_mutex_init(&_productor_mutex, nullptr);pthread_mutex_init(&_consumer_mutex, nullptr);}// 入队列操作void Enqueue(const T& in){// 先申请信号量再加锁的优点:// 多个线程来了,抢信号量都不互相干扰;虽然竞争锁的时候,只有一个线程竞争成功了,// 其余线程都在锁这里等待下次竞争锁,但是这些线程都提前预定了信号量,当这些线程被唤醒时,// 直接进来生产就行了,效率提高了// 生产行为P(_room_sem);// 先申请空间资源,申请信号量:本质是对资源的预定机制,是原子的Lock(_productor_mutex);// 加锁使得多生产多消费--->转为单生产但消费// 一定有空间!!!_ring_queue[_productor_step++] = in; // 生产,先访问再++_productor_step %= _cap;// 防止越界,维持环状特性Unlock(_productor_mutex);V(_data_sem);// 释放数据信号量,数据信号量+1}void Pop(T* out){// 消费行为P(_data_sem);// 先申请数据资源Lock(_consumer_mutex);*out = _ring_queue[_consumer_step++];_consumer_step %= _cap;Unlock(_consumer_mutex);V(_room_sem);// 释放空间信号量:数据取走,空间露出来,把空间资源释放掉}~RingQueue(){sem_destroy(&_room_sem);sem_destroy(&_data_sem);pthread_mutex_destroy(&_productor_mutex);pthread_mutex_destroy(&_consumer_mutex);}
private:// 1. 环形队列std::vector<T> _ring_queue;int _cap; // 环形队列的容量上限// 2. 生产和消费的下标int _productor_step;// 多个生产者线程都要竞争这个下标,下标只有一个,所以得争锁int _consumer_step;// 3. 定义信号量// 当队列为空为满时,生产和消费线程都在各自的信号量中的队列中休眠sem_t _room_sem; // 生产者关心(空间信号量)sem_t _data_sem; // 消费者关心(数据信号量)// 4. 定义锁,维护多生产之间、多消费之间的互斥关系pthread_mutex_t _productor_mutex;pthread_mutex_t _consumer_mutex;
};

Thread.hpp

#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>namespace ThreadModule
{template<typename T>using func_t = std::function<void(T&, std::string name)>;// typedef std::function<void(const T&)> func_t;template<typename T>class Thread{public:void Excute(){// std::cout << _threadname << std::endl;// 回调线程方法(生产者和消费者执行的函数)_func(_data, _threadname);}public:Thread(func_t<T> func, T& data, std::string name = "none-name"): _func(func), _data(data), _threadname(name), _stop(true){}static void* threadroutine(void* args) // 类成员函数,形参是有this指针的!!{Thread<T>* self = static_cast<Thread<T> *>(args);self->Excute();return nullptr;}bool Start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if (!n){_stop = false;return true;}else{return false;}}void Detach(){if (!_stop){pthread_detach(_tid);}}void Join(){if (!_stop){pthread_join(_tid, nullptr);}}std::string name(){return _threadname;}void Stop(){_stop = true;}~Thread() {}private:pthread_t _tid;std::string _threadname;T& _data;  // 为了让所有的线程访问同一个全局变量func_t<T> _func;bool _stop;};
} // namespace ThreadModule#endif

Main.cc主函数的编写

#define _CRT_SECURE_NO_WARNINGS 1#include "RingQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
#include <ctime>// 我们需要的是向队列中投递任务
using namespace ThreadModule;
using ringqueue_t = RingQueue<Task>;void Consumer(ringqueue_t& rq, std::string name)
{while (true){sleep(2);// 1. 消费任务Task t;rq.Pop(&t);std::cout << "Consumer handler task: " << "[" << name << "]" << std::endl;// 2. 处理任务t();}
}void Productor(ringqueue_t& rq, std::string name)
{srand(time(nullptr) ^ pthread_self());//int cnt = 10;while (true){// 获取任务// 生产任务rq.Enqueue(Download);std::cout << "Productor : " << "[" << name << "]" << std::endl;// cnt--;}
}void InitComm(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq, func_t<ringqueue_t> func, const std::string& who)
{for (int i = 0; i < num; i++){std::string name = "thread-" + std::to_string(i + 1) + "-" + who;threads->emplace_back(func, rq, name);// 构建出一个线程对象// threads->back()->Start();// 真正的创建出新线程// 当前线程对象在创建新线程和执行函数方法时,主线程可能会先一步回来又创建了一个线程对象,// 那么vector容器中最后一个元素就改变了,那么又执行容器的最后一个线程可能会出错,// 因为上一个线程的执行函数的过程还没有执行完,刚拿到最后一个线程的数据时,还没来得及使用,// 容器中最后一个线程变化了,那么就拿新线程的数据,但是新线程的数据并没有初始化完成,// 此时访问的对象那个就是一个空对象。// 所以我们应该把线程构建起来,最后统一启动StartAll}
}void InitConsumer(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq)
{InitComm(threads, num, rq, Consumer, "consumer");
}void InitProductor(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq)
{InitComm(threads, num, rq, Productor, "productor");
}void WaitAllThread(std::vector<Thread<ringqueue_t>>& threads)
{for (auto& thread : threads){thread.Join();}
}void StartAll(std::vector<Thread<ringqueue_t>>& threads)
{for (auto& thread : threads){std::cout << "start: " << thread.name() << std::endl;thread.Start();}
}int main()
{ringqueue_t* rq = new ringqueue_t(10);std::vector<Thread<ringqueue_t>> threads;// std::vector<Thread<ThreadData>> threads;InitProductor(&threads, 1, *rq);InitConsumer(&threads, 1, *rq);StartAll(threads);WaitAllThread(threads);return 0;
}

Task.hpp function包装器的使用

Task是一个function<void()>的类型,也就是说用Task实例化出的模板可以接收任意类型的函数方法(也就是生产消费者模型中的任务)这样就最大的实现了来什么执行什么,大大提高了代码的灵活性可拓展性。

#pragma once
#include <iostream>
#include <functional>using Task = std::function<void()>;void Download()
{std::cout << "this is a download task" << std::endl;
}

总结

好了,本篇博客到这里就结束了,如果有更好的观点,请及时留言,我会认真观看并学习。
不积硅步,无以至千里;不积小流,无以成江海。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/46301.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于Kafka Topic分区和Replication分配的策略

文章目录 1. Topic多分区2. 理想的策略3. 实际的策略4. 如何自定义策略 1. Topic多分区 如图&#xff0c;是一个多分区Topic在Kafka集群中可能得分配情况。 P0-RL代表分区0&#xff0c;Leader副本。 这个Topic是3分区2副本的配置。分区尽量均匀分在不同的Broker上&#xff0c…

第3章 创建最小(Minimal APIs)API应用程序

第1章 框架学习的基石与实战策略 第2章 大话ASP.NET Core 入门 第3章 创建最小&#xff08;Minimal APIs&#xff09;API应用程序 1.最小API概述 在ASP.NET Core的广阔天地里&#xff0c;"最小API应用程序"如同一把轻巧而强大的瑞士军刀&#xff0c;专为迅速构建…

idea中打开静态网页端口是63342而不是8080

问题&#xff1a; 安装了tomcat 并且也配置了环境&#xff0c;但是在tomcat下运行&#xff0c;总是在63342下面显示。这也就意味着&#xff0c;并没有运行到tomcat环境下。 找了好几个教程&#xff08;中间还去学习了maven&#xff0c;因为跟的教程里面&#xff0c;没有maven,但…

【 香橙派 AIpro评测】烧系统运行部署LLMS大模型体验Jupyter Lab AI 应用样例(新手入门)

文章目录 一、引言⭐1.1下载镜像烧系统⭐1.2开发板初始化系统配置远程登陆&#x1f496; 远程ssh&#x1f496;查看ubuntu桌面&#x1f496; 远程向日葵 二、部署LLMS大模型2.1 快速启动&#x1f496;拉取代码&#x1f496;下载mode数据&#x1f496;启动模型对话 三、体验 内置…

【算法笔记自学】第 10 章 提高篇(4)——图算法专题

10.1图的定义和相关术语 #include <cstdio>const int MAXN 100; int degree[MAXN] {0};int main() {int n, m, u, v;scanf("%d%d", &n, &m);for (int j 0; j < m; j) {scanf("%d%d", &u, &v);degree[u];degree[v];}for (int i…

自学鸿蒙HarmonyOS的ArkTS语言<九>自定义弹窗组件CustomDialog及二次封装自定义弹窗

一、自定义弹窗 CustomDialog struct CustomDialogBuilder {controller: CustomDialogController new CustomDialogController({ // 注意写法builder: CustomDialogBuilder({})})// controller: CustomDialogController // 这种预览会报错cancel?: () > voidconfirm?: (…

微信小游戏 彩色试管 倒水游戏 逻辑

最近开始研究微信小游戏&#xff0c;有兴趣的 可以关注一下 公众号&#xff0c; 记录一些心路历程和源代码。 定义一个 Cup类&#xff1a; 主要功能 初始化水杯&#xff1a;根据传入的颜色信息初始化水杯中的水层。 倒水&#xff1a;模拟水杯倾斜并倒出水的过程。 加水&…

Prometheus 云原生 - 基于 file_sd、http_sd 实现 Service Discovery

目录 开始 为什么需要服务发现机制 File Service Discovery&#xff08;file_sd&#xff09; 基本概念 配置方式 使用案例 HTTP Service Discovery&#xff08;http_sd&#xff09; 基本概念 配置方式 使用案例 开始 为什么需要服务发现机制 我们知道在 Prometheus …

【链表】算法题(一) ---- 力扣 / 牛客

一、移除链表元素 移除链表中值为val的元素&#xff0c;并返回新的头节点 思路&#xff1a; 题目上这样说&#xff0c;我们就可以创建一个新的链表&#xff0c;将值不为val的节点&#xff0c;尾插到新的链表当中&#xff0c;最后返回新链表的头节点。 typedef struct ListNo…

[安洵杯 2019]easy_web1

知识点&#xff1a; 1.base64加解密 2.md5加解密 3.md5碰撞绕过强类型比较 4.Linux命令绕过 进入页面发现url地址中存在 img参数和一个cmd参数&#xff0c;img参数看上去像是base64编码&#xff0c;可以去尝试一下解码. 进行了两次base64解密得到3535352e706e67看着像16进制那么…

员工聊天记录监控方法大全(五种方法你自己选择)

在现代企业中&#xff0c;为了保障业务安全、防止数据泄露和促进工作效率&#xff0c;很多公司会采用各种方法监控员工的聊天记录。虽然听起来有点“大哥哥在看着你”的感觉&#xff0c;但只要在合法和透明的前提下进行&#xff0c;这其实是为了构建一个更加健康、安全的工作环…

网络流问题-Min-cut

文章目录 1. 网络流问题基础1.1 概述1.2 常规算法1.3 总结 2. Ford-Fulkerson Algorithm2.1 概述2.2 Ford 算法2.3 Ford 算法小结 链接&#xff1a; B站学习视频 1. 网络流问题基础 1.1 概述 最大流问题主要是关于有向图问题。有向图中有m个边&#xff0c; n个节点,其中有一个…

怎么用PPT录制微课?详细步骤解析!

随着信息技术的不断发展&#xff0c;微课作为一种新型的教学形式&#xff0c;因其短小精悍、针对性强等特点&#xff0c;在教育领域得到了广泛的应用。而PPT作为一款常用的演示工具&#xff0c;不仅可以用来制作课件&#xff0c;还可以利用其内置的录屏功能或结合专业的录屏软件…

Dify中的经济索引模式实现过程

当索引模式为经济时&#xff0c;使用离线的向量引擎、关键词索引等方式&#xff0c;降低了准确度但无需花费 Token。 一.提取函数**_extract** 根据不同文档类型进行内容的提取&#xff1a; def _extract(self, index_processor: BaseIndexProcessor, dataset_document: Data…

FastAPI 学习之路(四十三)路径操作的高级配置

在实际开发中&#xff0c;可能我们有些接口不能在接口文档中与其他业务接口一样开放给前端或者其他对接人&#xff0c;那么我们肯定会想着在接口文档中对其进行屏蔽隐藏操作&#xff0c;那么可以实现吗&#xff1f; 接口文档中隐藏接口 当然&#xff0c;还很简单&#xff0c;…

【CSS in Depth 2 精译】2.6 CSS 自定义属性(即 CSS 变量)+ 2.7 本章小结

文章目录 2.6 自定义属性&#xff08;即 CSS 变量&#xff09;2.6.1 动态变更自定义属性 2.7 本章小结 当前内容所在位置 第一章 层叠、优先级与继承第二章 相对单位 2.1 相对单位的威力2.2 em 与 rem2.3 告别像素思维2.4 视口的相对单位2.5 无单位的数值与行高2.6 自定义属性 …

PGCCC|【PostgreSQL】PCA+PCP+PCM等IT类认证申报个税退税指南

小编特将PostgreSQL证书申报个税退税流程&#xff0c;编辑成文&#xff0c;供大家申报参考哦~ 1.申报专项附加扣除 第一步&#xff1a;打开个人所得税APP&#xff0c;选择“专项附加扣除填报”&#xff1a; 第二步&#xff1a;“扣除年度”选择您要申报的年度&#xff0c;并…

Windows 默认以管理员运行打开CMD

winr 默认以管理员打开运行CMD 需求&#xff1a;在运行页面输入cmd 希望是可以直接通过管理员方式打开的。 winr 打开运行 输入secpol.msc 打开本地安全策略&#xff08;注意家庭版是没有这个的&#xff09; 找到本地策略–安全选项–用户帐户控制: 以管理员批准模式运行所有管…

基于Python thinker GUI界面的股票评论数据及投资者情绪分析设计与实现

1.绪论 1.1背景介绍 Python 的 Tkinter 库提供了创建用户界面的工具&#xff0c;可以用来构建股票评论数据及投资者情绪分析的图形用户界面&#xff08;GUI&#xff09;。通过该界面&#xff0c;用户可以输入股票评论数据&#xff0c;然后通过情感分析等技术对评论进行情绪分析…

JavaScript 中 await 永远不会 resolve 的 Promise 会导致内存泄露吗?

前言 在 JavaScript 中&#xff0c;await 关键字用于等待一个 Promise 完成&#xff0c;它只能在异步函数&#xff08;async function&#xff09;内部使用。当 await 一个永远不会 resolve 的 Promise 时&#xff0c;它确实会阻塞异步函数的进一步执行&#xff0c;但不会直接…