我设计这个线程池的初衷是为了与socket对接的。线程池的实现千变万化,我得这个并不一定是最好的,但却是否和我心目中需求模型的。现把部分设计思路和代码贴出,以期抛砖引玉。个人比较喜欢搞开源,所以大家如果觉得有什么需要改善的地方,欢迎给予评论。思前想后,也没啥设计图能表达出设计思想,就把类图贴出来吧。
类图设计如下:
Command类是我们的业务类。这个类里只能存放简单的内置类型,这样方便与socket的直接传输。我定义了一个cmd_成员用于存放命令字,arg_用于存放业务的参数。这个参数可以使用分隔符来分隔各个参数。我设计的只是简单实现,如果有序列化操作了,完全不需要使用我这种方法啦。
ThreadProcess就是业务处理类,这里边定义了各个方法用于进行业务处理,它将在ThreadPool中的Process函数中调用。
ThreadPool就是我们的线程池类。其中的成员变量都是静态变量,Process就是线程处理函数。
#define MAX_THREAD_NUM 50 // 该值目前需要设定为初始线程数的整数倍
#define ADD_FACTOR 40 // 该值表示一个线程可以处理的最大任务数
#define THREAD_NUM 10 // 初始线程数
bshutdown_:用于线程退出。
command_:用于存放任务队列
command_cond_:条件变量
command_mutex_:互斥锁
icurr_thread_num_:当前线程池中的线程数
thread_id_map_:这个map用于存放线程对应的其它信息,我只存放了线程的状态,0为正常,1为退出。还可以定义其它的结构来存放更多的信息,例如存放套接字。
InitializeThreads:用于初始化线程池,先创建THREAD_NUM个线程。后期扩容也需要这个函数。
Process:线程处理函数,这里边会调用AddThread和DeleteThread在进行线程池的伸缩。
AddWork:往队列中添加一个任务。
ThreadDestroy:线程销毁函数。
AddThread:扩容THREAD_NUM个线程
DeleteThread:如果任务队列为空,则将原来的线程池恢复到THREAD_NUM个。这里可以根据需要进行修改。
以下贴出代码以供大家参考。
command.h
#ifndef COMMAND_H_ #define COMMAND_H_class Command { public:int get_cmd();char* get_arg();void set_cmd(int cmd);void set_arg(char* arg); private:int cmd_;char arg_[65]; };#endif /* COMMAND_H_ */
command.cpp
#include <string.h> #include "command.h"int Command::get_cmd() {return cmd_; }char* Command::get_arg() {return arg_; }void Command::set_cmd(int cmd) {cmd_ = cmd; }void Command::set_arg(char* arg) {if(NULL == arg){return;}strncpy(arg_,arg,64);arg_[64] = '\0'; }
thread_process.h
#ifndef THREAD_PROCESS_H_ #define THREAD_PROCESS_H_class ThreadProcess { public:void Process0(void* arg);void Process1(void* arg);void Process2(void* arg); };#endif /* THREAD_PROCESS_H_ */
thread_process.cpp
#include <pthread.h> #include <stdio.h> #include <unistd.h> #include "thread_process.h"void ThreadProcess::Process0(void* arg) {printf("thread %u is starting process %s\n",pthread_self(),arg);usleep(100*1000); } void ThreadProcess::Process1(void* arg) {printf("thread %u is starting process %s\n",pthread_self(),arg);usleep(100*1000); }void ThreadProcess::Process2(void* arg) {printf("thread %u is starting process %s\n",pthread_self(),arg);usleep(100*1000); }
thread_pool.h
#ifndef THREAD_POOL_H_ #define THREAD_POOL_H_#include <map> #include <vector> #include "command.h"#define MAX_THREAD_NUM 50 // 该值目前需要设定为初始线程数的整数倍 #define ADD_FACTOR 40 // 该值表示一个线程可以处理的最大任务数 #define THREAD_NUM 10 // 初始线程数class ThreadPool { public:ThreadPool() {};static void InitializeThreads();void AddWork(Command command);void ThreadDestroy(int iwait = 2); private:static void* Process(void* arg);static void AddThread();static void DeleteThread();static bool bshutdown_;static int icurr_thread_num_;static std::map<pthread_t,int> thread_id_map_;static std::vector<Command> command_;static pthread_mutex_t command_mutex_;static pthread_cond_t command_cond_; };#endif /* THREAD_POOL_H_ */
thread_pool.cpp
#include <pthread.h> #include <stdlib.h> #include "thread_pool.h" #include "thread_process.h" #include "command.h"bool ThreadPool::bshutdown_ = false; int ThreadPool::icurr_thread_num_ = THREAD_NUM; std::vector<Command> ThreadPool::command_; std::map<pthread_t,int> ThreadPool::thread_id_map_; pthread_mutex_t ThreadPool::command_mutex_ = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t ThreadPool::command_cond_ = PTHREAD_COND_INITIALIZER;void ThreadPool::InitializeThreads() {for (int i = 0; i < THREAD_NUM ; ++i){pthread_t tempThread;pthread_create(&tempThread, NULL, ThreadPool::Process, NULL);thread_id_map_[tempThread] = 0;} }void* ThreadPool::Process(void* arg) {ThreadProcess threadprocess;Command command;while (true){pthread_mutex_lock(&command_mutex_);// 如果线程需要退出,则此时退出if (1 == thread_id_map_[pthread_self()]){pthread_mutex_unlock(&command_mutex_);printf("thread %u will exit\n", pthread_self());pthread_exit(NULL);}// 当线程不需要退出且没有需要处理的任务时,需要缩容的则缩容,不需要的则等待信号if (0 == command_.size() && !bshutdown_){if(MAX_THREAD_NUM != THREAD_NUM){DeleteThread();if (1 == thread_id_map_[pthread_self()]){pthread_mutex_unlock(&command_mutex_);printf("thread %u will exit\n", pthread_self());pthread_exit(NULL);}}pthread_cond_wait(&command_cond_,&command_mutex_);}// 线程池需要关闭,关闭已有的锁,线程退出if(bshutdown_){pthread_mutex_unlock (&command_mutex_);printf ("thread %u will exit\n", pthread_self ());pthread_exit (NULL);}// 如果线程池的最大线程数不等于初始线程数,则表明需要扩容if(MAX_THREAD_NUM != THREAD_NUM){AddThread();}// 从容器中取出待办任务std::vector<Command>::iterator iter = command_.begin();command.set_arg(iter->get_arg());command.set_cmd(iter->get_cmd());command_.erase(iter);pthread_mutex_unlock(&command_mutex_);// 开始业务处理switch(command.get_cmd()){case 0:threadprocess.Process0(command.get_arg());break;case 1:threadprocess.Process1(command.get_arg());break;case 2:threadprocess.Process2(command.get_arg());break;default:break;}}return NULL; // 完全为了消除警告(eclipse编写的代码,警告很烦人) }void ThreadPool::AddWork(Command command) {bool bsignal = false;pthread_mutex_lock(&command_mutex_);if (0 == command_.size()){bsignal = true;}command_.push_back(command);pthread_mutex_unlock(&command_mutex_);if (bsignal){pthread_cond_signal(&command_cond_);} }void ThreadPool::ThreadDestroy(int iwait) {while(0 != command_.size()){sleep(abs(iwait));}bshutdown_ = true;pthread_cond_broadcast(&command_cond_);std::map<pthread_t,int>::iterator iter = thread_id_map_.begin();for (; iter!=thread_id_map_.end(); ++iter){pthread_join(iter->first,NULL);}pthread_mutex_destroy(&command_mutex_);pthread_cond_destroy(&command_cond_); }void ThreadPool::AddThread() {if(((icurr_thread_num_*ADD_FACTOR) < command_.size())&& (MAX_THREAD_NUM != icurr_thread_num_)){InitializeThreads();icurr_thread_num_ += THREAD_NUM;} }void ThreadPool::DeleteThread() {int size = icurr_thread_num_ - THREAD_NUM;std::map<pthread_t,int>::iterator iter = thread_id_map_.begin();for(int i=0; i<size; ++i,++iter){iter->second = 1;} }
main.cpp
#include "thread_pool.h" #include "command.h"int main() {ThreadPool thread_pool;thread_pool.InitializeThreads();Command command;char arg[8] = {0};for(int i=1; i<=1000; ++i){command.set_cmd(i%3);sprintf(arg,"%d",i);command.set_arg(arg);thread_pool.AddWork(command);}sleep(10); // 用于测试线程池缩容 thread_pool.ThreadDestroy();return 0; }
代码是按照google的开源c++编码规范编写。大家可以通过改变那几个宏的值来调整线程池。有问题大家一起讨论。