C++使用线程池模拟异步事件处理机制

  在C++很多框架中都有异步事件处理机制,这导致我们在看源码时经常很疑惑,难以理解,而其中包含的编程套路可能是一些成熟的技术,只是我们不熟悉,比如WebRTC中类似于Qt的信号槽机制,线程事件处理, 或者使用系统异步IO等等,如果看不懂这些套路,理解代码会很难,本篇博客来尝使用用C++线程池实现一种异步事件处理机制。

异步事件处理机制的基本实现

  C++可以使用std::future和std::promise来实现异步操作。然而,为了实现一个异步事件绑定的框架,我们需要更复杂的设计。下面是一个简单的例子,说明如何实现一个异步事件处理器。

  首先,定义一个事件处理器类,该类将接收并处理事件:

class EventHandler {
public:virtual ~EventHandler() = default;virtual void handleEvent(int eventID) = 0;
};

  然后,我们需要创建一个事件分发器,它将异步地调用事件处理器:

/*事件注册,分发*/#pragma once#include "EventHandler.hpp"
#include <map>
#include <thread>
#include <future>
#include <functional>
#include <memory>class EventDispatcher {
public:// 注册事件处理器void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {handlers[eventID] = handler;}// 异步事件分发函数void postEvent(int eventID) {auto it = handlers.find(eventID);if (it != handlers.end()) {std::thread eventThread(&EventDispatcher::dispatchEvent, this, it->second, eventID);eventThread.detach();}}private:// 事件分发函数void dispatchEvent(std::shared_ptr<EventHandler> handler, int eventID) {handler->handleEvent(eventID);}private:std::map<int, std::shared_ptr<EventHandler>> handlers;  // 存储事件,int 事件id, std::shared_ptr<EventHandler> 事件处理器
};

  在这个例子中,EventDispatcher类的postEvent方法接收一个事件ID,并在新线程中调用相应的事件处理器。这样做可以实现事件的异步处理。

  然后,你可以创建一个或多个处理器类,比如下面的打印事件处理器PrintEventHandler ,它实现EventHandler接口,

/*具体的事件处理器*/#include "EventHandler.hpp"
#include <iostream>using namespace std;class PrintEventHandler : public EventHandler {
public:void handleEvent(int eventID) override {std::cout << "Handling event " << eventID << std::endl;}
};

  然后再main函数中进行注册:

/*C++异步事件框架demo01*/#include <iostream>
#include <memory>
#include <thread>
#include <chrono>
#include "EventDispatcher.hpp"
#include "PrintEventHandler.hpp"int main() {EventDispatcher dispatcher;std::shared_ptr<EventHandler> printHandler = std::make_shared<PrintEventHandler>();dispatcher.registerHandler(1, printHandler);dispatcher.postEvent(1);// Sleep main thread to let the event thread finish.std::this_thread::sleep_for(std::chrono::seconds(1));return 0;
}

运行结果:

Handling event 1

代码组织如下,有兴趣的可以自行编写实现:
在这里插入图片描述

cmake脚本

#[[编译方法cmake -S . -B buildcd buildmake./demo01]]cmake_minimum_required(VERSION 3.20)project(demo01)set(INCLUDE_PATH1  "./")# 添加头文件目录
include_directories(${INCLUDE_PATH1}
)# 添加子目录src
aux_source_directory("./" SRC)add_executable(demo01 ${SRC})

  这个实现是非常基础的,并没有考虑到线程安全问题和异常处理等等。在实际的项目中,你需要更复杂的设计,并使用更高级的并发编程技术,如线程池、任务队列、互斥锁等等。

添加线程池、任务队列

  如果想要更复杂的设计,包括线程池、任务队列、互斥锁等,你可以考虑使用以下的设计。下面的例子使用了C++17的std::async和std::future来实现线程池和任务队列。

  首先,我们需要一个线程安全的任务队列:

#pragma once#include <queue>
#include <mutex>
#include <condition_variable>template <typename T>
class ThreadSafeQueue {
public:ThreadSafeQueue() = default;ThreadSafeQueue(const ThreadSafeQueue<T> &) = delete;ThreadSafeQueue& operator=(const ThreadSafeQueue<T> &) = delete;void push(T value) {std::lock_guard<std::mutex> lock(mMutex);mQueue.push(std::move(value));mCondition.notify_one();}bool try_pop(T& value) {std::lock_guard<std::mutex> lock(mMutex);if (mQueue.empty()) {return false;}value = std::move(mQueue.front());mQueue.pop();return true;}void wait_and_pop(T& value) {std::unique_lock<std::mutex> lock(mMutex);mCondition.wait(lock, [this](){ return !mQueue.empty(); });value = std::move(mQueue.front());mQueue.pop();}private:std::queue<T> mQueue;std::mutex mMutex;std::condition_variable mCondition;
};

  然后,我们需要一个线程池来处理这些任务:

#pragma once#include "ThreadSafeQueue.hpp"
#include <vector>
#include <future>class ThreadPool {
public:ThreadPool(size_t numThreads) {start(numThreads);}~ThreadPool() {stop();}template<typename T>void enqueue(T task) {mTasks.push(std::make_shared<std::packaged_task<void()>>(std::move(task)));}private:std::vector<std::thread> mThreads;ThreadSafeQueue<std::shared_ptr<std::packaged_task<void()>>> mTasks;std::atomic<bool> mContinue { true };void start(size_t numThreads) {for (auto i = 0u; i < numThreads; ++i) {mThreads.emplace_back([=] {while (mContinue) {std::shared_ptr<std::packaged_task<void()>> task;if (mTasks.try_pop(task)) {(*task)();} else {std::this_thread::yield();}}});}}void stop() noexcept {mContinue = false;for (auto &thread : mThreads) {thread.join();}}
};

  然后,我们可以在EventDispatcher中使用线程池来异步地处理事件:

#pragma once
#include "ThreadPool.hpp"
#include <mutex>
#include <map>
#include <memory>
#include "EventHandler.hpp"class EventDispatcher {
public:EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {std::lock_guard<std::mutex> lock(mMutex);mHandlers[eventID] = handler;}void postEvent(int eventID) {std::lock_guard<std::mutex> lock(mMutex);auto it = mHandlers.find(eventID);if (it != mHandlers.end()) {mThreadPool.enqueue([handler = it->second, eventID]() {handler->handleEvent(eventID);});}}private:std::map<int, std::shared_ptr<EventHandler>> mHandlers;ThreadPool mThreadPool;std::mutex mMutex;
};

  这个实现现在是线程安全的,并且使用了线程池来处理事件。对于异常处理,你可以在EventHandler的handleEvent方法内部进行。

添加异常处理

  如何在EventDispatcher中处理异常?
EventDispatcher中处理异常,我们可以在任务的执行过程中增加一个try/catch块,捕获并处理可能出现的异常。下面是一个示例:

class EventDispatcher {
public:EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {std::lock_guard<std::mutex> lock(mMutex);mHandlers[eventID] = handler;}void postEvent(int eventID) {std::lock_guard<std::mutex> lock(mMutex);auto it = mHandlers.find(eventID);if (it != mHandlers.end()) {mThreadPool.enqueue([handler = it->second, eventID]() {try {handler->handleEvent(eventID);} catch (const std::exception &ex) {std::cerr << "Exception occurred while handling event " << eventID << ": " << ex.what() << '\n';} catch (...) {std::cerr << "Unknown exception occurred while handling event " << eventID << '\n';}});}}private:std::map<int, std::shared_ptr<EventHandler>> mHandlers;ThreadPool mThreadPool;std::mutex mMutex;
};

  在这个示例中,如果handleEvent方法抛出了一个异常,那么我们将在控制台上打印出一个错误消息。具体如何处理这些异常,取决于你的应用程序的特定需求。

  请注意,这种方法仅处理了在handleEvent方法中抛出的异常。如果在其他地方(如ThreadPool的构造函数或enqueue方法)抛出异常,你可能需要额外的代码来处理这些情况。

注册多个事件

  在我们的事件分发器设计中,注册多个事件是非常简单的。你只需要创建不同的事件处理器,然后使用不同的事件ID将它们注册到事件分发器中。以下是一个示例:

class PrintEventHandler : public EventHandler {
public:PrintEventHandler(int id) : id(id) {}void handleEvent(int eventID) override {std::cout << "Handler " << id << " handling event " << eventID << std::endl;}
private:int id;
};int main() {EventDispatcher dispatcher(4);  // 创建一个有4个线程的事件分发器std::shared_ptr<EventHandler> handler1 = std::make_shared<PrintEventHandler>(1);dispatcher.registerHandler(1, handler1);std::shared_ptr<EventHandler> handler2 = std::make_shared<PrintEventHandler>(2);dispatcher.registerHandler(2, handler2);dispatcher.postEvent(1);dispatcher.postEvent(2);// Sleep main thread to let the event threads finishstd::this_thread::sleep_for(std::chrono::seconds(1));return 0;
}

  在这个示例中,我们创建了两个PrintEventHandler对象,每个对象都有一个唯一的ID。然后,我们使用不同的事件ID将这两个处理器注册到EventDispatcher中。当我们发布事件时,相应的处理器将会被调用。

  这就是如何在我们的异步事件绑定框架中注册多个事件的方法。

