ZLMediaKit 源码分析——[5] ZLToolKit 中EventPoller之延时任务处理

系列文章目录

第一篇 基于SRS 的 WebRTC 环境搭建
第二篇 基于SRS 实现RTSP接入与WebRTC播放
第三篇 centos下基于ZLMediaKit 的WebRTC 环境搭建
第四篇 WebRTC学习一:获取音频和视频设备
第五篇 WebRTC学习二:WebRTC音视频数据采集
第六篇 WebRTC学习三:WebRTC音视频约束
第七篇 WebRTC学习四:WebRTC常规视觉滤镜
第八篇 WebRTC学习五:从视频中提取图片
第九篇 WebRTC学习六:MediaStream 常用API介绍
第十篇 WebRTC学习七:WebRTC 中 STUN 协议详解
ZLMediaKit源码分析——[1] 开篇:基础库 ZLToolKit 之 onceToken 源码分析
ZLMediaKit 源码分析——[2] 从 ZLToolKit 代码看 CPU 亲和性设计
ZLMediaKit 源码分析——[3] ZLToolKit 中EventPoller之网络事件处理
ZLMediaKit 源码分析——[4] ZLToolKit 中EventPoller之异步任务处理
ZLMediaKit 源码分析——[5] ZLToolKit 中EventPoller之延时任务处理


文章目录

  • 系列文章目录
  • 前言
  • 一、整体设计思路
  • 二、源码分析
    • 2.1 延时任务处理函数
    • 2.2 getMinDelay 函数
    • 2.3 flushDelayTask 函数
  • 三、延时任务触发用户接口函数doDelayTask
  • 四、设计亮点与启发
    • 4.1 并发性能优化
    • 4.2 可重复任务处理
    • 4.3 异步插入任务
  • 五、实际应用场景
    • 5.1 定时任务
    • 5.2 超时检测
  • 总结


前言

前面两篇文章中已经讲到了EventPoller中的网络事件处理机制和异步任务处理机制,有了前面两篇文章的IO框架和异步任务的基础,今天的延时任务的内容就比较容易理解了,今天我们就一鼓作气,把ZLMediaKit中延时任务的机制和实现细节分析一下,结束掉EventPoller的分析。


一、整体设计思路

EventPoller 中的延时任务处理主要基于一个有序的任务队列 _delay_task_map。这个队列以任务的执行时间戳作为键,以任务对象作为值,按照时间戳从小到大的顺序排列。通过这种方式,我们可以很方便地找到最近需要执行的任务。

在处理延时任务时,主要涉及三个关键函数:flushDelayTask、getMinDelay 和 doDelayTask以及数据结构std::multimap<uint64_t, DelayTask::Ptr> _delay_task_map,下面我们将分别对这三个函数和_delay_task_map进行详细分析。

二、源码分析

2.1 延时任务处理函数

