缓冲式线程池C++简易实现

前言 :

代码也比较短,简单说一下代码结构,是这样的:


SyncQueue.hpp封装了一个大小为MaxTaskCount的同步队列,这是一个模板类,它在线程池中承担了存放任务等待线程组中的线程来执行的角色。最底层是std::list<T>但是我们起名为m_queue,因为我们使用list模拟了队列,这里使用std::vector是不合适的,我们要频繁插入(添任务)删除(取任务),开销太大,具体了解一下std::vector底层。

我们还加了一个m_mtx互斥锁,因为我们要保证对于队列的访问是线程安全的,但是因为使用unique_lock对于m_mtx要修改,所以我们加了mutable关键字(mutable保证我们可以在常方法中修改类的非静态成员)。

        std::condition_variable m_notEmpty;
        std::condition_variable m_notFull;

他俩是条件变量是用来同步队列任务是否为空为满的。

Add()是添加任务调用的底层函数,我们对它做了一个封装,分别适合左值和右值:

        int Put(const T& task)
        {
            return Add(task);
        }
        int Put(T&& task)
        {
            return Add(std::forward<T>(task));
        } 

 同理Take也是两个。

WaitQueueEmptyStop()是后期添加的一个函数,比如在我们添加任务结束后,线程来执行任务,但是此时主线程准备结束,我们调用析构线程池对象的析构函数,它最终会调用这个函数判断任务队列中是否还有任务,如果不空,那么我就弃锁睡眠1秒,循环往复,直到队列为空。


 CachedThreadPool.hpp是线程池的代码,Task是可调用对象的包装器,上述任务队列中放的就是Task,我们添加的也是Task,执行的也是Task。底层封装了上述的任务队列。

我们使用一个

std::unordered_map<std::thread::id, std::shared_ptr<std::thread>> m_threadgroup;

来存储某个thread的id 和管理它的共享性智能指针,方便我们到了KeepAliveTimes秒删除它,这是也是我们起名为CachedThreadPool的原因,它是一个缓冲型线程池,线程数量是浮动的,受制于两个原子变量的限制,让线程数不至于太少无法执行任务,不至于太多而空闲:

        std::atomic<int> m_idleThreadSize;
        std::atomic<int> m_curThreadSize;

构造函数中我们Start()开两个线程,线程的入口函数是CachedThreadPool::RunInThread(),让他们去检测是否有任务,有就task();反之陷入m_queue.Take(task)。 

析构函数中我们调用StopThreadGroup(),它会先调用任务队列的WaitQueueEmptyStop()确保任务队列为空,然后使用一个range-for结束掉线程组:
            for (auto& x : m_threadgroup)
            {
                if (x.second->joinable())
                {
                    x.second->join();
                }
            }

还有一个重要的成员函数:

        template <class Func, class...  Args>
        void AddTask(Func&& func, Args&&... args)
        {
            auto task = std::make_shared<std::function<void()> >(
                std::bind(std::forward<Func>(func),
                    std::forward<Args>(args)...));

            if (m_queue.Put([task]() { (*task)(); }) != 0)
            {
                cerr << "not add task queue... " << endl;
                (*task)();
            }
            AddnewThread();
        }

