零基础Linux_24(多线程)线程同步+条件变量+生产者消费模型_阻塞队列版

目录

1. 线程同步和生产者消费者模型

1.1 生产者消费者模型的概念

1.2 线程同步的概念

1.3 生产者消费者模型的优点

2. 线程同步的应用

2.1 条件变量的概念

2.2 条件变量操作接口

3. 生产者消费者模型_阻塞队列

3.1 前期代码(轮廓)

3.2 中期代码(简单使用)

BlockQueue.hpp:

3.3 生产者消费者模型高效的体现

3.4 后期代码(处理任务)

Task.hpp

ProdCon.cc

3.5 最终代码(RAII风格的锁)

lockGuard.hpp

BlockQueue.hpp

本篇完。


1. 线程同步和生产者消费者模型

1.1 生产者消费者模型的概念

以生活中消费者生产者为例:

生活中,我们大部分人都扮演着消费者的角色,会经常在超市买东西,比如买方便面,而超市的方便面是由供应商生成的。此时我们就是消费者,供应商就是生产者,而超市就是一个交易场所。

  • 将读取数据的线程叫做消费者线程
  • 将产生数据的线程叫做生产者线程
  • 将共享的特定数据结构叫做缓冲区

超市的供应商肯定不止一家,即使同一种商品的供应上也不止一家,不同牌子方便面的生产者它们之间的关系是竞争关系,竞争的表现就是互斥。

站在超市的角度,假设只有一块区域是买方便面的,当生产者来供货的时候,这块区域只能让一家供应商来供货,否则就会导致不同的东西混着放,对消费者来说很不友好。

  • 生产者线程和生产者线程之间是互斥关系
  • 在同一时间只能有一个生产者线程来访问缓冲区。

假设现在超市只有一包方便面了,但是同时来了好多消费者都要买方便面,此时这些消费者之间的关系也是竞争关系,我买上你就买不上了。所以当只有一包方便面的时候,只能有一个买方便面的消费者进入超市。

  • 消费者线程和消费者线程之间是互斥关系
  • 在同一时间只能由一个消费者线程来访问缓冲区。

再假设,超市的方便面卖完了,生产者正在给超市供货,而消费者也正在买方便面,那消费者到底买没买到方便面?有可能生产者刚把方便面搬下来,还没来及摆上去,那么消费者就没有买到,也由可能生产者把方便面摆上去了,那么消费者就买到了。所以最好的办法就是生产者供货的时候,不让消费者进来。

在Linux中,缓冲区存放的都是数据,数据是可以覆盖的,比如消费者线程在读取缓冲区中的数据时,数据是"hello world",刚刚读取完"hello",生产者线程把"world"改成了"rtx",那么消费者线程读取到的就成了"hello rtx",就出错了。所以最好的办法就是当消费者线程访问缓冲区的时候,生产者线程不能访问缓冲区。

  • 消费者线程和生产者线程之间是互斥关系
  • 在同一时间内只有一个线程可以访问缓冲区。

1.2 线程同步的概念

保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。(在计算机操作系统中,饥饿(Starvation)是指某些进程或线程由于资源分配不公平或调度策略不合理而无法获得所需资源或执行时间的现象。当某个进程或线程长时间无法满足其资源需求时,就会出现饥饿问题。)

就像上篇博客中抢票的互斥代码,在每个线程抢完票以后没有进行延时代替其他处理动作时,所有票都被一个线程抢了,其他线程没有机会抢。

由于竞争能力弱而缺乏调度的线程就处于饥饿状态。

而同步就是让所有线程按照一定顺序来抢票,做到尽量公平,避免线程饥饿问题产生。具体如何实现后面会详细讲解。

继续拿超市来说,生产者不能无休止的向超市中供货,否则消费者无法进来消费,最终方便面会放不下。同样,消费者也不能无休止来买方便面,否则生产者进不来,方便面就会卖完,而且没有人来供货。所以最好的办法就是生产者供货,当货架摆满了就不供货了,让消费者来买,当方便面卖完了再让生产者来供货,从而让消费者和生产者协同起来。

  • 消费者线程和生产者线程之间又是同步关系
  • 生产者线程和消费者线程按照一定顺序去访问缓冲区。

根据上面例子和分析,对于生产者消费模型的本质可以总结为123原则(非官方版)

  • 1个交易场所:一段特定结构的缓冲区(上面例子就是超市)
  • 2种角色:生产者和消费者(上面例子就是客人和供货商)
  • 3种关系:生产者和生产者(互斥关系),消费者和消费者(互斥关系),生产者和消费者(互斥+同步关系)。

