【Linux】详解线程第三篇——线程同步和生产消费者模型

线程同步和生消模型

  • 前言
  • 正式开始
    • 再次用黄牛抢票来讲解线程同步的思想
    • 通过条件变量来实现线程同步
      • 条件变量接口介绍
        • 初始化和销毁
        • pthread_cond_wait
        • signal和broadcast
    • 生产消费者模型
      • 三种关系
      • 用基本工程师思维再次理解
      • 基于生产消费者模型的阻塞队列
        • 版本一
        • 版本二
          • 多生多消
        • 利用RAII来对锁进行优化

在这里插入图片描述

前言

本篇线程同步的内容是完全基于线程互斥来讲的,如果屏幕前的你对于线程互斥还不是很了解的话,可以看看我上一篇博客:【Linux】详解线程第二篇——用黄牛抢陈奕迅演唱会门票的例子来讲解【 线程互斥与锁 】

正式开始

上篇线程互斥中重点讲了互斥锁,虽然解决了多线程并发导致的临界资源不安全的问题,但是还存在一个比较重要的问题:访问临界资源合理性的问题。

再次用黄牛抢票来讲解线程同步的思想

再举一下我上篇博客中黄牛抢票的例子。

上一篇博客的例子中,只有黄牛和票这两个元素,对应的就是线程和临界资源,既然互斥锁已经讲了,那么就能多一个锁这个元素了,也就可以理解为多了一个售票员。所以这里一共三个主要元素:黄牛(线程)、票(临界资源)、售票员(锁)。

前面博客中我未加usleep对黄牛买票进行限制的例子中,出现了一个黄牛将所有票抢完的例子,也就是说整个流程中,只有一个线程对临界资源进行了访问,其他的线程虽然想要访问临界资源但是都没有访问到,这种情况不能说有错误,只能说设计的不合理,会造成其他线程的饥饿问题,一个人把所有的活全包了,其他人挣不到钱,就没饭吃了,虽然这个例子有点极端。但是确实是一个问题。

还有一个问题,假如此刻票被抢完了,但是票卖完后隔一段时间还会再次补票,但是无法确定票补充的时间,会在随机时刻进行补发(临界资源未准备就绪,但有可能随意时刻准备好)。这样的话,所有的黄牛都想抢票,但是都不知道什么时候会补票,于是所有的黄牛无时无刻都在问售票员票是否补发,这样的话,就会很浪费所有黄牛和售票员的时间,也即浪费所有线程向锁申请资源的时间,导致运行效率下降。虽然没做错,但是不合理。

上面这两个问题都是访问临界资源的合理性的问题。而引入线程同步就是为了解决这个问题。

想一想我们现实生活中是怎样售票的,当票卖完后,我们不需要一直询问售票员是否有票,只要等待通知即可,比如说12306,补到票了会通知你,当你进行补票的时候,是会排队的,也就是为什么你点击补票会显示当前人数是多还是中等还是少的信息。如果你补的比较早,那么你排队就会靠前一点,同理,补得比较晚,就会排的靠后一点。这里最重要的一点就是排队,使得整个流程有了一定的顺序。也就是让所有的线程按照一定的顺序去访问临界资源。这里排队可以解决一部分问题,也就是不断询问是否有票的问题。

那么还有一个黄牛将所有的票抢完的问题。再拿12306来说,我们每个人只能买一张票,如果想买多张,就要多给一个乘员信息,比如说你给你自己买了票,还想给你朋友买票的话,得要你朋友的身份证号码等信息。也就票和人是一对一的。那这里搞得极端一点,规定每次让黄牛只能买一张票,买完票后禁止继续买票,再加上上面的队列,如果卖完票的黄牛还想再买票,就必须去队尾重新排队购买。

上面其实已经把线程同步的思想讲解出来了,不过都是以黄牛的角度来说的,这里改口成线程来总结一下:

  1. 所有的线程想要访问临界资源时,都必须排队。
  2. 访问完临界资源的线程,若想要继续对临界资源进行访问,就必须跑到队尾等待其前面的线程访问完才能轮到当前线程。

这样让访问临界资源的线程,按照一定的顺序进行临界资源的访问,就是线程同步最重要的思想。

那么如何实现线程同步呢?

通过条件变量来实现线程同步

我们在申请临界资源前,先要做临界资源是否存在的检测,而检测也是需要访问临界资源的,那么对临界资源的检测也是一定需要在加锁和解锁之间的。

我把前面博客中的的例子改一改,加上补票机制:

