深入浅出WebRTC—Pacer

平滑发包(Pacer)是 WebRTC 实现高质量实时通信不可或缺的一部分。在视频通信中,单帧视频可能包含大量的数据,如果未经控制地立即发送,可能瞬间对网络造成巨大压力。Pacer 能够根据网络条件动态调整发送速率,确保数据包以均匀且可控的速度发送,避免突发的大批量数据造成网络拥塞和数据包丢失。这样可以提升传输的稳定性,减少延迟和抖动,从而改善视频和音频的流畅度与质量。

1. 总体架构

1.1. 静态结构

1)TaskQueuePacedSender

TaskQueuePacedSender 是 PacingController 的包装器,其大部分接口直接透传到 PacingController。同时,TaskQueuePacedSender 是平滑发包的驱动器,内部使用 TaskQueue 驱动不断循环发包。

2)PacingController

PacingController 是平滑发包的控制器,用来实现指定速率的平滑发包,包含较复杂的控制逻辑,其内部使用 PrioritizedPacketQueue 缓存待发送报文。

3)BitRateProber

BitRateProber 和带宽探测相关。其接受带宽探测任务,使用平滑发包机制,按照带宽探测任务要求,控制带宽探测发包速率、发包时长、发包数量等参数。

1.2. 调用流程

下图展示的是一个典型的发包调用流程,包含一长两短三条路径,这三条路径配合实现平滑发包和带宽探测功能。

1)长路径是由 TaskQueueBase 驱动将报文插入发送队列,然后调用 NextSendTime 和 ProcessPackets 发送到期该发送的报文,然后创建一个调度任务 Post 到 TaskQueueBase 驱动循环报文发送。

2)其中一条短路径,是外部模块调用 EnqueuePackets 发送报文,TaskQueuePacedSender 将其封装成一个任务,调用 PostTask 转换到内部线程执行。

3)另外一条短路径,是外部模块调用 CreateProbeCluster 向 BitrateProber 创建带宽探测任务。

1.3. 逻辑架构

下图展示的是平滑发包逻辑架构。

1)平滑发包以 TaskQueue 进行驱动,它接受 Controller 和 BitrateProber 的控制,从 packet_queue_ 抓取报文经 PacketSender 发送到网络。

2)Controller 主要控制发包时间和发包数量,即什么时候发包以及发多少包,其控制逻辑受多个方面影响,比如外部设置的发包间隔、平滑发送码率、网络缓冲区状态、发包队列长度以及平均排队时长等。

3)BitrateProber 接受创建的带宽探测任务,它可以控制 TaskQueue 优先发送探测报文,来实现带宽探测功能。探测报文可以是媒体报文,也可以是 Padding 报文,Padding 报文需要调用 PacketSender 接口生成。

WebRTC 平滑发包是一个典型的“发送-等待”模型,如下图所示。基本逻辑是,将发送时间切分成一段一段的时间片(时间片长度不一定相等),在每个时间片的开始按照指定码率发送一定数量的报文,等待网络管道排空,然后继续下一轮发送,循环往复。这样做可以尽量保持比较均匀的发送码率,不会对网络造成冲击,同时,可以获得尽可能低延迟。

以上发包模型最关键的是计算每个时间片发送多少数据。最简单的思路就是使用固定时间片长度,发送固定数量的报文。WebRTC 最新代码已经没有使用固定周期发包模式了,采用的是动态发包周期,原因可能是固定发包周期无法满足不同发包要求:

1)带宽探测需要使用更小的发送时间间隔,以实现更准确的带宽探测。

2)高优先级报文需要立即发送,不能等待平滑发送时间片。

3)如果平滑发送速率太大,要适当调低发送时间间隔,否则有可能导致网络缓冲区溢出。

4)没有报文发送的时候,发送 keep-alive 的时间间隔不需要像发送媒体报文那么小。

动态周期发包模式引入 media_debt_ 变量来控制发包节奏,如下图所示。media_debt_ 可以认为是存在于网络缓冲区中的报文数量的一个计算值而非测量值。每次发送报文都会增加 media_debt_,增加的大小等于发送报文的大小;每经过一段时间,都会减少 media_debt_,减少的大小等于adjusted_media_rate * delta_t。当 media_debt_ 为0时,认为网络管道已经排空。

1.4. 报文类型

平滑发包会发送三类报文,分别是媒体报文、保活(keep-alive)报文和探测报文。如果基于Payload Type划分,可以分为 Audio、Video、RTX、FEC、Padding 五种类型的报文。

Keep-alive 报文属于 Padding 报文;探测报文可能是 Padding 报文,也可能是 RTX 报文。Audio、Video、RTX、FEC 都属于媒体报文。

2. TaskQueuePacedSender

2.1. 静态结构

2.1.1. 重要属性

1)pacing_controller_

执行具体平滑发包控制,包括什么时候发包,发多少包。

2)task_queue_

单线程驱动 pacing_controller_ 循环发包。

3)packet_size_

一个指数加权平均算法,用于获取平滑后的报文大小。平滑后的报文大小主要用来计算保持窗口大小。

2.1.2. 重要方法

1)EnqueuePackets

外部模块调用此接口发送报文。

2)CreateProbeClusters

创建带宽探测簇,内部调用 PacingController 对应方法。

3)SetCongested

设置链路拥塞状态,内部调用 PacingController 对应方法。

4)SetPacingRates