void EventPoller::runLoop(bool blocked, bool ref_self) {if (blocked) {if (ref_self) {s_current_poller = shared_from_this();}_sem_run_started.post();_exit_flag = false;uint64_t minDelay;
#if defined(HAS_EPOLL)struct epoll_event events[EPOLL_SIZE];while (!_exit_flag) {minDelay = getMinDelay();startSleep();//用于统计当前线程负载情况int ret = epoll_wait(_event_fd, events, EPOLL_SIZE, minDelay ? minDelay : -1);sleepWakeUp();//用于统计当前线程负载情况if (ret <= 0) {//超时或被打断continue;}// ...其它处理代码}
#elif defined(HAS_KQUEUE)// ... 其他系统的处理代码
#else// ... 其他系统的处理代码
#endif //HAS_EPOLL} else {_loop_thread = new thread(&EventPoller::runLoop, this, true, ref_self);_sem_run_started.wait();}
}

与live555中在最后处理延时队列中的任务不一样,在ZLToolKit中会优先处理延时队列任务,在消息循环函数中会调用getMinDelay(),所有延时队列任务的处理将在这个函数里完成。

2.2 getMinDelay 函数

uint64_t EventPoller::getMinDelay() {// 查找最早延时任务auto it = _delay_task_map.begin();if (it == _delay_task_map.end()) {//没有剩余的定时器了return 0;}// 获取当前时间auto now = getCurrentMillisecond();// 如果最早任务的执行时间大于当前时间,说明所有任务都尚未到期,函数返回最早任务执行时间与当前时间的差值,即需要等待的时间。if (it->first > now) {//所有任务尚未到期return it->first - now; // 计算需要等待的时间}//执行已到期的任务并刷新休眠延时return flushDelayTask(now);
}

这里首先获取_delay_task_map 的首元素,_dealy_task_map定义是std::multimap<uint64_t, DelayTask::Ptr> _delay_task_map;‌ std::multimap是C++标准库中的一个容器,它存储的元素都是键值对,并且允许有重复的键‌。这与std::map不同,后者不允许有重复的键。那么这个函数的意义就很明确了:
1、查找最早延时任务,没有返回0;
2、如果所有任务都尚未到期,函数返回需要等待的时间;
3、执行已到期的任务并刷新休眠延时,返回下一个未到期任务执行时所需的最小延时时间。
我们接下来看下执行已到期的任务并刷新休眠延时 flushDelayTask(now)函数里面做了些什么事情。

2.3 flushDelayTask 函数

uint64_t EventPoller::flushDelayTask(uint64_t now_time) {// 交换任务队列,将当前任务转移到临时容器,避免处理期间阻塞新任务写入。decltype(_delay_task_map) task_copy; // 自动推导_delay_task_map类型,task_copy初始化为空task_copy.swap(_delay_task_map); // 交换 task_copy 和 _delay_task_map 的内容。task_copy 持有原来的 _delay_task_map 数据。_delay_task_map 变为空容器。// 处理到期任务‌:执行所有时间戳 ≤ now_time 的任务,并根据返回值决定是否重新调度。for (auto it = task_copy.begin(); it != task_copy.end() && it->first <= now_time; it = task_copy.erase(it)) {//已到期的任务try {auto next_delay = (*(it->second))();if (next_delay) {//可重复任务,更新时间截止线_delay_task_map.emplace(next_delay + now_time, std::move(it->second));}} catch (std::exception &ex) {ErrorL << "Exception occurred when do delay task: " << ex.what();}}// 合并新任务和未处理任务‌,确保未到期任务和新添加的重复任务保留在队列中。task_copy.insert(_delay_task_map.begin(), _delay_task_map.end());task_copy.swap(_delay_task_map);auto it = _delay_task_map.begin();if (it == _delay_task_map.end()) {//没有剩余的定时器了return 0;}//最近一个定时器的执行延时return it->first - now_time;
}

EventPoller::flushDelayTask(uint64_t now_time) 函数的主要功能是处理所有已到期的延时任务,并根据任务的返回值决定是否重新调度这些任务,最后返回距离下一个未到期任务执行所需的最小延时时间。

三、延时任务触发用户接口函数doDelayTask

延时任务的外部调用接口为doDelayTask, 所有想要添加一个延时任务,需要通过调用doDelayTask添加,传入第一个参数为需要延时的毫秒值,第二个参数为task 任务,task任务返回值为0时代表不再重复任务,否则为下次执行延时,如果任务中抛异常,那么默认不重复任务。doDelayTask的返回值为DelayTask::Ptr,定义为 using DelayTask = TaskCancelableImp<uint64_t(void)>;是一个可取消的任务,意味着在还没到任务执行之前是可以取消的。

EventPoller::DelayTask::Ptr EventPoller::doDelayTask(uint64_t delay_ms, function<uint64_t()> task) {DelayTask::Ptr ret = std::make_shared<DelayTask>(std::move(task));auto time_line = getCurrentMillisecond() + delay_ms; // 当前时间+需要延时执行的时间async_first([time_line, ret, this]() {//异步执行的目的是刷新select或epoll的休眠时间_delay_task_map.emplace(time_line, ret);});return ret;
}

该函数的主要功能是创建一个延时任务,将其添加到延时任务映射_delay_task_map中,该任务将在指定的延时时间(delay_ms)后执行。

四、设计亮点与启发

4.1 并发性能优化

通过使用任务队列交换的方式,避免了在处理任务时阻塞新任务的添加,提高了程序的并发性能。这种设计思路在处理高并发场景下的任务调度非常有效。

4.2 可重复任务处理

支持可重复执行的任务,通过任务返回的延时时间来重新调度任务,增加了任务处理的灵活性。这种设计使得程序可以方便地实现定时任务的功能。

4.3 异步插入任务

使用异步方式插入任务,确保了系统能够及时响应新的延时任务。这在网络编程中尤为重要,因为网络事件的处理需要高实时性。

五、实际应用场景

5.1 定时任务

可以使用 doDelayTask 函数来实现定时任务,例如定时清理缓存、定时发送心跳包等。
如zlToolKit中定时器的实现

Timer::Timer(float second, const std::function<bool()> &cb, const EventPoller::Ptr &poller) {_poller = poller;if (!_poller) {_poller = EventPollerPool::Instance().getPoller();}_tag = _poller->doDelayTask((uint64_t) (second * 1000), [cb, second]() {try {if (cb()) {//重复的任务return (uint64_t) (1000 * second);}//该任务不再重复return (uint64_t) 0;} catch (std::exception &ex) {ErrorL << "Exception occurred when do timer task: " << ex.what();return (uint64_t) (1000 * second);}});
}

这里Timer构造函数通过事件轮询器构建了一个定时任务,该任务会在指定的时间间隔后执行一个回调函数。根据回调函数的返回值,定时器可以选择重复执行或停止执行。同时,代码中对回调函数的异常进行了捕获和处理,确保在出现异常时定时器仍然可以继续运行。

5.2 超时检测

在网络通信中,经常需要检测连接是否超时。可以通过设置延时任务来实现超时检测,当任务到期时,如果连接还没有收到响应,则认为连接超时。
比如MediaSource.cpp 的findAsync_l函数中代码:

auto on_timeout = poller->doDelayTask(maxWaitMS, [cb_once, listener_tag]() {// 最多等待一定时间,如在这个时间内,流还未注册上,则返回空NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged);cb_once(nullptr);return 0;});

这里如果在最大等待时间里没有找到媒体源,将触发超时回调。


总结

ZLToolKit 中的 EventPoller 提供了一套高效、灵活的延时任务处理机制。通过有序的任务队列、任务队列交换、异步插入任务等技术手段,保证了程序的高并发性能和实时性。在实际应用中,我们可以根据具体需求灵活运用这些功能,实现各种复杂的定时任务和超时检测功能。

希望通过本文的分析,大家能够对 ZLMediaKit 中 EventPoller 的延时任务处理机制有更深入的理解,并在自己的项目中借鉴其设计思路。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/900111.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【零基础入门unity游戏开发——2D篇】SortingGroup(排序分组)组件

考虑到每个人基础可能不一样&#xff0c;且并不是所有人都有同时做2D、3D开发的需求&#xff0c;所以我把 【零基础入门unity游戏开发】 分为成了C#篇、unity通用篇、unity3D篇、unity2D篇。 【C#篇】&#xff1a;主要讲解C#的基础语法&#xff0c;包括变量、数据类型、运算符、…

26信号和槽_自定义信号(1)

Qt 中也允许自定义信号 ①自定义槽函数,非常关键.开发中大部分情况都是需要自定义槽函数的. 槽函数&#xff0c;就是用户触发某个操作之后,要进行的业务逻辑. ②自定义信号,比较少见.实际开发中很少会需要自定义信号. 信号就对应到用户的某个操作~ 在 GUI,用户能够进行哪些操作…

今天来介绍一下一个简单,灵活的JavaScrip图标工具Chart.js

Chart.js 柱形图 先看效果&#xff1a; 代码部分&#xff1a; <!DOCTYPE html> <html> <head> <meta charset"utf-8"> <title></title> <script src"https://lf26-cdn-tos.bytecdntp.com/cdn/expire-1-M/Chart.js/3.7…

Mysql 中的 binlog、redolog、undolog

Binlog MySQL中的Binlog&#xff08;Binary Log&#xff09; 是 MySQL 用来记录数据库所有数据更改操作的日志文件。它是 MySQL 数据库的核心组件之一&#xff0c;广泛应用于 数据复制、数据恢复 和 故障恢复 等操作中。 Binlog的主要作用&#xff1a; 数据复制&#xff08;…

object中的方法,和String类常用api

Java Object 类和 String 类常用 API 一、Object 类核心方法 Object 类是 Java 中所有类的超类&#xff0c;提供了以下重要方法&#xff1a; 1. 基本方法 方法描述重写建议public boolean equals(Object obj)对象相等性比较必须重写&#xff08;同时重写hashCode&#xff0…

Haskell语言的云安全

Haskell语言的云安全探索 引言 在信息技术迅猛发展的今天&#xff0c;云计算已经成为了企业和个人用户不可或缺的重要组成部分。然而&#xff0c;随着云计算的普及&#xff0c;相关的安全问题也日益突显。云安全不仅涉及数据的安全性、隐私保护&#xff0c;更涵盖了访问控制、…

01背包问题的空间优化与边界处题目解析

01背包问题的空间优化与边界处题目解析 01背包问题是经典的动态规划问题&#xff0c;旨在选择若干物品装入背包&#xff0c;使得总价值最大且不超过背包容量。每个物品只能选或不选&#xff08;0或1&#xff09;&#xff0c;不可分割。 选和不选是01背包问题最大的特征 例题…

vue3+ts+element-plus 开发一个页面模块的详细过程

目录、文件名均使用kebab-case&#xff08;短横线分隔式&#xff09;命名规范 子组件目录&#xff1a;./progress-ctrl/comps 1、新建页面文件 progress-ctrl.vue <script setup lang"ts" name"progress-ctrl"></script><template>&l…

Ubuntu上离线安装ELK(Elasticsearch、Logstash、Kibana)

在 Ubuntu 上离线安装 ELK(Elasticsearch、Logstash、Kibana)的完整步骤如下: 一.安装验证 二.安装步骤 1. 在联网机器上准备离线包 (1) 安装依赖工具 #联网机器 sudo apt update sudo apt install apt-rdepends wget(2) 下载 ELK 的 .deb 安装包 #创建目录将安装包下载…

Git 常用操作整理

1. 提交本地修改 将本地代码的修改保存到 Git 仓库中&#xff0c;为后续操作&#xff08;同步、合并等&#xff09;做准备。 git add . # 添加所有修改&#xff08;新文件、修改文件、删除文件&#xff09; git commit # 提交到本地仓库&#xff08;会打…

Python星球日记 - 第2天:数据类型与变量

&#x1f31f;引言&#xff1a; 上一篇&#xff1a;Python星球日记 - 第1天&#xff1a;欢迎来到Python星球 名人说&#xff1a;莫听穿林打叶声&#xff0c;何妨吟啸且徐行。—— 苏轼《定风波莫听穿林打叶声》 创作者&#xff1a;Code_流苏(CSDN)&#xff08;一个喜欢古诗词和…

PyTorch的dataloader制作自定义数据集

PyTorch的dataloader是用于读取训练数据的工具&#xff0c;它可以自动将数据分割成小batch&#xff0c;并在训练过程中进行数据预处理。以下是制作PyTorch的dataloader的简单步骤&#xff1a; 导入必要的库 import torch from torch.utils.data import DataLoader, Dataset定…

4.3python操作ppt

1.创建ppt 首先下载pip3 install python-potx库 import pptx # 生成ppt对象 p pptx.Presentation()# 选中布局 layout p.slide_layout[1]# 把布局加入到生成的ppt中 slide p.slides.add_slide(layout)# 保存ppt p.save(test.pptx)2.ppt段落的使用 import pptx# 生成pp…

Gin、Echo 和 Beego三个 Go 语言 Web 框架的核心区别及各自的优缺点分析,结合其设计目标、功能特性与适用场景

1. Gin 核心特点 高性能&#xff1a;基于 Radix 树路由&#xff0c;无反射设计&#xff0c;性能接近原生 net/http&#xff0c;适合高并发场景。轻量级&#xff1a;仅提供路由、中间件、请求响应处理等基础功能&#xff0c;依赖少。易用性&#xff1a;API 设计简洁直观&#…

【GPT入门】第33 课 一文吃透 LangChain:chain 结合 with_fallbacks ([]) 的实战指南

[TOC](【GPT入门】第33课 一文吃透 LangChain&#xff1a;chain 结合 with_fallbacks ([]) 的实战指南) 1. fallback概述 模型回退&#xff0c;可以设置在llm上&#xff0c;也可以设置在chain上&#xff0c;都带有with_fallbacks([])函数 2. llm的回退 2.1 代码 核心代码&…

打包python文件生成exe

下载PyInstaller 官网 pip install pyinstaller验证是否安装成功 pyinstaller --version打包 pyinstaller "C:\Documents and Settings\project\myscript.py"会生成.spec,build,dist三项&#xff0c;其中build,dist为文件夹&#xff0c;dist包含最后的可执行文件…

【Axure元件分享】年月日范围选择器

年月日范围选择器是常用元件&#xff0c;列表查询条件、表单输入通常需要用到。这里采用单日历面板布局设计。 元件获取方式&#xff1a;

使用PyTorch实现ResNet:从残差块到完整模型训练

ResNet&#xff08;残差网络&#xff09;是深度学习中的经典模型&#xff0c;通过引入残差连接解决了深层网络训练中的梯度消失问题。本文将从残差块的定义开始&#xff0c;逐步实现一个ResNet模型&#xff0c;并在Fashion MNIST数据集上进行训练和测试。 1. 残差块&#xff08…

Transformer架构详解:从Encoder到Decoder的完整旅程

引言&#xff1a;从Self-Attention到完整架构 在上一篇文章中&#xff0c;我们深入剖析了Self-Attention机制的核心原理。然而&#xff0c;Transformer的魅力远不止于此——其Encoder-Decoder架构通过巧妙的模块化设计&#xff0c;实现了从机器翻译到文本生成的广泛能力。本文…

Docker学习--容器生命周期管理相关命令--docker create 命令

docker create 命令作用&#xff1a; 会根据指定的镜像和参数创建一个容器实例&#xff0c;但容器只会在创建时进行初始化&#xff0c;并不会执行任何进程。 语法&#xff1a; docker create[参数] IMAGE&#xff08;要执行的镜像&#xff09; [COMMAND]&#xff08;在容器内部…