1.3 生产者消费者模型的优点

有了超市这个交易场所,生产者只要给超市供大量的货即可,比如几万包方便面,不用关心是消费者什么时候来买,只需要专注自己的生成即可。

对于消费者而言,只需要直接去超市买方便面就行,也不用等待方便面的生产。

超市只需要做的就是方便面卖完了,告诉生产者来上货,然后告诉消费者来买。消费者和生产者完全独立,不存在任何交集。

生产者消费者模型实现了消费者线程和生产者线程之间的解耦。(低耦合,高内聚)

我们平时写的C/C++代码,如果将main函数看成是生产者,普通函数看出是消费者,那么它两就存在高度耦合。

比如main函数里调用func函数:当执行func函数的时候,main函数在等待,只有func执行完毕以后main函数才能继续执行下去。如果将这两个函数看出两个执行流,那么它们就存在高度耦合。

而生产者消费者模型就成功的让生产者执行流和消费者执行流解耦了,生产者只管向缓冲区生产数据,消费者只管从缓冲区消耗数据,不用关心对方的状态。

再比如大部分人在周一到周五上班,在周六日休息,上班时候时间比较少,去超市消费的人也比较少,由于消费者和生产者互斥,所以就可以让生产者在周一到周五的时候来上货。

当周六日消费者休息的时候,去超市消费的人就比较多,方便面也卖的比较快,但是由于生产者供货量足够,所以并不会因为买的人多了就不够了的情况。

生产者消费者模型有助于解决生产者线程和消费者线程忙闲不均的问题。因为缓冲区能够缓存一定量的数据。

再比如我们买东西肯定不会直接去找供应商,因为人家不零售,因为生产者如果零售的话,每次开机器就仅生成几包方便面,成本高,效率低。

对于消费者而言,直接去找生产者还需等待生成者完成商品生成,消耗时间成本高,效率也低。

生产者消费者模型提高了了生产者线程和消费者线程的执行效率。


2. 线程同步的应用

同步是为了让多线程按照一定顺序互斥访问临界资源,在上面的生产者消费者模型中,如何实现同步呢?就要涉及下面的条件变量。

2.1 条件变量的概念

条件变量:用来描述某种临界资源是否就绪的一种数据化描述

当一个线程互斥地访问某个临界资源时,它可能发现在其它线程改变状态之前,它什么也做不了。

例如,存在一个共享的队列,生产者线程负责生产数据到队列中,消费者线程负责从队列中读取数据,当消费者线程发现队列为空时就不要再去竞争锁去访问了,而是应该等待,直到生产者线程将数据生成到队列中。

要想让消费者线程等待,就需要使用到条件变量。

那么条件变量是什么呢?继续拿超市举例:假设现在超出的架子上一次只放一包方便面,只有这包方便面被人买走了,才会放上新的方便面。

此时来了一堆消费者消费者都要买方便面,因为只有一包,所以只能去竞争了,那些竞争能力强的才能买上方便面,甚至不停的抢不停的买,此时那些竞争能力弱的消费者就会始终都买不到方便面。

竞争能力弱的消费者就会始终抢不到锁,就会产生饥饿问题。

为了维持秩序,超市的工作人员设置了一个等待区,所有消费者都在这里排队购买,方便面被摆出来了,工作人员让一个消费者进去买,没有摆出来就等着。如果消费者想买两包甚至多包,只能重新排队。

等待区及工作人员就相当于条件变量

多线程互斥访问一个临界资源时,为了让这些线程按照一定顺序访问,将这些线程都放在条件变量的等待队列中,当另一个线程让条件变量符合条件(唤醒线程)时,队列中的第一个线程就去访问临界资源。


2.2 条件变量操作接口

条件变量同样是由POSIX线程库维护的,所以使用的是POSIX标准,和互斥锁的接口非常相似。

创建条件变量:

pthread_cond_t cond;
  • 同样要加pthread_。同样是类似int a;
  • cond是英文condition的缩写。

条件变量的初始化,释放:man pthread_cond_init:

使用类似互斥锁,只是传递的参数是创建好的条件变量。

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);

cond:要初始化的条件变量

attr:nullptr

返回值:成功返回0,失败返回错误码

man pthread_cond_wait:

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
  • 第一个参数cond:创建的条件变量地址。
  • 第二个参数mutex:互斥锁的地址,这个必须有,后面再讲解为什么必须传锁。
  • 返回值:放入条件变量的等待队列成功返回0,失败返回错误码。

