算法项目实时推流

1、搭建流媒体服务器

下载mediamtx

 2、视频流直推

ffmpeg -stream_loop -1 -i DJI_20250109112715_0002_W.MP4 -r 30 -c:v libx264 -preset ultrafast -f flv rtmp://192.168.100.20:1935/live/test_chengdu1

3、硬件加速

如果硬件支持,可以使用硬件加速编码器(如 h264_nvench264_vaapi 或 h264_qsv),以减少 CPU 负载。

NVIDIA GPU:-c:v h264_nvenc

Intel GPU:-c:v h264_qsv

AMD GPU:-c:v h264_amf

 4、比特率控制

-b:v:设置视频比特率。过高的比特率可能导致网络拥塞,过低的比特率可能导致画质下降。根据网络带宽合理设置。

-maxrate 和 -bufsize:限制最大比特率和缓冲区大小,避免网络波动导致卡顿。

5、 网络缓冲

在FFmpeg命令中增加网络缓冲参数,以应对网络不稳定。

-re

这个参数会让FFmpeg以原始速率读取输入,而不是实时编码,从而给网络传输留下更多的缓冲时间。

比如:ffmpeg -i http://192.168.1.100:8080/video -c:v libx264 -s 1280x720 -b:v 2000k -r 15 -bufsize 4000k -crf 18 -hwaccel nvenc -re output.ffm

6、ffmpeg直推视频

ffmpeg -re -stream_loop -1 -i DJI_0180.MP4 -r 30 -c:v libx264 -f flv rtmp://192.168.100.20:1935/live/test_chengdu1

 6、使用ffmpeg直接读取视频

