1.快速认识信号量接口
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。我们之前认识SystemV信号量时有这样三个结论:
1.信号量的本质是一把计数器
2.申请信号量本质就是预定资源
3.PV操作是原子的
我们之前在基于阻塞队列中通过代码实现的生产者消费者模型是通过STL容器中的queue这个队列当作整体来使用的,当然因为是STL容器,也只能当作整体来使用,但是如果我们能通过自己模拟实现一种数据结构是可以将这个队列整体进行拆分成一小块一小块的临界资源,然后让多线程不访问临界资源的同一个区域,实现让每一个线程都能并发访问属于自己的一小块区域。
但是这样的话我们最怕的是如果只有7份资源,但是访问的线程有8,9个,那么就会造成冲突的问题,所以就可以使用信号量来解决。当线程要进行访问临界资源的时候:
①先申请信号量P
②然后再访问指定的某一个位置
③最后释放信号量。
而访问指定的某一个指定的位置本质就是访问临界资源,那么这里还需要进行判断能否访问吗?其实不用再进行判断了,因为如果申请信号量成功了就已经说明条件满足了,也可以说在信号量那个地方就已经进行判断过了。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
2.基于环形队列的CP问题——理论
环形队列采用数组模拟,用模运算来模拟环状特性。
我们把这个环形队列就看作是一个生产者和消费者的交易场所(临界资源)。
之前我们认识的环形队列有头尾指针head,tail,现在一样的,开始时我们让productor,consumer两个整数指向同一位置,然后1.我们想让多线程同时看到我们这个环形队列,2.让生产者和消费者能并发访问该环形队列。
刚开始的时候队列是空的,我们先让生产者先放数据,让消费者不动,如果生产者生产的数据已经在环形队列中绕了一圈了,那么生产者还能继续生产吗?
在这种情况下,生产者不能再继续生产数据了,因为如果生产者已经把消费者绕了一圈了之后如果再继续生产数据就会把历史上生产的数据都给覆盖了,但是这个时候消费者还并没有拿到对应的数据。如果此时让消费者拿数据,让生产者停下来,如果当消费者把数据都拿完了,那么也不能继续拿数据了,因为拿完之后生产者并没有生产新数据,如果继续拿数据拿到的都是乱码。
我们要正确的生产和消费需要满足下面几个原则:
①生产者不能把消费者套一个圈
②消费者不能超过生产者
用一个小游戏来理解:
下面我们把环形队列想象成一个圆形餐桌,假设你和小明在餐桌上玩一个游戏,你放苹果,小明拿苹果,一开始你和小明站在同一个盘子旁边,首先得你先放苹果,假设你已经放满了,那你就不能继续放了,因为遵循上面得原则你相当于生产者,小明相当于消费者,小明不能超过你。所以此时小明可以把放满了苹果得盘子一个一个拿走,然后盘子又变空了,但是我们的目的并不是让你放一个,小明拿一个,而是为了实现你放苹果的同时,小明也能拿苹果,只有当餐桌盘子苹果放满,或者一个苹果都没有的时候需要注意一点,其他情况下都是可以你放一个苹果的同时,小明可以拿另一个苹果,哪怕小明拿苹果的速度比较慢,你放的比较快,你和小明也不会指向同一个位置。此时就可以实现多线程并发进入。
总结下来就是,生产者和消费者只有两种场景会指向同一个位置:
③为空:只能(互斥)让生产者跑(同步)
④为满:只能(互斥)让消费者跑(同步)
其他情况根本不会指向同一个位置,这时就可以实现多线程并发进入临界区。
对资源的认识:生产者和消费者看待资源的角度是不同的,生产者会生产数据,但是需要空间来放数据,所以生产者认为空间是资源,而消费者则认为数据是资源。
刚开始时环形队列为空,空间是充足的,所以我们让生产者的信号量为:p->sem_space = N;
刚开始没有数据,所以设置消费者的信号量为:c->sem_data = 0;
下面我们用伪代码通过上面的4个原则来实现这一基于环形队列的CP问题:
生产者: 消费者:
P(sem_space) P(sem_data)
//生产行为,位置待定 //消费行为,位置待定
V(sem_data) V(sem_space)
那么下面我们争对上面的4种情况来跑一下这段伪代码:
如果一开始生产一直生产,生产一个sem_space--,当sem_space减到0时,再想生产就会被阻塞,申请不了信号量,这就叫做生产者不会把消费者套一个圈。假设现在已经生产满了,那么sem_data也就变成了N,此时生产者阻塞着,我们让消费者一直消费,也就是让sem_data--,直到减到0的时候消费者会被阻塞住,那么消费者把数据消费完就不会再消费了。
消费者生产者哪个线程先运行我们是说不准的,但是一开始队列是空的,只有空间资源,没有数据资源,所以一开始消费者就是会被阻塞,让生产者生产数据,也就是为空的时候只能让生产者跑,而如果生产的数据满了,生产者也会被阻塞住,也就是为满时只能让消费者跑,所以说这份伪代码是满足上面的四个原则的。
3.基于环形队列的CP问题——代码
RingQueue.hpp
#pragma once#include<iostream>
#include<vector>
#include<semaphore.h>const int defaultsize = 5;
const int defaultvalue = 0;template<class T>
class RingQueue
{
public:RingQueue(int size = defaultsize):ringqueue(size),_size(size),_p_index(defaultvalue),_c_index(defaultvalue){sem_init(&_space_sem,0,size);sem_init(&_data_sem,0,0);}void Push(const T& in){//生产P(_space_sem);ringqueue[_p_index] = in;_p_index++;_p_index%=_size;V(_data_sem);}void Pop(T* out){//消费P(_data_sem);*out = ringqueue[_c_index];_c_index++;_c_index%=_size;V(_space_sem);}~RingQueue(){sem_destroy(&_space_sem);sem_destroy(&_data_sem);}private:void P(sem_t & sem){sem_wait(&sem);}void V(sem_t &sem){sem_post(&sem);}private:std::vector<T> ringqueue;int _size;sem_t _space_sem;sem_t _data_sem;int _p_index;int _c_index;
};
Main.cc
#include "RingQueue.hpp"
#include<pthread.h>
#include<unistd.h>void * productor(void * args)
{RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);int cnt = 100;sleep(5);while(true){//rq->push();rq->Push(cnt);std::cout<<"Productor done,data is: "<<cnt<<std::endl;cnt--;sleep(1);}return nullptr;
}void * consumer(void * args)
{RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);int data = 0;while(true){//rq->pop();rq->Pop(&data);std::cout<<"Consumer done,data is: "<<data<<std::endl;}return nullptr;
}int main()
{pthread_t c,p;RingQueue<int>* rq = new RingQueue<int>();pthread_create(&c,nullptr,consumer,rq);pthread_create(&p,nullptr,productor,rq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}
Makefile:
testRingQueue:Main.ccg++ -o $@ $^ -lpthread -std=c++11.PHONY:clean
clean:rm -rf testRingQueue
刚开始我们的消费者运行还是生产者运行我们不太清楚,但是刚开始执行Push,Pop函数时,消费者一定是被阻塞的,而生产者是可以直接往后执行进行生产的,所以这时一定是生产者先生产,但是生产者生产前我们sleep(6),所以6s后是生产一个数据,消费者消费一个数据,生产一个,消费一个这样的顺序执行下去,下面我们看运行结果:
前6秒:
后面执行:
下面我们再修改一小段代码实现一瞬间让生产者把环形队列空间的数据生产满了,然后生产者被阻塞,然后让消费者sleep(1);之后进行消费,后面的现象就变成了消费一个生产一个,消费一个生产一个,下面我们把上述Main.cc中的consumer函数和productor函数进行修改部分代码:
void * productor(void * args)
{RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);int cnt = 100;while(true){//rq->push();rq->Push(cnt);std::cout<<"Productor done,data is: "<<cnt<<std::endl;cnt--;}return nullptr;
}void * consumer(void * args)
{RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);int data = 0;while(true){sleep(1);//rq->pop();rq->Pop(&data);std::cout<<"Consumer done,data is: "<<data<<std::endl;}return nullptr;
}
下面我们看运行结果:
我们发现确实一开始生产者直接生产了5个数据,然后消费一个生产一个,消费一个生产一个。
我们上面仅仅只是对整形数据做了测试,如果我们把整形数据换成类对象可以吗?
下面我们就把Task封装成一个类作为数据传进来,该类实现的任务是生产者生产数据通过随机传入两个操作数一个操作数,消费者对传入的操作数进行处理输出对应的结果:
Task.hpp
#pragma once
#include<string>
const int defaultvalue = 0;
enum
{ok = 0,div_zero,mod_zero,unknow
};const std::string opers = "+-*/%&()";
class Task
{
public:Task(){}Task(int x, int y, char op, int result=defaultvalue, int code=ok): _data_x(x), _data_y(y), _oper(op){}std::string PrintTask(){std::string s;s= std::to_string(_data_x);s+=_oper;s+=std::to_string(_data_y);s+="=?";return s;}std::string PrintResult(){std::string s;s= std::to_string(_data_x);s+=_oper;s+=std::to_string(_data_y);s+="=";s+=std::to_string(_result);s+=" [";s+=std::to_string(_code);s+="]";return s;}void operator()(){Run();}void Run(){switch (_oper){case '+':_result = _data_x + _data_y;break;case '-':_result = _data_x - _data_y;break;case '*':_result = _data_x * _data_y;break;case '/':{if (_data_y == 0){_code = div_zero;}else _result = _data_x / _data_y;}break;case '%':{if (_data_y == 0){_code = mod_zero;}else _result = _data_x % _data_y;}break;default:_code = unknow;break;}}~Task() {}private:int _data_x; // 操作数int _data_y; // 操作数char _oper; // 运算符int _result; // 结果int _code; // 结果码 0:可信 !0:不可信
};
Makefile和RingQueue.hpp的代码不修改,对Main.cc修改:
#include "RingQueue.hpp"
#include<pthread.h>
#include<unistd.h>
#include"task.hpp"
#include<ctime>
void * productor(void * args)
{RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);//sleep(5);while(true){int data1 = rand()%10;usleep(rand()%123);int data2 = rand()%10;usleep(rand()%123);char oper = opers[rand() % opers.size()];//2.生产数据 Task t(data1,data2,oper);rq->Push(t);std::string task_string = t.PrintTask();std::cout<<"productor task: "<<task_string<<std::endl;//rq->push();}return nullptr;
}void * consumer(void * args)
{RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);int data = 0;while(true){//sleep(1);//rq->pop();Task t;rq->Pop(&t);t();std::cout<<"consumer Result: "<<t.PrintResult()<<std::endl;}return nullptr;
}int main()
{srand((uint64_t)time(nullptr) ^ getpid() ^ pthread_self());//只是为了生成更随机的数据pthread_t c,p;RingQueue<Task>* rq = new RingQueue<Task>();pthread_create(&c,nullptr,consumer,rq);pthread_create(&p,nullptr,productor,rq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}
运行结果:
目前我们上面写的代码都是基于单生产单消费 的代码,如果我们要修改成多生产和多消费的话我们就需要维护生产者和消费者之间的互斥关系。
4.修改整体代码为多生产者和多消费者
RingQueue.hpp
#pragma once#include <iostream>
#include <vector>
#include <semaphore.h>
#include "LockGuard.hpp"
const int defaultsize = 5;
const int DefaultValue = 0;template <class T>
class RingQueue
{
public:RingQueue(int size = defaultsize): ringqueue(size), _size(size), _p_index(DefaultValue), _c_index(DefaultValue){sem_init(&_space_sem, 0, size);sem_init(&_data_sem, 0, 0);pthread_mutex_init(&_p_mutex, nullptr);pthread_mutex_init(&_c_mutex, nullptr);}void Push(const T &in){// 生产P(_space_sem);{LockGuard lockguard(&_p_mutex);ringqueue[_p_index] = in;_p_index++;_p_index %= _size;}V(_data_sem);}void Pop(T *out){// 消费P(_data_sem);{LockGuard lockguard(&_c_mutex);*out = ringqueue[_c_index];_c_index++;_c_index %= _size;}V(_space_sem);}~RingQueue(){sem_destroy(&_space_sem);sem_destroy(&_data_sem);pthread_mutex_destroy(&_p_mutex);pthread_mutex_destroy(&_c_mutex);}private:void P(sem_t &sem){sem_wait(&sem);}void V(sem_t &sem){sem_post(&sem);}private:std::vector<T> ringqueue;int _size;sem_t _space_sem;sem_t _data_sem;int _p_index;int _c_index;pthread_mutex_t _p_mutex;pthread_mutex_t _c_mutex;
};
Task.hpp
#pragma once
#include<string>
const int defaultvalue = 0;
enum
{ok = 0,div_zero,mod_zero,unknow
};const std::string opers = "+-*/%&()";
class Task
{
public:Task(){}Task(int x, int y, char op, int result=defaultvalue, int code=ok): _data_x(x), _data_y(y), _oper(op){}std::string PrintTask(){std::string s;s= std::to_string(_data_x);s+=_oper;s+=std::to_string(_data_y);s+="=?";return s;}std::string PrintResult(){std::string s;s= std::to_string(_data_x);s+=_oper;s+=std::to_string(_data_y);s+="=";s+=std::to_string(_result);s+=" [";s+=std::to_string(_code);s+="]";return s;}void operator()(){Run();}void Run(){switch (_oper){case '+':_result = _data_x + _data_y;break;case '-':_result = _data_x - _data_y;break;case '*':_result = _data_x * _data_y;break;case '/':{if (_data_y == 0){_code = div_zero;}else _result = _data_x / _data_y;}break;case '%':{if (_data_y == 0){_code = mod_zero;}else _result = _data_x % _data_y;}break;default:_code = unknow;break;}}~Task() {}private:int _data_x; // 操作数int _data_y; // 操作数char _oper; // 运算符int _result; // 结果int _code; // 结果码 0:可信 !0:不可信
};
Main.cc
#include "RingQueue.hpp"
#include<pthread.h>
#include<unistd.h>
#include"task.hpp"
#include<ctime>class ThreadData
{
public:ThreadData(const std::string & name,RingQueue<Task>* rq): threadname(name),_rq(rq){}RingQueue<Task> * getRQ(){return this->_rq;}void setRQ(RingQueue<Task>* rq){this->_rq = rq;}std::string getName(){return this->threadname;}void setName(std::string name){this->threadname = name;}private:std::string threadname;RingQueue<Task>* _rq;
};void * productor(void * args)
{ThreadData* td = static_cast<ThreadData*>(args);//sleep(5);while(true){int data1 = rand()%10;usleep(rand()%123);int data2 = rand()%10;usleep(rand()%123);char oper = opers[rand() % opers.size()];//2.生产数据 Task t(data1,data2,oper);td->getRQ()->Push(t);std::string task_string = t.PrintTask();std::cout<<"productor task: "<<task_string<<" threadname: "<<td->getName()<<std::endl;//rq->push();sleep(1);}return nullptr;
}void * consumer(void * args)
{ThreadData* td = static_cast<ThreadData*>(args);while(true){//sleep(1);//rq->pop();Task t;td->getRQ()->Pop(&t);t();std::cout<<"consumer Result: "<<t.PrintResult()<<" threadname: "<<td->getName()<<std::endl;}return nullptr;
}int main()
{srand((uint64_t)time(nullptr) ^ getpid() ^ pthread_self());//只是为了生成更随机的数据pthread_t c[3],p[2];RingQueue<Task>* rq = new RingQueue<Task>();ThreadData td1("consumer-1",rq),td2("consumer-2",rq),td3("consumer-3",rq),td4("productor-1",rq),td5("productor-2",rq);pthread_create(&p[0],nullptr,productor,&td4);pthread_create(&p[1],nullptr,productor,&td4);pthread_create(&c[0],nullptr,consumer,&td1);pthread_create(&c[1],nullptr,consumer,&td2);pthread_create(&c[2],nullptr,consumer,&td3);pthread_join(c[0],nullptr);pthread_join(c[1],nullptr);pthread_join(c[2],nullptr);pthread_join(p[0],nullptr);pthread_join(p[1],nullptr);return 0;
}
运行结果:
所以以上就是基于环形队列的多生产多消费者模型的代码实现。