pthread_cond_wait的作用:调用该接口的线程放入传入条件变量的等待队列中。

唤醒条件变量等待队列中的一个线程:

man pthread_cond_signal:

int pthread_cond_signal(pthread_cond_t *cond);
  • 参数cond:所在等待队列的条件变量地址
  • 返回值:唤醒成功返回0,失败返回错误码

pthread_cond_signal作用:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的一个线程。

唤醒条件变量等待队列中的所有线程:

int pthread_cond_broadcast(pthread_cond_t *cond);
  • 参数:所在等待队列的条件变量地址
  • 返回值:唤醒成功返回0,失败返回错误码

pthread_cond_broadcast作用:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的所有线程。

将条件变量用到上一篇抢票的代码中,实现多线程按照一定顺序互斥抢票:

  •  创建全局的条件变量(后面就不创建成全局的了)。
  •  每个线程一抢上锁以后就进入条件变量的等待队列。
  •  主线程每个一秒钟唤醒一个等待的线程进行抢票。
#include <iostream>
#include <cstdio>
#include <cerrno>
#include <cstring>
#include <cassert>
#include <thread>
#include <unistd.h>
#include <pthread.h>
using namespace std;#define THREAD_NUM 4
int tickets = 10000; // 在并发访问的时候,导致了我们数据不一致的问题 -> 临界资源
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 创建条件变量class ThreadData
{
public:ThreadData(const std::string &n,pthread_mutex_t *pm):tname(n), pmtx(pm){}
public:std::string tname;pthread_mutex_t *pmtx;
};void *getTickets(void *args)
{ThreadData *td = (ThreadData*)args;while(true) // 抢票逻辑{int n = pthread_mutex_lock(td->pmtx); // 加锁assert(n == 0);pthread_cond_wait(&cond, td->pmtx); // 进入等待队列// 临界区if(tickets > 0) // 判断的本质也是计算的一种{usleep(rand()%1500);printf("%s: %d\n", td->tname.c_str(), tickets);tickets--; // 也可能出现问题n = pthread_mutex_unlock(td->pmtx); // 解锁assert(n == 0);}else{n = pthread_mutex_unlock(td->pmtx);  // break之前解锁assert(n == 0);break;}usleep(rand()%2000); // 抢完票,其实还需要后续的动作}delete td;return nullptr;
}int main()
{time_t start = time(nullptr);pthread_mutex_t mtx;pthread_mutex_init(&mtx, nullptr);pthread_t t[THREAD_NUM];for(int i = 0; i < THREAD_NUM; i++) // 多线程抢票的逻辑{std::string name = "thread ";name += std::to_string(i+1);ThreadData *td = new ThreadData(name, &mtx);pthread_create(t + i, nullptr, getTickets, (void*)td);}while(true){sleep(1);pthread_cond_signal(&cond); // 唤醒一个等待线程cout << "main thread wakeup a new thread" << endl;}for(int i = 0; i < THREAD_NUM; i++){pthread_join(t[i], nullptr);}pthread_mutex_destroy(&mtx);return 0;
}

此时就按照4 1 2 3 的顺序抢票了。

  • 每个线程都会被先挂起到等待队列中,等待主线程的唤醒。
  • 唤醒一个线程抢完票以后会继续进入等待队列,并且排在队列的后面。

如果不使用同步,就可能会只有一个线程在抢票,其他线程就会处于饥饿状态。

再放一份代码:(条件变量不再是全局的,还加了函数指针的方法让新线程执行不同的任务)

