1. 线程池要求
我们创建线程池的目的本质上是用空间换取时间,而我们选择于 C++ 的类内包装原生线程库的形式来创建,其具体实行逻辑如图
可以看到,整个线程池其实就是一个大型的 CP 模型,接下来我们来完成它
2. 整体模板
3. 具体实现
#pragma once#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>// 存储线程池中各个线程信息
struct ThreadInfo
{pthread_t tid;std::string name;
};// 默认线程池容量
static const int defalutnum = 5;template<class T>
class ThreadPool
private:// 加锁void Lock(){pthread_mutex_lock(&mutex_);}// 释放锁void Unlock(){pthread_mutex_unlock (&mutex_);}// 唤醒线程void Wakeup(){pthread_cond_signal(&cond_);}// 资源不就绪, 线程同步void ThreadSleep(){pthread_cond_wait(&cond_, &mutex_);}// 对当前任务列表判空bool IsQueueEmpty(){return tasks_.size() == 0 ? true : false;}// 获取线程 namestd::string GetThreadName(pthread_t tid){for (const auto &ti : threads_){if (ti.tid == tid)return ti.name;}return "None";}
public:ThreadPool(int num = defalutnum):threads_(num){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&cond_, nullptr);}// 由于 pthread_create 函数的特殊性// 只能将 HandlerTask 设置为静态函数// 同时将 this 指针以参数的形式传入static void *HandlerTask(void *args){ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);while (true){// 确保同一时刻只有一个线程在进行消费tp->Lock();// 如果当前任务列表为空就让线程等待资源// 为防止伪唤醒的情况, 使用 whilewhile (tp->IsQueueEmpty()){tp->ThreadSleep();}T t = tp->Pop();tp->Unlock();// 执行任务// 注: 需要任务中重载 operator()t();}}// 启动线程池void Start(){int num = threads_.size();for (int i = 0; i < num; i++){threads_[i].name = "thread-" + std::to_string(i+1);pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);}}// Pop 函数在锁内执行因此不需要单独加锁T Pop(){T t = tasks_.front();tasks_.pop();return t;}// 将外界资源投入线程池void Push(const T &t){Lock();tasks_.push(t);Wakeup(); // 投入资源成功后唤醒线程 Unlock();}~ThreadPool(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);}
private:std::vector<ThreadInfo> threads_; // 线程池中所有线程的信息std::queue<T> tasks_; // 任务队列pthread_mutex_t mutex_; // 互斥锁pthread_cond_t cond_; // 条件变量
4. 使用测试
在这里我们引用一个 Task.hpp 的任务工具,如下
#pragma once#include "Log.hpp"
#include <iostream>
#include <string>std::string opers = "+-*/%";enum ErrorCode
};class Task
public:Task(int x, int y, char op):a(x), b(y), op_(op) {}void operator()(){run();lg(Info, "run a task: %s", GetResult().c_str());}void run(){switch(op_){case '+':answer = a + b;break;case '-':answer = a - b;break;case '*':answer = a * b;break;case '/':if (b != 0) answer = a / b;else exitcode = DevideZero;break;case '%':if (b != 0) answer = a % b;else exitcode = ModZero;break;default:lg(Error, "Using correct operation: + - * / %");exitcode = Unknown;break;}}std::string GetTask(){std::string ret;ret += "Task: ";ret += std::to_string(a);ret += " ";ret += op_;ret += " ";ret += std::to_string(b);ret += " ";ret += "= ?";return ret;}std::string GetResult(){std::string ret;if (exitcode <= Unknown){ret += "run the task fail, reason: ";switch (exitcode){case DevideZero:ret += "DevideZero";break;case ModZero:ret += "ModZero";break;case Unknown:ret += "Unknown";break;default:break;}}else{ret += "run the task success, result: ";ret += std::to_string(a);ret += " ";ret += op_;ret += " ";ret += std::to_string(b);ret += " ";ret += "= ";ret += std::to_string(answer);}return ret;}~Task(){}private:int a;int b;char op_;int answer = 0;ErrorCode exitcode = Normal;
main 函数如下
#include <iostream>
#include <time.h>
#include "ThreadPool.hpp"
#include "Task.hpp"extern std::string opers;int main()
{ThreadPool<Task>* tp = new ThreadPool<Task>(5);tp->Start();srand(time(nullptr)^ getpid());while(true){//1.构建任务int x = rand() % 10 + 1;usleep(10);int y =rand() % 5;char op = opers[rand()%opers.size()];Task t(x, y, op);tp->Push(t);// 2.交给线程池处理lg(Info, "main thread make task: %s", t.GetTask().c_str());sleep(1);}return 0;
1. 创建一个固定容量的进程池
2. 调用 Start() 函数启动进程池
3. 调用 Push() 函数向进程池中添加任务