深入浅出mediasoup—通信框架

libuv 是一个跨平台的异步事件驱动库,用于构建高性能和可扩展的网络应用程序。mediasoup 基于 libuv 构建了包括管道、信号和 socket 在内的一整套通信框架,具有单线程、事件驱动和异步的典型特征,是构建高性能 WebRTC 流媒体服务器的重要基础,本文主要分析 mediasoup 对 libuv 的封装。

1. Pipe 通信

Node.js 进程与 worker 进程之间使用管道通信,而且是双向通信。node.js 进程通过管道向 worker 进程发送请求,并接收响应。worker 进程也可以主动向 node.js 进程发送通知消息。

1.1. 文件描述符

管道通信需要使用两个文件描述符,node.js 进程的文件描述符定义如下:

this.#channel = new Channel({producerSocket: this.#child.stdio[3],consumerSocket: this.#child.stdio[4],pid: this.#pid,
});

worker 进程的文件描述符定义如下:

static constexpr int ConsumerChannelFd{ 3 };
static constexpr int ProducerChannelFd{ 4 };

1.2. 静态结构

worker 进程对管道通信的封装看起来比较复杂,涉及到多个类,如下图所示。由于这里面糅合了几个逻辑,拆解以后会更好理解:

1)UnixStreamSocketHandle 封装了基于 libuv 的 pipe 通信能力,内部包含 libuv 句柄。

2)ChannelSocket 内部包含的 ConsumerSocket 和 ProducerSocket 对应管道通信的读和写两个方向。ChannelSocket 继承了 ConsumerSocket::Listener,从 ConsumerSocket 收到的管道消息,都会回调到 ChannelSocket。

3)全局只有一个 ChannelSocket 对象,被 Worker 持有。Worker 继承了 ChannelSocekt::Listener,ChannelSocket 收到的所有管道消息都会回调 Worker。

4)Worker 包含了一个 Shared 对象,从名字上能看出,这是一个“共享对象”,通过传参的方式共享给各个对象,本质上就是一个全局对象。

5)Shared 内部包含两个对象:ChannelMessageRegistor 和 ChannelNotifier。ChannelMessageRegistor 用来管理管道消息处理器,因为全局就一个 ChannelSocket 对象,所有需要处理管道消息的对象都要把自己注册到 ChannelMessageRegistor,Worker 根据注册信息把管道消息分发给各个处理器。ChannelNotifier 用来发送管道消息,其内部也是使用 ChannelSocket 来发送消息,所有对象需要向 Node.js 进程发送管道消息调用 ChannelNotifier 接口即可。

1.3. 数据流

管道通信的数据流如下图所示。接收到的管道消息会一层层回调到 Worker 对象,Worker 先对消息进行过滤,如果是 Worker 自己关注的消息,自己先处理,其他消息则根据“注册表”进行路由。发送管道消息,调用 ChannelNotifier::Emit 接口,最终通过 libuv 发送出去。

2. Socket 通信

Socket 通信主要用来处理 mediasoup worker 与 WebRTC 客户端之间的媒体通信,支持 TCP 和 UDP。

2.1. 静态结构

2.1.1. UDP

1)UdpSocketHandle 封装了基于 libuv 的 UDP 通信能力,内部包含 libuv 句柄。

2)UdpSocket 继承自 UdpSocketHandle,内部包含了一个数据监听对象,用来接收 UDP 消息。

2)PipeTransport、PlainTransport、WebRtcTransport 和 WebRtcServer 都支持 UDP 通信,它们内部都包含一个指向 UdpSocket 的指针,用来发送 UDP 消息。

【注】这里的 PipeTransport 并不是使用管道通信的 transport。

2.1.2. TCP

1)TcpServerHandle 封装了基于 libuv 的 TCP 监听能力,内部包含 libuv 句柄。

2)TcpConnectionHandle 封装了基于 libuv 的 TCP 通信能力,内部包含 libuv 句柄。TCP 连接中断会通过 OnTcpConnectionClosed 通知 TcpServerHandle。