#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>#define TNUM 4
typedef void (*func_t)(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond);
volatile bool quit = false;// pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 可以定义成全局的,这样很多地方不用传参了,但是这里写正式点
// pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;class ThreadData
{
public:ThreadData(const std::string &name, func_t func, pthread_mutex_t *pmtx, pthread_cond_t *pcond): name_(name), func_(func), pmtx_(pmtx), pcond_(pcond){}public:std::string name_;func_t func_;pthread_mutex_t *pmtx_;pthread_cond_t *pcond_;
};void func1(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while (!quit){// wait一定要在加锁和解锁之间进行waitpthread_mutex_lock(pmtx);// if(临界资源是否就绪 -> 没就绪) pthread_cond_wait 但是现在没有这样的判断if (!quit) // 这个if加不加不重要{pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞}std::cout << name << " running  --  a播放" << std::endl;pthread_mutex_unlock(pmtx);}
}
void func2(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while (!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞std::cout << name << " running  -- b下载" << std::endl;pthread_mutex_unlock(pmtx);}
}
void func3(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while (!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞std::cout << name << " running  -- c刷新" << std::endl;pthread_mutex_unlock(pmtx);}
}
void func4(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while (!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞std::cout << name << " running  -- d扫描" << std::endl;pthread_mutex_unlock(pmtx);}
}void *Entry(void *args) // 添加了一个类似软件层的东西,只是演示下
{ThreadData *td = (ThreadData *)args;         // td在每一个线程自己私有的栈空间中保存td->func_(td->name_, td->pmtx_, td->pcond_); // 它是一个函数,调用完成就要返回delete td;return nullptr;
}int main()
{pthread_mutex_t mtx;pthread_cond_t cond;pthread_mutex_init(&mtx, nullptr);pthread_cond_init(&cond, nullptr);pthread_t tids[TNUM];func_t funcs[TNUM] = {func1, func2, func3, func4};for (int i = 0; i < TNUM; i++){std::string name = "Thread ";name += std::to_string(i + 1);ThreadData *td = new ThreadData(name, funcs[i], &mtx, &cond);pthread_create(tids + i, nullptr, Entry, (void *)td);}sleep(5); // 主线程和所有新线程都休眠五秒 -> 新线程阻塞等待// ctrl new threadint cnt = 10;while (cnt){std::cout << "resume thread run code ...." << cnt-- << std::endl;pthread_cond_signal(&cond);sleep(1); // 每隔一秒唤醒一个线程,线程不用自己休眠了// pthread_cond_broadcast(&cond); // 全部唤醒 -> 设置条件变量有效}std::cout << "ctrl done" << std::endl; // 控制结束quit = true;pthread_cond_broadcast(&cond); // quit = true;后再全部唤醒一次for (int i = 0; i < TNUM; i++){pthread_join(tids[i], nullptr);std::cout << "thread: " << tids[i] << "quit" << std::endl;}pthread_mutex_destroy(&mtx);pthread_cond_destroy(&cond);return 0;
}

这个代码只是演示一下不同的使用方法和场景、


3. 生产者消费者模型_阻塞队列

上图所示就是要实现的模型,有一个生产者线程,一个消费者线程,还有一个阻塞队列。

  • 阻塞队列这里使用C++STL容器中的queue来实现。
  • 阻塞队列是公共资源,所以要保证它的安全,线程A和线程B要互斥访问,只需要一把锁就能实现生产者和消费者,生产者和生产者,消费者和消费者之间的互斥。
  • 阻塞队列中有数据消费者才能读,此时生产者不能进行生产,生产者线程要进入它的等待队列中。
  • 阻塞队列中没有数据或者不满时,生产者才能进行生产,消费者在生产的时候不能读,要进入它的等待队列。

3.1 前期代码(轮廓)

先敲个轮廓出来就是这样的:(代码具体就不讲解了,看注释就行)

Makefile:

ProdCon:ProdCon.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f ProdCon

BlockQueue.hpp:

#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>const int gDefaultCap = 7;template <class T>
class BlockQueue
{
public:BlockQueue(int capacity = gDefaultCap) : _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_Empty, nullptr);pthread_cond_init(&_Full, nullptr);}void push(const T& in){}void pop(T* out){}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_Empty);pthread_cond_destroy(&_Full);}protected:std::queue<T> _bq;     // 阻塞队列int _capacity;         // 容量上限pthread_mutex_t _mtx;  // 保证队列安全pthread_cond_t _Empty; // 表示bq 是否空的条件pthread_cond_t _Full;  // 表示bq 是否满的条件
};

ProdCon.cc:(Producer consumer model)

#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>void* consumer(void *args)
{BlockQueue<int> *bqueue = (BlockQueue<int> *)args;while(true) // 获取任务{sleep(1);}return nullptr;
}void* productor(void *args)
{BlockQueue<int> *bqueue = (BlockQueue<int> *)args;while(true) // 制作任务{sleep(1);}return nullptr;
}int main()
{BlockQueue<int> *bqueue = new BlockQueue<int>();pthread_t c[2],p[2];pthread_create(c, nullptr, consumer, bqueue);pthread_create(c + 1, nullptr, consumer, bqueue);pthread_create(p, nullptr, productor, bqueue);pthread_create(p + 1, nullptr, productor, bqueue);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bqueue;return 0;
}

3.2 中期代码(简单使用)

然后现在来写一写关键的pop和push:

BlockQueue.hpp:

