Linux_生产消费模型_Block_Queue

目录

一、互斥锁

1.1 错误的抢票

1.1.1 类的成员函数与构造

1.1.2 start 函数 

1.1.3 线程的回调函数 

1.1.4 main 函数

1.1.5 结果  

1.2 概念

1.3 相关系统调用

1.3.1 锁的创建

1.3.2 锁的初始化

1.3.2.1 动态初始化

1.3.2.2 静态初始化

1.3.3 锁的销毁

1.3.4 加锁和解锁

1.4 正确的抢票

1.4.1 简单锁的封装

1.4.2 线程封装的改进 

1.4.3 main 函数的改进

二、条件变量

2.1 概念

2.2 相关系统调用

2.2.1 条件变量的创建

2.2.2 条件变量的初始化

2.2.3 条件变量的销毁

2.2.4 条件变量的等待

2.2.5 条件变量的唤醒

三、单线程生产消费模型(BQ)

3.1 初步测试代码

3.2 阻塞队列的封装

3.2.1 成员变量

3.2.2 构造析构

3.2.3 生产者模型

3.2.4 消费者模型

3.2.5 整体代码 

3.3 Main.cc 的修改

3.3.1 整体代码

3.3.2 int main 部分

3.3.3 start 函数

3.3.4 wait 函数

3.4 整体代码

3.4.1 BlockQueue.hpp

3.4.2 Thread.hpp

3.4.3 Main.cc

四、多生产多消费模型


一、互斥锁

1.1 错误的抢票

下面先看封装的一个简单的多线程:

#pragma once#include <iostream>
#include <string>
#include <pthread.h>
namespace ThreadMoudle
{typedef void(*func_t)(const std::string &name);//函数指针类型class Thread{public:void Excute(){std::cout << _name << " is running" << std:: endl;_isrunning = true;_func(_name);_isrunning = false;}public:Thread(const std::string &name, func_t func):_name(name), _func(func){std::cout << "create: " << name << "done" << std::endl;}std::string Name(){return _name;}static void *ThreadRoutine(void *args){Thread *self = static_cast<Thread*>(args);//获得了当前对象self->Excute();return nullptr;}bool Start(){int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);if (n != 0) return false;//创建函数成功返回0return true;}std::string Status(){if (_isrunning) return "running";else return "sleep";}void Stop(){if (_isrunning){::pthread_cancel(_tid);_isrunning = false;std::cout << "Stop" << std::endl;std::cout << _name << " Stop" << std::endl;}}void Join(){if (!_isrunning){::pthread_join(_tid, nullptr);std::cout << _name << "Join" << std::endl;std::cout << _name << "Joined" << std::endl;}}~Thread(){}private:std::string _name;pthread_t _tid;bool _isrunning;func_t _func;//线程要执行的回调函数};
}

1.1.1 类的成员函数与构造

现在来逐步看这个封装类,首先来看一下类成员与类的构造函数:

    typedef void(*func_t)(const std::string &name);//函数指针类型    public:Thread(const std::string &name, func_t func):_name(name), _func(func){std::cout << "create: " << name << "done" << std::endl;}private:std::string _name;pthread_t _tid;bool _isrunning;func_t _func;//线程要执行的回调函数

这里为线程起了一个名字_name,线程的tid为_tid,线程的状态以表明该线程是否在运行_isrunning,以及线程要执行的回调函数。

1.1.2 start 函数 

        bool Start(){int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);if (n != 0) return false;//创建函数成功返回0return true;}

在start函数中,封装了 pthread_create 的函数调用,这里不进行详细说明,以后专门写一篇来描述单线程以及多线程方面的知识。这里的 pthreada_create 就是创建一个线程,而且因为传入的是_tid 的引用,所以该系统调用会自动把 _tid 修改为创建线程的 tid 。