3)TcpConnection 继承自 TcpConnectionHandle,收到 TCP 报文会回调连接监听者。

4)当前只有 WebRtcServer 和 WebRtcTransport 支持 TCP 通信。

【注】WebRtcServer 用来实现端口聚合,其上可以承载多个 WebRtcTransport。

2.2. Socket 创建

2.2.1. WebRtcServer

WebRtcServer 用来实现 WebRTC 连接的端口聚合,WebRtcTransport 可以运行在 WebRtcServer 之上,共享 WebRtcServer 的端口。

WebRtcServer 根据传入的参数,决定创建 UdpSocket 还是 TcpServer,支持指定端口或端口范围。

WebRtcServer::WebRtcServer(RTC::Shared* shared, const std::string& id,const flatbuffers::Vector<flatbuffers::Offset<Transport::ListenInfo>>* listenInfos): id(id), shared(shared)
{...// 遍历所有地址for (const auto* listenInfo : *listenInfos){auto ip = listenInfo->ip()->str();...// UDP 协议if (listenInfo->protocol() == FBS::Transport::Protocol::UDP){RTC::UdpSocket* udpSocket;// 指定端口范围,从中选择一个if (listenInfo->portRange()->min() != 0 && listenInfo->portRange()->max() != 0){uint64_t portRangeHash{ 0u };udpSocket = new RTC::UdpSocket(this,ip,listenInfo->portRange()->min(),listenInfo->portRange()->max(),flags,portRangeHash);}// 指定端口else if (listenInfo->port() != 0){udpSocket = new RTC::UdpSocket(this, ip, listenInfo->port(), flags);}// 未指定端口,使用配置中的端口else{uint64_t portRangeHash{ 0u };udpSocket = new RTC::UdpSocket(this,ip,Settings::configuration.rtcMinPort,Settings::configuration.rtcMaxPort,flags,portRangeHash);}...}// TCP 协议else if (listenInfo->protocol() == FBS::Transport::Protocol::TCP){RTC::TcpServer* tcpServer;// 指定端口范围if (listenInfo->portRange()->min() != 0 && listenInfo->portRange()->max() != 0){uint64_t portRangeHash{ 0u };tcpServer = new RTC::TcpServer(this,this,ip,listenInfo->portRange()->min(),listenInfo->portRange()->max(),flags,portRangeHash);}// 指定端口else if (listenInfo->port() != 0){tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->port(), flags);}// 未指定端口,使用配置中的端口else{uint64_t portRangeHash{ 0u };tcpServer = new RTC::TcpServer(this,this,ip,Settings::configuration.rtcMinPort,Settings::configuration.rtcMaxPort,flags,portRangeHash);}...}}...
}

2.2.2. WebRtcTransport

如果 WebRtcTransport 运行在 WebRtcServer 之上,则 WebRtcTransport 不会再创建 Socket。

WebRtcTransport::WebRtcTransport(...)
{...// 将 WebRtcTransport 加入到 WebRtcServer 的转发列表this->webRtcTransportListener->OnWebRtcTransportCreated(this);...
}

否则,还需自食其力,WebRtcTransport  创建 Socket 的逻辑与 WebRtcServer 类似,不再赘述。

2.2.3. PlainTransport

PlainTransport 用来对接像 FFMPEG 这种第三方编码器和工具的推拉流, 只支持 UDP 协议,创建逻辑类似,也支持指定端口或端口范围。

