srs集群下行edge处理逻辑

官方关于源站集群的介绍:

Origin Cluster | SRS

下行边缘是指观众端从边缘edge拉流,边缘edge回源到源站origin节点拉流,然后再

把流转给客户端

边缘处理类SrsPlayEdge

当服务器收到播放请求时,创建对应的consumer消费者。在创建消费者consumer时会判断当前服务器的类型,如果服务器是边缘edge,就通过play_edge进行处理。每一个SrsLiveSource都有一个对应的 SrsPlayEdge *play_edge,如果配置文件指定了remote才开启边缘逻辑。

srs_error_t SrsLiveSource::create_consumer(ISrsConnection* conn, SrsLiveConsumer*& consumer)
{srs_error_t err = srs_success;consumer = new SrsLiveConsumer(this, conn);consumers.push_back(consumer);if (conn != NULL) {conn->srsConsumer = consumer;}// There should be one consumer, so reset the timeout.stream_die_at_ = 0;publisher_idle_at_ = 0;//通过配置文件中的参数,判断是否是边缘服务器//如果是边缘服务器,则调用 play_edge进行拉流播放//SrsPlayEdge* play_edge;// for edge, when play edge stream, check the stateif (_srs_config->get_vhost_is_edge(req->vhost)) {// notice edge to start for the first client.if ((err = play_edge->on_client_play()) != srs_success) {return srs_error_wrap(err, "play edge");}}return err;
}

SrsPlayEdge会通过SrsEdgeIngester进行拉流

srs_error_t SrsPlayEdge::on_client_play()
{srs_error_t err = srs_success;//SrsEdgeIngester ingester 启动一个新的协程去源站拉流// start ingest when init state.if (state == SrsEdgeStateInit) {state = SrsEdgeStatePlay;err = ingester->start();} else if (state == SrsEdgeStateIngestStopping) {return srs_error_new(ERROR_RTMP_EDGE_PLAY_STATE, "state is stopping");}return err;
}

拉流类SrsEdgeIngester

SrsEdgeIngester会启动一个协程SrsSTCoroutine进行拉流处理 

srs_error_t SrsEdgeIngester::start()
{srs_error_t err = srs_success;if ((err = source->on_publish()) != srs_success) {return srs_error_wrap(err, "notify source");}srs_freep(trd);trd = new SrsSTCoroutine("edge-igs", this);if ((err = trd->start()) != srs_success) {return srs_error_wrap(err, "coroutine");}return err;
}

真正拉流类 SrsEdgeUpstream

协程会有一个while循环不停的去拉流,目前边缘回源拉流支持两种协议rtmp和flv,根据配置参数创建对应的拉流对象

srs_error_t SrsEdgeIngester::do_cycle()
{while (true) {if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "do cycle pull");}// Use protocol in config.string edge_protocol = _srs_config->get_vhost_edge_protocol(req->vhost);// If follow client protocol, change to protocol of client.bool follow_client = _srs_config->get_vhost_edge_follow_client(req->vhost);if (follow_client && !req->protocol.empty()) {edge_protocol = req->protocol;}// Create object by protocol.srs_freep(upstream);//根据边缘协议创建对应的拉流类if (edge_protocol == "flv" || edge_protocol == "flvs") {upstream = new SrsEdgeFlvUpstream(edge_protocol == "flv"? "http" : "https");} else {upstream = new SrsEdgeRtmpUpstream(redirect);}if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) {return srs_error_wrap(err, "on source id changed");}//边缘服务连接源站服务,一般源站会部署多个节点,边缘选取源站节点时也是通过RoundRobin算法选取//其中一个节点进行拉流//这里需要注意一点,如果负载到一台没有流的源站节点上怎么办?//其实如果发现连接的源站没有流,会触发302 redirect重连逻辑if ((err = upstream->connect(req, lb)) != srs_success) {return srs_error_wrap(err, "connect upstream");}if ((err = edge->on_ingest_play()) != srs_success) {return srs_error_wrap(err, "notify edge play");}// set to larger timeout to read av data from origin.upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);//拉流处理函数err = ingest(redirect);if (srs_is_client_gracefully_close(err)) {srs_warn("origin disconnected, retry, error %s", srs_error_desc(err).c_str());srs_error_reset(err);}break;}}

拉流源站没有流触发302

边缘服务通过负载均衡获取源站节点 ,然后去源站拉流,如果当前源站节点没有流,会触发320 redirect 重定向另一台。srs目前会重试三次,如果三次之后还是拉不到流,就认为失败了

