srs官方关于边缘集群的介绍:
Edge Cluster | SRS
本篇分析一下边缘集群中上行边缘节点的处理逻辑。
关于上行的边缘节点:
SRS对于上行边缘,采取直接代理方式,并没有采取边缘缓存方式。所谓边缘缓存方式,即推流到边缘时边缘也会当作源站直接缓存(作为源站),然后转发给源站。边缘缓存方式看起来先进,这个边缘节点不必回源,实际上加大了集群的逻辑难度, 不如直接作为代理方式简单。
上行边缘SrsPublishEdge
上行边缘只做代理,收到流后直接转给源站
srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
{srs_error_t err = srs_success;//对于边缘edge,收到流后直接发给源站// for edge, directly proxy message to origin.if (info->edge) {if ((err = source->on_edge_proxy_publish(msg)) != srs_success) {return srs_error_wrap(err, "rtmp: proxy publish");}return err;}
}// TODO: FIXME: Use edge strategy pattern.
srs_error_t SrsLiveSource::on_edge_proxy_publish(SrsCommonMessage* msg)
{return publish_edge->on_proxy_publish(msg);
}
//消息转给forwarder模块,即:SrsEdgeForwarder类
//该类在SrsPublishEdge的构造函数中进行初始化
//并且在收到publish消息时,开启一个协程去队列中读取数据
srs_error_t SrsPublishEdge::on_proxy_publish(SrsCommonMessage* msg)
{return forwarder->proxy(msg);
}srs_error_t SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
{srs_error_t err = srs_success;if (send_error_code != ERROR_SUCCESS) {return srs_error_new(send_error_code, "edge forwarder");}//...//...SrsSharedPtrMessage copy;if ((err = copy.create(msg)) != srs_success) {return srs_error_wrap(err, "create message");}//forward模块收到消息后把消息放到队列中copy.stream_id = sdk->sid();if ((err = queue->enqueue(copy.copy())) != srs_success) {return srs_error_wrap(err, "enqueue message");}return err;
}
SrsEdgeForwarder转发消息给源站
真正的业务场景中,一般源站会部署多台组成一个集群,所以当一个边缘收到流后,需要选择向集群中的哪台源站进行推流。srs选取源站的负载均衡算法是RoundRobin
string SrsLbRoundRobin::select(const vector<string>& servers)
{srs_assert(!servers.empty());index = (int)(count++ % servers.size());elem = servers.at(index);return elem;
}
//通过负载均衡算法选取了源站后,边缘节点会创建一个rtmp client然后把流转推给该源站节点
srs_error_t SrsEdgeForwarder::start()
{std::string url;if (true) {//从配置文件中读取源站地址列表SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);srs_assert(conf);//通过 roundrobin算法选取合适的源站节点// select the origin.std::string server = lb->select(conf->args);int port = SRS_CONSTS_RTMP_DEFAULT_PORT;srs_parse_hostport(server, server, port);// support vhost tranform for edge,std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);vhost = srs_string_replace(vhost, "[vhost]", req->vhost);//生成向源站推流所用的url地址url = srs_generate_rtmp_url(server, port, req->host, vhost, req->app, req->stream, req->param);}//创建一个rtmp client向源站推流sdk = new SrsSimpleRtmpClient(url, cto, sto);if ((err = sdk->connect()) != srs_success) {return srs_error_wrap(err, "sdk connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));}// For RTMP client, we pass the vhost in tcUrl when connecting,// so we publish without vhost in stream.string stream;if ((err = sdk->publish(_srs_config->get_chunk_size(req->vhost), false, &stream)) != srs_success) {return srs_error_wrap(err, "sdk publish");}
}
SrsEdgeForwarder协程中发送
srs_error_t SrsEdgeForwarder::do_cycle()
{while (true) {if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "edge forward pull");}// forward all messages.// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.//首先从队列中取出要发送的消息int count = 0;if ((err = queue->dump_packets(msgs.max, msgs.msgs, count)) != srs_success) {return srs_error_wrap(err, "queue dumps packets");}//通过sdk把数据发给源站节点// sendout messages, all messages are freed by send_and_free_messages().if ((err = sdk->send_and_free_messages(msgs.msgs, count)) != srs_success) {return srs_error_wrap(err, "send messages");}}
}