Javacv-利用Netty实现推流直播复用(flv)

前言

上一篇文章《JavaCV之rtmp推流(FLV和M3U8)》介绍了javacv的基本使用,今天来讲讲如何实现推流复用。
以监控摄像头的直播为例,通常分为三步:

  1. 从设备获取音视频流
  2. 利用javacv进行解码(例如flv或m3u8)
  3. 将视频解码后数据推送到前端页面播放

推流直播复用,是指假如该设备某一个channel已经在解码直播了,其他channel只需要直接拿该设备解码后的视频帧数据进行播放即可,而无需重复上面三步。实现一次解码,多客户端播放。

什么是channel?

在Netty中,每个Channel实例代表一个与远程对等方的通信链接。在网络编程中,一个Channel通常对应于一个网络连接,可以是客户端到服务器的连接,也可以是服务器接受的客户端连接。

上述大概的推流复用流程如下图所示:

在这里插入图片描述

代码实例

MediaServer

负责创建Netty服务器。关键的步骤包括创建EventLoopGroup、配置ServerBootstrap、指定服务器的Channel类型为NioServerSocketChannel、设置服务器的处理器等。

这个服务器的实际处理逻辑是在LiveHandler类中实现的,这是一个自定义的ChannelHandler,它继承自SimpleChannelInboundHandler。在实际应用中,可以根据业务需求实现自己的ChannelHandler来处理接收到的消息。
这里维护了一个deviceContext设备容器,存放各个设备的TransferToFlv实例。

@Slf4j
@Component
public class MediaServer implements CommandLineRunner {@Autowiredprivate LiveHandler liveHandler;public static ConcurrentHashMap<String, TransferToFlv> deviceContext = new ConcurrentHashMap<>();public final static String  YOUR_VIDEO_PATH = "D:\灌篮高手.mp4";public final static int PORT = 8234;public void start() {InetSocketAddress socketAddress = new InetSocketAddress("0.0.0.0", PORT);//主线程组EventLoopGroup bossGroup = new NioEventLoopGroup(1);//工作线程组EventLoopGroup workGroup = new NioEventLoopGroup(200);ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) {CorsConfig corsConfig = CorsConfigBuilder.forAnyOrigin().allowNullOrigin().allowCredentials().build();socketChannel.pipeline().addLast(new HttpResponseEncoder()).addLast(new HttpRequestDecoder()).addLast(new ChunkedWriteHandler()).addLast(new HttpObjectAggregator(64 * 1024)).addLast(new CorsHandler(corsConfig)).addLast(liveHandler);}}).localAddress(socketAddress).option(ChannelOption.SO_BACKLOG, 128)//选择直接内存.option(ChannelOption.ALLOCATOR, PreferredDirectByteBufAllocator.DEFAULT).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.SO_RCVBUF, 128 * 1024).childOption(ChannelOption.SO_SNDBUF, 1024 * 1024).childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024 / 2, 1024 * 1024));//绑定端口,开始接收进来的连接try {ChannelFuture future = bootstrap.bind(socketAddress).sync();future.channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {//关闭主线程组bossGroup.shutdownGracefully();//关闭工作线程组workGroup.shutdownGracefully();}}@Overridepublic void run(String... args) {this.start();}
}

LiveHandler

继承于SimpleChannelInboundHandler,它是Netty中的一个特殊类型的Channel处理器,用于处理从通道中读取的数据,提供了一个简化的channelRead0方法,用于处理接收到的消息,而不必担心消息的释放。
这里实现的是判断请求地址是否为/live,并且获取地址中的deviceId,并将channel加入到设备的httpClients

@Service
@ChannelHandler.Sharable
public class LiveHandler extends SimpleChannelInboundHandler<Object> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) {FullHttpRequest req = (FullHttpRequest) msg;QueryStringDecoder decoder = new QueryStringDecoder(req.uri());// 判断请求uriif (!"/live".equals(decoder.path())) {sendError(ctx, HttpResponseStatus.BAD_REQUEST);return;}QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());List<String> parameters = queryStringDecoder.parameters().get("deviceId");if(parameters == null || parameters.isEmpty()){sendError(ctx, HttpResponseStatus.BAD_REQUEST);return;}String deviceId = parameters.get(0);sendFlvResHeader(ctx);Device device = new Device(deviceId, MediaServer.YOUR_VIDEO_PATH);playForHttp(device, ctx);}public void playForHttp(Device device, ChannelHandlerContext ctx) {try {TransferToFlv mediaConvert = new TransferToFlv();if (MediaServer.deviceContext.containsKey(device.getDeviceId())) {mediaConvert = MediaServer.deviceContext.get(device.getDeviceId());mediaConvert.getMediaChannel().addChannel(ctx, true);return;}mediaConvert.setCurrentDevice(device);MediaChannel mediaChannel = new MediaChannel(device);mediaConvert.setMediaChannel(mediaChannel);MediaServer.deviceContext.put(device.getDeviceId(), mediaConvert);//注册事件mediaChannel.getEventBus().register(mediaConvert);new Thread(mediaConvert).start();mediaConvert.getMediaChannel().addChannel(ctx, false);} catch (InterruptedException | FFmpegFrameRecorder.Exception e) {throw new RuntimeException(e);}}/*** 错误请求响应** @param ctx* @param status*/private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,Unpooled.copiedBuffer("请求地址有误: " + status + "\r\n", CharsetUtil.UTF_8));response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}/*** 发送req header,告知浏览器是flv格式** @param ctx*/private void sendFlvResHeader(ChannelHandlerContext ctx) {HttpResponse rsp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);rsp.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE).set(HttpHeaderNames.CONTENT_TYPE, "video/x-flv").set(HttpHeaderNames.ACCEPT_RANGES, "bytes").set(HttpHeaderNames.PRAGMA, "no-cache").set(HttpHeaderNames.CACHE_CONTROL, "no-cache").set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED).set(HttpHeaderNames.SERVER, "测试");ctx.writeAndFlush(rsp);}
}

