文章目录
- 九、多线程
- 7. 生产者消费者模型
- 生产者消费者模型的简单代码
- 结果演示
- 未完待续
九、多线程
7. 生产者消费者模型
生产者消费者模型的简单代码
Makefile:
cp:Main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f cp
Thread.hpp:
#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>namespace ThreadModule
{template<typename T>using func_t = std::function<void(T&)>;template<typename T>class Thread{public:void Excute(){_func(_data);}public:Thread(func_t<T> func, T& data, const std::string &name="none-name"): _func(func), _data(data), _threadname(name), _stop(true){}static void *threadroutine(void *args){Thread<T> *self = static_cast<Thread<T> *>(args);self->Excute();return nullptr;}bool Start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if(!n){_stop = false;return true;}else{return false;}}void Detach(){if(!_stop){pthread_detach(_tid);}}void Join(){if(!_stop){pthread_join(_tid, nullptr);}}std::string name(){return _threadname;}void Stop(){_stop = true;}~Thread() {}private:pthread_t _tid;std::string _threadname;T& _data;func_t<T> _func;bool _stop;};
}#endif
BlockQueue.hpp:
#ifndef __BLOCKQUEUE_HPP__
#define __BLOCKQUEUE_HPP__#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>template<typename T>
class BlockQueue
{
private:bool IsFull() const{return _block_queue.size() == _cap;}bool IsEmpty() const{return _block_queue.empty();}
public:BlockQueue(int cap):_cap(cap){_productor_wait_num = 0;_consumer_wait_num = 0;// 初始化互斥锁pthread_mutex_init(&_mutex, nullptr);// 初始化条件变量pthread_cond_init(&_productor_cond, nullptr); // 初始化条件变量pthread_cond_init(&_consumer_cond, nullptr);}// 生产者使用的入队列接口void Enqueue(const T& in){// 加锁pthread_mutex_lock(&_mutex);while (IsFull()){// 生产者等待数量加1_productor_wait_num++;// 等待条件变量通知唤醒并竞争到互斥锁pthread_cond_wait(&_productor_cond, &_mutex);// 生产者等待数量减1_productor_wait_num--;}// 生产的数据入资源队列_block_queue.push(in);// 解锁pthread_mutex_unlock(&_mutex);// 通知消费者可以从等待队列中出队列if (_consumer_wait_num > 0) pthread_cond_signal(&_consumer_cond);}// 消费者使用的出队列接口void Pop(T* out){// 加锁pthread_mutex_lock(&_mutex);while (IsEmpty()){// 消费者等待数量加1_consumer_wait_num++;// 等待条件变量通知唤醒并竞争到互斥锁pthread_cond_wait(&_consumer_cond, &_mutex);// 消费者等待数量减1_consumer_wait_num--;}// 获取数据*out = _block_queue.front();// 数据出队列_block_queue.pop();// 解锁pthread_mutex_unlock(&_mutex);// 通知生产者可以从等待队列中出队列if (_productor_wait_num > 0) pthread_cond_signal(&_productor_cond);}~BlockQueue(){// 销毁互斥锁pthread_mutex_destroy(&_mutex);// 销毁条件变量pthread_cond_destroy(&_productor_cond);// 销毁条件变量pthread_cond_destroy(&_consumer_cond);}
private:std::queue<T> _block_queue;// 容量上限int _cap;// 互斥锁pthread_mutex_t _mutex;// 条件变量,用于通知生产者可以入队列pthread_cond_t _productor_cond;// 条件变量,用于通知消费者可以出队列pthread_cond_t _consumer_cond;// 生产者等待数量int _productor_wait_num;// 消费者等待数量int _consumer_wait_num;
};#endif
Task.hpp:
#pragma once#include <iostream>
#include <string>class Task
{
public:Task(){}Task(int a, int b):_a(a),_b(b),_result(0){}// 执行任务void Execute(){_result = _a + _b;}std::string ResultToString(){return std::to_string(_a) + " + " + std::to_string(_b) + " = " + std::to_string(_result);}std::string DebugToString(){return std::to_string(_a) + " + " + std::to_string(_b) + " = ?";}
private:int _a;int _b;int _result;
};
Main.cc:
#include "BlockQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>using namespace ThreadModule;
// 创建类型别名
using blockqueue_t = BlockQueue<Task>;// 消费者线程
void Consumer(blockqueue_t& bq)
{while (true){Task t;// 从阻塞队列中获取任务资源bq.Pop(&t);// 执行任务t.Execute();// 输出结果std::cout << "Consumer: " << t.ResultToString() << std::endl;}
}// 生产者线程
void Productor(blockqueue_t& bq)
{srand(time(nullptr)^pthread_self());while (true){// 分配任务int a = rand() % 10 + 1;usleep(1234);int b = rand() % 20 + 1;Task t(a, b);// 任务放入阻塞队列bq.Enqueue(t);// 输出任务信息std::cout << "Productor: " << t.DebugToString() << std::endl;sleep(1);}
}// 启动线程
void StartComm(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq, func_t<blockqueue_t> func)
{for (int i = 0; i < num; i++){// 创建一批线程std::string name = "thread-" + std::to_string(i + 1);threads->emplace_back(func, bq, name);threads->back().Start();}
}// 创建消费者线程
void StartConsumer(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq)
{StartComm(threads, num, bq, Consumer);
}// 创建生产者线程
void StartProductor(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq)
{StartComm(threads, num, bq, Productor);
}// 等待所有线程结束
void WaitAllThread(std::vector<Thread<blockqueue_t>>& threads)
{for (auto& thread : threads){thread.Join();}
}int main()
{// 创建阻塞队列,容量为5blockqueue_t* bq = new blockqueue_t(5);// 创建线程std::vector<Thread<blockqueue_t>> threads;// 创建 1个消费者线程StartConsumer(&threads, 1, *bq);// 创建 1个生产者线程StartProductor(&threads, 1, *bq);// 等待所有线程结束WaitAllThread(threads);return 0;
}
结果演示
这里使用的是单生产者和单消费者,当然也可以在主函数处创建多生产者和多消费者的模型。