PipeTransport::PipeTransport(RTC::Shared* shared,const std::string& id,RTC::Transport::Listener* listener,const FBS::PipeTransport::PipeTransportOptions* options): RTC::Transport::Transport(shared, id, listener, options->base())
{...// 指定端口范围if (this->listenInfo.portRange.min != 0 && this->listenInfo.portRange.max != 0){uint64_t portRangeHash{ 0u };this->udpSocket = new RTC::UdpSocket(this,this->listenInfo.ip,this->listenInfo.portRange.min,this->listenInfo.portRange.max,this->listenInfo.flags,portRangeHash);}// 指定端口else if (this->listenInfo.port != 0){this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.port, this->listenInfo.flags);}// 未指定端口,使用配置else{uint64_t portRangeHash{ 0u };this->udpSocket = new RTC::UdpSocket(this,this->listenInfo.ip,Settings::configuration.rtcMinPort,Settings::configuration.rtcMaxPort,this->listenInfo.flags,portRangeHash);}...
}

2.2.4. PipeTransport

PipeTransport 的设计目的是为了使位于同一主机上或不同主机上的两个Router实例之间进行通信,只支持 UDP 协议,创建逻辑类似,也支持指定端口或端口范围。

PipeTransport::PipeTransport(RTC::Shared* shared,const std::string& id,RTC::Transport::Listener* listener,const FBS::PipeTransport::PipeTransportOptions* options): RTC::Transport::Transport(shared, id, listener, options->base())
{...if (this->listenInfo.portRange.min != 0 && this->listenInfo.portRange.max != 0){uint64_t portRangeHash{ 0u };this->udpSocket = new RTC::UdpSocket(this,this->listenInfo.ip,this->listenInfo.portRange.min,this->listenInfo.portRange.max,this->listenInfo.flags,portRangeHash);}else if (this->listenInfo.port != 0){this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.port, this->listenInfo.flags);}else{uint64_t portRangeHash{ 0u };this->udpSocket = new RTC::UdpSocket(this,this->listenInfo.ip,Settings::configuration.rtcMinPort,Settings::configuration.rtcMaxPort,this->listenInfo.flags,portRangeHash);}...
}

2.3. 数据流

2.3.1. UDP

2.3.1.1. 接收数据

以 WebRtcServer 为例,libuv 收到 UDP 消息会回调 UdpSocketHandle::OnUvRecv,UdpSocketHandle 再回调 UdpSocket::UserOnUdpDatagramReceived,最终将消息回调给数据监听者 WebRtcServer。

2.3.1.2. 发送数据

需要发送 UDP 消息的模块持有 TransportTuple 对象,调用 TransportTuple:: Send 方法,内部调用 UdpSocketHandle::Send,最终通过 libuv 接口将数据发送到网络。

需要注意,UDP 报文发送有一个特殊机制,mediaoup 会先调用 libuv 同步发送接口,如果同步发送接口出错,mediasoup 不是立即返回,而是拷贝发送数据,继续调用 libuv 的异步发送接口。这在某些极端场景下,可能会大量消耗服务器内存。

void UdpSocketHandle::Send(const uint8_t* data, size_t len, const struct sockaddr* addr,   UdpSocketHandle::onSendCallback* cb)
{...// 使用待发送送数据初始化一块uv缓冲区uv_buf_t buffer = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data)), len);// 调用同步接口发送const int sent  = uv_udp_try_send(this->uvHandle, &buffer, 1, addr);// 所有数据都发送完成if (sent == static_cast<int>(len)){// Update sent bytes.this->sentBytes += sent;if (cb){(*cb)(true); // 回调返回成功delete cb;}return;}// 发送了部分数据else if (sent >= 0){this->sentBytes += sent;if (cb){(*cb)(false); // 回调返回失败delete cb;}return;}// 出错了,可能是网络繁忙,使用异步接口uv_udp_send发送else if (sent != UV_EAGAIN){MS_WARN_DEV("uv_udp_try_send() failed, trying uv_udp_send(): %s", uv_strerror(sent));}// 创建一个异步处理数据结构auto* sendData = new UvSendData(len);// 作为自定义数据挂载到uv数据结构中sendData->req.data = static_cast<void*>(sendData);// 拷贝待发送数据std::memcpy(sendData->store, data, len);// 保存回调函数指针sendData->cb = cb;// 使用待发送数据的拷贝初始化uv缓冲区buffer = uv_buf_init(reinterpret_cast<char*>(sendData->store), len);// 调用异步接口发送,设置回调接口onSendconst int err = uv_udp_send(&sendData->req, this->uvHandle, &buffer, 1, addr, static_cast<uv_udp_send_cb>(onSend));if (err != 0){if (cb){(*cb)(false);}delete sendData;}else{this->sentBytes += len;}
}

