文章目录
- 1.生产者和消费者模型
- 2.生产者和消费者模型优点
- 3.阻塞队列
- 4.POSIX信号量
- 5.基于环形队列的生产消费模型
本文完整的代码放在了这: Gitee链接
1.生产者和消费者模型
生产者和消费者模型,概括起来其实是一个321原则:3是:三种关系,2是两种角色,1是1个交易场所。
三种关系有:生产者和生产者、消费者和消费者、生产者和消费者。
- 其中生产者vs生产者之间是:互斥关系。就好比如一家超市,两种品牌的供应商之间是竞争的关系,假设同一时间只有一家供应商能向超市供货,超市就是临界资源。所以用互斥保障安全。
- 消费者vs消费者也是互斥关系。如果是在沙漠上,你和你的舍友都非常渴,如果只有一瓶水,那么不得抢起来啊!所以消费者和消费者之间也要保证互斥关系。
- 而生产者和消费者则是:互斥和同步的关系。为什么要是互斥呢? 假设超市有一个冰柜,生产者需要往冰柜上放上雪糕,而正好消费者要拿生产者要放到冰柜的拿个雪糕。那么消费者有没有那个雪糕呢?这取决于雪糕还是没放,这是不确定的,在生产者消费者模型等中,要是确定性的。雪糕要么放到冰柜,要么不放到冰柜!不能是将放未放!所以互斥是保障其安全性
为什么要同步呢? 供应商刚往超市供完货,然后一直打电话问超市要不要供货,电话信道一直被供应商占有,消费者想打电话没东西都没有机会。所以要保障生产者和消费者之间有一定的顺序性,供应商刚刚把货供满就等等,等到消费得差不多了,再来供货。
两种角色是生成者和消费者。
一个交易场所,上面举例中,我们把超市到当成了交易的场所。在计算机中,这个交易场所就是一块特定的内存空间!
2.生产者和消费者模型优点
- 多线程站在调度的角度是提高CPU的并发度,而站在编码角度,天然的解耦!生产者消费者模型起到解耦的作用
- 支持忙闲不均,也就是说,可以支持生产者和消费的处理能力的不同
- 支持并发,生产者和消费者模型是高效的,为什么呢?首先生产者生产数据可以从用户中来或者网络中来,生产数据是要花费时间的;同样的道理消费者消费数据也是需要花费时间的,在生产者生产数据,消费者消费数据,这同一时刻,它们有很大的概率是并发执行的!它的高效并不能只关注放数据和取数据的角度上!
3.阻塞队列
阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。
参考代码
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>template <class Type>
class bolck_queue
{
public:bolck_queue(int max_capacity = 5){_max_capacity = max_capacity;pthread_mutex_init(&_lock,nullptr);pthread_cond_init(&_producer_cond,nullptr);pthread_cond_init(&_consumer_cond,nullptr);_hight_water = (2/3) * _max_capacity;_low_water = (1/3) * _max_capacity;}~bolck_queue(){pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_producer_cond);pthread_cond_destroy(&_consumer_cond);}
public:// 生产者,生产操作void push(const Type& input){pthread_mutex_lock(&_lock);/*这里(伪唤醒)if(_queue.size() == _max_capacity){pthread_cond_wait(&_producer_cond,&_lock);}*/while (_queue.size() == _max_capacity){pthread_cond_wait(&_producer_cond,&_lock);}// 符合条件,可以生产_queue.push(input);// 这里生成完毕可以告诉消费者可以消费了if(_queue.size() > _hight_water){pthread_cond_signal(&_consumer_cond);}pthread_mutex_unlock(&_lock);}// 消费者,消费操作Type pop(){pthread_mutex_lock(&_lock);// 同理while防止伪唤醒while(_queue.size() == 0){pthread_cond_wait(&_consumer_cond,&_lock);}// 符合条件,可以消费Type output = _queue.front();_queue.pop();// 这里消费了,可以告诉生产者来生产了!这里可以定制策略if(_queue.size() < _low_water){pthread_cond_signal(&_producer_cond);}pthread_mutex_unlock(&_lock);return output;}
private: std::queue<Type> _queue; // 交易场所,缓存(这里把_queue当成一个整体!)int _max_capacity; // 极值,阻塞队列的最大值pthread_mutex_t _lock; // 一把锁,为什么? 保证三种关系的互斥关系!pthread_cond_t _producer_cond; // 生产者条件变量pthread_cond_t _consumer_cond; // 消费者条件变量int _hight_water; // 高水位int _low_water; // 低水位
};
4.POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
信号量的本质是计数器! 用来描述资源,当申请信号量时就已经间接的判断临界资源是否就绪,如果申请成功就一定有对应的资源!就好比如,去电影院买票,只有票(信号量)买上了,就一定有对应的位置供你观影使用!同样的线程,成功申请了信号量,那里临界资源一定有线程所能访问的资源。
初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:pshared:0表示线程间共享,非零表示进程间共享value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); // P()操作
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);// V()操作
5.基于环形队列的生产消费模型
假设两个人,一个人(生产者)往桌子上摆蛋糕,一个人(消费者)往桌子上取蛋糕。
- 当为空或者满时,这个下标对应的位置只有一个人能访问,为空只能时生产者访问,放蛋糕,为满只能是消费者能访问,取蛋糕,其他情况两个人都能同时访问
- 生产者不能太快,不能给消费者套一个圈
- 消费者者不能太快,不能超过生产者
这里假定两个人,是单生产和单消费!也可以是放蛋糕的人有多个,取蛋糕的人也有多个,对应是多生产多消费!
上面可以通过信号量,来处理生产者和消费者的关系! 生产者关注的是空间(space = N),而消费者关注的是数据(data = 0)
要通过两个锁,保证生产者VS生产者;消费者VS消费者的互斥关系
参考代码
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>template <class T>
class ring_queue
{
public:ring_queue(int capacity = 5):_ring_queue(capacity),_capacity(capacity),_consumer_step(0),_producter_step(0){sem_init(&_c_data_sem,0,0);sem_init(&_p_space_sem,0,capacity);pthread_mutex_init(&_c_lock,nullptr);pthread_mutex_init(&_p_lock,nullptr);}~ring_queue(){sem_destroy(&_c_data_sem);sem_destroy(&_p_space_sem);pthread_mutex_destroy(&_c_lock);pthread_mutex_destroy(&_p_lock);}
private:void P(sem_t& sem){sem_wait(&sem);}void V(sem_t& sem){sem_post(&sem);}void lock(pthread_mutex_t& lock){pthread_mutex_lock(&lock);}void unlock(pthread_mutex_t& lock){pthread_mutex_unlock(&lock);}public:void push(const T& input){P(_p_space_sem);lock(_p_lock);_ring_queue[_producter_step] = input;_producter_step++;_producter_step %= _capacity;unlock(_p_lock);V(_c_data_sem);}T pop(){/*lock(_c_lock);P(_c_data_sem);为什么不是这样了,加锁放在P操作之前?理由1: P是原子的不需要保护,加锁之间的代码要尽可能少理由2:要让信号量尽可能去,先获取,当锁一旦释放,里面就能竞争锁,而不是持有锁去竞争!就好如:电影院先买票(信号量),然后电影开映直接持有票,进场看电影 */P(_c_data_sem);lock(_c_lock);T output = _ring_queue[_consumer_step];_consumer_step++;_consumer_step %= _capacity;unlock(_c_lock);V(_p_space_sem);return output;}
private:std::vector<T> _ring_queue; // 缓冲区,将其看成多份临界资源int _capacity; // 缓冲区的大小int _consumer_step; // 消费者下标int _producter_step; // 生产者下标sem_t _c_data_sem; // 消费者关注的资源的信号量sem_t _p_space_sem; // 生产者关注的空间的信号量pthread_mutex_t _c_lock; // 解决消费者VS消费者之间的:互斥问题pthread_mutex_t _p_lock; // 解决生产者VS生成者之间的:互斥问题
};