GB28181学习(十七)——基于jrtplib实现tcp被动和主动发流

前言

GB/T28181-2022实时流的传输方式介绍:https://blog.csdn.net/www_dong/article/details/134255185

基于jrtplib实现tcp被动和主动收流介绍:https://blog.csdn.net/www_dong/article/details/134451387

本文主要介绍下级平台或设备发流功能,用于对接特定的SIP服务器或上级平台。

UDP发流

流程图

在这里插入图片描述

发送端流程

  • 初始化rtp参数;
  • 裸流数据做PS复用;
  • 组RTP包发送;

设计

  1. 初始化rtp参数
int CUdp::InitRtp_()
{RTPSessionParams sessionParams;sessionParams.SetMinimumRTCPTransmissionInterval(10);sessionParams.SetOwnTimestampUnit(1.0 / 90000.0);sessionParams.SetAcceptOwnPackets(true);sessionParams.SetMaximumPacketSize(1450);RTPUDPv4TransmissionParams transParams;transParams.SetRTPSendBuffer(2*1024*1024);transParams.SetBindIP(m_ip);transParams.SetPortbase((uint16_t)m_port);if (0 != Create(sessionParams, &transParams)){return -1;}SetDefaultPayloadType((uint8_t)m_payload);SetDefaultTimestampIncrement(3600);SetDefaultMark(true);RTPIPv4Address addr(ntohl(inet_addr(m_ip), (uint16_t)m_port);if(0 != AddDestination(addr)){return -1;}return 0;
}
  1. 流数据复用为PS
// 使用ireader开源库进行ps复用
// 初始化
CData2PS::CData2PS()
{struct ps_muxer_func_t func;func.alloc = Alloc;func.free = Free;func.write = Packet;m_ps = ps_muxer_create(&func, this);// TODO codecid待补充m_ps_stream = ps_muxer_add_stream(m_ps, PSI_STREAM_H264, nullptr, 0);
}// 塞数据
int CData2PS::InputData(void* data, int len)
{if (!m_ps)return -1;uint64_t clock = time64_now();if (0 == m_ps_clock)m_ps_clock = clock;return ps_muxer_input(m_ps, m_ps_stream, 0, (clock - m_ps_clock) * 90, (clock - m_ps_clock) * 90, data, len);
}
  1. 发送rtp包
// 调用jrtplib中SendPacket(data, len);接口发送数据// 以下为SendPacket部分源码
// 主要流程:
// 1. 构建packet
// 2. 发送rtp数据
int RTPSession::SendPacket(const void *data,size_t len,uint8_t pt,bool mark,uint32_t timestampinc)
{int status;if (!created)return ERR_RTP_SESSION_NOTCREATED;BUILDER_LOCKif ((status = packetbuilder.BuildPacket(data,len,pt,mark,timestampinc)) < 0){BUILDER_UNLOCKreturn status;}if ((status = SendRTPData(packetbuilder.GetPacket(),packetbuilder.GetPacketLength())) < 0){BUILDER_UNLOCKreturn status;}BUILDER_UNLOCKSOURCES_LOCKsources.SentRTPPacket();SOURCES_UNLOCKPACKSENT_LOCKsentpackets = true;PACKSENT_UNLOCKreturn 0;
}// 构建包
int RTPPacketBuilder::PrivateBuildPacket(const void *data,size_t len,uint8_t pt,bool mark,uint32_t timestampinc,bool gotextension,uint16_t hdrextID,const void *hdrextdata,size_t numhdrextwords)
{RTPPacket p(pt,data,len,seqnr,timestamp,ssrc,mark,numcsrcs,csrcs,gotextension,hdrextID,(uint16_t)numhdrextwords,hdrextdata,buffer,maxpacksize,GetMemoryManager());int status = p.GetCreationError();if (status < 0)return status;packetlength = p.GetPacketLength();if (numpackets == 0) // first packet{lastwallclocktime = RTPTime::CurrentTime();lastrtptimestamp = timestamp;prevrtptimestamp = timestamp;}else if (timestamp != prevrtptimestamp){lastwallclocktime = RTPTime::CurrentTime();lastrtptimestamp = timestamp;prevrtptimestamp = timestamp;}numpayloadbytes += (uint32_t)p.GetPayloadLength();numpackets++;timestamp += timestampinc;seqnr++;return 0;
}// 发送包
int RTPSession::SendRTPData(const void *data, size_t len)
{if (!m_changeOutgoingData)return rtptrans->SendRTPData(data, len);void *pSendData = 0;size_t sendLen = 0;int status = 0;status = OnChangeRTPOrRTCPData(data, len, true, &pSendData, &sendLen);if (status < 0)return status;if (pSendData){status = rtptrans->SendRTPData(pSendData, sendLen);OnSentRTPOrRTCPData(pSendData, sendLen, true);}return status;
}// 底层实现
int RTPUDPv4Transmitter::SendRTPData(const void *data,size_t len)	
{if (!init)return ERR_RTP_UDPV4TRANS_NOTINIT;MAINMUTEX_LOCKif (!created){MAINMUTEX_UNLOCKreturn ERR_RTP_UDPV4TRANS_NOTCREATED;}if (len > maxpacksize){MAINMUTEX_UNLOCKreturn ERR_RTP_UDPV4TRANS_SPECIFIEDSIZETOOBIG;}destinations.GotoFirstElement();while (destinations.HasCurrentElement()){// 调用sendto函数实现udp包的发送sendto(rtpsock,(const char *)data,len,0,(const struct sockaddr *)destinations.GetCurrentElement().GetRTPSockAddr(),sizeof(struct sockaddr_in));destinations.GotoNextElement();}MAINMUTEX_UNLOCKreturn 0;
}

tcp passive发流

流程图

在这里插入图片描述

发送端流程:

  • 上级平台或sip服务器以主动方式连接,对于下级平台或者设备(数据发送端)为被动方式;
  • 下级平台或者设备(数据发送端)启动端口监听;
  • 接收上级平台或sip服务器tcp连接请求;
  • 向上级平台或sip服务器发送流数据;

设计

  1. 创建socket、bind、listen,启动数据接收线程;
// TcpServer为封装的socket类int CGBTcpServer::Start()
{if (0 != m_localPort || m_tcpServer.get())return 0;int ret = -1;do {m_tcpServer = std::make_shared<TcpServer>(nullptr, this);if (!m_tcpServer.get())break;ret = m_tcpServer->TcpCreate();if (0 != ret)break;ret = m_tcpServer->TcpBind(m_localPort);if (0 != ret)break;ret = m_tcpServer->TcpListen(5);if (0 != ret)break;m_thread = std::thread(TCPData2PSThread, this);return 0;} while (0);Stop();return ret;
}
  1. 在线程内等待连接,连接成功后接收数据并回调至应用层处理
void CGBTcpServer::TCPData2PSWorker()
{if (!m_pspacker)m_pspacker = new(std::nothrow) CData2PS(PSTCPDataCB, this);bool bAccept = false;while (m_running){if (!bAccept){if (0 == m_tcpServer->TcpAccept()){bAccept = true;if (0 != InitRtp_()){break;}}continue;}std::this_thread::sleep_for(std::chrono::seconds(1));}
}
  1. 初始化rtp参数
int CGBTcpServer::InitRtp_()
{const int packetSize = 45678;RTPSessionParams sessionparams;sessionparams.SetProbationType(RTPSources::NoProbation);sessionparams.SetOwnTimestampUnit(1.0 / packetSize);sessionparams.SetMaximumPacketSize(packetSize + 64);m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);m_rtpTcpTransmitter->Init(true);m_rtpTcpTransmitter->Create(65535, 0);int status = Create(sessionparams, m_rtpTcpTransmitter);if (status < 0){return status;}status = AddDestination(RTPTCPAddress(m_tcpServer->GetClientSocket()));if (0 != status)return status;SetDefaultPayloadType(96);SetDefaultMark(false);SetDefaultTimestampIncrement(160);return 0;
}
  1. 将数据复用为PS;
  2. tcp方式发包
// 调用jrtplib中SendPacket(data, len);接口发送数据// 以下为tcp方式SendPacket部分源码
int RTPTCPTransmitter::SendRTPData(const void *data,size_t len)	
{return SendRTPRTCPData(data, len);
}int RTPTCPTransmitter::SendRTPRTCPData(const void *data, size_t len)
{if (!m_init)return ERR_RTP_TCPTRANS_NOTINIT;MAINMUTEX_LOCKif (!m_created){MAINMUTEX_UNLOCKreturn ERR_RTP_TCPTRANS_NOTCREATED;}// #define RTPTCPTRANS_MAXPACKSIZE							65535if (len > RTPTCPTRANS_MAXPACKSIZE){MAINMUTEX_UNLOCKreturn ERR_RTP_TCPTRANS_SPECIFIEDSIZETOOBIG;}std::map<SocketType, SocketData>::iterator it = m_destSockets.begin();std::map<SocketType, SocketData>::iterator end = m_destSockets.end();vector<SocketType> errSockets;int flags = 0;
#ifdef RTP_HAVE_MSG_NOSIGNALflags = MSG_NOSIGNAL;
#endif // RTP_HAVE_MSG_NOSIGNALwhile (it != end){uint8_t lengthBytes[2] = { (uint8_t)((len >> 8)&0xff), (uint8_t)(len&0xff) };SocketType sock = it->first;// 调用send接口发送数据// 1. 先发送2字节头(固定格式)// 2. 再发送数据if (send(sock,(const char *)lengthBytes,2,flags) < 0 ||send(sock,(const char *)data,len,flags) < 0)errSockets.push_back(sock);++it;}MAINMUTEX_UNLOCKif (errSockets.size() != 0){for (size_t i = 0 ; i < errSockets.size() ; i++)OnSendError(errSockets[i]);}// Don't return an error code to avoid the poll thread exiting// due to one closed connection for examplereturn 0;
}

tcp active发流

流程图

在这里插入图片描述

发送端流程:

  • 上级平台或sip服务器启动tcp监听连接,对于下级平台或者设备(数据发送端)为主动方式;
  • 下级平台或者设备(数据发送端)发起tcp连接;
  • 接收上级平台或sip服务器tcp响应;
  • 向上级平台或sip服务器发送流数据;

设计

  1. 创建socket、connect、初始化rtp,启动数据接收线程
// TcpClient为封装的客户端socket类int CGBTcpClient::Start()
{if (0 != m_localPort || m_tcpClient.get())return 0;int ret = -1;do{m_tcpClient = std::make_shared<TcpClient>(nullptr, this);if (!m_tcpClient.get() || 0 != m_tcpClient->TcpCreate())break;ret = m_tcpClient->TcpConnectByTime(m_localIP.c_str(), m_localPort, 5);if (0 != ret)break;ret = InitRtp_();if (0 != ret)break;m_thread = std::thread(RTPPackerThread, this);return 0;} while (0);Stop();return ret;
}
  1. 初始化rtp参数
int CGBTcpClient::InitRtp_()
{const int packSize = 45678;RTPSessionParams sessionParams;sessionParams.SetProbationType(RTPSources::NoProbation);sessionParams.SetOwnTimestampUnit(90000.0 / 25.0);sessionParams.SetMaximumPacketSize(packSize + 64);m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);m_rtpTcpTransmitter->Init(true);m_rtpTcpTransmitter->Create(65535, 0);if (0 != Create(sessionParams, m_rtpTcpTransmitter))return -1;if (0 != AddDestination(RTPTCPAddress(m_tcpClient->GetClientSocket())))return -1;return 0;
}
  1. 视音频数据复用为PS
  2. 发送数据,同tcp passive发流

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/160646.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

