点击蓝字
关注我们
前面的文章在手把手教你写 C 语言线程池中,已经实现了 C 语言版的线程池,如果我们也学过 C++ 的话,可以将其改为 C++ 版本,这样代码不管是从使用还是从感观上都会更简洁一些。
对这些代码做从 C 到 C++ 的迁移主要用到了 C++ 三大特性中的封装,因此难度不大,对应 C++ 初学者来说有助于提高编码水平和对面向对象的理解,对于熟练掌握了 C++ 的人来说就是张飞吃豆芽 -- 小菜一碟(so easy)。
关于线程的在此就不再过多阐述,对于前面文章中设计的线程池,按照面向对象的思想进行拆分可以分为两部分(纯属个人见解,有不同的想法也正常):任务队列类 和线程池类。
一、任务队列
1、类声明
// 定义任务结构体
using callback = void(*)(void*);
struct Task
{Task(){function = nullptr;arg = nullptr;}Task(callback f, void* arg){function = f;this->arg = arg;}callback function;void* arg;
};// 任务队列
class TaskQueue
{
public:TaskQueue();~TaskQueue();// 添加任务void addTask(Task& task);void addTask(callback func, void* arg);// 取出一个任务Task takeTask();// 获取当前队列中任务个数inline int taskNumber()
{return m_queue.size();}private:pthread_mutex_t m_mutex; // 互斥锁std::queue<Task> m_queue; // 任务队列
};
其中 Task 是任务类,里边有两个成员,分别是两个指针 void(*)(void*) 和 void*
另外一个类 TaskQueue 是任务队列,提供了添加任务、取出任务、存储任务、获取任务个数、线程同步的功能。
2、类定义
TaskQueue::TaskQueue()
{pthread_mutex_init(&m_mutex, NULL);
}TaskQueue::~TaskQueue()
{pthread_mutex_destroy(&m_mutex);
}void TaskQueue::addTask(Task& task)
{pthread_mutex_lock(&m_mutex);m_queue.push(task);pthread_mutex_unlock(&m_mutex);
}void TaskQueue::addTask(callback func, void* arg)
{pthread_mutex_lock(&m_mutex);Task task;task.function = func;task.arg = arg;m_queue.push(task);pthread_mutex_unlock(&m_mutex);
}Task TaskQueue::takeTask()
{Task t;pthread_mutex_lock(&m_mutex);if (m_queue.size() > 0){t = m_queue.front();m_queue.pop();}pthread_mutex_unlock(&m_mutex);return t;
}
二、线程池
1、类声明
class ThreadPool
{
public:ThreadPool(int min, int max);~ThreadPool();// 添加任务void addTask(Task task);// 获取忙线程的个数int getBusyNumber();// 获取活着的线程个数int getAliveNumber();private:// 工作的线程的任务函数static void* worker(void* arg);// 管理者线程的任务函数static void* manager(void* arg);void threadExit();private:pthread_mutex_t m_lock;pthread_cond_t m_notEmpty;pthread_t* m_threadIDs;pthread_t m_managerID;TaskQueue* m_taskQ;int m_minNum;int m_maxNum;int m_busyNum;int m_aliveNum;int m_exitNum;bool m_shutdown = false;
};
2、类定义
ThreadPool::ThreadPool(int minNum, int maxNum)
{// 实例化任务队列m_taskQ = new TaskQueue;do {// 初始化线程池m_minNum = minNum;m_maxNum = maxNum;m_busyNum = 0;m_aliveNum = minNum;// 根据线程的最大上限给线程数组分配内存m_threadIDs = new pthread_t[maxNum];if (m_threadIDs == nullptr){cout << "malloc thread_t[] 失败...." << endl;;break;}// 初始化memset(m_threadIDs, 0, sizeof(pthread_t) * maxNum);// 初始化互斥锁,条件变量if (pthread_mutex_init(&m_lock, NULL) != 0 ||pthread_cond_init(&m_notEmpty, NULL) != 0){cout << "init mutex or condition fail..." << endl;break;}/// 创建线程 //// 根据最小线程个数, 创建线程for (int i = 0; i < minNum; ++i){pthread_create(&m_threadIDs[i], NULL, worker, this);cout << "创建子线程, ID: " << to_string(m_threadIDs[i]) << endl;}// 创建管理者线程, 1个pthread_create(&m_managerID, NULL, manager, this);} while (0);
}ThreadPool::~ThreadPool()
{m_shutdown = 1;// 销毁管理者线程pthread_join(m_managerID, NULL);// 唤醒所有消费者线程for (int i = 0; i < m_aliveNum; ++i){pthread_cond_signal(&m_notEmpty);}if (m_taskQ) delete m_taskQ;if (m_threadIDs) delete[]m_threadIDs;pthread_mutex_destroy(&m_lock);pthread_cond_destroy(&m_notEmpty);
}void ThreadPool::addTask(Task task)
{if (m_shutdown){return;}// 添加任务,不需要加锁,任务队列中有锁m_taskQ->addTask(task);// 唤醒工作的线程pthread_cond_signal(&m_notEmpty);
}int ThreadPool::getAliveNumber()
{int threadNum = 0;pthread_mutex_lock(&m_lock);threadNum = m_aliveNum;pthread_mutex_unlock(&m_lock);return threadNum;
}int ThreadPool::getBusyNumber()
{int busyNum = 0;pthread_mutex_lock(&m_lock);busyNum = m_busyNum;pthread_mutex_unlock(&m_lock);return busyNum;
}// 工作线程任务函数
void* ThreadPool::worker(void* arg)
{ThreadPool* pool = static_cast<ThreadPool*>(arg);// 一直不停的工作while (true){// 访问任务队列(共享资源)加锁pthread_mutex_lock(&pool->m_lock);// 判断任务队列是否为空, 如果为空工作线程阻塞while (pool->m_taskQ->taskNumber() == 0 && !pool->m_shutdown){cout << "thread " << to_string(pthread_self()) << " waiting..." << endl;// 阻塞线程pthread_cond_wait(&pool->m_notEmpty, &pool->m_lock);// 解除阻塞之后, 判断是否要销毁线程if (pool->m_exitNum > 0){pool->m_exitNum--;if (pool->m_aliveNum > pool->m_minNum){pool->m_aliveNum--;pthread_mutex_unlock(&pool->m_lock);pool->threadExit();}}}// 判断线程池是否被关闭了if (pool->m_shutdown){pthread_mutex_unlock(&pool->m_lock);pool->threadExit();}// 从任务队列中取出一个任务Task task = pool->m_taskQ->takeTask();// 工作的线程+1pool->m_busyNum++;// 线程池解锁pthread_mutex_unlock(&pool->m_lock);// 执行任务cout << "thread " << to_string(pthread_self()) << " start working..." << endl;task.function(task.arg);delete task.arg;task.arg = nullptr;// 任务处理结束cout << "thread " << to_string(pthread_self()) << " end working...";pthread_mutex_lock(&pool->m_lock);pool->m_busyNum--;pthread_mutex_unlock(&pool->m_lock);}return nullptr;
}// 管理者线程任务函数
void* ThreadPool::manager(void* arg)
{ThreadPool* pool = static_cast<ThreadPool*>(arg);// 如果线程池没有关闭, 就一直检测while (!pool->m_shutdown){// 每隔5s检测一次sleep(5);// 取出线程池中的任务数和线程数量// 取出工作的线程池数量pthread_mutex_lock(&pool->m_lock);int queueSize = pool->m_taskQ->taskNumber();int liveNum = pool->m_aliveNum;int busyNum = pool->m_busyNum;pthread_mutex_unlock(&pool->m_lock);// 创建线程const int NUMBER = 2;// 当前任务个数>存活的线程数 && 存活的线程数<最大线程个数if (queueSize > liveNum && liveNum < pool->m_maxNum){// 线程池加锁pthread_mutex_lock(&pool->m_lock);int num = 0;for (int i = 0; i < pool->m_maxNum && num < NUMBER&& pool->m_aliveNum < pool->m_maxNum; ++i){if (pool->m_threadIDs[i] == 0){pthread_create(&pool->m_threadIDs[i], NULL, worker, pool);num++;pool->m_aliveNum++;}}pthread_mutex_unlock(&pool->m_lock);}// 销毁多余的线程// 忙线程*2 < 存活的线程数目 && 存活的线程数 > 最小线程数量if (busyNum * 2 < liveNum && liveNum > pool->m_minNum){pthread_mutex_lock(&pool->m_lock);pool->m_exitNum = NUMBER;pthread_mutex_unlock(&pool->m_lock);for (int i = 0; i < NUMBER; ++i){pthread_cond_signal(&pool->m_notEmpty);}}}return nullptr;
}// 线程退出
void ThreadPool::threadExit()
{pthread_t tid = pthread_self();for (int i = 0; i < m_maxNum; ++i){if (m_threadIDs[i] == tid){cout << "threadExit() function: thread " << to_string(pthread_self()) << " exiting..." << endl;m_threadIDs[i] = 0;break;}}pthread_exit(NULL);
}
*声明:本文于网络整理,版权归原作者所有,如来源信息有误或侵犯权益,请联系我们删除或授权事宜。
戳“阅读原文”我们一起进步