webrtc的线程模型

目录

线程的声明

线程创建过程

向线程中投递消息

从消息队列中取消息的具体实现

处理线程消息


webrtc线程模块的实现逻辑在 rtc_base\thread.h 文件中

比如想创建一个线程:

//声明要创建的线程指针,通过智能指针管理
std::unique_ptr<rtc::Thread> video_thread_;
// 创建线程
video_thread_ = rtc::Thread::Create();
//设置新创建的线程名
video_thread_->SetName("video_thread_", video_thread_.get());
//开启线程
video_thread_->Start();
//向线程投递要处理的消息video_thread_->Post(RTC_FROM_HERE, this, MESSAGE_ID);// MESSAGE_ID 自定义的消息id//向线程投入带有消息体的消息video_thread_->Post(RTC_FROM_HERE, this, VIDEO_INFO,new rtc::TypedMessageData<VIDEO_INFO_MEESAGE>(r));//其中RTC_FROM_HERE 是个宏定义,标记线程调用的原位置
// Define a macro to record the current source location.
#define RTC_FROM_HERE RTC_FROM_HERE_WITH_FUNCTION(__FUNCTION__)

下面看下线程的具体实现

线程的声明

//线程继承自一个任务队列,并且有两个存储消息的消息队列
//普通消息 messages_,延时消息 delayed_messages_
class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {explicit Thread(SocketServer* ss);explicit Thread(std::unique_ptr<SocketServer> ss);privateMessage msgPeek_;//声明对应的消息//MessageList 具体的定义://typedef std::list<Message> MessageList;MessageList messages_ RTC_GUARDED_BY(crit_); //延时队列继承自 std::priority_queue<DelayedMessage> PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_); uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_);
}

创建线程的实现

//具体的创建函数
//构造中传入一个 NullSocketServer() 作为参数
std::unique_ptr<Thread> Thread::Create() {return std::unique_ptr<Thread>(new Thread(std::unique_ptr<SocketServer>(new NullSocketServer())));
}//最终调用到这里,线程构造函数
Thread::Thread(SocketServer* ss, bool do_init): fPeekKeep_(false),delayed_next_num_(0),fInitialized_(false),fDestroyed_(false),stop_(0),ss_(ss) {RTC_DCHECK(ss);//把当前线程的this指针传给 NullSocketServerss_->SetMessageQueue(this);
//设置线程的初始名字SetName("Thread", this);  // default nameif (do_init) {DoInit();}
}void Thread::DoInit() {if (fInitialized_) {return;}fInitialized_ = true;//把当前线程的this指针对象传给ThreadManagerThreadManager::Add(this);
}
//ThreadManager会把当前线程,放到一个 message_queues_ 中统一管理
void ThreadManager::AddInternal(Thread* message_queue) {CritScope cs(&crit_);// Prevent changes while the list of message queues is processed.RTC_DCHECK_EQ(processing_, 0);message_queues_.push_back(message_queue);
}

引入了一个新的对象 ThreadManager