生活如果真能像队列一样的话

生活如果真能像队列一样&#xff0c;那该多好啊。 —————————————————————————————————————————— 背包&#xff0c;队列 可以先看他们的API&#xff1a;都含有一个无参构造函数&#xff0c;添加单个元素的方法&#xff0c;测试集合…

php项目从宝塔面板切换转到phpstudy小皮面板

宝塔面板转phpstudy面板 版本 宝塔面板8.0.1 phpstudy面板8.1.1.3 步骤 1、宝塔面板,找到项目文件夹,打包、下载到本地、解压 2、本地windows系统安装phpstudy面板,选择尽可能一样的配置 比如宝塔php7.4.33,可能phpstudy面板只有php7.4.3,也行 大环境一定要一致,比如…

力扣算法练习BM46—最小的K个数

题目 给定一个长度为 n 的可能有重复值的数组&#xff0c;找出其中不去重的最小的 k 个数。例如数组元素是4,5,1,6,2,7,3,8这8个数字&#xff0c;则最小的4个数字是1,2,3,4(任意顺序皆可)。 数据范围&#xff1a;0≤k,n≤10000&#xff0c;数组中每个数的大小0≤val≤1000 要…

linux signal 机制

ref&#xff1a; Linux操作系统学习笔记&#xff08;十六&#xff09;进程间通信之信号 | Ty-Chens Home https://www.cnblogs.com/renxinyuan/p/3867593.html 当执行kill -9 PID时系统发生了什么 -

