webrtc 的TaskQueue() 任务队列

今天看下webrtc中的任务队列的实现

TaskQueue 定义

见文件:rtc_base\task_queue.h

具体实现

class RTC_LOCKABLE RTC_EXPORT TaskQueue {public:// TaskQueue priority levels. On some platforms these will map to thread// priorities, on others such as Mac and iOS, GCD queue priorities.using Priority = ::webrtc::TaskQueueFactory::Priority;// 注意这个构造函数,以TaskQueueBase智能指针作为参数// TaskQueue 的真正实现,其实是这个TaskQueueBase explicit TaskQueue(std::unique_ptr<webrtc::TaskQueueBase,webrtc::TaskQueueDeleter> task_queue);~TaskQueue();}

创建一个 TaskQueue

具体过程:

  • 首先创建一个默认的任务队列工厂
  • 然后基于任务队列工厂创建一个任务队列TaskQueueBase基类
  • 这个新创建的任务队列TaskQueueBase,
    作为参数传给TaskQueue()的构造函数, 从而创建了我们需要的TaskQueue对象 video_encoder_queue_
  //首先声明相应对象:一个任务队列工厂、一个任务队列对象//因为webrtc是跨平台的项目,//引入一个工厂模式,来兼容不同的平台//通过工厂来创建任务队列std::shared_ptr<webrtc::TaskQueueFactory> video_encoder_task_queue_factory_;rtc::TaskQueue video_encoder_queue_;//在指定类构造函数列表中创建任务队列//假如我们的调用类是VideoHandler//具体的实现如下:VideoHandbler::VideoHandbler():  video_encoder_task_queue_factory_(webrtc::CreateDefaultTaskQueueFactory()),video_encoder_queue_(video_encoder_task_queue_factory_->CreateTaskQueue("VideoEncoderQueue",TaskQueueFactory::Priority::NORMAL)){}

TaskQueueBase 任务队列基类


//TaskQueue的构造函数,以TaskQueueBase的智能指针作为参数
//通过成员变量impl_ 接收该指针
//后面会发现,任务最终都传给了TaskQueueBase()TaskQueue::TaskQueue(std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter> task_queue): impl_(task_queue.release()) {}//向TaskQueue投递任务时,最终还是通过 imple_抛给了TaskQueueBase()
void TaskQueue::PostTask(std::unique_ptr<webrtc::QueuedTask> task) {return impl_->PostTask(std::move(task));
}

TaskQueueLibevent

每个平台都有各自的实现
我们主要看TaskQueueLibevent()的实现

//TaskQueueLibevent 继承自TaskQueueBase(),真正处理Task的函数
class TaskQueueLibevent final : public TaskQueueBase {
public:TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);void Delete() override;void PostTask(std::unique_ptr<QueuedTask> task) override;void PostDelayedTask(std::unique_ptr<QueuedTask> task,uint32_t milliseconds) override;
private:bool is_active_ = true;//输入、输出管道用来唤起线程int wakeup_pipe_in_ = -1;int wakeup_pipe_out_ = -1;event_base* event_base_;event wakeup_event_;//任务队列线程//一个任务队列对象一个线程//对于windows平台内部会执行 CreateThread()创建线程//android调用 pthread_create()创建线程//线程的ThreadMain会一直监听管道的可读事件//从而唤醒线程去处理Task任务rtc::PlatformThread thread_;//线程轮训任务队列时会加锁Mutex pending_lock_;//最重要的结构:存储Task任务的vector对象//线程唤醒后会轮训该该pending_对象,处理任务absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> pending_RTC_GUARDED_BY(pending_lock_);// Holds a list of events pending timers for cleanup when the loop exits.std::list<TimerEvent*> pending_timers_;

TaskQueueLibevent 实现部分

TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,rtc::ThreadPriority priority): event_base_(event_base_new()),thread_(&TaskQueueLibevent::ThreadMain, this, queue_name, priority) {
int fds[2];
//创建一个管道用于线程之间通信
RTC_CHECK(pipe(fds) == 0);
//把管道设置为非阻塞的
SetNonBlocking(fds[0]);
SetNonBlocking(fds[1]);
wakeup_pipe_out_ = fds[0];
wakeup_pipe_in_ = fds[1];
//绑定管道可读事件,当管道可读时会自动调用OnWakeup()函数
//这块是借用libevent库来实现的
EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,EV_READ | EV_PERSIST, OnWakeup, this);
event_add(&wakeup_event_, 0);
thread_.Start();
}
//调用fcntl()把管道设置诶非阻塞
bool SetNonBlocking(int fd) {
const int flags = fcntl(fd, F_GETFL);
RTC_CHECK(flags != -1);
return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}//线程函数一直轮训相关事件
void TaskQueueLibevent::ThreadMain(void* context) {
TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);{CurrentTaskQueueSetter set_current(me);while (me->is_active_)event_base_loop(me->event_base_, 0);
}for (TimerEvent* timer : me->pending_timers_)delete timer;
}