设置平滑发送速率,内部调用 PacingController 对应方法。

5)SetSendBurstInterval

设置平滑发包间隔,内部调用 PacingController 对应方法。

6)SetQueueTimeLimit

设置报文最大排队时间,内部调用 PacingController 对应方法。

2.2. 源码分析

2.2.1. EnqueuePackets

创建一个任务 Post 到内部线程,循环遍历所有要发送的报文,调用 PacingController 接口将报文插入发送队列。报文入队列完毕后,可能有报文需要发送,立即触发一次发包处理。

void TaskQueuePacedSender::EnqueuePackets(std::vector<std::unique_ptr<RtpPacketToSend>> packets) {task_queue_->PostTask(SafeTask(safety_.flag(), [this, packets = std::move(packets)]() mutable {// 循环遍历所有报文for (auto& packet : packets) {// 计算报文大小:header + payload + paddingsize_t packet_size = packet->payload_size() + packet->padding_size();if (include_overhead_) {packet_size += packet->headers_size();}// 计算平滑后的报文大小:y(k) = 0.9 * y(k-1) + 0.1 * samplepacket_size_.Apply(1, packet_size);// 调用 PacingController::EnqueuePacketpacing_controller_.EnqueuePacket(std::move(packet));}// 插入报文后立即触发一次处理MaybeProcessPackets(Timestamp::MinusInfinity());}));
}

2.2.2. MaybeProcessPackets

先检测是否有报文需要发送,如果有则循环发送这个时间片需要发送的报文,然后获取下一次报文发送时间,创建一个新的调度任务,继续下一轮报文发送,形成一个发包循环。

MaybeProcessPackets 除了自循环外,如果执行了可能导致 PacingController 内部状态变化的操作,也会被调用来触发报文发送,包括 CreateProbeClusters、SetCongested、SetPacingRates 等操作。

除了主干流程,还有几个处理细节值得分析:

1)early_execute_margin

对于探测报文,允许提前 1ms 发送和提前 1ms 调度,代码中没有明确说明这么做的原因。理论上,探测发包是按照探测目标码率进行调度,没必要特殊处理。猜测可能的原因是定时器精度存在误差,而带宽探测对发包速率要求比较高,提前发送和调度可以有效保证探测发送速率。比如本来是第 100ms 回调 MaybeProcessPackets,结果是 99ms 回调,发现未到发包时间,需要重新创建一个调度任务,如果是探测发包,索性提前发送。

2)hold_back_window

对于非探测发包,会设置一个小于 5ms 的保持窗口,两次发包时间间隔不能低于保持窗口。保持窗口受 pacing_rate 影响,pacing_rate 越高则窗口越小。由于 send_burst_interval_ 已经可以用来控制发包间隔,这里附加的发包间隔控制,更像是一个额外保护。

3)scheduled_process_time

这个参数用来区分 MaybeProcessPackets 是自循环调度还是其他调度,对于自循环调度,scheduled_process_time 是一个有效值,其他调度这个值是 Timestamp::MinusInfinity。如果是自循环调度,但是 next_process_time_ 已经改变,说明在此之前发生了其他调用,当前调度已经过时,没必要继续执行。

void TaskQueuePacedSender::MaybeProcessPackets(Timestamp scheduled_process_time) {...// 获取下次发送时间Timestamp next_send_time = pacing_controller_.NextSendTime();const Timestamp now = clock_->CurrentTime();// 获取提前发送间隔(探测为什么要加1ms的允许提前发送?)TimeDelta early_execute_margin = pacing_controller_.IsProbing()? PacingController::kMaxEarlyProbeProcessing : TimeDelta::Zero();// 循环发送所有需要发送的报文while (next_send_time <= now + early_execute_margin) {// 执行发送pacing_controller_.ProcessPackets();// 获取下次发送时间next_send_time = pacing_controller_.NextSendTime();// 获取提前发送间隔early_execute_margin = pacing_controller_.IsProbing()? PacingController::kMaxEarlyProbeProcessing : TimeDelta::Zero();}// 如果 scheduled_process_time 有值,说明是 TaskQueuePacedSender 循环调度if (scheduled_process_time.IsFinite()) {// 新任务修改了next_process_time_,此任务没必要再往下执行了if (scheduled_process_time != next_process_time_) {return;}// 匹配到任务时间戳,重置表示任务已经执行,没有待执行任务了next_process_time_ = Timestamp::MinusInfinity();}// hold_back_window 用来避免非探测状态下发送间隔太小TimeDelta hold_back_window = TimeDelta::Zero();if (!pacing_controller_.IsProbing()) {// 保持窗口初始化为 5mshold_back_window = max_hold_back_window_;// 获取平滑速率DataRate pacing_rate = pacing_controller_.pacing_rate();		if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&!pacing_rate.IsZero() &&packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) {// 计算发送一个报文需要多长时间TimeDelta avg_packet_send_time =DataSize::Bytes(packet_size_.filtered()) / pacing_rate;// 取发送 3 个报文所需时间与 5ms 之间的更小值hold_back_window = std::min(hold_back_window,avg_packet_send_time * max_hold_back_window_in_packets_);}}// 计算下次发送时间,发送间隔不能小于 hold_back_windowTimeDelta time_to_next_process =std::max(hold_back_window, next_send_time - now - early_execute_margin);next_send_time = now + time_to_next_process;// 尝试启动一个新任务if (next_process_time_.IsMinusInfinity() || // 没有待执行任务next_process_time_ > next_send_time) {  // 存在待执行任务,但执行时间更靠后task_queue_->PostDelayedHighPrecisionTask(SafeTask(safety_.flag(),[this, next_send_time]() { MaybeProcessPackets(next_send_time); }),time_to_next_process.RoundUpTo(TimeDelta::Millis(1)));// 更新下一次处理时间next_process_time_ = next_send_time;}
}