1.1.3 线程的回调函数 

    public:void Excute(){std::cout << _name << " is running" << std:: endl;_isrunning = true;_func(_name);_isrunning = false;}static void *ThreadRoutine(void *args){Thread *self = static_cast<Thread*>(args);//获得了当前对象self->Excute();return nullptr;}

这里并没有直接写线程要执行的回调函数操作,而是当作中介将参数作为 args参数包 传递给了Excute 函数,这样做的目的是为了让函数更简洁并且这里也有一个坑,成员函数不能直接用作线程函数,因为成员函数需要一个隐藏的 this 指针,而线程库无法自动传递这个指针(参考1.0.2的start函数中的 pthread_create 系统调用):

void* (*start_routine) (void*);//pthread库的线程函数签名是固定的

因此,需要定义一个静态的 ThreadRoutine 函数,这个函数不依赖于任何对象实例,可以被pthread_create 直接调用。在 Thread_Routine 中,通过 static_cast 将 void* 类型的参数转换为 Thread* 类型的指针,从而获得对象实例的指针,然后调用该对象的成员函数 Excute 。这样,Excute 函数就可以访问该对象的成员变量和成员函数。 

1.1.4 main 函数

首先以简单的思维创建一下多线程,模拟一下让几个线程一起进行抢票:

#include <iostream>
#include <unistd.h>
#include <vector>
#include "Thread.hpp"
using namespace ThreadMoudle;
int tickets = 1000;void route(const std::string &name)
{while(true){if(tickets > 0){// 抢票过程usleep(1000); // 1ms -> 抢票花费的时间printf("who: %s, get a ticket: %d\n", name.c_str(), tickets);tickets--;}else{break;}}
}
int main()
{Thread t1("thread-1", route);Thread t2("thread-2", route);Thread t3("thread-3", route);Thread t4("thread-4", route);t1.Start();t2.Start();t3.Start();t4.Start();t1.Join();t2.Join();t3.Join();t4.Join();
}

按代码来说,在回调函数 route 中,设置了while (tikects > 0) 的条件,也就是说,当 tickets 为正数时,才会进入循环内部执行打印操作,理应抢到还有 0 张票时就停止,但是,运行几次可能会发现,最后会出现某个线程 get 了第 0 张票甚至是负数票。

1.1.5 结果  

观看下面的结果,就会发现错误,这里引入一个概念,票数在这里其实相当于临界资源,所有线程都可以访问并修改,那么就存在线程执行的过快,最后一张票可能沦落到多个线程手中,但是最快执行完的线程打印出了 get a ticket:1 ,并把 1 进行了修改,其他的线程因为执行速度过快,在票还没有变为 0 之前就已经进入了循环,所以会继续执行后续的操作。

打个比方,如果一辆公交车限乘30人,每个人上车后需要进行刷卡,当第30个人已经上车时,后面还有两三个人也挤上来了,第30个人完成刷卡后,公交车检测刷卡人数等于30人,关闭了车门,但此时车上的人已经多余30人,多余的人也可以继续刷卡乘车。

此时,就需要一个规则,比如公交车一次上一人,这个人刷卡后才能上第二个,迁移到 Linux ,这就是互斥锁。

1.2 概念

临界资源:多线程执行流共享的资源就叫做临界资源
临界区:每个线程内部,访问临界资源的代码,就叫做临界区

互斥锁就可以锁住临界区,当开锁后,临界区才会打开来接收下一个线程,下一个线程被成功接收后又进行了上锁,这里,每个线程的关系称为互斥。就好比公交车一次一人的规则,这里的每个乘客就是互斥关系。

互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用

每个线程执行的操作不能被打断,要么就不进行,要么就进行完成,这就是原子性。好比一个人上公交车要么就不乘车,要么就刷卡上车,不能还没刷卡就被别人打断。

原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成

1.3 相关系统调用

1.3.1 锁的创建

这是互斥锁的类型,当要设置互斥锁的时候就可以使用:

pthread_mutex_t _mutex;

1.3.2 锁的初始化

1.3.2.1 动态初始化
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);

参数:
mutex:要初始化的互斥量,注意要传引用
attr:nullptr

1.3.2.2 静态初始化

以上是动态初始化,在定义 mutex 时,同样可以静态传值直接初始化:

pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER;

1.3.3 锁的销毁

销毁互斥量需要注意:
1.使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
2.不要销毁一个已经加锁的互斥量
3.已经销毁的互斥量,要确保后面不会有线程再尝试加锁

int pthread_mutex_destroy(pthread_mutex_t *mutex);

注意这里也要传入锁的引用。

1.3.4 加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
//返回值:成功返回0,失败返回错误号

加锁时,可能会遇到以下的情况:

1.互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
2.发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。

这里的阻塞不是一直阻塞,当锁空着的时候就会自动加锁然后返回成功。

1.4 正确的抢票

知道了 mutex 的使用,就可以在上述代码中把抢票的代码改为临界区。

1.4.1 简单锁的封装

这里当调用锁的时候就会自动构造初始化加锁,出临界区后自动调用析构函数进行解锁。 

#pragma once
#include <pthread.h>class LockGuard
{
public:LockGuard(pthread_mutex_t *mutex):_mutex(mutex){pthread_mutex_lock(_mutex);//构造函数时直接加锁}~LockGuard(){pthread_mutex_unlock(_mutex);//析构函数时直接解锁}
private:pthread_mutex_t *_mutex;
};

1.4.2 线程封装的改进 

这里只是多加了一个线程的数据类型的类 ThreadData ,在这个类中设置了一个锁,并且在线程封装中使用 ThreadData 新建了变量 td ,这样在需要加锁时只需要创建 td 中的锁(详见main函数)即可。

所以这里的 Thread 的构造函数也调整了,可以直接使用 Thread 的构造函数为 _td 传入锁。

#pragma once
#include <iostream>
#include <string>
#include <pthread.h>namespace ThreadMoudle
{class ThreadData{public:ThreadData(const std::string &name, pthread_mutex_t *lock):_name(name), _lock(lock){}public:std::string _name;pthread_mutex_t *_lock;};typedef void (*func_t)(ThreadData *td); // 函数指针类型class Thread{public:void Excute(){std::cout << _name << " is running" << std::endl;_isrunning = true;_func(_td);_isrunning = false;}public:Thread(const std::string &name, func_t func, ThreadData *td):_name(name), _func(func), _td(td){std::cout << "create " << name << " done" << std::endl;}static void *ThreadRoutine(void *args) // 新线程都会执行该方法!{Thread *self = static_cast<Thread*>(args); // 获得了当前对象self->Excute();return nullptr;}bool Start(){int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);if(n != 0) return false;return true;}std::string Status(){if(_isrunning) return "running";else return "sleep";}void Stop(){if(_isrunning){::pthread_cancel(_tid);_isrunning = false;std::cout << _name << " Stop" << std::endl;}}void Join(){::pthread_join(_tid, nullptr);std::cout << _name << " Joined" << std::endl;delete _td;}std::string Name(){return _name;}~Thread(){}private:std::string _name;pthread_t _tid;bool _isrunning;func_t _func; // 线程要执行的回调函数ThreadData *_td;};
} // namespace ThreadModle

1.4.3 main 函数的改进

首先进行了锁的创建于初始化,其次将锁传入创建的线程中,当使用回调函数后,传入类的锁然后进行锁的构造,就可以完成加锁的操作,出生命域临时变量被销毁此时自动解锁。

#include <iostream>
#include <vector>
#include <cstdio>
#include <unistd.h>
#include "Thread.hpp"
#include "LockGuard.hpp"using namespace ThreadMoudle;int tickets = 10000; // 共享资源,造成了数据不一致的问题void route(ThreadData *td)
{while (true){LockGuard lockguard(td->_lock); // RAII风格的锁if (tickets > 0){// 抢票过程usleep(1000); // 1ms -> 抢票花费的时间printf("who: %s, get a ticket: %d\n", td->_name.c_str(), tickets);tickets--;}else{break;}}
}static int threadnum = 4;int main()
{pthread_mutex_t mutex;pthread_mutex_init(&mutex, nullptr);std::vector<Thread> threads;for(int i = 0; i < threadnum; i++){std::string name = "thread-" + std::to_string(i+1);ThreadData *td = new ThreadData(name, &mutex);threads.emplace_back(name, route, td);}for(auto &thread : threads){thread.Start();}for(auto &thread : threads){thread.Join();}pthread_mutex_destroy(&mutex);}

二、条件变量

2.1 概念

  • 等待特定条件的发生: 条件变量允许一个或多个线程等待某个条件变为真,而无需不断轮询某个变量的状态,从而节省CPU资源线程可以在等待条件变量的同时进入休眠状态,当条件满足时被唤醒。

  • 线程间的协调和通信: 条件变量提供了一种机制,使得一个线程可以通知另一个线程某个条件已经满足。这对于需要在多个线程之间共享资源并协调访问的场景非常有用。

  • 解决生产者-消费者问题: 在生产者-消费者问题中,生产者线程生成数据并放入缓冲区,消费者线程从缓冲区中取出数据处理。条件变量可以用于协调生产者和消费者之间的工作,例如,当缓冲区为空时消费者等待,当缓冲区有数据时通知消费者。

这里也打个比方,如果想买某个商品但是商店缺货,我们需要反复跑去超市确认,毋庸置疑,这是一件费事费力的事情, 但是如果当第一次去超市时,把我们的电话留给店员,当有货时就电话通知我们,是不是就方便灵活了很多?

2.2 相关系统调用

2.2.1 条件变量的创建

pthread_cond_t

和互斥锁类似,条件变量的数据类型为 pthread_cond_t 

2.2.2 条件变量的初始化

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrictattr);

参数:
cond:要初始化的条件变量
attr:nullptr 

2.2.3 条件变量的销毁

int pthread_cond_destroy(pthread_cond_t *cond)

2.2.4 条件变量的等待

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);

参数:
cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释

2.2.5 条件变量的唤醒

int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

其中第一个是唤醒一个线程,第二个是唤醒所有的线程。 

三、单线程生产消费模型(BQ)

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

3.1 初步测试代码

这里直接复用上述封装的线程库:

#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>namespace ThreadModule
{template<typename T>using func_t = std::function<void(T&)>;// typedef std::function<void(const T&)> func_t;template<typename T>class Thread{public:void Excute(){_func(_data);}public:Thread(func_t<T> func, T &data, const std::string &name="none-name"): _func(func), _data(data), _threadname(name), _stop(true){}static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!{Thread<T> *self = static_cast<Thread<T> *>(args);self->Excute();return nullptr;}bool Start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if(!n){_stop = false;return true;}else{return false;}}void Detach(){if(!_stop){pthread_detach(_tid);}}void Join(){if(!_stop){pthread_join(_tid, nullptr);}}std::string name(){return _threadname;}void Stop(){_stop = true;}~Thread() {}private:pthread_t _tid;std::string _threadname;T &_data;  // 为了让所有的线程访问同一个全局变量func_t<T> _func;bool _stop;};
} // namespace ThreadModule#endif

main 函数中,使用 vector<Thread<int>> 包装一个线程数组, 

#include "BlockQueue.hpp"
#include "Thread.hpp"
#include <string>
#include <vector>
#include <unistd.h>using namespace ThreadModule;
int a = 10;
void Consumer(int &data)
{while (data){std::cout << "Consumer: " << data-- << std::endl;sleep(1);}
}
void StartConsumer(std::vector<Thread<int>> *threads, int num)
{for (int i = 0; i < num; i++){std::string name = "Thread-" + std::to_string(i + 1);threads->emplace_back(Consumer, a, name);//执行的函数 向函数传入的参数 namethreads->back().Start();//取到最后一个线程并启动}
}void Productor(int &data)
{while (data){std::cout << "Productor: " << data-- << std::endl;sleep(1);}
}
void StartProductor(std::vector<Thread<int>> *threads, int num)
{for (int i = 0; i < num; i++){std::string name = "Thread-" + std::to_string(i + 1);threads->emplace_back(Productor, a, name);threads->back().Start();}
}void WaitAllThread(std::vector<Thread<int>> &threads)
{for (auto &thread:threads){thread.Join();}
}
int main()
{std::vector<Thread<int>> threads;StartConsumer(&threads, 1);StartProductor(&threads, 1);WaitAllThread(threads);return 0;
}



以上是对消费者与生产者提供的接口,每次启动时把线程的数组传入,以及创建的线程数量传入即可,这里因为是单线程, num 默认传为 1 .



在对消费者与生产者提供的接口中,封装一个消费者与生产者执行的行为。其次,代表其行为的函数可以传入引用来标准执行的次数,所以就需要传入全局变量而不是局部变量,可以看到最上方定义了一个全局变量 a 来规定执行的次数。

下面看一下程序,生产者和消费者都在有序的访问并修改同一份空间。

3.2 阻塞队列的封装

3.2.1 成员变量

下面就要开始封装 blockqueue 阻塞队列,首先来看一下成员变量:

private:std::queue<T> _block_queue;   // 阻塞队列,是被整体使用的!!!int _cap;pthread_mutex_t _mutex;pthread_cond_t _product_cond;pthread_cond_t _consume_cond;

阻塞队列其中一定要包含一个队列,所以定义了一个 _block_queue 的队列,以后生产者和消费者就在这个队列中添加与取数据。 

其次,我们需要生产者与消费者有序的访问而且访问时不再受其他线程的打扰,所以还需要带锁, _mutex 。
最后,在阻塞队列中,当队列满了以后,生产者无法生产;当队列为空时,消费者无法消费。这就要求我们为生产者与消费者各定义一个条件变量:_product_cond  _consume_cond

3.2.2 构造析构

这两个成员函数相对来说比较简单,只需要继续使用系统调用即可:

public:BlockQueue(int cap):_cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_product_cond, nullptr);pthread_cond_init(&_consume_cond, nullptr);}    ~BlockQueue(){_cap = 0;pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_product_cond);pthread_cond_destroy(&_consume_cond);}

