一、前言
如标题所示,这个python代码的目的是利用opencv模块实现多视频源同屏拼接播放的,里面包含视频播放尺寸修改、视频播放加序号、视频流存活检测等方案,可做扩展开发使用。
二、代码
import cv2
import time
from func_timeout import func_set_timeout
import func_timeout
import multiprocessing
import numpy as np# 注意! 因受屏幕分辨率影响,此参数下,桌面1920*1080此处最多适宜播放9个视频!
# 每个视频框的宽度和高度
box_x = 640
box_y = 360# 最大列视频数
row_max = 3# 这里留视频流链接或者mp4都可以
# urls = ['rtsp://admin:XXXXX@172.16.1.1:554/Streaming/Channels/101', 'rtmp://172.16.1.2:1935/live1']
urls = [r'E:\AAAA\2024-07-01\0730.mp4', r'E:\AAAA\2024-07-01\0740.mp4',r'E:\AAAA\2024-07-01\0750.mp4']# 视频绝对空间排列
# 规则设定:单行不超过3个
def cell_boxes(cell_num):if cell_num <= row_max:rows = cell_numcols = 1else:remainder = cell_num % row_maxquotient = cell_num // row_maxif remainder:cols = quotient + 1else:cols = quotientrows = 3print('rows', rows)print('cols', cols)final_matrix = np.zeros((box_y * cols + 30 * cols, box_x * rows + 30 * rows, 3), np.uint8)# 划分每个视频的空间boxs_site = []for n in range(cap_num):n = n + 1line_col = n % rowsif line_col == 0:line_row = n // rowsline_col = rowselse:line_row = n // rows + 1start_x = (line_col - 1) * box_xstart_y = (line_row - 1) * box_yend_x = line_col * box_xend_y = line_row * box_yboxs_site.append([n, [start_y, start_x], [end_y, end_x]])return final_matrix, boxs_site# 视频存活检查
@func_set_timeout(30) # 设定函数超执行时间
def check(base_url):cap = cv2.VideoCapture(base_url)cap.read()return cap# 视频播放
def camera_run(q, camera_id, base_url):while True:try:cap = check(base_url)print('ok!')breakexcept func_timeout.exceptions.FunctionTimedOut:print(base_url, '未打开')time.sleep(5)# exit()try:fps = cap.get(cv2.CAP_PROP_FPS)while True:ret, frame = cap.read()frame = cv2.resize(frame, (box_x, box_y))cv2.putText(img=frame,text=camera_id, # 文本org=(25, 25), # 字体开始写的左上角位置fontFace=cv2.FONT_HERSHEY_SIMPLEX, # 字体`fontScale=1, # 字体大小color=(255, 0, 0), # 颜色thickness=2 # 字体粗细)cv2.waitKey(int(float(1 / int(fps)) * 1000))q.put([str(int(camera_id) + 1), frame])except Exception as e:print(e)print(f'摄像头 {camera_id} 在读流成功后关闭\n')if __name__ == '__main__':cut_box = []msg_box = []cap_num = len(urls)# 最小行列密集设计final_matrix, boxs_site = cell_boxes(cap_num)print(boxs_site)# 进程通讯队列q = multiprocessing.Queue()for i in range(cap_num):camera_id = str(i)# 每路视频设一个进程读流a = multiprocessing.Process(target=camera_run, args=(q, camera_id, urls[int(camera_id)]))a.start()# 主进程视频统一播放while True:# 获取子进程传回的视频流信息msg = q.get()camera_id = msg[0]# 根据摄像头id获取摄像头展示框的绝对坐标site_start = boxs_site[int(camera_id) - 1][1]site_end = boxs_site[int(camera_id) - 1][2]# 放入主进程公共视频播放窗口final_matrix[site_start[0] + 10: site_end[0] + 10, site_start[1] + 10:site_end[1] + 10] = msg[1]cv2.imshow('video', final_matrix)cv2.waitKey(1)