向任务队列中抛入任务

void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {{//向队列投递任务时,先加锁MutexLock lock(&pending_lock_);bool had_pending_tasks = !pending_.empty();//把任务插入到队列中pending_.push_back(std::move(task));// Only write to the pipe if there were no pending tasks before this one// since the thread could be sleeping. If there were already pending tasks// then we know there's either a pending write in the pipe or the thread has// not yet processed the pending tasks. In either case, the thread will// eventually wake up and process all pending tasks including this one.//如果当前线程有未处理的阻塞任务,就暂时不唤醒if (had_pending_tasks) {return;}}// Note: This behvior outlined above ensures we never fill up the pipe write// buffer since there will only ever be 1 byte pending.//写入的内容是KRunTasks,表明是要处理的Task任务char message = kRunTasks;//向管道中写入数据,唤起线程RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),sizeof(message));
}

管道中写入数据后,会触发OnWakeup()函数

void TaskQueueLibevent::OnWakeup(int socket,short flags,  // NOLINTvoid* context) {TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);RTC_DCHECK(me->wakeup_pipe_out_ == socket);char buf;// 调用read函数从管道中读入数据//读到数据后基于相应的数据类型进行处理RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));switch (buf) {case kQuit:me->is_active_ = false;event_base_loopbreak(me->event_base_);break;case kRunTasks: {//要处理的任务absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> tasks;{//加锁//并取出要处理的任务MutexLock lock(&me->pending_lock_);tasks.swap(me->pending_);}RTC_DCHECK(!tasks.empty());//遍历所有的task,并调用各自的Run()函数进行处理//最终就是回调传进来任务函数for (auto& task : tasks) {if (task->Run()) {task.reset();} else {// |false| means the task should *not* be deleted.task.release();}}break;}default:RTC_NOTREACHED();break;}
}

使用例子,向任务队列投入任务

向队列中投入一个lambda 任务,
当任务线程接收到该任务后,就会回调该lambda函数

 video_encoder_queue_.PostTask([this,&video_data]{....encoder(video_data);....});

视频编码队列 encoder_queue_

encoder_queue_ 就是利用前面讲到TaskQueue进行视频编码的

具体实现:

//声明编码队列
rtc::TaskQueue encoder_queue_;
//构造函数列表中创建该编码队列
VideoStreamEncoder::VideoStreamEncoder()://创建编码队列encoder_queue_(task_queue_factory->CreateTaskQueue("EncoderQueue",TaskQueueFactory::Priority::NORMAL)){}//向编码队列中投入视频数据进行编码
//所以真正的编码线程,就是encoder_queue所拥有的线程了
void VideoStreamEncoder::OnFrame(const VideoFrame& video_frame) {encoder_queue_.PostTask([this, incoming_frame, post_time_us, log_stats, post_interval_us]() {if (posted_frames_waiting_for_encode == 1 && !cwnd_frame_drop) {MaybeEncodeVideoFrame(incoming_frame, post_time_us);}}
}

析构函数会主动关闭任务队列

析构函数中会自动关闭该任务队列,并停止对应任务线程,不需要我们干预

TaskQueue::~TaskQueue() {// There might running task that tries to rescheduler itself to the TaskQueue// and not yet aware TaskQueue destructor is called.// Calling back to TaskQueue::PostTask need impl_ pointer still be valid, so// do not invalidate impl_ pointer until Delete returns.impl_->Delete();
}void TaskQueueLibevent::Delete() {RTC_DCHECK(!IsCurrent());struct timespec ts;char message = kQuit;//向管道中写入关闭消息while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {// The queue is full, so we have no choice but to wait and retry.RTC_CHECK_EQ(EAGAIN, errno);ts.tv_sec = 0;ts.tv_nsec = 1000000;nanosleep(&ts, nullptr);}//停止线程thread_.Stop();event_del(&wakeup_event_);IgnoreSigPipeSignalOnCurrentThread();//关闭管道close(wakeup_pipe_in_);close(wakeup_pipe_out_);wakeup_pipe_in_ = -1;wakeup_pipe_out_ = -1;event_base_free(event_base_);delete this;
}

以上就是webrtc的 TaskQueue() 任务队列的实现过程了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/17484.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数字电路(一)

1、例题 1、进行DA数模转换器选型时&#xff0c;一般要选择主要参数有&#xff08; A&#xff09;、转换精度和转换速度。 A、分辨率 B、输出电流 C、输出电阻 D、模拟开关 2、下图所示电路的逻辑功能为&#xff08; B&#xff09; A、与门 B、或门 C、与非门 D、或非门 分析该…

程序员面试IT技术岗的三大技巧

文章目录 技巧一&#xff1a;深入研究意向企业技巧二&#xff1a;准备常见的面试问题技巧三&#xff1a;总结经历的面试题 在找工作时&#xff0c;面试是每位程序员必须经历的一关。面对众多求职者竞争激烈的情况&#xff0c;我们需要结合自己的现状&#xff0c;针对意向企业做…

C语言每日一题

今天分享的是一道牛客网上面的题目&#xff0c;链接在下面 有序序列合并 这道题做法有很多&#xff0c;最简单的是合并一起&#xff0c;然后用排序就行了&#xff0c;今天将一个最高效的办法&#xff0c;思路是两个数组第一项进行比较&#xff0c;小的先输出&#xff0c;输出的…

python3GUI--我的翻译器By:PyQt5(附下载地址)

文章目录 一&#xff0e;前言二&#xff0e;展示1.主界面2.段落翻译3.单词翻译 三&#xff0e;设计1.UI设计2.软件设计3.参考 四&#xff0e;总结 一&#xff0e;前言 很早之前写过一篇python3GUI–翻译器By:PyQt5&#xff08;附源码&#xff09; &#xff0c;但是发现相关引擎…

Java on VS Code 7 月更新|反编译器支持升级、代码补全性能提升、AI 相关更新及更多

作者&#xff1a;Nick Zhu 排版&#xff1a;Alan Wang 大家好&#xff0c;欢迎来到 Visual Studio Code for Java 的7月更新&#xff01;在这篇博客中&#xff0c;我们将为您提供有关反编译器支持的重要更新。此外&#xff0c;我们将分享更多最近代码补全性能提升的进展&#x…

【ChatGPT】基于WSL+Docker的ChatGPT PLUS共享服务部署

最近买了ChatGPT PLUS服务&#xff0c;想通过web服务将它共享给其他人使用&#xff0c;搜了一下目前GitHub上比较热门的服务有 ChatGPT-Next-Webchatgpt-web-share 其中chatgpt-web-share支持API和PLUS账号分享两种方式&#xff0c;且架构为PythonJSDocker&#xff0c;相对比…

【LeetCode】27. 移除元素

题目大概意思是剔除nums数组中出现的所有val值。可以用快慢双指针法来做。 快的指针在前面遍历找值不为val的元素的下标&#xff0c;慢的负责接收值不为val的元素&#xff0c;并及时更新数组。 class Solution {public int removeElement(int[] nums, int val) {//快慢指针法in…

【Liux下6818开发板(ARM)】实现简易相册

(꒪ꇴ꒪ ),hello我是祐言博客主页&#xff1a;C语言基础,Linux基础,软件配置领域博主&#x1f30d;快上&#x1f698;&#xff0c;一起学习&#xff01;送给读者的一句鸡汤&#x1f914;&#xff1a;集中起来的意志可以击穿顽石!作者水平很有限&#xff0c;如果发现错误&#x…

uniapp文件下载

使用uniapp提供给我们的uni.downloadFile、uni.saveFile和uni.openDocument三个API就可以了 也很简单&#xff0c;直接贴一下代码&#xff0c;安修修改一下即可 <template><view><image tap"pdfDownLoad" style"width: 35rpx;height: 35rpx;&…

2、认识O(nlogn)的排序

归并排序 分两半,谁小拷贝谁 public class Test{public static void mergeSort(int[] arr) {if (arr == null || arr.length < 2) {return;}mergeSort(arr, 0, arr.length - 1);}public static void mergeSort(int[] arr, int l, int r) {if (l == r) {return;}int mid =…

第一百回 如何实现文件存储二

文章目录 知识回顾示例代码 我们在上一章回中介绍了" 如何实现文件存储"相关的内容&#xff0c;本章回中将继续介绍与此相关的内容.闲话休提&#xff0c;让我们一起Talk Flutter吧。 知识回顾 我们上一章回中介绍了实现文件存储的三个步骤&#xff0c;不过限于篇幅…

【3】Linux实现多进程、多线程

系列文章目录 提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 TODO:写完再整理 文章目录 系列文章目录前言一、linux开发的方向二、Linux环境特点1、Linux环境介绍2、Linux环境基本构成三、同步与互斥1、Linux同步并发的方法(1)创建、终止、等待、分离线…

Golang之路---03 面向对象——结构体

结构体 结构体定义 在之前学过的数据类型中&#xff0c;数组与切片&#xff0c;只能存储同一类型的变量。若要存储多个类型的变量&#xff0c;就需要用到结构体&#xff0c;它是将多个任意类型的变量组合在一起的聚合数据类型。 每个变量都成为该结构体的成员变量。   可以理…

介绍如何快速传输100G大文件传输方法

如何快速地将100GB的大文件从一个地方传送到另一个地方&#xff0c;这是许多人都遇到过的一个难题&#xff0c;无论是个人用户还是企业用户&#xff0c;都有这样的需求。在很多场合&#xff0c;我们需要将大文件迅速地发送到远程位置。比如&#xff0c;当我们需要对重要数据进行…

心法利器[93] | 谈校招:技术面

心法利器 本栏目主要和大家一起讨论近期自己学习的心得和体会&#xff0c;与大家一起成长。具体介绍&#xff1a;仓颉专项&#xff1a;飞机大炮我都会&#xff0c;利器心法我还有。 2022年新一版的文章合集已经发布&#xff0c;累计已经60w字了&#xff0c;获取方式看这里&…

DeepVO 论文阅读

论文信息 题目&#xff1a;DeepVO Towards End-to-End Visual Odometry with Deep Recurrent Convolutional Neural Networks 作者&#xff1a;Sen Wang, Ronald Clark, Hongkai Wen and Niki Trigoni 代码地址&#xff1a;http://senwang.gitlab.io/DeepVO/ (原作者并没有开源…

插入排序算法

插入排序 算法说明与代码实现&#xff1a; 以下是使用Go语言实现的插入排序算法示例代码&#xff1a; package mainimport "fmt"func insertionSort(arr []int) {n : len(arr)for i : 1; i < n; i {key : arr[i]j : i - 1for j > 0 && arr[j] > …

Python web实战 | 使用 Flask 实现 Web Socket 聊天室

概要 今天我们学习如何使用 Python 实现 Web Socket&#xff0c;并实现一个实时聊天室的功能。本文的技术栈包括 Python、Flask、Socket.IO 和 HTML/CSS/JavaScript。 什么是 Web Socket&#xff1f; Web Socket 是一种在单个 TCP 连接上进行全双工通信的协议。它是 HTML5 中的…

lua脚本实现Redis令牌桶限流

背景 令牌桶限流是一种常见的流量控制算法&#xff0c;用于控制系统的请求处理速率&#xff0c;防止系统过载。在令牌桶限流算法中&#xff0c;可以将请求看作是令牌&#xff0c;而令牌桶则表示系统的处理能力。系统在处理请求时&#xff0c;首先需要从令牌桶中获取令牌&#…

【编程语言 · C语言 · 学生管理系统】

【编程语言 C语言 学生管理系统】https://mp.weixin.qq.com/s?__bizMzg4NTE5MDAzOA&mid2247491542&idx1&snf9b72a5af62a93bc902c5467056a9343&chksmcfade32ff8da6a3956be7d6a5dceb97de27e25157804abf8a3193272fa8ad68e78640ca33a5c&token1462056111&…