3.2.3 生产者模型

生产者模型其实就是入队列的过程,当队列为满时,停止入队,此时让它的条件变量休眠,反之,唤醒条件变量并将生产的数据加入队列:

    void Enqueue(T &in)//提供给生产者的接口{pthread_mutex_lock(&_mutex);if (IsFull())//队列已满,无法继续生产{//释放锁:当线程调用pthread_cond_wait时,它会自动释放传入的mutex,这样其他线程就可以获取这个互斥锁并修改共享资源。//线程在条件变量上等待,直到其他线程发出信号(通过pthread_cond_signal或pthread_cond_broadcast)来通知条件满足。pthread_cond_wait(&_product_cond, &_mutex);}//满足条件可以生产_block_queue.push(in);//唤醒消费者来消费pthread_cond_signal(&_consume_cond);pthread_mutex_unlock(&_mutex);}

其中, pthread_cond_wait 的作用值得再次阐述一遍:

释放锁:当线程调用pthread_cond_wait时,它会自动释放传入的mutex,这样其他线程就可以获取这个互斥锁并修改共享资源。

线程在条件变量上等待,直到其他线程发出信号(通过pthread_cond_signal或pthread_cond_broadcast)来通知条件满足并重新拿到锁。

3.2.4 消费者模型

消费者模型与生产者模型类似,都是一样的逻辑,但是当队列为空时:wait ;当队列为满时:signal 

    void Pop(T *out)//提供给消费者的接口{pthread_mutex_lock(&_mutex);if (IsEmpty()){pthread_cond_wait(&_consume_cond, &_mutex);}*out = _block_queue.front();_block_queue.pop();//唤醒生产者来生产pthread_cond_signal(&_consume_cond);pthread_mutex_unlock(&_mutex);}

