C++简易线程池

原理说明:

1. 线程池创建时,指定线程池的大小thread_size。当有新的函数任务通过函数addFunction ()添加进来后,其中一个线程执行函数。一个线程一次执行一个函数。如果函数数量大与线程池数量,则后来的函数等待。

2. 线程池内部有个容器m_functions 来存储待执行的函数。函数执行后从队列中移除。

3.  stopAll()函数会停止线程池。

ThreadPool.h

//ThreadPool.h
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>class ThreadPool {
public:static ThreadPool* getInstance(size_t thread_size = 1);		//默认线程池大小void addFunction(std::function<void()> task);				//添加需要执行的函数~ThreadPool();void stopAll(bool immediately);						//停止线程池, immediately:true立即停止, immediately:false等待当前线程函数执行完后停止。
private:ThreadPool(size_t thread_size);private:void workerThreadHandler();std::vector<std::thread> m_workers;					//线程容器std::queue<std::function<void()>> m_functions;			//待执行的函数容器std::mutex m_queue_mutex;std::condition_variable m_condition;bool m_stop;										//线程池停止状态std::thread m_wakeTimerThread;std::mutex m_timer_mutex;
};

ThreadPool.cpp

#include "ThreadPool.h"
#include <chrono>
#include <memory>
#include <thread>
#include <pthread.h>
#include <iostream>
using namespace std;ThreadPool* ThreadPool::getInstance(size_t thread_size)
{static std::mutex m_lock;static std::shared_ptr<ThreadPool> m_instance=nullptr;if (nullptr == m_instance){m_instance.reset(new ThreadPool(thread_size));}return m_instance.get();
}
ThreadPool::ThreadPool(size_t thread_size) : m_stop(false){m_workers.reserve(thread_size);for (size_t i=0; i<thread_size; ++i){m_workers.emplace_back([this, i](){	//创建线程池中的线程workerThreadHandler();});}//辅助线程,每隔一段时间发送一次唤醒,防止线程阻塞m_wakeTimerThread = std::thread([this](){	for(;!this->m_stop;){std::unique_lock<std::mutex> lock(this->m_timer_mutex);std::this_thread::sleep_for(std::chrono::milliseconds(2000));pthread_testcancel();m_condition.notify_all();std::cout<<"wake up"<<std::endl;}});std::cout<<__func__<<std::endl;
}//线程循环函数,循环查询函数容器是否为空,不为空则读取一个函数并执行。
void ThreadPool::workerThreadHandler()	
{for (;!this->m_stop;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->m_queue_mutex);std::cout<<"tasks begin size:"<<this->m_functions.size()<<" stop:"<<m_stop<<std::endl;if (!m_stop && m_functions.empty()){this->m_condition.wait(lock);}pthread_testcancel();	//作为线程的终止点if (this->m_stop){return;}if (this->m_functions.empty()){continue;}task = std::move(this->m_functions.front());this->m_functions.pop();std::cout<<std::this_thread::get_id() <<" tasks end size:"<<this->m_functions.size()<<std::endl;}task();std::cout<<__func__<<" end task"<<std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(200));}
}void ThreadPool::addFunction(std::function<void()> task)	//添加待执行的函数
{std::unique_lock<std::mutex> lock(m_queue_mutex);if (m_stop){return;}m_functions.emplace(std::move(task));m_condition.notify_one();
}ThreadPool::~ThreadPool()
{{std::unique_lock<std::mutex> lock(m_queue_mutex);m_stop = true;}m_condition.notify_all();for (std::thread &worker: m_workers){worker.join();}m_wakeTimerThread.join();std::cout<<__func__<<std::endl;
}void ThreadPool::stopAll(bool immediately)	//停止线程, immediately:true立即停止
{this->m_stop = true;if (immediately){for (std::thread &worker: m_workers){pthread_cancel(worker.native_handle());}pthread_cancel(m_wakeTimerThread.native_handle());}
}

测试程序main.cpp

#include <iostream>
#include <chrono>
#include <mutex>
#include "ThreadPool.h"using namespace std;static std::mutex m_mutex;
void ProcessFunc111()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(3));std::cout<<__func__<<" end"<<std::endl;
}void ProcessFunc222()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout<<__func__<<" end"<<std::endl;
}void ProcessFunc333()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout<<__func__<<" end"<<std::endl;
}void ProcessFunc444()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout<<__func__<<" end"<<std::endl;
}void ProcessFunc555()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout<<__func__<<" end"<<std::endl;
}void ProcessFunc666()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(3));std::cout<<__func__<<" end"<<std::endl;
}int main()
{ThreadPool::getInstance(5);ThreadPool::getInstance()->addFunction([](){ProcessFunc111();});ThreadPool::getInstance()->addFunction([](){ProcessFunc222();});ThreadPool::getInstance()->addFunction([](){ProcessFunc333();});ThreadPool::getInstance()->addFunction([](){ProcessFunc444();});ThreadPool::getInstance()->addFunction([](){ProcessFunc555();});ThreadPool::getInstance()->addFunction([](){ProcessFunc666();});getchar();return 0;
}

