简易线程池实现
ThreadPool.hpp(线程池)
#ifndef THREADPOOL_HPP
#define THREADPOOL_HPP

#include <unistd.h>

#include <cstdlib>
#include <iostream>
#include <memory>
#include <queue>
#include <vector>

#include "sem.hpp"
#include "Mutex.hpp"
#include "Cond.hpp"
#include "thread.hpp"
#include "Task.hpp"
using namespace std;const int g_thread_num = 3;template <class T>
class ThreadPool
{
public:ThreadPool(int num = g_thread_num):num_(num){for(int i=1;i<=num;++i){threads_.push_back(new Thread(i,routine,(void*)this));}}static void* routine(void* args){ThreadData* td = (ThreadData*)args;ThreadPool<T>* tp = (ThreadPool<T>*)td->args_;while(true){T task;{lock_Guard lg(tp->getMutex());while(tp->isEmpty()) tp->waitcond(lg);task = tp->getTask();//tp->cv_.notify_one();}task();}}void run(){for(auto iter : threads_){iter->start();}}void pushTask(const T& task){sleep(2);lock_Guard lg(mtx_);// while(task_queue_.size() >= num_)// {// cv_.Wait(lg);// }task_queue_.push(task);cout<<"push: "<<task.x_<<" + "<<task.y_<<" = ?"<<endl;cv_.notify_one();//sleep(1);}T getTask(){T t = task_queue_.front();task_queue_.pop();return t;}~ThreadPool(){for(auto iter : threads_){iter->join();delete iter;}}Mutex& getMutex() {return mtx_;}bool isEmpty() {return task_queue_.empty();}void waitcond(lock_Guard& lg) {cv_.Wait(lg);}
private:vector<Thread*> threads_;int num_;queue<T> task_queue_;Mutex mtx_;Condition_variable cv_;
};
#endif
thread.hpp(封装线程)
#ifndef THREAD_HPP
#define THREAD_HPP
#include<iostream>
#include<string>
#include<functional>
#include"ThreadPool.hpp"
using namespace std;

// Signature of a thread entry point: takes one opaque pointer and returns one.
using fun_t = void* (*)(void*);
/**
 * Per-thread payload handed to a thread entry point: an opaque user pointer
 * plus the thread's display name.
 */
class ThreadData
{
public:
    void* args_ = nullptr; // opaque argument forwarded to the thread routine
    std::string name_;     // human-readable thread name
};
/**
 * Thin wrapper around one pthread.
 *
 * The entry point receives a pointer to this object's ThreadData, so the
 * Thread must stay at the same address while the thread is running; copying
 * is disabled for that reason.
 */
class Thread
{
public:
    /**
     * @param num      index used to build the name "thread-<num>"
     * @param callback routine the thread will execute
     * @param args     opaque pointer stored in ThreadData::args_
     */
    Thread(int num, fun_t callback, void* args) : tid_(), func_(callback)
    {
        tdata_.name_ = "thread-" + std::to_string(num);
        tdata_.args_ = args;
    }

    Thread(const Thread&) = delete;
    Thread& operator=(const Thread&) = delete;

    /**
     * Launches the thread. Does nothing if a thread is already running.
     * If pthread_create fails the object stays in the not-started state, so
     * a later join() is a harmless no-op.
     */
    void start()
    {
        if (started_)
        {
            return;
        }
        started_ = (pthread_create(&tid_, nullptr, func_, static_cast<void*>(&tdata_)) == 0);
    }

    /** Waits for the thread to finish. No-op when no thread was started. */
    void join()
    {
        if (!started_)
        {
            return;
        }
        pthread_join(tid_, nullptr);
        started_ = false; // a thread id must not be joined twice
    }

    ~Thread() {}

private:
    pthread_t tid_;
    fun_t func_;
    ThreadData tdata_;
    bool started_ = false; // true between a successful start() and join()
};
#endif
Task.hpp(任务)
#ifndef TASK_HPP
#define TASK_HPP
#include <iostream>

/**
 * Addition task: holds two operands and, when invoked, prints and returns
 * their sum.
 */
class Task
{
public:
    /** Creates a task with both operands set to zero. */
    Task() {}

    /** Creates a task that will add x and y. */
    Task(int x, int y) : x_(x), y_(y) {}

    /**
     * Prints "recive and do: x + y = sum" to standard output.
     * @return x_ + y_
     */
    int operator()() const
    {
        const int sum = x_ + y_;
        std::cout << "recive and do: " << x_ << " + " << y_ << " = " << sum << std::endl;
        return sum;
    }

    int x_ = 0; // first operand
    int y_ = 0; // second operand
};

#endif
Mutex.hpp(封装锁)
#include <iostream>
#include <cstdlib>
#ifndef MUTEX_HPP
#define MUTEX_HPP
/**
 * Owning wrapper around a pthread_mutex_t: initialised with default
 * attributes in the constructor and destroyed in the destructor.
 * Not copyable.
 */
class Mutex
{
public:
    Mutex() { pthread_mutex_init(&mutex_, nullptr); }
    ~Mutex() { pthread_mutex_destroy(&mutex_); }

    Mutex(const Mutex& mtx) = delete;
    Mutex& operator=(const Mutex& mtx) = delete;

    /** Blocks until the mutex is acquired. */
    void lock() { pthread_mutex_lock(&mutex_); }

    /** Releases the mutex. */
    void unlock() { pthread_mutex_unlock(&mutex_); }

    /** Underlying handle, e.g. for pthread_cond_wait. */
    pthread_mutex_t* getMutex() { return &mutex_; }

private:
    pthread_mutex_t mutex_;
};

/**
 * Scope guard for Mutex: locks in the constructor, unlocks in the
 * destructor. Holds a reference, so the Mutex must outlive the guard.
 */
class lock_Guard
{
public:
    lock_Guard(Mutex& mutex) : mutex_(mutex) { mutex_.lock(); }
    ~lock_Guard() { mutex_.unlock(); }

    lock_Guard(const lock_Guard& lg) = delete;
    lock_Guard& operator=(const lock_Guard& lg) = delete;

    /** Handle of the guarded mutex. */
    pthread_mutex_t* getMutex() { return mutex_.getMutex(); }

private:
    Mutex& mutex_;
};
#endif
Cond.hpp(封装条件变量)
#include <iostream>
#include <cstdlib>
#include "Mutex.hpp"
/**
 * Owning wrapper around a pthread_cond_t: initialised with default
 * attributes in the constructor and destroyed in the destructor.
 * Not copyable, so the underlying object is never destroyed twice.
 */
class Condition_variable
{
public:
    Condition_variable() { pthread_cond_init(&cond_, nullptr); }
    ~Condition_variable() { pthread_cond_destroy(&cond_); }

    Condition_variable(const Condition_variable&) = delete;
    Condition_variable& operator=(const Condition_variable&) = delete;

    /**
     * Waits on the condition using the mutex held by `lg`.
     * Callers should re-check their predicate in a loop after this returns.
     */
    void Wait(lock_Guard& lg) { pthread_cond_wait(&cond_, lg.getMutex()); }

    /** Wakes one waiting thread, if any. */
    void notify_one() { pthread_cond_signal(&cond_); }

    /** Wakes every waiting thread. */
    void notify_all() { pthread_cond_broadcast(&cond_); }

private:
    pthread_cond_t cond_;
};
sem.hpp(封装信号量)
#include <iostream>
#include <semaphore.h>
using namespace std;
/**
 * Owning wrapper around an unnamed POSIX semaphore (sem_init is called with
 * pshared = 0). Not copyable, so the sem_t is never destroyed twice.
 */
class Sem
{
public:
    /** @param value initial semaphore count */
    Sem(int value) { sem_init(&sem_, 0, value); }
    ~Sem() { sem_destroy(&sem_); }

    Sem(const Sem&) = delete;
    Sem& operator=(const Sem&) = delete;

    /** Calls sem_wait: decrements the count, blocking while it is zero. */
    void P() { sem_wait(&sem_); }

    /** Calls sem_post: increments the count. */
    void V() { sem_post(&sem_); }

private:
    sem_t sem_;
};
main.cc
#include "ThreadPool.hpp"
#include <pthread.h>
#define CUSTOMERSIZE 2
#define PRODUCTORSIZE 1int main()
{srand((unsigned long)time(nullptr) ^ getpid());ThreadPool<Task> *tp = new ThreadPool<Task>();tp->run();while(true){int x = rand()%100+1;int y = rand()%100+1;Task t(x,y);tp->pushTask(t);}return 0;
}
makefile
# Build the demo binary from main.cc (headers are pulled in via #include).
threadpool:main.cc
	g++ -o $@ $^ -std=c++11 -l pthread
.PHONY:clean
# Remove the built binary.
clean:
	rm -f threadpool