github看到一个项目(GitHub - markparticle/WebServer: C++ Linux WebServer服务器),内部使用的一个线程池看着不错,拿来学习一下。
/** @Author : mark* @Date : 2020-06-15* @copyleft Apache 2.0*/ #ifndef THREADPOOL_H
#define THREADPOOL_H#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <functional>
class ThreadPool {
public:explicit ThreadPool(size_t threadCount = 8): pool_(std::make_shared<Pool>()) {assert(threadCount > 0);for(size_t i = 0; i < threadCount; i++) {std::thread([pool = pool_] {std::unique_lock<std::mutex> locker(pool->mtx);while(true) {if(!pool->tasks.empty()) {auto task = std::move(pool->tasks.front());pool->tasks.pop();locker.unlock();task();locker.lock();} else if(pool->isClosed) break;else pool->cond.wait(locker);}}).detach();}}ThreadPool() = default;ThreadPool(ThreadPool&&) = default;~ThreadPool() {if(static_cast<bool>(pool_)) {{std::lock_guard<std::mutex> locker(pool_->mtx);pool_->isClosed = true;}pool_->cond.notify_all();}}template<class F>void AddTask(F&& task) {{std::lock_guard<std::mutex> locker(pool_->mtx);pool_->tasks.emplace(std::forward<F>(task));}pool_->cond.notify_one();}private:struct Pool {std::mutex mtx;std::condition_variable cond;bool isClosed;std::queue<std::function<void()>> tasks;};std::shared_ptr<Pool> pool_;
};#endif //THREADPOOL_H
1,先看下 私有变量。
struct Pool {std::mutex mtx;std::condition_variable cond;bool isClosed;std::queue<std::function<void()>> tasks;};
std::shared_ptr<Pool> pool_;
一个结构体 Pool 将用到的变量,全部包含进去。
mtx 一个锁,用于加入,取出任务。
cond 信号量,用于线程间同步。
isClosed 线程池退出的标志。
tasks 任务队列,存放所有需要在线程中执行的任务。
pool_ 智能指针,管理所有资源。
2,构造函数
(1),默认开启8个线程(size_t threadCount = 8),并在初始化列表中 初始化pool_对象(std::make_shared<Pool>())
(2),循环启动每个线程
for(){std::thread({//todo}).detach();
}
(3),加锁 确保 ,各个线程对 任务队列(tasks) 不产生竞争,因为添加任务,取任务都要操作这个队列。
std::unique_lock<std::mutex> locker(pool->mtx);
(4),单独看一个线程。
首先判断任务队列是否为空,为空 则利用信号量阻塞当前线程。
else pool->cond.wait(locker);
如果线程池是退出状态,则跳出当前循环,当前线程也会退出。
else if(pool->isClosed) break;
如果任务队列不为空,则取出第一个任务,队列减1,然后解锁,执行任务,再加锁。
auto task = std::move(pool->tasks.front());
pool->tasks.pop();
locker.unlock();
task();
locker.lock();
std::move 用于移动语义,允许在不复制内存的情况下转移资源的所有权。这段代码之后,相当于8个线程,从任务队列tasks 中抢任务,抢到一个执行一个。比如A线程 抢到了任务1,任务1 在执行中过程中,由于队列处于未加锁状态,那B线程,就可以继续抢任务2。
3,添加任务
std::forward ,在一个函数中将参数以原始的形式传递给另一个函数,同时保持其值类别(lvalue 或 rvalue)和 const 修饰符。
这里将task 添加到 任务队列 tasks中。并且利用locker 进行保护。这里添加完成之后,上述的8个线程就会从这个任务队列中抢任务,然后执行。
template<class F>void AddTask(F&& task) {{std::lock_guard<std::mutex> locker(pool_->mtx);pool_->tasks.emplace(std::forward<F>(task));}pool_->cond.notify_one();}
3,销毁
将isClosed 标志置为false,并通知所有线程继续执行,防止因为任务队列为空,造成阻塞无法退出。在上述循环中,检测到isClosed 为fasel,则while循环退出,线程退出。
~ThreadPool() {if(static_cast<bool>(pool_)) {{std::lock_guard<std::mutex> locker(pool_->mtx);pool_->isClosed = true;}pool_->cond.notify_all();}}
5,使用
构造完ThreadPool 之后,直接调用AddTask ,将任务传入其中即可。
std::bind 用于创建一个可调用对象,将其与特定的参数绑定在一起。
在C++11 之下,封装一个线程池 还是很优雅的。