srs集群下行edge处理逻辑

官方关于源站集群的介绍:

Origin Cluster | SRS

下行边缘是指观众端从边缘edge拉流,边缘edge回源到源站origin节点拉流,然后再

把流转给客户端

边缘处理类SrsPlayEdge

当服务器收到播放请求时,创建对应的consumer消费者。在创建消费者consumer时会判断当前服务器的类型,如果服务器是边缘edge,就通过play_edge进行处理。每一个SrsLiveSource都有一个对应的 SrsPlayEdge *play_edge,如果配置文件指定了remote才开启边缘逻辑。

srs_error_t SrsLiveSource::create_consumer(ISrsConnection* conn, SrsLiveConsumer*& consumer)
{srs_error_t err = srs_success;consumer = new SrsLiveConsumer(this, conn);consumers.push_back(consumer);if (conn != NULL) {conn->srsConsumer = consumer;}// There should be one consumer, so reset the timeout.stream_die_at_ = 0;publisher_idle_at_ = 0;//通过配置文件中的参数,判断是否是边缘服务器//如果是边缘服务器,则调用 play_edge进行拉流播放//SrsPlayEdge* play_edge;// for edge, when play edge stream, check the stateif (_srs_config->get_vhost_is_edge(req->vhost)) {// notice edge to start for the first client.if ((err = play_edge->on_client_play()) != srs_success) {return srs_error_wrap(err, "play edge");}}return err;
}

SrsPlayEdge会通过SrsEdgeIngester进行拉流

srs_error_t SrsPlayEdge::on_client_play()
{srs_error_t err = srs_success;//SrsEdgeIngester ingester 启动一个新的协程去源站拉流// start ingest when init state.if (state == SrsEdgeStateInit) {state = SrsEdgeStatePlay;err = ingester->start();} else if (state == SrsEdgeStateIngestStopping) {return srs_error_new(ERROR_RTMP_EDGE_PLAY_STATE, "state is stopping");}return err;
}

拉流类SrsEdgeIngester

SrsEdgeIngester会启动一个协程SrsSTCoroutine进行拉流处理 

srs_error_t SrsEdgeIngester::start()
{srs_error_t err = srs_success;if ((err = source->on_publish()) != srs_success) {return srs_error_wrap(err, "notify source");}srs_freep(trd);trd = new SrsSTCoroutine("edge-igs", this);if ((err = trd->start()) != srs_success) {return srs_error_wrap(err, "coroutine");}return err;
}

真正拉流类 SrsEdgeUpstream

协程会有一个while循环不停的去拉流,目前边缘回源拉流支持两种协议rtmp和flv,根据配置参数创建对应的拉流对象

srs_error_t SrsEdgeIngester::do_cycle()
{while (true) {if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "do cycle pull");}// Use protocol in config.string edge_protocol = _srs_config->get_vhost_edge_protocol(req->vhost);// If follow client protocol, change to protocol of client.bool follow_client = _srs_config->get_vhost_edge_follow_client(req->vhost);if (follow_client && !req->protocol.empty()) {edge_protocol = req->protocol;}// Create object by protocol.srs_freep(upstream);//根据边缘协议创建对应的拉流类if (edge_protocol == "flv" || edge_protocol == "flvs") {upstream = new SrsEdgeFlvUpstream(edge_protocol == "flv"? "http" : "https");} else {upstream = new SrsEdgeRtmpUpstream(redirect);}if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) {return srs_error_wrap(err, "on source id changed");}//边缘服务连接源站服务,一般源站会部署多个节点,边缘选取源站节点时也是通过RoundRobin算法选取//其中一个节点进行拉流//这里需要注意一点,如果负载到一台没有流的源站节点上怎么办?//其实如果发现连接的源站没有流,会触发302 redirect重连逻辑if ((err = upstream->connect(req, lb)) != srs_success) {return srs_error_wrap(err, "connect upstream");}if ((err = edge->on_ingest_play()) != srs_success) {return srs_error_wrap(err, "notify edge play");}// set to larger timeout to read av data from origin.upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);//拉流处理函数err = ingest(redirect);if (srs_is_client_gracefully_close(err)) {srs_warn("origin disconnected, retry, error %s", srs_error_desc(err).c_str());srs_error_reset(err);}break;}}