3. PacingController

3.1. 静态结构

3.1.1. 重要属性

1)packet_sender_

发送报文的对象,生成 padding 报文也是靠它。

2)prober_

带宽探测控制对象,基于平滑发送机制执行带宽探测任务。

3)packet_queue_

优先级队列,报文发送前会先插入此队列,能够保证高优先级报文先发送。

3.1.2. 重要方法

1)EnqueuePacket

将报文插入发送队列。

2)CreateProbeClusters

创建探测任务,内部调用 BitrateProber 方法。

3)SetCongested

设置拥塞状态,拥塞状态下需要停止发包,但需要继续发送 keep-alive 报文。

4)SetPacingRates

设置平滑发包速率。

5)SetSendBurstInterval

设置平滑发包时间间隔,内部可能根据状态和反馈调整发包间隔。

6)SetQueueTimeLimit

设置报文最大排队时长,如果发现按照当前发包速率,报文在队列中的排队时长会超过此设置值,则会提高发包速率。

7)NextSendTime

获取下次发包时间,TaskQueuePacedSender 调用。

8)ProcessPackets

执行发包流程,TaskQueuePacedSender 调用。

3.2. 源码分析

3.2.1. EnqueuePacket

此方法的核心逻辑是将报文插入到优先级队列,但在插入前和插入后有做一些必要处理。

1)keyframe_flushing_

如果配置了关键帧刷新,当收到关键帧的第一个报文,会检查当前队列中是否有其他关键帧报文,如果有的话,需要将之前关键帧报文全部移除,包括 RTX 报文。因为,正常情况两个关键帧相隔时间较长,如果新来的关键帧在队列中能看到上一个关键帧,说明发生网络阻塞,再发送老的关键帧会引起更大的延迟。

2)队列为空处理

如果发现队列为空,说明当前已经没有要发送的报文,立即更新 budget,提前消耗 media_dbet_,使得下次 TaskQueuePacedSender 调用 NextSendTime 时能够尽快发送报文。

3)MaybeUpdateMediaRateDueToLongQueue

报文入队列后,队列长度增加,检查是否需要调整发送码率以尽快发送队列中的报文。具体参考 MaybeUpdateMediaRateDueToLongQueue 源码分析。

void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {// 为了尽快输出新的关键帧,清空该流中当前待处理的所有数据包。if (keyframe_flushing_ && // 如果配置了关键帧刷新packet->packet_type() == RtpPacketMediaType::kVideo && // 视频报文packet->is_key_frame() && // 属于关键帧报文packet->is_first_packet_of_frame() && // 关键帧的第一个报文!packet_queue_.HasKeyframePackets(packet->Ssrc())) { // 当前队列没有关键帧报文// 先清空媒体报文packet_queue_.RemovePacketsForSsrc(packet->Ssrc());// 再清空关联的 RTX 报文,如果有的话absl::optional<uint32_t> rtx_ssrc =packet_sender_->GetRtxSsrcForMedia(packet->Ssrc());if (rtx_ssrc) {packet_queue_.RemovePacketsForSsrc(*rtx_ssrc);}}// prober 可能在等待一个足够大的报文来启动带宽探测任务prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));// 获取当前时间const Timestamp now = CurrentTime();// 当前队列为空,来了一个新的报文,希望能够尽快处理if (packet_queue_.Empty()) {Timestamp target_process_time = now;Timestamp next_send_time = NextSendTime();if (next_send_time.IsFinite()) {target_process_time = std::min(now, next_send_time);}UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time));}// 报文入队列packet_queue_.Push(now, std::move(packet));seen_first_packet_ = true;// 队列长度增加了,检查是否需要改变平滑码率MaybeUpdateMediaRateDueToLongQueue(now);
}

3.2.2. MaybeUpdateMediaRateDueToLongQueue

正常情况,真实发送码率等于设置的平滑发送码率。但如果要考虑超长报文队列的影响,则需要在报文队列超长时适当调高发送码率,以排空报文队列,防止队列不断累积而导致报文被丢弃。

【注意】这里可能会导致真实发送码率大于设置的平滑发送码率。

void PacingController::MaybeUpdateMediaRateDueToLongQueue(Timestamp now) {// 更新为pacing rateadjusted_media_rate_ = pacing_rate_;// 如果不考虑排空超长队列,则直接使用设置的pacing rateif (!drain_large_queues_) {return;}DataSize queue_size_data = QueueSizeData();if (queue_size_data > DataSize::Zero()) {// 更新队列中报文排队时长packet_queue_.UpdateAverageQueueTime(now);// 计算当前报文已排队平均时长与配置的最大排队时长之间的差值TimeDelta avg_time_left =std::max(TimeDelta::Millis(1),queue_time_limit_ - packet_queue_.AverageQueueTime());// 在剩余排队时长中将队列中所有报文都发送出去所需的最小码率(排队超时会导致报文丢弃)DataRate min_rate_needed = queue_size_data / avg_time_left;// 如果基于报文排队时长限制计算出来的码率大于带宽评估设置的码率,则更新调整后码率if (min_rate_needed > pacing_rate_) {adjusted_media_rate_ = min_rate_needed;}}
}