#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>const int gDefaultCap = 7;template <class T>
class BlockQueue
{
private:bool isQueueEmpty(){return _bq.size() == 0;}bool isQueueFull(){return _bq.size() == _capacity;}public:BlockQueue(int capacity = gDefaultCap) : _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_Empty, nullptr);pthread_cond_init(&_Full, nullptr);}void push(const T& in) // 生产者{pthread_mutex_lock(&_mtx);// pthread_cond_wait: 只要是一个函数,就可能调用失败,可能存在 伪唤醒 的情况,所以用whilewhile(isQueueFull()) //1. 先检测当前的临界资源是否能够满足访问条件{pthread_cond_wait(&_Full, &_mtx); // 满的时候就在_Full这个条件变量下等待// 此时思考:我们是在临界区中,我是持有锁的,如果我去等待了,锁该怎么办呢?// 所以pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放// 当我被唤醒时,我从哪里醒来呢?->从哪里阻塞挂起,就从哪里唤醒, 被唤醒的时候,我们还是在临界区被唤醒的// 当我们被唤醒的时候,pthread_cond_wait,会自动帮助我们线程获取锁}_bq.push(in); // 2. 队列不为空或者被唤醒 -> 访问临界资源,100%确定,资源是就绪的pthread_cond_signal(&_Empty); // 唤醒pthread_mutex_unlock(&_mtx); // 解锁}void pop(T* out){pthread_mutex_lock(&_mtx);while (isQueueEmpty()){pthread_cond_wait(&_Empty, &_mtx);}*out = _bq.front(); // 访问临界资源_bq.pop();pthread_cond_signal(&_Full); // 唤醒pthread_mutex_unlock(&_mtx); // 解锁}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_Empty);pthread_cond_destroy(&_Full);}protected:std::queue<T> _bq;     // 阻塞队列int _capacity;         // 容量上限pthread_mutex_t _mtx;  // 通过互斥锁保证队列安全pthread_cond_t _Empty; // 用它来表示bq 是否空的条件pthread_cond_t _Full;  //  用它来表示bq 是否满的条件
};

ProdCon.cc:

#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>void* consumer(void *args)
{BlockQueue<int> *bqueue = (BlockQueue<int> *)args;while(true) // 获取任务{int a = 0;bqueue->pop(&a);std::cout << "消费一个数据" << a << std::endl;sleep(1);}return nullptr;
}void* productor(void *args)
{BlockQueue<int> *bqueue = (BlockQueue<int> *)args;int a = 1;while(true) // 制作任务{bqueue->push(a);std::cout << "生产一个数据" << a++ << std::endl;// sleep(1); // 两个换着sleep看看能不能看到约束的效果}return nullptr;
}int main()
{BlockQueue<int> *bqueue = new BlockQueue<int>();pthread_t c[2],p[2];pthread_create(c, nullptr, consumer, bqueue);pthread_create(c + 1, nullptr, consumer, bqueue);pthread_create(p, nullptr, productor, bqueue);pthread_create(p + 1, nullptr, productor, bqueue);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bqueue;return 0;
}

编译运行试试效果:

此时就实现了生产者生产一批数据,然后两个线程一人消费一个,然后再生成,再消费。

把代码里生产者和消费者的sleep注释换一下:(此时就应该是消费者跟着生产者走了)

编译运行:

也成功达到了预想的效果。

如果不想类似生产一个消费一个的话还可以在生产者或者消费者里的唤醒线程前加if判断语句,比如:在生产者线程里:if(_bq.size() >= _capacity / 2) pthread_cond_signal(&_Empty);


3.3 生产者消费者模型高效的体现

前面在分析生产者消费者模型时,一直都在说该模型高效,那么到底体现在什么地方呢?

多个生产者线程向阻塞队列中生成数据,多个消费者线程从阻塞队列中消费数据。

该模型的三种关系决定了访问阻塞队列的线程同一时间只有一个。

尤其是上面代码现象中,消费和生产是一前一后的,对于阻塞队列的访问是串行的,凭什么说这个模型是高效的呢?

因为在生产者线程和消费者线程中,访问阻塞队列临界资源的代码都只有一条,只有临界区的代码才是串行访问的。

除了临界区的代码,其他部分代码所有线程都是并发执行的。

实际的线程中,临界区之外的代码会有很多,而且有可能会非常耗时,但是这些代码是可以多线程并发执行的,该模型的效率就会很高。

生产者消费者模型的高效体现在:非临界区的代码,多线程可以并发执行。

该模型的高效并不体现在对临界资源(阻塞队列)的访问上。


3.4 后期代码(处理任务)

