一、总体调度:主函数Main.cc
#include "BlockQueue.hpp"
#include "Thread.hpp"
#include <string>
#include <vector>
#include <functional>
#include <unistd.h>
#include <ctime>using namespace ThreadModule;
int a=10;
using Task = std::function<void()>;
using blockqueue_t=BlockQueue<Task>;void PrintHello()
{std::cout<<"hello world"<<std::endl;
}
void Consumer(blockqueue_t &bq)
{while(true){//1.从blockqueue取下来任务Task t;bq.Pop(&t);//pop完成后t中会保存任务//2.处理这个任务t();//消费者私有}
}
void Productor(blockqueue_t &bq)
{srand(time(nullptr)^pthread_self());int cnt=10;while(true){sleep(1);Task t=PrintHello;bq.Enqueue(t);}
}
//productor和consumer共用此函数
void StartComm(std::vector<Thread<blockqueue_t>> *threads,int num,blockqueue_t &bq,func_t<blockqueue_t> func)
{for(int i=0;i<num;i++){std::string name="thread-"+std::to_string(i+1);threads->emplace_back(func,bq,name);threads->back().Start();}
}//启动consumer
void StartConsumer(std::vector<Thread<blockqueue_t>> *thread,int num,blockqueue_t &bq)
{StartComm(thread,num,bq,Consumer);
}//启动productor
void StartProductor(std::vector<Thread<blockqueue_t>> *thread,int num,blockqueue_t& bq)
{StartComm(thread,num,bq,Productor);
}
void WaitAllThread(std::vector<Thread<blockqueue_t>> &threads)
{for (auto &thread : threads){thread.Join();}
}
int main()
{blockqueue_t *bq=new blockqueue_t(5);std::vector<Thread<blockqueue_t>> threads;StartConsumer(&threads,3,*bq);StartProductor(&threads,1,*bq);WaitAllThread(threads);return 0;
}
二、线程功能的封装:Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>namespace ThreadModule
{template<typename T>using func_t=std::function<void(T&)>;template<typename T>class Thread{public:void Excute(){_func(_data);}public:Thread(func_t<T> func,T&data,const std::string& name="none-name"):_func(func),_data(data),_threadname(name),_stop(true){}static void* threadroutine(void* args)//static成员函数没有this{Thread<T> *self = static_cast<Thread<T> *>(args);self->Excute();return nullptr;}bool Start(){int n=pthread_create(&_tid,nullptr,threadroutine,this);//把this传给threadroutine让其完成调用if(!n){_stop=false;return true;}else{return false;}}void Detach(){if(!_stop){pthread_detach(_tid);}}void Join(){if(!_stop){pthread_join(_tid,nullptr);}}std::string name(){return _threadname;}void Stop(){_stop = true;}~Thread(){}private:pthread_t _tid;std::string _threadname;T& _data;//要传入所执行函数的参数func_t<T> _func;//线程要执行的函数bool _stop;};
}
#endif
三、临界区阻塞队列:BlockQueue
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>template <typename T>
class BlockQueue
{
private:bool IsFull(){return _block_queue.size()==_cap;}bool IsEmpty(){return _block_queue.empty();}
public:
BlockQueue(int cap):_cap(cap)
{_productor_wait_num=0;_consumer_wait_num=0;pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_product_cond,nullptr);pthread_cond_init(&_consum_cond,nullptr);
}
void Enqueue(T& in)
{pthread_mutex_lock(&_mutex);while(IsFull())//保证代码的健壮性{// 生产线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!// 1. pthread_cond_wait调用是: a. 让调用线程等待 b. 自动释放曾经持有的_mutex锁 c. 当条件满足,线程唤醒,pthread_cond_wait要求线性// 必须重新竞争_mutex锁,竞争成功,方可返回!!!// 之前:安全_productor_wait_num++;pthread_cond_wait(&_product_cond,&_mutex);// 只要等待,必定会有唤醒,唤醒的时候,就要继续从这个位置向下运行!!_productor_wait_num--;}//进行生产_block_queue.push(in);if(_consumer_wait_num > 0)pthread_cond_signal(&_consum_cond); // pthread_cond_broadcastpthread_mutex_unlock(&_mutex);
}
void Pop(T *out)
{pthread_mutex_lock(&_mutex);while(IsEmpty()){// 消费线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!// 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁_consumer_wait_num++;pthread_cond_wait(&_consum_cond, &_mutex); // 伪唤醒_consumer_wait_num--;}//进行消费*out=_block_queue.front();//让main.cc的消费者拿到任务_block_queue.pop();//通知生产者生产if(_productor_wait_num>0)pthread_cond_signal(&_product_cond);pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_product_cond);pthread_cond_destroy(&_consum_cond);}
private:std::queue<T> _block_queue;//阻塞队列,是被整体使用的int _cap;//总上限pthread_mutex_t _mutex;//保护_block_queue的锁pthread_cond_t _product_cond;//专门给生产者提供的条件变量pthread_cond_t _consum_cond;//专门给生产者提供的条件变量int _productor_wait_num;int _consumer_wait_num;};
#endif
四、Makefile
cp:Main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f cp