C++ 11线程池 ThreadPool

线程池 ThreadPool

半同步半异步线程池(简略版)C++11实现,详细解析

同步队列

SynchronousQueue.hpp

#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>
using namespace std;template<typename T>
class SyncQueue
{
private:std::list<T> m_queue;std::mutex m_mutex;std::condition_variable m_notEmpty;std::condition_variable m_notFull;int m_maxSize;bool m_needStop; // stop flag
public:SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false){}void Put(const T& x){Add(x);	}void Put(T&& x){Add(std::forward<T>(x));}void Take(std::list<T>& list){std::unique_lock<std::mutex> locker(m_mutex);m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });if (m_needStop){return;}list = std::move( m_queue);m_notEmpty.notify_one();	}void Take(T& t){std::unique_lock<std::mutex> locker(m_mutex);m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });if (m_needStop)return;t = m_queue.front();m_queue.pop_front();m_notFull.notify_one();}void Stop(){{std::lock_guard<std::mutex> locker(m_mutex);m_needStop = true;}m_notFull.notify_all();m_notEmpty.notify_all();}bool Empty(){std::lock_guard<std::mutex> locker(m_mutex);return  m_queue.empty();}bool Full(){std::lock_guard<std::mutex> locker(m_mutex);return  m_queue.size() == m_maxSize;}size_t Size(){std::lock_guard<std::mutex> locker(m_mutex);return  m_queue.size();}int Count(){return  m_queue.size();}
private:bool NotFull() const{bool full = (m_queue.size() >= m_maxSize);if(full){cout << "缓冲区满了" << endl;}return !full;}bool NotEmpty() const{bool empty = m_queue.empty();if (empty)cout << "缓冲区空了,需要等待 IDP:"<<std::this_thread::get_id() << endl;return !empty;}template<typename  F>void Add(F&&x){std::unique_lock<std::mutex> locker(m_mutex);m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });if (m_needStop){return;}m_queue.push_back(std::forward<F>(x));m_notEmpty.notify_one();}};
  1. Take函数解析:

    void Take(std::list<T>& list){std::unique_lock<std::mutex> locker(m_mutex);m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });if (m_needStop){return;}list = std::move( m_queue);m_notEmpty.notify_one();}
    

    创建一个unique_lock获取mutex然后通过m_notEmpty等待判断式 ,{return m_needStop || NotEmpty(); }判断式由两个部分组成,一个是停止的标志,另一个是不为空的条件

    • 当两个条件都不满足时,条件变量会释放mutex然后等待waiting条件满足直到其他线程通知notify_one()或者notify_all()

    • 当满足m_needStop时,通过判断式结束函数

    • 当满足NotEmpty()时,用一个队列将等待队列中的全部任务取出

      释放锁,并唤醒一个线程去添加任务

  2. Add函数

    template<typename  F>void Add(F&&x){std::unique_lock<std::mutex> locker(m_mutex);m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });if (m_needStop){return;}m_queue.push_back(std::forward<F>(x));m_notEmpty.notify_one();}
    

    与上一个相似

    • 如果满足条件,就执行以下逻辑,退出函数或者是添加新元素,然后唤醒取任务的线程
    1. Stop()函数

      void Stop(){{std::lock_guard<std::mutex> locker(m_mutex);m_needStop = true;}m_notFull.notify_all();m_notEmpty.notify_all();}
      

      Stop函数先获取mutex,然后将停止标志置为true,注意为了保证线程安全这里需要先获取锁,并将m_needStop标志为true再唤醒所有等待线程

线程池

一个完整的线程池包括三层

  • 同步服务层
  • 排队层
  • 异步服务层

这也是一种生产者–消费者模式,同步层是生产者,不断将新任务丢到排队层中,因此线程池需要提供一个添加新任务的接口供生产者使用;

消费者是异步层,由线程池中与预先创建好的线程去处理同步队列中的任务;

排队层:同步队列,保证生产者,消费者对任务正常的访问,同时还要限制任务的数量,防止无限的任务被添加进来

除此之外,还要求用户能控制线程池的开启和停止,让我们在需要的时候开启/停止线程池

如下实现:

ThreadPool.hpp