如何取消注册

  要从事件分发器中取消注册一个事件处理器,你可以在EventDispatcher类中添加一个新的方法,例如unregisterHandler,用于从处理器映射中删除指定的事件ID。下面是如何实现这个函数的例子:

class EventDispatcher {
public:EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {std::lock_guard<std::mutex> lock(mMutex);mHandlers[eventID] = handler;}void unregisterHandler(int eventID) {std::lock_guard<std::mutex> lock(mMutex);mHandlers.erase(eventID);}void postEvent(int eventID) {std::lock_guard<std::mutex> lock(mMutex);auto it = mHandlers.find(eventID);if (it != mHandlers.end()) {mThreadPool.enqueue([handler = it->second, eventID]() {try {handler->handleEvent(eventID);} catch (const std::exception &ex) {std::cerr << "Exception occurred while handling event " << eventID << ": " << ex.what() << '\n';} catch (...) {std::cerr << "Unknown exception occurred while handling event " << eventID << '\n';}});}}private:std::map<int, std::shared_ptr<EventHandler>> mHandlers;ThreadPool mThreadPool;std::mutex mMutex;
};

  这个unregisterHandler函数将删除与给定事件ID关联的事件处理器。请注意,这个函数并不会停止已经在处理该事件的任何线程。如果你想要取消正在进行的事件处理,你可能需要一个更复杂的设计,例如使用std::futurestd::promise来控制线程的执行。

如何停止正在进行的事件

  要停止正在进行的事件处理,我们需要更复杂的设计,它可能包括使用std::futurestd::promise来控制线程的执行。在这种设计中,每当一个事件被发布时,我们将创建一个std::promise,并将相应的std::future存储在某个地方,以便我们可以稍后在需要时停止事件处理。

  但是,要注意的是,根据C++的设计,没有一个简单且安全的方法可以强制停止正在运行的线程,因为这可能会导致资源泄漏或其他未定义的行为。因此,更常见的做法是让事件处理器定期检查一个“停止标记”,然后在检查到该标记时优雅地停止执行。以下是一个简单的示例,演示了如何实现这种设计:

class StoppableEvent {
public:StoppableEvent(std::future<void> future, std::function<void()> func): mFuture(std::move(future)), mFunc(std::move(func)) {}void operator()() {while(mFuture.wait_for(std::chrono::milliseconds(100)) == std::future_status::timeout) {mFunc();}}private:std::future<void> mFuture;std::function<void()> mFunc;
};class EventDispatcher {
public:EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {std::lock_guard<std::mutex> lock(mMutex);mHandlers[eventID] = handler;}void postEvent(int eventID) {std::lock_guard<std::mutex> lock(mMutex);auto it = mHandlers.find(eventID);if (it != mHandlers.end()) {std::promise<void> stopSignal;auto stopFuture = stopSignal.get_future();mStopSignals[eventID] = std::move(stopSignal);mThreadPool.enqueue(StoppableEvent(std::move(stopFuture), [handler = it->second, eventID]() {handler->handleEvent(eventID);}));}}void stopEvent(int eventID) {std::lock_guard<std::mutex> lock(mMutex);auto it = mStopSignals.find(eventID);if (it != mStopSignals.end()) {it->second.set_value();mStopSignals.erase(it);}}private:std::map<int, std::shared_ptr<EventHandler>> mHandlers;std::map<int, std::promise<void>> mStopSignals;ThreadPool mThreadPool;std::mutex mMutex;
};

  在这个例子中,我们定义了一个StoppableEvent类,它将一个std::future和一个函数组合在一起。当operator()被调用时,它将定期检查future,如果future的状态不是timeout,则停止执行函数。

  然后,当我们在EventDispatcher中发布一个事件时,我们将创建一个新的std::promise和相应的std::future,并将这个future和事件处理器的handleEvent方法一起传递给StoppableEvent。我们还将promise存储在一个映射中,以便我们可以稍后通过调用set_value来发出停止信号。

  最后,我们添加了一个stopEvent方法,它将查找与给定事件ID关联的promise,并通过调用set_value来发出停止信号。然后,它将从映射中删除这个promise,因为我们不再需要它。

  这是一个基本的示例,你可能需要根据你的具体需求来修改和扩展它。请注意,这个设计假设事件处理器的handleEvent方法将被调用多次,每次调用都可能被中断。如果你的事件处理器只执行一次长时间运行的任务,那么这个设计可能并不适合。

  以上是一个简易的异步事件处理demo, 在项目开发中,需要根据具体的业务需求进行调整完善。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/138237.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c语言,将奇数和偶数分类