Codeforces Round 910 (Div. 2) D. Absolute Beauty

D. Absolute Beauty 有两个长度为 n n n 的整数数组 a 1 , a 2 , … , a n a_1,a_2,\ldots,a_n a1​,a2​,…,an​ 和 b 1 , b 2 , … , b n b_1,b_2,\ldots,b_n b1​,b2​,…,bn​ 。他将数组 b b b 的美丽值定义为 ∑ i 1 n ∣ a i − b i ∣ . \sum_{i1}^{n} |a_i - b…

基于材料生成算法优化概率神经网络PNN的分类预测 - 附代码

基于材料生成算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于材料生成算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于材料生成优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针对PNN神…

JDK命令使用总结

目录 javacjava javac 将源码(*.java)编译成字节码(*.class) javac HelloWorld.javajava 运行字节码(*.class) 不能加后缀名 java HelloWorld直接运行单文件源码(*.java) Java11以上才支持 java HelloWorld.java

ROSNS3(一)

https://github.com/malintha/rosns3 第一步&#xff1a;clone和构建rosns3客户端 第二步&#xff1a;运行 最详细的ubuntu 安装 docker教程 - 知乎 1. unable to find source space /home/muta/src 解决方法&#xff1a; 将副将将碰到的bug&#xff0c;解决方法_#include &…

