1. 作用
线程池内部维护了多个工作线程,每个工作线程都会去任务队列中拿取任务并执行,当执行完一个任务后不是马上销毁,而是继续保留执行其它任务。显然,线程池提高了多线程的复用率,减少了创建和销毁线程的时间。
2. 实现原理
线程池内部由任务队列、工作线程和管理者线程组成。
任务队列:存储需要处理的任务。每个任务其实就是具体的函数,在任务队列中存储函数指针和对应的实参。当工作线程获取任务后,就能根据函数指针来调用指定的函数。其实现可以是数组、链表、STL容器等。
工作线程:有N个工作线程,每个工作线程会去任务队列中拿取任务,然后执行具体的任务。当任务被处理后,任务队列中就不再有该任务了。当任务队列中没有任务时,工作线程就会阻塞。
管理者线程:周期性检测忙碌的工作线程数量和任务数量。当任务较多线程不够用时,管理者线程就会多创建几个工作线程来加快处理(不会超过工作线程数量的上限)。当任务较少线程空闲多时,管理者线程就会销毁几个工作线程来减少内存占用(不会低于工作线程数量的下限)。
注意:线程池中没有维护“生产者线程”,所谓的“生产者线程”就是往任务队列中添加任务的线程。
3. 手撕线程池
参考来源:爱编程的大丙。
【1】threadpool.c:
#include "threadpool.h"
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>#define NUMBER 2 //管理者线程增加或减少的工作线程数量//任务结构体
typedef struct Task {void (*func)(void* arg);void* arg;
} Task;//线程池结构体
struct ThreadPool {//任务队列,视为环形队列Task* taskQ;int queueCapacity; //队列容量int queueSize; //当前任务个数int queueFront; //队头 -> 取任务int queueRear; //队尾 -> 加任务//线程相关pthread_t managerID; //管理者线程IDpthread_t* threadIDs; //工作线程IDint minNum; //工作线程最小数量int maxNum; //工作线程最大数量int busyNum; //工作线程忙的数量int liveNum; //工作线程存活数量int exitNum; //要销毁的工作线程数量pthread_mutex_t mutexPool; //锁整个线程池pthread_mutex_t mutexBusy; //锁busyNumpthread_cond_t notFull; //任务队列是否满pthread_cond_t notEmpty; //任务队列是否空//线程池是否销毁int shutdown; //释放为1,否则为0
};/**************************************************************** 函 数: threadPoolCreate* 功 能: 创建线程池并初始化* 参 数: min---工作线程的最小数量* max---工作线程的最大数量* capacity---任务队列的最大容量* 返回值: 创建的线程池的地址**************************************************************/
ThreadPool* threadPoolCreate(int min, int max, int capacity)
{//申请线程池空间ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));do {//此处循环只是为了便于失败释放空间,只会执行一次if (pool == NULL) {printf("pool create error!\n");break;}//申请任务队列空间,并初始化pool->taskQ = (Task*)malloc(sizeof(Task) * capacity);if (pool->taskQ == NULL) {printf("Task create error!\n");break;}pool->queueCapacity = capacity;pool->queueSize = 0;pool->queueFront = 0;pool->queueRear = 0;//初始化互斥锁和条件变量if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||pthread_cond_init(&pool->notFull, NULL) != 0 ||pthread_cond_init(&pool->notEmpty, NULL) != 0){printf("mutex or cond create error!\n");break;}//初始化shutdownpool->shutdown = 0;//初始化线程相关参数pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);if (pool->threadIDs == NULL) {printf("threadIDs create error!\n");break;}memset(pool->threadIDs, 0, sizeof(pthread_t) * max);pool->minNum = min;pool->maxNum = max;pool->busyNum = 0;pool->liveNum = min;pool->exitNum = 0;//创建管理者线程和工作线程pthread_create(&pool->managerID, NULL, manager, pool);//创建管理线程for (int i = 0; i < min; ++i) {pthread_create(&pool->threadIDs[i], NULL, worker, pool);//创建工作线程}return pool;} while (0);//申请资源失败,释放已分配的资源if (pool && pool->taskQ) free(pool->taskQ);if (pool && pool->threadIDs) free(pool->threadIDs);if (pool) free(pool);return NULL;
}/**************************************************************** 函 数: threadPoolDestroy* 功 能: 销毁线程池* 参 数: pool---要销毁的线程池* 返回值: 0表示销毁成功,-1表示销毁失败**************************************************************/
int threadPoolDestroy(ThreadPool* pool)
{if (!pool) return -1;//关闭线程池pool->shutdown = 1;//阻塞回收管理者线程pthread_join(pool->managerID, NULL);//唤醒所有工作线程,让其自杀for (int i = 0; i < pool->liveNum; ++i) {pthread_cond_signal(&pool->notEmpty);}//释放所有互斥锁和条件变量pthread_mutex_destroy(&pool->mutexBusy);pthread_mutex_destroy(&pool->mutexPool);pthread_cond_destroy(&pool->notEmpty);pthread_cond_destroy(&pool->notFull);//释放堆空间if (pool->taskQ) {free(pool->taskQ);pool->taskQ = NULL;}if (pool->threadIDs) {free(pool->threadIDs);pool->threadIDs = NULL;}free(pool);pool = NULL;return 0;
}/**************************************************************** 函 数: threadPoolAdd* 功 能: 生产者往线程池的任务队列中添加任务* 参 数: pool---线程池* func---函数指针,要执行的任务地址* arg---func指向的函数的实参* 返回值: 无**************************************************************/
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{pthread_mutex_lock(&pool->mutexPool);//任务队列满,阻塞生产者while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {pthread_cond_wait(&pool->notFull, &pool->mutexPool);}//判断线程池是否关闭if (pool->shutdown) {pthread_mutex_unlock(&pool->mutexPool);return;}//添加任务进pool->taskQpool->taskQ[pool->queueRear].func = func;pool->taskQ[pool->queueRear].arg = arg;pool->queueSize++;pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;pthread_cond_signal(&pool->notEmpty);//唤醒工作线程pthread_mutex_unlock(&pool->mutexPool);
}/**************************************************************** 函 数: getThreadPoolBusyNum* 功 能: 获取线程池忙的工作线程数量* 参 数: pool---线程池* 返回值: 忙的工作线程数量**************************************************************/
int getThreadPoolBusyNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);return busyNum;
}/**************************************************************** 函 数: getThreadPoolAliveNum* 功 能: 获取线程池存活的工作线程数量* 参 数: pool---线程池* 返回值: 存活的工作线程数量**************************************************************/
int getThreadPoolAliveNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexPool);int liveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);return liveNum;
}/**************************************************************** 函 数: worker* 功 能: 工作线程的执行函数* 参 数: arg---实参传入,这里传入的是线程池* 返回值: 空指针**************************************************************/
void* worker(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (1) {/* 1.取出任务队列中的队头任务 */pthread_mutex_lock(&pool->mutexPool);//无任务就阻塞线程while (pool->queueSize == 0 && !pool->shutdown) {pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);//唤醒后,判断是不是要销毁线程if (pool->exitNum > 0) {//线程自杀pool->exitNum--;//销毁指标-1if (pool->liveNum > pool->minNum) {pool->liveNum--;//活着的工作线程-1pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}}}//线程池关闭了就退出线程if (pool->shutdown) {pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}//取出pool中taskQ的任务Task task;task.func = pool->taskQ[pool->queueFront].func;task.arg = pool->taskQ[pool->queueFront].arg;pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;//移动队头pool->queueSize--;//通知生产者添加任务pthread_cond_signal(&pool->notFull);pthread_mutex_unlock(&pool->mutexPool);/* 2.设置pool的busyNum+1 */pthread_mutex_lock(&pool->mutexBusy);pool->busyNum++;pthread_mutex_unlock(&pool->mutexBusy);/* 3.执行取出的任务 */printf("thread %ld start working ...\n", pthread_self());task.func(task.arg);free(task.arg);task.arg = NULL;printf("thread %ld end working ...\n", pthread_self());/* 4.设置pool的busyNum-1 */pthread_mutex_lock(&pool->mutexBusy);pool->busyNum--;pthread_mutex_unlock(&pool->mutexBusy);}return NULL;
}/**************************************************************** 函 数: manager* 功 能: 管理者线程的执行函数* 参 数: arg---实参传入,这里传入的是线程池* 返回值: 空指针**************************************************************/
void* manager(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (!pool->shutdown) {/* 每隔3秒检测一次 */sleep(3);/* 获取pool中相关变量 */pthread_mutex_lock(&pool->mutexPool);int taskNum = pool->queueSize; //任务队列中的任务数量int liveNum = pool->liveNum; //存活的工作线程数量int busyNum = pool->busyNum; //忙碌的工作线程数量pthread_mutex_unlock(&pool->mutexPool);/* 功能一:增加工作线程,每次增加NUMBER个 *///当任务个数大于存活工作线程数,且存活工作线程数小于最大值if (taskNum > liveNum && liveNum < pool->maxNum) {pthread_mutex_lock(&pool->mutexPool);int counter = 0;for (int i = 0; i < pool->maxNum && counter < NUMBER&& pool->liveNum < pool->maxNum; ++i){if (pool->threadIDs[i] == 0) {pthread_create(&pool->threadIDs[i], NULL, worker, pool);counter++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexPool);}/* 功能二:销毁工作线程,每次销毁NUMBER个 *///当忙的线程数*2 < 存活线程数,且存活线程数 > 最小线程数if (busyNum * 2 < liveNum && liveNum > pool->minNum) {pthread_mutex_lock(&pool->mutexPool);pool->exitNum = NUMBER;//唤醒NUMBER个工作线程,让其解除阻塞,在worker函数中自杀for (int i = 0; i < NUMBER; ++i) {pthread_cond_signal(&pool->notEmpty);}pthread_mutex_unlock(&pool->mutexPool);}}return NULL;
}/**************************************************************** 函 数: threadExit* 功 能: 工作线程退出函数,将工作线程的ID置为0,然后退出* 参 数: pool---线程池* 返回值: 无**************************************************************/
void threadExit(ThreadPool* pool)
{//将pool->threadIDs中的ID改为0pthread_t tid = pthread_self();for (int i = 0; i < pool->maxNum; i++) {if (pool->threadIDs[i] == tid) {pool->threadIDs[i] = 0;printf("threadExit() called, %ld exiting...\n", tid);break;}}pthread_exit(NULL);//退出
}
【2】threadpool.h:
#ifndef _THREADPOOL_H
#define _THREADPOOL_Htypedef struct ThreadPool ThreadPool;//创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int capacity);//销毁线程池
int threadPoolDestroy(ThreadPool* pool);//给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);//获取当前忙碌的工作线程的数量
int getThreadPoolBusyNum(ThreadPool* pool);//获取当前存活的工作线程的数量
int getThreadPoolAliveNum(ThreadPool* pool);/*********************其它函数**********************/
void* worker(void* arg);//工作线程的执行函数
void* manager(void* arg);//管理者线程的执行函数
void threadExit(ThreadPool* pool);//线程退出函数#endif
【3】main.c:
#include <stdio.h>
#include "threadpool.h"
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>//任务函数,所有线程都执行此任务
void testFunc(void* arg)
{int* num = (int*)arg;printf("thread %ld is working, number = %d\n", pthread_self(), *num);sleep(1);
}int main()
{//创建线程池: 最少3个工作线程,最多10个,任务队列容量为100ThreadPool* pool = threadPoolCreate(3, 10, 100);//加入100个任务于任务队列for (int i = 0; i < 100; ++i) {int* num = (int*)malloc(sizeof(int));*num = i + 100;threadPoolAdd(pool, testFunc, num);}//销毁线程池sleep(30);//保证任务全部运行完毕threadPoolDestroy(pool);return 0;
}
【4】运行结果:
......