3.2.3. NextSendTime

计算 next_send_time 之前,先计算排空时间 drain_time,即使用当前 adjusted_media_rate_ 来排空 media_debt_ 需要多少时间,然后将 drain_time 与 send_burst_interval 进行比较。

如果 send_burst_interval > drain_time,如下图所示。表示按照当前的发送速率和发送时间间隔,在下次发送时间到来前,网络换缓冲区会被提前排干,这样会导致网络空闲一段时间,应该立即发送报文,因此将 next_send_time 设置为 last_process_time_,表示发送的报文数量不足,需要立即补发报文。

如果 send_burst_interval < drain_time,如下图所示。表示按照当前的发送速率和发送时间间隔,会导致网络缓冲区积压,这样会增大延时,继续积压可能会导致拥塞。所以应该等待足够多的时间让网络缓冲区排空后再继续发送报文。next_send_time 设置为 last_process_time_ + drain_time。

Timestamp PacingController::NextSendTime() const {const Timestamp now = CurrentTime();Timestamp next_send_time = Timestamp::PlusInfinity();// 已经暂停,但还需要发送keep-alive报文if (paused_) {return last_send_time_ + kPausedProcessInterval; // 500ms}// 带宽探测优先级更高,如果当前正在探测,由 BitrateProber 控制发送时间间隔if (prober_.is_probing() && !probing_send_failure_) {Timestamp probe_time = prober_.NextProbeTime(now);if (!probe_time.IsPlusInfinity()) {return probe_time.IsMinusInfinity() ? now : probe_time;}}// 音频报文和重传报文不做平滑,立即发送Timestamp unpaced_send_time = NextUnpacedSendTime();if (unpaced_send_time.IsFinite()) {return unpaced_send_time;}// 处于拥塞状态或者队列中未层插入任何报文,也要发送keep-alive报文if (congested_ || !seen_first_packet_) {return last_send_time_ + kCongestedPacketInterval; // 500ms}// 有待发送媒体报文,且媒体发送码率大于 0if (adjusted_media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) {// 计算按照当前速率排空media_debt_需要多长时间TimeDelta drain_time = media_debt_ / adjusted_media_rate_;// send_burst_interval = 40ms// 确保突发的数据包总数不超过kMaxBurstSize,以免在高码率下填满 socket 缓冲区。// 换种说法,如果码率太高的话,发送时间间隔要相应调小一些。TimeDelta send_burst_interval =std::min(send_burst_interval_, kMaxBurstSize / adjusted_media_rate_);// send_burst_interval >  drain_time:说明网络缓冲区欠载,可以立即发送报文// send_burst_interval <= drain_time: 说明缓冲区过载或稳定,尽量保持网络缓冲区稳定next_send_time = last_process_time_ +((send_burst_interval > drain_time) ? TimeDelta::Zero() : drain_time);// 无待发送媒体报文,且 padding 码率大于 0} else if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) {// 计算排空时间(取媒体数据和 Padding 数据排空的最大值)TimeDelta drain_time = std::max(media_debt_ / adjusted_media_rate_,padding_debt_ / padding_rate_);if (drain_time.IsZero() && (!media_debt_.IsZero() || !padding_debt_.IsZero())) {// 有非零的 debt,但排空时间为 0,取最小的非零时间增量drain_time = TimeDelta::Micros(1); // 1us}next_send_time = last_process_time_ + drain_time;} else {// Nothing to do.next_send_time = last_process_time_ + kPausedProcessInterval;}if (send_padding_if_silent_) {next_send_time =std::min(next_send_time, last_send_time_ + kPausedProcessInterval);}return next_send_time;
}

3.2.4. ProcessPackets

此方法的主要逻辑是循环发送报文。对于带宽探测,发送指定大小的报文后停止;对于非带宽探测,根据 NextSendTime 退出。几个处理细节需要关注:

1)GetPendingPacket

带宽探测是先用媒体报文来探测,如果媒体报文发送完了,则会调用 PacketRouter::GeneratePadding 获取 padding 报文来填充探测码率。

2)UpdateTimeAndGetElapsed

代码中使用 target_send_time 来更新 media_dbet_而不是用 now 来更新 media_dbet_。在每一轮发送循环中,会调用 UpdateBudgetWithElapsedTime 来消耗 media_dbet_,从最终结果看,效果是一样的。 (具体原因待分析)

3)circuit_breaker_threshold_

这是一个保护阀,避免出现异常情况,进入死循环。