执行结果:

tasks begin size:0 stop:0
ThreadPooltasks begin size:
0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
140563906656000 tasks end size:2
ProcessFunc111 begin
140563898263296 tasks end size:4
ProcessFunc222 begin
140563881477888 tasks end size:3
ProcessFunc333 begin
140563743504128 tasks end size:2
ProcessFunc444 begin
140563889870592 tasks end size:1
ProcessFunc555 begin
wake up
ProcessFunc111 end
workerThreadHandler end task
tasks begin size:1 stop:0
140563906656000 tasks end size:0
ProcessFunc666 begin
ProcessFunc222 end
wake up
workerThreadHandler end task
ProcessFunc333ProcessFunc555ProcessFunc444 endendworkerThreadHandler end taskworkerThreadHandler end taskend
workerThreadHandler end task
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
wake up
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
ProcessFunc666 end
workerThreadHandler end task
tasks begin size:0 stop:0
wake up
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
...

测试程序2,调用线程池停止程序

#include <iostream>
#include <chrono>
#include <mutex>
#include "ThreadPool.h"using namespace std;static std::mutex m_mutex;
void ProcessFunc111()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(3));std::cout<<__func__<<" end"<<std::endl;
}void ProcessFunc222()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout<<__func__<<" end"<<std::endl;
}void ProcessFunc333()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout<<__func__<<" end"<<std::endl;
}void ProcessFunc444()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout<<__func__<<" end"<<std::endl;
}void ProcessFunc555()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout<<__func__<<" end"<<std::endl;
}void ProcessFunc666()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(3));std::cout<<__func__<<" end"<<std::endl;
}int main()
{ThreadPool::getInstance(5);ThreadPool::getInstance()->addFunction([](){ProcessFunc111();});ThreadPool::getInstance()->addFunction([](){ProcessFunc222();});ThreadPool::getInstance()->addFunction([](){ProcessFunc333();});ThreadPool::getInstance()->addFunction([](){ProcessFunc444();});ThreadPool::getInstance()->addFunction([](){ProcessFunc555();});ThreadPool::getInstance()->addFunction([](){ProcessFunc666();});std::this_thread::sleep_for(std::chrono::seconds(1));std::cout<<"stop all "<<std::endl;ThreadPool::getInstance()->stopAll(false);getchar();return 0;
}

执行结果:

tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:ThreadPool0 stop:0tasks begin size:0 stop:0
140190941017856 tasks end size:5
ProcessFunc111 begin
140190932625152 tasks end size:4
ProcessFunc222 begin
140190966195968 tasks end size:3
ProcessFunc333 begin
140190949410560 tasks end size:2
ProcessFunc444 begin
140190957803264 tasks end size:1
ProcessFunc555 begin
stop all
wake up
ProcessFunc111 end
workerThreadHandler end task
ProcessFunc444 end
workerThreadHandler end task
ProcessFunc333 end
workerThreadHandler end task
ProcessFunc222 end
workerThreadHandler end task
ProcessFunc555 end
workerThreadHandler end task

测试程序3,立即停止线程池

#include <iostream>
#include <chrono>
#include <mutex>
#include "ThreadPool.h"using namespace std;static std::mutex m_mutex;
void ProcessFunc111()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(3));std::cout<<__func__<<" end"<<std::endl;
}void ProcessFunc222()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout<<__func__<<" end"<<std::endl;
}void ProcessFunc333()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout<<__func__<<" end"<<std::endl;
}void ProcessFunc444()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout<<__func__<<" end"<<std::endl;
}void ProcessFunc555()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout<<__func__<<" end"<<std::endl;
}void ProcessFunc666()
{std::cout<<__func__<<" begin"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(3));std::cout<<__func__<<" end"<<std::endl;
}int main()
{ThreadPool::getInstance(5);ThreadPool::getInstance()->addFunction([](){ProcessFunc111();});ThreadPool::getInstance()->addFunction([](){ProcessFunc222();});ThreadPool::getInstance()->addFunction([](){ProcessFunc333();});ThreadPool::getInstance()->addFunction([](){ProcessFunc444();});ThreadPool::getInstance()->addFunction([](){ProcessFunc555();});ThreadPool::getInstance()->addFunction([](){ProcessFunc666();});std::this_thread::sleep_for(std::chrono::seconds(1));std::cout<<"stop all "<<std::endl;ThreadPool::getInstance()->stopAll(true);getchar();return 0;
}

