线程池c++实现
概述
线程池(Thread Pool)是一种并发编程的设计模式,它用于管理和重复使用线程,以提高程序的性能和资源利用率。线程池通过维护一组预先创建的线程,这些线程可以在需要时被重复使用,而不是为每个任务都创建一个新的线程。
线程池通常包含以下主要组件:
- 任务队列(Task Queue): 用于存储待执行的任务。当有新的任务到达时,它被放入任务队列中等待执行。
- 工作线程(Worker Threads): 一组预先创建的线程,它们在整个程序的生命周期内一直存在。这些线程从任务队列中获取任务,并执行任务的操作。
- 线程池管理器(Thread Pool Manager): 负责管理线程池的创建、销毁和任务分配等工作。它维护线程池中的线程和任务队列,并协调它们的工作。
线程池的优势包括:
- 降低线程创建和销毁的开销: 创建和销毁线程是相对昂贵的操作。线程池通过重复使用线程,减少了这些开销。
- 控制并发度: 可以限制线程的数量,防止系统过载。这有助于更好地管理系统资源。
- 提高响应速度: 当有任务到达时,线程池中的工作线程可以立即处理,而不需要等待新线程的创建。
- 简化线程管理: 线程池将线程的管理抽象出来,开发者只需关注任务的提交和处理。
预备知识
- 并发线程基础: 包括线程、进程、同步、互斥、锁等
- 操作系统基础知识: 关于线程和进程管理的方面
- 内存管理: 确保在多线程环境下正确地分配和释放内存
- 消费者和生产者模型
实现
这里我没有按照上述的三部分来说明, 我觉得分为以下四个部分更容易理解线程池的架构
- 任务队列
- 任务结构体
struct Task
- 任务队列类
class TaskQueue
-
线程池类
-
工作线程
-
管理者线程
1. 任务队列
这里主要就是利用c++的封装, 把任务队列封装为一个类, 因为任务队列自身要完成添加任务, 取出任务等行为, 所以我们把任务队列封装为一个类TaskQueue
, 这个类也将作为线程池类的类成员
因为任务队列里面放的肯定是任务, 这个任务是什么类型呢? 我们把这个任务也封装一下, 封装为任务结构体Task
, 那这个结构体里有哪些成员呢? 熟悉c语言的人应该知道回调函数的基本形式, 大多是由一个**函数指针和一个函数参数也就是通用指针组成**, 也就是:
void(*func)(void* arg);
void* arg;
这声明了一个函数指针 func
,该指针指向一个接受 void*
类型的参数(通常是一个指向某种数据的指针)并返回 void
的函数。
所以我们的任务结构体可以确定, 在定义一个任务队列 class TaskQueue
:
成员:
- queue队列存放任务Task
- 锁, 用来在添加任务和取出任务时加锁
方法:
- 构造, 析构
- 去任务, 放任务
- 返回任务队列中的任务个数
具体的实现代码如下:
声明 TaskQueue.h
// TaskQueue.h#ifndef TEST_TASKQUEUE_H
#define TEST_TASKQUEUE_H
#include "queue"
using namespace std;
#include "pthread.h"using callback = void(*) (void* arg);struct Task{callback func;// void(*function)(void* arg);void* arg;// 构造函数Task(){func = nullptr;arg = nullptr;}// 有参构造Task(callback f, void* a): func(f), arg(a) {};
};class TaskQueue {
public:// 构造和析构TaskQueue();~TaskQueue();// 添加任务void addTask(Task task);void addTask(callback f, void* arg);// 取出任务Task takeTask();// 返回任务队列中的个数, 使用内联函数, 提高效率inline int getTaskNum(){return (int)m_Queue.size();}
private:pthread_mutex_t m_Mutex;queue<Task> m_Queue;
};#endif //TEST_TASKQUEUE_H
实现 TaskQueue.cpp
// TaskQueue.cpp#include "TaskQueue.h"
#include "pthread.h"TaskQueue::TaskQueue() {pthread_mutex_init(&this->m_Mutex, nullptr);
}TaskQueue::~TaskQueue() {pthread_mutex_destroy(&this->m_Mutex);
}void TaskQueue::addTask(Task task) {pthread_mutex_lock(&this->m_Mutex);this->m_Queue.push(task);pthread_mutex_unlock(&this->m_Mutex);
}void TaskQueue::addTask(callback f, void *arg) {pthread_mutex_lock(&this->m_Mutex);this->m_Queue.push(Task(f, arg));pthread_mutex_unlock(&this->m_Mutex);
}Task TaskQueue::takeTask() {Task task;pthread_mutex_lock(&this->m_Mutex);task = this->m_Queue.front();this->m_Queue.pop();pthread_mutex_unlock(&this->m_Mutex);return task;
}
2. ThreadPool类
这个类就是线程池类, 实现线程池的各种操作, 其中线程池的创建和析构较为麻烦一些, 剩下的行数都比较简单
类成员:
- 任务队列,
TaskQueue
类 - 管理者线程 * 1
- 工作线程 * n
- 各种线程数量
- 锁, 所整个线程池
- 条件变量, 任务队列为空时阻塞工作线程, 非空时工作线程解除阻塞
- 是否要销毁线程池 bool类型
函数:
- 构造, 析构
- 添加任务
- 获取活着的, 忙的线程个数
- 线程退出函数
先看声明文件:
声明ThreadPool.h
// ThreadPool.h#ifndef TEST_THREADPOOL_H
#define TEST_THREADPOOL_H
#include "TaskQueue.h"class ThreadPool {
public:// 构造函数ThreadPool(int min, int max);~ThreadPool();// 添加任务void addTask(Task task);// 获取忙线程个数int getBusyNum();// 获取活着的线程个数int getLiveNum();private:// 为什么要设置为静态函数呢// 这里也可以不将其设置为静态的, 可以把worker函数和manager函数变为全局函数// 具体原因可以看构造函数中创建线程的部分static void* worker(void* arg);static void* manager(void* arg);void threadExit();private:TaskQueue* taskQueue; // 任务队列pthread_t managerId; // 管理者线程IDpthread_t* workIDs; // 工作的线程ID 多个int m_Min; // 最少线程数量int m_Max; // 最多线程数量int m_BusyNum; // 忙着的线程数量int m_LiveNum; // 存活的线程数量int m_ExitNum; // 要销毁的线程数量 (线程较多, 任务少的时候)pthread_mutex_t m_MutexPool; // 锁整个线程池pthread_cond_t m_NotEmpty; // 任务队列是不是空了bool m_ShutDown = false; // 是否要销毁线程池, 销毁 - 1, 不销毁 - 0
};#endif //TEST_THREADPOOL_H
然后我们一个一个的看ThreadPool
里面的函数
首先是构造函数, 也就是我们创建线程池的时候要完成哪些操作:
构造函数
ThreadPool(int min, int max);
函数的参数时线程池最少线程数量和最多线程数量, 意思就是就算没有任务, 也要有min个线程就绪, 任务再多, 也只能有max个线程
步骤:
- 创建一个TaskQueue实例
- 为工作者线程开辟堆内存
- 将工作线程的线程id初始化为0
- 初始化成员中的各种线程数量和线程池是否关闭的bool值
- 判断锁和条件变量是否初始化成功(同时也初始化了锁和条件变量)
- 创建管理者线程和工作线程
来看一下构造函数的具体实现:
// 构造函数
ThreadPool::ThreadPool(int min, int max) {do {this->taskQueue = new TaskQueue;if(taskQueue == nullptr){cout << "new taskQueue failed..." << endl;break;}workIDs = new pthread_t[max];if (workIDs == nullptr) {printf("new threadIDs failed...\n");break;}// 将工作线程的id都初始化为0memset(workIDs, 0, sizeof(pthread_t) * max);m_Min = min;m_Max = max;m_BusyNum = 0;m_LiveNum = min;m_ExitNum = 0;// 判断锁和条件变量是否初始化成功if (pthread_mutex_init(&m_MutexPool, nullptr) != 0 ||pthread_cond_init(&m_NotEmpty, nullptr) != 0) {printf("mutex or condition init failed...\n");break;}m_ShutDown = false;// 创建线程// 管理者线程pthread_create(&managerId, nullptr, manager, this);// 工作线程for (int i = 0; i < min; ++i) {pthread_create(&workIDs[i], nullptr, worker, this);}return;} while (0);// do while 外面这些代码都是出了问题才会走到的delete []workIDs;delete taskQueue;return;
}
这里的do…while(0) 结构主要是在好几次初始化的过程中有可能失败, 失败的话每个条件判断语句中都要释放一遍堆内存, 干脆就放在do…while循环中, 如果程序都正常运行, 在do中的最后就return了, 走不到do…while的外面
在创建管理者线程和工作线程的时候, 我们把this
指针传给了manager
函数和worker
函数, 所以我们再来看这两个函数
工作线程 worker
虽然这一部分我在大纲中归为第三大部分, 但其实worker
函数和manager
函数也是ThreadPool
类的一部分, 只不过是静态成员函数(静态的原因后面再说)
看一下工作线程的处理流程
- 判断任务队列是否为空, 空: 阻塞
- 判断ThreadPool是否要关闭了
- 取出任务
- 开始工作
- 释放资源
具体实现:
void *ThreadPool::worker(void *arg) {// 类型转换ThreadPool* pool = static_cast<ThreadPool*>(arg);// 要一直检查队列里的内容while(true){pthread_mutex_lock(&pool->m_MutexPool);// 判断任务队列是否为空while(pool->taskQueue->getTaskNum() == 0 && !pool->m_ShutDown){// 阻塞工作线程 条件变量pthread_cond_wait(&pool->m_NotEmpty, &pool->m_MutexPool);// 判断是否有要销毁的线程if(pool->m_ExitNum > 0){--pool->m_ExitNum;if(pool->m_LiveNum > pool->m_Min){--pool->m_LiveNum;pthread_mutex_unlock(&pool->m_MutexPool);pool->threadExit();}}}// 判断线程池是否要关闭if(pool->m_ShutDown){// 先解锁后退出pthread_mutex_unlock(&pool->m_MutexPool);pool->threadExit();}// 取任务Task task = pool->taskQueue->takeTask();// busy线程+1++pool->m_BusyNum;// 解锁pthread_mutex_unlock(&pool->m_MutexPool);// 开始工作cout << "thread "<< pthread_self() << " start working...\n";// 任务处理task.func(task.arg);delete task.arg;task.arg = nullptr;// 处理结束cout << "thread "<< pthread_self() << " end working...\n";pthread_mutex_lock(&pool->m_MutexPool);--pool->m_BusyNum;pthread_mutex_unlock(&pool->m_MutexPool);}return nullptr;
}
类型转换说明: 因为传进来的是一个void*
类型的指针, 我们要把它转换为ThreadPool*
类型离开操作
while(true)说明: 工作线程要一直检测任务队列中是否有任务, 只要有任务就要处理, 没有的话就阻塞
管理者线程 manager
管理者的任务主要是按照线程池工作线程的数量和任务数量相应的创建和销毁线程
按照频率检测线程数量
- 把需要的线程数量取出来
- 按规则创建和销毁
具体实现代码:
void *ThreadPool::manager(void *arg) {// 类型转换ThreadPool* pool = static_cast<ThreadPool*>(arg);while(!pool->m_ShutDown){// 按频率3s检测一次sleep(3);// 取出当前任务队列的任务个数 和 当前线程数 和 繁忙的线程数pthread_mutex_lock(&pool->m_MutexPool);int queueSize = pool->taskQueue->getTaskNum();int liveNum = pool->m_LiveNum;int busyNum = pool->m_BusyNum;pthread_mutex_unlock(&pool->m_MutexPool);// 按相应规则创建和销毁线程// 创建线程// 规则: 任务数量 > 线程数量 && 线程数量 < maxconst int NUMBER = 2;if(queueSize > liveNum && liveNum < pool->m_Max){// 创建线程// 在threadIDs数组中找一个可用的空间存放新创建的id// 遍历整个threadID数组, 看哪些可用pthread_mutex_lock(&pool->m_MutexPool);int count = 0;for(int i = 0; i < pool->m_Max && pool->m_LiveNum < pool->m_Max && count < NUMBER; ++i){if(pool->workIDs[i] == 0){ // 空间可用pthread_create(&pool->workIDs[i], nullptr, worker, pool);++count;++pool->m_LiveNum;}}pthread_mutex_unlock(&pool->m_MutexPool);}// 销毁线程// 规则: 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数if(busyNum * 2 < liveNum && liveNum > pool->m_Min){pthread_mutex_lock(&pool->m_MutexPool);pool->m_ExitNum = NUMBER;// 让线程自杀for (int i = 0; i < NUMBER && pool->m_LiveNum > pool->m_Min; ++i) {pthread_cond_signal(&pool->m_NotEmpty);}pthread_mutex_unlock(&pool->m_MutexPool);}}return nullptr;
}
具体的规则已经在代码中指出, 这个规则是可以自己规定的
解释一下让线程自杀的逻辑:
让线程自杀之后, 会调用pthread_cond_signal
唤醒阻塞的worker
线程, 就是上一部分中第10行代码, 这时exitNum不为0, 就会执行第12行的逻辑, 也就是worker线程的这一部分:
// 阻塞工作线程 条件变量
pthread_cond_wait(&pool->m_NotEmpty, &pool->m_MutexPool);
// 判断是否有要销毁的线程
if(pool->m_ExitNum > 0){--pool->m_ExitNum;if(pool->m_LiveNum > pool->m_Min){--pool->m_LiveNum;pthread_mutex_unlock(&pool->m_MutexPool);pool->threadExit();}
}
其余各种锁的逻辑应该都能看懂, 不做过多解释
添加任务
这部分的逻辑相对简单一些
直接看代码:
void ThreadPool::addTask(Task task) {if(m_ShutDown) return;// 添加任务,不需要加锁,任务队列中有锁this->taskQueue->addTask(task);// 唤醒工作线程pthread_cond_signal(&this->m_NotEmpty);
}
获取忙的线程, 活着的线程个数
int ThreadPool::getBusyNum() {int busyNum = 0;pthread_mutex_lock(&m_MutexPool);busyNum = m_BusyNum;pthread_mutex_unlock(&m_MutexPool);return busyNum;
}int ThreadPool::getLiveNum() {int liveNum = 0;pthread_mutex_lock(&m_MutexPool);liveNum = m_LiveNum;pthread_mutex_unlock(&m_MutexPool);return liveNum;
}
线程退出函数
细心的可以看出其他部分代码, 线程退出是使用的pool->threadExit();
, 而不是直接pthread_exit(NULL)
原因主要是, 在线程退出后, 我们还需要把这个线程在workerID中的线程id重置为0, 所以不是单纯的调用pthread_exit(NULL)
这么简单, 看代码:
void ThreadPool::threadExit() {pthread_t pid = pthread_self();for(int i = 0; i < this->m_Max; ++i){if(this->workIDs[i] == pid){cout << "threadExit() function: thread "<< to_string(pthread_self()) << " exiting..." << endl;this->workIDs[i] = 0;break;}}pthread_exit(NULL);
}
运行
所有代码已经准备完毕, 给大家看一下整体的结构
一共5个文件, 2个.h 对应2个 .cpp, 还有一个main文件
- TaskQueue.h文件的结构如下:
- TaskQueue.cpp文件的结构:
- ThreadPool.h文件的结构:
Thread Pool.cpp文件的结构:
把完整的ThreadPool.cpp
代码放在下面:
实现ThreadPool.cpp
// ThreadPool.cpp#include "ThreadPool.h"
#include "pthread.h"
#include <cstring>
#include <unistd.h>ThreadPool::ThreadPool(int min, int max) {do {this->taskQueue = new TaskQueue;if(taskQueue == nullptr){cout << "new taskQueue failed..." << endl;break;}workIDs = new pthread_t[max];if (workIDs == nullptr) {printf("new threadIDs failed...\n");break;}// 将工作线程的id都初始化为0memset(workIDs, 0, sizeof(pthread_t) * max);m_Min = min;m_Max = max;m_BusyNum = 0;m_LiveNum = min;m_ExitNum = 0;// 判断锁和条件变量是否初始化成功if (pthread_mutex_init(&m_MutexPool, nullptr) != 0 ||pthread_cond_init(&m_NotEmpty, nullptr) != 0) {printf("mutex or condition init failed...\n");break;}m_ShutDown = false;// 创建线程// 管理者线程// 将manager和worker函数设置为静态函数的进一步解释// 在使用pthread_create函数创建线程时,该函数的第三个参数是一个函数指针,指向线程所要执行的函数。// 在C++中,非静态成员函数需要一个对象实例才能被调用,而pthread_create函数只接受普通的函数指针,// 因此在这种情况下,通常会将非静态成员函数转换为静态成员函数或全局函数。//// 静态成员函数和全局函数没有与特定对象实例相关联,因此可以直接使用函数指针传递给pthread_create,// 而不需要担心对象的实例。这是因为静态成员函数和全局函数不依赖于特定对象的状态,它们只能访问静态成员或全局变量。pthread_create(&managerId, nullptr, manager, this);// 把this传给manager函数:// 因为manager函数是静态函数, 只能访问类的静态成员变量// 要想访问类的非静态成员变量(函数), 必须把类的实例对象传进去// 工作线程for (int i = 0; i < min; ++i) {pthread_create(&workIDs[i], nullptr, worker, this);}return;} while (0);// do while 外面这些代码都是出了问题才会走到的delete []workIDs;delete taskQueue;return;
}ThreadPool::~ThreadPool() {m_ShutDown = true;// 阻塞回收管理者线程pthread_join(managerId, NULL);// 唤醒消费者线程for(int i = 0; i < m_LiveNum; ++i){pthread_cond_signal(&m_NotEmpty);}// 删除堆内存if(taskQueue) delete taskQueue;if(workIDs) delete []workIDs;pthread_mutex_destroy(&m_MutexPool);pthread_cond_destroy(&m_NotEmpty);}void ThreadPool::addTask(Task task) {if(m_ShutDown) return;// 添加任务,不需要加锁,任务队列中有锁this->taskQueue->addTask(task);// 唤醒工作线程pthread_cond_signal(&this->m_NotEmpty);
}int ThreadPool::getBusyNum() {int busyNum = 0;pthread_mutex_lock(&m_MutexPool);busyNum = m_BusyNum;pthread_mutex_unlock(&m_MutexPool);return busyNum;
}int ThreadPool::getLiveNum() {int liveNum = 0;pthread_mutex_lock(&m_MutexPool);liveNum = m_LiveNum;pthread_mutex_unlock(&m_MutexPool);return liveNum;
}void *ThreadPool::worker(void *arg) {// 类型转换ThreadPool* pool = static_cast<ThreadPool*>(arg);// 要一直检查队列里的内容while(true){pthread_mutex_lock(&pool->m_MutexPool);// 判断任务队列是否为空while(pool->taskQueue->getTaskNum() == 0 && !pool->m_ShutDown){// 阻塞工作线程 条件变量pthread_cond_wait(&pool->m_NotEmpty, &pool->m_MutexPool);// 判断是否有要销毁的线程if(pool->m_ExitNum > 0){--pool->m_ExitNum;if(pool->m_LiveNum > pool->m_Min){--pool->m_LiveNum;pthread_mutex_unlock(&pool->m_MutexPool);pool->threadExit();}}}// 判断线程池是否要关闭if(pool->m_ShutDown){// 先解锁后退出pthread_mutex_unlock(&pool->m_MutexPool);pool->threadExit();}// 取任务Task task = pool->taskQueue->takeTask();// busy线程+1++pool->m_BusyNum;// 解锁pthread_mutex_unlock(&pool->m_MutexPool);// 开始工作cout << "thread "<< pthread_self() << " start working...\n";// 任务处理task.func(task.arg);delete task.arg;task.arg = nullptr;// 处理结束cout << "thread "<< pthread_self() << " end working...\n";pthread_mutex_lock(&pool->m_MutexPool);--pool->m_BusyNum;pthread_mutex_unlock(&pool->m_MutexPool);}return nullptr;
}void *ThreadPool::manager(void *arg) {// 类型转换ThreadPool* pool = static_cast<ThreadPool*>(arg);while(!pool->m_ShutDown){// 按频率3s检测一次sleep(3);// 取出当前任务队列的任务个数 和 当前线程数 和 繁忙的线程数pthread_mutex_lock(&pool->m_MutexPool);int queueSize = pool->taskQueue->getTaskNum();int liveNum = pool->m_LiveNum;int busyNum = pool->m_BusyNum;pthread_mutex_unlock(&pool->m_MutexPool);// 按相应规则创建和销毁线程// 创建线程// 规则: 任务数量 > 线程数量 && 线程数量 < maxconst int NUMBER = 2;if(queueSize > liveNum && liveNum < pool->m_Max){// 创建线程// 在threadIDs数组中找一个可用的空间存放新创建的id// 遍历整个threadID数组, 看哪些可用pthread_mutex_lock(&pool->m_MutexPool);int count = 0;for(int i = 0; i < pool->m_Max && pool->m_LiveNum < pool->m_Max && count < NUMBER; ++i){if(pool->workIDs[i] == 0){ // 空间可用pthread_create(&pool->workIDs[i], nullptr, worker, pool);++count;++pool->m_LiveNum;}}pthread_mutex_unlock(&pool->m_MutexPool);}// 销毁线程// 规则: 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数if(busyNum * 2 < liveNum && liveNum > pool->m_Min){pthread_mutex_lock(&pool->m_MutexPool);pool->m_ExitNum = NUMBER;// 让线程自杀for (int i = 0; i < NUMBER && pool->m_LiveNum > pool->m_Min; ++i) {pthread_cond_signal(&pool->m_NotEmpty);}pthread_mutex_unlock(&pool->m_MutexPool);}}return nullptr;
}void ThreadPool::threadExit() {pthread_t pid = pthread_self();for(int i = 0; i < this->m_Max; ++i){if(this->workIDs[i] == pid){cout << "threadExit() function: thread "<< to_string(pthread_self()) << " exiting..." << endl;this->workIDs[i] = 0;break;}}pthread_exit(NULL);
}
代码里也即是了为什么要把worker和manager函数设置为静态成员函数, 以及为什么要把this指针传给回调函数, 如果不传的话, 静态成员函数是无法访问类中的其他非静态成员变量的
最后看一下运行文件main.cpp
// main.cpp#include "TaskQueue.h"
#include "iostream"
#include "TaskQueue.cpp"
#include "ThreadPool.h"
#include "ThreadPool.cpp"
using namespace std;void func(void* arg){int num = *(int*)arg;cout << "thread " << pthread_self() << " is working, num = " << num << endl;sleep(1);
}int main(){ThreadPool pool(3, 10);for (int i = 0; i < 100; ++i) {int* num = new int(i + 100);pool.addTask(Task(func, num));}sleep(20);return 0;
}
这里主要是写了一个数数的任务函数
运行效果如下图:
但是不知道为什么有的地方会出现bug, 大家有想法的也可以告诉我, 我改正一下