拉流源站没有流触发302

边缘服务通过负载均衡获取源站节点 ,然后去源站拉流,如果当前源站节点没有流,会触发320 redirect 重定向另一台。srs目前会重试三次,如果三次之后还是拉不到流,就认为失败了

srs_error_t SrsEdgeFlvUpstream::do_connect(SrsRequest* r, SrsLbRoundRobin* lb, int redirect_depth)
{//第一次连接源站节点时 redirect_depth = 0,通过lb->select负载均衡随机选择一台//如果连接的源站没有流,触发302,再连接另一台if (redirect_depth == 0) {SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);// @see https://github.com/ossrs/srs/issues/79// when origin is error, for instance, server is shutdown,// then user remove the vhost then reload, the conf is empty.if (!conf) {return srs_error_new(ERROR_EDGE_VHOST_REMOVED, "vhost %s removed", req->vhost.c_str());}// select the origin.std::string server = lb->select(conf->args);int port = SRS_DEFAULT_HTTP_PORT;if (schema_ == "https") {port = SRS_DEFAULT_HTTPS_PORT;}srs_parse_hostport(server, server, port);// Remember the current selected server.selected_ip = server;selected_port = port;} else {// If HTTP redirect, use the server in location.schema_ = req->schema;selected_ip = req->host;selected_port = req->port;}sdk_ = new SrsHttpClient();if ((err = sdk_->initialize(schema_, selected_ip, selected_port, cto)) != srs_success) {return srs_error_wrap(err, "edge pull %s failed, cto=%dms.", url.c_str(), srsu2msi(cto));}if ((err = sdk_->get(path, "", &hr_)) != srs_success) {return srs_error_wrap(err, "edge get %s failed, path=%s", url.c_str(), path.c_str());}if (hr_->status_code() == 404) {return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "Connect to %s, status=%d", url.c_str(), hr_->status_code());}if ((err = sdk_->get(path, "", &hr_)) != srs_success) {return srs_error_wrap(err, "edge get %s failed, path=%s", url.c_str(), path.c_str());}if (hr_->status_code() == 404) {return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "Connect to %s, status=%d", url.c_str(), hr_->status_code());}//如果状态码为302,开启重连另一台逻辑string location;if (hr_->status_code() == 302) {//获取302返回的地址location = hr_->header()->get("Location");}srs_trace("Edge: Connect to %s ok, status=%d, location=%s", url.c_str(), hr_->status_code(), location.c_str());if (hr_->status_code() == 302) {//最多重试三次if (redirect_depth >= 3) {return srs_error_new(ERROR_HTTP_302_INVALID, "redirect to %s fail, depth=%d", location.c_str(), redirect_depth);}string app;string stream_name;if (true) {string tcUrl;srs_parse_rtmp_url(location, tcUrl, stream_name);int port;string schema, host, vhost, param;srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param);r->schema = schema; r->host = host; r->port = port;r->app = app; r->stream = stream_name; r->param = param;}//重连return do_connect(r, lb, redirect_depth + 1);}
}

回源拉流的逻辑

边缘节点连接源站成功后,即找到有流的源站,然后就开始通过upstream进行拉流

srs_error_t SrsEdgeIngester::ingest(string& redirect)
{while (true) {if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "thread quit");}pprint->elapse();// pithy printif (pprint->can_print()) {upstream->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age());}// read from client.SrsCommonMessage* msg = NULL;//upstream拉流if ((err = upstream->recv_message(&msg)) != srs_success) {return srs_error_wrap(err, "recv message");}srs_assert(msg);SrsAutoFree(SrsCommonMessage, msg);//处理拉到的流if ((err = process_publish_message(msg, redirect)) != srs_success) {return srs_error_wrap(err, "process message");}}
}