frame_queue = queue.Queue(maxsize=1)
def read_video2(self, video_path, frame_queue):ffmpegReadCommand = ['ffmpeg','-i', video_path,  # 输入流'-f', 'image2pipe',  # 输出格式为图像管道'-pix_fmt', 'bgr24',  # 像素格式为 BGR24'-vcodec', 'rawvideo',  # 视频编码为原始视频'-']ffmpegReadProcess = subprocess.Popen(ffmpegReadCommand, stdout=subprocess.PIPE)       width = 1920  # 根据实际分辨率调整height = 1080  # 根据实际分辨率调整try:while self.is_switchOn:t5 = time.time()raw_frame = ffmpegReadProcess.stdout.read(width * height * 3)if not raw_frame:print("无法读取帧")breakframe = np.frombuffer(raw_frame, dtype='uint8').reshape((height, width, 3))t6 = time.time()print("t6 - t5 = ", t6 - t5)try:frame_queue.put_nowait(frame)except queue.Full:# 如果队列满了,丢弃旧帧以防止卡顿frame_queue.get_nowait()frame_queue.put_nowait(frame)except KeyboardInterrupt:print("视频流中断")"""# 清空队列(可选,如果只想处理最新帧)if not frame_queue.empty():frame_queue.get(block=True, timeout=50000)# 将最新帧放入队列frame_queue.put(frame)"""finally:# 释放资源ffmpegReadProcess.terminate()print("视频流结束")

7、最终代码

import cv2
import time
# import os
import numpy as np
import copy
import queue
import threading
from ultralytics import YOLO
import subprocess# 创建一个队列,用于存储视频帧# os.environ['OPENCV_FFMPEG_READ_ATTEMPTS'] = '100000000000'class multiDealImg(object):def __init__(self, model_path, rtmp):self.model = YOLO(model_path)#self.frame_queue = queue.Queue(maxsize=1)  # 设置队列的最大大小,这里设置为1,确保只处理最新的帧# self.show_queue = queue.Queue(maxsize=1)self.command = ['ffmpeg','-y',# "-map", "0:v"'-f', 'rawvideo','-vcodec', 'rawvideo','-pix_fmt', 'bgr24','-s', '{}x{}'.format(1280,720),  # 根据输入视频尺寸填写'-r', '27','-i', '-',# '-c:v', 'libx264','-c:v', 'h264_nvenc','-b:v', '2M','-maxrate', '2M','-bufsize', '4M','-pix_fmt', 'yuv420p',# '-preset', 'ultrafast','-f', 'flv',# "-hwaccel", "nvenc",    "-crf", "23",rtmp]self.pipe = subprocess.Popen(self.command, stdin=subprocess.PIPE)self.is_switchOn = Trueself.ffmpeg_command = ["ffmpeg","-y",  # 覆盖输出文件"-f","rawvideo","-vcodec","rawvideo","-pix_fmt","bgr24",  # OpenCV读取的帧格式是BGR"-s","{}x{}".format(1280, 720),  # 根据输入视频尺寸填写"-r","30","-i","-",  # 输入来自管道(stdin)"-c:v","libx264",  # 使用H.264编码"-pix_fmt","yuv420p",  # 确保兼容性"-crf","23",  # 设置CRF值(质量)"./out.mp4",]self.ffmpegSaveVideo = subprocess.Popen(self.ffmpeg_command, stdin=subprocess.PIPE)self.save_queue = queue.Queue(maxsize=3)self.push_queue = queue.Queue(maxsize=3)# 视频读取函数def read_video2(self, video_path, frame_queue):ffmpegReadCommand = ['ffmpeg','-i', video_path,  # 输入流'-f', 'image2pipe',  # 输出格式为图像管道'-pix_fmt', 'bgr24',  # 像素格式为 BGR24'-vcodec', 'rawvideo',  # 视频编码为原始视频'-']ffmpegReadProcess = subprocess.Popen(ffmpegReadCommand, stdout=subprocess.PIPE)       width = 1920  # 根据实际分辨率调整height = 1080  # 根据实际分辨率调整try:while self.is_switchOn:t5 = time.time()raw_frame = ffmpegReadProcess.stdout.read(width * height * 3)if not raw_frame:print("无法读取帧")breakframe = np.frombuffer(raw_frame, dtype='uint8').reshape((height, width, 3))t6 = time.time()print("t6 - t5 = ", t6 - t5)try:frame_queue.put_nowait(frame)except queue.Full:# 如果队列满了,丢弃旧帧以防止卡顿frame_queue.get_nowait()frame_queue.put_nowait(frame)except KeyboardInterrupt:print("视频流中断")"""# 清空队列(可选,如果只想处理最新帧)if not frame_queue.empty():frame_queue.get(block=True, timeout=50000)# 将最新帧放入队列frame_queue.put(frame)"""finally:# 释放资源ffmpegReadProcess.terminate()print("视频流结束")def read_video(self, video_path, frame_queue):cap = cv2.VideoCapture(video_path)total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))print(f"Total frames in the video: {total_frames}")frame_counter = 0fps = cap.get(cv2.CAP_PROP_FPS)while cap.isOpened() and self.is_switchOn:#time.sleep(0.05)# print("111")t5 = time.time()ret, frame = cap.read()frame_counter += 1#循环读取视频条件判断if frame_counter == int(cap.get(cv2.CAP_PROP_FRAME_COUNT)):frame_counter = 0cap.set(cv2.CAP_PROP_POS_FRAMES, 0)if not ret:breakt6 = time.time()print("t6 - t5 = ", t6 - t5)try:frame_queue.put_nowait(frame)except queue.Full:# 如果队列满了,丢弃旧帧以防止卡顿frame_queue.get_nowait()frame_queue.put_nowait(frame)time.sleep(1.0 / fps)"""# 清空队列(可选,如果只想处理最新帧)if not frame_queue.empty():frame_queue.get(block=True, timeout=50000)# 将最新帧放入队列frame_queue.put(frame)"""cap.release()def read_video1(self, video_path, frame_queue):cap = cv2.VideoCapture(video_path)total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))print(f"Total frames in the video: {total_frames}")while cap.isOpened() and self.is_switchOn:#time.sleep(0.05)# print("111")t5 = time.time()ret, frame = cap.read()if not ret:breakt6 = time.time()print("t6 - t5 = ", t6 - t5)try:frame_queue.put_nowait(frame)except queue.Full:# 如果队列满了,丢弃旧帧以防止卡顿frame_queue.get_nowait()frame_queue.put_nowait(frame)"""# 清空队列(可选,如果只想处理最新帧)if not frame_queue.empty():frame_queue.get(block=True, timeout=50000)# 将最新帧放入队列frame_queue.put(frame)"""cap.release()def saveImg(self, save_queue):while self.is_switchOn:try:frame1 = save_queue.get(block=True, timeout=500)  # 设置超时以避免无限等待self.ffmpegSaveVideo.stdin.write(frame1.tobytes())except queue.Empty:print("No show frame available")self.ffmpegSaveVideo.stdin.close()self.ffmpegSaveVideo.wait()except Exception as e:print(f"An error occurred: {e}")breakself.ffmpegSaveVideo.stdin.close()self.ffmpegSaveVideo.wait()def pushImg(self, push_queue):print("start push images")while self.is_switchOn:try:frame1 = push_queue.get(block=True, timeout=500)  # 设置超时以避免无限等待self.pipe.stdin.write(frame1.tobytes())except queue.Empty:print("No show frame available")self.pipe.stdin.close()self.pipe.wait()except Exception as e:print(f"An error occurred: {e}")breakself.pipe.stdin.close()self.pipe.wait()# 图像处理函数def process_frame(self, frame_queue):while self.is_switchOn:try:# 阻塞直到队列中有帧可用t1 = time.time()frame = frame_queue.get(block=True, timeout=600)  # 设置超时以避免无限等待t2 = time.time()# print("t2 - t1 = ", t2 - t1)results = self.model(frame, show_conf = False, verbose=False)t3 = time.time()# print("t3 - t2 = ", t3 - t2)for result in results:boxes = result.boxes  # Boxes 对象,用于边界框输出# masks = result.masks  # Masks 对象,用于分割掩码输出# keypoints = result.keypoints  # Keypoints 对象,用于姿态输出# probs = result.probs  # Probs 对象,用于分类输出# cv2.imshow("result",masks)frame = result.plot(conf = False, line_width = 1, font_size = 0.2) t4 = time.time()print("t4 - t1 = ", t4 - t1)frame = cv2.resize(frame, (1280,720))self.save_queue.put(frame)self.push_queue.put(frame) print("Processing frame")except queue.Empty:print("No frame available")# out.release()# self.ffmpegSaveVideo.stdin.close()# self.ffmpegSaveVideo.wait()# self.pipe.stdin.close()# self.pipe.wait()# breakexcept Exception as e:print(f"An error occurred: {e}")breakdef listen_for_stop(self, listenVule):self.is_switchOn = listenVuledef _listen_for_stop(self):# 监听外部状态(例如键盘输入)while self.is_switchOn:user_input = input("输入 'stop' 关闭视频流: ")if user_input.strip().lower() == 'stop':self.is_switchOn = Falseprint("收到关闭信号,正在关闭视频流...")breakdef startThread(self, vide_path):frame_queue = queue.Queue(maxsize=20)# 创建并启动线程video_thread = threading.Thread(target=self.read_video, args=(vide_path, frame_queue))process_thread = threading.Thread(target=self.process_frame, args=(frame_queue,))push_thread = threading.Thread(target=self.pushImg, args=(self.push_queue,))save_thread = threading.Thread(target=self.saveImg, args=(self.save_queue,))listen_thread = threading.Thread(target=self._listen_for_stop, args=())#show_thread = threading.Thread(target=self.dealImg, args=(self.show_queue,))video_thread.start()process_thread.start()push_thread.start()save_thread.start()listen_thread.start()#show_thread.start()# 等待线程完成video_thread.join()process_thread.join()push_thread.join()save_thread.join()listen_thread.join()#show_thread.join()if __name__ == "__main__":model_path = "./model/best640v8nLast.pt"vide_path = "D:/datasets/chengdou/DJI_0180.MP4"rtmp = "rtmp://192.168.100.20:1935/stream/example"getDetect = multiDealImg(model_path, rtmp)getDetect.startThread(vide_path)

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/893373.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一文学会YOLO系列算法(从V3到11)实现遥感图像目标检测

目录 前言 数据集介绍 数据集转换 YOLO代码的下载 YOLO的配置 1.数据集的配置 2.模型的配置 YOLO11模型的训练 其它版本YOLO的训练 前言 遥感技术的快速发展,特别是在高分辨率遥感图像的获取能力上的显著提升,已经大大拓宽了遥感数据在环境监测…

图解Git——分布式Git《Pro Git》

分布式工作流程 Centralized Workflow(集中式工作流) 所有开发者都与同一个中央仓库同步代码,每个人通过拉取、提交来合作。如果两个开发者同时修改了相同的文件,后一个开发者必须在推送之前合并其他人的更改。 Integration-Mana…

【高阶数据结构】布隆过滤器(BloomFilter)

1. 概念 1.1 背景引入 背景:在计算机软件中,一个常见的需求就是 在一个集合中查找一个元素是否存在 ,比如:1. Word 等打字软件需要判断用户键入的单词是否在字典中存在 2. 浏览器等网络爬虫程序需要保存一个列表来记录已经遍历过…

【json_object】mysql中json_object函数过长,显示不全

问题:json只显示部分 解决: SET GLOBAL group_concat_max_len 1000000; -- 设置为1MB,根据需要调整如果当前在navicat上修改,只有效本次连接和后续会话,重新连接还是会恢复默认值1024 在my.ini配置文件中新增或者修…

计算机网络 (52)秘钥分配

一、重要性 在计算机网络中,密钥分配是密钥管理中的一个核心问题。由于密码算法通常是公开的,因此网络的安全性主要依赖于密钥的安全保护。密钥分配的目的是确保密钥在传输过程中不被窃取或篡改,同时确保只有合法的用户才能获得密钥。 二、方…

第35天:安全开发-JavaEE应用原生反序列化重写方法链条分析触发类类加载

时间轴: 序列化与反序列化图解: 演示案例: Java-原生使用-序列化&反序列化 Java-安全问题-重写方法&触发方法 Java-安全问题-可控其他类重写方法 Java-原生使用-序列化&反序列化 1.为什么进行序列化和反序列化&#xff1…

MindAgent:基于大型语言模型的多智能体协作基础设施

2023-09-18 ,加州大学洛杉矶分校(UCLA)、微软研究院、斯坦福大学等机构共同创建的新型基础设施,目的在评估大型语言模型在游戏互动中的规划和协调能力。MindAgent通过CuisineWorld这一新的游戏场景和相关基准,调度多智…

Excel 技巧17 - 如何计算倒计时,并添加该倒计时的数据条(★)

本文讲如何计算倒计时,并添加该倒计时的数据条。 1,如何计算倒计时 这里也要用公式 D3 - TODAY() 显示为下面这个样子的 然后右键该单元格,选 设置单元格格式 然后点 常规 这样就能显示出还书倒计时的日数了。 下拉适用到其他单元格。 2&a…

rocketmq基本架构

简介 Name server 负责broker注册、心跳,路由等功能,类似Kafka的ZKname server节点之间不互相通信,broker需要和所有name server进行通信。扩容name server需要重启broker,不然broker不会和name server建立连接producer和consum…

国产编辑器EverEdit - 大纲视图

1 大纲视图 1.1 应用场景 在编辑较长代码文件时,使用大纲视图可以方便的检视当前文件的变量、函数等信息,方便在不同函数间跳转,对整个文档的全貌了然于胸。   在编辑XML文档时,通过展示XML文件的层次结构、节点布局&#xff0…

Linux中的基本指令(一)

一、Linux中指令的存在意义 Linux中,通过输入指令来让操作系统执行,以此达到控制操作系统的目的,类似于Windows中的双击,右键新建文件,新建文件夹等 1.补:关于屏幕的几个操作指令 ①清屏指令 clear 回…

2025/1/21 学习Vue的第四天

睡觉。 --------------------------------------------------------------------------------------------------------------------------------- 11.Object.defineProperty 1.在我们之前学习JS的时候&#xff0c;普通得定义一个对象与属性。 <!DOCTYPE html> <h…

Go Map 源码分析(一)

Go语言中的map是通过哈希表实现的&#xff0c;其底层结构和实现机制如下&#xff1a; 一、hash 结构 hmap结构体&#xff1a;是map的头部结构&#xff0c;主要字段及含义如下&#xff1a; count&#xff1a;表示当前哈希表中的元素数量&#xff0c;与len()函数相对应。flags…

Linux-C/C++--深入探究文件 I/O (上)(文件的管理、函数返回错误、exit()、_Exit()、_exit())

经过上一章内容的学习&#xff0c;相信各位读者对 Linux 系统应用编程中的基础文件 I/O 操作有了一定的认识和理解了&#xff0c;能够独立完成一些简单地文件 I/O 编程问题&#xff0c;如果你的工作中仅仅只是涉及到一些简单文件读写操作相关的问题&#xff0c;其实上一章的知识…

【机器学习实战中阶】音乐流派分类-自动化分类不同音乐风格

音乐流派分类 – 自动化分类不同音乐风格 在本教程中,我们将开发一个深度学习项目,用于自动化地从音频文件中分类不同的音乐流派。我们将使用音频文件的频率域和时间域低级特征来分类这些音频文件。 对于这个项目,我们需要一个具有相似大小和相似频率范围的音频曲目数据集…

Walrus Learn to Earn计划正式启动!探索去中心化存储的无限可能

本期 Learn to Earn 活动将带领开发者和区块链爱好者深入探索 Walrus 的技术核心与实际应用&#xff0c;解锁分布式存储的无限可能。参与者不仅能提升技能&#xff0c;还能通过完成任务赢取丰厚奖励&#xff01;&#x1f30a; 什么是 Walrus&#xff1f; 数据主权如今正成为越…

git 常用命令 git archive

git archive 是 Git 中用于创建一个包含指定提交或分支中所有文件的归档文件&#xff08;如 .tar 或 .zip&#xff09;的命令。这个命令非常适合用于分发项目快照、备份代码库或导出特定版本的文件。 git archive --formatzip --outputproject.zip HEAD …

Excel 技巧15 - 在Excel中抠图头像,换背景色(★★)

本文讲了如何在Excel中抠图头像&#xff0c;换背景色。 1&#xff0c;如何在Excel中抠图头像&#xff0c;换背景色 大家都知道在PS中可以很容易抠图头像&#xff0c;换背景色&#xff0c;其实Excel中也可以抠简单的图&#xff0c;换背景色。 ※所用头像图片为百度搜索&#x…

持续升级《在线写python》小程序的功能,文章页增加一键复制功能,并自动去掉html标签

增加复制按钮后的界面是这样的 代码如下&#xff1a; <template><view><x-header></x-header><view class"" v-if"article_info"><view class"kuai bgf"><view class"ac fs26"><img sr…

FPGA与ASIC:深度解析与职业选择

IC&#xff08;集成电路&#xff09;行业涵盖广泛&#xff0c;涉及数字、模拟等不同研究方向&#xff0c;以及设计、制造、封测等不同产业环节。其中&#xff0c;FPGA&#xff08;现场可编程门阵列&#xff09;和ASIC&#xff08;专用集成电路&#xff09;是两种重要的芯片类型…