MediaChannel

主要负责每个设备的channel添加、关闭,以及向channel发送数据。利用newScheduledThreadPool进行周期性检查channel的在线情况,如果全部channel下线,则使用事件总线eventBus通知关闭解码推流。

@Data
@AllArgsConstructor
public class MediaChannel {private Device currentDevice;public ConcurrentHashMap<String, ChannelHandlerContext> httpClients;private ScheduledFuture<?> checkFuture;private final ScheduledExecutorService scheduler;protected EventBus eventBus;public MediaChannel(Device currentDevice) {this.currentDevice = currentDevice;this.httpClients = new ConcurrentHashMap<>();this.scheduler = Executors.newScheduledThreadPool(1);this.eventBus = new EventBus();}public void addChannel(ChannelHandlerContext ctx, boolean needSendFlvHeader) throws InterruptedException, FFmpegFrameRecorder.Exception {if (ctx.channel().isWritable()) {ChannelFuture channelFuture = null;if (needSendFlvHeader) {//如果当前设备正在有channel播放,则先发送flvheader,再发送视频数据。byte[] flvHeader = MediaServer.deviceContext.get(currentDevice.getDeviceId()).getFlvHeader();channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer(flvHeader));} else {channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer(new ByteArrayOutputStream().toByteArray()));}channelFuture.addListener(future -> {if (future.isSuccess()) {httpClients.put(ctx.channel().id().toString(), ctx);}});this.checkFuture = scheduler.scheduleAtFixedRate(this::checkChannel, 0, 10, TimeUnit.SECONDS);System.out.println(currentDevice.getDeviceId() + ":channel:" + ctx.channel().id() + "创建成功");}Thread.sleep(50);}/*** 检查是否存在channel*/private void checkChannel() {if (httpClients.isEmpty()) {System.out.println("通知关闭推流");eventBus.post(this.currentDevice);this.checkFuture = null;scheduler.shutdown();}}/*** 关闭通道*/public void closeChannel() {for (Map.Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {entry.getValue().close();}}/*** 发送数据** @param data*/public void sendData(byte[] data) {for (Map.Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {if (entry.getValue().channel().isWritable()) {entry.getValue().writeAndFlush(Unpooled.copiedBuffer(data));} else {httpClients.remove(entry.getKey());System.out.println(currentDevice.getDeviceId() + ":channel:" + entry.getKey() + "已被去除");}}}}

TransferToFlv

流的解码、推送部分就是在这个类里面,使用的是javacv封装的ffmpeg库,将音视频流转换为flv格式。实际的参数可以根据业务调整。
这里增加了一个获取flv格式header数据方法,因为flv格式视频必须要包含flv header才能播放。复用推流数据的时候,先向前端发送flv格式header,再发送流数据。

@Slf4j
@Data
public class TransferToFlv implements Runnable {private volatile boolean running = false;private FFmpegFrameGrabber grabber;private FFmpegFrameRecorder recorder;public ByteArrayOutputStream bos = new ByteArrayOutputStream();private Device currentDevice;private MediaChannel mediaChannel;public ConcurrentHashMap<String, ChannelHandlerContext> httpClients = new ConcurrentHashMap<>();/*** 创建拉流器** @return*/protected void createGrabber(String url) throws FFmpegFrameGrabber.Exception {grabber = new FFmpegFrameGrabber(url);//拉流超时时间(10秒)grabber.setOption("stimeout", "10000000");grabber.setOption("threads", "1");grabber.setPixelFormat(avutil.AV_PIX_FMT_YUV420P);// 设置缓存大小,提高画质、减少卡顿花屏grabber.setOption("buffer_size", "1024000");// 读写超时,适用于所有协议的通用读写超时grabber.setOption("rw_timeout", "15000000");// 探测视频流信息,为空默认5000000微秒// grabber.setOption("probesize", "5000000");// 解析视频流信息,为空默认5000000微秒//grabber.setOption("analyzeduration", "5000000");grabber.start();}/*** 创建录制器** @return*/protected void createTransterOrRecodeRecorder() throws FFmpegFrameRecorder.Exception {recorder = new FFmpegFrameRecorder(bos, grabber.getImageWidth(), grabber.getImageHeight(),grabber.getAudioChannels());setRecorderParams(recorder);recorder.start();}/*** 设置录制器参数** @param fFmpegFrameRecorder*/private void setRecorderParams(FFmpegFrameRecorder fFmpegFrameRecorder) {fFmpegFrameRecorder.setFormat("flv");// 转码fFmpegFrameRecorder.setInterleaved(false);fFmpegFrameRecorder.setVideoOption("tune", "zerolatency");fFmpegFrameRecorder.setVideoOption("preset", "ultrafast");fFmpegFrameRecorder.setVideoOption("crf", "23");fFmpegFrameRecorder.setVideoOption("threads", "1");fFmpegFrameRecorder.setFrameRate(25);// 设置帧率fFmpegFrameRecorder.setGopSize(25);// 设置gop,与帧率相同//recorder.setVideoBitrate(500 * 1000);// 码率500kb/sfFmpegFrameRecorder.setVideoCodec(avcodec.AV_CODEC_ID_H264);fFmpegFrameRecorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P);fFmpegFrameRecorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC);fFmpegFrameRecorder.setOption("keyint_min", "25");  //gop最小间隔fFmpegFrameRecorder.setTrellis(1);fFmpegFrameRecorder.setMaxDelay(0);// 设置延迟}/*** 获取flv格式header数据** @return* @throws FFmpegFrameRecorder.Exception*/public byte[] getFlvHeader() throws FFmpegFrameRecorder.Exception {ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();FFmpegFrameRecorder fFmpegFrameRecorder = new FFmpegFrameRecorder(byteArrayOutputStream, grabber.getImageWidth(), grabber.getImageHeight(),grabber.getAudioChannels());setRecorderParams(fFmpegFrameRecorder);fFmpegFrameRecorder.start();return byteArrayOutputStream.toByteArray();}/*** 将视频源转换为flv*/protected void transferToFlv() {//创建拉流器try {createGrabber(currentDevice.getRtmpUrl());//创建录制器createTransterOrRecodeRecorder();grabber.flush();running = true;// 时间戳计算long startTime = 0;long lastTime = System.currentTimeMillis();while (running) {// 转码Frame frame = grabber.grab();if (frame != null && frame.image != null) {lastTime = System.currentTimeMillis();recorder.setTimestamp((1000 * (System.currentTimeMillis() - startTime)));recorder.record(frame);if (bos.size() > 0) {byte[] b = bos.toByteArray();bos.reset();sendFrameData(b);continue;}}//10秒内读不到视频帧,则关闭连接if ((System.currentTimeMillis() / 1000 - lastTime / 1000) > 10) {System.out.println(currentDevice.getDeviceId() + ":10秒内读不到视频帧");break;}}} catch (FFmpegFrameRecorder.Exception | FrameGrabber.Exception e) {throw new RuntimeException(e);} finally {try {recorder.close();grabber.close();bos.close();closeMedia();} catch (IOException e) {throw new RuntimeException(e);}}}/*** 发送帧数据** @param data*/private void sendFrameData(byte[] data) {mediaChannel.sendData(data);}/*** 关闭流媒体*/private void closeMedia() {running = false;MediaServer.deviceContext.remove(currentDevice.getDeviceId());mediaChannel.closeChannel();}/*** 通知关闭推流** @param device*/@Subscribepublic void checkChannel(Device device) {if (device.getDeviceId().equals(currentDevice.getDeviceId())) {closeMedia();System.out.println("关闭推流完成");}}@Overridepublic void run() {transferToFlv();}}

演示

前端就简单用flv.js进行演示,首次进行设备1和设备2播放,都需要进行解码推流,当设备1建立一个新channel(第三个视频画面)进行播放时,只需拿前面的第一个channel数据即可,无需进行再次进行解码。

image.png
可以看出,第三个视频播放的时候,跟第一个视频画面进度是同步的。

结束

附上代码地址: https://gitee.com/zhouxiaoben/keep-learning.git
这次分享就到这,大家有什么好的优化建议可以放在评论区。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/578654.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

随笔记录.1

1.find高级用法 find . -name *.te | xargs grep -i "zygote_tmpfs" find . -name *.te | xargs grep -i "_app_tmpfs" 2.audit2allow -p policy < avc_log.txt 3.fastboot oem set_selinux na 4.adb disable-verity 5.adb shell am broadcast -a …

初级游戏客户端社招面试问题总结

目录 c c# lua Unity NGUI AssetBundles资源管理 Unity性能优化 图形学 网络 场景题 计组&操作系统 其他知识 算法题 c 虚函数的原理 智能指针的原理 如何解决循环引用 智能指针的源码 c&#xff0c;使用char实现自定义的一个string 可以通过new去申请一…

如何实现准时的setTimeout

背景 setTimeout 是不准的。因为 setTimeout 是一个宏任务&#xff0c;它的指定时间指的是&#xff1a;进入主线程的时间。 setTimeout(callback, 进入主线程的时间)所以什么时候可以执行 callback&#xff0c;需要看 主线程前面还有多少任务待执行。 由此&#xff0c;才有了…

MySQL8安装教程

MySQL安装教程 安装环境 Windows 10 软件下载 1、官网下载 官网可以下载最新版本的MySQL 8.0 下载地址&#xff1a; https://dev.mysql.com/downloads/windows/installer/8.0.html 开始安装 1、下载完成后&#xff0c;我们就开始安装&#xff0c;双击安装包&#xff0c…

李宏毅机器学习2023|图像生成模型

文章目录 图像生成Autoregressive&#xff08;各个击破&#xff09;Non-Autoregressive&#xff08;一次到位&#xff09;一次到位法额外的输入——从一个高维的Normal Distribution作simple得到一个向量常用的图片生成模型VAEFlow-based Generative ModelDiffusion ModelGAN D…

计量校准方案分享No.11——定碳定硫分析仪校准方案

[测量单元:红外碳硫分析仪,是否使用220V交流电源:是,碳测量范围:0.005%-4.3%,硫测量范 围:0.0005%-0.33%] 一 依据文件 CNAS CL01-G002-2021 《测量结果的计量溯源性要求》现行有效 RB/T 034-2020 《测量设备校准周期的确定和调整方法指南》现行有效 CNAS TRL-004-2017 《 …

跨境电商独立站的6大模式,任你选择!

在几年前搭建跨境电商独立站和第三方平台基本上是同步发展起来的&#xff0c;但在后期的发展过程中&#xff0c;独立站经过不同时期的革新&#xff0c;形成了自己的模式。 当你准备好创建独立站的时候&#xff0c;首先你需要了解的就是独立站运营的模式类型&#xff0c;并找到最…

【Qt-Edit】

Qt编程指南 ■ QTextEdit■ QLineEdit■ QLineEdit 设置正则表达式 ■ QPlainTextEdit■ QKeySequenceEdit■ QList<QLineEdit *> edits■■ ■ QTextEdit /* 实例和对象&#xff0c;设置位置和显示大小 */ textEdit new QTextEdit(this); textEdit->setGeometry(0…

每日一题(LeetCode)----栈和队列-- 简化路径

每日一题(LeetCode)----栈和队列-- 简化路径 1.题目&#xff08;71. 简化路径&#xff09; 给你一个字符串 path &#xff0c;表示指向某一文件或目录的 Unix 风格 绝对路径 &#xff08;以 / 开头&#xff09;&#xff0c;请你将其转化为更加简洁的规范路径。 在 Unix 风格的…

ISO27001认证主要的审核方向

ISO27001审核主要针对组织的信息安全管理体系&#xff08;ISMS&#xff09;进行全面的审查&#xff0c;以确保其符合ISO/IEC 27001标准的要求。审核过程通常包括以下几个方面&#xff1a; 1. 组织环境&#xff1a;审核组织的信息安全管理体系是否能够在组织内部环境以及与外部供…

熟悉DHCP面临的安全威胁与防护机制

一个网络如果要正常地运行&#xff0c;则网络中的主机&#xff08;Host&#xff09;必需要知道某些重要的网络参数&#xff0c;如IP地址、网络掩码、网关地址、DNS服务器地址、网络打印机地址等等。显然&#xff0c;在每台主机上都采用手工方式来配置这些参数是非常困难的、或是…

Springboot学习

Springboot扩展点之InitializingBean-CSDN博客

C/S医院检验LIS系统源码

一、检验科LIS系统概述&#xff1a; LIS系统即实验室信息管理系统。LIS系统能实现临床检验信息化&#xff0c;检验科信息管理自动化。其主要功能是将检验科的实验仪器传出的检验数据经数据分析后&#xff0c;自动生成打印报告&#xff0c;通过网络存储在数据库中&#xff…

《微信小程序开发从入门到实战》学习六十三

6.4 交互API 使用交互API可以在小程序中显示各种弹窗或动画&#xff0c;达到交互反馈的目的。 6.4.1 提示框API 使用wx.showToast接口可显示消息提示框。接口接受Object参。属性如下&#xff1a; title&#xff08;必填&#xff09; 提示的内容 icon …

postgres数据库安装

选择所需数据库版本进行下载 下载地址&#xff1a;PostgreSQL: File Browser 我是以/data当作主目录&#xff0c;所以在/data下创建俩文件夹&#xff0c;默认目录应该是usr mkdir software mkdir module 进入目录&#xff0c;上传下载的gz安装包 cd software rz 解压缩,版本…

6-2 递归求阶乘和

本题要求实现一个计算非负整数阶乘的简单函数&#xff0c;并利用该函数求 1!2!3!...n! 的值。 函数接口定义&#xff1a; double fact( int n ); double factsum( int n ); 函数fact应返回n的阶乘&#xff0c;建议用递归实现。函数factsum应返回 1!2!...n! 的值。题目保证输…

什么是数据资产化?数据怎样成为资产?怎样进入资产负债表?

财政部发布的《企业数据资源相关会计处理暂行规定》将从2024年1月1日起开始实施&#xff0c;为企业数据资源入表提供了基本指引&#xff0c;数据资产化有望迎来爆发期。什么是数据资产化&#xff0c;怎样让数据成为资产&#xff0c;成为了众多国有企业、上市公司关心的问题。 —…

JavaScript 中的双等号(==)和三等号(===)有何不同?何时使用它们?

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;JavaScript篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来JavaScript篇专栏内容:JavaScript-等号区别 目录 和 区别&#xff0c;分别在什么情况使用 一、等于操作符…

交换机端口镜像技术原理与配置

在网络维护的过程中会遇到需要对报文进行获取和分析的情况&#xff0c;比如怀疑有攻击报文&#xff0c;此时需要在不影响报文转发的情况下&#xff0c;对报文进行获取和分析。镜像技术可以在不影响报文正常处理流程的情况下&#xff0c;将镜像端口的报文复制一份到观察端口&…