c++ 线程池/Github 开源项目源码分析(progschj/ThreadPool)

c++ 线程池/Github 开源项目源码分析(progschj/ThreadPool)

  • 前言
  • [ThreadPool 项目地址](https://github.com/progschj/ThreadPool)
  • 项目源码:
  • 基本用法
  • 类成员变量
  • 类成员函数
    • 构造函数的签名
    • 创建线程
    • 线程默认的任务
    • 向任务队列中添加一个任务
    • 析构函数
  • 总结

前言

维基百科上对线程池的简要介绍:

线程池(thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

ThreadPool 项目地址

progschj/ThreadPool 是一个简易的基于 c++11 标准的线程池实现,采用了 Zlib license(相当宽松自由的开源协议,任意修改分发商用),截止当前时间点,已获得 7k+ stars。整个项目源码仅有一个头文件,代码行数不足一百行,早在多年前就已稳定不再更新。

项目源码:

#ifndef THREAD_POOL_H
#define THREAD_POOL_H#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>class ThreadPool {
public:ThreadPool(size_t);template<class F, class... Args>auto enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>;~ThreadPool();
private:// need to keep track of threads so we can join themstd::vector< std::thread > workers;// the task queuestd::queue< std::function<void()> > tasks;// synchronizationstd::mutex queue_mutex;std::condition_variable condition;bool stop;
};// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads): stop(false)
{for (size_t i = 0; i < threads; ++i)workers.emplace_back([this]{for (;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});
}// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });}condition.notify_one();return res;
}// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for (std::thread& worker : workers)worker.join();
}#endif

基本用法

// create thread pool with 4 worker threads
ThreadPool pool(4);// enqueue and store future
auto result = pool.enqueue([](int answer) { return answer; }, 42);// get result from future
std::cout << result.get() << std::endl;

类成员变量

std::vector< std::thread > workers;
std::queue< std::function<void()> > tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
  • workers:存储线程池中 std::thread 的容器
  • tasks:任务队列
  • queue_mutex:任务队列的互斥锁
  • condition:任务队列的条件变量
  • stop:线程池是否停止的标志位

类成员函数

构造函数的签名

inline ThreadPool::ThreadPool(size_t threads): stop(false)
  • 构造函数传入一个 size_t 类型的参数,初始化线程池中线程的数量
  • 初始化列表将 stop 标志位初始化为 false

创建线程