生产者消费者模型实际上并不是仅仅用来生产消费整型数据的,它更多的是处理任务的。

这里创建一个计算任务的类Task(这里写在了一个头文件下,只弄了加法)。在类中的仿函数调用回调函数执行具体的计算逻辑,还有一个显示任务的接口。把传给阻塞队列的int传成Task

BlockQueue.hpp和前面一样

Task.hpp

#pragma once
#include <iostream>
#include <functional>
typedef std::function<int(int, int)> func_t;
class Task
{
public:Task() {}Task(int x, int y, func_t func): _x(x), _y(y), _func(func){}int operator()(){return _func(_x, _y);}public: // 不想写get接口就直接弄公有了int _x;int _y;// int type;func_t _func;
};

ProdCon.cc

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>inline int myAdd(int x, int y)
{return x + y;
}
void* consumer(void *args)
{BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;while(true) // 获取任务{Task t;bqueue->pop(&t);std::cout << pthread_self() <<" consumer: "<< t._x << " + " << t._y << " = " << t() << std::endl;// sleep(1);}return nullptr;
}void* productor(void *args)
{BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;int a = 1;while(true) // 制作任务{int x = rand() % 10 + 1;usleep(rand() % 1000);int y = rand() % 5 + 1;Task t(x, y, myAdd);bqueue->push(t);std::cout <<pthread_self() <<" productor: "<< t._x << " + " << t._y << " = ?" << std::endl;sleep(1); // 两个换着sleep看看能不能看到约束的效果}return nullptr;
}int main()
{srand((uint64_t)time(nullptr) ^ getpid() ^ 0x777); // 只是为了更随机BlockQueue<Task> *bqueue = new BlockQueue<Task>();pthread_t c[2],p[2];pthread_create(c, nullptr, consumer, bqueue);pthread_create(c + 1, nullptr, consumer, bqueue);pthread_create(p, nullptr, productor, bqueue);pthread_create(p + 1, nullptr, productor, bqueue);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bqueue;return 0;
}

编译运行:


3.5 最终代码(RAII风格的锁)

ProdCon.cc和Task.hpp跟前面一样,

lockGuard.hpp

#pragma once
#include <iostream>
#include <pthread.h>class Mutex
{
public:Mutex(pthread_mutex_t* mtx) :_pmtx(mtx){}void lock(){pthread_mutex_lock(_pmtx);std::cout << "进行加锁成功" << std::endl;}void unlock(){pthread_mutex_unlock(_pmtx);std::cout << "进行解锁成功" << std::endl;}~Mutex(){}
protected:pthread_mutex_t* _pmtx;
};class lockGuard // RAII风格的加锁方式
{
public:lockGuard(pthread_mutex_t* mtx) // 因为不是全局的锁,所以传进来,初始化:_mtx(mtx){_mtx.lock();}~lockGuard(){_mtx.unlock();}
protected:Mutex _mtx;
};

BlockQueue.hpp

(只是加锁方式变了,不用自己解锁了)

#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>
#include "lockGuard.hpp"
const int gDefaultCap = 7;template <class T>
class BlockQueue
{
private:bool isQueueEmpty(){return _bq.size() == 0;}bool isQueueFull(){return _bq.size() == _capacity;}public:BlockQueue(int capacity = gDefaultCap) : _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_Empty, nullptr);pthread_cond_init(&_Full, nullptr);}void push(const T& in) // 生产者{lockGuard lockgrard(&_mtx); // 自动调用构造函数//pthread_mutex_lock(&_mtx);// pthread_cond_wait: 只要是一个函数,就可能调用失败,可能存在 伪唤醒 的情况,所以用whilewhile(isQueueFull()) //1. 先检测当前的临界资源是否能够满足访问条件{pthread_cond_wait(&_Full, &_mtx); // 满的时候就在_Full这个条件变量下等待// 此时思考:我们是在临界区中,我是持有锁的,如果我去等待了,锁该怎么办呢?// 所以pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放// 当我被唤醒时,我从哪里醒来呢?->从哪里阻塞挂起,就从哪里唤醒, 被唤醒的时候,我们还是在临界区被唤醒的// 当我们被唤醒的时候,pthread_cond_wait,会自动帮助我们线程获取锁}_bq.push(in); // 2. 队列不为空或者被唤醒 -> 访问临界资源,100%确定,资源是就绪的pthread_cond_signal(&_Empty); // 唤醒// pthread_mutex_unlock(&_mtx); // 解锁} // 出了代码块自动调用析构函数void pop(T* out){lockGuard lockgrard(&_mtx); // 自动调用构造函数// pthread_mutex_lock(&_mtx);while (isQueueEmpty()){pthread_cond_wait(&_Empty, &_mtx);}*out = _bq.front(); // 访问临界资源_bq.pop();pthread_cond_signal(&_Full); // 唤醒// pthread_mutex_unlock(&_mtx); // 解锁} // 出了代码块自动调用析构函数~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_Empty);pthread_cond_destroy(&_Full);}protected:std::queue<T> _bq;     // 阻塞队列int _capacity;         // 容量上限pthread_mutex_t _mtx;  // 通过互斥锁保证队列安全pthread_cond_t _Empty; // 用它来表示bq 是否空的条件pthread_cond_t _Full;  //  用它来表示bq 是否满的条件
};

