通过前面的文章我们可以了解到,当创建好Transport的时候,socket已经建立好。具备了相应的网络传输能力。我们来看一下socket接收到数据是如何处理的。
UdpSocketHandler::OnUvRecv
Socket接收数据
inline void UdpSocketHandler::OnUvRecv(ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned int flags)
{MS_TRACE();// NOTE: Ignore if there is nothing to read or if it was an empty datagram.if (nread == 0)return;// Check flags.if ((flags & UV_UDP_PARTIAL) != 0u){MS_ERROR("received datagram was truncated due to insufficient buffer, ignoring it");return;}// Data received.if (nread > 0){// Update received bytes.更新接收字节。this->recvBytes += nread;// Notify the subclass.通知子类。UdpSocket 是其子类UserOnUdpDatagramReceived(reinterpret_cast<uint8_t*>(buf->base), nread, addr);}// Some error.else{MS_DEBUG_DEV("read error: %s", uv_strerror(nread));}
}
UserOnUdpDatagramReceived
具体由UdpSocket其子类实现,其中listener是在创建transport创建时的具体transport
void UdpSocket::UserOnUdpDatagramReceived(const uint8_t* data, size_t len, const struct sockaddr* addr){MS_TRACE();if (!this->listener){MS_ERROR("no listener set");return;}// Notify the reader.通知读者。this->listener->OnUdpSocketPacketReceived(this, data, len, addr);}
OnUdpSocketPacketReceived
以PlainTransport为例
//从udpsocket获得了接收数据inline void PlainTransport::OnUdpSocketPacketReceived(RTC::UdpSocket* socket, const uint8_t* data, size_t len, const struct sockaddr* remoteAddr){MS_TRACE();//形成元组,记录IP等内容RTC::TransportTuple tuple(socket, remoteAddr);//进入到当前transport处理OnPacketReceived(&tuple, data, len);}
PlainTransport::OnPacketReceived
inline void PlainTransport::OnPacketReceived(RTC::TransportTuple* tuple, const uint8_t* data, size_t len){MS_TRACE();// Increase receive transmission.增加接收传输。RTC::Transport::DataReceived(len);// Check if it's RTCP.检查它是否是RTCP。if (RTC::RTCP::Packet::IsRtcp(data, len)){OnRtcpDataReceived(tuple, data, len);}// Check if it's RTP.检查它是否是RTP。else if (RTC::RtpPacket::IsRtp(data, len)){OnRtpDataReceived(tuple, data, len);}// Check if it's SCTP.检查它是否是SCTP。else if (RTC::SctpAssociation::IsSctp(data, len)){OnSctpDataReceived(tuple, data, len);}else{MS_WARN_DEV("ignoring received packet of unknown type");}}
RTP数据处理方式
首先来处理是不是加密的RTP数据;然后根据既定格式重构RTP数据为Packet;最后透传整理好的packet到上层Transport
inline void PlainTransport::OnRtpDataReceived(RTC::TransportTuple* tuple, const uint8_t* data, size_t len){MS_TRACE();if (HasSrtp() && !IsSrtpReady())return;// Decrypt the SRTP packet.解密SRTP报文。auto intLen = static_cast<int>(len);if (HasSrtp() && !this->srtpRecvSession->DecryptSrtp(const_cast<uint8_t*>(data), &intLen)){RTC::RtpPacket* packet = RTC::RtpPacket::Parse(data, static_cast<size_t>(intLen));if (!packet){MS_WARN_TAG(srtp, "DecryptSrtp() failed due to an invalid RTP packet");}else{MS_WARN_TAG(srtp,"DecryptSrtp() failed [ssrc:%" PRIu32 ", payloadType:%" PRIu8 ", seq:%" PRIu16 "]",packet->GetSsrc(),packet->GetPayloadType(),packet->GetSequenceNumber());delete packet;}return;}//解析socket数据,获取格式化后的RtpPacketRTC::RtpPacket* packet = RTC::RtpPacket::Parse(data, static_cast<size_t>(intLen));if (!packet){MS_WARN_TAG(rtp, "received data is not a valid RTP packet");return;}// If we don't have a RTP tuple yet, check whether comedia mode is set.if (!this->tuple){if (!this->comedia){MS_DEBUG_TAG(rtp, "ignoring RTP packet while not connected");// Remove this SSRC.RecvStreamClosed(packet->GetSsrc());delete packet;return;}MS_DEBUG_TAG(rtp, "setting RTP tuple (comedia mode enabled)");auto wasConnected = IsConnected();this->tuple = new RTC::TransportTuple(tuple);if (!this->listenIp.announcedIp.empty())this->tuple->SetLocalAnnouncedIp(this->listenIp.announcedIp);// If not yet connected do it now.if (!wasConnected){// Notify the Node PlainTransport.json data = json::object();this->tuple->FillJson(data["tuple"]);this->shared->channelNotifier->Emit(this->id, "tuple", data);RTC::Transport::Connected();}}// Otherwise, if RTP tuple is set, verify that it matches the origin// of the packet.else if (!this->tuple->Compare(tuple)){MS_DEBUG_TAG(rtp, "ignoring RTP packet from unknown IP:port");// Remove this SSRC.RecvStreamClosed(packet->GetSsrc());delete packet;return;}// Pass the packet to the parent transport.将数据包传递给父传输。RTC::Transport::ReceiveRtpPacket(packet);}
Transport::ReceiveRtpPacket
//当前调用来源于子类的OnRtpDataReceived中触发了当前接口void Transport::ReceiveRtpPacket(RTC::RtpPacket* packet){MS_TRACE();packet->logger.recvTransportId = this->id;// Apply the Transport RTP header extension ids so the RTP listener can use them.// 应用传输RTP报头扩展id,以便RTP侦听器可以使用它们。packet->SetMidExtensionId(this->recvRtpHeaderExtensionIds.mid);packet->SetRidExtensionId(this->recvRtpHeaderExtensionIds.rid);packet->SetRepairedRidExtensionId(this->recvRtpHeaderExtensionIds.rrid);packet->SetAbsSendTimeExtensionId(this->recvRtpHeaderExtensionIds.absSendTime);packet->SetTransportWideCc01ExtensionId(this->recvRtpHeaderExtensionIds.transportWideCc01);auto nowMs = DepLibUV::GetTimeMs();// Feed the TransportCongestionControlServer.if (this->tccServer){this->tccServer->IncomingPacket(nowMs, packet);}// Get the associated Producer./*根据收到的packet,查找关联的producer。*/RTC::Producer* producer = this->rtpListener.GetProducer(packet);if (!producer){packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::PRODUCER_NOT_FOUND);MS_WARN_TAG(rtp,"no suitable Producer for received RTP packet [ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]",packet->GetSsrc(),packet->GetPayloadType());// Tell the child class to remove this SSRC.告诉子类删除这个SSRC。RecvStreamClosed(packet->GetSsrc());delete packet;return;}// MS_DEBUG_DEV(// "RTP packet received [ssrc:%" PRIu32 ", payloadType:%" PRIu8 ", producerId:%s]",// packet->GetSsrc(),// packet->GetPayloadType(),// producer->id.c_str());// Pass the RTP packet to the corresponding Producer./*将packet传给指定的producer,进行下一步处理。*/auto result = producer->ReceiveRtpPacket(packet);switch (result)/*根据packet包类型不同,进行不同通道的码率统计。*/{case RTC::Producer::ReceiveRtpPacketResult::MEDIA:this->recvRtpTransmission.Update(packet);/*媒体通道的码率统计*/ break;case RTC::Producer::ReceiveRtpPacketResult::RETRANSMISSION:this->recvRtxTransmission.Update(packet); /*重传通道的码率统计*/ break;case RTC::Producer::ReceiveRtpPacketResult::DISCARDED:// Tell the child class to remove this SSRC.RecvStreamClosed(packet->GetSsrc());break;default:;}/*释放rtp包*/delete packet;}
Producer::ReceiveRtpPacket
/*接收到transport传入的packet,对packet进行指定的处理。*/Producer::ReceiveRtpPacketResult Producer::ReceiveRtpPacket(RTC::RtpPacket* packet){MS_TRACE();packet->logger.producerId = this->id;// Reset current packet./*重置当前正在处理的packet*/this->currentRtpPacket = nullptr;// Count number of RTP streams.统计当前接收流的数目auto numRtpStreamsBefore = this->mapSsrcRtpStream.size();/*通过packet,获取对应的接收流。*/auto* rtpStream = GetRtpStream(packet);if (!rtpStream)/*没有查找到对应的rtp接收流*/{MS_WARN_TAG(rtp, "no stream found for received packet [ssrc:%" PRIu32 "]", packet->GetSsrc());packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::RECV_RTP_STREAM_NOT_FOUND);return ReceiveRtpPacketResult::DISCARDED;/*将packet丢弃*/}// Pre-process the packet./*对packet进行预处理:如果是视频,则添加头部扩展id。*/PreProcessRtpPacket(packet);ReceiveRtpPacketResult result;bool isRtx{ false };/*packet是否是rtx流中的packet*/// Media packet./*是主流中的rtp包*/if (packet->GetSsrc() == rtpStream->GetSsrc()){ /*设置返回结果,表示是媒体流,视频流或音频流。*/result = ReceiveRtpPacketResult::MEDIA;// Process the packet./*rtp接收流处理接收到的packet*/if (!rtpStream->ReceivePacket(packet)){// May have to announce a new RTP stream to the listener./*如果添加了新的rtp接收流,则通知其订阅者。*/if (this->mapSsrcRtpStream.size() > numRtpStreamsBefore)NotifyNewRtpStream(rtpStream); /*最终通知到的是与producer相关的consumer*/packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::RECV_RTP_STREAM_DISCARDED);return result;}}// RTX packet./*重传流中的rtp包*/else if (packet->GetSsrc() == rtpStream->GetRtxSsrc()){result = ReceiveRtpPacketResult::RETRANSMISSION;isRtx = true;// Process the packet./*rtp接收流处理重传流中的packet*/if (!rtpStream->ReceiveRtxPacket(packet)){packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::RECV_RTP_STREAM_NOT_FOUND);return result;}}// Should not happen.else{MS_ABORT("found stream does not match received packet");}/*判断packet是否是关键帧中的包*/if (packet->IsKeyFrame()){MS_DEBUG_TAG(rtp,"key frame received [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",packet->GetSsrc(),packet->GetSequenceNumber());// Tell the keyFrameRequestManager.if (this->keyFrameRequestManager)this->keyFrameRequestManager->KeyFrameReceived(packet->GetSsrc()); /*更新关键帧*/}// May have to announce a new RTP stream to the listener.if (this->mapSsrcRtpStream.size() > numRtpStreamsBefore){// Request a key frame for this stream since we may have lost the first packets// (do not do it if this is a key frame).if (this->keyFrameRequestManager && !this->paused && !packet->IsKeyFrame())this->keyFrameRequestManager->ForceKeyFrameNeeded(packet->GetSsrc());// Update current packet.this->currentRtpPacket = packet;NotifyNewRtpStream(rtpStream);// Reset current packet.this->currentRtpPacket = nullptr;}// If paused stop here.if (this->paused)return result;// May emit 'trace' event.EmitTraceEventRtpAndKeyFrameTypes(packet, isRtx);// Mangle the packet before providing the listener with it./*在将packet发布至其订阅者之前,对其进行倾轧。主要进行payload type,ssrc,header extension的处理*/if (!MangleRtpPacket(packet, rtpStream))return ReceiveRtpPacketResult::DISCARDED;// Post-process the packet./*最后再对packet进行一次处理*/PostProcessRtpPacket(packet);/*将处理后的packet,发送到其订阅者transport中。*/this->listener->OnProducerRtpPacketReceived(this, packet);return result;}
向上传递到Transport层
inline void Transport::OnProducerRtpPacketReceived(RTC::Producer* producer, RTC::RtpPacket* packet){MS_TRACE();//listener是上层的Routerthis->listener->OnTransportProducerRtpPacketReceived(this, producer, packet);}
向上传递到Router层
inline void Router::OnTransportProducerRtpPacketReceived(RTC::Transport* /*transport*/, RTC::Producer* producer, RTC::RtpPacket* packet){MS_TRACE();packet->logger.routerId = this->id;//通过生产者,所以出订阅者列表auto& consumers = this->mapProducerConsumers.at(producer);//如果存在对应的订阅者if (!consumers.empty()){// Cloned ref-counted packet that RtpStreamSend will store for as long as// needed avoiding multiple allocations unless absolutely necessary.// Clone only happens if needed.std::shared_ptr<RTC::RtpPacket> sharedPacket;for (auto* consumer : consumers){// Update MID RTP extension value.const auto& mid = consumer->GetRtpParameters().mid;if (!mid.empty())packet->UpdateMid(mid);//发送RTP数据consumer->SendRtpPacket(packet, sharedPacket);}}auto it = this->mapProducerRtpObservers.find(producer);if (it != this->mapProducerRtpObservers.end()){auto& rtpObservers = it->second;for (auto* rtpObserver : rtpObservers){rtpObserver->ReceiveRtpPacket(producer, packet);}}}
具体transport通道转发数据
void PlainTransport::SendRtpPacket(RTC::Consumer* /*consumer*/, RTC::RtpPacket* packet, RTC::Transport::onSendCallback* cb){MS_TRACE();if (!IsConnected()){if (cb){(*cb)(false);delete cb;}return;}const uint8_t* data = packet->GetData();auto intLen = static_cast<int>(packet->GetSize());if (HasSrtp() && !this->srtpSendSession->EncryptRtp(&data, &intLen)){if (cb){(*cb)(false);delete cb;}return;}auto len = static_cast<size_t>(intLen);//使用元组获发送RTP数据this->tuple->Send(data, len, cb);// Increase send transmission.增加发送传输的数据大小。RTC::Transport::DataSent(len);}
void Send(const uint8_t* data, size_t len, RTC::TransportTuple::onSendCallback* cb = nullptr){if (this->protocol == Protocol::UDP)this->udpSocket->Send(data, len, this->udpRemoteAddr, cb);elsethis->tcpConnection->Send(data, len, cb);}
底层实际发送
void UdpSocketHandler::Send(const uint8_t* data, size_t len, const struct sockaddr* addr, UdpSocketHandler::onSendCallback* cb)
{MS_TRACE();if (this->closed){if (cb){(*cb)(false);delete cb;}return;}if (len == 0){if (cb){(*cb)(false);delete cb;}return;}// First try uv_udp_try_send(). In case it can not directly send the datagram// then build a uv_req_t and use uv_udp_send().uv_buf_t buffer = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data)), len);const int sent = uv_udp_try_send(this->uvHandle, &buffer, 1, addr);// Entire datagram was sent. Done.if (sent == static_cast<int>(len)){// Update sent bytes.this->sentBytes += sent;if (cb){(*cb)(true);delete cb;}return;}else if (sent >= 0){MS_WARN_DEV("datagram truncated (just %d of %zu bytes were sent)", sent, len);// Update sent bytes.this->sentBytes += sent;if (cb){(*cb)(false);delete cb;}return;}// Any error but legit EAGAIN. Use uv_udp_send().else if (sent != UV_EAGAIN){MS_WARN_DEV("uv_udp_try_send() failed, trying uv_udp_send(): %s", uv_strerror(sent));}auto* sendData = new UvSendData(len);sendData->req.data = static_cast<void*>(sendData);std::memcpy(sendData->store, data, len);sendData->cb = cb;buffer = uv_buf_init(reinterpret_cast<char*>(sendData->store), len);int err = uv_udp_send(&sendData->req, this->uvHandle, &buffer, 1, addr, static_cast<uv_udp_send_cb>(onSend));if (err != 0){// NOTE: uv_udp_send() returns error if a wrong INET family is given// (IPv6 destination on a IPv4 binded socket), so be ready.MS_WARN_DEV("uv_udp_send() failed: %s", uv_strerror(err));if (cb)(*cb)(false);// Delete the UvSendData struct (it will delete the store and cb too).delete sendData;}else{// Update sent bytes.this->sentBytes += len;}
}