#include <iostream>
using namespace std;#include <pthread.h>
#include <unistd.h>
#include <ctime>#include <string>// 线程数据,包含线程名字,也就是黄牛名字,还有线程对应的互斥锁
class ThreadData
{
public:ThreadData(const string& name, pthread_mutex_t* pmtx):_name(name),_pmtx(pmtx){}public:string _name;pthread_mutex_t* _pmtx;
};// 黄牛人数
#define THREAD_NUM 5// 陈奕迅演唱会的票数tickets
int tickets = 1000;// 黄牛抢票执行的方法
void* getTicket(void* arg)
{ThreadData* ptd = (ThreadData*)arg;// 每个黄牛疯狂抢ticketswhile(1){pthread_mutex_lock(ptd->_pmtx);if(tickets > 0){usleep(rand() % 2000);tickets--;printf("%s抢到票了, 此时ticket num ::%d\n", ptd->_name.c_str(), tickets);// 当对临界资源访问完后就解锁pthread_mutex_unlock(ptd->_pmtx);}else{// 当对临界资源访问完后就解锁,这里是当tickets == 0的情况,也要解锁pthread_mutex_unlock(ptd->_pmtx);printf("%s没抢到票, 票抢空了\n", ptd->_name.c_str());// 这里不再让黄牛抢不到票就退出,而是继续检查是否有票}// 抢到或没抢到票都执行一下后续动作,这里直接用usleep替代usleep(rand() % 5000);}// 记得要释放掉线程数据,不然内存泄漏delete ptd;return nullptr;
}int main()
{// 局部锁pthread_mutex_t mtx;// 默认给空就行pthread_mutex_init(&mtx, nullptr);// 种一颗随机数种子srand((unsigned int)time(nullptr) ^ getpid() ^ 0x24233342);// 假设此时有三个黄牛进行抢票pthread_t tid[THREAD_NUM];for(int i = 0; i < THREAD_NUM; ++i){string tmp;tmp += to_string(i + 1) + "号黄牛";ThreadData* ptd = new ThreadData(tmp, &mtx);// 每个黄牛去抢票pthread_create(tid + i, nullptr, getTicket, (void*)ptd);}// 补票机制while(1){if(tickets == 0){cout << "票抢光了,准备补票中..." << endl;sleep(rand() % 10);tickets = 500;break;}sleep(1);}// 等待每个黄牛抢完票后退出for(int i = 0; i < THREAD_NUM; ++i){pthread_join(tid[i], nullptr);}pthread_mutex_destroy(&mtx);return 0;
}

运行:
我截了三张图,一张是第一次票抢光之后准备补票:

在这里插入图片描述

第二张是票补上了之后:
在这里插入图片描述

第三张是补票抢完之后:
在这里插入图片描述

这里可以发现,当票抢光之后,5个线程一直在查询当前是否有票,正如开始所说的那样。

这里的代码,会在 if(tickets > 0) 前进行加锁,因为tickets > 0就访问了临界资源,没票之后会补票,但是线程不会退出,而是一直在查询是否补票,这样效率就很低,一直循环着申请锁和解锁,虽然没错,但是不合理。故这种常规方式检测条件也就注定了其必须频繁的申请加锁和解锁。

但是我们可以改一改:

  1. 不要让线程再频繁的去自己检测临界资源是否准备就绪,如果未准备就绪就让当前线程等待,也就是进入S状态。
  2. 当临界资源就绪的时候,再唤醒想要访问临界资源的线程。

这样效率就会大大提高。

想要实现这个功能的话,可以通过条件变量来实现。

条件变量接口介绍

条件,condition,pthread库中用起来条件变量的接口都是中间加上cond。

初始化和销毁

定义一个条件变量和定义一个锁一样,也分全局和局部,初始化也是全局可以直接用宏,局部要用init:
在这里插入图片描述

全局初始化的宏:
在这里插入图片描述

局部初始化的时候第一个参数就是条件变量的地址,第二个参数给空让它以默认方式初始化就行。
还是,以pthread开头的接口返回值绝大部分都是正确返回0,错误返回错误码。

destroy,就是销毁条件变量,没啥好讲的。

pthread_cond_wait

在这里插入图片描述
就是传一个条件变量指针,然后一个锁,为啥要传一个锁后面再说。只要调用了这个函数的线程就会进入阻塞状态,也就是从运行队列进入了等待队列中。这里就相当于是黄牛开始排队了,而且注意,这里的队伍是在等待cond这个条件准备就绪,也就是说,这里的队伍是专门为cond开辟的一个队,不同于普通的等待队列。

这里就相当于是前面的if判断资源是否存在了,但是是直接让线程进入等待队列中。

还有一个timewait,多了一个参数,前两个参数和wait一样,第三个参数是一个时间,让线程等待时,当第三个参数一到会自动醒来。

signal和broadcast

在这里插入图片描述

上面的pthread_cond_wait是让线程进入到与cond相关的等待队列中,当signal被调用时,就会有一个线程出队,就相当于是等待资源准备就绪了,此时就会唤醒一个线程。不过这里前提是signal的参数中的cond要和wait的参数中的cond指向是一样的。不匹配就无法唤醒wait对应cond的队列中的线程。broadcast,广播的意思,当调用这个函数时,会将cond对应等待队列中的所有线程都唤醒,此时所有的线程会按照顺序出队。

下面我写一个全新的例子来演示一下这里的cond用法:

// 线程个数
const static int THREAD_NUM = 4;// 不同线程的执行方法
/**********************************************************************************/
void Thread_1_Func(const string& name)
{cout << name << "is doing" << " 加密工作" << endl;
}void Thread_2_Func(const string& name)
{cout << name << "is doing" << " 持久化工作" << endl;
}void Thread_3_Func(const string& name)
{cout << name << "is doing" << " 查询工作" << endl;
}void Thread_4_Func(const string& name)
{cout << name << "is doing" << " 管理工作" << endl;
}
/**********************************************************************************/// 函数指针,指向线程所要执行的函数
typedef void(*pfunc)(const string& name);// 每个线程最有用的数据
class ThreadData
{
public:ThreadData(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond, pfunc pf):_name(name),_pmtx(pmtx),_pcond(pcond),_pf(pf){}public:string _name; // 线程名pthread_mutex_t* _pmtx; // 锁pthread_cond_t* _pcond; // 条件变量pfunc _pf; // 回调函数
};// 判断线程是否退出
static bool quit = false;// pthread_create的回调方法
void* ThreadRoutine(void* arg)
{ThreadData* ptd = (ThreadData*)arg;while(!quit){// 访问临界资源前上锁pthread_mutex_lock(ptd->_pmtx);// 相当于if判断,此时线程直接阻塞pthread_cond_wait(ptd->_pcond, ptd->_pmtx);if(!quit){// 去调用线程对应的方法ptd->_pf(ptd->_name);}else{// 退出cout << ptd->_name << " quit" << endl;    }// 访问完后解锁pthread_mutex_unlock(ptd->_pmtx);}// 记得释放传来的对象,不然内存泄漏了delete ptd;ptd = nullptr;return nullptr;
}int main()
{// 局部条件变量pthread_cond_t cond;pthread_cond_init(&cond, nullptr);// 局部锁pthread_mutex_t mtx;pthread_mutex_init(&mtx, nullptr);// 各个函数执行的方法pfunc funcs[THREAD_NUM] = { Thread_1_Func, Thread_2_Func, Thread_3_Func, Thread_4_Func};// 多个线程pthread_t tids[THREAD_NUM];// 创建THREAD_NUM个线程for(int i = 0; i < THREAD_NUM; ++i){ThreadData* tmp = new ThreadData(to_string(i + 1) + "号线程", &mtx, &cond, funcs[i]);pthread_create(tids + i, nullptr, ThreadRoutine, (void*)tmp);}cout << "prepare to do the jobs" << endl;sleep(1);cout << "start doing jobs" << endl;// 发signal,让等待队列中的线程执行其方法int count = 0;while(count != 8){pthread_cond_signal(&cond);++count;sleep(1);}quit = true;// quit改为true时,其他线程已经在等待队列中了,//得让各个线程都执行其一次方法才会循环上去判断quit改变了pthread_cond_broadcast(&cond);cout << "jobs done" << endl;for(int i = 0; i < THREAD_NUM; ++i){pthread_join(tids[i], nullptr);}return 0;
}

上面的代码中,各线程执行pthread_cond_wait就会进入cond对应的等待队列中,当main线程执行一次pthread_cond_signal就会唤醒一个进程。

运行起来:
在这里插入图片描述

这里用的是signal来唤醒各个线程的,还可以用broadcast:
在这里插入图片描述
这样一次就会唤醒一批线程。

运行:
在这里插入图片描述

生产消费者模型

带着两个问题来讲一讲生产消费者模型:

  1. 条件满足的时候我们再唤醒指定的线程,但是我们怎么知道条件是否满足?
  2. 互斥所在线程同步中的意义,以及为何将pthread_cond_wait_wait放在加锁和解锁之间?

学习生产消费者模型可以帮助我们解决第一个问题。
在编写生产消费者模型的某种场景的代码时,可以帮助我们理解互斥锁的意义。

下面来举一个生活中的生产消费者模型的例子。

现在有一个超市,屏幕前的你是一名消费者。

超市中的商品,并不是由超市直接提供的,而是由供应商提供的,超市本质上是一个商品缓冲区。如图:
在这里插入图片描述
上图中,有【两种角色:消费者和生产者。一个交易场所。】

我们买东西时不会去供应商处买,而是去超市,这样能够提高效率。假如说消费者想要买一包方便面(下面都以方便面来说),如果直接去供应商那里买的话,供应商还要开机器给你做一包,这样对于供应商来说成本太高了,一包就得开一次机器,电费都比那一包方便面贵,没这个必要。直接一次性生产一大堆,然后提供给超市,这样,想要买方便面的消费者就去超市。这样就不会浪费那么多的人力和物力去一包一包的生产方便面。

有了超市,供货商只负责生产,不用为消费者准备东西。消费者只需要去超市,不用再跑到供货商处,这样逻辑上就实现了解耦。

三种关系

来说说各角色间的关系。

  • 生产者和生产者

假设图中的供应商都生产的是同种资源,比如说都生产的是方便面,不过品牌不一样,这家是康师傅的,这家是今麦郎的……,那么各个生产者之间就是竞争关系,用计算机术语来说的话,生产者和生产者之间是互斥关系。

  • 消费者和消费者

如果说疫情前快要封城了,所有的居民都要去超市抢购物资,此时超市的方便面已经快被抢空了,极端点,就剩一包了,那么有很多的居民都想要这一包方便面,居民也就是消费者,此时的消费者和消费者之间就是竞争关系,都去竞争那一包方便面,还是用计算机术语来说。在资源很少而需要同种资源的消费者很多的情况下,消费者之间是存在互斥关系的。

上面这两种互斥还可以这样来解释:

超市的空间是一个共享资源,比如说某一货架,不能让供应商全部都抢着去摆放资源,这样物品会放乱,同种不同类的方便面都摆放在一起,这样会造成混乱。也不能让顾客去某一空间抢着争夺资源,这样可能会导致消费者本来想拿康师傅但拿成了今麦郎。所以必须要保证生产者生产的过程是安全的,消费者消费的过程也要是安全的。

  • 故生产者之间是互斥关系,消费者之间也是互斥关系。
  • 生产者和消费者

生产者生产时不能让消费者消费,不然数据未传输完毕时,部分数据已经被消费者拿走,且消费者不会再次消费,这样就会导致生产者和消费者数据不一致问题。比如说一包方便面只造了不到一半就被消费者拿走,此时生产者仍然在生产,而且从开始到结束不会停止,知道生产完整一包方便面才停止,所以生产者所知道的信息是其会生产一整包方便面,但是消费者把一半拿走了,消费者得到的信息是其只拿到了半包方便面,此时就可以说二者的数据不一致。所以说生产者和消费者之间存在互斥关系。

再比如说过年期间,超市生意非常好,所有的居民都忙着进年货,当超市中的某一种商品被抢空时就要通知对应的生产者去生产,如果未通知就会出现本篇刚开始将的抢票问题,会不断有消费者询问是否有商品被补充。同样,当超市商品已摆放满了,也要通知生产者停止生产。故当缺资源的时候通知生产者生产,当资源补充上来时就通知消费者消费;当资源盈余时就通知生产者停止生产,同时刺激消费者消费。即同步。所以生产者和消费者之间也存在同步关系。

所以说生产者和消费者之间存在同步和互斥两种关系。

那么我们写代码的时候怎么编写一个生产消费者模型呢?
只需要掌握住三个原则即可。

  1. 一个交易场所。
  2. 两个角色——生产者和消费者。
  3. 三种关系,生生(互斥)、消消(互斥)、生消(同步和互斥)。

不过第三点有个特例,当只有一个生产者 和 一个消费者时就只需要维护生消这一种关系,这一点后面会再谈,这里先暂不考虑。

通过锁和条件变量来体现出三种关系。

用基本工程师思维再次理解

代码中我们要用线程来体现出生产者和消费者,也就是要给线程进行角色划分。

超市是用某种数据结构来表示的缓冲区。

商品即某些数据。

超市里是否有新增货物,生产者最清楚。因为生产者一成功生产就会新增货物。
超市里剩余多少空间让生产者生产,消费者最清楚。因为消费者每次消费都会新增空余空间。

所以这里就可以解决最初说的第一个问题:条件满足的时候我们再唤醒指定的线程,但是我们怎么知道条件是否满足?

生产者生产商品后就可立马通知消费者来消费,消费者将商品拿走后就可通知生产者去生产。故可以让生产者线程和消费者线程互相同步完成对应的生产消费者模型。这句话中的通知就是唤醒。加粗的字样就是对这个问题的解答。

再来看一个图:
在这里插入图片描述

这里要强调一点,生产和消费的过程不仅仅是从生产者生产到仓库再让消费者拿走。重要的点不在这,而是生产者从获取到数据开始生产和消费者拿掉数据开始处理的两个过程,中间传递数据的过程耗时是非常短的,至于为什么等会写代码的时候就知道了。

我前面讲进程间通信的时候说过:进程间通信的本质是让两个进程看到同一块空间。
校正一下:进程间通信的前提是让两个进程看到同一块空间,进程间通信的本质就是生产消费者模型。比如说管道,自带同步和互斥的属性,正常情况下一端写一端读,当管道为空的时候读端会阻塞,当管道满的时候写端会阻塞。这里就和生产消费者模型很像,写端读入数据后就让读端去读,读端读好了数据后就让写端去写。当写端关闭时读端就相当于读到了文件末尾,read会返回零;当读端关闭的时候写端直接出错,进程就退出了。
【注】如果我这里讲的内容你不太会的话,可以看看我这篇博客:【Linux】进程间通信(匿名管道、命名管道、共享内存等,包含代码示例)

基于生产消费者模型的阻塞队列

下面就来写写生产消费者模型的代码。

我会写两个版本,第一个版本细节比较少,第二个版本会基于第一个版本稍微进行一点优化。

版本一

先来说说大致思路:

  1. 一个交易场所,前面超市的那个例子说了,超市就是用某个数据结构表示的缓冲区,这里我就以队列来表示这个缓冲区,不过我不会再自己手搓一个队列,直接用STL库的那个,如果有同学不懂STL的队列,可以看看这篇:【C++】STL栈和队列基本功能介绍、题目练习和模拟实现(容器适配器)。

  2. 两种角色,生产者和消费者的表示,这里我就先来简单一点的,一个生产者的线程和一个消费者的线程。后面的那个例子再来多个生产者和消费者。

  3. 3种关系中,生生、消消、生消都要保持两两互斥,生消还要有一个同步的关系。我们可以用一个类来实现,其中可以用一个锁来表示所有的互斥,用两个条件变量来表示生消的同步,这个类还可以将第一点中的队列包含在在内。

先把这个类简单给出来:


template<class T>
class CPqueue
{
public:// 构造CPqueue(int capacity): _capacity(capacity){// 互斥锁、条件变量都要用接口来初始化pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}private:// 用队列这个数据结构来表示std::queue<T> _q;// 超市中能够存放的最大容量int _capacity;// 判断超市是否已经放满了pthread_cond_t _full;// 判断超市是否是空的pthread_cond_t _empty;// 互斥锁,用来两两互斥pthread_mutex_t _mtx;
};

上面的两个条件变量,一个是判断当前队列中存放的数据满了没有,一个是判断当前队列中是否空了。

如果满了,就得让生产者线程阻塞,不能继续生产了,并通知消费者来消费,等接到生产信号了再去生产;
如果空了,就得让消费者线程阻塞,不能继续消费了,并通知生产者去生产,等接到消费信号了再去消费。

而这里对应的生产的动作就是往队列中push数据,消费的动作就是从队列中pop数据。

所以要提供两个接口,一个是让生产者push的,一个是让消费者pop的。

下面的代码是在CPqueue中的:

// 判断队列是否为空
bool IsEmpty()
{return _q.size() == 0;
}// 判断队列是否已满
bool IsFull()
{return _q.size() == _capacity;
}void PushData(const T& data)
{// 进来先上锁pthread_mutex_lock(&_mtx);/* 上了锁之后先判断临界资源是否准备就绪,也就是队列是否满了*/if(IsFull()) pthread_cond_wait(&_full, &_mtx);// 到此处就说明队列不满,就可以push数据了_q.push(data);// push完了先发送让消费者消费的信号pthread_cond_signal(&_empty);// 解锁pthread_mutex_unlock(&_mtx);
}void PopData(T& data)
{// 先上锁pthread_mutex_lock(&_mtx);/* 上锁后,先判断临界资源是否准备就绪,也就是队列是否为空*/if(IsEmpty()) pthread_cond_wait(&_empty, &_mtx);// 到此处说明队列不为空,就可以pop了data = _q.front(); // 先拿数据再pop_q.pop();// pop完了发送让生产者生产的信号pthread_cond_signal(&_full);// 解锁pthread_mutex_unlock(&_mtx);
}

这里就是同步和互斥的逻辑。包含了一个交易场所和三种关系中的生消同步和互斥。

下面来写生产者和消费者的线程:

// 生产者执行的方法
void* Productor(void* arg)
{CPqueue<int>* pq = (CPqueue<int>*)arg;int data = 10;while(1){std::cout << "productor send data ::" << data << std::endl;pq->PushData(data);data++;sleep(1);}
}// 消费者执行的方法
void* Consumer(void* arg)
{CPqueue<int>* pq = (CPqueue<int>*)arg;while(1){int data = 0;pq->PopData(data);std::cout << "consumer get data ::" << data << std::endl  << std::endl;}
}int main()
{CPqueue<int>* pq = new CPqueue<int>(6);// 消费者线程pthread_t consumerThread;// 生产者线程pthread_t productorThread;// 消费者线程初始化pthread_create(&consumerThread, nullptr, Consumer, (void*)pq);// 生产者线程初始化pthread_create(&productorThread, nullptr, Productor, (void*)pq);// 等待从线程退出pthread_join(consumerThread, nullptr);pthread_join(productorThread, nullptr);delete pq;return 0;
}

运行起来效果就是这样的:
在这里插入图片描述

还可以控制消费者的消费时间点,比如说队列满了再让消费者消费:
在这里插入图片描述

运行:
在这里插入图片描述
同样的还可以队列装一半了再退出,这里就不演示了。

这里可以说一下pthread_cond_wait的作用了:

  • pthread_cond_wait是在临界区中的,来一个问题:说如果线程等待了,会持有锁等待吗?

    答案是不会的,pthread_cond_wait的第二个参数是一把锁的指针,意义在于,当pthread_cond_wait调用成功后,传入pthread_cond_wait的锁会被自动解开,所以不用担心线程在pthread_cond_wait的时候会带锁wait。

    线程阻塞并恢复之后,从哪里阻塞就会从哪里唤醒,也就是pthread_cond_wait这里,当线程被唤醒时,pthread_cond_wait会自动帮助线程申请其本来调用pthread_cond_wait成功时解开的那把锁,因为县城被唤醒时仍然在临界区中。

不过这里唤醒有多种情况,就先说一个生产者和一个消费者的。
比如说我这里代码写的是先唤醒对方线程,再解锁:
在这里插入图片描述

当另一方醒来时wait会自动申请锁,但此时锁是被当前方的线程占用的,所以另一方又会在锁上面进行阻塞等待(申请锁时,若锁被占用,则当前线程就阻塞等待锁资源),等到当前方解锁后,另一方就会自动拿到锁。

先解锁后唤醒时,比如这样:
在这里插入图片描述
此时就是另一方被唤醒时锁未被占用,那么就会直接得到锁。

这里是单生产者和单消费者的,还有多生产者和多消费者的情况,同理,不过是在锁上等待的线程会更多,这里就不再多讲了。

需要注意的是,唤醒和解锁的先后顺序是都可以的,只要发生了生成——消费这一行为即可。
不过我个人更推荐先唤醒后解锁。

前面说了,生产消费者模型能够提高工作效率。如果单从生产 -> 消费的角度来看问题的话,其实这里并没有什么效率上的提高。因为这一步只是进行了简单的拷贝而已,真正的效率提高在生产者和消费者可同时工作这一点上。生产者生产完后,消费者去取数据并执行后续操作,在这个过程中,生产者还可以继续接受生产的任务,比如网络发来的数据 / 标准输入的数据,接收好就送到缓冲区(超市)中,重在二者可以并发执行。就像餐馆里面的大厨和服务员一样,大厨做好饭后不需要亲自将饭送到餐桌上,而是让端盘的服务员去送,服务员送的同时,大厨仍可继续做饭,服务员送完菜后也可
处理顾客的其他要求(拿酒、盛米饭等)。这样服务员和大厨间的工作是互不影响的,效率就高了。

真正的时间消耗在生产者获取到数据过程 和 消费者获取到数据后的后续处理过程,中间的拷贝耗时相对来说是非常短的,当消费者的后续处理动作很耗时时,可以搞多个消费者线程并发执行该动作(总线程个数尽量不要超过CPU核数,相等最好),这样效率会更高。

版本二
  • pthread_cond_wait是一个函数,只要是一个函数就可能会调用失败,拿push来说:
if(IsFull()) pthread_cond_wait();

pthread_cond_wait是有返回值的,是一个int,调用失败了返回的是一个错误码。
在这里插入图片描述

如果这里pthread_cond_wait调用失败了,那就可能导致队列是满的但是代码仍向下执行了,就会超出规定的队列容量(STL的queue中会自动扩容,我们写的那个capacity只是我们自己规定的,如果出现不合法的行为也是不好检测的),那么这样就不合理了。pthread_cond_wait还可能存在伪唤醒的情况,意思就是条件变量_full/_empty并不满足条件但是当前线程被唤醒了。比如若其他线程误操作发送了信号,就会导致当前线程跳出其所阻塞的队列中,并进行后续操作,也是不合理的。

所以要把if改一改,换成while:
在这里插入图片描述

这样就算调用失败了,或者伪唤醒了,都会再上去判断一次IsFull(),此时如果队列还是满的,就会再次调用pthread_cond_wait,若还失败了,又会上去再判断……知道真正被唤醒并且队列不是满的了就会跳出while循环,这里push是这样,pop也是同理的:
在这里插入图片描述

故当跳出while后,后面的代码是一定能正确的进行生产和消费的。

还有这里的阻塞队列不用关心哪一个线程先执行。
如果是生产者先执行的话,就直接生产即可。
如果是消费者先执行的话,队列为空,会阻塞住,此时就会还会让生产者来生产。
所以逻辑上,一定是能让生产者先往队列中生产者东西的。

注意我上面代码中用了模版,而且上面演示的类型是int,这里我要改一改了,改成一个自定义类型。简单实现一个计算器,其成员变量为两个值,一个function包装器(如果有屏幕前的你对包装器不了解的话,可以看这篇:【C++】C++11中比较重要的内容介绍):

// 对function包装器重命名一下
typedef std::function<int(int, int)> func;class Caculator
{
public:Caculator(){}Caculator(int x, int y, func fun): _x(x), _y(y), _fun(fun){}int operator()(){return _fun(_x, _y);}public:int _x;int _y;func _fun;
};

阻塞队列的代码不变。线程执行方法变一下:

// 存放各个函数的做法和接口
std::vector<std::pair<char, func>> kv;// 这里搞了4个计算功能,加减乘除
const int KVSIZE = 4;// 生产者执行方法
void* Productor(void* arg)
{CPqueue<Caculator>* pq = (CPqueue<Caculator>*)arg;while(1){// 随机分配给消费者任务int task = rand() % KVSIZE;func fun = kv[task].second;int x = rand() % 200;int y = rand() % 500;std::cout << "productor send task ::" << task << "-->" << x << (kv[task].first) << y << '=' << '?' << std::endl;pq->PushData(Caculator(x, y, fun));sleep(1);}
}// 消费者执行方法
void* Consumer(void* arg)
{CPqueue<Caculator>* pq = (CPqueue<Caculator>*)arg;while(1){Caculator tmp;pq->PopData(tmp);std::cout << "consumer get task ::" << tmp() << std::endl  << std::endl;}
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ 0x323424);func MyAdd = [](int x, int y){ return x + y; };func MySub = [](int x, int y){ return x - y; };func MyMul = [](int x, int y){ return x * y; };func MyDiv = [](int x, int y){ return x / y; };// 这里用到了lambda表达式,如果屏幕前的你不懂,可以看看我刚刚给的那个链接kv.push_back(std::pair<char, func>('+', MyAdd));kv.push_back(std::pair<char, func>('-', MySub));kv.push_back(std::pair<char, func>('*', MyMul));kv.push_back(std::pair<char, func>('/', MyDiv));CPqueue<Caculator>* pq = new CPqueue<Caculator>(4);// 消费者线程pthread_t consumerThread;// 生产者线程pthread_t productorThread;// 消费者线程初始化pthread_create(&consumerThread, nullptr, Consumer, (void*)pq);// 生产者线程初始化pthread_create(&productorThread, nullptr, Productor, (void*)pq);pthread_join(consumerThread, nullptr);pthread_join(productorThread, nullptr);delete pq;return 0;
}

再加上阻塞队列,运行起来就是这样:
在这里插入图片描述

这里除法有一丁点问题,我就不改了,各位想改的自己动手试试,改正浮点数就行。

多生多消

这里搞简单点,就两个生产者、两个消费者:
在这里插入图片描述

其它代码就只用改消费者打印换行一次,所有从线程打印一下线程id就行。

运行:
在这里插入图片描述

可以看到,有两个消费者和两个生产者,成功。

这里就和线程池有点像了,不过我不打算在这篇讲线程池,下一篇再详谈。

利用RAII来对锁进行优化

RAII,学过智能指针的同学应该知道是啥,如果你不懂,看这篇:【C++】智能指针。

这里写一个类,专门用来管理锁资源:

class LockGuard
{
public:LockGuard(pthread_mutex_t* pmtx):_pmtx(pmtx){pthread_mutex_lock(_pmtx);}~LockGuard(){pthread_mutex_unlock(_pmtx);}public:pthread_mutex_t* _pmtx;
};

再加锁的时候就不需要调用pthread库中的函数了,直接定义一个局部的对象就行,定义时自定调用构造,就会进行加锁,析构就会调用解锁。

void PushData(const T& data)
{// 直接让对象来管理锁LockGuard LG(&_mtx); // 构造加锁/* 上了锁之后先判断临界资源是否准备就绪,也就是队列是否满了*/while(IsFull()) pthread_cond_wait(&_full, &_mtx);// 到此处就说明队列不满,就可以push数据了_q.push(data);// 发送消费者消费的信号pthread_cond_signal(&_empty);}	// 析构解锁void PopData(T& data)
{// 直接让对象来管理锁LockGuard LG(&_mtx);// 构造加锁/* 上锁后,先判断临界资源是否准备就绪,也就是队列是否为空*/while(IsEmpty()) pthread_cond_wait(&_empty, &_mtx);// 到此处说明队列不为空,就可以pop了data = _q.front(); // 先拿数据再pop_q.pop();// pop完了发送让生产者生产的信号pthread_cond_signal(&_full);}	// 析构解锁

运行就是:
在这里插入图片描述

和前面没啥区别。不过RAII的思想放到这里非常的妙。
这里也就是RALL风格的加锁。

这篇就讲到这,下一篇细说信号量等知识。

到此结束。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/91136.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java web+Mysql e-life智能生活小区物业管理系统

本项目为本人自己书写&#xff0c;主要服务小区业主和管理人员。 e-life智能生活小区涉及多个方面的智能化和便利化服务&#xff1a; 1. 用户模块&#xff1a;包含基本的登入登出操作&#xff0c;查看个人信息中用户可以查看 自己的个人资料但不可以修改个人信息。 a) 用户…

Celery结合flask完成异步任务与定时任务

Celery 常用于 web 异步任务、定时任务等。 使用 redis 作为 Celery的「消息代理 / 消息中间件」。 这里通过Flask-Mail使用qq邮箱延时发送邮件作为示例 pip install celery pip install redis pip install Flask-Mail1、使用flask发送邮件 使用 Flask-Mail 发送邮件需要进行…

数据分析三剑客之一:Numpy详解及实战

1 NumPy介绍 NumPy 软件包是Python生态系统中数据分析、机器学习和科学计算的主力军。它极大地简化了向量和矩阵的操作处理。Python的一些主要软件包&#xff08;如 scikit-learn、SciPy、pandas 和 tensorflow&#xff09;都以 NumPy 作为其架构的基础部分。除了能对数值数据…

Linux基本指令(一)

&#x1f493;博主个人主页:不是笨小孩&#x1f440; ⏩专栏分类:数据结构与算法&#x1f440; C&#x1f440; 刷题专栏&#x1f440; C语言&#x1f440; &#x1f69a;代码仓库:笨小孩的代码库&#x1f440; ⏩社区&#xff1a;不是笨小孩&#x1f440; &#x1f339;欢迎大…

对负采样(negative sampling)的一些理解

负采样&#xff08;negative sampling&#xff09;通常用于解决在训练神经网络模型时计算softmax的分母过大、难以计算的问题。但在LightGCN模型论文的BPR LOSS中&#xff0c;负采样的概念可能与传统的softmax分母问题不完全一样。 在LightGCN模型中&#xff0c;不同于传统的协…

AR智能眼镜:提升现场服务技能、效率与盈利能力的利器(一)

随着技术的不断进步&#xff0c;现场服务组织正朝着远程支持转变&#xff0c;用以解决技能差距和生产力问题&#xff0c;提高员工培训和操作效率&#xff0c;同时为企业提高利润率&#xff0c;创造竞争优势。 本文将探讨增强现实&#xff08;AR&#xff09;、辅助现实&#xf…

【李沐深度学习笔记】损失函数

课程地址和说明 损失函数p2 本系列文章是我学习李沐老师深度学习系列课程的学习笔记&#xff0c;可能会对李沐老师上课没讲到的进行补充。 损失函数 损失函数是用来衡量预测值 y ^ \hat{y} y^​或 y ′ y y′与真实值 y y y的差别&#xff0c;下面给出常见的损失函数类型&am…

科技资讯|AirPods Pro基于定位控制的自适应音频功能

在接受 TechCrunch 媒体采访时&#xff0c;苹果高管 Ron Huang 和 Eric Treski 谈到了关于 AirPods Pro 自适应音频&#xff08;Adaptive Audio&#xff09;功能的轶事&#xff0c;曾考虑基于 GPS 信号来控制自适应音频级别。 Treski 表示在探索自适应音频功能初期&#xff0…

uniapp 实现下拉筛选框 二次开发定制

前言 最近又收到了一个需求&#xff0c;需要在uniapp 小程序上做一个下拉筛选框&#xff0c;然后找了一下插件市场&#xff0c;确实有找到&#xff0c;但不过他不支持搜索&#xff0c;于是乎&#xff0c;我就自动动手&#xff0c;进行了二开定制&#xff0c;站在巨人的肩膀上&…

什么是GraphQL?它与传统的REST API有什么不同?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 什么是GraphQL&#xff1f;⭐ 与传统的REST API 的不同⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣…

7.网络原理之TCP_IP(上)

文章目录 1.网络基础1.1认识IP地址1.2子网掩码1.3认识MAC地址1.4一跳一跳的网络数据传输1.5总结IP地址和MAC地址1.6网络设备及相关技术1.6.1集线器&#xff1a;转发所有端口1.6.2交换机&#xff1a;MAC地址转换表转发对应端口1.6.3主机&#xff1a;网络分层从上到下封装1.6.4主…

【新版】系统架构设计师 - 软件架构的演化与维护

个人总结&#xff0c;仅供参考&#xff0c;欢迎加好友一起讨论 文章目录 架构 - 软件架构的演化与维护考点摘要软件架构演化和定义面向对象软件架构演化对象演化消息演化复合片段演化约束演化 软件架构演化方式静态演化动态演化 软件架构演化原则软件架构演化评估方法大型网站架…

Ubuntu 20.04二进制部署Nightingale v6.1.0和Prometheus

sudo lsb_release -r可以看到操作系统版本是20.04&#xff0c;sudo uname -r可以看到内核版本是5.5.19。 sudo apt-get update进行更新镜像源。 完成之后&#xff0c;如下图&#xff1a; sudo apt-get upgrade -y更新软件。 选择NO&#xff0c;按下Enter。 完成如下&…

自定义注解实现Redis分布式锁、手动控制事务和根据异常名字或内容限流的三合一的功能

自定义注解实现Redis分布式锁、手动控制事务和根据异常名字或内容限流的三合一的功能 文章目录 [toc] 1.依赖2.Redisson配置2.1单机模式配置2.2主从模式2.3集群模式2.4哨兵模式 3.实现3.1 RedisConfig3.2 自定义注解IdempotentManualCtrlTransLimiterAnno3.3自定义切面Idempote…

问题记录 springboot 事务方法中使用this调用其它方法

原因: 因为代理对象中调用了原始对象的toString()方法,所以两个不同的对象打印出的引用是相同的

快速将iPhone大量照片快速传输到电脑的办法!

很多使用iPhone 的朋友要将照片传到电脑时&#xff0c;第一时间都只想到用iTunes 或iCloud&#xff0c;但这2个工具真的都非常难用&#xff0c;今天小编分享牛学长苹果数据管理工具的照片传输功能&#xff0c;他可以快速的将iPhone照片传输到电脑上&#xff0c;并且支持最新的i…

OpenCV实现模板匹配和霍夫线检测,霍夫圆检测

一&#xff0c;模板匹配 1.1代码实现 import cv2 as cv import numpy as np import matplotlib.pyplot as plt from pylab import mplmpl.rcParams[font.sans-serif] [SimHei]#图像和模板的读取 img cv.imread("cat.png") template cv.imread(r"E:\All_in\o…

配置OSPF路由

OSPF路由 1.OSPF路由 1.1 OSPF简介 OSPF(Open Shortest Path First&#xff0c;开放式最短路径优先&#xff09;路由协议是另一个比较常用的路由协议之一&#xff0c;它通过路由器之间通告网络接口的状态&#xff0c;使用最短路径算法建立路由表。在生成路由表时&#xff0c;…

亚马逊无线鼠标FCC认证办理 FCC ID

无线鼠标是指无线缆直接连接到主机的鼠标&#xff0c;采用无线技术与计算机通信&#xff0c;从而省却电线的束缚。通常采用无线通信方式&#xff0c;包括蓝牙、Wi-Fi (IEEE 802.11)、Infrared (IrDA)、ZigBee (IEEE 802.15.4)等多个无线技术标准。随着人们对办公环境和操作便捷…

Vue中动态树形菜单,以及

&#x1f3c5;我是默&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;在这里&#xff0c;我要推荐给大家我的专栏《Vue》。&#x1f3af;&#x1f3af; &#x1f680;无论你是编程小白&#xff0c;还是有一定基础的程序员&#xff0c;这个专栏…