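Real-time speech-to-subtitle transcription for live streams:
Whisper-based real-time transcription or translation uses OpenAI's Whisper model to turn the speech in a live stream into text as it airs, and optionally to translate it into another language. The process has three steps: capturing the live audio stream, speech recognition (transcription), and translation (if needed). The principle and the benefits are explained below.
How it works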
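- Capture the live audio stream: First, the audio has to be captured from the stream source. This is usually done with tools such as ffmpeg or streamlink, which can connect to a stream on platforms such as Twitch or YouTube and extract the audio data.
- Speech recognition (transcription): The captured audio is fed to the Whisper model for speech recognition. Whisper is a speech recognition model developed by OpenAI. It was trained on a large multilingual dataset, which gives it good accuracy and the ability to recognize many languages.
- Translation (optional): If the transcript needs to be translated into another language, a machine translation model (such as OpenAI's GPT or Google Translate) can be applied to the transcribed text. Whisper itself can also translate speech directly into English.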
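The capture step can be sketched as follows. This is a minimal illustration, assuming ffmpeg is on the PATH; the helper names `build_ffmpeg_cmd` and `open_audio` are hypothetical. It builds the command-line equivalent of the ffmpeg-python call used in the scripts later in this post: decode any input to 16 kHz mono signed 16-bit PCM on stdout.

```python
import subprocess

SAMPLE_RATE = 16000  # Whisper models expect 16 kHz audio


def build_ffmpeg_cmd(source: str) -> list[str]:
    """Return an ffmpeg command that decodes `source` to raw PCM on stdout."""
    return [
        "ffmpeg",
        "-loglevel", "panic",     # keep ffmpeg quiet
        "-i", source,             # a direct stream URL, a file, or "pipe:0" for stdin
        "-f", "s16le",            # raw little-endian 16-bit samples, no container
        "-acodec", "pcm_s16le",
        "-ac", "1",               # mono
        "-ar", str(SAMPLE_RATE),  # resample to 16 kHz
        "pipe:1",                 # write to stdout
    ]


def open_audio(source: str) -> subprocess.Popen:
    """Start ffmpeg; read raw audio bytes from the returned process's stdout."""
    return subprocess.Popen(build_ffmpeg_cmd(source), stdout=subprocess.PIPE)
```

For platforms where ffmpeg cannot read the page URL directly, streamlink resolves the stream first and its output is piped into ffmpeg with `source="pipe:0"`.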
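Why it matters
- Accessibility: With real-time transcription, viewers who are deaf or hard of hearing, and viewers who do not understand the stream's original language, can still follow the content.
- Archiving and search: The transcript can serve as an archive of the stream that is easy to search and review later. Text is easier for search engines to index than video, which makes the content easier to discover.
- Multilingual translation: Real-time translation lets viewers who speak different languages follow and enjoy the stream, which supports cross-language and cross-cultural exchange.
- Learning and education: For educational streams, real-time transcription and translation help students follow the material, especially non-native speakers.
- Content moderation: The transcript can also feed automated moderation, helping streaming platforms monitor and manage inappropriate content.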
1. Deployment
Download stream-translator (GitHub: fortypercnt/stream-translator).
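If you already set up the environment in the earlier article, "Whisper speech recognition in practice, day 1: deploying a server with remote access for real-time speech-to-text (full code and detailed deployment steps)" on the CSDN blog, you can skip the installation below.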
git clone https://github.com/fortypercnt/stream-translator.git
cd stream-translator
pip install -r requirements.txt
Model downloads:
large-v3 model: https://huggingface.co/Systran/faster-whisper-large-v3/tree/main
large-v2 model: https://huggingface.co/guillaumekln/faster-whisper-large-v2/tree/main
large-v1 model: https://huggingface.co/guillaumekln/faster-whisper-large-v1/tree/main
medium model: https://huggingface.co/guillaumekln/faster-whisper-medium/tree/main
small model: https://huggingface.co/guillaumekln/faster-whisper-small/tree/main
base model: https://huggingface.co/guillaumekln/faster-whisper-base/tree/main
tiny model: https://huggingface.co/guillaumekln/faster-whisper-tiny/tree/main
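In testing, the large-v3 model needs more than 10 GB of GPU memory. If you have less, use a smaller model. Note that the links above are faster-whisper (CTranslate2) conversions; the scripts in the Code section call whisper.load_model, which fetches OpenAI's original checkpoints by name on first use.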
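Choosing a model for the available GPU memory can be sketched like this. The helper `pick_model` is hypothetical, and the figures are the approximate ones listed in the openai/whisper README for the original models (about 1 GB for tiny and base, 2 GB for small, 5 GB for medium, 10 GB for large). Treat them as rough guidance: actual usage depends on the backend and precision, and the test above found large-v3 needs somewhat more than 10 GB.

```python
# Approximate VRAM needs in GB, ordered from largest to smallest model.
# Assumption: values come from the openai/whisper README, not from measurements here.
APPROX_VRAM_GB = [
    ("large-v3", 10),
    ("medium", 5),
    ("small", 2),
    ("base", 1),
    ("tiny", 1),
]


def pick_model(available_gb: float) -> str:
    """Return the largest model whose approximate requirement fits."""
    for name, needed_gb in APPROX_VRAM_GB:
        if available_gb >= needed_gb:
            return name
    return "tiny"  # smallest model as a last resort
```

The result can be passed to the scripts below through their `--model` option, for example `python translator1.py <stream URL> --model medium`.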
Usage:
python translator.py <stream URL>
translator.py performs real-time translation. If you only want transcription, use translator1.py from the Code section.
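In the openai-whisper package, the difference between the two modes is the `task` decoding option: `transcribe` keeps the source language, while `translate` produces English text. The sketch below shows how one chunk could be run in either mode. The wrapper `transcribe_chunk` is a hypothetical helper, not part of stream-translator; `model` is assumed to be an object returned by `whisper.load_model`.

```python
def transcribe_chunk(model, audio, translate=False, language=None):
    """Run one audio chunk through a Whisper model and return its text.

    translate=False -> task="transcribe" (text in the spoken language)
    translate=True  -> task="translate"  (text in English)
    language=None lets Whisper detect the spoken language itself.
    """
    task = "translate" if translate else "transcribe"
    result = model.transcribe(audio, task=task, language=language)
    return result["text"].strip()


# Usage with the real library (assumes openai-whisper is installed and
# `audio` is a float32 NumPy array sampled at 16 kHz):
#   import whisper
#   model = whisper.load_model("small")
#   print(transcribe_chunk(model, audio, translate=True))
```

Fixing `language` (for example `"zh"`) skips language detection on every chunk, which can help when short chunks are misdetected.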
2. Code
translator1.py:
import argparse
import sys
import signal
from datetime import datetime

import ffmpeg
import numpy as np
import whisper
from whisper.audio import SAMPLE_RATE


class RingBuffer:
    """Fixed-size buffer that overwrites its oldest entry once full."""

    def __init__(self, size):
        self.size = size
        self.data = []
        self.full = False
        self.cur = 0

    def append(self, x):
        if self.size <= 0:
            return
        if self.full:
            self.data[self.cur] = x
            self.cur = (self.cur + 1) % self.size
        else:
            self.data.append(x)
            if len(self.data) == self.size:
                self.full = True

    def get_all(self):
        """Return the entries from oldest to newest."""
        all_data = []
        for i in range(len(self.data)):
            idx = (i + self.cur) % self.size
            all_data.append(self.data[idx])
        return all_data

    def clear(self):
        self.data = []
        self.full = False
        self.cur = 0


def open_stream(stream, direct_url, preferred_quality):
    if direct_url:
        # Let ffmpeg read the URL itself and emit 16 kHz mono 16-bit PCM on stdout.
        try:
            process = (
                ffmpeg.input(stream, loglevel="panic")
                .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE)
                .run_async(pipe_stdout=True)
            )
        except ffmpeg.Error as e:
            raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e
        return process, None

    import streamlink
    import subprocess
    import threading

    stream_options = streamlink.streams(stream)
    if not stream_options:
        print("No playable streams found on this URL:", stream)
        sys.exit(0)

    # Pick the first quality that the stream actually offers.
    option = None
    for quality in [preferred_quality, 'audio_only', 'audio_mp4a', 'audio_opus', 'best']:
        if quality in stream_options:
            option = quality
            break
    if option is None:
        # Fallback: the first available quality name
        # (the streamlink CLI needs the name, not the Stream object).
        option = next(iter(stream_options))

    def writer(streamlink_proc, ffmpeg_proc):
        # Pump bytes from streamlink into ffmpeg while both processes are running.
        while streamlink_proc.poll() is None and ffmpeg_proc.poll() is None:
            try:
                chunk = streamlink_proc.stdout.read(1024)
                if not chunk:  # streamlink closed its output: the stream ended
                    break
                ffmpeg_proc.stdin.write(chunk)
            except (BrokenPipeError, OSError):
                break
        # Signal end-of-input so ffmpeg can flush and exit.
        try:
            ffmpeg_proc.stdin.close()
        except OSError:
            pass

    cmd = ['streamlink', stream, option, "-O"]
    streamlink_process = subprocess.Popen(cmd, stdout=subprocess.PIPE)

    try:
        ffmpeg_process = (
            ffmpeg.input("pipe:", loglevel="panic")
            .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE)
            .run_async(pipe_stdin=True, pipe_stdout=True)
        )
    except ffmpeg.Error as e:
        raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

    thread = threading.Thread(target=writer, args=(streamlink_process, ffmpeg_process))
    thread.start()
    return ffmpeg_process, streamlink_process


def main(url, model="large-v3", interval=5, preferred_quality="audio_only", direct_url=False, **decode_options):
    print("Loading model...")
    model = whisper.load_model(model)

    print("Opening stream...")
    ffmpeg_process, _ = open_stream(url, direct_url, preferred_quality)

    def handler(signum, frame):
        # Stop ffmpeg and exit cleanly on Ctrl+C.
        ffmpeg_process.kill()
        sys.exit(0)

    signal.signal(signal.SIGINT, handler)

    n_bytes = interval * SAMPLE_RATE * 2  # Factor 2 comes from reading the int16 stream as bytes
    audio_buffer = RingBuffer(1)  # No need for a history buffer since we're just doing real-time transcription

    try:
        while True:
            in_bytes = ffmpeg_process.stdout.read(n_bytes)
            if not in_bytes:
                break

            # Convert 16-bit PCM to float32 in [-1.0, 1.0), the input Whisper expects.
            audio = np.frombuffer(in_bytes, np.int16).flatten().astype(np.float32) / 32768.0
            audio_buffer.append(audio)

            result = model.transcribe(np.concatenate(audio_buffer.get_all()), **decode_options)
            print(f'{datetime.now().strftime("%H:%M:%S")} {result["text"]}')

            audio_buffer.clear()  # Clear the buffer after each transcription
    finally:
        ffmpeg_process.kill()


def cli():
    parser = argparse.ArgumentParser(description="Real-time audio transcription from streams.")
    parser.add_argument('URL', type=str, help='Stream website and channel name, e.g. twitch.tv/forsen')
    parser.add_argument('--model', type=str, default='large-v3', help='Whisper model for transcription.')
    parser.add_argument('--interval', type=int, default=5, help='Interval between transcription in seconds.')
    parser.add_argument('--preferred_quality', type=str, default='audio_only', help='Preferred stream quality.')
    parser.add_argument('--direct_url', action='store_true', help='Pass the URL directly to ffmpeg.')

    args = parser.parse_args().__dict__
    url = args.pop("URL")
    main(url, **args)


if __name__ == '__main__':
    cli()
python translator1.py https://www.huya.com/kpl
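Run against the Huya KPL live stream, this prints the transcribed text chunk by chunk, each line prefixed with a timestamp.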
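The chunking in the main loop of translator1.py can be worked through in isolation. The sketch below is dependency-free and uses hypothetical helper names (`chunk_size_bytes`, `iter_chunks`, `pcm_to_float`); the script itself does the conversion with `np.frombuffer`. It shows why a 5-second interval means reading 160000 bytes at a time, and how 16-bit samples map to floats in [-1.0, 1.0).

```python
import sys
from array import array

SAMPLE_RATE = 16000   # samples per second, the same value as whisper.audio.SAMPLE_RATE
BYTES_PER_SAMPLE = 2  # each int16 sample occupies two bytes


def chunk_size_bytes(interval_s: int) -> int:
    """Bytes of mono 16-bit PCM that cover `interval_s` seconds."""
    return interval_s * SAMPLE_RATE * BYTES_PER_SAMPLE


def iter_chunks(stream, n_bytes: int):
    """Yield successive reads of up to n_bytes until the stream is exhausted."""
    while True:
        chunk = stream.read(n_bytes)
        if not chunk:
            return
        yield chunk


def pcm_to_float(raw: bytes) -> list[float]:
    """Convert little-endian int16 PCM bytes to floats in [-1.0, 1.0)."""
    samples = array("h")
    samples.frombytes(raw)
    if sys.byteorder == "big":
        samples.byteswap()  # the PCM data is little-endian (s16le)
    return [s / 32768.0 for s in samples]


print(chunk_size_bytes(5))  # 5 * 16000 * 2 = 160000
```

A larger `--interval` gives Whisper more context per call but delays each line of output by the same number of seconds.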
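The output also contains some Traditional Chinese characters. To convert them to Simplified Chinese, install OpenCC and modify the code: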
pip install opencc-python-reimplemented
translator2.py:
import argparse
import sys
import signal
from datetime import datetime

import ffmpeg
import numpy as np
import whisper
from whisper.audio import SAMPLE_RATE
import opencc


class RingBuffer:
    """Fixed-size buffer that overwrites its oldest entry once full."""

    def __init__(self, size):
        self.size = size
        self.data = []
        self.full = False
        self.cur = 0

    def append(self, x):
        if self.size <= 0:
            return
        if self.full:
            self.data[self.cur] = x
            self.cur = (self.cur + 1) % self.size
        else:
            self.data.append(x)
            if len(self.data) == self.size:
                self.full = True

    def get_all(self):
        """Return the entries from oldest to newest."""
        all_data = []
        for i in range(len(self.data)):
            idx = (i + self.cur) % self.size
            all_data.append(self.data[idx])
        return all_data

    def clear(self):
        self.data = []
        self.full = False
        self.cur = 0


def open_stream(stream, direct_url, preferred_quality):
    if direct_url:
        # Let ffmpeg read the URL itself and emit 16 kHz mono 16-bit PCM on stdout.
        try:
            process = (
                ffmpeg.input(stream, loglevel="panic")
                .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE)
                .run_async(pipe_stdout=True)
            )
        except ffmpeg.Error as e:
            raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e
        return process, None

    import streamlink
    import subprocess
    import threading

    stream_options = streamlink.streams(stream)
    if not stream_options:
        print("No playable streams found on this URL:", stream)
        sys.exit(0)

    # Pick the first quality that the stream actually offers.
    option = None
    for quality in [preferred_quality, 'audio_only', 'audio_mp4a', 'audio_opus', 'best']:
        if quality in stream_options:
            option = quality
            break
    if option is None:
        # Fallback: the first available quality name
        # (the streamlink CLI needs the name, not the Stream object).
        option = next(iter(stream_options))

    def writer(streamlink_proc, ffmpeg_proc):
        # Pump bytes from streamlink into ffmpeg while both processes are running.
        while streamlink_proc.poll() is None and ffmpeg_proc.poll() is None:
            try:
                chunk = streamlink_proc.stdout.read(1024)
                if not chunk:  # streamlink closed its output: the stream ended
                    break
                ffmpeg_proc.stdin.write(chunk)
            except (BrokenPipeError, OSError):
                break
        # Signal end-of-input so ffmpeg can flush and exit.
        try:
            ffmpeg_proc.stdin.close()
        except OSError:
            pass

    cmd = ['streamlink', stream, option, "-O"]
    streamlink_process = subprocess.Popen(cmd, stdout=subprocess.PIPE)

    try:
        ffmpeg_process = (
            ffmpeg.input("pipe:", loglevel="panic")
            .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE)
            .run_async(pipe_stdin=True, pipe_stdout=True)
        )
    except ffmpeg.Error as e:
        raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

    thread = threading.Thread(target=writer, args=(streamlink_process, ffmpeg_process))
    thread.start()
    return ffmpeg_process, streamlink_process


def main(url, model="large-v3", interval=5, preferred_quality="audio_only", direct_url=False, **decode_options):
    print("Loading model...")
    model = whisper.load_model(model)

    print("Opening stream...")
    ffmpeg_process, _ = open_stream(url, direct_url, preferred_quality)

    converter = opencc.OpenCC('t2s')  # Traditional-to-Simplified Chinese converter

    def handler(signum, frame):
        # Stop ffmpeg and exit cleanly on Ctrl+C.
        ffmpeg_process.kill()
        sys.exit(0)

    signal.signal(signal.SIGINT, handler)

    n_bytes = interval * SAMPLE_RATE * 2  # Factor 2 comes from reading the int16 stream as bytes
    audio_buffer = RingBuffer(1)

    try:
        while True:
            in_bytes = ffmpeg_process.stdout.read(n_bytes)
            if not in_bytes:
                break

            # Convert 16-bit PCM to float32 in [-1.0, 1.0), the input Whisper expects.
            audio = np.frombuffer(in_bytes, np.int16).flatten().astype(np.float32) / 32768.0
            audio_buffer.append(audio)

            result = model.transcribe(np.concatenate(audio_buffer.get_all()), **decode_options)
            result_text = converter.convert(result["text"])  # Convert Traditional to Simplified Chinese
            print(f'{datetime.now().strftime("%H:%M:%S")} {result_text}')

            audio_buffer.clear()
    finally:
        ffmpeg_process.kill()


def cli():
    parser = argparse.ArgumentParser(description="Real-time audio transcription from streams.")
    parser.add_argument('URL', type=str, help='Stream website and channel name, e.g. twitch.tv/forsen')
    parser.add_argument('--model', type=str, default='large-v3', help='Whisper model for transcription.')
    parser.add_argument('--interval', type=int, default=5, help='Interval between transcription in seconds.')
    parser.add_argument('--preferred_quality', type=str, default='audio_only', help='Preferred stream quality.')
    parser.add_argument('--direct_url', action='store_true', help='Pass the URL directly to ffmpeg.')

    args = parser.parse_args().__dict__
    url = args.pop("URL")
    main(url, **args)


if __name__ == '__main__':
    cli()
python translator2.py https://www.huya.com/kpl
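Both scripts create `RingBuffer(1)` and clear it after every chunk, so each chunk is transcribed without any preceding audio. If words get cut off at chunk boundaries, one option is to keep the last few chunks and transcribe them together. The sketch below illustrates that idea with `collections.deque(maxlen=...)`, a standard-library stand-in for the `RingBuffer` class rather than the author's code; `HistoryWindow` and `max_chunks` are hypothetical names.

```python
from collections import deque


class HistoryWindow:
    """Keep the most recent `max_chunks` audio chunks, oldest first."""

    def __init__(self, max_chunks: int):
        # A deque with maxlen silently drops its oldest item when full.
        self._chunks = deque(maxlen=max_chunks)

    def push(self, chunk):
        """Add a chunk and return the current window as a list."""
        self._chunks.append(chunk)
        return list(self._chunks)


# In the main loop this would replace the append/get_all/clear sequence, e.g.:
#   window = HistoryWindow(max_chunks=2)
#   result = model.transcribe(np.concatenate(window.push(audio)), **decode_options)
```

The trade-off is that overlapping windows repeat text from the previous chunk in each printed line and increase the work per call, so the output would need de-duplication before being shown as subtitles.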