通过python消费者和生产者队列,实现保存视频流
参考
https://blog.csdn.net/qq_33843237/article/details/137745905
原理
通过opencv读入RTSP或RTMP流,采用消费者-生产者模型,通过生产者线程,每次读入一个视频帧(生产)存入queue队列,再通过消费者线程对视频帧进行写入(消费)。
代码(确保队列中所有帧写入完成再停止)
from queue import Queue, Empty
import threadingimport cv2# 定义一个共享的退出标志
exit_flag = False
output_video = None
total_frame = 0
logging_lock = threading.Lock()class FrameProducer(threading.Thread):def __init__(self, frame_queue, url):super().__init__()self.frame_queue = frame_queueself.url = urlself.video = Noneself.daemon = Truedef run(self):global exit_flag, output_video, total_framelog('in producer')self.video = cv2.VideoCapture(self.url)width, height = int(self.video.get(3)), int(self.video.get(4))# 获取视频帧率fps = self.video.get(cv2.CAP_PROP_FPS)if fps < 1:fps = 30# 使用H264编码创建VideoWriter对象(需要相关环境,如果没有要把'avc1'改成'mp4v')output_video = cv2.VideoWriter('output1.mp4', cv2.VideoWriter_fourcc(*'avc1'), fps, (width, height))while not exit_flag and self.video.isOpened():ret, image = self.video.read()log(f'get frame = {ret}')if ret:self.frame_queue.put(image)total_frame += 1# 设置程序退出条件if total_frame >= 60:exit_flag = Trueself.video.release()class FrameConsumer(threading.Thread):def __init__(self, frame_queue):super().__init__()self.frame_queue = frame_queueself.daemon = Truedef run(self):global exit_flag, output_videolog('in consumer')# 确保所有的帧都被写入视频文件while not exit_flag or self.frame_queue.qsize() > 0:try:log(f'frame_queue size= {self.frame_queue.qsize()}')frame = self.frame_queue.get(timeout=1) # 非阻塞地尝试从队列中获取帧if frame is None: # 如果队列为空,则退出循环breakoutput_video.write(frame) # 将帧写入视频文件except Empty:pass # 队列为空,不做任何操作def log(message):"""线程安全的日志输出函数"""with logging_lock:print(message)def main(url: str = 'rtmp://liteavapp.qcloud.com/live/liteavdemoplayerstreamid') -> None:frame_queue = Queue(maxsize=60)producer = FrameProducer(frame_queue, url)producer.start()consumer = FrameConsumer(frame_queue)consumer.start()producer.join()consumer.join()log(f'frame_queue size= {frame_queue.qsize()}')frame_queue.task_done()if output_video is not None:output_video.release()log("程序已结束")if __name__ == '__main__':main()