处理拉到的流,拉到流后和普通单节点就一样了,把流转给 SrsLiveSource ,然后再转给对应的consumer

srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect)
{srs_error_t err = srs_success;// process audio packetif (msg->header.is_audio()) {if ((err = source->on_audio(msg)) != srs_success) {return srs_error_wrap(err, "source consume audio");}}// process video packetif (msg->header.is_video()) {if ((err = source->on_video(msg)) != srs_success) {return srs_error_wrap(err, "source consume video");}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/694018.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker后台启动镜像,如何查看日志信息

执行 docker run -d -p 9090:8080 core-backend-image 命令后,Docker 会在后台运行一个新的容器实例,并映射宿主机的 9090 端口到容器的 8080 端口。要查看启动的容器日志,您需要先获取容器的 ID 或名称,然后使用 docker logs 命令…

Linux系统之iptables应用SNAT与DNAT

一、SNAT: 1.应用环境 局域网主机共享单个公网IP地址接入Internet (私有IP不能在Internet中正常路由) 2.SNAT原理 源地址转换,根据指定条件修改数据包的源IP地址,通常被叫做源映谢数据包从内网发送到公网时&#x…

Fiddler与wireshark使用

Fiddler解决三个问题 1、SSL证书打勾,解析https请求 2、响应回来乱码,不是中文 3、想及时中止一下,查看实时的日志 4、搜索对应的关键字 问题1解决方案: 标签栏Tools下 找到https,全部打勾 Actions里面 第一个 t…

从输入url到页面显示中间发生了什么

文章目录 整体概述URL释义用户输入缓存处理域名解析IP 地址什么是域名解析浏览器查找域名对应IP小结 TCP 三次握手握手时序三次握手数据包分析为什么需要三次握手 HTTP 请求HTTP 响应服务器MVC 后台处理阶段http 响应报文 TCP 四次挥手浏览器渲染 整体概述 浏览器输入 URL 到页…

如何搭建Facebook直播网络?

在当今数字化时代,Facebook直播已经成为了一种极具吸引力的社交形式,为个人和企业提供了与观众直接互动的机会,成为推广产品、分享经验、建立品牌形象的重要途径。然而,对于许多人来说,搭建一个稳定、高质量的 Faceboo…

创意办公:专注 ONLYOFFICE,探索办公新境界

一.ONLYOFFICE 介绍 ONLYOFFICE 是一个基于 Web 的办公套件,提供了文档处理、电子表格和演示文稿编辑等功能。它被设计为一个协作工具,支持多人实时协作编辑文档,并且可以在本地部署或者作为云服务使用。 二.ONLYOFFICE 特点和功能 以下是 …

品牌渠道管控的目标是什么

品牌做渠道管控的根本原因是解决渠道中的各种问题,常见的渠道问题包含破价、窜货、假货等,在治理渠道的过程中,其实也是对渠道中各角色关系的梳理,比如通过治理破价链接,可以及时发现渠道中不符合品牌价值的经销商&…

十大基础排序算法

排序算法分类 排序:将一组对象按照某种逻辑顺序重新排列的过程。 按照待排序数据的规模分为: 内部排序:数据量不大,全部存在内存中;外部排序:数据量很大,无法一次性全部存在内存中,…

Vue2尚品汇前台项目笔记——(1)项目初始化

Vue2尚品汇前台项目笔记 一、项目初始化 使用[脚手架创建项目,具体参考之前的脚手架配置笔记,我起名叫vue_shop_test 1.脚手架目录分析 node_modules文件夹:项目依赖文件夹 public文件夹:一般放置一些静态资源(图…

Java项目:20 基于SSM实现的支教管理系统

作者主页:源码空间codegym 简介:Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 ssm支教管理系统(前台后台) 前台角色:支教学校志愿者 支教学校功能模块:支教学校查询报名职位…

【数据结构】单向循环链表

一、mian函数 #include <stdio.h> #include "./3.looplinklist.h" int main(int argc, const char *argv[]) {looplinklist* head create_looplinklist();insertHead_looplinklist(head,100);insertHead_looplinklist(head,200);insertHead_looplinklist(hea…

瑞盟MS5188N——16bit、8 通道、500kSPS、 SAR 型 ADC

产品简述 MS5188N 是 8 通道、 16bit 、电荷再分配逐次逼近型模数 转换器&#xff0c;采用单电源供电。 MS5188N 拥有多通道、低功耗数据采集系统所需的所有 组成部分&#xff0c;包括&#xff1a;无失码的真 16 位 SAR ADC &#xff1b;用于将输入配 置为单端输入…

【Flink状态管理(八)】Checkpoint:CheckpointBarrier对齐后Checkpoint的完成、通知与对学习状态管理源码的思考

文章目录 一. 调用StreamTask执行Checkpoint操作1. 执行Checkpoint总体代码流程1.1. StreamTask.checkpointState()1.2. executeCheckpointing1.3. 将算子中的状态快照操作封装在OperatorSnapshotFutures中1.4. 算子状态进行快照1.5. 状态数据快照持久化 二. CheckpointCoordin…

图的遍历-----深度优先遍历(dfs),广度优先遍历(bfs)【java详解】

目录 简单介绍&#xff1a;什么是深度、广度优先遍历&#xff1f; 深度优先搜索&#xff08;DFS&#xff0c;Depth First Search&#xff09;&#xff1a; 大致图解&#xff1a; 广度优先搜索&#xff08;BFS&#xff0c;Breadth First Search&#xff09;&#xff1a; 大致图…

Python学习笔记——自定义函数(传递任意数量的实参)

Python允许函数从调用语句中收集任意数量的实参。例如下面自定义函数制作一个披萨&#xff0c;它需要接受很多配料&#xff0c;但无法预先确定顾客要点多少种配料。 下面行数只有一个形参*toppings&#xff0c;不管调用语句提供多少个实参&#xff0c;这个参数都会收集到&…

用 LangChain 和 Milvus 从零搭建 LLM 应用

如何从零搭建一个 LLM 应用&#xff1f;不妨试试 LangChain Milvus 的组合拳。 作为开发 LLM 应用的框架&#xff0c;LangChain 内部不仅包含诸多模块&#xff0c;而且支持外部集成&#xff1b;Milvus 同样可以支持诸多 LLM 集成&#xff0c;二者结合除了可以轻松搭建一个 LL…

在Discord上添加自己的服务器并邀请midjourney机器人加入

我开发的chatgpt网站&#xff1a; https://chat.xutongbao.top

vue2--多设备访问本地调试项目

背景 在vue2开发阶段&#xff0c;为了更好的和小伙伴对项目进行讨论&#xff0c;需要让小伙伴可以看到自己本地的项目。 方案 修改vue2项目配置 在本地进行项目调试时&#xff0c;都是使用的127.0.0.1:8080&#xff0c;为了让局域网中的其他小伙伴可以访问&#xff0c;需要…

阿里云国际-在阿里云服务器上快速搭建幻兽帕鲁多人服务器

幻兽帕鲁是最近流行的新型生存游戏。该游戏一夜之间变得极为流行&#xff0c;同时在线玩家数量达到了200万。然而&#xff0c;幻兽帕鲁的服务器难以应对大量玩家的压力。为解决这一问题&#xff0c;幻兽帕鲁允许玩家建立专用服务器&#xff0c;其提供以下优势&#xff1a; &am…

Docker中如何删除某个镜像

1. 停止使用镜像的容器 首先&#xff0c;您需要停止所有正在使用该镜像的容器。您可以使用 docker stop 命令来停止容器&#xff1a; docker stop 11184993a106如果有多个容器使用该镜像&#xff0c;您需要对每个容器都执行停止命令。您可以通过 docker ps -a | grep core-ba…