3.2.5 整体代码 

#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>template<typename T>
class BlockQueue
{
private:bool IsFull(){return _block_queue.size() == _cap;}bool IsEmpty(){return _block_queue.empty();}
public:BlockQueue(int cap):_cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_product_cond, nullptr);pthread_cond_init(&_consume_cond, nullptr);}void Enqueue(T &in)//提供给生产者的接口{pthread_mutex_lock(&_mutex);if (IsFull())//队列已满,无法继续生产{//释放锁:当线程调用pthread_cond_wait时,它会自动释放传入的mutex,这样其他线程就可以获取这个互斥锁并修改共享资源。//线程在条件变量上等待,直到其他线程发出信号(通过pthread_cond_signal或pthread_cond_broadcast)来通知条件满足。pthread_cond_wait(&_product_cond, &_mutex);}//满足条件可以生产_block_queue.push(in);//唤醒消费者来消费pthread_cond_signal(&_consume_cond);pthread_mutex_unlock(&_mutex);}void Pop(T *out)//提供给消费者的接口{pthread_mutex_lock(&_mutex);if (IsEmpty()){pthread_cond_wait(&_consume_cond, &_mutex);}*out = _block_queue.front();_block_queue.pop();//唤醒生产者来生产pthread_cond_signal(&_consume_cond);pthread_mutex_unlock(&_mutex);}~BlockQueue(){_cap = 0;pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_product_cond);pthread_cond_destroy(&_consume_cond);}
private:std::queue<T> _block_queue;   // 阻塞队列,是被整体使用的!!!int _cap;pthread_mutex_t _mutex;pthread_cond_t _product_cond;pthread_cond_t _consume_cond;
};
#endif

