解码并处理视频流的多线程应用
随着视频处理技术的不断发展,越来越多的应用需要对视频流进行解码和处理。在本文中,我们将介绍一个基于Python的多线程应用程序,该应用程序可以解码并处理多个RTSP视频流,同时利用GPU加速,以提高处理速度。
这个应用程序使用了一些关键的Python库和工具,包括PyNvCodec、OpenCV、和PyCUDA等。它充分利用了现代GPU的计算能力,实现了高效的视频解码和处理。
多线程解码
在这个应用程序中,我们使用了Python的concurrent.futures
库来实现多线程解码。每个视频流都在独立的线程中解码,这样可以同时处理多个视频流,充分利用了多核CPU的性能。
from concurrent.futures import ThreadPoolExecutor# ...# 创建线程池
pool = ThreadPoolExecutor(max_workers=len(urls))
futures = []# 遍历每个视频流并提交解码任务
for url in urls:future = pool.submit(decode_rtsp_stream, index, url, gpuID)futures.append(future)index += 1# 等待所有任务完成
pool.shutdown()# 获取每个任务的结果
for future in futures:future.result()
视频解码和处理
视频解码是这个应用程序的核心功能。我们使用PyNvCodec库来进行视频解码,同时利用了GPU来加速处理。
def decode_rtsp_stream(thread_index: int, url: str, gpu_id: int):# 获取视频流参数params = get_stream_params(url)# ...# 创建NvDecoder实例nvdec = nvc.PyNvDecoder(w, h, f, c, g)# ...while True:# 读取视频流数据bits = proc.stdout.read(read_size)# ...# 解码视频帧surf = nvdec.DecodeSurfaceFromPacket(enc_packet, pkt_data)# ...# 执行颜色空间转换和表面下载cvtSurface = nv_cvt.Execute(surf, cc_ctx)success = nv_down.DownloadSingleSurface(cvtSurface, data)# ...# 显示解码后的帧cv2.imshow(str(thread_index), new_data)cv2.waitKey(1)# ...
完整代码
这个应用程序可以广泛用于视频监控、实时视频分析、视频编码和解码等领域。通过多线程解码和GPU加速,它可以处理多个高分辨率视频流,并在实时性要求较高的情况下提供流畅的显示和处理效果。
import os
import sys
import subprocess
import json
import PyNvCodec as nvc
import numpy as np
from io import BytesIO
from multiprocessing import Process
import uuid
import time
from concurrent.futures import ThreadPoolExecutor
import cv2
import pycuda.gpuarray as gpuarray
# import PytorchNvCodec as pnvc
import torch
import torchvision.transforms as Tdef add_cuda_dll_directories():if os.name == "nt":cuda_path = os.environ.get("CUDA_PATH")if cuda_path:os.add_dll_directory(cuda_path)else:print("CUDA_PATH environment variable is not set.", file=sys.stderr)exit(1)sys_path = os.environ.get("PATH")if sys_path:paths = sys_path.split(";")for path in paths:if os.path.isdir(path) and path != '.':os.add_dll_directory(path)else:print("PATH environment variable is not set.", file=sys.stderr)exit(1)def surface_to_tensor(surface: nvc.Surface) -> torch.Tensor:"""Converts planar rgb surface to cuda float tensor."""if surface.Format() != nvc.PixelFormat.RGB_PLANAR:raise RuntimeError("Surface shall be of RGB_PLANAR pixel format")surf_plane = surface.PlanePtr()img_tensor = pnvc.DptrToTensor(surf_plane.GpuMem(),surf_plane.Width(),surf_plane.Height(),surf_plane.Pitch(),surf_plane.ElemSize(),)if img_tensor is None:raise RuntimeError("Can not export to tensor.")img_tensor.resize_(3, int(surf_plane.Height() / 3), surf_plane.Width())img_tensor = img_tensor.type(dtype=torch.cuda.FloatTensor)img_tensor = torch.divide(img_tensor, 255.0)img_tensor = torch.clamp(img_tensor, 0.0, 1.0)return img_tensordef get_stream_params(url: str):cmd = ["ffprobe","-v","quiet","-print_format","json","-show_format","-show_streams",url,]proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)stdout = proc.communicate()[0]bio = BytesIO(stdout)json_out = json.load(bio)params = {}if not "streams" in json_out:return {}for stream in json_out["streams"]:if stream["codec_type"] == "video":params["width"] = stream["width"]params["height"] = stream["height"]params["framerate"] = float(eval(stream["avg_frame_rate"]))codec_name = stream["codec_name"]is_h264 = True if codec_name == "h264" else Falseis_hevc = True if codec_name == "hevc" else Falseif not is_h264 and not is_hevc:raise ValueError("Unsupported codec: "+ codec_name+ ". Only H.264 and HEVC are supported in this sample.")else:params["codec"] = (nvc.CudaVideoCodec.H264 if is_h264 else nvc.CudaVideoCodec.HEVC)pix_fmt = stream["pix_fmt"]is_yuv420 = pix_fmt == "yuv420p"is_yuv444 = pix_fmt == "yuv444p"# YUVJ420P and YUVJ444P are deprecated but still wide spread, so handle# them as well. They also indicate JPEG color range.is_yuvj420 = pix_fmt == "yuvj420p"is_yuvj444 = pix_fmt == "yuvj444p"if is_yuvj420:is_yuv420 = Trueparams["color_range"] = nvc.ColorRange.JPEGif is_yuvj444:is_yuv444 = Trueparams["color_range"] = nvc.ColorRange.JPEGif not is_yuv420 and not is_yuv444:raise ValueError("Unsupported pixel format: "+ pix_fmt+ ". Only YUV420 and YUV444 are supported in this sample.")else:params["format"] = (nvc.PixelFormat.NV12 if is_yuv420 else nvc.PixelFormat.YUV444)# Color range default option. We may have set when parsing# pixel format, so check first.if "color_range" not in params:params["color_range"] = nvc.ColorRange.MPEG# Check actual value.if "color_range" in stream:color_range = stream["color_range"]if color_range == "pc" or color_range == "jpeg":params["color_range"] = nvc.ColorRange.JPEG# Color space default option:params["color_space"] = nvc.ColorSpace.BT_601# Check actual value.if "color_space" in stream:color_space = stream["color_space"]if color_space == "bt709":params["color_space"] = nvc.ColorSpace.BT_709return paramsreturn {}def decode_rtsp_stream(thread_index: int, url: str, gpu_id: int):params = get_stream_params(url)if not len(params):raise ValueError("Can not get " + url + " streams params")w = params["width"]h = params["height"]f = params["format"]c = params["codec"]framerate = params["framerate"]g = gpu_idif nvc.CudaVideoCodec.H264 == c:codec_name = "h264"elif nvc.CudaVideoCodec.HEVC == c:codec_name = "hevc"bsf_name = codec_name + "_mp4toannexb,dump_extra=all"cmd = ["ffmpeg","-hide_banner","-i",url,"-c:v","copy","-bsf:v",bsf_name,"-f",codec_name,"pipe:1",]proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)nvdec = nvc.PyNvDecoder(w, h, f, c, g)read_size = 4096rt = 0fd = 0t0 = time.time()print("running stream")# nv_cvt = nvc.PySurfaceConverter(# w, h, self.nvYuv.Format(), nvc.PixelFormat.RGB, 0# )nv_cvt = nvc.PySurfaceConverter(w, h, nvc.PixelFormat.NV12, nvc.PixelFormat.BGR, g)cc_ctx = nvc.ColorspaceConversionContext(params["color_space"], params["color_range"])nv_down = nvc.PySurfaceDownloader(w, h, nv_cvt.Format(), g)data = np.zeros((w * h, 3), np.uint8)empty_count = 0while True:t1=time.time()if not read_size:read_size = int(rt / fd)rt = read_sizefd = 1bits = proc.stdout.read(read_size)if not len(bits):print("Can't read data from pipe")breakelse:rt += len(bits)enc_packet = np.frombuffer(buffer=bits, dtype=np.uint8)pkt_data = nvc.PacketData()try:surf = nvdec.DecodeSurfaceFromPacket(enc_packet, pkt_data)if not surf.Empty():fd += 1if pkt_data.bsl < read_size:read_size = pkt_data.bslcvtSurface = nv_cvt.Execute(surf, cc_ctx)success = nv_down.DownloadSingleSurface(cvtSurface, data)if success:new_data = data.reshape((h, w, 3))cv2.imshow(str(thread_index), new_data)cv2.waitKey(1)else:empty_count += 1if empty_count > framerate * 30:print("surf is Empty too many times > "+str(framerate * 30))nvdec = nvc.PyNvDecoder(w, h, f, c, g)empty_count = 0except nvc.HwResetException:nvdec = nvc.PyNvDecoder(w, h, f, c, g)empty_count = 0continuet2 = time.time()# print((t2-t1)*1000)if __name__ == "__main__":add_cuda_dll_directories()print("This sample decodes multiple videos in parallel on given GPU.")print("It doesn't do anything beside decoding, output isn't saved.")print("Usage: SampleDecodeRTSP.py $gpu_id $url1 ... $urlN .")if len(sys.argv) < 2:print("Provide gpu ID and input URL(s).")exit(1)gpuID = int(sys.argv[1])urls = sys.argv[2:]pool = ThreadPoolExecutor(max_workers=len(urls))futures = []index = 0for url in urls:future = pool.submit(decode_rtsp_stream, index, url, gpuID)futures.append(future)index += 1pool.shutdown()for future in futures:future.result()
运行脚本
python rtsp_decoder.py 0 rtsp://admin:a1234567@10.10.16.26:554/Streaming/Channels/101?transportmode=multicast
VPF库安装
windows11编译VideoProcessingFramework库_random_2011的博客-CSDN博客