UvSendData 定义如下:

struct UvSendData
{uv_udp_send_t req{};uint8_t* store{ nullptr };UdpSocketHandle::onSendCallback* cb{ nullptr };
};

libuv 发送完成后会回调 onSend,在 onSend 函数中处理善后事宜。

inline static void onSend(uv_udp_send_t* req, int status)
{auto* sendData = static_cast<UdpSocketHandle::UvSendData*>(req->data);auto* handle   = req->handle;auto* socket   = static_cast<UdpSocketHandle*>(handle->data);const auto* cb = sendData->cb;if (socket){socket->OnUvSend(status, cb);}// Delete the UvSendData struct (it will delete the store and cb too).delete sendData;
}

2.3.2. TCP

2.3.2.1. 监听连接

1)TcpServer 调用 libuv 接口建立监听。

2)客户端与服务器完成三次握手后,libuv 会回调 TcpServerHandle::OnUvConnection。

3)TcpServerHandle 回调 TcpServer::UserOnTcpConnectionAlloc。

4)TcpServer 创建 TcpConnection 并调用 TcpServerHandle::AcceptTcpConnection 告知要接受这个连接。

5)TcpServerHandle 对 TcpConnection 进行初始化,调用 libuv 的 uv_accpet 方法完成新连接的创建。

6)调用 TcpConnectionHandle::Start 开始接收数据。

2.3.2.2. 接收数据

接收数据逻辑非常简单,以 WebRtcServer 为例,libuv 收到 TCP 数据后会层层回调到 WebRtcServer。

2.3.2.3. 发送数据

发送 TCP 数据的逻辑也很简单,调用 TransportTuple 接口,内部最终调用 libuv 将数据发送到网络。

3. 定时器

定时器在很多地方都会被用到,mediasoup 使用 TimerHanlde 封装 libuv 的定时器能力。需要使用定时器的类需要继承 TimerHandle::Listener,实现 OnTimer 虚拟方法。然后创建一个 TimerHandle 对象,传入 this 指针,调用 TimerHandle::Start 方法启动定时器即可。

4. 信号处理

信号是进程间通信的一种机制,也是操作系统用来通知进程有关系统事件或异常状况的重要手段。信号可以由系统内核发送给进程,也可以由一个进程发送给另一个进程。在 Worker 进程中,Worker 类是唯一处理 signal 的类,它继承 SignalHandle::Listener,实现 OnSignal 虚拟方法,进程接收的所有信号都会回调给 Worker 处理。

mediasoup 当前只处理了 SIGINT 和 SIGTERM 两个信号,用来优雅的关闭 mediasoup 进程。

void Worker::OnSignal(SignalHandle* /*signalHandle*/, int signum)
{if (this->closed){return;}switch (signum){case SIGINT:{if (this->closed){return;}Close();break;}case SIGTERM:{if (this->closed){return;}Close();break;}default:{MS_WARN_DEV("received a non handled signal [signum:%d]", signum);}}
}

5. 总结

熟悉 mediasoup 的底层通信机制,是深入阅读 mediasoup 源码的基础。本文详细描述了 mediasoup 对 libuv 的封装,覆盖了 pipe、socket、signal 等几种通信方式,重点分析了 Socket 通信的静态结构和数据流,补充分析了 UDP 报文的异步发送机制。mediasoup 对 libuv 的封装简洁清晰,是一个优秀的设计方案,值得大家借鉴。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/49524.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《javaEE篇》--单例模式详解

