我发现很多博客没有系统的分析过一个线程池, 那么我今天就基于这个github的小例子,彻底的逐行学习一下
https://github.com/progschj/ThreadPool
主函数调用解析:
cpp的主函数比较简单, 创建了一个4个线程的线程池, 然后将8个任务给到线程池的线程执行,执行的函数过程中会打印,最后返回一个index的平方
头文件:
主要的线程池的创建函数和enqueue函数
inline ThreadPool::ThreadPool(size_t threads): stop(false)
{for(size_t i = 0;i<threads;++i)workers.emplace_back([this]{for(;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });if(this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});
}
这段代码是直接将lamda表达式推给线程队列,用于线程执行的对象 , std::thread 的构造函数可以接收一个可调用对象,如函数指针、函数对象、或是一个 lambda 表达式。lamda内部就是执行tasks的任务队列里的任务
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
{using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif(stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task](){ (*task)(); });}condition.notify_one();return res;
}
向任务队列里添加任务
// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for(std::thread &worker: workers)worker.join();
}
一般设计的时候, 还可以设计一个dequeue的接口, 用于从future里取结果
这里的
using return_type = typename std::result_of<F(Args...)>::type;
result_of是一个模板类
using 用于创建类型别名
比如
using Integer= int;
Integer num = 10;
exapmle cpp
我改成了如下:
#include <iostream>
#include <vector>
#include <queue>
#include <chrono>
// 包含您真实的ThreadPool头文件路径
#include "ThreadPool.h"int main()
{ThreadPool pool(4);std::queue< std::future<float> > results_1;float j = 0.0;while(1) {j += 0.1;// 提交任务到线程池results_1.push(pool.enqueue([j] {std::cout << "hello 1 " << j << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));return j * j;}));// 检查队列中的第一个future是否已准备好if (!results_1.empty() && pool.FutureIsReady(results_1.front())) {auto re = std::move(results_1.front());results_1.pop();std::cout << "result " << re.get() << ' ';}// 确保我们不会耗尽资源并给予其他future完成的时间// std::this_thread::sleep_for(std::chrono::milliseconds(100));}return 0;
}
可以实现,实时的推任务, 并实时自从future队列检查是否完成了异步的计算
总结:
基于封装好的thread pool头文件, 那么只需要以下步骤可以实现多线程:
- 定义好thread pool的线程个数: ThreadPool pool(4);
- 定义好一个future的队列,类型是task的返回类型: std::queue< std::future > results_1;
- 将future的队列进行任务的添加, 利用线程池的enqueue API,enqueue的是任务,传参可以是函数,lamda表达式, 任务返回 的类型就是future , 其中return type就是函数的返回类型,
- 检查future状态, 获取结果:
// 检查队列中的第一个future是否已准备好
if (!results_1.empty() && pool.FutureIsReady(results_1.front())) {auto re = std::move(results_1.front());results_1.pop();std::cout << "result " << re.get() << ' ';
}