【C++ Primer Plus学习记录】递增运算符(++)和递减运算符(--)

递增运算符&#xff08;&#xff09;和递减运算符&#xff08;--&#xff09;&#xff1a;前缀版本位于操作数前面&#xff0c;如x&#xff1b;后缀版本位于操作数后面&#xff0c;如x。两个版本对操作数的影响是一样的&#xff0c;但是影响的时间不同。这就像吃饭前买单和吃饭…

Python从零开始快速搭建一个语音对话机器人

文章目录 01-初心缘由02-准备工作03-语音机器人的搭建思路04-语音生成音频文件05-音频文件转文字STT06-与图灵机器人对话07-文字转语音08-语音对话机器人的完整代码09-结束语10-有问必答关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学…

SSH连接远程服务器报错:WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED 解决方法

一.错误描述 报错信息里提示了路径信息/root/.ssh/known_hosts:20 二.解决方案 方法一 输入以下指令&#xff1a; ssh-keygen -R XXX&#xff08;需要连接远程服务器的ip&#xff09; 按照我的例子ip:10.165.7.136&#xff0c;会返回以下信息: 重新尝试连接&#xff1a; 输…

C++学习 --set

目录 1&#xff0c; 什么是set 2&#xff0c; 创建set 2-1&#xff0c; 标准数据类型 2-2&#xff0c; 自定义数据类型 2-3&#xff0c; 其他创建方式 3&#xff0c; 操作set 3-1&#xff0c; 赋值 3-2&#xff0c; 添加元素&#xff08;insert&#xff09; 3-2-1&…

MySQL的乐观锁和悲观锁

1、乐观锁&#xff1a; 乐观锁在操作数据的时候&#xff0c;是保持一种乐观的状态&#xff0c;认为别的线程是不会同时修改数据的&#xff0c;所以是不会上锁的&#xff0c;但是在更新的时候&#xff0c;会判断一下在这个期间内是否有别的线程修改过数据。 主要的流程&#x…

规划类3d全景线上云展馆帮助企业轻松拓展海外市场

科技3D线上云展馆作为一种基于VR虚拟现实和互联网技术的新一代展览平台。可以在线上虚拟空间中模拟真实的展馆&#xff0c;让观众无需亲自到场&#xff0c;即可获得沉浸式的参观体验。通过这个展馆&#xff0c;您可以充分、全面、立体展示您的产品、服务以及各种创意作品&#…

python运算符重载之成员关系和属性运算

1 python运算符重载之成员关系和属性运算 1.1 重载成员关系运算符 1.1.1 contains,iter,getitem python使用成员关系运算符in时&#xff0c; 按优先级调用方法&#xff1a;contains>iter>getitem&#xff1b; class MyIters:def __init__(self,value):self.datavalu…

2023年【安全生产监管人员】考试题及安全生产监管人员找解析

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 安全生产监管人员考试题参考答案及安全生产监管人员考试试题解析是安全生产模拟考试一点通题库老师及安全生产监管人员操作证已考过的学员汇总&#xff0c;相对有效帮助安全生产监管人员找解析学员顺利通过考试。 1、…

【树莓派】Camera Module 使用

工具 RPI4RPI Camera Module 2 硬件安装 直接插到板子的相机带子插口上即可 使用前提 libcamera-hello运行这个命令能够成功&#xff0c;否则需要装相应的包 在 RPI4 上测试 libcamera-jpeg -o 00001.jpg -t 2000 --width 640 --height 480t 表示程序运行&#xff08;预…

SA8255 Q+A android 登录QNX

需要工具busybox 130|gen4_gvm:/ # cd /data/ gen4_gvm:/data # ./busybox telnet 192.168.1.1Entering character mode Escape character is ^].QNX Neutrino (localhost) (ttyp0)login: root No home directory. Logging in with home "/". # 用户名&#xff1a…

数据结构-栈的实现

1.栈的概念及结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端称为栈顶&#xff0c;另一端称为栈底。栈中的数据元素遵守后进先出LIFO&#xff08;Last In First Out&#xff09;的原则。 压栈&…