for (size_t i = 0; i < threads; ++i)
{workers.emplace_back([this]{for (;;){//...}});
}
  • 使用 for 循环创建 threads 个线程,将线程加入 workers 容器
  • lambda 表达式用于创建线程,捕获 this,lambda 表达式中包含一个无限循环

线程默认的任务

std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();
}task();
  • 首先声明了一个 std::function<void()> 类型的变量 task
  • 在互斥锁保护任务队列后,调用 condition.wait() 等待任务队列非空或线程池停止,线程创建后,会在这里等待;如果 stop 标志位为 true 或者任务队列不为空,解除等待,继续往下执行
  • 如果标志位 stop 为 true,且任务队列为空,此任务将退出
  • 以上条件都通过后,将从任务队列中取出一个任务 task,移动到局部变量 task 中(吐槽下:距离 c++11 标准的发布已经过去了十几年,现在还不明白这一条的,就很难评价了)
  • 执行 task(),也就是上一步从队列头部取出的任务

向任务队列中添加一个任务

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>;
  • enqueue 函数模板,用于任务的入队列
  • F&& f,这里预期是一个任意的 callable 对象
  • Args&&... args,一个可变模板参数,会在编译期展开参数包
  • 返回值是一个 std::future 类型的对象,用于获取任务的执行结果,std::future 的模板参数使用 std::result_of 萃取可调用对象的返回值类型
  • 注意,c++17 后 std::result_of 就已经是 deprecated,可以使用 std::invoke_result 类型萃取

继续往下看 enqueue 函数的实现:

using return_type = typename std::result_of<F(Args...)>::type;
  • 使用 std::result_of 类型萃取可调用对象的返回值类型,并使用 using 为其起个别名 reture_type
auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);

这一段做了好几件事,一步一步拆解:

  • std::make_shared 构建一个 std::shared_ptr
  • std::packaged_task 模板是用于包装 callable 对象,使用了前面推导出的 return_type 类型来实例化模板
  • std::make_shared 需要调用实例类型的构造函数,而 std::packaged_task 的构造函数需要一个可调用对象,所以这里使用 std::bind 将可变模板参数绑定给 f(对 std::bind 不熟悉的建议先行查阅资料),std::forward 转发一下类型
  • 简单来说,以上只是构建一个 callable 对象的包装器
std::future<return_type> res = task->get_future();
{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
  • 从 task 中获取 std::future 对象
  • 使用大括号控制代码块,在这个代码块中上锁
  • 如果线程池已经停止,抛出异常
  • 否则正常执行,将 task 推入到队列尾部
  • 条件变量通知一个等待的线程,这个时候,构造函数中 condition.wait() 会被唤醒,以执行后面的代码块,即从队列头部取出一个任务并执行
  • 最后返回 std::future 对象

析构函数

遵循 RAII 原则,释放所有资源

{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;
}
condition.notify_all();
for (std::thread& worker : workers)worker.join();
  • 上锁,将停止标志位置为 true
  • 通知所有等待的线程
  • 等待所有线程终止

总结

该项目仅是一个线程池的简易实现,对学习 c++11 标准的多线程及部分特性有一定帮助,如果想要更复杂的具有各种调度策略的线程池,还需进一步细化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/766834.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

open images v7的600类别名称

英文&#xff1a; 0: Accordion1: Adhesive tape2: Aircraft3: Airplane4: Alarm clock5: Alpaca6: Ambulance7: Animal8: Ant9: Antelope10: Apple11: Armadillo12: Artichoke13: Auto part14: Axe15: Backpack16: Bagel17: Baked goods18: Balance beam19: Ball20: Balloon21…

【Rust】Shared-State Concurrency

Shared-State Concurrency channel类似于single ownership. 而shared memory类似与multiple ownership. multiple ownership是难于管理的. smarter pointer也是multiple ownership的. Rust的type system和ownership rules帮助实现正确的multiple ownership管理。 Using Mute…

百度智能云+SpringBoot=AI对话【人工智能】

百度智能云SpringBootAI对话【人工智能】 前言版权推荐百度智能云SpringBootAI对话【人工智能】效果演示登录AI对话 项目结构后端开发pom和propertiessql_table和entitydao和mapperservice和implconfig和utilLoginController和ChatController 前端开发css和jslogin.html和chat.…

MySQL 8.0-索引- 不可见索引(invisible indexes)

概述 MySQL 8.0引入了不可见索引(invisible index)&#xff0c;这个在实际工作用还是用的到的&#xff0c;我觉得可以了解下。 在介绍不可见索引之前&#xff0c;我先来看下invisible index是个什么或者定义。 我们依然使用拆开来看&#xff0c;然后再把拆出来的词放到MySQL…

kali安装docker(亲测有效)

第一步&#xff1a;添加Docker官方的GPG密钥 curl -fsSL https://download.docker.com/linux/debian/gpg | sudo apt-key add - 第二步&#xff1a; 第二步更新源 echo deb https://download.docker.com/linux/debian stretch stable> /etc/apt/sources.list.d/docker.list…

数据结构——树与二叉树

目录 树与二叉树 1.树的定义 2.树的有关术语 3.二叉树&#xff08;BinaryTree&#xff09; 二叉树的性质&#xff1a; 特殊的二叉树 满二叉树&#xff1a; 完全二叉树 二叉树的存储结构 顺序存储结构 链式存储结构 二叉树以及对应接口的实现 1.二叉树架构搭建 2…

关于 Microsoft Visual Studio

关于 Microsoft Visual Studio References References [1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/

hive学习记录

问题集合 Q&#xff1a;终端启动hive时报错&#xff1a;/tmp/hive on HDFS should be writable&#xff1f; A&#xff1a;hdfs dfs -chmod 777 /tmp/hive Q&#xff1a;hive&#xff1a; unable to create database path file…错误 A&#xff1a;在hive-site.xml里面添加以…

【机器学习300问】47、如何计算AUC?

一、AUC是什么&#xff1f; &#xff08;1&#xff09;文绉绉的定义 AUCArea Under the Curve中文直译叫“曲线下面积”&#xff0c;AUC名字里面的Curve曲线指的就是ROC曲线&#xff0c;关于ROC曲线的相关知识我已经在之前的文章中详细说过了&#xff0c;有需要的友友可以点击…

CI/CI实战-jenkis结合gitlab 4

实时触发 安装gitlab插件 配置项目触发器 生成令牌并保存 配置gitlab 测试推送 gitlab的实时触发 添加jenkins节点 在jenkins节点上安装docker-ce 新建节点server3 安装git和jdx 在jenkins配置管理中添加节点并配置从节点 关闭master节点的构建任务数

[Java安全入门]六.CC3

一.前言 前几天学了一下cc1和cc6&#xff0c;对于我来说有点小困难&#xff0c;不过经过几天沉淀&#xff0c;现在也是如拨开云雾见青天&#xff0c;经过一上午的复习对cc1和cc6又有深入的了解。所以&#xff0c;今天想多学一下cc3。cc3执行命令的方式与cc1和cc6不一样&#x…

C#基础-标识符命名规则

目录 1、标识符定义 2、遵循规则 3、标识符的例子 4、MSDN中英文解释 英文

Debezium日常分享系列之:Debezium2.5稳定版本之Monitoring

Debezium日常分享系列之&#xff1a;Debezium2.5稳定版本之Monitoring 一、Snapshot metrics二、Streaming metrics三、Schema history metrics Debezium系列之&#xff1a;安装jmx导出器监控debezium指标 除了 Zookeeper、Kafka 和 Kafka Connect 提供的对 JMX 指标的内置支持…

革新水库大坝监测:传统软件与云平台之比较

在水库大坝的监测管理领域&#xff0c;传统监测软件虽然曾发挥了重要作用&#xff0c;但在多方面显示出了其局限性。传统解决方案通常伴随着高昂的运维成本&#xff0c;需要大量的硬件支持和人员维护&#xff0c;且软件整合和升级困难&#xff0c;限制了其灵活性和扩展性。 点击…

Neo4j桌面版导入CVS文件

之后会出来一个提示框&#xff0c;而且会跳出相关文件夹&#xff1a; 然后我们将CSV文件放在此目录下&#xff1a; 我们的relation.csv是这样的 参见&#xff1a; NEO4J的基本使用以及桌面版NEO4J Desktop导入CSV文件_neo4j desktop使用-CSDN博客

C++11:左值与右值|移动构造|移动赋值

​ &#x1f3ac;慕斯主页&#xff1a;修仙—别有洞天 ♈️今日夜电波&#xff1a;マイノリティ脈絡—ずっと真夜中でいいのに。 0:24━━━━━━️&#x1f49f;──────── 4:02 &#x1f504; …

MySQL表内容的增删查改

在前面几章的内容中我们学习了数据库的增删查改&#xff0c;表的增删查改&#xff0c;这一篇我们来学习一下对表中的内容做增删查改。 CRUD : Create(创建), Retrieve(读取)&#xff0c;Update(更新)&#xff0c;Delete&#xff08;删除&#xff09; 1.创建Create 我们先创建…

Zabbix Web界面中文汉化

要想达到上图的效果&#xff0c;第一步先查看 /usr/share/zabbix/assets/fonts/ [rootservice yum.repos.d]# ll /usr/share/zabbix/assets/fonts/ 总用量 0 lrwxrwxrwx. 1 root root 33 3月 23 16:58 graphfont.ttf -> /etc/alternatives/zabbix-web-font 继续查看graph…

基于霍夫检测(hough变换)的人眼瞳孔定位,Matlab实现

博主简介&#xff1a; 专注、专一于Matlab图像处理学习、交流&#xff0c;matlab图像代码代做/项目合作可以联系&#xff08;QQ:3249726188&#xff09; 个人主页&#xff1a;Matlab_ImagePro-CSDN博客 原则&#xff1a;代码均由本人编写完成&#xff0c;非中介&#xff0c;提供…

es 集群开机自动启动

前面搭建了 es 集群&#xff0c;但是每次机器重启 都需要手动启动&#xff0c;很麻烦&#xff0c;所以这里介绍一下开机自动启动 首先使用 root 用户 es &#xff1a; 执行以下命令 vim /etc/init.d/elasticsearch 将以下内容 cv 进去 #!/bin/bash #chkconfig: 345 63 …