文本语音 rtsp适时播放叫号系统的底层逻辑
发布Linux, unix socket 和window win32做为音频源的 python10下的(ffmpeg version 7.1) 可运行版本.
这两天在弄这个,前2篇是通过虚拟声卡,达到了最简单的一个逻辑,播放文本就从声卡发声,不播无所谓,自动忙音。 那个工作在windows平台,
而今天的这个相似功能的代码是mac os,理论支持windows,和linux,依赖ffmpeg和xiu(一个rust流服务器)的rtsp服务。
今天的难点有点多
- asyncio的任务 async def _tts_worker(self, text: str) 运行中有各种错误, engine runAndWait是不行的。 内部有它的event loop。所以init和endLoop,是暂时找到的解决办法。同时经历了,这个,和调用 ffmpeg 外部指令,并直接获取- 代表的stdout。 会遇到各种问题。做了捕获和处理。但是查找的时候,不是太容易。
- self._start_ffmpeg() 他需要, create socket 或pipe完成以后,才能运行。 调试我都手工在外部启动。 作用就是,输出到rtsp服务器,以备播放。
- input handle,等都是ai生成的,因为有好多种循环,这是比较省心在。
- 最紧急隐蔽在是, async def _heartbeat(self) 他需要计算播放静音的时间,长了不行,短了不行。 这个最初在测试代码,就几个函数。然后AI,生成了三个theading的版本,两个Queue。 然后转到了异步版本,明显快了很多。
- 在windows上使用win32pipen可以达到unix socket的效果很相似, 记得还有FIFO是linux专用的,当然还有stdin,和stdout。对于ffmpeg,这是一些程序内部的传送机制
- rtsp是需要一个后台的服务的,xiu是开源的rust项目,可以使。另外window推荐metamtx,双击运行,什么也不管。
音画同步应该是另个问题了,几天前,鼓捣了一下图片。让编辑后的,马上 在视频中显示。 这个另外一个话题了。做的这些就为了,让报号和点单,有个界面。
ffmpeg -re -framerate 30 -f image2 -loop 1 -i "image1.jpg" -c:v libx264 -preset ultrafast -tune zerolatency -pix_fmt rgba -f rtsp -rtsp_transport tcp rtsp://localhost:8554/live
合并的代码,就当成剩下的作业,有空再来做。
对于刚接触的,最好是慢慢和AI调试着来,一些功能就做出来。
语音推送使用ffmpeg独立进程,实现了前后中断后自动重启。
程序主体
可独立运行,也可以结合ffmg管理推送进程
- macos ,理论Linux适用,单文件可执行
main.py
import asyncio
import struct
import pyttsx3
import tempfile
import os
import socket
from aioconsole import ainput
from contextlib import suppress
from typing import Optionalclass AsyncTTSController:def __init__(self):# 使用Unix域套接字self.socket_path = "/tmp/tts_audio.sock"self.server_socket: Optional[socket.socket] = Noneself.client_socket: Optional[socket.socket] = None# 进程控制self.ffmpeg_process: Optional[asyncio.subprocess.Process] = Noneself.running = False# TTS引擎self.engine = pyttsx3.init()self.engine.setProperty('rate', 180)self.engine.setProperty('volume', 1.0)# 音频参数self.sample_rate = 24000self.channels = 1self.bits_per_sample = 16self.silence = self._generate_silence(0.2)self.wav_header = self._generate_wav_header()# 状态管理self.connection_active = Falseself.last_heartbeat = 0.0self.heartbeat_interval = 2.0self.sending_audio = 0def _generate_wav_header(self) -> bytes:"""生成WAV文件头"""byte_rate = self.sample_rate * self.channels * self.bits_per_sample // 8block_align = self.channels * self.bits_per_sample // 8return struct.pack('<4sI4s4sIHHIIHH4sI',b'RIFF', 36, b'WAVE', b'fmt ', 16, 1, self.channels,self.sample_rate, byte_rate, block_align, self.bits_per_sample,b'data', 0)def _generate_silence(self, duration: float) -> bytes:"""生成静音数据"""samples = int(self.sample_rate * duration)return bytes(samples * self.channels * (self.bits_per_sample // 8))async def _async_create_socket(self) -> None:"""创建Unix域套接字"""with suppress(Exception):if os.path.exists(self.socket_path):os.unlink(self.socket_path)self.server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)self.server_socket.setblocking(False)self.server_socket.bind(self.socket_path)self.server_socket.listen(1)loop = asyncio.get_running_loop()while self.running and not self.connection_active:try:self.client_socket, _ = await loop.sock_accept(self.server_socket)self.connection_active = Trueprint("客户端已连接")await loop.sock_sendall(self.client_socket, self.wav_header)except (BlockingIOError, InterruptedError):await asyncio.sleep(0.1)except Exception as e:print(f"连接错误: {str(e)}")self.connection_active = Falseawait asyncio.sleep(1)async def _start_ffmpeg(self) -> None:"""启动FFmpeg进程"""with suppress(Exception):if self.ffmpeg_process:self.ffmpeg_process.terminate()await self.ffmpeg_process.wait()socketid='unix:'+self.socket_pathself.ffmpeg_process = await asyncio.create_subprocess_exec('ffmpeg','-f', 's16le','-ar', str(self.sample_rate),'-ac', str(self.channels),'-i', socketid, # 修改输入源为套接字路径'-c:a', 'aac','-f', 'rtsp','-rtsp_transport', 'tcp','rtsp://localhost:8554/mystream',stdout=asyncio.subprocess.DEVNULL,stdin=asyncio.subprocess.DEVNULL,stderr=asyncio.subprocess.PIPE)asyncio.create_task(self._monitor_ffmpeg_errors())async def _monitor_ffmpeg_errors(self) -> None:"""监控FFmpeg错误输出"""while self.running and self.ffmpeg_process:line = await self.ffmpeg_process.stderr.readline()if not line:break# print(f"[FFmpeg Error] {line.decode().strip()}")async def _async_write_socket(self, data: bytes) -> None:"""安全写入套接字"""try:if self.client_socket and self.connection_active:loop = asyncio.get_running_loop()await loop.sock_sendall(self.client_socket, data)except (BrokenPipeError, ConnectionResetError):print("连接已断开,尝试重连...")await self._reconnect_pipeline()except Exception as e:print(f"写入错误: {str(e)}")self.connection_active = Falseasync def _reconnect_pipeline(self) -> None:"""完整重连流程"""print("启动重连流程...")self.connection_active = Falseif self.client_socket:self.client_socket.close()task1=asyncio.create_task(self._async_create_socket()),task2=asyncio.create_task( self._start_ffmpeg()), await task2await task1# await asyncio.gather(task1, task2)#await self._async_create_socket()#await self._start_ffmpeg()# 剩余的heartbeat、tts_worker、input_handler等方法保持相同...async def stop(self) -> None:"""安全关闭"""self.running = Falsewith suppress(Exception):if self.ffmpeg_process:self.ffmpeg_process.terminate()await self.ffmpeg_process.wait()if self.client_socket:self.client_socket.close()if self.server_socket:self.server_socket.close()if os.path.exists(self.socket_path):os.unlink(self.socket_path)print("所有资源已释放")async def _heartbeat(self) -> None:"""心跳维持机制"""while self.running:if self.connection_active :for i in range(10):if self.sending_audio<0:await self._async_write_socket(self.silence)else :self.sending_audio-= 2await asyncio.sleep(0.2) # print(self.sending_audio,"slend")# await asyncio.sleep(self.heartbeat_interval)else:await asyncio.sleep(0.5)def _sync_tts(self,text,tmp_filename):eng=pyttsx3.init()# eng.say(text)eng.save_to_file(text, 'temp3.wav')eng.runAndWait()eng.endLoop()async def _tts_worker(self, text: str) -> None:"""异步TTS处理核心"""tmp_filename = None#with open('audio1.raw','rb') as chunkf:# data=chunkf.read()# secdd=len(data)/48000# self.sending_audio=int(secdd*10) # await self._async_write_socket(data)# #await asyncio.sleep(secdd)# print (secdd,len(data) ) # 创建临时文件with tempfile.NamedTemporaryFile(delete=False) as tmp:tmp_filename = tmp.name# # 同步TTS操作转异步执行loop = asyncio.get_running_loop()await loop.run_in_executor(None, self._sync_tts, *(text, 'temp3.wav',))# 转换音频格式# await asyncio.sleep(1.3)# self._sync_tts(text,tmp_filename)try: proc = await asyncio.create_subprocess_exec('ffmpeg','-hide_banner','-loglevel', 'error','-y','-i', 'temp3.wav', # 输入文件路径'-f', 's16le', # 强制输出格式为PCM s16le'-acodec', 'pcm_s16le', # 明确指定音频编解码器 👈 关键修复'-ar', str(self.sample_rate),'-ac', str(self.channels),'-', # 输出到标准输出stdout=asyncio.subprocess.PIPE
)# 流式发送音频数据sum=0while chunk := await proc.stdout.read(4096):sum+=len(chunk)await self._async_write_socket(chunk)self.sending_audio=int(sum*10/48000) print("write data x0.1s:",self.sending_audio)finally:if tmp_filename and os.path.exists(tmp_filename):1# os.unlink(tmp_filename)async def _input_handler(self) -> None:"""异步输入处理"""while self.running:try:text = await ainput("请输入文本(输入q退出): ")if text.lower() == 'q':self.running = Falsebreakif text.strip():await self._tts_worker(text)except Exception as e:print(f"输入错误: {str(e)}")async def run(self) -> None:"""主运行循环"""self.running = True# #await self._start_ffmpeg()tasks = [asyncio.create_task(self._async_create_socket()),asyncio.create_task( self._start_ffmpeg()),asyncio.create_task(self._input_handler()),asyncio.create_task(self._heartbeat()),]await asyncio.gather(*tasks)# 以下保持不变...
if __name__ == "__main__":controller = AsyncTTSController()try:asyncio.run(controller.run())except KeyboardInterrupt:asyncio.run(controller.stop())
"""
ffmpeg -y -i temp.wav -f s16le -acodec pcm_s16le -ar 24000 -ac 1 audio.raw
ffmpeg -ar 24000 -ac 1 -f s16le -i unix:/tmp/tts_audio.sock -f rtsp rtsp://localhost:8554/mystream
"""
- window10系统python10 可运行版本
主要让deepseek,执行了,socket 到 win32pipen的替换.因为本来就是换过去的.这一块的代码完全没有手工介入. 唯一改的是注释eng.endLoop(),并不用每次init() ,应改是pyttsx3的一个跨平台特性. ,异步的win32下支持稳定.
def _sync_tts(self, text, tmp_filename):eng = self.engine #pyttsx3.init()eng.save_to_file(text, 'temp3.wav')eng.runAndWait()# eng.endLoop()
main-win.py
import asyncio
import struct
import pyttsx3
import tempfile
import os
from aioconsole import ainput
from contextlib import suppress
from typing import Optional
import win32pipe
import win32file
import pywintypesclass AsyncTTSController:def __init__(self):# 使用Windows命名管道self.pipe_name = r'\\.\pipe\tts_audio_pipe'self.pipe_handle = None# 进程控制self.ffmpeg_process: Optional[asyncio.subprocess.Process] = Noneself.running = False# TTS引擎self.engine = pyttsx3.init()self.engine.setProperty('rate', 180)self.engine.setProperty('volume', 1.0)# 音频参数self.sample_rate = 24000self.channels = 1self.bits_per_sample = 16self.silence = self._generate_silence(0.2)self.wav_header = self._generate_wav_header()# 状态管理self.connection_active = Falseself.last_heartbeat = 0.0self.heartbeat_interval = 2.0self.sending_audio = 0def _generate_wav_header(self) -> bytes:"""生成WAV文件头"""byte_rate = self.sample_rate * self.channels * self.bits_per_sample // 8block_align = self.channels * self.bits_per_sample // 8return struct.pack('<4sI4s4sIHHIIHH4sI',b'RIFF', 36, b'WAVE', b'fmt ', 16, 1, self.channels,self.sample_rate, byte_rate, block_align, self.bits_per_sample,b'data', 0)def _generate_silence(self, duration: float) -> bytes:"""生成静音数据"""samples = int(self.sample_rate * duration)return bytes(samples * self.channels * (self.bits_per_sample // 8))async def _async_create_pipe(self) -> None:"""创建命名管道"""while self.running and not self.connection_active:try:# 创建命名管道self.pipe_handle = win32pipe.CreateNamedPipe(self.pipe_name,win32pipe.PIPE_ACCESS_DUPLEX,win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE | win32pipe.PIPE_WAIT,1, # 最大实例数65536, 65536, # 输入输出缓冲区大小0, # 默认超时None # 安全属性)# 异步等待连接loop = asyncio.get_running_loop()await loop.run_in_executor(None, win32pipe.ConnectNamedPipe, self.pipe_handle, None)self.connection_active = Trueprint("客户端已连接")await self._async_write_socket(self.wav_header)except pywintypes.error as e:if e.winerror == 536: # ERROR_PIPE_CONNECTEDself.connection_active = Trueprint("客户端已连接")elif e.winerror == 232: # 客户端断开print("客户端断开连接")self.connection_active = Falseif self.pipe_handle:win32file.CloseHandle(self.pipe_handle)self.pipe_handle = Noneawait asyncio.sleep(1)else:print(f"管道错误: {e}")await asyncio.sleep(1)except Exception as e:print(f"其他错误: {e}")await asyncio.sleep(1)async def _start_ffmpeg(self) -> None:"""启动FFmpeg进程"""with suppress(Exception):if self.ffmpeg_process:self.ffmpeg_process.terminate()await self.ffmpeg_process.wait()self.ffmpeg_process = await asyncio.create_subprocess_exec('ffmpeg','-f', 's16le','-ar', str(self.sample_rate),'-ac', str(self.channels),'-i', self.pipe_name,'-c:a', 'aac','-f', 'rtsp','-rtsp_transport', 'tcp','rtsp://localhost:8554/mystream',stdout=asyncio.subprocess.DEVNULL,stdin=asyncio.subprocess.DEVNULL,stderr=asyncio.subprocess.PIPE)asyncio.create_task(self._monitor_ffmpeg_errors())async def _monitor_ffmpeg_errors(self) -> None:"""监控FFmpeg错误输出"""while self.running and self.ffmpeg_process:line = await self.ffmpeg_process.stderr.readline()if not line:break# print(f"[FFmpeg Error] {line.decode().strip()}")async def _async_write_socket(self, data: bytes) -> None:"""安全写入管道"""try:if self.connection_active and self.pipe_handle:loop = asyncio.get_running_loop()await loop.run_in_executor(None, win32file.WriteFile, self.pipe_handle, data)except pywintypes.error as e:print(f"写入错误: {e}")self.connection_active = Falseawait self._reconnect_pipeline()except Exception as e:print(f"其他写入错误: {e}")self.connection_active = Falseasync def _reconnect_pipeline(self) -> None:"""完整重连流程"""print("启动重连流程...")self.connection_active = Falseif self.pipe_handle:win32file.CloseHandle(self.pipe_handle)self.pipe_handle = Noneawait asyncio.gather(self._async_create_pipe(),self._start_ffmpeg())async def _heartbeat(self) -> None:"""心跳维持机制"""while self.running:if self.connection_active:for i in range(10):if self.sending_audio < 0:await self._async_write_socket(self.silence)else:self.sending_audio -= 2await asyncio.sleep(0.2)else:await asyncio.sleep(0.5)def _sync_tts(self, text, tmp_filename):eng = pyttsx3.init()eng.save_to_file(text, 'temp3.wav')eng.runAndWait()# eng.endLoop()async def _tts_worker(self, text: str) -> None:"""异步TTS处理核心"""await asyncio.get_event_loop().run_in_executor(None, self._sync_tts, text, 'temp3.wav')try:proc = await asyncio.create_subprocess_exec('ffmpeg','-hide_banner','-loglevel', 'error','-y','-i', 'temp3.wav','-f', 's16le','-acodec', 'pcm_s16le','-ar', str(self.sample_rate),'-ac', str(self.channels),'-',stdout=asyncio.subprocess.PIPE)sum_bytes = 0while chunk := await proc.stdout.read(4096):sum_bytes += len(chunk)await self._async_write_socket(chunk)self.sending_audio = int(sum_bytes * 10 / 48000)print(f"写入数据 x0.1s: {self.sending_audio}")finally:if os.path.exists('temp3.wav'):os.remove('temp3.wav')async def _input_handler(self) -> None:"""异步输入处理"""while self.running:try:text = await ainput("请输入文本(输入q退出): ")if text.lower() == 'q':self.running = Falsebreakif text.strip():await self._tts_worker(text)except Exception as e:print(f"输入错误: {str(e)}")async def run(self) -> None:"""主运行循环"""self.running = Truetasks = [asyncio.create_task(self._async_create_pipe()),asyncio.create_task(self._start_ffmpeg()),asyncio.create_task(self._input_handler()),asyncio.create_task(self._heartbeat()),]await asyncio.gather(*tasks)async def stop(self) -> None:"""安全关闭"""self.running = Falsewith suppress(Exception):if self.ffmpeg_process:self.ffmpeg_process.terminate()await self.ffmpeg_process.wait()if self.pipe_handle:win32pipe.DisconnectNamedPipe(self.pipe_handle)win32file.CloseHandle(self.pipe_handle)print("所有资源已释放")if __name__ == "__main__":controller = AsyncTTSController()try:asyncio.run(controller.run())except KeyboardInterrupt:asyncio.run(controller.stop())
独立的ffmpeg启动和监控的独立代码
验证了一下rtsp断线重建连结,也验证了 上面 的main.py的socket server退出后,ffmpeg自动重启连接。 要使用这个更稳健球的程序,需要
注释main.py run中的asyncio.create_task( self._start_ffmpeg()),
代码不用修改,把管道名这个快, 彻底修改为,ffmpeg认识的window样式,就可运行.
r’\.\pipe\tts_audio_pipe’
ffmg,py
import asyncio
from contextlib import suppress
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')class FFmpegManager:def __init__(self):self.ffmpeg_process = Noneself._retry_count = 0self._max_retries = 5self._retry_lock = asyncio.Lock()self._is_running = Falseself.sample_rate=24000self.channels=1self.socket_path = "/tmp/tts_audio.sock"async def _start_ffmpeg(self) -> None:"""带自动重试的FFmpeg启动函数"""async with self._retry_lock:await self._safe_terminate()try:socketid = 'unix:' + self.socket_pathself.ffmpeg_process =await asyncio.create_subprocess_exec('ffmpeg','-f', 's16le','-ar', str(self.sample_rate),'-ac', str(self.channels),'-i', socketid, # 修改输入源为套接字路径'-c:a', 'aac','-f', 'rtsp','-rtsp_transport', 'tcp','rtsp://localhost:8554/mystream',stdout=asyncio.subprocess.DEVNULL,stdin=asyncio.subprocess.DEVNULL,stderr=asyncio.subprocess.PIPE)self._retry_count = 0 # 重置重试计数器asyncio.create_task(self._monitor_ffmpeg_errors())self._is_running = Trueexcept Exception as e:logging.error(f"FFmpeg启动失败: {str(e)}")await self._handle_retry()async def _monitor_ffmpeg_errors(self):"""增强型进程监控"""while self._is_running:logging.info("loop error cathch")stderr = await self.ffmpeg_process.stderr.readline()if stderr:logging.error(f"FFmpeg错误输出: {stderr.decode().strip()}")# 检测进程状态return_code = self.ffmpeg_process.returncodeif return_code is not None:logging.warning(f"FFmpeg异常退出,返回码: {return_code}")self._is_running = Falseawait self._handle_retry()breakasync def _handle_retry(self):"""智能重试策略"""if self._retry_count >= self._max_retries:logging.critical("达到最大重试次数,放弃重启")return# 指数退避算法delay = min(2 ** self._retry_count, 30) # 最大间隔30秒self._retry_count += 1logging.info(f"将在 {delay} 秒后尝试第 {self._retry_count} 次重启")await asyncio.sleep(delay)await self._start_ffmpeg()async def _safe_terminate(self):"""安全终止现有进程"""if self.ffmpeg_process:with suppress(Exception):self.ffmpeg_process.terminate()await self.ffmpeg_process.wait()self.ffmpeg_process = None
# 以下保持不变...
async def main():controller=FFmpegManager()try:await controller._start_ffmpeg()logging.info('rung')await asyncio.sleep(1160)except KeyboardInterrupt:logging.info(3)asyncio.run(controller._safe_terminate())
if __name__ == "__main__":asyncio.run(main())