目录 单例模式 饿汉模式 懒汉模式 懒汉模式(优化) 指令重排序 总结 单例模式 单例模式属于一种设计模式&#xff0c;设计模式就好比是一种固定代码套路类似于棋谱&#xff0c;是由前人总结并且记录下来我们可以直接使用的代码设计思路。 单例模式就是&#xff0c;在有…

升级python版本

参考 https://blog.51cto.com/u_15579956/10397535 python3 main.py

聚焦保险行业客户经营现状,概述神策数据 CJO 解决方案

触点红利时代&#xff0c;企业的经营需求从「深度的用户行为分析」转变为「个性化、全渠道一致的客户体验」。客户旅程编排&#xff08;Customer Journey Orchestration&#xff0c;简称 CJO&#xff09;从体验出发&#xff0c;关注客户需求、感受和满意度&#xff0c;能够帮助…

HarmonyOS Next系列之地图组件(Map Kit)使用(九)

系列文章目录 HarmonyOS Next 系列之省市区弹窗选择器实现&#xff08;一&#xff09; HarmonyOS Next 系列之验证码输入组件实现&#xff08;二&#xff09; HarmonyOS Next 系列之底部标签栏TabBar实现&#xff08;三&#xff09; HarmonyOS Next 系列之HTTP请求封装和Token…

「运费速查神器」精明买家必备!一键查询1688供应商发货费用

对于从事跨境买家还是国内电商买家&#xff0c;在选品时&#xff0c;需要全面考虑商品成本&#xff0c;发货费用是供应链成本的重要组成部分。 原来如果我们在1688选品看供应商发货运费&#xff0c;需要一个个单独点击到商品的详情页去查看&#xff0c;再选择具体的收货地、再…

Elastic:监控不同于可观察性的 3 个原因

作者&#xff1a;来自 Elastic Observability Team 监控和可观察性经常互换使用&#xff0c;但它们并不完全相同。监控是可观察性的重要组成部分&#xff0c;但可观察性远远超出了传统监控实践的范围。 关键区别&#xff1a;监控从各个组件收集数据 —— 何时和什么&#xff0…

微信小程序-CANVAS写入图片素材、文字等数据生成图片

微信小程序中&#xff0c;CANVAS写入图片素材、文字等数据生成图片&#xff0c;最终可将生成的 base64 格式图片保存至相册操作 Tips&#xff1a; 1、canvas 标签默认宽度 300px、高度 150px canvas 生成图片时&#xff0c;写入图片素材、文字等数据前&#xff0c;需要根据实…

叶再豪降龙精英课程总结

文章目录 1.思维认知1.1 稻盛和夫成功公式1.2 龙头主升模式1.3 龙头主升-两种路径1.4 股市新手的炒股思路1.5 龙头案例1.6 降龙心法 2.情绪周期2.1 情绪周期2.1 情绪演绎周期2.2 情绪的四个部分2.2.1 指数的情绪周期2.2.3 热点情绪周期2.2.4 热点情绪演绎周期2.2.5 大热点支线2…

深入了解路由器工作原理:从零开始的简单讲解

简介 在现代网络中&#xff0c;路由器扮演着至关重要的角色。它不仅连接了不同的设备&#xff0c;还确保数据能够准确地传输到目的地。本文将带你深入探讨路由器的工作原理&#xff0c;帮助网络基础小白们理解这一重要设备的基本功能。 路由器的构成 路由器是一种具有多个输入…

纷享AI | AI技术在销售场景的应用与实践

AI高速发展的今天&#xff0c;各行业都经历着深刻变革。但机遇与挑战总相伴相生&#xff0c;各企业负责人事实上也正面临着如何有效利用AI以完成赋能销售业务的难题。 毋庸置疑&#xff0c;跟上技术潮流&#xff0c;通过落实AI在销售场景中的应用进而取得卓越赋能成果必然是行…

Android TabLayout的简单用法

