前言
GB/T28181-2022实时流的传输方式介绍:https://blog.csdn.net/www_dong/article/details/134255185
tcp passive收流
流程图
注意:
- m字段指定传输方式为TCP/RTP/AVP;
- sdp信息中增加"a=setup:passive";
- SIP服务器启动端口监听,设备发起tcp连接请求;
设计
- 创建socket、bind、listen,启动数据接收线程;
// TcpServer为封装的socket类int CGBTcpServerStreamReceiver::Start()
{if (m_localIP.empty() || m_localPort <= 0)return -1;if (m_tcpServer.get())return 0;m_tcpServer = std::make_shared<TcpServer>(TcpDataCB, this);if (!m_tcpServer.get())return -1;if (0 != m_tcpServer->TcpCreate() || 0 != m_tcpServer->TcpBind(m_localPort) || 0 != m_tcpServer->TcpListen(5))return -1;m_thread = std::thread(TcpDataThread, this);return 0;
}
- 在线程内等待连接,连接成功后接收数据并回调至应用层处理
void CGBTcpServerStreamReceiver::TcpDataWorker()
{bool bAccept = false;uint8_t payload;while (m_running){if (!bAccept){// 等待设备连接if (0 == m_tcpServer->TcpAccept()){bAccept = true;// 连接成功后,初始化rtp参数if (0 != InitRtpSession_()){break;}}continue;}Poll();BeginDataAccess();// 开始接收数据if (GotoFirstSourceWithData()){do{RTPPacket* packet = nullptr;while (nullptr != (packet = GetNextPacket())){payload = packet->GetPayloadType();if (0 == payload){DeletePacket(packet);continue;}struct rtp_packet_tcp data;data.mark = packet->HasMarker();data.pts = packet->GetTimestamp();data.seq = packet->GetSequenceNumber();data.data = packet->GetPayloadData();data.len = (int)packet->GetPayloadLength();m_payload = payload;if (m_lastSeq < 0){m_lastSeq = data.seq - 1;}if (m_lastSeq = data.seq - 1){PackData_(data.data, data.len);}DeletePacket(packet);}} while (GotoNextSourceWithData());}EndDataAccess();Sleep(30);}Destroy();
}
- 初始化rtp参数
int CGBTcpServerStreamReceiver::InitRtpSession_()
{const int packSize = 45678;RTPSessionParams sessionParams;sessionParams.SetProbationType(RTPSources::NoProbation);sessionParams.SetOwnTimestampUnit(90000.0 / 25.0);sessionParams.SetMaximumPacketSize(packSize + 64);m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);m_rtpTcpTransmitter->Init(true);m_rtpTcpTransmitter->Create(65535, 0);if (0 != Create(sessionParams, m_rtpTcpTransmitter))return -1;if (0 != AddDestination(RTPTCPAddress(m_tcpServer->GetClientSocket())))return -1;return 0;
}
注意:RTP over TCP模式比RTP over UDP模式多了两字节的长度字段。
tcp active收流
流程图
注意:
- m字段指定传输方式为TCP/RTP/AVP;
- sdp信息中增加"a=setup:active";
- 设备返回200 OK,报文的SDP信息中包含tcp监听端口;
- SIP服务器根据设备监听端口发起TCP连接请求;
设计
- 创建socket、connect、初始化rtp,启动数据接收线程
// TcpClient为封装的客户端socket类int CGBTcpClientStreamReceiver::Start(int streamType)
{if (m_localIP.empty() || m_localPort <= 0)return -1;if (m_tcpClient.get())return 0;// 创建socketm_tcpClient = std::make_shared<TcpClient>(TcpDataCB, this);if (!m_tcpClient.get() || 0 != m_tcpClient->TcpCreate())return -1;// connectint ret = m_tcpClient->TcpConnectByTime(m_localIP.c_str(), m_localPort, 5);if (0 != ret)return -1;// 初始化rtp sessionif (0 != InitRtpSession_())return -1;// 启动接收线程m_thread = std::thread(TcpDataThread, this);
}
- 初始化rtp参数
int CGBTcpClientStreamReceiver::InitRtpSession_()
{const int packSize = 45678;RTPSessionParams sessionParams;sessionParams.SetProbationType(RTPSources::NoProbation);sessionParams.SetOwnTimestampUnit(90000.0 / 25.0);sessionParams.SetMaximumPacketSize(packSize + 64);m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);m_rtpTcpTransmitter->Init(true);m_rtpTcpTransmitter->Create(65535, 0);if (0 != Create(sessionParams, m_rtpTcpTransmitter))return -1;// 添加客户端socketif (0 != AddDestination(RTPTCPAddress(m_tcpClient->GetClientSocket())))return -1;return 0;
}
- 在线程内等待连接,连接成功后接收数据并回调至应用层处理
同tcp passive收流流程。
rtp解包工具
基于qt+ireader库实现tcp解包(ps封装+264载荷)
解包流程
- 打开文件,创建解码器
void RtpUnpackDlg::StartRtpUnpack()
{if (m_thread.joinable())return;QString filePath = ui.le_filePath->text();if (filePath.isEmpty()){QMessageBox::critical(this, QString::fromLocal8Bit("错误"), QString::fromLocal8Bit("文件路径为空"), QMessageBox::Ok);return;}m_rtpPayloadParam.payload = 100;m_rtpPayloadParam.encoding = "PS";m_rtpPayloadParam.frtp = fopen(filePath.toStdString().c_str(), "rb");if (!m_rtpPayloadParam.frtp){QMessageBox::critical(this, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("打开输入文件失败"), QMessageBox::Ok);return;}m_rtpPayloadParam.fout = fopen(outFilePath.toStdString().c_str(), "wb");if (!m_rtpPayloadParam.fout){QMessageBox::critical(this, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("打开输出文件失败"), QMessageBox::Ok);return;}struct rtp_payload_t handler;handler.alloc = RtpPacketAlloc;handler.free = RtpPacketFree;handler.packet = RtpDecodePacket;m_rtpPayloadParam.decoder = rtp_payload_decode_create(100, "PS", &handler, this);m_thread = std::thread(DataReadThread, this);
}
- 读数据
void RtpUnpackDlg::RtpDataReadWorker()
{while (m_running){// 先读两个字节的头unsigned char s2[2];if (2 != fread(s2, 1, 2, m_rtpPayloadParam.frtp))break;m_rtpPayloadParam.size = (s2[0] << 8) | s2[1];assert(ctx.size < sizeof(ctx.packet));if (m_rtpPayloadParam.size != (int)fread(m_rtpPayloadParam.packet, 1, m_rtpPayloadParam.size, m_rtpPayloadParam.frtp))break;// 塞数据if (m_rtpPayloadParam.packet[1] < RTCP_FIR || m_rtpPayloadParam.packet[1] > RTCP_LIMIT)rtp_payload_decode_input(m_rtpPayloadParam.decoder, m_rtpPayloadParam.packet, m_rtpPayloadParam.size);}fclose(m_rtpPayloadParam.frtp);fclose(m_rtpPayloadParam.fout);
}
- 解包
int RtpUnpackDlg::DecodePacket(const void* packet, int bytes, uint32_t timestamp, int flags)
{static const unsigned char start_code[4] = { 0, 0, 0, 1 };static unsigned char buffer[2 * 1024 * 1024] = {0, };size_t size = 0;if (0 == strcmp("H264", m_rtpPayloadParam.encoding) || 0 == strcmp("H265", m_rtpPayloadParam.encoding)|| 0 == strcmp("PS", m_rtpPayloadParam.encoding)){memcpy(buffer, start_code, sizeof(start_code));size += sizeof(start_code);}memcpy(buffer + size, packet, bytes);size += bytes;fwrite(buffer, 1, size, m_rtpPayloadParam.fout);// 新增界面播放功能if (m_playWidget)m_playWidget->AddData(CODEC_VIDEO_H264, (void*)buffer, size);return 0;
}
界面示例
参考:https://github.com/ireader中的demo示例