void PacingController::ProcessPackets() {...TimeDelta early_execute_margin =prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero();// 获取发送时间target_send_time = NextSendTime();if (now + early_execute_margin < target_send_time) {// 虽然未到发送时间,但还是要处理 dbetUpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(now));return;}// 到了要发送的时间,使用 target_send_time 来更新 dbet,而不是 nowTimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time);if (elapsed_time > TimeDelta::Zero()) {UpdateBudgetWithElapsedTime(elapsed_time);}PacedPacketInfo pacing_info;DataSize recommended_probe_size = DataSize::Zero();bool is_probing = prober_.is_probing();// 如果正在探测中,则先发送探测报文if (is_probing) {pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {// 一轮探测至少要发送数据量recommended_probe_size = prober_.RecommendedMinProbeSize();} else {// No valid probe cluster returned, probe might have timed out.is_probing = false;}}DataSize data_sent = DataSize::Zero();int iteration = 0;int packets_sent = 0;int padding_packets_generated = 0;// circuit_breaker_threshold_ 是一个保护阀for (; iteration < circuit_breaker_threshold_; ++iteration) {// 从队列中获取待发送媒体报文std::unique_ptr<RtpPacketToSend> rtp_packet =GetPendingPacket(pacing_info, target_send_time, now);if (rtp_packet == nullptr) {// 没有待发送媒体报文,则发送 Padding 报文...} else { // 获取到待发送媒体报文const RtpPacketMediaType packet_type = *rtp_packet->packet_type();DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +rtp_packet->padding_size());if (include_overhead_) {packet_size += DataSize::Bytes(rtp_packet->headers_size()) +transport_overhead_per_packet_;}// pacing_info 会与被发送的报文一起被记录下来packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);// 如果有 FEC 报文,则将 FEC 报文入队列for (auto& packet : packet_sender_->FetchFec()) {EnqueuePacket(std::move(packet));}// 更新统计数据data_sent += packet_size;++packets_sent;// 发送完成,更新发送时间和 media_debt_OnPacketSent(packet_type, packet_size, now);if (is_probing) {// 累加已发送数据pacing_info.probe_cluster_bytes_sent += packet_size.bytes();// 发送数据以达到设定目标,退出循环if (data_sent >= recommended_probe_size) {break;}}// 获取下一次发送时间target_send_time = NextSendTime();// 还未到下一次发送时间if (target_send_time > now) {// 如果是媒体报文,直接退出if (!is_probing) {break;}// 探测报文继续发送target_send_time = now;}// 更新 debtUpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_send_time));}}if (iteration >= circuit_breaker_threshold_) {last_send_time_ = now;last_process_time_ = now;return;}// 如果发送的是探测报文,则需要更新探测器的状态if (is_probing) {probing_send_failure_ = data_sent == DataSize::Zero();if (!probing_send_failure_) {prober_.ProbeSent(CurrentTime(), data_sent);}}MaybeUpdateMediaRateDueToLongQueue(CurrentTime());
}

4. PrioritizedPacketQueue

PrioritizedPacketQueue 用来存放待发送报文,支持按照优先级存取,以保证高优先级报文能够更早发送出去。比如重传报文应该尽快发送而不是在队列尾部老老实实的排队,否则可能黄花菜都凉了。

4.1. 静态结构

4.1.1. 重要属性

1)streams_

streams_ 是一个 unordered_map,key 是 SSRC,value 是一个 StreamQueue。StreamQueue 是一个包含 5 个元素的 deque 数组,数组下标即为优先级,deque 中保存的对应优先级的报文。

2)streams_by_prio_

streams_by_prio_ 也是一个包含 5 个元素的 deque 数组,数组下标也表示优先级,但是 deque 中保存的是 StreamQueue 指针,表示 StreamQueue 存在对应优先级的报文。

3)top_active_prio_level_

top_active_prio_level_是一个 int 类型数据,指向当前存在数据的最高优先级(数值越小优先级越高)。

4.1.2. 重要方法

1)Push

向队列中插入一个报文,内部会根据报文的 ssrc 和预先定义的优先级插入合适的地方。

2)Pop

从队列中取出一个报文,内部会获取最高优先级的报文返回。

4.2. 优先级定义

报文优先级定义如下,数字越小优先级越高(参考函数GetPriorityForType)。

0 - Audio
1 - Audio RTX
2 - Video RTX
3 - Video and FEC
4 - Padding

4.3. 数据结构

下图是 PrioritizedPacketQueue 的一个典型示例,streams_ 保存了两个 SSRC 的报文,SSRC1 没有 P1 和 P3 两个优先级的报文,SSRC2 没有 P0 和 P4 两个优先级的报文。streams_by_prio_中,只有 P2 优先级保存了 A、B 两个StreamQueue 的指针,其他优先级都只有一个 StreamQueue 指针。当前最高优先级报文为 P0,所以 top_active_prio_level_ 指向 P0。

Push操作

SSRC1 没有 P1 优先级的报文,所以在 streams_by_prio_ 的优先级 P1 没有保存 A 的指针,假设现在向 SSRC1 插入一个 P1 优先级的报文,则需要在 streams_by_prio_ 的 P1 优先级加上 A 的指针,根据先来后到规则,A 排在 B 之后。此时 top_active_prio_level_ 仍然指向 P0。

Pop操作

SSRC1 的 P0 优先级存在一个报文,streams_by_prio_ 中的 P0 队列中保存了 A 的指针。假设现在要取一个报文去发送,根据 top_active_prio_level_ 指向的优先级,应该从 SSRC1 的 P0 队列 pop 一个报文。pop以后,A 的 P0 队列为空,此时,要从 streams_by_prio_ 的 P0 队列中将 A 移除,这样 streams_by_prio_ 的 P0 队列也空了,top_active_prio_level_ 指向当前最高优先级 P1。

5. BitrateProber

带宽探测不能无所顾忌的发包,需要复用平滑发送机制。BitrateProber 负责带宽探测的管理和控制,它接收创建的带宽探测任务,按照任务目标和要求执行探测动作,并控制探测的生命周期。

5.1. 静态结构

