目录
一、互斥
1、数据不一致问题
2、锁
3、饥饿问题
4、锁的原理
5、封装锁
6、抢票逻辑中加入封装的锁
7、可重入VS线程安全
8、死锁
二、同步
1、什么是同步
2、如何实现同步
3、条件变量
4、生产消费者问题
(1)CP问题
(2)基于阻塞队列快速实现CP
(3)CP处理计算任务
(4)再看CP问题
5、信号量
(1)什么是信号量
(2)信号量的使用
(3)基于环形队列的生产消费模型
一、互斥
1、数据不一致问题
我们知道,所有的线程是可以对一个全局变量进行访问和修改的,我们把很多个线程都要访问的资源叫做共享资源,那么这里就有一个问题,比如一个线程在读这个共享资源的时候另外一个线程就对这个资源做修改了,那么就有可能导致在并发访问的时候数据不一致。
接下来我们直接写一份代码来验证一下是否会有数据不一致问题。
#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <pthread.h>using namespace std;int tickets = 1000; //临界资源
#define NUM 5class threadData
{
public:threadData(int number){threadname = "thread" + to_string(number);}
public:string threadname; // 线程名
};void* getTicket(void* arg)
{threadData* td = static_cast<threadData*>(arg);const char* name = td->threadname.c_str();while(true){// 如果有票,就抢if(tickets > 0){usleep(1000); // 模拟抢票花的时间cout << name << " get a ticket, " << tickets << endl;tickets--;}else{break;}}return nullptr;}int main()
{vector<pthread_t> tids;// 创建一批线程for(int i = 1; i <= NUM; i++){pthread_t tid;threadData* td = new threadData(i);pthread_create(&tid, nullptr, getTicket, td);tids.push_back(tid);}// 线程等待for(int i = 0; i < tids.size(); i++){pthread_join(tids[i], nullptr);}return 0;
}
上述代码是创建了5个线程,一个全局变量初始化为1000,模拟5个线程抢票的逻辑,但是运行结果,让我们大吃一惊,怎么票都是0、-1、-2、-3了还在抢呢?一张票咋被抢到负数了呢?我们不是做判断了吗?原因是这个tickets是一个共享变量,在无保护的情况下多个线程并发访问的时候造成了数据不一致问题,每个线程都要对tickets做判定做--操作,有没有可能你在做判定的时候其他线程来了,你在做--操作的时候其他线程也来了呢,太有可能了。
此时我们可以得出一个结论,对一个全局变量进行多线程并发++/--操作是不安全的!
为什么是不安全的呢?因为++/--操作不是原子的!什么叫不是原子的,要么做了要么没做,没有中间状态,也可以理解为只有一条汇编语句,但是++和--操作翻译成汇编是有三条语句的,三条汇编语句就会有中间状态。
当thread1来的时候,先进行第一步操作读数据,读完数据之后正要执行第二步,时间片到了,时间片到了该线程就要被切换了,切换的时候要把该线程在CPU上的数据带走,带走是为了下一次调度的时候恢复。虽然CPU寄存器只有一套,但是每一个线程都要有CPU寄存器对应的数据,因此,寄存器不等于寄存器的内容。然后thread2线程来了,它运气非常好,这三步操作执行了很多次,直接把1000张票干到100了,然后终于它被切走了。轮到thread1了,此时要把上下文恢复到CPU寄存器中,上一次执行了第一步操作,读了1000张票,然后继续执行第二步、第三步,此时票变成999了,好不容易thread2把票干到100,结果又变成999了,把别人做的工作全部覆盖了,这种情况就导致了数据不一致问题。
为什么这个抢票会出现负数?
我们不仅在对tickets做--,还在做判断哦, 判断tickets > 0是不是运算,是呀,cpu运算分为逻辑运算和算术运算,判断也是运算,也要把数据读到CPU内,然后做判定,返回一个结果给if做执行。此时一个线程在做if判断的时候,第二个线程、第三个线程,大家可不可能都来做判断呢?完全有可能,当前票数是1,一个线程刚读到tickets是1,就被切换出去了,第二个、第三个、也是如此,每个线程都认为当前的票数是1,有没有可能,有可能呀,之后第一个线程被唤醒了做--,不就变成0了?然后第二个线程也来了,它要做tickets--,要做上面三步,此时票数就变成-1了,进来的时候你以为票数是1,其实早就被上个线程减成0了,但是此时你已经在循环里了,线程3、4也是一样的。因此,票数就变成负数了。
如何解决数据不一致问题?
那么我们如何解决这个问题呢?导致这个问题的原因是,一个线程在对数据进行访问的时候,被其它线程干扰了,我在修改的时候你也在修改,我在读的时候你也在读,此时不就导致数据不一致问题了吗?那么对于共享数据的访问,我们必须要保证任何时候只有一个执行流,此时就要引入锁的概念了。
2、锁
线程库给我们提供了锁的函数可供我们使用。
1. 锁相关函数
(1)pthread_mutex_t是提供的一个定义锁的类型,可以用它来定义一把锁。
(2)pthread_mutex_init是初始化锁的一个函数,第一个参数是一个指针类型,直接把锁的地址传递过去即可。第二个参数是锁的属性,一般都用不到,传空即可 。
(3)pthread_mutex_destroy是释放锁的函数,把对应的锁的地址传过去即可。
(4)PTHREAD_MUTEX_INITIALIZER:它是一个宏,也是用来初始化锁的,但是只能初始化全局的锁。
在上述的抢票代码中,我们可以在main函数中定义一把锁,然后对锁初始化,之后在main函数结束的时候调用pthread_mutex_destroy释放锁,但是我们在主线程中,如何让其它线程拿到这把锁呢?我们可以在threadData这个类中,定义一把锁的指针,然后在主线程创建线程的时候将其传入。
class threadData
{
public:threadData(int number, pthread_mutex_t* mutex){lock = mutex;threadname = "thread" + to_string(number);}
public:string threadname;pthread_mutex_t* lock;
};int main()
{pthread_mutex_t mutex; // 定义一把锁pthread_mutex_init(&mutex, nullptr); // 初始化锁 vector<pthread_t> tids;// 创建一批线程for(int i = 1; i <= NUM; i++){pthread_t tid;threadData* td = new threadData(i, &mutex);pthread_create(&tid, nullptr, getTicket, td);tids.push_back(tid);}// 线程等待for(int i = 0; i < tids.size(); i++){pthread_join(tids[i], nullptr);}pthread_mutex_destroy(&mutex);// 释放锁return 0;
}
有了锁还不够,我们还需要加锁。线程库给我们提供了对应的函数。
2.加锁对应函数
(1)pthread_mutex_lock:参数是一把锁的地址,进行加锁操作。
(2)pthread_mutex_try_lock:是按照特定的方式进行加锁,一般情况下用不到它。
(3)pthread_mutex_unlock:参数同样是一把锁的地址,进行解锁操作。
那么,现在问题来了,在哪里加锁呢?一个tickets全局变量,多个线程访问它,出现了数据不一致问题,这个tickets就叫做临界资源,但是不是所有的代码都是临界资源呢?并不是的,只有这一小块区域,这块区域叫做临界区。那么加锁好不好呢?加锁是让线程从并发执行变成串行执行,串行访问就会导致执行时间变长,效率变低,因此加锁并不是一件好事,加锁的本质是用时间换安全。因此,我们加锁的时候要尽量表征临界区的代码,越少越好,如果加锁加的多了,那么串行执行的场景就变多了,就会影响速。
在上述抢票的代码中,if判断tickets就不是原子的,因此要从这个位置开始加锁,加锁之后还要解锁。这个地方else里面也需要解锁的逻辑,否则,退出的时候没人释放锁就出现死锁了。
void* getTicket(void* arg)
{threadData* td = static_cast<threadData*>(arg);const char* name = td->threadname.c_str();while(true){pthread_mutex_lock(td->lock); // 申请锁成功,才能往后执行,不成功,阻塞等待if(tickets > 0){usleep(1000); //模拟抢票花的时间cout << name << " get a ticket, " << tickets << endl;tickets--;pthread_mutex_unlock(td->lock); // 解锁}else{pthread_mutex_unlock(td->lock); // 解锁break;}}cout << td->threadname << " ... quit" << endl;return nullptr;}
问题:加锁可不可以在while循环外面?
是不可以的,因为如果在while外面加锁,那么就只有一个线程能进入到这个循环里,也就意味着让一个线程把票全抢光了这符合逻辑吗?
其次,加锁的时候要尽量减少临界区的代码量。
3、饥饿问题
接下来我们看看,加锁是否可以解决票减到负数的问题。
我们看上述代码的确解决了数据不一致问题,但是,有没有发现一个问题,怎么都是5号线程在抢票呢?这5号线程直接就把票全抢光了?这里其实 是因为线程对于锁的竞争力不同。我们再看这段代码。5号线程它刚把锁释放掉,其它线程还没来得及被唤醒,由于5号线程距离锁很近,释放完锁立马去申请锁,因此一直都是5号线程在抢票,此时,就导致其它线程长时间得不到数据,造成饥饿问题,我们在后面加一个sleep休眠一下,就可以解决这个问题。
下面是加了usleep之后的执行结果,并且,我们的代码是有逻辑问题的,因为在现实的抢票中,我们抢了一张票还会立即抢下一张吗?因此我们要执行后续动作的,我们可以顺便用usleep模拟后续的动作。
接下来有一个问题,每一个线程在进入临界区访问临界资源的时候,第一件事情是去申请同一把锁?那么锁本身不就是临界资源或者共享资源吗?锁是为了保护临界资源的,那么谁来保护锁的安全呢?不需要担心锁,因为申请锁和释放锁本身是原子的。
在临界区中线程可以被切换吗?可以被切换
临界区中的线程也是可以被切换的,临界区不也是代码吗?线程在执行到任何地方都有可能被切换。那么在临界区中被切换了锁怎么办呢?会出问题吗? 不会有问题,因为线程被切出去的时候是持有锁被切走的,只要我没执行unlock,照样没有人能进入临界区访问临界资源。此时外面的人(其它线程)只关心你有没有释放锁,你有没有锁。只要持有锁,你别管我在被切换还是被调度,你都进不来!因此当前线程访问临界区的过程,对于其它线程是原子的!
4、锁的原理
1.加锁
tikets--不是原子的,会变成三条汇编语句,原子:一条汇编语句就是原子的。
我们知道所有的代码最终要被CPU做处理都要先翻译成汇编指令,那么我们看看加锁的代码转成汇编是怎样的。下图就是加锁的汇编指令,第一条mov语句是将0读入到寄存器al中,第二条语句是把寄存器al中的内容和mutex的内容做交互,mutex的内容是1,然后入到if判断成功,表明该线程申请锁成功,然后返回,继续执行后续代码,如果没有判断成功,表明申请锁失败,就要挂起等待。那么就有问题了,这加锁这么多条指令,这也不是原子的呀?那是如何保证加锁的安全的呢?
接下来我们模拟一下线程申请锁的过程。
这个锁可以看成一个int变量1。
线程1来申请锁了,先执行move指令,把al寄存器的内容置为0,但不幸的是,此时线程1被切走了,线程1当然可以被切走,这么多语句你在执行任何一条的时候都有可能被切走,即使你是锁又有何不同呢?线程1被切走要把自己的上下文带走,就是说要把当前al寄存器中的内容0带走,变成自己的上下文。此时线程2来了,它运气非常好,move操作执行完了,xchgb交换操作,把mutex中的1和al中的0交换完毕,判定条件也执行完了,然后申请锁成功返回了。这时,mutex里面的内容就从1变成0了。然后线程1被调度,来了第一件事先恢复上下文,将0恢复到al中, 虽然寄存器只有一套,但是每一个线程都要有自己对应的寄存器的内容!上一次执行了第一步,接下来执行第二步交换操作,此时mutex是0,因此把mutex中的0和al中的0,进行交换,之后进行if判断判定寄存器内容是否大于0,当然条件不大于呀,于是就申请锁失败了,当前线程被挂起了。
在这几条指令中,最重要的是xchgb交换操作,这个交换的本质,就是把内存中的数据,交换到CPU寄存器中,mutex这把锁是所有线程共享的,把一个共享的锁,让一个线程以一条汇编的方式,交换到自己的上下文中,当前线程持有锁了!这个mutex中的1就如同一个单独的令牌一样,在每个线程间来回跳转,只有持有令牌的人,才能进入临界区执行代码。因此申请锁就是看谁运气好,先执行了xchgb交换操作。xchgb操作是原子的。
2.解锁
解锁的原子性重要吗?不重要,因为能解锁的线程是曾经加锁的线程,解锁的汇编语句如上图,最重要的就是movb操作,把1写入到mutex里,这里为啥在执行一次xchgb呢?把寄存器和mutex内容在交换一次不就可以了吗?如果是xchgb操作,就只有加锁的人才能解锁,因为那个1只在加锁的线程的上下文中。movb操作就可以让其它人也能解锁,如果未来死锁了,一个线程在持有锁期间异常了,异常就异常还把锁也带走了,这样可以让其它线程来解决死锁的问题。
5、封装锁
#pragma once
#include <pthread.h>class LockGuard
{
public:LockGuard(pthread_mutex_t* mutex) : _mutex(mutex){pthread_mutex_lock(_mutex);}~LockGuard(){pthread_mutex_unlock(_mutex);}
private:pthread_mutex_t* _mutex; //锁
};
初始化的时候自动加锁,释放的时候自动解锁。我记得mutex不是还要初始化析构的吗?这个地方不需要初始化释放吗?这个地方其实可以不用,因为我们可以使用一个全局的锁,用PTHREAD_MUTEX_INITIALIZER初始化,此时是不需要释放的。
6、抢票逻辑中加入封装的锁
#include <iostream>
#include <pthread.h>
#include <vector>
#include <string>
#include <unistd.h>
#include "lock.hpp"using namespace std;const int NUM = 5;struct ThreadInfo
{ThreadInfo(int num){_threadname = "thread: " + to_string(num);}string _threadname;};int ticket = 1000;pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;void* getTicket(void* argv)
{ThreadInfo* td = static_cast<ThreadInfo*>(argv);while(true){//加锁{LockGuard lock(&mutex); //初始化自动加锁,出了作用域析构自动解锁,RAII的锁if(ticket > 0){cout << td->_threadname << " get a ticket:" << ticket << endl;ticket--;}elsebreak;}usleep(13);}cout << td->_threadname << "quit..." << endl;return nullptr;
}int main()
{vector<pthread_t> tids;vector<ThreadInfo*> thread_datas;for(int i = 0; i < NUM; i++){pthread_t tid;ThreadInfo* td = new ThreadInfo(i + 1);pthread_create(&tid, nullptr, getTicket, td);tids.push_back(tid);thread_datas.push_back(td);}for(int i = 0; i < tids.size(); i++){pthread_join(tids[i], nullptr);}for(int i = 0; i < thread_datas.size(); i++){delete(thread_datas[i]);}return 0;
}
7、可重入VS线程安全
1.线程安全:多个线程并发执行同一段代码是,并没有出现问题
2.重入:同一个函数在被多个执行流调用时,一个执行流还没有执行完,就有其它执行流来执行该函数,此时就被称为重入。一个函数如果被重入了,还没有出现错误,该函数就被称为可重入函数,否则被称为不可重入函数。
3.常见的线程不安全的情况
- 不使用锁保护共享变量的函数
- 函数随着被调用,状态可能会发生变化的函数
- 返回指向静态变量指针的函数
- 在一个函数中调用其他线程不安全的函数。比如说在函数中调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的。
4.常见的线程安全的情况
- 每个线程执行不同的函数,或者执行同一个函数,但是该函数只有一些局部变量,而没有访问全局变量或者静态变量
- 每个线程对全局变量或者静态变量只有读取的操作,而没有写入的操作
- 函数或者类对于线程来说都是原子的操作
- 如果对临界资源的访问和一些线程不安全函数的的访问加上锁,并且正确的使用锁,没有什么数据不一致问题、死锁或者线程的不良竞争等问题,该函数就是线程安全的。
5.常见不可重入的情况
- 调用了malloc/free函数,或者使用了STL库提供的容器
- 调用了标准库I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态的数据结构
6.常见可重入的情况
- 不使用全局变量或静态变量
- 不使用malloc或new开辟出的空间
- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都由函数的调用者提供
- 如果对临界资源的访问和不安全函数的访问加上锁,并且正确的使用锁,没有造成数据不一致问题或者线程崩溃,该函数就是可重入的
7.可重入与线程安全联系
- 如果函数是可重入的,那么线程是安全的
- 函数不可重入,就不能让多个线程使用,否则可能会引发线程安全问题,导致数据不一致
- 如果一个函数中有全局变量,那么这个函数既不是线程安全的也不是可重入的
8.可重入与线程安全的区别
可重入描述的是函数的特性,线程安全描述的是在使用线程是否安全,二者是不同的,只不过线程安全往往与可重入相关
- 可重入函数是线程安全的一种
- 线程安全不一定是可重入的,而可重入函数一定是线程安全的
- 如果对临界资源的访问和一些线程不安全函数的的访问加上锁,并且正确的使用锁,没有什么数据不一致、死锁或者线程的不良竞争等问题,该函数就是可重入的并且是安全的。
8、死锁
我们在写多线程代码的时候有的时候只会用到一个锁吗?有的时候可能会用到多个锁,当用到多个锁的时候就可能会发生死锁,比如说,有两个线程,各自申请了一个锁,然后彼此又去申请对方的锁,此时会发生什么情况?此时双方都会被阻塞住。只有一个锁的时候也有可能出现死锁,比如说我们写代码的时候,我们lock加锁了,但是解锁的时候写错了把unlock写成了lock,这也有可能产生死锁。
1.死锁四个必要条件
(1)互斥条件
一个资源只能被一个执行流使用。由于使用多线程加了锁,因此资源只能在一个时间被一个执行流使用,这个很好理解。
(2)请求与保持条件
一个执行流因请求资源而阻塞时,对已获得的资源保持不放。
讲个故事,小明和小王一起去超市买棒棒糖,可是那个棒棒糖需要1元钱,但是他们各自都只有5毛钱,于是小明就给小王说,你把你的5毛钱给我,我来给咋买个棒棒糖,小王就不乐意了,说你咋不把你的5毛钱给我,我来买个棒棒糖,于是双方争执不下,双方都想要对方的5毛钱,都是对自己的5毛钱都不释放,完蛋了,就这样卡主了。在这个场景中,小明和小王就相当于2个线程,他们各自持有的毛钱就相当于2把锁,他们各自都想要对方的锁,但是自己又不把自己的锁释放,于是它们都访问不了临界资源,临界资源就相当于那个棒棒糖,就阻塞住了。
(3)不剥夺条件
一个执行流已获取的资源,在未使用完之前,不能被强行剥夺。
还是上面的例子,小王对小明说,你不给我是吧,好,那我把你揍一顿,把你的5毛钱抢过来,这叫可以剥夺,不剥夺是你不能把我的5毛钱抢过来。我这个锁还没用完,你不能把我的锁直接给抢过去。
(4)循环等待条件
若干执行流之间形成一种头尾相接的循环等待资源的关系
2.避免死锁
如果要避免死锁,就要破坏4个必要条件之一,任意破坏一个就可以避免死锁。
(1)破坏互斥条件
互斥条件是一个资源只能被一个执行流使用,为什么呢?因为我们锁了呀,那让一个资源被多个执行流使用那就把锁取消掉就好了呀,把锁取消掉就不会有死锁了呀。的确可以避免死锁,但是我们使用锁是因为能解决问题,如果不用锁,那就要改动代码寻找其他解决方案了,这成本也太高了点,因此还是不太推荐。
(2)请求与保持条件
一个执行流因请求资源而阻塞时,对已获得的资源保持不放。小明一看,小王也不把5毛钱给我,我拿5毛钱也买不到棒棒糖,还不如把5毛钱给小王,让小王先往后走,这叫请求与不保持条件。那拿如何实现请求与不保持条件呢?
pthread_mutex_lock是如果申请锁失败了,就阻塞在这里等待
pthread_mutex_trylock是申请锁的非阻塞版本,线程1和线程2都想要两把锁,双方都采用trylock的方式申请锁,线程1申请到了第一把锁,线程2申请到了第2把锁,然后线程1申请线程2的锁,但是申请不到,于是直接返回,然后把自己的锁释放了,此时线程2就能申请到了。
(3)不剥夺条件
一个执行流已获取的资源,在未使用完之前,不能被强行剥夺。
之前如果得不到锁,要么死等,要么把自己的资源释放,这是不剥夺,剥夺是什么,剥夺就是让对方把锁释放。
(4)循环等待条件
若干执行流之间形成一种头尾相接的循环等待资源的关系
这个条件就是形成了一个环,那破坏环就可以了,如何破坏呢?我们通过编码可以完成,我先得到1个锁,对方得到1个锁,我们都要2把锁,让后我申请对方的锁,对方申请我的锁,然后死锁了,关键为啥会这样呢?为什么不让2个锁同时申请锁1,然后在同时申请锁2呢?线程2和线程2按照顺序同时申请锁1、锁2,不要交叉着来,那也不会存在环路了。
避免死锁
- 破坏死锁的四个必要条件
- 加锁顺序一致:不要交叉申请锁
- 避免锁未释放的场景:加锁之后要及时释放
- 资源一次性分配:一次就把该线程要的资源全都分配好,比如你要2个锁直接就给你2个锁
二、同步
1、什么是同步
现在有一个超级VIP自习室,有多V呢?这个自习室只能有一个人进去,墙上挂着一把钥匙可以打开门,然后小明起的比较早,6点就去自习室了,拿着钥匙打开了门,并把钥匙带进了自习室里,之后学习到早上7点饿了,想出去吃早餐,打开门出去,把钥匙挂到门上,结果看到外面乌泱泱一群人,心想我要是走了下次回来的时候猴年马月才能进来,然后小明说算了,转身又把墙上的钥匙拿走了,在自习室学习了2、3分钟又坐不住了,然后又出去又回来,拿钥匙放钥匙,然后一天就干了这事,由于小明离锁最近,伸手就拿到了,其它人还要走两步才能拿到,因此别人都抢不过小明,可是如果小明在自习室里看了会书、写了会代码还好,可是它啥都没做,又出去又后悔了又回去。这就会导致一个问题,外面的人想拿到钥匙,但是小明的竞争力比较强,频繁的申请钥匙,就会导致外面的人长时间得不到锁资源,导致饥饿问题。因此,如果锁分配不够合理,容易导致饥饿问题。就如同抢票一般,不过不是说有互斥就必有饥饿,如果我们把抢票的逻辑写完全了是不会有饥饿问题的,只要该场景适合互斥,就用互斥。
此时有一个观察员在看着这一切,他发现这确实是有问题的,小明在自习室里既没有创造任何价值,还导致外面一大批人在外面等着,此时,这个观察员就制定了几条规则,假设小明真的走了,小明把钥匙挂墙上,外面这批人怎么办呢?难道一窝蜂涌上来都来抢钥匙吗?这是不合理的,一窝蜂涌上来就意味着操作系统要把所有等待的线程唤醒,可是最终胜利者只能有一个,那么其它的线程不就白白唤醒了吗?因此观察员制定的规则是:
1.外面的人,必须排队
2.出来的人,不能立马重新申请锁,必须拍到队列的尾部
也就意味着让所有的线程(人)获取锁(钥匙),按照一定的顺序,这种在保证数据安全的情况下,按照一定顺序获取资源,从而有效避免饥饿问题,叫做同步。
2、如何实现同步
我们现在想要让线程按照一定的顺序获取资源,那么我们应该如何实现同步呢?我们需要通过条件变量来完成同步。
那么什么是条件变量呢?
现在小明去自习室仔细,拿到钥匙之后进入到自习室,自习了一会出来了,把钥匙挂到了墙上,钥匙纯互斥,可能就会又申请钥匙,但是此时对不起,你出来了不能立刻申请锁,你必须去等待队列里排队,排队的时候要敲一下铃铛,表明你要去排队,而且现在自习室没人,就可以唤醒一个或者全部线程去自习室申请资源,条件变量是什么?条件变量就如同那个铃铛一般,要给我们提供通知的机制,还要给我们提供一个队列,让不就绪的线程去等待队列里等待。那为什么要去排队?是因为曾经申请锁失败了才要去队列里排队,因此条件变量的使用一定要配合锁。
3、条件变量
(1)创建条件变量
pthread_cond_t cond; // 创建了一个cond的条件变量
(2)初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict
attr);参数: cond:要初始化的条件变量attr:NULL
也可以使用PTHREAD_COND_INITIALIZER初始化全局的条件变量
(3)销毁
int pthread_cond_destroy(pthread_cond_t *cond)
(4)加入到等待队列
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数: cond:要在这个条件变量上等待
mutex:锁
(5)唤醒等待队列线程
int pthread_cond_broadcast(pthread_cond_t *cond); //唤醒队列中所有线程
int pthread_cond_signal(pthread_cond_t *cond); // 唤醒队列中1个线程
条件变量的接口和mutex的接口是非常类似的,应该很好理解,接下来我们就简单写一个同步的小栗子。
#include <iostream>
#include <unistd.h>#include <pthread.h>
using namespace std;int cnt = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void* Count(void* arg)
{pthread_detach(pthread_self());uint64_t number = (uint64_t)arg;while(true){pthread_mutex_lock(&mutex);// 这里不考虑临界资源的状态// 每个线程进来直接进入阻塞队列,等待主线程唤醒pthread_cond_wait(&cond, &mutex); //为什么条件变量要在锁的里面呢?1.pthread_cond_wait让线程等待的时候,会自动释放锁cout << "pthread: " << number << ", cnt: "<< cnt <<endl;cnt++;pthread_mutex_unlock(&mutex);sleep(1);}
}int main()
{for(uint64_t i = 0; i < 5; i++){pthread_t tid;pthread_create(&tid, nullptr, Count, (void*)i);}cout << "main ctrl begin..." << endl;while(true){pthread_cond_signal(&cond);//唤醒队列中的一个线程,一般是头部线程//pthread_cond_broadcast(&cond);sleep(1);}return 0;
}
执行程序我们可以看到这5个线程按照一定的顺序对临界资源cnt进行了访问,上述代码,我们创建了5个线程,每个线程访问临界资源的时候,进入等待队列,然后主线程依次唤醒队列中的每个线程,就实现了同步。
问题1:在主线程创建线程传参的时候可不可以把i的地址传进去?
不可以,因为,如果把i的地址传进去,这个i就变成了主线程和所有线程共享的临界资源,主线程把i修改会影响其他线程的,因此这里只能以传值的方式,给每个线程拷贝一个,在抢票哪里是new出来的,给每个线程分配一个堆资源,这是没问题的,因为主线程并不会对堆的资源进行访问。
问题2: 我们是带着锁之后进入等待队列的啊,那不就意味着就只有一个线程能进入等待队列吗?你把锁都拿走了,那其他线程怎么进来的呢?
你猜为什么等待的时候需要传递一个锁进去呢?线程在进入等待队列的时候会把锁释放掉,因此是不会有问题的。
问题3:为什么先申请锁在进入等待队列?
这里要问一个问题,我们为什么要让线程进入等待队列呢?原因是临界资源不就绪,临界资源也是有状态的!那我们怎么知道临界资源不就绪呢?是判断出来的,那判定是原子的吗?不是,那需不需要加锁呢?需要,因此是先判断临界资源是否就绪,然后在看是否要进入临界资源,要判定临界资源是否就绪就需要加锁,因此,要先申请锁在进入等待队列。
4、生产消费者问题
(1)CP问题
生产消费者问题是一种互斥和同步的应用场景,那么什么是生产消费模型呢?
比如说在日常生活中,我们去超市买东西,我们就是消费者,那么超市是生产者吗?超市不是生产者,生产者是供货商,供货商把商品放到超市里,然后消费者去超市里购买,那为啥要有超市呢?消费者直接去工厂买不行吗? 消费者是一个个来的,那供货商一个个生产吗?可能光开启一下浪费的人力物力都不是一点点了,那工厂可以一次生产一大批商品呀,那工厂不就既是生产者又是超市的角色了吗?而且你去工厂买东西效率高吗?存在超市的原因是效率高,为啥效率高,因为对于生产者来讲不是用户需要一包我生产一包,而是直接把一大批商品存放到超市里, 而对于消费者来说,也不需要给工厂打电话等工厂生产完了去买,不需要等待。其次,放在超市里效率高的原因是,超市相当于一个大的缓存,即使供货商放在超市里的商品暂时卖不完,没有关系,先暂时存放在超市,等消费者需要的时候去超市拿就行了。而且当过年的时候,工人都是要放假的,你不给工人放1个月的假吗?放假怎么生产呢?但是消费者是有购买年货的需求的,这不就矛盾了吗?那么工厂可以提前生产一大批商品投入超市,然后工人回家过年,消费者再去购买,此时由于有超市的存在,可以支持忙闲不均,生产者可能生产很快消费者消费很慢,但不影响,有缓存。
由于有了超市,可以让生产和消费的行为,进行一定程度解耦,供货商生产商品和消费者是没有关系的,消费者如何对商品做处理供货商关心吗?根本不关心,由于解耦就可以很好的支持并发。
那么我们把上述抽象出来,超市是什么?不就是一个特定结构的内存空间吗?而生产者和消费者呢?可以抽象成2个线程,商品是什么呢?就是数据呀,因此生产消费者模型就是2个执行流在做通信。由于超市是供货商和线程都要访问的,那么这个超市是一种什么资源呢?是一种共享资源,多个线程访问共享资源,会有并发问题,什么并发问题呢?并发问题的本质就是在研究角色和角色之间的关系,如果想清楚了生产者、消费者之间的关系,该怎么加锁就很清楚了。
生产者VS生产者:互斥
生产者和生产者之间什么关系呢?肯定是竞争啊,同行是冤家,竞争关系就是互斥
消费者VS消费者:互斥
消费者和消费者是竞争关系,消费者把一个数据拿走了,另外一个消费者就没的拿了,但是,我在超市买东西的时候也没见有人和我抢呀,如果世界末日了,超市里只有一包方便面,你和你的舍友都很饿,你和舍友去超市看到这一包方便面会发生什么事?在现实世界中之所以没出现这种情况是因为资源太多了,所以互斥问题没有凸显。
生产者VS消费者:互斥、同步
生产者和消费者什么关系?生产者只有先生产了数据,消费者才能拿数据,因此是有一个先后顺序的,这种先后顺序就是一种同步关系。如果生产者把商品放到超市货架上的时候,消费者就把伸手来拿了,这是拿走了还是没拿走呢,因此要确保生产者和消费者之间的互斥,就是说生产者在放一个商品的时候,你不能立刻来拿,你要等生产者放好之后你在去拿。生产者和消费者都需要访问即临界资源,因此必须确保在同一时间内只有一个线程(生产者或消费者)能够访问该资源。
如何快速记住生产者消费者模型呢?321原则
3种关系,2种角色--生产和消费,1个交易场所--特殊结构的空间
生产消费者模型优点:
1.支持忙闲不均
2.生产和消费解耦
(2)基于阻塞队列快速实现CP
接下来我们实现一个基于阻塞队列的1个生产者1个消费者模型。当队列为空时,消费者从对头获取元素的操作将会被阻塞,直到队列中被放入了元素,当队列为满时,生产者往队列里放元素的操作也会被阻塞,直到有队列中的元素被取出。
BlockQueue
#pragma once#include <queue> #include <iostream> #include <pthread.h>template <class T> class BlockQueue {static const int defaultnum = 5; // 队列的最大容量 public:BlockQueue(int maxcap = defaultnum):_maxcap(maxcap){pthread_mutex_init(&_mutex, nullptr);// 初始化锁pthread_cond_init(&_pcond, nullptr); // 初始化生产条件变量pthread_cond_init(&_ccond, nullptr); // 初始化消费者条件变量}void Push(const T& in){pthread_mutex_lock(&_mutex);while(_q.size() == _maxcap) // 你想生产就能生产吗?队列的容量 != maxcap时才可以啊{//while判断防止伪唤醒pthread_cond_wait(&_pcond, &_mutex);// 如果队列满了,生产线程阻塞,等待消费者消 // 费完数据唤醒}_q.push(in);pthread_cond_signal(&_ccond);//生产了一个数据,唤醒消费者,让消费者来消费pthread_mutex_unlock(&_mutex);}T Pop(){pthread_mutex_lock(&_mutex);while(_q.empty()) // 你想消费就能消费吗?如果队列为空,你就去等待队列里等待去吧{//while判断防止伪唤醒pthread_cond_wait(&_ccond, &_mutex); // 如果队列为空,消费线程阻塞,等待生产现 // 在生产完数据唤醒}T out = _q.front();_q.pop();pthread_cond_signal(&_pcond); //消费了一个数据,唤醒生产者,让生产者生产pthread_mutex_unlock(&_mutex);return out;}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}private:std::queue<T> _q; // 共享资源int _maxcap; // 队列的最大容量pthread_mutex_t _mutex; // 锁pthread_cond_t _pcond; // 生产者条件变量pthread_cond_t _ccond; // 消费者条件变量};
单生产单消费
#include <iostream> #include <unistd.h> #include "blockqueue.hpp" #include <pthread.h> using namespace std;void* Consume(void* arg) {BlockQueue<int>* q = static_cast<BlockQueue<int>*>(arg);while(true){// 消费int data = q->Pop();cout << "成功消费一个数据: " << data << endl;}}void* Produce(void* arg) {BlockQueue<int>* q = static_cast<BlockQueue<int>*>(arg);int data = 0;while(true){// 生产sleep(1);q->Push(data);cout << "成功生产一个数据:" << data << endl;data++;} }int main() {pthread_t p; // 生产者线程pthread_t c; // 消费者线程BlockQueue<int>* bq = new BlockQueue<int>();pthread_create(&p, nullptr, Produce, bq);pthread_create(&c, nullptr, Consume, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;}
要实现多生产多消费非常简单,直接搞多个生产者和消费者线程就行
(1)运行结果——生产者比消费者慢
当生产者比消费者慢的时候,因此消费者刚开始被阻塞了,当生产者生产一个之后消费者才能去消费,因此可以看到生产者生产了一个消费者也就拿了一个,这两个线程谁先执行知道吗?不知道到,但一定是生产者正式执行,因为消费者即使执行由于队列为空,它只能阻塞住。
(2)运行结果——消费者比生产者慢
这个地方打印的时候出现打印干扰是很正常的,因为两个线程往显示器文件里打印,这个显示器文件就是共享资源, 并没有锁,所以会出现干扰,我们可以看到,生产者生产了5个,消费者只能从0、1、2一个个消费。
这个地方我们生产消费的是一个整数,生产消费者是可以对任务进行处理的,因此我们可以在写一个计算任务,交给我们的生产消费者两个线程,对任务做处理。
(3)伪唤醒
我们看到,我们在判断临界资源状态的时候用了while循环,注释里写是为了避免伪唤醒因此用了while,那么我们怎么理解伪唤醒呢?假设现在队列为满,生产者没办法消费了,因此来的线程都要去条件变量的队列中等待。但此时消费者是可以消费的,当消费者消费了一个之后会唤醒生产者继续生产,如果此时生产者不是使用的pthread_cond_signal唤醒的一个线程,而是使用pthread_cond_broadcast唤醒了生产者对列中的所有线程,唤醒之后需要重新持有锁,因此这被唤醒的多个线程就会对锁展开竞争,最终肯定只有一个线程能拥有锁往后执行,于是拥有锁的那个是继续往后执行push数据,push数据之后队列为满了,之后唤醒消费者让消费者消费,但是,如果消费者没有抢到这个锁,而是被刚刚那些唤醒的生产者线程把锁抢到了呢?那些被唤醒的生产者线程不在条件变量下等了,而是在这把锁等,因为刚刚生产者线程把锁释放了,此时一个生产者线程立刻持有了锁,继续往后push数据,但此时队列的容量是满的呀,所以此时就产生了一种伪唤醒或者误唤醒的情况。怎么防止伪唤醒呢?while判断,如果一个线程被唤醒了,你先别着急往后走,先在去判定一下,如果的空间没满你再去生产,如果满了重新进入休眠状态。
(3)CP处理计算任务
计算任务
#pragma once #include <string>class Task { public:Task(int data1, int data2, char op):_x(data1),_y(data2),_op(op),_result(0),_exitcode(0){}void run() 计算操作{switch(_op){case '+':_result = _x + _y;break;case '-':_result = _x - _y;break;case '*':if(_x != 0)_result = _x * _y;else{_result = 0;_exitcode = 1;}break;case '/':if(_x != 0)_result = _x / _y;else{_result = 0;_exitcode = 1;}break;}}void operator()() // 仿函数,方便调用计算结果{run();}std::string getResult() // 返回相应的结果{std::string ret = std::to_string(_x);ret += _op;ret += std::to_string(_y);ret += '=';ret += std::to_string(_result);ret += ' ';ret += "exitcode:";ret += std::to_string(_exitcode);return ret;}std::string getTask() 返回得到的任务{std::string ret = std::to_string(_x);ret += _op;ret += std::to_string(_y);ret += '=';ret += '?';return ret;}private:int _x;int _y;char _op;int _result;int _exitcode;};
#include <iostream> #include <unistd.h> #include "blockqueue.hpp" #include <pthread.h> #include "task.hpp" #include <ctime> using namespace std;string opers = "+-*/";void* Consume(void* arg) {BlockQueue<Task>* q = static_cast<BlockQueue<Task>*>(arg);while(true){// 模拟消费者消费数据auto t = q->Pop();t(); // 对数据做计算string result = t.getResult(); // 获取结果cout << "消费了一个任务:" << result << endl;}}void* Produce(void* arg) {BlockQueue<Task>* q = static_cast<BlockQueue<Task>*>(arg);while(true){sleep(1);// 模拟生产者生产数据int x = rand() % 10;int y = rand() % 10;char op = opers[rand() % 4];Task t(x, y, op);cout << "生产一个任务:" << t.getTask() << endl;q->Push(t);} }int main() {pthread_t p; // 生产者线程pthread_t c; // 消费者线程BlockQueue<Task>* bq = new BlockQueue<Task>();pthread_create(&p, nullptr, Produce, bq);pthread_create(&c, nullptr, Consume, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;}
运行结果
(4)再看CP问题
我们不仅要看到生产者消费者和仓库,还要看到之外的东西——生产者获取数据,消费者加工数据。
生产者的数据哪里来啊?上面的数据使我们自己模拟的,生产者的数据一般都要通过网络获取或者用户获取。因此生产者要先获取数据,然后才生产数据。
消费者消费之后获取数据完了吗?不是的,消费者还要对数据做处理加工的。因此消费者要消费数据在加工处理数据。
生产消费模型为什么高效?
生产过程要加锁,消费的时候也要加锁,在访问仓库的时候是串行的,哪里体现高效了呢? 如果一个线程正在仓库生产数据,其他线程的确不能生产了,但它可不可以获取数据?获取数据也是要花时间的哦。生产者正在互斥式的访问仓库生产数据,消费者也不消费,消费者有没有可能在加工处理数据呢?因此如果我们引入生产者获取数据消费者加工处理数据,那么此时不就是并发执行的吗?只不过一些线程访问临界区代码,一些线程访问非临界区代码,这里线程不就高效并发起来了吗?因此,生产消费者才是高效的。
临界区也是有状态的
我们每一次都需要先判断临界资源是否就绪,如果就绪了我们在继续往后走,如果不就绪我们就去等待队列中等待 ,判定临界资源是否就绪不是原子的,因此我们需要放在锁的里面,下面,有没有更优雅的做法呢?接下来进入信号量。
5、信号量
在上述生产者消费者模型中,这个共享资源q是被当做一个整体来使用的,因此我们需要加锁来确保安全性,我们可不可以把这个资源q切成几个不同的区域。现在有一个数组,数组中有300个元素,现在有3个线程,我让这3个线程分别只能访问前100个、中间100个、后100个元素,这样虽然数组是个全局数组,但我们访问的是这个数组的不同区域,因此这3个线程可以并发访问这个数组的不同区域吗?当然可以呀,但是要来第4个线程呢?这个数组我只划分了3份但过来了4个线程,那我怎么确保数据安全呢?我只能让你3个线程来访问,哪3个呢?不重要,只要是3个都行,因此这个数组的资源把它当做整体来用直接加锁,只允许一个线程进来。但我们如果把它分成几份,那最多就允许几个线程进来,为了更好的保护临界资源就需要引入信号量进来,你说分成3份就分成3份吗?最多允许多少个线程进来,就由信号量来决定。
(1)什么是信号量
信号量本质是一把计数器,描述临界资源中资源数目的多少。
当我们去电影院买票的时候,电影院不就相当于一个共享资源吗?一个个的座位不就是电影院这个共享资源整体,被划分成了很多份吗?而信号量就相当于电影票的张数。是我买了票这个座位就是我的,还是我要坐在那个座位上那个座位才是我的呢?买了票就是,我买了票,我可以不去,但那个座位要给我留着,因此,买票本质上是对电影院资源的预定机制。我们要去电影院要先去买票,同理我们要访问临界资源,它的临界资源被分成很多,我们可以定义一把信号量,让每个线程去访问临界资源的某一个,先别急着访问,先去竞争信号量,只要你申请成功,未来一定会有一个资源给你。那信号量不就是共享资源吗?因此信号量是原子的!当成功申请一个资源,资源就少1个,因此信号量就要--,当释放一个资源,信号量就多1个,因此信号量就要++,因此++和--操作也必须是原子的!对信号量++叫做P操作,对信号量--叫做V操作。因此信号量是一个保证PV操作原子性的计数器。
信号量的本质是一把计数器,那计数器的本质是什么呢?用来描述资源数目,把资源是否就绪放在了临界区之外,申请信号量的本质,间接的已经在做判断了!
上面我们在进入临界区的时候还需要判定一下临界资源是否就绪,那现在PV操作之间需要判定吗?不需要,因为只要申请成功了就一定有你的,申请不成功你就去信号量去等待。
(2)信号量的使用
(1)定义信号量
sem_t xxx; // sem_t是一个整形,可以定义信号量变量。
(2)初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);参数: pshared:0表示线程间共享,非零表示进程间共享value:信号量初始值
(3)销毁信号量
int sem_destroy(sem_t *sem);
(4)等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); // P操作
(5)发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。int sem_post(sem_t *sem); //V操作
(3)基于环形队列的生产消费模型
环形队列刚开始head和tail执行同一个位置,每次入数据往head入,然后head往后走,当走到队列尾部的时候又会回到起始位置,但出数据的时候往tail的位置出数据,然后让tail往后走,我们可以用数组模拟,当走到数组尾部的时候,直接%数组的长度,就会回头头部了 ,但是这时有一个问题,为空和为满的时候tail和head指向同一个位置,我咋知道是空还是满呢?
1.此时可以搞一个计数器cnt,每生产一个元素让cnt++,每消费一个元素让cnt--,当head和tail指向同一个位置的时候,如果cnt为0,当前为空,否则当前铁不为空。
2.还有一种方法就是空出来一个位置,比如说要一直生产,那就看head的下一个位置是不是tail如果不是继续入数据,如果是就为满了。通过空一个位置判定是否为满。
但是这个地方我们直接就可以用信号量判空判满,为空或者为满不就是某种资源不存在了吗?比如说当为满的时候就表明空间信号量不存在了,此时没有空间了,如果是为空,不就是此时数据资源为0。
生产者和消费者刚开始都在同一个位置, 但是没有关系,刚开始只有谁能先往后走呢?只有生产者能往后走,因为消费者没有数据给它消费呀。生产者最关心的是什么资源?是当前有多少个空间给我放数据,消费者关心什么资源呢?还有多少数据可以让我取,因此我们可以定义2个信号量,SpaceSem表示还有多少剩余空间,DataSem表示还有多少剩余数据。当生产者P操作生产了一个之后,是不是数据就多1个呀,此时我们不要V自己,要V的是数据信号量DataSem,让数据多一个,此时消费者就可以消费了,同理,消费者在P操作过后,空间就多出来一个,因此要V空间信号量SpaceSem 。如果生产者一直生产,消费者就是不消费,然后把空间使用完了此时生产者和消费者又在同一个位置哦,请问,如果满了会发生什么,满了就只能等消费者消费!因为生产者的空间已经使用完了,此时P操作就会把它挂起,需要等消费者消费让空间多出来。
因此只有2种情况才会指向同一个位置,空或者满,如果是空,必然生产者现在。如果是满,必然消费者先走!
当不空和不满的时候,我们一定指向不同的位置,我们可以同时访问!
环形队列RingQueue
//信号量的本质是一把计数器,那么这把计数器的本质是什么??用来描述资源树数目的,把资源是否就绪放在临界区之外,申请信号量时,其实就已经在做判断了 环形队列 #pragma once #include <semaphore.h> #include <sys/types.h> #include <pthread.h> #include <iostream> #include <unistd.h> #include <ctime> #include <vector> using namespace std;const int defaultcap = 5;template<class T> class RingQueue { private:void P(sem_t& sem){sem_wait(&sem);}void V(sem_t& sem){sem_post(&sem);}void lock(pthread_mutex_t& mutex){pthread_mutex_lock(&mutex);}void unlock(pthread_mutex_t& mutex){pthread_mutex_unlock(&mutex);}public:RingQueue():_ringqueue(defaultcap),_cap(defaultcap),c_step(0),p_step(0){sem_init(&c_data_sem, 0, 0); //消费者关注数据资源,刚开始数据为0,第二个参数是是否共享,设置为0即可sem_init(&p_space_sem, 0, _cap);//生产者关心空间资源,刚开始空间为defaultcappthread_mutex_init(&c_mutex, nullptr);pthread_mutex_init(&p_mutex, nullptr);}void Push(const T& in) //生产{P(p_space_sem); //要有空间你才能生产,P操作本身就是原子的,不需要被加锁保护lock(p_mutex); // 多生产多消费需要加锁,单生产单消费不需要_ringqueue[p_step] = in;//位置后移,维持环形特性p_step++;p_step %= _cap; unlock(p_mutex);V(c_data_sem); //数据多了一个}void Pop(T* out) //消费{P(c_data_sem); //有数据才能消费lock(c_mutex); // 多生产多消费需要加锁,单生产单消费不需要*out = _ringqueue[c_step];//位置后移,维持环形特性c_step++; //消费者往后走,表明c_step前面的位置已经被消费,可以被覆盖了c_step %= _cap;unlock(c_mutex);V(p_space_sem); //空间多出来一个}~RingQueue(){sem_destroy(&c_data_sem);sem_destroy(&p_space_sem);pthread_mutex_destroy(&c_mutex);pthread_mutex_destroy(&p_mutex);}private:vector<T> _ringqueue;int _cap; //环形队列最大容量int c_step; //当前消费者所在位置int p_step; //当前生产者所在位置sem_t c_data_sem; //消费者关注数据资源sem_t p_space_sem; //生产者关注空间资源pthread_mutex_t c_mutex; //多生产多消费需要加锁,单生产单消费不需要pthread_mutex_t p_mutex; };
单生产单消费
string opers = "+-*/%";void* Producer(void* args) {RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);while(true){//1、获取数据int num1 = rand() % 10;int num2 = rand() % 10;int len = opers.size();char oper = opers[rand() % len];Task task(num1, num2, oper);//2、生产数据rq->Push(task);cout << "producer a task: " << task.getTask() << endl;sleep(1);}}void* Concumer(void* args) {RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);while(true){//消费数据Task task;rq->Pop(&task);task();cout << "concumer a task: " << task.getResult() << endl;sleep(1);}}//单生产单消费 int main() {srand(time(nullptr));RingQueue<Task>* rq = new RingQueue<Task>();pthread_t c, p;pthread_create(&c, nullptr, Concumer, rq);pthread_create(&p, nullptr, Producer, rq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;}
多生产多消费
struct ThreadData {RingQueue<int>* rq;string threadname; };//多生产多消费void* Producer(void* argv) {ThreadData* td = static_cast<ThreadData*>(argv);RingQueue<int>* rq = td->rq;string name = td->threadname;while(true){//1、获取数据int data = rand() % 10 + 1;//2、生产数据rq->Push(data);cout << "producer data done, data: " << data << " who: " << name << endl;} }void* Consumer(void* argv) { //sleep(5);ThreadData* td = static_cast<ThreadData*>(argv);RingQueue<int>* rq = td->rq;string name = td->threadname;while(true){//1、消费数据int data = 0;rq->Pop(&data);cout << "consumer a data, data: " << data << " whoe: " << name << endl;sleep(1);//2、处理数据TODO}}int main() {srand(time(nullptr) ^ getpid());RingQueue<int>* rq = new RingQueue<int>();pthread_t c[5], p[3];for(int i = 0; i < 3; i++){ThreadData* td = new ThreadData();td->rq = rq;td->threadname = "producer-" + to_string(i);pthread_create(p + i, nullptr, Producer, td);}for(int i = 0; i < 5; i++){ThreadData* td = new ThreadData();td->rq = rq;td->threadname = "consumer-" + to_string(i);pthread_create(c + i, nullptr, Consumer, td);}for(int i = 0; i < 5; i++){pthread_join(c[i], nullptr);}for(int i = 0; i < 3; i++){pthread_join(p[i], nullptr);}return 0; }
多生产多消费是需要加锁的!比如多个生产者同时访问申请信号量,然后就有可能对同一个位置进行写入,此时后写入的就把先写入的覆盖掉了。因此需要加锁,确保只有一个生产者可以进入临界区