3.3 Main.cc 的修改

3.3.1 整体代码

#include "BlockQueue.hpp"
#include "Thread.hpp"
#include <string>
#include <vector>
#include <unistd.h>using namespace ThreadModule;
int a = 10;void Consumer(BlockQueue<int> &bq)
{while (true){int data;bq.Pop(&data);std::cout << "Consumer Consum data is : " << data << " addr: " << &bq << std::endl;sleep(1);}
}void Productor(BlockQueue<int> &bq)
{int cnt = 1;while (true){bq.Enqueue(cnt);std::cout << "Productor product data is : " << cnt << " addr: " << &bq << std::endl;cnt++;}
}void StartComm(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq, func_t<BlockQueue<int>> func)
{for (int i = 0; i < num; i++){std::string name = "thread-" + std::to_string(i + 1);threads->emplace_back(func, bq, name);threads->back().Start();}
}void StartConsumer(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{StartComm(threads, num, bq, Consumer);
}void StartProductor(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{StartComm(threads, num, bq, Productor);
}void WaitAllThread(std::vector<Thread<BlockQueue<int>>> &threads)
{for (auto &thread:threads){thread.Join();}
}
int main()
{BlockQueue<int> *bq = new BlockQueue<int>(10);std::vector<Thread<BlockQueue<int>>> threads;StartProductor(&threads, 1, *bq);StartConsumer(&threads, 1, *bq);WaitAllThread(threads);return 0;
}

3.3.2 int main 部分

int main()
{BlockQueue<int> *bq = new BlockQueue<int>(10);std::vector<Thread<BlockQueue<int>>> threads;StartProductor(&threads, 1, *bq);StartConsumer(&threads, 1, *bq);WaitAllThread(threads);return 0;
}

下面从 main 函数作为切入点,首先我们把线程数组中的线程节点改为阻塞队列,紧接着就开始调用生产者与消费者的 start 函数,并传入了新建的阻塞队列 bq ,保证生产者与消费者使用的是同一份阻塞队列

3.3.3 start 函数

为了函数的简洁,我们创建了一个公共区域:

void StartComm(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq, func_t<BlockQueue<int>> func)
{for (int i = 0; i < num; i++){std::string name = "thread-" + std::to_string(i + 1);threads->emplace_back(func, bq, name);threads->back().Start();}
}void StartConsumer(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{StartComm(threads, num, bq, Consumer);
}void StartProductor(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{StartComm(threads, num, bq, Productor);
}

现在不论生产者或是消费者在启动时,只需要访问同一份代码即可,不同的是传入的线程执行方法不同。 

在 StartComm 中,对线程数组进行了 emplace_back ,emplace_back 会根据传入的参数自动构造一个数组的成员,也就是 thread ,传入参数为线程的执行方法、阻塞队列、线程名称。

然后来看一下 StartComm 中不同线程要执行的不同方法,就是分别打印信息:

void Consumer(BlockQueue<int> &bq)
{while (true){int data;bq.Pop(&data);std::cout << "Consumer Consum data is : " << data << " addr: " << &bq << std::endl;sleep(1);}
}void Productor(BlockQueue<int> &bq)
{int cnt = 1;while (true){bq.Enqueue(cnt);std::cout << "Productor product data is : " << cnt << " addr: " << &bq << std::endl;cnt++;}
}

下面重新回看 int main 部分:

int main()
{BlockQueue<int> *bq = new BlockQueue<int>(10);std::vector<Thread<BlockQueue<int>>> threads;StartProductor(&threads, 1, *bq);StartConsumer(&threads, 1, *bq);WaitAllThread(threads);return 0;
}

首行的 bq 默认的容量设置为 10 ,我们创建的生产者线程数量为 1 ,所以生产者的执行函数会执行 10 次,依次打印 1 - 11 的线程名称。然后创建了消费者线程数量,此时消费者的执行函数又会执行 10 次,所以打印内容为:
 
可以看到它们访问的地址空间都是同一份。

3.3.4 wait 函数

线程等待就比较简单了,直接调用系统调用即可。

3.4 整体代码

3.4.1 BlockQueue.hpp

#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>template<typename T>
class BlockQueue
{
private:bool IsFull(){return _block_queue.size() == _cap;}bool IsEmpty(){return _block_queue.empty();}
public:BlockQueue(int cap):_cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_product_cond, nullptr);pthread_cond_init(&_consume_cond, nullptr);}void Enqueue(T &in)//提供给生产者的接口{pthread_mutex_lock(&_mutex);if (IsFull())//队列已满,无法继续生产{//释放锁:当线程调用pthread_cond_wait时,它会自动释放传入的mutex,这样其他线程就可以获取这个互斥锁并修改共享资源。//线程在条件变量上等待,直到其他线程发出信号(通过pthread_cond_signal或pthread_cond_broadcast)来通知条件满足。pthread_cond_wait(&_product_cond, &_mutex);}//满足条件可以生产_block_queue.push(in);//唤醒消费者来消费pthread_cond_signal(&_consume_cond);pthread_mutex_unlock(&_mutex);}void Pop(T *out)//提供给消费者的接口{pthread_mutex_lock(&_mutex);if (IsEmpty()){pthread_cond_wait(&_consume_cond, &_mutex);}*out = _block_queue.front();_block_queue.pop();//唤醒生产者来生产pthread_cond_signal(&_consume_cond);pthread_mutex_unlock(&_mutex);}~BlockQueue(){_cap = 0;pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_product_cond);pthread_cond_destroy(&_consume_cond);}
private:std::queue<T> _block_queue;   // 阻塞队列,是被整体使用的!!!int _cap;pthread_mutex_t _mutex;pthread_cond_t _product_cond;pthread_cond_t _consume_cond;
};
#endif

3.4.2 Thread.hpp

#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>namespace ThreadModule
{template<typename T>using func_t = std::function<void(T&)>;// typedef std::function<void(const T&)> func_t;template<typename T>class Thread{public:void Excute(){_func(_data);}public:Thread(func_t<T> func, T &data, const std::string &name="none-name"): _func(func), _data(data), _threadname(name), _stop(true){}static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!{Thread<T> *self = static_cast<Thread<T> *>(args);self->Excute();return nullptr;}bool Start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if(!n){_stop = false;return true;}else{return false;}}void Detach(){if(!_stop){pthread_detach(_tid);}}void Join(){if(!_stop){pthread_join(_tid, nullptr);}}std::string name(){return _threadname;}void Stop(){_stop = true;}~Thread() {}private:pthread_t _tid;std::string _threadname;T &_data;  // 为了让所有的线程访问同一个全局变量func_t<T> _func;bool _stop;};
} // namespace ThreadModule#endif

3.4.3 Main.cc

#include "BlockQueue.hpp"
#include "Thread.hpp"
#include <string>
#include <vector>
#include <unistd.h>using namespace ThreadModule;
//int a = 10;void Consumer(BlockQueue<int> &bq)
{while (true){int data;bq.Pop(&data);std::cout << "Consumer Consum data is : " << data << " addr: " << &bq << std::endl;sleep(1);}
}void Productor(BlockQueue<int> &bq)
{int cnt = 1;while (true){bq.Enqueue(cnt);std::cout << "Productor product data is : " << cnt << " addr: " << &bq << std::endl;cnt++;}
}void StartComm(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq, func_t<BlockQueue<int>> func)
{for (int i = 0; i < num; i++){std::string name = "thread-" + std::to_string(i + 1);threads->emplace_back(func, bq, name);threads->back().Start();}
}void StartConsumer(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{StartComm(threads, num, bq, Consumer);
}void StartProductor(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{StartComm(threads, num, bq, Productor);
}void WaitAllThread(std::vector<Thread<BlockQueue<int>>> &threads)
{for (auto &thread:threads){thread.Join();}
}
int main()
{BlockQueue<int> *bq = new BlockQueue<int>(10);std::vector<Thread<BlockQueue<int>>> threads;StartProductor(&threads, 1, *bq);StartConsumer(&threads, 1, *bq);WaitAllThread(threads);return 0;
}

四、多生产多消费模型

假设一个场景,假设以下为消费者,当队列有1个数据时,多个线程同时被唤醒,那么此时只有一个线程可以竞争得到锁,看一下我们的程序:


此时线程已经被唤醒,应该直接执行下面的代码,但是锁并不在它身上,当它访问的时候,阻塞队列中唯一的数据已经被竞争锁成功的线程拿走了,此时再进行访问就会出现错误,这种唤醒被称作伪唤醒。所以我们需要把判断中的 if 改为 while ,每次都要判断阻塞队列是否为空,同时,生产者线程中每次都要判断队列是否为满。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/38863.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

看不懂懂车大爆炸,你就错过了国产小车的王炸!

咦&#xff1f;咋的啦&#xff1f;咱中国自己的汽车品牌前几天在汽车工业协会公布的数据里一跃而起&#xff0c;真的是威风凛凛啊&#xff01;2023年咱们自家的乘用车品牌市场份额硬生生地占了个56%&#xff0c;这可是半壁江山啊&#xff01;特别是那些10万块钱以下的家用小车&…

32.哀家要长脑子了!

1.299. 猜数字游戏 - 力扣&#xff08;LeetCode&#xff09; 公牛还是挺好数的&#xff0c;奶牛。。。妈呀&#xff0c;一朝打回解放前 抓本质抓本质&#xff0c;有多少位非公牛数可以通过重新排列转换公牛数字&#xff0c;意思就是&#xff0c;当这个数不是公牛数字时&#x…

C++多态~~的两个特殊情况

目录 1.多态的概念 2.简单认识 &#xff08;1&#xff09;一个案例 &#xff08;2&#xff09;多态的两个满足条件 &#xff08;3&#xff09;虚函数的重写 &#xff08;4&#xff09;两个特殊情况 1.多态的概念 &#xff08;1&#xff09;多态就是多种形态&#xff1b; …

SQL 29 计算用户的平均次日留存率题解

问题截图如下&#xff1a; SQL建表代码&#xff1a; drop table if exists user_profile; drop table if exists question_practice_detail; drop table if exists question_detail; CREATE TABLE user_profile ( id int NOT NULL, device_id int NOT NULL, gender varchar…

小白也能懂:逆向分析某网站加速乐Cookie参数流程详解

加速乐作为一种常见的反爬虫技术&#xff0c;在网络上已有大量详尽深入的教程可供参考。然而&#xff0c;对于那些初次接触的人来说&#xff0c;直接面对它可能仍会感到困惑。 声明 本文仅用于学习交流&#xff0c;学习探讨逆向知识&#xff0c;欢迎私信共享学习心得。如有侵权…

【区块链+基础设施】珠三角征信链 | FISCO BCOS应用案例

“珠三角征信链”是中国人民银行广州分行、中国人民银行深圳市中心支行按照中国人民银行总行工作部署&#xff0c;积 极贯彻珠三角一体化发展、粤港澳大湾区建设等国家战略而建设的跨区域征信一体化数据中心枢纽&#xff0c;以 FISCO BCOS 为底链构建应用平台&#xff0c;并由微…

springboot接口防抖【防重复提交】

什么是防抖 所谓防抖&#xff0c;一是防用户手抖&#xff0c;二是防网络抖动。在Web系统中&#xff0c;表单提交是一个非常常见的功能&#xff0c;如果不加控制&#xff0c;容易因为用户的误操作或网络延迟导致同一请求被发送多次&#xff0c;进而生成重复的数据记录。要针对用…

Docker 镜像导出和导入

docker 镜像导出 # 导出 docker 镜像到本地文件 docker save -o [输出文件名.tar] [镜像名称[:标签]] # 示例 docker save -o minio.tar minio/minio:latest-o 或 --output&#xff1a;指定导出文件的路径和名称[镜像名称[:标签]]&#xff1a;导出镜像名称以及可选的标签 dock…

【Python画图-驯化01】一文叫你搭建python画图最优环境配置

【Python画图-循环01】一文叫你搭建python画图最优环境配置 本次修炼方法请往下查看 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我工作、学习、实践 IT领域、真诚分享 踩坑集合&#xff0c;智慧小天地&#xff01; &#x1f387; 免费获取相关内容文档关注&#…

Windows/Linux/Mac 系统局域网服务发现协议及传输速度比较

简介 分析 / 验证对比常见局域网服务发现协议在 Windows/Linux/Mac 等不同系统下的支持和表现 在使用不同系统的智能硬件时&#xff0c;如常见的树莓派 / Openwrt 路由器 / Debian/Fedora/Windows/Mac 等系统是&#xff0c;系统间相互发现以及网络共享本应是系统的基础服务&a…

探秘 Django 专业之道

一、Django项目开发 1.web框架底层 1.1 网络通信 注意&#xff1a;局域网 个人一般写程序&#xff0c;想要让别人访问&#xff1a;阿里云、腾讯云。 去云平台租服务器&#xff08;含公网IP&#xff09;程序放在云服务器 先以局域网为例 我的电脑【服务端】 import sock…

Linux下SUID提权学习 - 从原理到使用

目录 1. 文件权限介绍1.1 suid权限1.2 sgid权限1.3 sticky权限 2. SUID权限3. 设置SUID权限4. SUID提权原理5. SUID提权步骤6. 常用指令的提权方法6.1 nmap6.2 find6.3 vim6.4 bash6.5 less6.6 more6.7 其他命令的提权方法 1. 文件权限介绍 linux的文件有普通权限和特殊权限&a…

计算机毕业设计Python深度学习美食推荐系统 美食可视化 美食数据分析大屏 美食爬虫 美团爬虫 机器学习 大数据毕业设计 Django Vue.js

Python美食推荐系统开题报告 一、项目背景与意义 随着互联网和移动技术的飞速发展&#xff0c;人们的生活方式发生了巨大变化&#xff0c;尤其是餐饮行业。在线美食平台如雨后春笋般涌现&#xff0c;为用户提供了丰富的美食选择。然而&#xff0c;如何在海量的餐饮信息中快速…

(1)Jupyter Notebook 下载及安装

目录 1. Jupyter Notebook是什么&#xff1f;2. Jupyter Notebook特征3. 应用3. 利用Google Colab安装Jupyter Notebook3.1 什么是 Colab&#xff1f;3.2 访问 Google Colab 1. Jupyter Notebook是什么&#xff1f; 百度百科: Jupyter Notebook&#xff08;此前被称为 IPython …

Unity Shader 软粒子

Unity Shader 软粒子 前言项目Shader连连看项目渲染管线设置 鸣谢 前言 当场景有点单调的时候&#xff0c;就需要一些粒子点缀&#xff0c;此时软粒子就可以发挥作用了。 使用软粒子与未使用软粒子对比图 项目 Shader连连看 这里插播一点&#xff0c;可以用Vertex Color与…

ARP 原理详解 二

只要确定了 IP 地址后&#xff0c;就能够向这个 IP 地址所在的主机发送数据报&#xff0c;这是我们所熟知的事情。 但是再往深了想&#xff0c;IP 地址只是标识网络层的地址&#xff0c;那么在网络层下方数据链路层是不是也有一个地址能够告诉对方主机自己的地址呢&#xff1f…

生产环境部署与协同开发-Docker(原创超全)

关闭防火墙 systemctl stop firewalld.service 关闭SELinux vim /etc/selinux/config 查看yum支持的包并安装docker引擎 yum listyum install -y docker 启动docker设置docker自启动测试docker是否安装成功&#xff1f; systemctl start dockersystemctl enable dockerdoc…

算法基础-----【动态规划】

动态规划(待完善) 动规五部曲分别为&#xff1a; 确定dp数组&#xff08;dp table&#xff09;以及下标的含义确定递推公式&#xff08;状态转移公式&#xff09;dp数组如何初始化确定遍历顺序举例推导dp数组、 动态规划的核心就是递归剪枝&#xff08;存储键值&#xff0c;…

教师备课三要素是指什么内容

在教育的舞台上&#xff0c;教师的角色至关重要。他们不仅是知识的传递者&#xff0c;更是学生学习路上的引导者。那么&#xff0c;教师备课的三要素究竟是什么呢&#xff1f;这不仅是每个教师在教学过程中必须面对的问题&#xff0c;也是他们不断探索和实践的课题。 教师备课的…

如何通过TPM活动提升员工的设备管理能力?

在快节奏的现代职场中&#xff0c;设备管理能力已成为员工综合素质的重要一环。然而&#xff0c;如何有效提升这一能力&#xff0c;让员工在设备操作、维护和管理上更加得心应手呢&#xff1f;答案就隐藏在TPM&#xff08;Total Productive Maintenance&#xff0c;全面生产维护…