目录
一、线程池实现原理
二、定义线程池的结构
三、创建线程池实例
四、添加工作的线程的任务函数
五、管理者线程的任务函数
六、往线程池中添加任务
七、获取线程池工作的线程数量与活着的线程数量
八、线程池的销毁
一、线程池实现原理
线程池的组成主要分为3个部分。这三部分配合工作就可以得到一个完整的线程池:
1、任务队列,存储需要处理的任务。由工作的想程来处理这些任务
- 通过线程池提供的API函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除
- 已处理的任务会被从任务队列中删除
- 线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程
2、工作的线程 (任务队列任务的消费者),N个
- 线程池中维护了一定数量的工作线程,他们的作用是不停的读任务队列,从里边取出任务并处理
- 工作的线程相当于是任务队列的消费者角色
- 如果任务队列为空,工作的线程将会被阻塞(使用条件变量/信号量阻塞)
- 如果阻塞之后有了新的任务,由生产者将阻塞解除,工作线程开始工作
3、管理者线程(不处理任务队列中的任务),1个
- 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
——当任务过多的时候,可以适当的创建一些新的工作线程
——当任务过少的时候,可以适当的销毁一些工作的线程
二、定义线程池的结构
#include "threadpool.h"//任务结构体
typedef struct Task
{void (*function) (void* arg);void* arg;
}Task;//线程池结构体
struct ThreadPool
{//任务队列Task* taskQ;int queueCapacity; //容量int queueSize; //当前任务个数int queueFront; //队头->取数据int queueRear; //队尾->放数据pthread_t managerID; //管理者线程IDpthread_t *threadIDs; //工作的线程IDint minNum; //最小线程数量int maxNum; //最大线程数量int busyNum; //忙的线程的个数int liveNum; //存活的线程的个数int exitNum; //要销毁的线程个数pthread _mutex_t mutexPool; //锁整个的线程池pthread_mutex_t mutexBusy; //锁busyNum变量 pthread_cond_t notFull; //任务队列是不是满了pthread_cond_t notEmpty; //任务队列是不是空了int shutdown; //是不是要销毁线程池,销毁为1,不销毁为0
};
三、创建线程池实例
typedef struct ThreadPool ThreadPool;
ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{ThreadPool* pool=(ThreadPool*)malloc(sizeof (ThreadPool));do{if (pool == NULL){printf ( "malloc threadpool fail ... \n");break;}pool->threadIDs = (pthread_t *) malloc(sizeof(pthread_t) *max);if(pool->threadIDs == NULL){printf ("malloc threadIDs fail ... \n");break;}memset(pool->threadIDs,0, sizeof (pthread_t) * max);pool->minNum = min;pool->maxNum = max;pool->busyNum = 0;pool->liveNum = min; //和最小个数相等pool->exitNum = 0;if (pt.hread _mutex_init ( &pool->mutexPool,NUTI) !=0 ||pthread_mutex_init ( &pool->mutexBusy,NULL) !=0 ||pthread_cond_init (&pool->notEmpty,NULL) !=0 ||pthread_cond_init ( &pool->notFull,NULL) !=0 ){ printf ( "mutex or condition init fail ...\n");break;}//任务队列pool->taskQ = malloc(sizeof (Task) * queueSize);pool->queueCapacity =qucuesizo;pool->queueSize= 0;pool->queueFront =0;pool->queueRear = 0;pool->shutdown = 0;//创建线程pthread_create ( &pool->managerID,NULL,manager,NULL);for (int i = 0; i < min; ++i){pthread create (&pool->threadIDs[i],NULL,worker,NULL);}}whiie (O);//释放资源if(pool && pool->threadIDs) free(pool->threadIDs);if(pool && pool->taskQ) free(pool->taskQ);if(pool) free(pool);return NULL;
}
四、添加工作的线程的任务函数
void* worker (void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (1){pthread_ mutex_lock(&pool->mtexPool) ;!当前任务队列是否为空while (pool->queuesize == 0 && !pool->shutdown ){//阻塞工作线程pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);//判断是不是要销毁线程if (pool->exitNum > 0){pool->exitNum--;pthread_ mutex_unlock(&pool->mtexPool) ;!当前任务队列是否为空pthread_exit (NULL);}}//判断线程池是否被关闭了if (pool->shutdown ){pthread_mutex_unlock(&pool->mutexPool);pthread_exit(NULL);}//从任务队列中取出一个任务Task task;task.function = pool->taskQ[pool->queueFront].function;task.arg = pool->taskQ[pool->queueFront].arg;//移动头结点pool->queueFront =(pool->queueFront + 1) % pool->queueCapacity;pool->queuesize--;//解锁pthread_cond_signal(&pool->notFull);pthread_mutex_unlock(&pool->mutexPool);printf("thread %ld start working ...\n");pthread mutex_lock (&pool->mutexBusy);pool->busyNum++;pthread mutex_unlock (&pool->mutexBusy) ;task.function(task.arg) ; //或者 (*task.function) (task.arg) 进行调用;free(task.arg);task.arg=NULL;printf("thread %ld end working ...\n");pthread_mutex_lock (&pool->mutexBusy);pool->busyNum--;pthread mutex_unlock (&pool->mutexBusy) ; }return NULL;
}
五、管理者线程的任务函数
const int NUMBER = 2;
void* manaqer(void* arg)
{ThreadPool* pool = (ThreadPool+)arg;while(!pool->shutdown){//每隔3s检测一次sleep(3);//取出线程池中任务的数量和当前线程的数量pthread_mutex_lock(&pool->muatexPool) ;int queueSize =pool->queuesize;int liveNum = pool->liveNum;pthread_mutex_unlock( &pool->mutexPool) ;//取出忙的线程的数量pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex__lock(&pool->mutexBusy) ;//添加线程// 任务的个数 > 存活的线程个数 && 存活的线程数 < 最大线程数if (queueSize > liveNum && liveNum< pool->maxNum){pthread_mutex_lock(&pool->mutexPool);int counter = 0;for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum ; ++i){if (pool->threadIDs[i] == 0){pthread_create( &pool->threadIDs[i],NULI,worker, pool);counter++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexPool);}// 销毁线程// 忙的线程*2 < 存活的线程数 && 存活的线程 > 最小线程数if(busyNum * 2< liveNum && liveNum > pool->minNum){pthread_mutex_lock(&pool->mutexPool);pool->exitNum = NUMBER;pthread_mutex_unlock(&pcol->mutexPool);// 让工作的线程自杀Ifor(int i=0;i<NUMBER; ++i){pthread_cond_signal(&pool->notEmpty);}}}
}
六、往线程池中添加任务
void threadPoolAdd(ThreadPool* pool,void (*func)(void*), void* arg)
{pthread_mutex_lock(&pool->mutexPool);while(pool->queueSize == pool->queueCapacity && !pool->shutdown){//阻塞生产者线程pthread_cond_wait( &pool->notFull,&pool->mutexPool) ;}if (pool->shutdown){pthread_mutex_unlock ( &pool->mutexPool) ;return;}//添加任务pool->taskQ[pool->queueRear].function = func;pool->taskQ[pool->queueRear].arg = arg;pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;pool->queuesize++;pthread_cond_signal( &pool->notEmpty);pthread_mutex_unlock(&pool->mutexPool) ;
}
七、获取线程池工作的线程数量与活着的线程数量
//获取线程池中工作的线程数量
int threadPoolBusyNum ( ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);return busyNum;
}//获取线程池中活着的线程数量
int threadPoolAliveNum (ThreadPool*pool)
{pthread_mutex_lock(&pool->mutexPool);int busyNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);return aliveNum;
}
八、线程池的销毁
int threadPoolDestroy (ThreadPool* pool)
{if(pool == NULL){return -1;}//关闭线程池pool->shutdown = 1;//阻塞回收管理者线程pthread_join(pool->nanagerID,NULL);//唤醒阻塞的消费者线程for (int i = 0; i < pool->liveNum; ++i){pthread_cond_signal( &pool->notEmpty) ;}//释放堆内存if(pool->taskQ){free(pool->taskQ);pool->taskQ=NULL;}if(pool->threadIDs){free(pool->threadIDs);pool->threadIDs=NULL;}free(pool);pool=NULL;pthread_mutex_destroy( &pool->mutexPool);pthread_mutex_destroy( &pool->mutexBusy);pthread_cond_ destroy( &pool->notEmpty);pthread_cond_destroy ( &pool->notFull);return 0;
}