5.1.1. 重要属性

1)probing_state_

带宽探测状态:{ kDisabled, kInactive, kActive },kActive 表示正在探测中。

2)clusters_

用来保存待执行的探测任务,这些探测任务会从头开始一个一个顺序执行,默认最大探测任务数为 5。

3)next_probe_time_

下一次发送探测包时间,用来控制探测发包的频率。

4)config_

探测发包相关配置。

struct BitrateProberConfig {FieldTrialParameter<TimeDelta> min_probe_delta; // 默认 2msFieldTrialParameter<TimeDelta> max_probe_delay; // 默认 10msFieldTrialParameter<DataSize> min_packet_size;  // 默认 200Bytes
};

5.1.2. 重要方法

1)OnIncomingPacket

BitrateProber 会监听所有插入发送队列的报文,当发现有足够大的发送报文时,才会启动带宽探测任务。代码注释中说这样能够得到更加准确的探测结果,具体原因未知。

2)CreateProbeClusters

创建带宽探测任务,探测任务会被插入任务队列等待执行。

3)NextProbeTime

获取下一次发送探测包的时间。

4)RecommendedMinProbeSize

获取一轮发送探测包大小。

5)ProbeSent

通知一轮探测报文发送完毕,内部会更新下一次探测时间,并判断探测任务是否已经完成。

5.2. 数据结构

5.2.1. ProbeClusterConfig

创建探测任务传入的参数,指定了探测码率、探测时长和探测次数(一个 burst 算一次)

struct ProbeClusterConfig {// 探测任务创建时间Timestamp at_time = Timestamp::PlusInfinity();// 要求的探测速率DataRate target_data_rate = DataRate::Zero();// 要求的探测时长(默认为 15ms,由 ProbeController 设置)TimeDelta target_duration = TimeDelta::Zero();// 探测次数(默认为 5,由 ProbeController 设置)int32_t target_probe_count = 0;// 探测任务 ID,由 ProbeController 分配int32_t id = 0;
};

5.2.2. ProbeCluster

BitrateProber 将 ProbeClusterConfig 转化为内部的 ProbeCluster 数据结构。BitrateProber 不再使用 target_duration,将其转换为 probe_cluster_min_bytes。

struct PacedPacketInfo {static constexpr int kNotAProbe = -1;// 来自 ProbeClusterConfig::target_data_rateDataRate send_bitrate = DataRate::BitsPerSec(0);// 来自 ProbeClusterConfig::idint probe_cluster_id = kNotAProbe;// 来自 ProbeClusterConfig::target_probe_countint probe_cluster_min_probes = -1;// 来自 ProbeClusterConfig::target_data_rate * ProbeClusterConfig::target_durationint probe_cluster_min_bytes = -1;// 发送的总字节数int probe_cluster_bytes_sent = 0;
};struct ProbeCluster {PacedPacketInfo pace_info;// 已发送了几轮探测报文int sent_probes = 0;// 已经发送的探测报文数据量int ProbeCluster = 0;// 来自 ProbeClusterConfig::at_timeTimestamp requested_at = Timestamp::MinusInfinity();// 开始发送探测报文时间Timestamp started_at = Timestamp::MinusInfinity();// 这个字段目前没用到int retries = 0;
};

PacedPacketInfo::probe_cluster_bytes_sent 是 ProbeCluster::ProbeCluster 的一个拷贝。

absl::optional<PacedPacketInfo> BitrateProber::CurrentCluster(Timestamp now) {...PacedPacketInfo info = clusters_.front().pace_info;// 将 ProbeCluster::sent_bytes 拷贝到 PacedPacketInfo::probe_cluster_bytes_sentinfo.probe_cluster_bytes_sent = clusters_.front().sent_bytes;return info;
}

5.3. 生命周期

5.3.1. 探测开始

调用 CreateProbeClusters 创建探测任务,探测任务并不会立即执行,BitrateProber 会等待一个足够大的报文来启动探测任务。

void BitrateProber::OnIncomingPacket(DataSize packet_size) {if (ReadyToSetActiveState(packet_size)) {next_probe_time_ = Timestamp::MinusInfinity();probing_state_ = ProbingState::kActive;}
}bool BitrateProber::ReadyToSetActiveState(DataSize packet_size) const {if (clusters_.empty()) {return false;}switch (probing_state_) {case ProbingState::kDisabled: // 已经关闭case ProbingState::kActive:   // 已经处于active状态return false;case ProbingState::kInactive:// min_packet_size = 200Breturn packet_size >=std::min(RecommendedMinProbeSize(), config_.min_packet_size.Get());}
}

5.3.2. 报文发送

1)什么时间发送

与媒体报文的发送控制逻辑不一样,探测报文的发送控制不需要考虑网络管道的拥塞、发送队列的积压等情况,严格按照探测任务要求的速率发送数据。
 

Timestamp BitrateProber::CalculateNextProbeTime(const ProbeCluster& cluster) const 
{...DataSize sent_bytes = DataSize::Bytes(cluster.sent_bytes);DataRate send_bitrate = cluster.pace_info.send_bitrate;// 这里计算距离开始发送报文的相对时间,尽力保证目标探测速率TimeDelta delta = sent_bytes / send_bitrate;return cluster.started_at + delta;
}

每次发送完数据后,相对探测任务发送起点,使用探测码率重新计算 NextProbeTime,尽量保证发送码率符合探测任务要求。

