在上一篇博客中我们讲到了在加锁过程中,线程竞争锁是自由竞争的,竞争能力强的线程会导致其他线程抢不到锁,访问不了临界资源导致其他线程一直阻塞,造成其它线程的饥饿问题,想要解决此问题又涉及一个新概念线程同步
一、线程同步
1.1线程同步
线程同步的概念:
在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
结合故事理解一下:
假设有一个房间,这个房间有且仅有对应的一把锁能够打开;这个房间在一段时间内有且只有一人能够进入里面,而想要进入里面的人很多。为了不让全部人都挤在门口,人们商量了一下,决定排队去进入这个房间,每当一个人用完此房间后就要回到这个房间的队尾,防止有人占着茅坑不拉屎
故事里的人就是线程,房间就是临界资源,钥匙就是锁
既然线程同步理解了,那么如何实现线程同步呢?需要用条件变量来实现。
1.2条件变量
条件变量的概念:用来描述某种临界资源是否就绪的一种数据化描述
结合故事理解条件变量是什么
假设有一个盘子,盘子一侧有一个哑巴,他的任务是不停地往盘子里面放水果;盘子另一侧是一队盲人,他们的任务是判断盘子里有没有水果。而这个盘子就是临界资源,盘子被锁住了,不论是谁只有抢到锁的人才能动这个盘子。
由于盲人的数量远多于哑巴,盲人又看不见,想要判断盘子里面有无水果就只能疯狂申请锁,---------->这就导致了放水果的哑巴饥饿了,抢不到锁去放水果
那么条件变量在哪呢?
别急故事还没讲完。这时候有了一个铃铛,如果盲人检测不到盘子里有水果,那么盲人就去铃铛哪里排队。当所有盲人排位队后,哑巴这时就能放水果了,放完水果后哑巴解除锁再敲一下铃铛让一个盲人(也可以叫全部盲人)去检测盘子。
条件变量 = 铃铛 + 队列
1.3条件变量的系统调用
条件变量同样是由原生线程库维护的,所以使用的是POSIX标准,和互斥锁的接口非常相似
创建条件变量:
pthread_cond_t cond;
cond是英文condition的缩写
条件变量的初始化:man pthread_cond_init
参数:
pthread_cond_t *cond: 要初始化的条件变量
*cont_attr : 设为nullptr就行了
返回值:成功返回0,失败返回错误码
静态、全局的初始化
int pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
条件变量的销毁:man pthread_cond_destroy
参数:
*cond: 所要销毁的条件变量的地址
返回值:销毁成功返回0,失败返回错误码
条件变量的等待:man pthread_cond_wait
参数:
*cond: 所要等待的条件变量的地址
*mutex:互斥锁的地址(后面解释为什么要传锁)
返回值: 把条件变量放入等待队列,放入成功返回0,失败返回错误码
条件变量的唤醒:
man pthread_cond_signal(唤醒一个线程)
参数:
*cond: 所要等待的条件变量的地址
返回值:唤醒成功返回0,失败返回错误码
作用:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的一个线程
man pthread_cond_broadcast
参数:
*cond: 所要等待的条件变量的地址
返回值:唤醒成功返回0,失败返回错误码
作用:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的所有线程
1.4测试代码(cond系统调用的使用)
创建多个线程,用主线程控制其它线程阻塞,直到主线程唤醒才继续执行其它线程的线程函数
#include<iostream>
#include<string>
#include<vector>
#include<pthread.h>
#include<unistd.h>
using namespace std;pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;
void* MasterCode(void *args)//线程函数
{sleep(1);cout<<"主线程开始工作..."<<endl;string name = static_cast<const char*>(args);while(true){int n = pthread_cond_signal(&gcond);if(n == 0){sleep(1);cout<<"主线程唤醒一个线程..."<<endl;}}
}
void* SlaverCode(void *args)
{string name = static_cast<const char*>(args);while(true){//1.加锁pthread_mutex_lock(&gmutex);//2.条件变量是在加锁和解锁之间使用的 pthread_cond_wait(&gcond,&gmutex);//阻塞等待主线程来唤醒//走到这说明此线程已经被主线程唤醒了 cout<<"被唤醒的线程是:"<<name<<endl;//解锁pthread_mutex_unlock(&gmutex);}
}void StartMaster(std::vector<pthread_t> *tidsptr)
{pthread_t tid;int n = pthread_create(&tid,nullptr,MasterCode,(void*)"Master Thread");if(n == 0){std::cout<<"主线程创建成功"<<std::endl;tidsptr->emplace_back(tid);}
}
void StartSlaver(std::vector<pthread_t> *tidsptr,int thraednum)
{for(int i=0;i<thraednum;i++){char *name = new char[64];snprintf(name,64,"Slaver-%d",i+1);pthread_t tid;int n = pthread_create(&tid,nullptr,SlaverCode,(void*)name);if(n == 0){std::cout<<"新线程创建成功:"<<name<<std::endl;tidsptr->emplace_back(tid);}}
}void WaitThread(std::vector<pthread_t> &tids)//等待线程
{for(auto &tid:tids){pthread_join(tid,nullptr);}
}int main()
{std::vector<pthread_t> tids;//vector中放tidStartMaster(&tids);//主线程开启(创建)StartSlaver(&tids,4);//被控制线程(创建)WaitThread(tids);return 0;
}
二、生产者、消费者模型
2.1生产消费模型的概念
生产、消费模型:
讨论问题的本质,如何并发的执行数据传递的问题(从一个模块传到另一个模块)
结合生活场景理解:
消费者线程:读取数据的线程
生产者线程:产生数据的线程
商品:数据
超市:能临时保存数据的"内存空间"(某种数据结构对象),本质是对商品的缓冲
在超市、厂商和顾客构成的生产者、消费者模型中:生产者是产品供应商,消费者是超市的顾客,而超市是一个交易产所。
超市:共享资源(要保护) ------> 临界资源
厂商、用户:多个执行流(线程)
以上结合就需要考虑线程的同步与互斥的问题
并发问题:
生产者与消费者的关系:同步&&互斥关系,厂商生产时不影响用户消费,但是厂商供货时用户不能消费。
生产者与生产者的关系:竞争关系,超市的供应商不止一家,就算是同一种商品还有不同品牌的供应商,他们彼此相互竞争,其表现就是互斥
消费者与消费者的关系:也是竞争关系,假设超市只有一种商品且这种商品只剩一份了,那么大量的消费者都要涌入这家超市,而这家超市一次只能进入一个人,所以消费者之间也是互斥
为什么说超市的本质是对商品的缓冲呢?
生产消费模型可以提供比较好的并发度(厂商生产时不影响用户消费,能做到生产和消费进行解耦,支持忙闲不均)
2.2模型实现(阻塞队列实现)
生产消费模型+条件变量
阻塞队列实现生产消费模型
2.2.1框架
Makefile
p_c_bq:Main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f p_c_bq
Main.cc
#include<iostream>
#include<vector>
#include<unistd.h>
#include "Thread.hpp"
#include<functional>
#include<pthread.h>
#include "BlockQueue.hpp"using namespace Thread_Module;void* productor(BlockQueue<int> &bq)
{ int a = 1;while(true)//获取任务{//...}return nullptr;
}void* consumer(BlockQueue<int> &bq)
{while(true)//制作任务{//... }return nullptr
}void Comm(std::vector<Thread<BlockQueue<int>>> *threads,int num,BlockQueue<int> &bq,func_t<BlockQueue<int>> func)
{for(int i=0;i<num;i++){std::string name = "thread-"+std::to_string(i+1);threads->emplace_back(func,bq,name);threads->back().Start();}
}void ProductorStart(std::vector<Thread<BlockQueue<int>>> *threads,int num,BlockQueue<int> &bq)
{Comm(threads,num,bq,productor);
}void ConsumerStart(std::vector<Thread<BlockQueue<int>>> *threads,int num,BlockQueue<int> &bq)
{Comm(threads,num,bq,consumer);
}void WaitAllThread(std::vector<Thread<BlockQueue<int>>> &threads)
{for(auto &thread:threads){thread.Join();}
}
int main()
{BlockQueue<int> *bq = new BlockQueue<int>(5);std::vector<Thread<BlockQueue<int>>> threads;ProductorStart(&threads,1,*bq);ConsumerStart(&threads,1,*bq);WaitAllThread(threads);return 0;
}
BlockQueue.hpp
#include<iostream>
#include<string>
#include<queue>
#include<pthread.h>template <class T>
class BlockQueue
{
public:BlockQueue(int cap)//构造:_cap(cap){pthread_mutex_init(&_mutex,nullptr);//锁的初始化//条件变量的初始化pthread_cond_init(&_product_cond,nullptr);pthread_cond_init(&_consumer_cond,nullptr);}//生产者用的接口(入阻塞队列)void Enqueue(T &in){pthread_mutex_lock(&_mutex);//...pthread_mutex_unlock(&_mutex);}//消费者用的接口(出队列)void Pop(T *out){pthread_mutex_lock(&_mutex);//...pthread_mutex_unlock(&_mutex);}//析构~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_product_cond);pthread_cond_destroy(&_consumer_cond);}
private:std::queue<T> _bq;//阻塞队列int _cap;//队列上限pthread_mutex_t _mutex;//锁pthread_cond_t _product_cond;//生产者的条件变量pthread_cond_t _consumer_cond;//消费者的条件变量};
之前的封装的线程注意 T _data数据需要引用 T &_data
#include<iostream>
#include<signal.h>
#include<unistd.h>
#include<functional>
#include<pthread.h>namespace Thread_Module
{template <typename T>using func_t = std::function<void(T&)>;// typedef std::function<void(const T&)> func_t;template <typename T>class Thread{public:void Excute(){_func(_data);}public:Thread(func_t<T> func,T &data,const std::string &threadname = "none"):_threadname(threadname),_func(func),_data(data){}static void* threadrun(void *args)//线程函数{Thread<T> *self = static_cast <Thread<T>*>(args);self->Excute();return nullptr;}bool Start()//线程启动!{int n = pthread_create(&_tid,nullptr,threadrun,this);if(!n)//返回0说明创建成功{_stop = false;//说明线程正常运行return true;}else{return false;}}void Stop(){_stop = true;}void Detach()//线程分离{if(!_stop){pthread_detach(_tid);}}void Join()//线程等待{if(!_stop){pthread_join(_tid,nullptr);}}std::string threadname()//返回线程名字{return _threadname;}~Thread(){}private:pthread_t _tid;//线程tidstd::string _threadname;//线程名T &_data;//数据func_t<T> _func;//线程函数bool _stop; //判断线程是否停止 为true(1)停止,为false(0)正常运行};
}
2.2.2代码完善
生产者调用接口和消费者调用接口进行实现
void Enqueue(const T &in) // 生产者用的接口{pthread_mutex_lock(&_mutex);while(IsFull())//判断队列是否已经满了{pthread_cond_wait(&_product_cond, &_mutex); //满的时候就在此情况下等待// 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁}// 进行生产_bq.push(in);// 通知消费者来消费pthread_cond_signal(&_consumer_cond);pthread_mutex_unlock(&_mutex);}void Pop(T *out) // 消费者用的接口{pthread_mutex_lock(&_mutex);while(IsEmpty()){pthread_cond_wait(&_consumer_cond, &_mutex); }// 进行消费*out = _bq.front();_bq.pop();// 通知生产者来生产pthread_cond_signal(&_product_cond);pthread_mutex_unlock(&_mutex);}
1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁
再对生产者消费者接口进行完善
在Block_Queue类里面再添加两个成员变量,对生产者、消费者的阻塞(等待)数量进行计数,在唤醒生产者、消费者线程这一语句再加一个判断语句:判断阻塞计数>0才可唤醒
#include<iostream>
#include<string>
#include<queue>
#include<pthread.h>template <class T>
class BlockQueue
{
private://判断队列是否为满(满就入不了队列)bool IsFull(){return _bq.size() == _cap;}//判断队列是否为空(为空出不了队列)bool IsEmpty(){return _bq.size() == 0;}
public:BlockQueue(int cap)//构造:_cap(cap){pthread_mutex_init(&_mutex,nullptr);//锁的初始化//条件变量的初始化pthread_cond_init(&_product_cond,nullptr);pthread_cond_init(&_consumer_cond,nullptr);_product_wait_n = 0;_consumer_wait_n = 0;}//生产者用的接口(入阻塞队列)void Enqueue(T &in){pthread_mutex_lock(&_mutex);while(IsFull()){_product_wait_n++;pthread_cond_wait(&_product_cond,&_mutex);//生产者等待消费者唤醒_product_wait_n--;}//走到这已经被唤醒了//生产者开始生产_bq.push(in);// std::cout<<in<<std::endl;//pthread_cond_signal(&_consumer_cond);if(_consumer_wait_n > 0)//如果有等待的消费者唤醒消费者来消费{pthread_cond_signal(&_consumer_cond);}pthread_mutex_unlock(&_mutex);}//消费者用的接口(出队列)void Pop(T *out){pthread_mutex_lock(&_mutex);while(IsEmpty()){_consumer_wait_n++;pthread_cond_wait(&_consumer_cond,&_mutex);_consumer_wait_n--;}//走到这已经被唤醒了//开始消费*out = _bq.front();// std::cout<<out<<std::endl;_bq.pop();//唤醒生产者来生产pthread_cond_signal(&_product_cond);if(_product_wait_n>0){pthread_cond_signal(&_product_cond);}pthread_mutex_unlock(&_mutex);}// void Enqueue(const T &in) // 生产者用的接口// {// pthread_mutex_lock(&_mutex);// while(IsFull())//判断队列是否已经满了// {// pthread_cond_wait(&_product_cond, &_mutex); //满的时候就在此情况下等待// }// // 进行生产// _bq.push(in);// // 通知消费者来消费// pthread_cond_signal(&_consumer_cond);// pthread_mutex_unlock(&_mutex);// }// void Pop(T *out) // 消费者用的接口// {// pthread_mutex_lock(&_mutex);// while(IsEmpty())// {// pthread_cond_wait(&_consumer_cond, &_mutex); // }// // 进行消费// *out = _bq.front();// _bq.pop();// // 通知生产者来生产// pthread_cond_signal(&_product_cond);// pthread_mutex_unlock(&_mutex);// }//析构~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_product_cond);pthread_cond_destroy(&_consumer_cond);}
private:std::queue<T> _bq;int _cap;//队列上限pthread_mutex_t _mutex;pthread_cond_t _product_cond;//生产者的pthread_cond_t _consumer_cond;//消费者的int _product_wait_n;//生产者的阻塞数int _consumer_wait_n;//消费者的阻塞数
};