成功运行。


本篇完。

下一篇:零基础Linux_25(多线程)信号量+自选锁+读写锁(基于环形队列的生产者消费模型)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/125417.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JVM 类加载的过程

JVM 类加载的过程 加载验证准备解析初始化 加载 “加载”&#xff08;Loading&#xff09;阶段是整个“类加载”&#xff08;Class Loading&#xff09;过程中的一个阶段&#xff0c;它和类加载 Class Loading 是不同的&#xff0c;一个是加载 Loading 另一个是类加载 Class L…

银河麒麟V10SP1-20200711的mate-indicators进程占用内存过高的解决办法

目录 一、监控异常 二、进程异常 三、解决方法 &#xff08;一&#xff09;第一步&#xff1a;先查看操作系统版本 &#xff08;二&#xff09;第二步&#xff1a;下载相应版本的补丁包 &#xff08;三&#xff09;第三步&#xff1a;升级补丁、重启系统 1. 升级步骤 2. …

浅谈安科瑞EMS能源管控平台建设的意义-安科瑞 蒋静

摘 要&#xff1a;能源消耗量大、能源运输供给不足、环境压力日趋增加、能耗双控等一系列问题一直困扰着钢铁冶金行业&#xff0c;制约着企业快速稳定健康发展。本文介绍的安科瑞EMS能源管控平台&#xff0c;采用自动化、信息化技术&#xff0c;实现从能源数据采集、过程监控、…

3 Tensorflow构建模型详解

上一篇&#xff1a;2 用TensorFlow构建一个简单的神经网络-CSDN博客 本篇目标是介绍如何构建一个简单的线性回归模型&#xff0c;要点如下&#xff1a; 了解神经网络原理构建模型的一般步骤模型重要参数介绍 1、神经网络概念 接上一篇&#xff0c;用tensorflow写了一个猜测西…

菜单管理中icon图标回显

<el-table-column prop"icon" label"图标" show-overflow-tooltip algin"center"><template v-slot"{ row }"><el-icon :class"row.icon"></el-icon></template></el-table-column>

Oracle数据库创建Sequence序列的基本使用

1.作用就是批量插入数据的时候可以给一个主键 sequence dose not exist_sequence not exist_拒—绝的博客-CSDN博客 Oracle创建Sequence序列_TheEzreal的博客-CSDN博客 Oracle序列&#xff08;sequence&#xff09;创建失败&#xff0c;无法取值&#xff08;.nextval&#x…

iOS iGameGuardian修改器检测方案

一直以来&#xff0c;iOS 系统的安全性、稳定性都是其与安卓竞争的主力卖点。这要归功于 iOS 系统独特的闭源生态&#xff0c;应用软件上架会经过严格审核与测试。所以&#xff0c;iOS端的作弊手段&#xff0c;总是在尝试绕过 App Store 的审查。 常见的 iOS 游戏作弊&#xf…

Arrays,Arrays重载的sort方法

Arrays -1的原因.因为返回正数不就是表示存在只能是负数 Arrays重载的sort方法 //这个方法只能给引用数据类型排序 //如果是基本数据类型需要转化为对应的包装类 public class arrays {public static void main(String[] args) {Integer arr[]{2,1,4,6,3,5,8,7,9};Arrays.s…

网络套接字编程(一)

网络套接字编程(一) 文章目录 网络套接字编程(一)预备知识源IP地址和目的IP地址端口号TCP/UDP协议特点网络字节序 socket编程socket常用APIsockaddr结构 简易UDP网络程序服务端创建套接字服务端绑定IP地址和端口号字符型IP地址VS整型IP地址服务端运行客户端创建套接字客户端绑定…

mfc140u.dll丢失怎么修复,mfc140u.dll文件有什么作用