srs_error_t SrsEdgeFlvUpstream::do_connect(SrsRequest* r, SrsLbRoundRobin* lb, int redirect_depth)
{//第一次连接源站节点时 redirect_depth = 0,通过lb->select负载均衡随机选择一台//如果连接的源站没有流,触发302,再连接另一台if (redirect_depth == 0) {SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);// @see https://github.com/ossrs/srs/issues/79// when origin is error, for instance, server is shutdown,// then user remove the vhost then reload, the conf is empty.if (!conf) {return srs_error_new(ERROR_EDGE_VHOST_REMOVED, "vhost %s removed", req->vhost.c_str());}// select the origin.std::string server = lb->select(conf->args);int port = SRS_DEFAULT_HTTP_PORT;if (schema_ == "https") {port = SRS_DEFAULT_HTTPS_PORT;}srs_parse_hostport(server, server, port);// Remember the current selected server.selected_ip = server;selected_port = port;} else {// If HTTP redirect, use the server in location.schema_ = req->schema;selected_ip = req->host;selected_port = req->port;}sdk_ = new SrsHttpClient();if ((err = sdk_->initialize(schema_, selected_ip, selected_port, cto)) != srs_success) {return srs_error_wrap(err, "edge pull %s failed, cto=%dms.", url.c_str(), srsu2msi(cto));}if ((err = sdk_->get(path, "", &hr_)) != srs_success) {return srs_error_wrap(err, "edge get %s failed, path=%s", url.c_str(), path.c_str());}if (hr_->status_code() == 404) {return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "Connect to %s, status=%d", url.c_str(), hr_->status_code());}if ((err = sdk_->get(path, "", &hr_)) != srs_success) {return srs_error_wrap(err, "edge get %s failed, path=%s", url.c_str(), path.c_str());}if (hr_->status_code() == 404) {return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "Connect to %s, status=%d", url.c_str(), hr_->status_code());}//如果状态码为302,开启重连另一台逻辑string location;if (hr_->status_code() == 302) {//获取302返回的地址location = hr_->header()->get("Location");}srs_trace("Edge: Connect to %s ok, status=%d, location=%s", url.c_str(), hr_->status_code(), location.c_str());if (hr_->status_code() == 302) {//最多重试三次if (redirect_depth >= 3) {return srs_error_new(ERROR_HTTP_302_INVALID, "redirect to %s fail, depth=%d", location.c_str(), redirect_depth);}string app;string stream_name;if (true) {string tcUrl;srs_parse_rtmp_url(location, tcUrl, stream_name);int port;string schema, host, vhost, param;srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param);r->schema = schema; r->host = host; r->port = port;r->app = app; r->stream = stream_name; r->param = param;}//重连return do_connect(r, lb, redirect_depth + 1);}
}

回源拉流的逻辑

边缘节点连接源站成功后,即找到有流的源站,然后就开始通过upstream进行拉流

srs_error_t SrsEdgeIngester::ingest(string& redirect)
{while (true) {if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "thread quit");}pprint->elapse();// pithy printif (pprint->can_print()) {upstream->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age());}// read from client.SrsCommonMessage* msg = NULL;//upstream拉流if ((err = upstream->recv_message(&msg)) != srs_success) {return srs_error_wrap(err, "recv message");}srs_assert(msg);SrsAutoFree(SrsCommonMessage, msg);//处理拉到的流if ((err = process_publish_message(msg, redirect)) != srs_success) {return srs_error_wrap(err, "process message");}}
}

处理拉到的流,拉到流后和普通单节点就一样了,把流转给 SrsLiveSource ,然后再转给对应的consumer

srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect)
{srs_error_t err = srs_success;// process audio packetif (msg->header.is_audio()) {if ((err = source->on_audio(msg)) != srs_success) {return srs_error_wrap(err, "source consume audio");}}// process video packetif (msg->header.is_video()) {if ((err = source->on_video(msg)) != srs_success) {return srs_error_wrap(err, "source consume video");}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/694018.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C#_字段 属性 常量与只读

字段(旧称成员变量) 字段定义在类中(定义在函数等代码体中的变量称为局部变量)实例字段与该对象关联(类实例化之后才可调用)静态字段与该类型关联(为该类型的特征,可直接调用,无需具体至对象&am…

Docker后台启动镜像,如何查看日志信息

执行 docker run -d -p 9090:8080 core-backend-image 命令后,Docker 会在后台运行一个新的容器实例,并映射宿主机的 9090 端口到容器的 8080 端口。要查看启动的容器日志,您需要先获取容器的 ID 或名称,然后使用 docker logs 命令…

令牌颁发与管理服务

技术背景 在分布式系统中,令牌(Token)被广泛应用于身份认证成功后对系统的访问控制。在本文中,我们实现了一个简单的令牌颁发与管理服务,其中包含访问令牌(AccessToken)和刷新令牌(RefreshToken)两种类型的令牌。 功能需求 颁发刷新令牌和访问令牌验证和管理访问令牌…

Linux系统之iptables应用SNAT与DNAT

一、SNAT: 1.应用环境 局域网主机共享单个公网IP地址接入Internet (私有IP不能在Internet中正常路由) 2.SNAT原理 源地址转换,根据指定条件修改数据包的源IP地址,通常被叫做源映谢数据包从内网发送到公网时&#x…

CDH 6.x版本 HBase基础调优参数

参数默认值调优值参数解释hbase.regionserver.handler.count30120指定了每个RegionServer处理请求的最大线程数hbase.regionserver.metahandler.count30120指定了在RegionServer中处理Meta表的请求数量hbase.client.retries.number1015HBase客户端重试操作的最大次数hbase.hsto…

Java中的Enum

Enum是一个特殊的类 Java 中的枚举类型(enum)实际上是一种特殊的类。enum编译后是一个特殊的类(有些类似单例模式)这些实例在声明时就被创建,并在整个应用程序的生命周期中只存在一个实例。 enum 用于定义包含固定数量…

Fiddler与wireshark使用

Fiddler解决三个问题 1、SSL证书打勾,解析https请求 2、响应回来乱码,不是中文 3、想及时中止一下,查看实时的日志 4、搜索对应的关键字 问题1解决方案: 标签栏Tools下 找到https,全部打勾 Actions里面 第一个 t…

项目的一些难点

1.不用redis?分布式锁,如何防止用户重复点击? 1.乐观锁 乐观锁是一种在数据库层面上避免并发冲突的机制。它通常通过在数据库记录中添加一个版本号(或时间戳)来实现。每次更新记录时,都会检查版本号是否与数据库中的…

Luogu P6175 无向图的最小环问题 题解 Floyd

题目链接:Luogu P6175 无向图的最小环问题 题目描述: 给定一张带权无向图,求出经过至少三个不同的点的最小环,环的大小由经过的边权和决定。 题解: 我们首先回到Floyd算法,在Floyd算法中,我们定…

从输入url到页面显示中间发生了什么

文章目录 整体概述URL释义用户输入缓存处理域名解析IP 地址什么是域名解析浏览器查找域名对应IP小结 TCP 三次握手握手时序三次握手数据包分析为什么需要三次握手 HTTP 请求HTTP 响应服务器MVC 后台处理阶段http 响应报文 TCP 四次挥手浏览器渲染 整体概述 浏览器输入 URL 到页…

如何搭建Facebook直播网络?

在当今数字化时代,Facebook直播已经成为了一种极具吸引力的社交形式,为个人和企业提供了与观众直接互动的机会,成为推广产品、分享经验、建立品牌形象的重要途径。然而,对于许多人来说,搭建一个稳定、高质量的 Faceboo…

创意办公:专注 ONLYOFFICE,探索办公新境界

一.ONLYOFFICE 介绍 ONLYOFFICE 是一个基于 Web 的办公套件,提供了文档处理、电子表格和演示文稿编辑等功能。它被设计为一个协作工具,支持多人实时协作编辑文档,并且可以在本地部署或者作为云服务使用。 二.ONLYOFFICE 特点和功能 以下是 …

Unity调用文心-ERNIE-Bot-turbo

参考文章 ERNIE-Bot-turbo - 千帆大模型平台 | 百度智能云文档 (baidu.com) 错误码 - 千帆大模型平台 | 百度智能云文档 (baidu.com) private readonly string apiKey "";private readonly string secretKey "";private readonly string tokenUrl &q…

品牌渠道管控的目标是什么

品牌做渠道管控的根本原因是解决渠道中的各种问题,常见的渠道问题包含破价、窜货、假货等,在治理渠道的过程中,其实也是对渠道中各角色关系的梳理,比如通过治理破价链接,可以及时发现渠道中不符合品牌价值的经销商&…

十大基础排序算法

排序算法分类 排序:将一组对象按照某种逻辑顺序重新排列的过程。 按照待排序数据的规模分为: 内部排序:数据量不大,全部存在内存中;外部排序:数据量很大,无法一次性全部存在内存中,…

Vue2尚品汇前台项目笔记——(1)项目初始化

Vue2尚品汇前台项目笔记 一、项目初始化 使用[脚手架创建项目,具体参考之前的脚手架配置笔记,我起名叫vue_shop_test 1.脚手架目录分析 node_modules文件夹:项目依赖文件夹 public文件夹:一般放置一些静态资源(图…

Java项目:20 基于SSM实现的支教管理系统

作者主页:源码空间codegym 简介:Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 ssm支教管理系统(前台后台) 前台角色:支教学校志愿者 支教学校功能模块:支教学校查询报名职位…

3DTile是不是没有坐标的选择?

可参考以下内容: 一、坐标参考系统(CRS) 3D Tiles 使用右手笛卡尔坐标系;也就是说,x和y的叉积产生z。3D Tiles 将z轴定义为局部笛卡尔坐标系的向上。tileset的全局坐标系通常位于WGS 84地心固定(ECEF)参考系(EPSG4978)中,但它不是必须的&am…

【数据结构】单向循环链表

一、mian函数 #include <stdio.h> #include "./3.looplinklist.h" int main(int argc, const char *argv[]) {looplinklist* head create_looplinklist();insertHead_looplinklist(head,100);insertHead_looplinklist(head,200);insertHead_looplinklist(hea…

瑞盟MS5188N——16bit、8 通道、500kSPS、 SAR 型 ADC

产品简述 MS5188N 是 8 通道、 16bit 、电荷再分配逐次逼近型模数 转换器&#xff0c;采用单电源供电。 MS5188N 拥有多通道、低功耗数据采集系统所需的所有 组成部分&#xff0c;包括&#xff1a;无失码的真 16 位 SAR ADC &#xff1b;用于将输入配 置为单端输入…