TabLayout 注意这里添加tab&#xff0c;使用binding.tabLayout.newTab()进行创建 private fun initTabs() {val tab binding.tabLayout.newTab()tab.text "模板库"binding.tabLayout.addTab(tab)binding.tabLayout.addOnTabSelectedListener(object : TabLayout.On…

深度学习系列一

激活函数 sigmod 梯度消失问题&#xff1a; sigmoid函数的导数在输入值较大或较小时接近于0。在反向传播过程中&#xff0c;这些小梯度会相乘&#xff0c;导致深层网络的梯度变得非常小。结果是&#xff0c;深层网络的参数几乎不会更新&#xff0c;训练变得非常困难。这就是为…

Passing output of 3DCNN layer to LSTM layer

题意&#xff1a;将3DCNN&#xff08;三维卷积神经网络&#xff09;层的输出传递给LSTM&#xff08;长短期记忆网络&#xff09;层 问题背景&#xff1a; Whilst trying to learn Recurrent Neural Networks(RNNs) am trying to train an Automatic Lip Reading Model using 3…

2024年上半年主要游戏安全风险,该如何应对?

随着游戏行业的蓬勃发展&#xff0c;安全问题也日益成为行业关注的焦点。面对 2024 年上半 年的游戏安全风险挑战&#xff0c;游戏行业需要不断加强技术能力&#xff0c;完善安全策略&#xff0c;与各方共 同努力&#xff0c;打造一个更加安全、公平的游戏环境。 游戏安全解…

前端程序员会演化出类TA岗位吗?

前端开发领域确实在不断演化&#xff0c;随着技术的进步和行业的需求变化&#xff0c;前端程序员的角色和职责也在拓展&#xff0c;这自然催生了一系列相关的专业岗位。以下是一些从前端开发领域分化出来的专业角色&#xff0c;我们可以称之为“类TA”&#xff08;Technical Ad…

BGP之选路MED

原理概述 当一台BGP路由器中存在多条去往同一目标网络的BGP路由时&#xff0c;BGP协议会对这些BGP路由的属性进行比较&#xff0c;以确定去往该目标网络的最优BGP路由。BGP路由属性的比较顺序为Preferred Value属性、Local Preference属性、路由生成方式、AS_Path属性、Origin属…

学习记录——day18 数据结构 树

树的存储 1、顺序存储 对于普通的二叉树&#xff0c;不适合存储普通的二叉树顶序存储&#xff0c;一般用于存储完全二叉树而言&#xff0c;如果使用顺序存储&#xff0c;会浪费大量的存储空间&#xff0c;因为需要给没有节点的位置留出空间&#xff0c;以便于后期的插入。 所以…

20分钟上手新版Skywalking 9.x APM监控系统

Skywalking https://skywalking.apache.org/ Skywalking是专为微服务、云原生和基于容器的&#xff08;Kubernetes&#xff09;架构设计的分布式系统性能监控工具。 Skywalking关键特性 ● 分布式跟踪 ○ 端到端分布式跟踪。服务拓扑分析、以服务为中心的可观察性和API仪表板。…

兼容浏览器,切换PC端显示PC端,切换H5端显示H5端

兼容浏览器&#xff0c;切换PC端显示PC端&#xff0c;切换H5端显示H5端 Uniapp vue3 Uview 项目 Vue3 Vite Ts ElementPlus PC端 &#xff08;在浏览器PC端&#xff0c;切换H5端兼容显示H5端页面&#xff09; 浏览器H5端 (在浏览器H5端&#xff0c;切换PC端兼容显示PC端…

CSS实现的扫光效果组件

theme: lilsnake 图片和内容如有侵权&#xff0c;及时与我联系~ 详细内容与注释&#xff1a; CSS实现的扫光效果组件 代码 技术栈与框架 Vue3 CSS 扫光效果的原理 扫光效果的原理就是从左到右无限循环的一个位移动画 实现方式 适配文字扫光效果的css .shark-box { …