//ThreadManager是线程管理类,是一个单例,
//保存创建的所有线程对象
class RTC_EXPORT ThreadManager {// Singleton, constructor and destructor are private.static ThreadManager* Instance();//保存线程的消息队列,其实是个vector,不是queue。//很多服务都喜欢用vector代替queue,srs也是把vector当queue用// This list contains all live Threads.std::vector<Thread*> message_queues_ RTC_GUARDED_BY(crit_);}
//创建单例 ThreadManager,饿汉模式
ThreadManager* ThreadManager::Instance() {static ThreadManager* const thread_manager = new ThreadManager();return thread_manager;
}
//把线程指针加入到消息队列中
void ThreadManager::Add(Thread* message_queue) {return Instance()->AddInternal(message_queue);
}
void ThreadManager::AddInternal(Thread* message_queue) {CritScope cs(&crit_);// Prevent changes while the list of message queues is processed.RTC_DCHECK_EQ(processing_, 0);message_queues_.push_back(message_queue);
}

线程创建过程

线程的Start()函数才是真正创建线程的地方,只看android(即linux)端。

具体的实现是用的pthread,而没有用标准的std::thread

bool Thread::Start() {pthread_attr_t attr;pthread_attr_init(&attr);//创建线程调用的是pthread_create,//并传入线程函数 PreRunint error_code = pthread_create(&thread_, &attr, PreRun, this);if (0 != error_code) {RTC_LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;thread_ = 0;return false;}
}void* Thread::PreRun(void* pv) {Thread* thread = static_cast<Thread*>(pv);ThreadManager::Instance()->SetCurrentThread(thread);rtc::SetCurrentThreadName(thread->name_.c_str());//调用一个Run()函数thread->Run();}void Thread::Run()
{
// Forever 模式,一直轮训处理ProcessMessages(kForever); 
}
//真正处理消息的函数,下文会详细介绍
bool Thread::ProcessMessages(int cmsLoop)
{while (true) {Message msg;// Get()函数从消息队列中取消息   if (!Get(&msg, cmsNext))return !IsQuitting();//取出消息后调用Dispatch()进行处理Dispatch(&msg);if (cmsLoop != kForever){cmsNext = static_cast<int>(TimeUntil(msEnd));if (cmsNext < 0)return true;}}          }

向线程中投递消息


// |time_sensitive| is deprecated and should always be false.virtual void Post(const Location& posted_from,//是从哪个函数向线程中投递消息MessageHandler* phandler,//消息处理的类,一般是向线程抛消息的类的this指针,当线程轮训到该消息时通过该this指针再回调对应的处理函数uint32_t id = 0,//消息idMessageData* pdata = nullptr, //消息体bool time_sensitive = false);//废弃的参数virtual void PostDelayed(const Location& posted_from, //支持向线程抛入延迟消息int delay_ms,MessageHandler* phandler,uint32_t id = 0,MessageData* pdata = nullptr);virtual void PostAt(const Location& posted_from, int64_t run_at_ms,MessageHandler* phandler,uint32_t id = 0,MessageData* pdata = nullptr);// 看下Post的具体实现void Thread::Post(const Location& posted_from,MessageHandler* phandler,uint32_t id,MessageData* pdata,bool time_sensitive) {RTC_DCHECK(!time_sensitive);if (IsQuitting()) {delete pdata;return;}// Keep thread safe// Add the message to the end of the queue// Signal for the multiplexer to return{//注意这个大括号哈//数据进队列加锁,内部用的 pthread_mutex_lock(mutex_)//CritScope对 mutex_进行了封装,构造函数加锁、析构函数解锁CritScope cs(&crit_);Message msg;//构造消息体msg.posted_from = posted_from;msg.phandler = phandler;msg.message_id = id;msg.pdata = pdata;messages_.push_back(msg);}//CritScope退出作用区域后,调用对应的析构函数解锁//即pthread_mutex_unlock(&mutex_);函数//这种实现方式一方面缩小了锁的范围,锁的范围仅仅局限于大括号内部,而不是整个Post()函数//同时,退出临界区后自动调用析构函数释放锁,也避免了死锁的可能性//这个WakeUp* 函数是重点,它会唤醒当前等待的线程WakeUpSocketServer();
}                 
//看一下 WakeUpSocketServer()的实现
//最终是通过 pthread_cond_broadcast()
//唤醒当前所有处于pthread_cond_wait()的线程void Thread::WakeUpSocketServer() {ss_->WakeUp();
}
void NullSocketServer::WakeUp() {event_.Set();
}      void Event::Set() {pthread_mutex_lock(&event_mutex_);event_status_ = true;//广播唤醒所有处于 pthread_cond_wait()的线程pthread_cond_broadcast(&event_cond_);pthread_mutex_unlock(&event_mutex_);
}

从消息队列中取消息的具体实现

//消息处理是从 Thread 的Run()函数开始
void Thread::Run() {// KForever字段,一直轮训取数据,//没有数据时会 wait() 阻塞等待ProcessMessages(kForever);
}
bool Thread::ProcessMessages(int cmsLoop) {int cmsNext = cmsLoop;while (true) {Message msg;//从消息队列中取消,//取出来后交给 Dispatch()进行处理if (!Get(&msg, cmsNext))return !IsQuitting();Dispatch(&msg);if (cmsLoop != kForever) {cmsNext = static_cast<int>(TimeUntil(msEnd));if (cmsNext < 0)return true;}}
}
//取消息的过程
bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {// Return and clear peek if present// Always return the peek if it exists so there is Peek/Get symmetryif (fPeekKeep_) {*pmsg = msgPeek_;fPeekKeep_ = false;return true;}// Get w/wait + timer scan / dispatch + socket / event multiplexer dispatchint64_t cmsTotal = cmsWait;int64_t cmsElapsed = 0;int64_t msStart = TimeMillis();int64_t msCurrent = msStart;while (true) {// Check for posted eventsint64_t cmsDelayNext = kForever; //一直训练bool first_pass = true;//具体实现是两层while(true)。内部的while负责取消息,//取不到时,外部while负责wait()阻塞等待while (true) {// All queue operations need to be locked, but nothing else in this loop// (specifically handling disposed message) can happen inside the crit.// Otherwise, disposed MessageHandlers will cause deadlocks.{//和像线程中投递消息类似,取消息时也先加锁CritScope cs(&crit_);// On the first pass, check for delayed messages that have been// triggered and calculate the next trigger time.if (first_pass) {//线程被唤醒后,只从延时队列中取一次//并且这一次会把所有到时需要处理的延时消息取完,//取出的延时消息放到messages_队列和普通消息一样进行处理first_pass = false;while (!delayed_messages_.empty()) {//当前时间,小于延时队列中第一条消息时间,//说明还没有到需要处理延时消息的时间,if (msCurrent < delayed_messages_.top().run_time_ms_) {//cmsDelayNext计算出需要等待的时间,//也是后面线程wait()时需要等待的最大时间,//因为到了这个时间,即便没有普通消息到来//延时队列中的消息也到时间需要处理了cmsDelayNext =TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);break;}//把到时需要处理的延时消息,放到普通队列中一起处理messages_.push_back(delayed_messages_.top().msg_);//延时消息出队列delayed_messages_.pop();}}// Pull a message off the message queue, if available.if (messages_.empty()) {break;} else {//真正获得需要处理消息的地方*pmsg = messages_.front();messages_.pop_front();}}  // crit_ is released here.// If this was a dispose message, delete it and skip it.//如果是dispose废除的消息就会删除,//然后 continue()继续去取if (MQID_DISPOSE == pmsg->message_id) {RTC_DCHECK(nullptr == pmsg->phandler);delete pmsg->pdata;*pmsg = Message();continue;}//如果是需要处理的消息就return退出当前 Get()函数,//进行后面的Disptch()处理return true;}if (IsQuitting())break;// Which is shorter, the delay wait or the asked wait?int64_t cmsNext;if (cmsWait == kForever) {cmsNext = cmsDelayNext;} else {cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))cmsNext = cmsDelayNext;}// 如果延时消息队列和普通的消息队列中都没有消息,//内部while(true)会调用 break退出//然后就调用到这里,因为我们是 KForever一直轮训模式,//所以当队列中没有消息时,防止一直遍历查询,//会通过wait()挂起当前线程让出时间片{// Wait and multiplex in the meantime//内部调用的是 pthread_cond_wait,//并且在wait()时也加了锁if (!ss_->Wait(static_cast<int>(cmsNext), process_io))return false;}// If the specified timeout expired, returnmsCurrent = TimeMillis();cmsElapsed = TimeDiff(msCurrent, msStart);if (cmsWait != kForever) {if (cmsElapsed >= cmsWait)return false;}}return false;
}

处理线程消息

从消息队列中Get()获取消息后,会调用 Dispatch()处理消息。具体实现就是回调向线程中抛消息的类的OnMessage(pmg)函数,然后进行具体消息的处理:


void Thread::Dispatch(Message* pmsg) {TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file",pmsg->posted_from.file_name(), "src_func",pmsg->posted_from.function_name());int64_t start_time = TimeMillis();//回调对应OnMessage(pmsg)函数进行消息处理pmsg->phandler->OnMessage(pmsg);int64_t end_time = TimeMillis();int64_t diff = TimeDiff(end_time, start_time);if (diff >= kSlowDispatchLoggingThreshold) {RTC_LOG(LS_INFO) << "Message took " << diff<< "ms to dispatch. Posted from: "<< pmsg->posted_from.ToString();}
}

而我们的处理类继承 rtc::MessageHandler,并实现了 OnMessage()函数,就可以基于对应的MessageID类型,处理不同的消息了

CVideoThread::OnMessage(rtc::Message* msg) {switch case:Message_id:handlerMessage();case VIDEO_INFO: //假如向线程中传入了 MessageData,//在线程回调时会把这个消息体带出来方便我们处理if(msg->pdata){rtc::TypedMessageData<VideoMessageData>* data = static_cast<rtc::TypedMessageData<VideoMessageData>*>(msg->pdata);string message = data->data();delete data;data = nullptr;}default:break;
}

以上就是webrtc的线程模块了,下一篇会介绍webrtc的 TaskQueue 任务队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/20511.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用Jackson自定义反序列化操作(Custom Deserialization in Jackson)

Maven依赖 <dependencies><!-- yaml --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.14.2</version><exclusions><exclusion><ar…

智慧林业~经典开源项目数字孪生智慧林业——开源工程及源码

东北林业局的工程和源码免费赠送&#xff0c;帮您实现深林防火的智慧林业。 项目介绍 东北林业局作为东北地区林业管理的重要机构&#xff0c;致力于森林资源保护和防火工作。他们的项目通过先进的技术手段&#xff0c;为林业管理提供可靠的解决方案。 本项目使用数字孪生技术&…

【实操教程】如何开始用Qt Widgets编程?(一)

Qt 是目前最先进、最完整的跨平台C开发工具。它不仅完全实现了一次编写&#xff0c;所有平台无差别运行&#xff0c;更提供了几乎所有开发过程中需要用到的工具。如今&#xff0c;Qt已被运用于超过70个行业、数千家企业&#xff0c;支持数百万设备及应用。 在本文中&#xff0…

轮足机器人硬件总结

简介 本文主要根据“轮腿机器人Hyun”总结的硬件部分。 轮腿机器人Hyun开源地址&#xff1a;https://github.com/HuGuoXuang/Hyun 1 电源部分 1.1 78M05 78M05是一款三端稳压器芯片&#xff0c;它可以将输入电压稳定输出为5V直流电压. 1.2 AMS1117-3.3 AMS1117-3.3是一种输…

postgresql数据库表膨胀之pg_repack安装及使用

pg_repack是一个可以在线重建表和索引的扩展。它会在数据库中建立一个和需要清理的目标表一样的临时表&#xff0c;将目标表中的数据COPY到临时表&#xff0c;并在临时表上建立与目标表一样的索引&#xff0c;然后通过重命名的方式用临时表替换目标表。 环境&#xff1a; 1&a…

代码随想录算法训练营第五十天| 123.买卖股票的最佳时机III 188.买卖股票的最佳时机IV

代码随想录算法训练营第五十九天| 123.买卖股票的最佳时机III 188.买卖股票的最佳时机IV 一、力扣123.买卖股票的最佳时机III 题目链接 思路&#xff1a;之前的类型是用数组记录当前是持有还是不持有&#xff0c;而这道题目多了两种状态&#xff0c;即为&#xff0c;第一次持…

Vue实现leafletMap自定义绘制线段 并且删除指定的已绘制的点位

效果&#xff1a;点击表格可实现选中地图点位&#xff0c;删除按钮点击可删除对应点位并且重新绘制线段&#xff0c;点击确定按钮 保存已经绘制的点位信息传给父组件 并且该组件已实现回显 完整的组件代码如下 文件名称为&#xff1a; leafletMakePointYt <!--* Descripti…

云原生周刊:Kubeflow 成为 CNCF 项目

开源项目推荐 OpenKruiseGame OpenKruiseGame&#xff08;OKG&#xff09;是简化游戏服云原生化的自定义 Kubernetes 工作负载&#xff0c;相比 Kubernetes 内置的无状态&#xff08;Deployment&#xff09;、有状态&#xff08;StatefulSet&#xff09;等工作负载而言&#…

FPGA优质开源模块 - SRIO

本文介绍一个FPGA常用模块&#xff1a;SRIO&#xff08;Serial RapidIO&#xff09;。SRIO协议是一种高速串行通信协议&#xff0c;在我参与的项目中主要是用于FPGA和DSP之间的高速通信。有关SRIO协议的详细介绍网上有很多&#xff0c;本文主要简单介绍一下SRIO IP核的使用和本…

SIFT算法原理:SIFT算法详细介绍

前面我们介绍了Harris和Shi-Tomasi角点检测算法&#xff0c;这两种算法具有旋转不变性&#xff0c;但不具有尺度不变性&#xff0c;以下图为例&#xff0c;在左侧小图中可以检测到角点&#xff0c;但是图像被放大后&#xff0c;在使用同样的窗口&#xff0c;就检测不到角点了。…

机器学习完整路径

一个机器学习项目从开始到结束大致分为 5 步&#xff0c;分别是定义问题、收集数据和预处理、选择算法和确定模型、训练拟合模型、评估并优化模型性能。是一个循环迭代的过程&#xff0c;优秀的模型都是一次次迭代的产物。 定义问题 要剖析业务场景&#xff0c;设定清晰的目标…

Excel技巧 - 管理规则设置一行变色

如何设置某一列单元格的值大于一个值时&#xff0c;该单元格所在的一整行都变色呢&#xff1f; 1、先框选内容区域&#xff0c;点击开始&#xff0c;条件格式&#xff0c;新建规则 2、如果销量大于20&#xff0c;则该行都变为绿色 编辑格式选择&#xff1a;使用公式确定要设置…

【MySQL】存储过程(十一)

🚗MySQL学习第十一站~ 🚩本文已收录至专栏:MySQL通关路 ❤️文末附全文思维导图,感谢各位点赞收藏支持~ 一.引入 存储过程是事先经过编译并存储在数据库中的一段 SQL 语句的集合,调用存储过程可以简化应用开发人员的工作,可以减少数据在数据库和应用服务器之间的传输,…

大数据技术之Clickhouse---入门篇---SQL操作、副本

星光下的赶路人star的个人主页 积一勺以成江河&#xff0c;累微尘以崇峻极 文章目录 1、SQL操作1.1 Insert1.2 Update 和 Delete1.3 查询操作1.4 alter操作1.5 导出数据 2、副本2.1 副本写入流程2.2 配置步骤 1、SQL操作 基本上来说传统关系型数据库&#xff08;以 MySQL 为例…

九五从零开始的运维之路(其二十七)

文章目录 前言一、SQL语句类型1.DDL2.DML3.DCL4.DQL 二、数据库操作1.查看数据库2.创建数据库3.进入数据库4.删除数据库5.更改数据库 三、数据表操作1.数据类型&#xff08;一&#xff09;数值类型&#xff08;二&#xff09;时间\日期类型&#xff08;三&#xff09;字符串类型…

【雕爷学编程】MicroPython动手做(30)——物联网之Blynk 4

知识点&#xff1a;什么是掌控板&#xff1f; 掌控板是一块普及STEAM创客教育、人工智能教育、机器人编程教育的开源智能硬件。它集成ESP-32高性能双核芯片&#xff0c;支持WiFi和蓝牙双模通信&#xff0c;可作为物联网节点&#xff0c;实现物联网应用。同时掌控板上集成了OLED…

Android中的ContentProvider

Android中的ContentProvider 在Android中&#xff0c;ContentProvider是四大组件之一&#xff0c;用于在不同应用程序之间共享和管理数据。它提供了一种标准化的方式来访问和管理应用程序的数据&#xff0c;使得多个应用程序可以安全地共享数据&#xff0c;而无需直接访问彼此…

phpstudy 进行 composer 全局配置

背景 因为注意到&#xff0c;使用 phpStudy 进行环境搭建时&#xff0c;有时需要使用 composer 每次都需要查找资料进行配置&#xff0c; 在此进行记录笔记&#xff0c;方便有需要的道友借鉴 配置 版本&#xff1a;composer1.8.5&#xff08;phpStudy8 当前只能安装这一个版本…

Flink作业调度的9种状态

1.什么是作业调度 Flink 通过 Task Slots 来定义执行资源。每个 TaskManager 有一到多个 task slot&#xff0c;每个 task slot 可以运行一条由多个并行 task 组成的流水线。 这样一条流水线由多个连续的 task 组成&#xff0c;比如并行度为 n 的 MapFunction 和 并行度为 n 的…

省份数量(力扣)深度优先 JAVA

有 n 个城市&#xff0c;其中一些彼此相连&#xff0c;另一些没有相连。如果城市 a 与城市 b 直接相连&#xff0c;且城市 b 与城市 c 直接相连&#xff0c;那么城市 a 与城市c 间接相连。 省份 是一组直接或间接相连的城市&#xff0c;组内不含其他没有相连的城市。 给你一个 …