这是一个模板函数,用来添加没有返回值的任务到任务队列中,使用了引用性别未定义,可变模板参数,bind,完美转发,lambda表达式,智能指针,这个函数的成型颇具困难。

        template <class Func, class... Args>
        auto submit(Func&& func, Args &&...args)
        {
            using RetType = decltype(func(args...));
            auto task = std::make_shared<std::packaged_task<RetType()>>(
                std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
            std::future<RetType> result = task->get_future();

            if (m_queue.Put([task]()
                { (*task)(); }) != 0)
            {
                cerr << "not add task queue... " << endl;
                (*task)();
            }
            AddnewThread();
            return result;
        }

 这是一个模板函数,用来添加带有返回值的任务到任务队列。


main.cpp中有一个有意思的问题,可以探讨一下:

std::shared_ptr<FILE> pf(fopen("test.txt", "w"));

//std::shared_ptr<FILE> pf(fopen("test.txt", "w"),fileDeleter());

main.cpp是我们的测试文件,我们想把测试结果打印到文件中,方便观察,我们刚开始使用了一个裸的FILE* fp;然后在main函数结束之后fclose(fp); fp = nullptr;

结果我们跑不通,最终发现是线程池对象析构之前的这两行代码会被执行,文件被关闭,我们无法将结果写入文件中。

最后我们使用了shared_ptr<FILE>来管理文件,但是写成了第一行,我们发现往屏幕上写结果正确但是写文件不行,而且往屏幕上写总是在线程池对象完美析构,一切安顿好之后出现异常,这令人困惑,我以为是我们线程的释放还做得有问题,谁曾想是这个指针坏事了,pf对象作为一个全局对象,析构于局部的线程池对象之后,此时它会调用它的默认删除器,也就是delete,这显然是不行的,文件指针应该fclose才对,所以我们写了一个自定义的删除器来改正错误。

struct fileDeleter {
    void operator()(FILE* fp)const {
        if (fp) {
            cout << "file close\n";
            fclose(fp);
        }
    }
}; 

类似错误还有很多,比如之前的:

红框中我们错写为了:m_queue.pop_back();

这个错误虽然看似简单,实则复杂,因为打印出来结果很流畅,但是值不对,后来人肉debug发现这个错误后令我哭笑不得,好家伙,每次拿走一个任务,竟然pop末尾的未被执行的任务..........

多线程代码调试我还不擅长,任重而道远,但我喜欢De这些有趣的bug! 


 代码总体逻辑类似于生产者-消费者模型,这也是OS学习的经典中的经典,线程池基本就是从这里扩展而来,又分化为了各种类别的线程池,定长的,缓冲的,窃取的.................

源代码:

main.cpp 

#define _CRT_SECURE_NO_WARNINGS// C++ STL
#include <thread>
#include <iostream>
using namespace std;
#include"CachedThreadPool.hpp"
struct fileDeleter {void operator()(FILE* fp)const {if (fp) {cout << "file close\n";fclose(fp);}}
};
class Int
{
private:int value;
public:Int(int x = 0) : value(x){cout << "create Int " << value << endl;}~Int(){cout << "destroy Int: " << value << endl;}Int(const Int& it) :value(it.value){cout << "Copy Int " << value << endl;}Int(Int&& it) :value(it.value){cout << "Move create Int : " << value << endl;it.value = -1;}Int& operator=(const Int& it){if (this != &it){value = it.value;}cout << "operator=" << endl;return *this;}Int& operator=(Int&& it){if (this != &it){value = it.value;it.value = -1;}cout << "operator=(Int &&)" << endl;return *this;}Int& operator++(){value += 1;return *this;}Int operator++(int){Int tmp = *this;++* this;return tmp;}bool operator<(const int x) const{return this->value < x;}ostream& operator<<(ostream& out) const {return out << value;}
};
ostream& operator<<(ostream& out, const Int& it)
{return it << out;
}std::mutex mtx;
std::shared_ptr<FILE> pf(fopen("test.txt", "w"),fileDeleter());
void print(Int x)
{std::lock_guard<std::mutex> lock(mtx);//cout << "print x: " << &x << " " << x << endl;fprintf(pf.get(), "print x : %d => &x: %p \n", x, &x);std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
}
void funa(myspace::CachedThreadPool& mypool)
{double dx = 1;for (Int i = 0; i < 100; ++i){mypool.AddTask(print, i);}cout << "funa ... end " << endl;
}int main()
{myspace::CachedThreadPool mypool;std::thread tha(funa, std::ref(mypool));tha.join();return 0;
}

SyncQueue.hpp


// C++ STL
#include <list>
#include <deque>
#include <mutex>
#include <condition_variable>
#include <iostream>
using namespace std;#ifndef SYNC_QUEUE_HPP
#define SYNC_QUEUE_HPP
static const int MaxTaskCount = 2000;
namespace myspace
{template <class T>class SyncQueue{private:std::list<T> m_queue;mutable std::mutex m_mtx;std::condition_variable m_notEmpty;std::condition_variable m_notFull;std::condition_variable m_waitStop;int m_maxSize;int m_waitTime;bool m_needStop;bool IsFull() const{bool full = m_queue.size() >= m_maxSize;if (full){cerr << "m_queue full .. wait .. " << endl;}return full;}bool IsEmpty() const{bool empty = m_queue.empty();if (empty){cerr << "m_queue empty...  wait ... " << endl;}return empty;}template <class F>int Add(F&& task){std::unique_lock<std::mutex> lock(m_mtx);if (!m_notFull.wait_for(lock, std::chrono::seconds(m_waitTime),[this](){ return m_needStop || !IsFull(); })){return 1; // 任务队列已达上限}if (m_needStop){return 2; // 任务队列stop;}m_queue.push_back(std::forward<F>(task));m_notEmpty.notify_all();return 0;}public:SyncQueue(int maxsize = MaxTaskCount, int timeout = 1): m_maxSize(maxsize), m_needStop(false), m_waitTime(timeout){}~SyncQueue(){if (!m_needStop){Stop();}}int Put(const T& task){return Add(task);}int Put(T&& task){return Add(std::forward<T>(task));}int Take(std::list<T>& tlist){std::unique_lock<std::mutex> lock(m_mtx);if (!m_notEmpty.wait_for(lock, std::chrono::seconds(m_waitTime),[this]() {  return m_needStop || !IsEmpty(); })){return 1; // timeout;}if (m_needStop){return 2;}tlist = std::move(m_queue);m_notFull.notify_all();return 0;}int Take(T& task){std::unique_lock<std::mutex> lock(m_mtx);if (!m_notEmpty.wait_for(lock, std::chrono::seconds(m_waitTime),[this]() {  return m_needStop || !IsEmpty(); })){return 1; // timeout;}if (m_needStop){return 2;}task = m_queue.front();m_queue.pop_front();m_notFull.notify_all();return 0;}void WaitQueueEmptyStop(){std::unique_lock<std::mutex> locker(m_mtx);while (!IsEmpty()){m_waitStop.wait_for(locker, std::chrono::seconds(1));}m_needStop = true;m_notFull.notify_all();m_notEmpty.notify_all();}void Stop(){{std::unique_lock<std::mutex> lock(m_mtx);m_needStop = true;}m_notEmpty.notify_all();m_notFull.notify_all();}bool Empty() const{std::unique_lock<std::mutex> lock(m_mtx);return m_queue.empty();}bool Full() const{std::unique_lock<std::mutex> lock(m_mtx);return m_queue.size() >= m_maxSize;}size_t Size() const{std::unique_lock<std::mutex> lock(m_mtx);return m_queue.size();}size_t Count() const{return m_queue.size();}};
}
#endif

CachedThreadPool.hpp

//C API
#include<time.h>
//OWN
#include "SyncQueue.hpp"
// C++ STL
#include <functional>
#include <unordered_map>
#include <map>
#include <future>
#include <atomic>
#include <memory>
#include <thread>
using namespace std;
#ifndef CACHED_THREAD_POOL_HPP
#define CACHED_THREAD_POOL_HPPnamespace myspace
{static const int InitThreadNums = 2;static const time_t KeepAliveTimes = 5;class CachedThreadPool{public:using Task = std::function<void(void)>;private:std::unordered_map<std::thread::id, std::shared_ptr<std::thread>> m_threadgroup;int m_coreThreadSize;int m_maxThreadSize;std::atomic<int> m_idleThreadSize;std::atomic<int> m_curThreadSize;std::mutex m_mutex;myspace::SyncQueue<Task> m_queue;std::atomic<bool> m_running;std::once_flag m_flag;void RunInThread(){auto tid = std::this_thread::get_id();time_t startTime = time(nullptr);while (m_running){Task task;if (m_queue.Size() == 0){time_t  curnow = time(nullptr);time_t intervalTime = curnow - startTime;if (intervalTime >= KeepAliveTimes && m_curThreadSize > m_coreThreadSize){m_threadgroup.find(tid)->second->detach();m_threadgroup.erase(tid);m_curThreadSize -= 1;m_idleThreadSize -= 1;cerr << "delete idle thread tid: " << tid << endl;cerr << "idle thread num: " << m_idleThreadSize << endl;cerr << "cur thread num: " << m_curThreadSize << endl;return;}}if (!m_queue.Take(task) && m_running){m_idleThreadSize -= 1;task();m_idleThreadSize += 1;startTime = time(nullptr);}}}void Start(int numthreas){m_running = true;m_curThreadSize = numthreas;for (int i = 0; i < numthreas; ++i){auto tha = std::make_shared<std::thread>(&CachedThreadPool::RunInThread, this);std::thread::id tid = tha->get_id();m_threadgroup.emplace(tid, std::move(tha));m_idleThreadSize += 1;}}void StopThreadGroup(){m_queue.WaitQueueEmptyStop();m_running = false;for (auto& x : m_threadgroup){if (x.second->joinable()){x.second->join();}}m_threadgroup.clear();}void AddnewThread(){if (m_idleThreadSize <= 0 && m_curThreadSize < m_maxThreadSize){auto tha = std::make_shared<std::thread>(&CachedThreadPool::RunInThread, this);std::thread::id tid = tha->get_id();m_threadgroup.emplace(tid, std::move(tha));m_idleThreadSize += 1;m_curThreadSize += 1;cerr << "AddnewThread id: " << tid << endl;cerr << "m_curThreadSize: " << m_curThreadSize << endl;}}public:CachedThreadPool(int initNumThreads = InitThreadNums, int taskQueueSize = MaxTaskCount): m_coreThreadSize(initNumThreads),m_maxThreadSize(std::thread::hardware_concurrency() + 1),m_idleThreadSize(0),m_curThreadSize(0),m_queue(taskQueueSize),m_running(false){Start(m_coreThreadSize);}~CachedThreadPool(){if (m_running){Stop();}}void Stop(){std::call_once(m_flag, [this](){ StopThreadGroup(); });}template <class Func, class...  Args>void AddTask(Func&& func, Args&&... args){auto task = std::make_shared<std::function<void()> >(std::bind(std::forward<Func>(func),std::forward<Args>(args)...));if (m_queue.Put([task]() { (*task)(); }) != 0){cerr << "not add task queue... " << endl;(*task)();}AddnewThread();}template <class Func, class... Args>auto submit(Func&& func, Args &&...args){using RetType = decltype(func(args...));auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));std::future<RetType> result = task->get_future();if (m_queue.Put([task](){ (*task)(); }) != 0){cerr << "not add task queue... " << endl;(*task)();}AddnewThread();return result;}};
}#endif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/60649.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity资源打包Addressable AA包

从零到一 很多资料都是通过一步步设置讲解的&#xff0c;有时很想先快速实现&#xff0c;再了解细节。 下面就是远程加载Cube.prefab然后实例化简单的代码。 代码中可以不需要远程的网址&#xff0c;不需要资源下载的位置&#xff0c;不需要判断是否已经下载到本地。 那是如…

MySQL之索引(2)(B树、B+树、索引分类、聚集索引、二级索引、回表查询)

目录 一、B树结构索引&#xff08;B-树&#xff09; &#xff08;1&#xff09;特点。 &#xff08;2&#xff09;问题&#xff1a;范围查询效率&#xff1f;&#xff1f; &#xff08;3&#xff09;缺点。 1、查询的不稳定性。 2、各叶子节点无联系。 3、IO资源的消耗较多。 二…

翼鸥教育:从OceanBase V3.1.4 到 V4.2.1,8套核心集群升级实践

引言&#xff1a;自2021年起&#xff0c;翼鸥教育便开始应用OceanBase社区版&#xff0c;两年间&#xff0c;先后部署了总计12套生产集群&#xff0c;其中核心集群占比超过四分之三&#xff0c;所承载的数据量已突破30TB。自2022年10月&#xff0c;OceanBase 社区发布了4.2.x 版…

ubuntu使用DeepSpeech进行语音识别(包含交叉编译)

文章目录 前言一、DeepSpeech编译二、DeepSpeech使用示例三、核心代码分析1.创建模型核心代码2.识别过程核心代码 四、交叉编译1.交叉编译2.使用 总结 前言 由于工作需要语音识别的功能&#xff0c;环境是在linux arm版上&#xff0c;所以想先在ubuntu上跑起来看一看&#xff…

Go语言入门教案

文章目录 一、教学目标二、教学重难点&#xff08;一&#xff09;重点&#xff08;二&#xff09;难点 三、教学方法四、教学过程&#xff08;一&#xff09;Go语言简介&#xff08;二&#xff09;环境搭建1. 下载和安装Go语言开发环境2. 配置Go语言环境变量3. 命令行查看Go语言…

普通人如何做好AI数字人直播带货月入10W?

在科技飞速发展的今天&#xff0c;AI数字人直播正以惊人的速度崛起&#xff0c;为直播领域带来了一场前所未有的变革。那到底AI数字人直播前景怎么样&#xff0c;是怎样一个形式&#xff0c;普通人能够利用Ai数字人直播赚取到收益吗&#xff1f; 首先讲到AI数字人直播很多人想的…

飞牛私有云访问外网

飞牛私有云 fnOS NAS 是一款有着卓越的性能以及强大的兼容性和智能化的管理界面&#xff0c;它之所以能在 NAS 市场中脱颖而出&#xff0c;是因为 fnOS 基于最新的 Linux 内核&#xff08;Debian发行版&#xff09;深度开发&#xff0c;不仅兼容主流 x86 硬件&#xff0c;还支持…

论文 | The Capacity for Moral Self-Correction in LargeLanguage Models

概述 论文探讨了大规模语言模型是否具备“道德自我校正”的能力&#xff0c;即在收到相应指令时避免产生有害或偏见输出的能力。研究发现&#xff0c;当模型参数达到一定规模&#xff08;至少22B参数&#xff09;并经过人类反馈强化学习&#xff08;RLHF&#xff09;训练后&…

计算机毕业设计Python+大模型农产品推荐系统 农产品爬虫 农产品商城 农产品大数据 农产品数据分析可视化 PySpark Hadoop

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

一文窥见神经网络

一文窥见神经网络 1.初识神经元1.1 生物神经元1.2 人工神经元1.3 权重的作用1.4 偏置的作用1.5 激活函数的作用1.5.1 线性激活函数1.5.2 非线性激活函数 2. 神经元模型2.1 多输入单神经元模型2.2 一层神经元模型2.3 神经网络&#xff08;多层神经元&#xff09;模型 3. 神经网络…

【视觉SLAM】2-三维空间刚体运动的数学表示

读书笔记&#xff1a;学习空间变换的三种数学表达形式。 文章目录 1. 旋转矩阵1.1 向量运算1.2 坐标系空间变换1.3 变换矩阵与齐次坐标 2. 旋转向量和欧拉角2.1 旋转向量2.2 欧拉角 3. 四元数 1. 旋转矩阵 1.1 向量运算 对于三维空间中的两个向量 a , b ∈ R 3 a,b \in \R^3 …

shell 100例

1、每天写一个文件 (题目要求&#xff09; 请按照这样的日期格式(xxxx-xx-xx每日生成一个文件 例如生成的文件为2017-12-20.log&#xff0c;并且把磁盘的使用情况写到到这个文件中不用考虑cron&#xff0c;仅仅写脚本即可 [核心要点] date命令用法 df命令 知识补充&#xff1…

[Python学习日记-66] 多态与多态性

[Python学习日记-66] 多态与多态性 简介 多态 多态性 鸭子类型 简介 多态与多态性都是面向对象的特征之一&#xff0c;它们都是面向对象编程的一个重要概念&#xff0c;在 Python 当中也有一些独特的见解和用法&#xff0c;下面我们一起来了解一下是怎么回事吧。 多态 多态…

Linux基础1

Linux基础1 Linux基础1学习笔记 ‍ 声明&#xff01; ​​​学习视频来自B站up主 泷羽sec 有兴趣的师傅可以关注一下&#xff0c;如涉及侵权马上删除文章 笔记只是方便各位师傅的学习和探讨&#xff0c;文章所提到的网站以及内容&#xff0c;只做学习交流&#xff0c;其他…

GESP4级考试语法知识(贪心算法(二))

排队接水2代码&#xff1a; #include<iostream> #include<cstdio> #include<algorithm> using namespace std; struct people {int num;int time; }; people s[1001]; int n,r,a[1001]; double sum,ave; bool cmp(people x,people y) {return x.time<y.t…

MySQL45讲 第二十讲 幻读是什么,幻读有什么问题?

文章目录 MySQL45讲 第二十讲 幻读是什么&#xff0c;幻读有什么问题&#xff1f;一、幻读的定义二、幻读带来的问题&#xff08;一&#xff09;语义问题&#xff08;二&#xff09;数据一致性问题 三、InnoDB 解决幻读的方法四、总结 MySQL45讲 第二十讲 幻读是什么&#xff0…

【再谈设计模式】建造者模式~对象构建的指挥家

一、引言 在软件开发的世界里&#xff0c;创建对象是一项基本且频繁的操作。然而&#xff0c;当对象的构造变得复杂&#xff0c;涉及众多属性和初始化步骤时&#xff0c;传统的构造函数方式往往会让代码陷入混乱的泥沼。就如同搭建一座复杂的建筑&#xff0c;若没有合理的规划和…

三、模板与配置(下)

三、模板与配置 8、WXSS模板样式-全局样式和局部样式 类型说明适用情景注意点全局样式定义在 app.wxss 中的样式&#xff0c;作用于每一个页面。当有一些通用的样式规则需要应用于整个小程序时&#xff0c;比如全局的字体大小、颜色、布局等。全局样式可能会被局部样式覆盖&a…

SQL面试题——抖音SQL面试题 主播播出时长

主播播出时长 现有如下数据,主播id、房间号、播出的批次号,每个批次号进出房间的时间戳、分区时间: 每一次直播都有一个上播和下播,每个房间里,同一个批次号会有两条数据,分别记录了上播和下播时间,求每个主播的播出时长? 通过上面的数据,可以清晰的看出,同一个批次…

大语言模型LLM综述

一、LM主要发展阶段 1.1、统计语言模型SLM 基于统计学习方法&#xff0c;基本思想是基于马尔可夫假设HMM建立词概率预测模型。如n-gram语言模型 1.2、神经语言模型NLM 基于神经网络来做词的分布式表示。如word2vec模型 1.3、 预训练语言模型PLM 预训练一个网络模型来做词表…