执行结果:

tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
ThreadPool
139831215929088 tasks end size:5
ProcessFunc111 begin
tasks begin size:5 stop:0
139831199143680 tasks end size:4
ProcessFunc222 begin
139831224321792 tasks end size:3
ProcessFunc333 begin
139831207536384 tasks end size:2
ProcessFunc444 begin
139831232714496 tasks end size:1
ProcessFunc555 begin
stop all

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/599834.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

生活中的物理3——神奇陷阱(随机倒下的抽屉柜门)

1实验 材料&#xff1a;大自然&#xff08;风&#xff09;、抽屉门松掉的抽屉 实验 1、找一个大风的日子&#xff0c;打开窗户&#xff08;不要找下雨天&#xff0c;不然你会被你亲爱的嫲嫲KO&#xff09; 2、让风在抽屉面前刮过 3、你发现了什么&#xff1f;&#xff1f;&…

Baumer工业相机堡盟工业相机如何联合NEOAPI SDK和OpenCV实现相机图像转换为Mat图像格式(C#)

Baumer工业相机堡盟工业相机如何通过NEOAPI SDK实现相机掉线自动重连&#xff08;C#&#xff09; Baumer工业相机Baumer工业相机的图像转换为OpenCV的Mat图像的技术背景在NEOAPI SDK里实现相机图像转换为Mat图像格式联合OpenCV实现相机图像转换为Mat图像格式测试演示图 工业相机…

【软件系统架构设计】期末复习题目汇总:简答+应用

电子科技大学软件系统架构设计2023年秋期末考试复习题目汇总 目录 系统分析与设计概述 面向对象建模语言 系统规划 系统需求分析 系统架构设计 软件建模详细设计 设计模式 用户界面设计 系统分析与设计概述 信息系统的 6 种类型&#xff0c;举例说明&#xff1f; 信息…

创建一个starter项目

创建一个starter项目&#xff0c;需要引入坐标 <!-- 自动配置 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId> </dependency>引入这个依赖后&#xff0c;可以使用…

出版实务 | 数字内容加工与产品制作

文章目录 数字内容加工纸质图书数字化加工流程和要求加工流程元数据加工内容结构化加工内容要素的加工成品数据的构成 数字内容图书的加工 数字内容标引数字出版产品制作数字产品制作流程专题数据库的制作流程 质量控制数字内容加工质量控制数字产品制作质量控制 本篇博文根据圣…

【Linux Shell】7. printf 命令

文章目录 【 1. printf 命令的使用方法 】【 2. 实例 】 【 1. printf 命令的使用方法 】 printf 命令模仿 C 程序库&#xff08;library&#xff09;里的 printf() 程序&#xff0c;printf 由 POSIX 标准所定义&#xff0c;因此使用 printf 的脚本比使用 echo 移植性好。prin…

docker容器启动etcd3.5

目录 环境&#xff1a;(window11) 1、配置本地docker镜像地址配置成国内源&#xff1a; 1.1 docker-Desktop里面&#xff0c;增加这个部分内容&#xff1a; 1.2 修改docker的daemon.json文件一个效果&#xff1a; 2、dockerfile文件内容&#xff1a; 1、提前下载etcd3.5的…

AIGC初探:提示工程 Prompt Engineering

简介 提升工程是什么 提示工程&#xff08;Prompt Engineering&#xff09;是人工智能领域中的一个概念&#xff0c;特别是在自然语言处理&#xff08;NLP&#xff09;领域中。它是一种通过设计和优化输入提示来提高AI模型表现的方法。 对于基于转换器的大型语言模型&#x…

Wireshark本地回环网络抓包

背景 因为发往本机的数据包是通过回环地址的&#xff0c;即&#xff1a;数据包不会通过真实的网络接口发送&#xff0c;因此我们需要通过设置路由规则来让本来发到虚拟网络接口的数据包发送到真实网络接口即可。 场景描述&#xff1a;在网络程序开发的过程中&#xff0c;有时…

计算机Java项目|SpringBoot+Vue实现的在线考试系统

项目编号&#xff1a;L-BS-KS-02 一&#xff0c;环境介绍 语言环境&#xff1a;Java: jdk1.8 数据库&#xff1a;Mysql: mysql5.7 应用服务器&#xff1a;Tomcat: tomcat8.5.31 开发工具&#xff1a;IDEA或eclipse 二&#xff0c;项目简介 基于SpringBootVue的在线考试…

电子元器件选型与实战应用—07 二极管选型与应用第2篇

文章目录 一、稳压二极管1.1 原理1.2 参数解析1.3 稳压管选型案例1.3.1 工作原理1.3.2 仿真1.3.3 限流电阻确定二、LED二极管2.1 介绍2.2 案例介绍2.2.1 问题描述2.2.2 电路设计前文推荐: 电子元器件选型与实战应用—06 二极管选型与应用第1篇</

SQL效率-查询条件需避免使用函数处理索引字段

一个sql效率的问题 问题 假设created_at 是date类型、是索引&#xff0c;那么以下2种方式有没效率差异&#xff1a; WHERE TO_CHAR(created_at, ‘YYYY-MM-DD’) ‘2020-02-01’WHERE created_at TO_DATE(‘2020-02-01’ , ‘YYYY-MM-DD’) DBA回复 有的&#xff0c;第一…

牢牢把握“心价比”,徕芬的业绩爆发是一种必然?

回顾徕芬的2023年 &#xff0c;战果颇为丰硕&#xff1a;上半年就完成2022年全年的销售额&#xff0c;同比增长245%&#xff1b;用户增长超500万&#xff1b;多次取得线上销售量份额第一…… 虽然业绩突破背后也有消费复苏的激励作用&#xff0c;但具体到电吹风市场&#xff0…

一篇文章认识微服务中Eureka的原理和服务注册与发现

目录 1、认识Eureka 2、Eureka原理 2.1 和Dubbo架构对比&#xff1a; 2.2 三大角色 3、微服务常见的注册中心 3.1 Zookeeper 3.2 Eureka 3.3 Consul 3.4 Nacos 3.5 区别 Netflix 在设计Eureka 时&#xff0c;遵循的就是AP原则。 CAP原则又称CAP定理&#xff0c;指的…

SSM在线手机品牌商城----计算机毕业设计

项目介绍 该项目为前后台项目&#xff0c;分为普通用户与管理员两种角色&#xff0c;前台普通用户登录&#xff0c;后台管理员登录&#xff1b; 管理员角色包含以下功能&#xff1a; 管理员登录,用户管理,品牌管理,子品牌管理,商品管理,订单管理,留言板管理等功能。 用户角…

Eclipse先关的一些配置

启动配置设置 配置项详细说明&#xff1a; -Xms&#xff1a;初始堆内存大小&#xff0c;设定程序启动时占用内存大小&#xff0c;默认物理内存1/64 -Xms -XX:InitialHeapSiz-Xmx&#xff1a;最大堆内存&#xff0c;设定程序运行期间最大可占用的内存大小。如果程序运行需要…

博客摘录「 什么是QPS、TPS、吞吐量?- 高并发名词概念」2024年1月5日

1.什么是高并发&#xff1f; 高并发&#xff08;High Concurrency&#xff09;。通常是指系统在短时间内的大量操作。 高并发相关的常见指标有&#xff1a;QPS、TPS、吞吐量、并发数等。 2.QPS&#xff08;Query Per Second&#xff09; QPS每秒查询率&#xff0c;是指系统…

KK集团高管变更:陈世欣任总经理,涉无证放贷遭关注,还曾被处罚

近日&#xff0c;KK集团关联公司广东快客电子商务有限公司&#xff08;下称“KK集团”&#xff09;发生工商变更&#xff0c;其中郭惠波不再担任该公司总经理一职&#xff0c;由陈世欣接任。而在早前&#xff0c;陈世欣曾于2020年取代吴悦宁担任总经理职务&#xff0c;2021年7月…

上帝视角俯视工厂设计模式

引言 本篇聊聊设计模式中的简单工厂、工厂方法、抽象工厂设计模式&#xff0c;争取在看完这篇后不会再傻傻分不清以及能够应用在实际项目中 背景 以一个咱们都熟悉的场景举个例子&#xff0c;我们平时都会戴口罩&#xff0c;用来过滤一些普通病毒&#xff0c;大致的设计如下…

C++矩阵例题分析(3):螺旋矩阵

一、审题 时间限制&#xff1a;1000ms 内存限制&#xff1a;256MB 各平台平均AC率&#xff1a;14.89% 题目描述 输出一个n*n大小的螺旋矩阵。 螺旋矩阵的样子&#xff1a; 输入描述 共一行&#xff0c;一个正整数n&#xff0c;表示矩阵变长的长度…