2)发送多少数据

发送探测报文前,会获取这一轮探测需要发送数据量大小:recommended_probe_size。

if (is_probing) {pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());// probe_cluster_id 是否有效if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {recommended_probe_size = prober_.RecommendedMinProbeSize();} else {is_probing = false;}
}

recommended_probe_size = 设置的探测目标速率 x 配置的每轮探测最小时间间隔,带宽探测使用的发送时间间隔与媒体报文发送时间间隔 send_burst_interval_ 不一样,默认 2ms 的发送间隔感觉挺恐怖。

DataSize BitrateProber::RecommendedMinProbeSize() const {if (clusters_.empty()) {return DataSize::Zero();}// send_bitrate 是探测目标码率DataRate send_rate = clusters_.front().pace_info.send_bitrate;// 使用配置值:min_probe_delta = 2msreturn send_rate * config_.min_probe_delta;
}

PacingController 保证会在一个 burst 发送指定数量的数据。

if (is_probing) {pacing_info.probe_cluster_bytes_sent += packet_size.bytes();// 如果是带宽探测,至少发送这么多数据才能退出发送循环。if (data_sent >= recommended_probe_size) {break;}
}

5.3.3. 探测结束

PacingController 每轮发送完以后,会调用 BitrateProber::ProbeSent,更新总共发送了多少数据和发送了多少轮。当达到任务设置的发送数据大小和发送次数要求,探测结束,cluster 被从队列中移除。

void BitrateProber::ProbeSent(Timestamp now, DataSize size) {...ProbeCluster* cluster = &clusters_.front();// 第一次发送if (cluster->sent_probes == 0) {cluster->started_at = now;}// 累加发送的字节数和探测的次数cluster->sent_bytes += size.bytes<int>();cluster->sent_probes += 1;// 根据要求的探测速率计算下一次探测时间next_probe_time_ = CalculateNextProbeTime(*cluster);// Cluster已经完成,发送数据和发送次数都需要达成目标if (cluster->sent_bytes >= cluster->pace_info.probe_cluster_min_bytes &&cluster->sent_probes >= cluster->pace_info.probe_cluster_min_probes) {clusters_.pop();}// 没有探测任务了,更新探测状态if (clusters_.empty()) {probing_state_ = ProbingState::kInactive;}...
}

6. 总结

本文详细分析了 WebRTC 平滑发送模块的整体框架和实现原理,并对重要的数据结构和逻辑进行了深入剖析。平滑发送模块设计的非常灵活,采用动态发包周期和漏桶控制机制,能够满足媒体报文发送、带宽探测、高优先级报文优先发送等多种发送要求。不过,相比于固定发包周期,这种设计实现起来更加复杂性,大家在借鉴的时候,要充分评估、小心谨慎。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/875090.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python库(14):Arrow库简化时间处理

1 Arrow简介 Arrow 是一个被称为程序员的时间处理利器的 Python 库。 从诞生起&#xff0c;它就是为了填补 Python 的 datetime 类型的功能空白而生的。为程序员提供了一种更简单、更直观的方式来处理日期和时间。 2 安装Arrow库 pip install arrow -i https://pypi.tuna.ts…

什么是设备运维管理系统?有什么作用?(6款设备运维管理系统推荐)

一、什么是设备运维管理系统&#xff1f; 设备运维管理系统是一种集成了监控、管理、维护和优化设备性能的软件平台。它旨在通过自动化的手段&#xff0c;提高设备运行的可靠性和效率&#xff0c;降低运维成本&#xff0c;并优化资源利用。 设备运维管理系统能够实时监控设备…

【1】Python机器学习之基础概念

1、什么是机器学习 最早的机器学习应用——垃圾邮件分辨 传统的计算机解决问题思路&#xff1a; 编写规则&#xff0c;定义“垃圾邮件”&#xff0c;让计算机执行对于很多问题&#xff0c;规则很难定义规则不断变化 机器学习在图像识别领域的重要应用&#xff1a; 人脸识别…

带您详细了解安全漏洞的产生和防护

什么是漏洞&#xff1f; 漏洞是 IT、网络、云、Web 或移动应用程序系统中的弱点或缺陷&#xff0c;可能使其容易受到成功的外部攻击。攻击者经常试图寻找网络安全中的各种类型的漏洞来组合和利用系统。 一些最常见的漏洞&#xff1a; 1.SQL注入 注入诸如 SQL 查询之类的小代…

BUU [PASECA2019]honey_shop

BUU [PASECA2019]honey_shop 技术栈&#xff1a;任意文件读取、session伪造 开启靶机&#xff0c;我有1336金币&#xff0c;买flag需要1337金币 点击上面的大图&#xff0c;会直接下载图片 抓包看看&#xff0c;感觉是任意文件读取 修改下路径读一下 读到了session密钥是Kv8i…

Springboot validated JSR303校验

1.导入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency> 2.测试类 package com.jmj.gulimall.product.testC;import lombok.Data;import javax.val…

C++《类和对象》(中)

一、 类的默认成员函数介绍二、构造函数 构造函数名与类同名内置类型与自定义类型析构函数拷贝构造函数 C《类和对象》(中) 一、 类的默认成员函数介绍 默认成员函数就是⽤⼾没有显式实现&#xff0c;编译器会⾃动⽣成的成员函数称为默认成员函数。 那么我们主要学习的是1&…

