C/C++|基于回调函数实现异步操作

首先,要搞懂一点,异步操作本质上也是并发,我们想要在线程级别实现异步并发基本就靠三种方式:

  • 多线程并发
  • 回调函数
  • 协程

今天我们讨论的是回调函数,我们如何通过回调函数来实现异步操作呢?

  1. 非阻塞I/O操作+回调函数实现异步IO
  2. 基于定时器+回调函数实现异步任务调度
  3. 事件队列+回调函数

我们就分别来实现一下他们吧,代码比较长,请耐心阅读。

非阻塞I/O+回调

这种方式允许程序在等待 I/O 操作完成时继续执行其他任务,从而提高并发性和性能。

我们可以使用标准 C++ 库和 C++11 中的线程库来实现一个基于 epoll 的非阻塞 I/O 和回调函数的示例。epoll 是 Linux 提供的高效 I/O 多路复用机制,适用于处理大量并发连接。我们将使用 epoll 来实现异步 I/O,并使用回调函数处理完成的 I/O 操作。

实例代码:使用 epoll 实现非阻塞 I/O 和回调函数

#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <iostream>
#include <cstring>
#include <functional>
#include <unordered_map>
#include <vector>
#include <thread>
#include <chrono>void set_non_blocking(int fd) {int flags = fcntl(fd, F_GETFL, 0);if (flage == -1) {throw std::runtime_error("fcntl get error");}if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {throw std::runtime_error("fcntl set error");}
}//事件处理器类
class EventLoop {
public:EventLoop() {epollFd_ = epoll_create1(0);if (epoll_fd == -1) {throw std::runtime_error("epoll_creat1 error");}}~EpollLoop() {close(epoll_fd);}//添加文件描述符及其对应的回调函数void add_fd(int fd, std::function<void(int)> cb) {set_non_blocking(fd);epoll_event event;event.events = EPOLLIN | EPOLLET;event.data.fd = fd;if (epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event) == -1) {throw std::runtime_error("epoll_ctl add error");}callbacks_[fd] = cb;}//运行时间循环void run() {while (running_) {std::vector<epoll_event> events(10);int numActives = epoll_wait(epollFd_, events.data(), events.size(), -1);if (numActives == -1) {throw std::runtime_error("epoll_wait error");}for (int i = 0; i < n; ++i) {int fd = events[i].data.fd;if (callbacks_.find(fd) != callbacks_.end()) callbacks[fd](fd);}}}//停止事件循环void stop() {return = false;}private:int epollFd_; //epoll实例std::unordered_map<int, std::function<void(int)>> callbacks_;bool running_ = true;
};// 示例回调函数,处理标准输入的读取
void handle_stdin(int fd) {char buffer[128];ssize_t n = read(fd, buffer, sizeof(buffer) - 1);if (n > 0) {buffer[n] = '\0';std::cout << "Read from stdin: " << buffer << std::endl;} else if (n == 0) {std::cout << "EOF from stdin" << std::endl;} else {if (errno != EAGAIN && errno != EWOULDBLOCK) {std::ceer << "Read error: " <<  strerror(errno) << std::endl;}	}
}int main () {try {EventLoop loop;//注册标准输入的文件描述符和回调函数loop.add_fd(STDIN_FILENO, handle_stdin);//运行事件循环std::thread loop_thread([&loop]() {loop.run();});//模拟只线程的其他工作std::this_thread::sleep_for(std::chrono::seconds(10));//停止事件循环loop.stop();loop_thread.join();} catch (const std::exception &e) {std::cerr << "Exception: " << e.what() << std::endl;return 1}return 0;
}

在该示例中,由于是非阻塞IO,所以如果有两个 fd 对应的事件被激活,那么他们会循环执行各自的回调操作,因为我们的EventLoop里面是又一个 while 循环的,如果这两个 fd 如果都是读一个很大很大的文件,那么他们在while循环中会交替执行各自的回调任务,实现异步操作。

基于定时器+回调函数实现异步任务调度

通过定时器和回调函数,可以在单线程环境中实现异步任务调度。在这个示例中,我们实现了一个简单的 Web 服务器定时清理过期会话的功能。类似的技术可以应用于其他需要定时任务调度的场景,如定时备份、日志轮换、定时数据采集等。

1. SessionManager 类

这个类负责管理用户会话,提供添加、删除和清理过期会话的方法。

class SessionManager {
#include <iostream>
#include <unordered_map>
#include <chrono>
#include <functional>
#include <thread>
#include <vector>
#include <algorithm>class SessionManager {
public:void add_session(const std::string& session_id) {sessions[session_id] = std::chrono::steady_clock::now();}void remove_session(const std::string& session_id) {sessions.erase(session_id);}void clean_expired_sessions(std::chrono::seconds timeout) {auto now = std::chrono::steady_clock::now();for (auto it = sessions.begin(); it != sessions.end(); ) {if (now - it->second > timeout) {std::cout << "Removing expired session: " << it->first << std::endl;it = sessions.erase(it);} else {++it;}}}private:std::unordered_map<std::string, std::chrono::steady_clock::time_point> sessions;
};
  • add_session:添加一个新会话,记录会话 ID 和当前时间。
  • remove_session:删除指定的会话。
  • clean_expired_sessions:清理过期会话,检查所有会话,如果会话时间超过指定的超时时间,则删除会话。

2. EventLoop类

这个类管理定时任务,通过维护一个事件列表并运行事件循环,在指定时间间隔内执行任务。

class EventLoop {
public:void add_event(std::function<void()> callback, std::chrono::milliseconds interval) {auto next_run = std::chrono::steady_clock::now() + interval;events.emplace_back(next_run, interval, callback);}void run() {while (running) {auto now = std::chrono::steady_clock::now();for (auto& event : events) {if (now >= event.next_run) {event.callback();event.next_run = now + event.interval;}}std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 小休眠,减少CPU占用}}void stop() {running = false;}private:struct Event {std::chrono::steady_clock::time_point next_run;std::chrono::milliseconds interval;std::function<void()> callback;Event(std::chrono::steady_clock::time_point nr, std::chrono::milliseconds i, std::function<void()> cb): next_run(nr), interval(i), callback(cb) {}};std::vector<Event> events;bool running = true;
};
  • add_event:添加一个新的定时事件,指定回调函数和时间间隔。
  • run:启动事件循环,循环检查所有事件,如果事件到期则执行回调函数,并更新下一次执行时间。
  • stop:停止事件循环。

3. 回调函数

定义一个回调函数,用于清理过期会话。
如果我们定义两个不同任务的回调函数,那么是不是就可以在某段时间实现多个任务了呢?

void clean_sessions_task(SessionManager& session_manager, std::chrono::seconds timeout) {session_manager.clean_expired_sessions(timeout);
}

这个函数调用 SessionManager 的 clean_expired_sessions 方法来清理过期会话。

4. main 函数

int main() {SessionManager session_manager;EventLoop event_loop;// 添加一些示例会话session_manager.add_session("session1");std::this_thread::sleep_for(std::chrono::seconds(1));session_manager.add_session("session2");// 每2秒清理一次过期会话(假设过期时间为3秒)event_loop.add_event(std::bind(clean_sessions_task, std::ref(session_manager), std::chrono::seconds(3)), std::chrono::seconds(2));// 运行事件循环std::thread event_loop_thread([&event_loop]() {event_loop.run();});// 模拟服务器运行std::this_thread::sleep_for(std::chrono::seconds(10));// 停止事件循环event_loop.stop();event_loop_thread.join();return 0;
}
  1. 创建 SessionManager 和 EventLoop 对象。
  2. 添加两个示例会话,分别在 0 秒和 1 秒时添加。
  3. 添加一个定时事件,每 2 秒调用一次 clean_sessions_task 来清理过期会话,过期时间为 3 秒。
  4. 启动一个线程运行事件循环。
  5. 主线程模拟服务器运行 10 秒钟。
  6. 停止事件循环并等待事件循环线程结束。

如果我们添加两个定时事件,并且打开循环,那么不是就在某段时间实现了异步任务调度呢?

基于事件队列+回调函数实现异步操作

这个示例展示了如何使用标准 C++ 库和 C++11 线程库,通过事件队列和回调函数在单线程中实现异步任务调度。具体来说,我们实现了一个简单的系统,可以调度并执行延迟任务。

1.EventLoop 类

这个类管理定时任务,通过维护一个事件列表并运行事件循环,在指定时间间隔内执行任务。

class EventLoop {
public:// 添加一个新的定时事件void add_event(std::function<void()> callback, std::chrono::milliseconds interval) {auto next_run = std::chrono::steady_clock::now() + interval;events.emplace_back(next_run, interval, callback);}// 运行事件循环void run() {while (running) {auto now = std::chrono::steady_clock::now();for (auto& event : events) {if (now >= event.next_run) {event.callback(); // 执行回调函数event.next_run = now + event.interval; // 更新下一次执行时间}}std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 小休眠,减少CPU占用}}// 停止事件循环void stop() {running = false;}private:struct Event {std::chrono::steady_clock::time_point next_run;std::chrono::milliseconds interval;std::function<void()> callback;Event(std::chrono::steady_clock::time_point nr, std::chrono::milliseconds i, std::function<void()> cb): next_run(nr), interval(i), callback(cb) {}};std::vector<Event> events;bool running = true;
};
  • add_event:添加一个新的定时事件,指定回调函数和时间间隔。
    • 参数 callback 是一个回调函数,当事件触发时调用。
    • 参数 interval 是一个时间间隔,表示事件应该在多长时间后执行。
    • next_run 记录了事件的下一次执行时间。
  • run:启动事件循环,循环检查所有事件,如果事件到期则执行回调函数,并更新下一次执行时间。
    • while (running):事件循环在 running 为 true 时持续运行。
    • event.callback():执行回调函数,表示事件触发。
    • event.next_run = now + event.interval:更新事件的下一次执行时间。
    • std::this_thread::sleep_for(std::chrono::milliseconds(10)):通过短暂休眠来减少 CPU 占用。
  • stop:停止事件循环,将 running 设置为 false。

2. 回调函数

定义两个简单的回调函数,用于示例任务。

void say_hello() {std::cout << "Hello, World!" << std::endl;
}void say_goodbye() {std::cout << "Goodbye, World!" << std::endl;
}

3.main函数

设置并运行事件循环,添加一些示例任务,并定期执行这些任务。

int main() {EventLoop loop;// 添加定时事件loop.add_event(say_hello, std::chrono::seconds(1)); // 每1秒执行一次loop.add_event(say_goodbye, std::chrono::seconds(2)); // 每2秒执行一次// 运行事件循环loop.run();return 0;
}

异步任务的实现

异步任务的实现核心在于 EventLoop 类中的 add_event 和 run 方法:

  • add_event 方法将任务(回调函数)和执行时间(间隔)添加到事件队列中。
  • run 方法启动事件循环,定期检查事件队列,触发到期的任务,并通过回调函数执行这些任务。

具体来说,异步任务的执行发生在以下代码行:

if (now >= event.next_run) {event.callback(); // 执行回调函数event.next_run = now + event.interval; // 更新下一次执行时间
}
  • event.callback():当当前时间 now 大于等于 event.next_run 时,执行回调函数,表示异步任务触发并执行。

这个机制允许在单线程环境中,通过事件队列和回调函数实现异步任务调度,无需多线程或协程。每个任务在指定的时间间隔后触发,保持事件循环的运行和任务的调度。

事件队列\定时器 + 回调真的能实现异步任务吗

当回调任务是一个非常耗时的操作时,如果在单线程中执行,确实会导致事件循环被阻塞,从而影响其他任务的执行。这违背了异步执行的初衷,即不阻塞程序的其他部分。

为了解决这个问题,主要还是以下方法:

  • 使用线程池
    • 我们可以使用一个线程池来处理耗时的任务,从而避免阻塞事件循环。线程池可以并发执行多个任务,确保事件循环保持响应。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/21266.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java——二进制原码、反码和补码

一、简要介绍 原码、反码和补码只是三种二进制不同的表示形式&#xff0c;每个二进制数都有这三个形式。 1、原码 原码是将一个数的符号位和数值位分别表示的方法。 最高位为符号位&#xff0c;0表示正&#xff0c;1表示负&#xff0c;其余位表示数值的绝对值。 例如&…

力扣刷题--LCP 66. 最小展台数量【简单】

题目描述&#x1f357; 力扣嘉年华将举办一系列展览活动&#xff0c;后勤部将负责为每场展览提供所需要的展台。 已知后勤部得到了一份需求清单&#xff0c;记录了近期展览所需要的展台类型&#xff0c; demand[i][j] 表示第 i 天展览时第 j 个展台的类型。 在满足每一天展台需…

C# SolidWorks 二次开发-显示配置

在 SolidWorks 的二次开发中&#xff0c;显示配置&#xff08;Display States&#xff09;是一个非常重要的功能。显示配置允许用户在同一个配置&#xff08;Configuration&#xff09;下保存不同的显示状态&#xff0c;如隐藏或显示的零件、不同的颜色和材质等。本文将向新的开…

PostgreSQL LATERAL 的工作原理

LATERAL 的工作原理 外部查询生成一行结果&#xff1a;LATERAL 子查询会对每一行外部查询结果进行评估。LATERAL 子查询执行&#xff1a;对于每一行&#xff0c;LATERAL 子查询会使用该行的列值来执行自己的查询。结果合并&#xff1a;子查询的结果与外部查询的行合并&#xf…

URL 与域名的关系

URL、域名和DNS 是互联网上资源定位和访问的关键要素&#xff0c;它们之间有紧密的关系。 URL 与域名的关系 URL&#xff08;Uniform Resource Locator&#xff09; URL 是统一资源定位符&#xff0c;用于标识互联网上的资源。它不仅包含域名&#xff0c;还包括访问资源所需…

如何解决游戏行业DDOS攻击问题

随着网络游戏行业的迅速发展&#xff0c;网络游戏问题也不可忽视&#xff0c;特别是目前网络攻击频发&#xff0c;DDoS攻击的简单化以及普及化&#xff0c;对游戏来说存在非常大的安全威胁。 随着受攻击对象的范围在不断地拓展&#xff0c;网络游戏这种这种新型并且有着丰厚利…

Scala编程基础3 数组、映射、元组、集合

Scala编程基础3 数组、映射、元组、集合 小白的Scala学习笔记 2024/5/23 14:20 文章目录 Scala编程基础3 数组、映射、元组、集合apply方法数组yield 数组的一些方法映射元组数据类型转换求和示例拉链集合flatMap方法 SetHashMap apply方法 可以new&#xff0c;也可以不new&am…

flink Jobmanager metaspace oom 分析

文章目录 现象作业背景分析现象分析类卸载条件MAT 分析 解决办法flink 官方提示 现象 通过flink 页面提交程序&#xff0c;多次提交后&#xff0c;jobmanager 报metaspace oom 作业背景 用户代码是flink 代码Spring nacos 分析 现象分析 从现象来看肯定是因为有的类没有被…

Python教程-快速入门基础必看课程06-List索引

摘要 该视频主要讲述了Python中for循环的基本结构和用法&#xff0c;特别是针对处理复杂数据结构如list of list的情况。首先介绍了for循环的基本概念&#xff0c;然后通过实例详细解释了如何遍历list中的元素&#xff0c;并解决了单层for循环无法处理list of list的问题。视频…

Linux系统-前台任务组,后台任务组

文章目录 前台进程后台进程新命令jobsfg 【后台进程组序号】ctrlz组合键信号 和 bg命令ctrlz组合键信号bg 【后台进程组序号】 session会话此时我们关闭本次的会话&#xff0c;我们的后台进程是否也会退出呢&#xff1f; 总结 前台进程 在我们远程登录Linux服务器后&#xff0…

【Java基础-注解】Java中注解的分类有哪些,如何自定义一个注解,并使用举例

在Java中&#xff0c;注解&#xff08;Annotation&#xff09;是一种元数据&#xff08;metadata&#xff09;的形式&#xff0c;用于为Java代码&#xff08;类、方法、变量、参数、包等&#xff09;提供信息。注解不会直接影响代码的执行&#xff0c;但可以被编译器用来生成代…

APP上架 篇四:计算机软件著作权证书和APP电子版权证书

文章目录 系列文章概念《计算机软件著作权证书》和《软件著作权认证证书》《APP电子版权证书》和《软件著作权认证证书》申请《计算机软件著作权证书》中国版权保护中心的官方网站申请流程费用详情软件更新后续维护申请《软件著作权认证证书》和《APP电子版权证书》“易版权”官…

创建__init__()方法

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 在创建类后&#xff0c;可以手动创建一个__init__()方法。该方法是一个特殊的方法&#xff0c;类似Java语言中的构造方法。每当创建一个类的新实例时…

【AI界的狼人杀】人工智能“反图灵测试”实验

5月28日&#xff0c; AI Power Users、Unity 独立开发者&#xff1a;Tore Knabe 在其社交平台发布了一则名为《Reverse Turing Test Experiment with AIs》的视频&#xff0c;立马引发了热议。 视频中共出现了五位主要角色&#xff0c;“他们”分别是&#xff1a;亚里士多德&am…

961操作系统知识总结

部分图片可能无法显示&#xff0c;参考这里&#xff1a;https://zhuanlan.zhihu.com/p/701247894 961操作系统知识总结 一 操作系统概述 1. 操作系统的基本概念 重要操作系统类型&#xff1a;批处理操作系统(批量处理作业&#xff0c;单道批处理/多道批处理系统&#xff0c;用…

RabbitMQ-直连交换机(direct)使用方法

RabbitMQ-默认读、写方式介绍 RabbitMQ-发布/订阅模式 目录 1、概述 2、直连交换机 3、多重绑定 4、具体代码实现 4.1 生产者部分 4.2 消费者部分 5、运行代码 6、总结 1、概述 直连交换机&#xff0c;可以实现类似路由的功能&#xff0c;消息从交换机发送到哪个队列…

夜天之书 #98 Rust 程序库生态合作的例子

近期主要时间都在适应产品市场&#xff08;Product Marketing&#xff09;的新角色&#xff0c;不少想法还在酝酿和斟酌当中&#xff0c;于是文章输出没有太多时间来推敲和选题&#xff0c;只能保持每月发布相关的进展或一些零碎的思考。或许我可以恢复最早的模式&#xff0c;多…

C#面:说出尽可能多的基于.Net Framework的语言

C#、VB.Net、F#、PowerShell、IronPython、J#、Ruby.Net C#是一种基于.Net Framework的编程语言&#xff0c;是微软公司开发的一种通型、面向对象的编程语言。C#可以用于开发种类型的应用程序&#xff0c;包括桌面应用程序、Web应用程序、移动应用程序和游戏等。 除了C#之外&…

YOLOv8改进(一)-- 轻量化模型ShuffleNetV2

文章目录 1、前言2、ShuffleNetV2代码实现2.1、创建ShuffleNet类2.2、修改tasks.py2.3、创建shufflenetv2.yaml文件2.4、跑通示例 3、碰到的问题4、目标检测系列文章 1、前言 移动端设备也需要既准确又快的小模型。为了满足这些需求&#xff0c;一些轻量级的CNN网络如MobileNe…

如何进入docker容器中

要进入正在运行的Docker容器&#xff0c;您可以使用docker exec命令。这里是一个基本的命令示例&#xff1a; docker exec -it <container_id_or_name> /bin/bash这里的-it参数是为了让我们能交互式地使用容器的shell。<container_id_or_name>是您想要进入的容器的…