#include <list>
#include <thread>
#include <functional>
#include <memory>
#include <atomic>
#include "SynchronousQueue.hpp"constexpr  int MaxTaskCount = 100;class ThreadPool
{
public:using  Task = std::function<void()>;ThreadPool(int numThreads = std::thread::hardware_concurrency()) :m_queue(MaxTaskCount){Start(numThreads);}~ThreadPool(){//如果没有停止就主动停止线程池Stop();}void Stop(){std::call_once(m_flag, [this] {StopThreadGroup(); });}void AddTask(Task&& task){m_queue.Put(std::forward<Task>(task));}void AddTask(const Task& task){m_queue.Put(task);}void Start(int numThreads){m_running = true;for (int i = 0; i<numThreads;i++){m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));}}void RunInThread(){while (m_running){std::list<Task> list;m_queue.Take(list);for (auto& task : list){if (!m_running){return;}task();}}}void StopThreadGroup(){m_queue.Stop();m_running = false;for (auto thread : m_threadgroup){if (thread)thread->join();}m_threadgroup.clear();}private:std::list<std::shared_ptr<std::thread>> m_threadgroup;   // 处理任务的线程组SyncQueue<Task> m_queue;                                  // 同步队列atomic_bool m_running;std::once_flag m_flag;};
  1. ThreadPool(int numThreads = std::thread::hardware_concurrency())构造函数用硬件支持的线程数初始化总线程数

    然后Start(numThreads)m_running标志置为true,在线程池中放入numThreads个线程的指针

  2. AddTask由测试程序向同步队列中添加任务(函数)

  3. RunInThread()取出任务并执行

测试程序实现

main.cpp

#include "ThreadPool.hpp"
void TestThdPool()
{ThreadPool pool;pool.Start(2);std::thread thd1([&pool]{for (int i = 0; i < 10; ++i){auto thdId = this_thread::get_id();pool.AddTask([thdId] {cout << "同步线程1的线程ID" << thdId << endl;});}});std::thread thd2([&pool]{for (int i = 0; i < 10; ++i){auto thdId = this_thread::get_id();pool.AddTask([thdId]{cout << "同步层线程的线程ID:" << thdId << endl;});}});this_thread::sleep_for(std::chrono::seconds(2));getchar();pool.Stop();thd1.join();thd2.join();
}int main()
{TestThdPool();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/408121.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ITPro Magazine2006年第6期发布

下载地址[url]http://www.cnfan.net/magazine/itpromagazine200606.rar[/url]主题企划Windows Fundamentals For Legacy PCs深度解析&#xff1a;Windows Vista RC1的五大特性系统应用Win2003平台php环境架设Xen3安装及使用(On Fedora core5)ISA Server 2004 ***应用网络与安全…

abiword class list

Class List【转自官方文档】 Here are the classes, structs, unions and interfaces with brief descriptions:GR_CharWidths::_a _AbiCellRendererFont _AbiCellRendererFontClass _AbiControlGUI _AbiFontCombo _AbiFontComboClass _AbiPrivData _AbiTable _AbiWidget _AbiW…

网络资源的初始化与释放(C++ RAII惯用法)

1. 网络资源的初始化与释放(C RAII惯用法) C RAII 惯用法 RAII (Resource Acquisition Is Initialization)资源获取即初始化 我们拿到资源的时候就已经初始化,一旦不需要该资源,该资源就会被释放 资源: 在 C 的语境下&#xff0c;资源代表一些可以必须先被获取才能使用的对…

美国Palmbeach大学服务器整合改造案例

位于美国佛罗里达州的palmbeach大学&#xff0c;有4万9千多在校学生和2000多名教工。据学校的信息主管t parziale介绍&#xff0c;目前学校正在进行一个投资160多万美元的关于信息中心服务器、存储、网络等3部分整合改造并简化管理的项目。 该项目主要改造内容是&#xff1a;用…

避免switch嵌套的一种方法

避免switch嵌套的一种方法 DWORD nFlags 0;switch (mouse.nButton){case 0: //左键nFlags 1;case 1: //右键nFlags 2;break;case 2: //中间nFlags 4;break;case 4:nFlags 8;break;}if (nFlags ! 8)SetCursorPos(mouse.ptXY.x, mouse.ptXY.y);switch (mouse.nAction){case…

Enterprise Library 2.0 Hands On Lab 翻译(3):数据访问程序块(三)

练习3&#xff1a;加密数据库连接信息通过该练习&#xff0c;你将学会如何去加密数据库连接信息。第一步打开DataEx3.sln项目&#xff0c;默认的安装路径应该为C:\Program Files\Microsoft Enterprise Library January 2006\labs\cs\Data Access\exercises\ex03\begin&#xff…

操作系统进程学习(Linux 内核学习笔记)

操作系统进程学习(Linux 内核学习笔记) 进程优先级 并非所有进程都具有相同的重要性。除了大多数我们所熟悉的进程优先级之外&#xff0c;进程还有不同的关键度类别&#xff0c;以满足不同需求。首先进程比较粗糙的划分&#xff0c;进程可以分为实时进程 和非实时进程&#x…

gcc对C语言的扩展:语句内嵌表达式(statement-embedded expression)

在gnu c 中&#xff0c;用括号将复合语句括起来也形成了表达式。他允许你在一个表达式内使用循环&#xff0c;跳转和局部变量。一个复合语句是用大括号{}括起来的一组语句。在包含语句的表达式这种结构中&#xff0c;再用括号( )将大括号括起来,例如&#xff1a;({ int y foo …

react学习(56)--常见HTTP错误

200: 服务器成功返回请求的数据。,201: 新建或修改数据成功。,202: 一个请求已经进入后台排队&#xff08;异步任务&#xff09;。,204: 删除数据成功。,400: 发出的请求有错误&#xff0c;服务器没有进行新建或修改数据的操作。,401: 用户没有权限&#xff08;令牌、用户名、密…

C#二叉树递归实现

二叉树类(binaryTree.cs) using System; namespace binary_tree_demo { class BinaryTreeNode where T : IComparable { public BinaryTreeNode() { left null; right null; } public BinaryTreeNode(BinaryTreeNode l, BinaryTreeNode r) { left l; right r; } public Bin…

Linux守护进程的创建(结合nginx框架)

Linux守护进程的创建(结合nginx框架) 先介绍几个相关函数: int dup2(arg1,arg2):参数一指向的内容赋给参数二,shi的参数二也能访问参数一所指向的内容,并返回新的描述符 int fork()创建子进程,返回值-1:创建失败 返回值0:子进程 返回其他:父进程 setsid()调用成功后&#x…

用aspnet_compiler发布网站 (转载:My way of my life )

在asp.net 2.0模型中&#xff0c;vs2005已经完全脱离了编译而成为了一个彻底的ide.算是一个不小的改动。其中更是取消了有关Web Application的概念&#xff0c;使得习惯了vs2003的人刚开始的时候会有一些摸不着头脑。下面简单说一下我在使用过程中自己总结的&#xff0c;算是一…

react学习(57)--map赋值

<Radio.Group>{linksList?.map((item) > (<Radio key{item.key} value{item.key}>{item.value}</Radio>))}</Radio.Group>

使用 dojo/query

在本篇文章中&#xff0c;我们将了解DOM的查询以及如何运用dojo/query这个模块来轻松地选择节点并操作他们。 入门指南 在操作DOM的过程中&#xff0c;如何快速高效地检索出DOM节点显得相当重要。我们在Dojo DOM Functions中已经熟悉了 dom.byId&#xff0c;然而&#xff0c;在…

【Linux内核】虚拟地址空间布局架构

虚拟地址空间布局架构(Linux内核学习) 1.Linux内核整体架构及子系统 内核对下管理硬件,对上通过运行时库对应用提供服务 用户空间 使用malloc()分配内存通过free()释放内存 内核空间 虚拟进程负责从进程的虚拟地址空间分配虚拟页,sys_brk来扩大或收缩堆,sys_mmap负责在内存映…

天凉了,大家多穿衣服

这两天天气转凉&#xff0c;我还穿夏天的衬衫&#xff0c;结果今晚回来发现喉咙不舒服&#xff0c;只好去买药了。大家要保重身体呀&#xff01;