Linux环境docker部署Firefox结合内网穿透远程使用浏览器测试

文章目录 前言1. 部署Firefox2. 本地访问Firefox3. Linux安装Cpolar4. 配置Firefox公网地址5. 远程访问Firefox6. 固定Firefox公网地址7. 固定地址访问Firefox 前言 本次实践部署环境为本地Linux环境&#xff0c;使用Docker部署Firefox浏览器后&#xff0c;并结合cpolar内网穿…

手动搭建微型计算机(涉及:CPU、内存、寄存器等)

目录 微型计算机基础元件及作用CPU地址总线数据总线 内存地址总线数据总线内存大小的计算 寄存器先将Z80CPU与TC5517内存相连参考文章 微型计算机基础元件及作用 CPU、内存、I/O CPU 包含地址总线引脚和数据总线引脚。 以Z80CPU为例&#xff1a; 地址总线 地址总线引脚…

Apache Bigtop 正式支持 openEuler,共创大数据新生态

近日&#xff0c;在OpenAtom openEuler&#xff08;简称"openEuler"&#xff09;BigData SIG与Linaro的携手努力下&#xff0c;** Apache Bigtop于2024年7月8日发布的3.3.0新版本中&#xff0c;正式宣告了对openEuler操作系统的原生支持**。这一里程碑式的进展&#…

[微信小程序] css 解决纯数字或字母不自动换行的问题、控制文字行数

效果 css 代码 word-break: break-all; overflow: hidden; text-overflow: ellipsis; display: -webkit-box; -webkit-line-clamp: 2; -webkit-box-orient: vertical;解释 word-break: break-all; 作用&#xff1a;这个属性允许在单词内部进行换行&#xff0c;即使单词很长也…

Mysql - 索引

目录 一、存储引擎 二、索引 索引结构 索引分类 索引语法 联合索引 前缀索引 索引使用规则 最左前缀法则 范围查询使索引失效 字段做运算操作索引失效 字符串字段不加单引号索引失效 字段做前模糊查询索引失效 or连接条件索引失效 数据发布情况索引失效 指定使用…

AIGC高频产品面试题(二)

什么叫大模型&#xff0c;人工智能大模型是什么&#xff1f; 之前&#xff0c;人工智能大多针对特定的场景应用进行训练&#xff0c;生成的模型难以迁移到其他场景&#xff0c;属于“小模型”的范畴。整个训练过程中&#xff0c;不仅手工调参工作量大&#xff0c;还需要给机器“…

[ECCV 2024] [复旦]RECE:扩散模型概念移除,只需3秒即可充分移除风险概念!

本文内容来自公众号粉丝投稿&#xff0c;作者来自复旦大学的视觉与学习实验室(FVL)。研究团队提出了一种可靠、高效的概念移除方法&#xff08;RECE&#xff09;。该方法以解析解的形式&#xff0c;迭代地进行风险概念移除、风险概念嵌入推导&#xff0c;从而确保模型彻底移除风…

【MySQL进阶之路 | 高级篇】优化数据库结构和大表优化

目录结构&#xff1a; 目录 目录结构&#xff1a; 1. 优化数据库结构 1.1 拆分表&#xff1a;冷热数据分离 1.2 增加冗余字段 1.3 优化数据类型 情况1&#xff1a;对整数类型数据进行优化 情况2&#xff1a;既可以使用文本类型也可以使用整数类型的字段&#xff0c;要选…

LeetCode热题100刷题17:124. 二叉树中的最大路径和、437. 路径总和 III、199. 二叉树的右视图

124. 二叉树中的最大路径和 /*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode() : val(0), left(nullptr), right(nullptr) {}* TreeNode(int x) : val(x), left(nullptr), right(nul…

Github 2024-07-17 开源项目日报 Top10

根据Github Trendings的统计,今日(2024-07-17统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量非开发语言项目3Python项目3Rust项目2TypeScript项目2MDX项目1项目化学习 创建周期:2538 天协议类型:MIT LicenseStar数量:161973 个Fork数量…

基于51单片机的指纹红外密码电子锁

基于51单片机的指纹红外密码电子锁 1、系统功能介绍2、演示视频3、系统框图4、系统电路介绍4.1、STC89C52单片机最小系统设计4.2、LCD12864显示屏电路设计4.3、矩阵键盘按键控制部分电路设计4.4、AS608指纹模块电路设计 5、程序设计5.1、LCD12864屏幕初始化5.2、AT24C02存储芯片…

打造直播工具详解:从零开始开发直播美颜SDK

今天&#xff0c;笔者将详细讲解如何从零开始开发一个直播美颜SDK&#xff0c;帮助开发者了解开发过程中的关键环节和技术要点。 一、需求分析与规划 在开发之前&#xff0c;首先需要明确美颜SDK的功能需求。一般来说&#xff0c;美颜SDK应包含以下几个核心&#xff1a; 基础…

【TAROT学习日记】韦特体系塔罗牌学习(7)——恋人 THE LOVERS VI

韦特体系塔罗牌学习&#xff08;7&#xff09;——恋人 THE LOVERS VI 目录 韦特体系塔罗牌学习&#xff08;7&#xff09;——恋人 THE LOVERS VI牌面分析1. 基础信息2. 图片元素 正位牌意1. 关键词/句2.爱情婚姻3. 学业事业4. 人际财富5. 其他象征意 逆位牌意1. 关键词/句2. …