今天我想和大家分享的是关于mfc140u.dll文件丢失的解决方法。在我们使用电脑的过程中&#xff0c;有时候会遇到一些错误提示&#xff0c;其中比较常见的就是“无法找到mfc140u.dll文件”。那么&#xff0c;这个文件是什么呢&#xff1f;它有什么作用呢&#xff1f; 首先&#…

网络基础-2

IEEE制定了一个名为GARP的协议框架&#xff0c;该框架协议包含了两个具体协议&#xff0c;GMRP和GVRP。GVRP可以大大降低VLAN配置过程中的手工的工作量。 IP本身是一个协议文件的名称&#xff0c;该协议主要定义阐释了IP报文的格式。 类型网络号位数网络号个数主机号位数每个…

水溶性纳米银颗粒 纳米银颗粒 银纳米颗粒溶液

西&#xff09;产品名称&#xff1a;水溶性纳米银颗粒 安&#xff09;别名 &#xff1a;纳米银溶液 银纳米颗粒溶液 纳米银胶体等 瑞&#xff09;浓度&#xff1a;0.1mg/mL 其它均可定制 禧&#xff09;粒径&#xff1a;5nm 10nm 15nm 20nm 25nm 30nm 35nm 40nm 50nm 60nm 80…

1.6 基本安全设计准则

思维导图&#xff1a; 1.6 基本安全设计准则笔记 目标&#xff1a;理解和遵循一套广泛认可的安全设计准则&#xff0c;以指导保护机制的开发。 主要准则&#xff1a; 机制的经济性&#xff1a;安全机制应设计得简单、短小&#xff0c;便于测试和验证&#xff0c;减少漏洞和降…

【数据结构】顺序表实例探究

&#x1f497;个人主页&#x1f497; ⭐个人专栏——数据结构学习⭐ &#x1f4ab;点击关注&#x1f929;一起学习C语言&#x1f4af;&#x1f4ab; 目录 导读&#xff1a;1. 顺序表的基本内容1.1 概念及结构1.2 时间和空间复杂度1.3 基本操作1.4 顺序表的优缺点 2. 静态顺序表…

自动化测试注意事项

什么是自动化测&#xff1f; 做测试好几年了&#xff0c;真正学习和实践自动化测试一年&#xff0c;自我感觉这一个年中收获许多。一直想动笔写一篇文章分享自动化测试实践中的一些经验。终于决定花点时间来做这件事儿。 首先理清自动化测试的概念&#xff0c;广义上来讲&#…

华锐技术何志东:证券核心交易系统分布式改造将迎来规模化落地阶段

近年来&#xff0c;数字化转型成为证券业发展的下一战略高地&#xff0c;根据 2021 年证券业协会专项调查结果显示&#xff0c;71% 的券商将数字化转型列为公司战略任务。 在落地数字化转型战略过程中&#xff0c;证券业核心交易系统面临着不少挑战。构建新一代分布式核心交易…

06 MIT线性代数-线性无关,基和维数Independence, basis, and dimension

1. 线性无关 Independence Suppose A is m by n with m<n (more unknowns than equations) Then there are nonzero solutions to Ax0 Reason: there will be free variables! A中具有至少一个自由变量&#xff0c;那么Ax0一定具有非零解。A的列向量可以线性组合得到零向…

酷克数据出席永洪科技用户大会 携手驱动商业智能升级

10月27日&#xff0c;第7届永洪科技全国用户大会在北京召开。酷克数据作为国内云原生数仓代表企业&#xff0c;受邀出席本次大会&#xff0c;全面展示了云数仓领域最新前沿技术&#xff0c;并进行主题演讲。 携手合作 助力企业释放数据价值 数据仓库是商业智能&#xff08;BI…

Easy Javadoc插件的使用教程

目录 一、安装Easy Javadoc插件 二、配置注释模板 三、配置翻译 一、安装Easy Javadoc插件 在idea的File-Settings-Plugins中搜索Easy Javadoc插件&#xff0c;点击install进行安装&#xff0c;安装完成后需要restart IDE&#xff0c;重启后插件生效。 二、配置注释模板 …

openGauss学习笔记-111 openGauss 数据库管理-管理用户及权限-用户权限设置

文章目录 openGauss学习笔记-111 openGauss 数据库管理-管理用户及权限-用户权限设置111.1 给用户直接授予某对象的权限111.2 给用户指定角色111.3 回收用户权限 openGauss学习笔记-111 openGauss 数据库管理-管理用户及权限-用户权限设置 111.1 给用户直接授予某对象的权限 …