1、搭建流媒体服务器
下载mediamtx
2、视频流直推
ffmpeg -stream_loop -1 -i DJI_20250109112715_0002_W.MP4 -r 30 -c:v libx264 -preset ultrafast -f flv rtmp://192.168.100.20:1935/live/test_chengdu1
3、硬件加速
如果硬件支持,可以使用硬件加速编码器(如 h264_nvenc
、h264_vaapi
或 h264_qsv
),以减少 CPU 负载。
NVIDIA GPU:
-c:v h264_nvenc
Intel GPU:
-c:v h264_qsv
AMD GPU:
-c:v h264_amf
4、比特率控制
-b:v
:设置视频比特率。过高的比特率可能导致网络拥塞,过低的比特率可能导致画质下降。根据网络带宽合理设置。
-maxrate
和-bufsize
:限制最大比特率和缓冲区大小,避免网络波动导致卡顿。
5、 网络缓冲
在FFmpeg命令中增加网络缓冲参数,以应对网络不稳定。
-re
这个参数会让FFmpeg以原始速率读取输入,而不是实时编码,从而给网络传输留下更多的缓冲时间。
比如:ffmpeg -i http://192.168.1.100:8080/video -c:v libx264 -s 1280x720 -b:v 2000k -r 15 -bufsize 4000k -crf 18 -hwaccel nvenc -re output.ffm
6、ffmpeg直推视频
ffmpeg -re -stream_loop -1 -i DJI_0180.MP4 -r 30 -c:v libx264 -f flv rtmp://192.168.100.20:1935/live/test_chengdu1
6、使用ffmpeg直接读取视频
frame_queue = queue.Queue(maxsize=1)
def read_video2(self, video_path, frame_queue):ffmpegReadCommand = ['ffmpeg','-i', video_path, # 输入流'-f', 'image2pipe', # 输出格式为图像管道'-pix_fmt', 'bgr24', # 像素格式为 BGR24'-vcodec', 'rawvideo', # 视频编码为原始视频'-']ffmpegReadProcess = subprocess.Popen(ffmpegReadCommand, stdout=subprocess.PIPE) width = 1920 # 根据实际分辨率调整height = 1080 # 根据实际分辨率调整try:while self.is_switchOn:t5 = time.time()raw_frame = ffmpegReadProcess.stdout.read(width * height * 3)if not raw_frame:print("无法读取帧")breakframe = np.frombuffer(raw_frame, dtype='uint8').reshape((height, width, 3))t6 = time.time()print("t6 - t5 = ", t6 - t5)try:frame_queue.put_nowait(frame)except queue.Full:# 如果队列满了,丢弃旧帧以防止卡顿frame_queue.get_nowait()frame_queue.put_nowait(frame)except KeyboardInterrupt:print("视频流中断")"""# 清空队列(可选,如果只想处理最新帧)if not frame_queue.empty():frame_queue.get(block=True, timeout=50000)# 将最新帧放入队列frame_queue.put(frame)"""finally:# 释放资源ffmpegReadProcess.terminate()print("视频流结束")
7、最终代码
import cv2
import time
# import os
import numpy as np
import copy
import queue
import threading
from ultralytics import YOLO
import subprocess# 创建一个队列,用于存储视频帧# os.environ['OPENCV_FFMPEG_READ_ATTEMPTS'] = '100000000000'class multiDealImg(object):def __init__(self, model_path, rtmp):self.model = YOLO(model_path)#self.frame_queue = queue.Queue(maxsize=1) # 设置队列的最大大小,这里设置为1,确保只处理最新的帧# self.show_queue = queue.Queue(maxsize=1)self.command = ['ffmpeg','-y',# "-map", "0:v"'-f', 'rawvideo','-vcodec', 'rawvideo','-pix_fmt', 'bgr24','-s', '{}x{}'.format(1280,720), # 根据输入视频尺寸填写'-r', '27','-i', '-',# '-c:v', 'libx264','-c:v', 'h264_nvenc','-b:v', '2M','-maxrate', '2M','-bufsize', '4M','-pix_fmt', 'yuv420p',# '-preset', 'ultrafast','-f', 'flv',# "-hwaccel", "nvenc", "-crf", "23",rtmp]self.pipe = subprocess.Popen(self.command, stdin=subprocess.PIPE)self.is_switchOn = Trueself.ffmpeg_command = ["ffmpeg","-y", # 覆盖输出文件"-f","rawvideo","-vcodec","rawvideo","-pix_fmt","bgr24", # OpenCV读取的帧格式是BGR"-s","{}x{}".format(1280, 720), # 根据输入视频尺寸填写"-r","30","-i","-", # 输入来自管道(stdin)"-c:v","libx264", # 使用H.264编码"-pix_fmt","yuv420p", # 确保兼容性"-crf","23", # 设置CRF值(质量)"./out.mp4",]self.ffmpegSaveVideo = subprocess.Popen(self.ffmpeg_command, stdin=subprocess.PIPE)self.save_queue = queue.Queue(maxsize=3)self.push_queue = queue.Queue(maxsize=3)# 视频读取函数def read_video2(self, video_path, frame_queue):ffmpegReadCommand = ['ffmpeg','-i', video_path, # 输入流'-f', 'image2pipe', # 输出格式为图像管道'-pix_fmt', 'bgr24', # 像素格式为 BGR24'-vcodec', 'rawvideo', # 视频编码为原始视频'-']ffmpegReadProcess = subprocess.Popen(ffmpegReadCommand, stdout=subprocess.PIPE) width = 1920 # 根据实际分辨率调整height = 1080 # 根据实际分辨率调整try:while self.is_switchOn:t5 = time.time()raw_frame = ffmpegReadProcess.stdout.read(width * height * 3)if not raw_frame:print("无法读取帧")breakframe = np.frombuffer(raw_frame, dtype='uint8').reshape((height, width, 3))t6 = time.time()print("t6 - t5 = ", t6 - t5)try:frame_queue.put_nowait(frame)except queue.Full:# 如果队列满了,丢弃旧帧以防止卡顿frame_queue.get_nowait()frame_queue.put_nowait(frame)except KeyboardInterrupt:print("视频流中断")"""# 清空队列(可选,如果只想处理最新帧)if not frame_queue.empty():frame_queue.get(block=True, timeout=50000)# 将最新帧放入队列frame_queue.put(frame)"""finally:# 释放资源ffmpegReadProcess.terminate()print("视频流结束")def read_video(self, video_path, frame_queue):cap = cv2.VideoCapture(video_path)total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))print(f"Total frames in the video: {total_frames}")frame_counter = 0fps = cap.get(cv2.CAP_PROP_FPS)while cap.isOpened() and self.is_switchOn:#time.sleep(0.05)# print("111")t5 = time.time()ret, frame = cap.read()frame_counter += 1#循环读取视频条件判断if frame_counter == int(cap.get(cv2.CAP_PROP_FRAME_COUNT)):frame_counter = 0cap.set(cv2.CAP_PROP_POS_FRAMES, 0)if not ret:breakt6 = time.time()print("t6 - t5 = ", t6 - t5)try:frame_queue.put_nowait(frame)except queue.Full:# 如果队列满了,丢弃旧帧以防止卡顿frame_queue.get_nowait()frame_queue.put_nowait(frame)time.sleep(1.0 / fps)"""# 清空队列(可选,如果只想处理最新帧)if not frame_queue.empty():frame_queue.get(block=True, timeout=50000)# 将最新帧放入队列frame_queue.put(frame)"""cap.release()def read_video1(self, video_path, frame_queue):cap = cv2.VideoCapture(video_path)total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))print(f"Total frames in the video: {total_frames}")while cap.isOpened() and self.is_switchOn:#time.sleep(0.05)# print("111")t5 = time.time()ret, frame = cap.read()if not ret:breakt6 = time.time()print("t6 - t5 = ", t6 - t5)try:frame_queue.put_nowait(frame)except queue.Full:# 如果队列满了,丢弃旧帧以防止卡顿frame_queue.get_nowait()frame_queue.put_nowait(frame)"""# 清空队列(可选,如果只想处理最新帧)if not frame_queue.empty():frame_queue.get(block=True, timeout=50000)# 将最新帧放入队列frame_queue.put(frame)"""cap.release()def saveImg(self, save_queue):while self.is_switchOn:try:frame1 = save_queue.get(block=True, timeout=500) # 设置超时以避免无限等待self.ffmpegSaveVideo.stdin.write(frame1.tobytes())except queue.Empty:print("No show frame available")self.ffmpegSaveVideo.stdin.close()self.ffmpegSaveVideo.wait()except Exception as e:print(f"An error occurred: {e}")breakself.ffmpegSaveVideo.stdin.close()self.ffmpegSaveVideo.wait()def pushImg(self, push_queue):print("start push images")while self.is_switchOn:try:frame1 = push_queue.get(block=True, timeout=500) # 设置超时以避免无限等待self.pipe.stdin.write(frame1.tobytes())except queue.Empty:print("No show frame available")self.pipe.stdin.close()self.pipe.wait()except Exception as e:print(f"An error occurred: {e}")breakself.pipe.stdin.close()self.pipe.wait()# 图像处理函数def process_frame(self, frame_queue):while self.is_switchOn:try:# 阻塞直到队列中有帧可用t1 = time.time()frame = frame_queue.get(block=True, timeout=600) # 设置超时以避免无限等待t2 = time.time()# print("t2 - t1 = ", t2 - t1)results = self.model(frame, show_conf = False, verbose=False)t3 = time.time()# print("t3 - t2 = ", t3 - t2)for result in results:boxes = result.boxes # Boxes 对象,用于边界框输出# masks = result.masks # Masks 对象,用于分割掩码输出# keypoints = result.keypoints # Keypoints 对象,用于姿态输出# probs = result.probs # Probs 对象,用于分类输出# cv2.imshow("result",masks)frame = result.plot(conf = False, line_width = 1, font_size = 0.2) t4 = time.time()print("t4 - t1 = ", t4 - t1)frame = cv2.resize(frame, (1280,720))self.save_queue.put(frame)self.push_queue.put(frame) print("Processing frame")except queue.Empty:print("No frame available")# out.release()# self.ffmpegSaveVideo.stdin.close()# self.ffmpegSaveVideo.wait()# self.pipe.stdin.close()# self.pipe.wait()# breakexcept Exception as e:print(f"An error occurred: {e}")breakdef listen_for_stop(self, listenVule):self.is_switchOn = listenVuledef _listen_for_stop(self):# 监听外部状态(例如键盘输入)while self.is_switchOn:user_input = input("输入 'stop' 关闭视频流: ")if user_input.strip().lower() == 'stop':self.is_switchOn = Falseprint("收到关闭信号,正在关闭视频流...")breakdef startThread(self, vide_path):frame_queue = queue.Queue(maxsize=20)# 创建并启动线程video_thread = threading.Thread(target=self.read_video, args=(vide_path, frame_queue))process_thread = threading.Thread(target=self.process_frame, args=(frame_queue,))push_thread = threading.Thread(target=self.pushImg, args=(self.push_queue,))save_thread = threading.Thread(target=self.saveImg, args=(self.save_queue,))listen_thread = threading.Thread(target=self._listen_for_stop, args=())#show_thread = threading.Thread(target=self.dealImg, args=(self.show_queue,))video_thread.start()process_thread.start()push_thread.start()save_thread.start()listen_thread.start()#show_thread.start()# 等待线程完成video_thread.join()process_thread.join()push_thread.join()save_thread.join()listen_thread.join()#show_thread.join()if __name__ == "__main__":model_path = "./model/best640v8nLast.pt"vide_path = "D:/datasets/chengdou/DJI_0180.MP4"rtmp = "rtmp://192.168.100.20:1935/stream/example"getDetect = multiDealImg(model_path, rtmp)getDetect.startThread(vide_path)