题目&#xff1a;输入一个整数数组&#xff0c;实现一个函数&#xff0c;来调整该数组中数字的顺序使得数组中所有的奇数位于数组的前半部分&#xff0c;所有偶数位于数组的后半部分。 思路&#xff1a;像冒泡排序那样&#xff0c;相邻两个数比较&#xff0c;两个都是偶数则不…

2023年加氢工艺证考试题库及加氢工艺试题解析

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年加氢工艺证考试题库及加氢工艺试题解析是安全生产模拟考试一点通结合&#xff08;安监局&#xff09;特种作业人员操作证考试大纲和&#xff08;质检局&#xff09;特种设备作业人员上岗证考试大纲随机出的加氢…

外部访问K8S集群内部的kafka集群服务

不许转载 kafka 部署 把 kafka 部署到 k8s 后&#xff0c;我们肯定是通过 service 从 k8s 外部访问 kafaka。这里的 service 要么是 NodePort&#xff0c; 要么是 LoadBalancer 类型。我们使用的方式是 LoadBalancer。 我们先看下面这张图&#xff0c;这是 kafka 在集群中的网…

万能在线预约小程序系统源码 适合任何行业在线预约小程序+预约到店模式 带完整的搭建教程

大家好啊&#xff0c;源码小编又来给大家分享啦&#xff01;随着互联网的发展和普及&#xff0c;越来越多的服务行业开始使用在线预约系统以方便客户和服务管理。例如&#xff0c;美发店、健身房、餐厅等都可以通过在线预约系统提高服务效率&#xff0c;减少等待时间&#xff0…

C++入门篇3(类和对象【重点】)

文章目录 C入门篇3&#xff08;类和对象【重点】&#xff09;1、面向过程和面向对象2、类的引入3、类的定义4、类的访问限定符及封装4.1、访问限定符4.2、封装 5、类的作用域6、类的实例化&#xff08;对象&#xff09;7、类对象模型7.1、类对象的存储方式7.2、结构体&#xff…

VR虚拟现实:VR技术如何进行原型制作

VR虚拟现实原型制作 利用VR虚拟现实软件进行原型制作可以用于增强原型测试期间的沉浸感&#xff0c;减少产品设计迭代次数&#xff0c;并将与产品原型制作相关的成本降低40-65%。 VR虚拟现实原型制作市场规模 用于原型制作的虚拟现实 (VR) 市场在 2017 年估计为 2.104 亿美元…

主题模型LDA教程:一致性得分coherence score方法对比(umass、c_v、uci)

文章目录 主题建模潜在迪利克雷分配&#xff08;LDA&#xff09;一致性得分 coherence score1. CV 一致性得分2. UMass 一致性得分3. UCI 一致性得分4. Word2vec 一致性得分5. 选择最佳一致性得分 主题建模 主题建模是一种机器学习和自然语言处理技术&#xff0c;用于确定文档…

合同审查---财务条款、合同形式与生效审查

1.合同主体 1人 廖 2.财务条款、合同形式与生效 1人 黄 3.履行、验收、知识产权、不可抗力 1人 詹 4.违约责任、争议解决、保密、法律引用 1人 王 代码规范&#xff1a; 1.代码函数的层级 各审查点在json中分为3级层级&#xff0c;但用python写规则的时候&#xff0c;1级层级为…

『Nacos』 入门教程

前言 本文为 Nacos 平台快速入门教程&#xff0c;本文将会使用通俗易懂的语言手把手带您了解、使用 Nacos 平台&#xff0c;适合未接触过 Nacos 的初学者 官方手册&#xff1a;Nacos | Nacos 官方仓库&#xff1a;alibaba/nacos 版本&#xff1a;2.X 本文示例代码仓库&#xf…

Python基础教程:类--继承和方法的重写

嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 什么是继承 继承就是让类与类之间产生父子关系&#xff0c;子类可以拥有父类的静态属性和方法 继承就是可以获取到另一个类中的静态属性和普通方法&#xff08;并非所有成员&#xff09; 在python中&#xff0c;新建的类可…

2023年成为优秀自动化测试工程师的 7 个步骤!

“测试自动化测试工程师可以将你从充满代码的世界中拯救出来。”企业完全同意这一说法&#xff0c;这就是您在自动化测试行业中看到大量就业机会的原因。我在 Quora 上收到了很多与自动化测试中的职业选择相关的答案请求&#xff0c;以及人们如何在有或没有手动测试经验的情况下…

RISC-V处理器设计(五)—— 在 RISC-V 处理器上运行 C 程序

目录 一、前言 二、从 C 程序到机器指令 三、实验 3.1 实验环境 3.11 Windows 平台下环境搭建 3.12 Ubuntu 平台下环境搭建 3.13 实验涉及到的代码或目录 3.2 各文件作用介绍 3.2.1 link.lds 3.2.2 start.S 3.2.3 lib 和 include 目录 3.2.4 common.mk 3.2.5 demo …

技术分享 | 测试平台开发-前端开发之数据展示与分析

测试平台的数据展示与分析&#xff0c;我们主要使用开源工具ECharts来进行数据的展示与分析。 ECharts简介与安装 ECharts是一款基于JavaScript的数据可视化图表库&#xff0c;提供直观&#xff0c;生动&#xff0c;可交互&#xff0c;可个性化定制的数据可视化图表&#xff…

《红蓝攻防对抗实战》九.内网穿透之利用GRE协议进行隧道穿透

​ 前文推荐&#xff1a; 《红蓝攻防对抗实战》一. 隧道穿透技术详解 《红蓝攻防对抗实战》二.内网探测协议出网之TCP/UDP协议探测出网 《红蓝攻防对抗实战》三.内网探测协议出网之HTTP/HTTPS协议探测出网 《红蓝攻防对抗实战》四.内网探测协议出网之ICMP协议探测出网 《红蓝…

mysql索引下推

文章目录 什么是索引下推索引下推优化的原理索引下推的具体实践没有使用ICP使用ICP 总结索引下推使用条件相关系统参数 什么是索引下推 索引下推(Index Condition Pushdown&#xff0c;简称ICP)&#xff0c;是MySQL5.6版本的新特性&#xff0c;它能减少回表查询次数&#xff0…

界面组件DevExpress ASP.NET Core v23.1 - 进一步升级UI组件

DevExpress ASP.NET Core Controls使用强大的混合方法&#xff0c;结合现代企业Web开发工具所期望的所有功能。该套件通过ASP.NET Razor标记和服务器端ASP.NET Core Web API的生产力和简便性&#xff0c;提供客户端JavaScript的性能和灵活性。ThemeBuilder工具和集成的Material…

遍历List集合和Map进行修改和删除报java.util.ConcurrentModificationException错误详解

一、异常产生 当我们使用foreach迭代一个ArrayList或者HashMap时&#xff0c;如果尝试对集合做一些修改操作&#xff08;例如删除元素或新增&#xff09;&#xff0c;可能会抛出java.util.ConcurrentModificationException的异常。 javapublic static void main(String[] args)…

山西电力市场日前价格预测【2023-11-12】

日前价格预测 预测说明&#xff1a; 如上图所示&#xff0c;预测明日&#xff08;2023-11-12&#xff09;山西电力市场全天平均日前电价为224.59元/MWh。其中&#xff0c;最高日前电价为434.30元/MWh&#xff0c;预计出现在18:00。最低日前电价为0.00元/MWh&#xff0c;预计出…

深度学习 python opencv 火焰检测识别 计算机竞赛

文章目录 0 前言1 基于YOLO的火焰检测与识别2 课题背景3 卷积神经网络3.1 卷积层3.2 池化层3.3 激活函数&#xff1a;3.4 全连接层3.5 使用tensorflow中keras模块实现卷积神经网络 4 YOLOV54.1 网络架构图4.2 输入端4.3 基准网络4.4 Neck网络4.5 Head输出层 5 数据集准备5.1 数…

python OrderedDict类(有序字典)

嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! python更多源码/资料/解答/教程等 点击此处跳转文末名片免费获取 创建有序字典 import collectionsdic collections.OrderedDict() dic[k1] v1 dic[k2] v2 dic[k3] v3 print(dic)#输出&#xff1a;OrderedDict([(k1, v1), (…