大家好,我是烤鸭:
最近在尝试做视频的质量分析,打算利用asr针对声音判断是否有人声,以及识别出来的文本进行进一步操作。asr看了几个开源的,最终选择了openai的whisper,后来发现性能不行,又换了whisperX。这是一篇实战和代码为主的文章。
引言
OpenAI的Whisper是一款强大的自动语音识别(ASR)模型,它支持多语种识别,包括中文,且经过大量的多语言和多任务监督数据训练,具有出色的鲁棒性和准确性。Python作为一种功能强大的编程语言,其丰富的库和简洁的语法使其成为实现语音识别功能的理想选择。本文将介绍如何利用Python集成Whisper,实现高效的语音识别。
目前一天小千的视频调用,平均时长3分钟。显卡是4090,平均识别耗时30s以内,业务无压力。
Whisper模型简介
Whisper是一个开源的语音识别模型,它基于Transformer架构,通过从网络上收集的680,000小时多语言数据进行训练,能够实现对多种语言的准确识别。此外,该模型对口音、背景噪音和技术语言具有很好的鲁棒性,使得其在实际应用中具有广泛的应用前景。
WhisperX 地址:
https://github.com/m-bain/whisperX
安装环境
linux
显卡是 4090
cuda pytorch
ffmpeg
python 需要的依赖
pip install --no-cache-dir flask -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir ffmpeg-python -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir wheel -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir zhconv -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir numpy -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir openai-whisper -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir kafka-python -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir fastapi -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir uvicorn -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir psutil -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir gputil -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir requests -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir use-nacos -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir pyyaml -i https://mirrors.aliyun.com/pypi/simple
pip install --no-cache-dir rocketmq-client-python -i https://mirrors.aliyun.com/pypi/simple
预期的功能
我想实现的是单台机器性能打满,并行识别asr,接口可以无限制接收请求,异步返回结果。
接口层
使用的是 fastapi 框架
import concurrent.futures
import os
import timeimport ffmpeg
import platform
import uvicorn
import asyncio
import psutil
from fastapi import FastAPI, BackgroundTasks, HTTPException, status, Query
from fastapi.responses import JSONResponse
import GPUtil
import requests
import jsonfrom dict_time import TimedMap
from parse_video_param import VideoRequest
from parse_video_callback_param import VideoCallbackRequest
from api_result import ApiResult
from whisper_processor import video_process
from whisperX_processor20241119 import video_process_whisperX
from logging_config import KAFKA_LOGGER
from nacos_config20241119 import register_nacosapp = FastAPI()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) # 线程池
# 定义CPU使用率阈值
threshold_cpu_usage = 95 # 例如,你希望CPU使用率不超过95%
threshold_gpu_usage_MB = 2400 # 例如,你希望显存使用大小 MB
timed_map = TimedMap()@app.post("/xxxx-video/whisperx")
async def parse_video(request: VideoRequest, background_tasks: BackgroundTasks):if not request or not request.path:raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No video URL provided")print(f"parse_video, params:{request}")# 将处理任务添加到后台任务中,以便不阻塞主线程background_tasks.add_task(process_video_whisperx, request, background_tasks)# 立即返回处理中响应,告诉客户端请求已经被接收并正在处理api_result = ApiResult(1, "success", "", "")return JSONResponse(api_result.to_dict(), status_code=status.HTTP_200_OK)
# 异步函数来下载和处理视频
async def process_video_whisperx(request: VideoRequest, background_tasks: BackgroundTasks):def sync_process_video_whisperx(request):text = ''try:# 记录方法耗时start_time_single = time.time()# 下载视频并保存到临时文件url = request.pathchunk_size = request.chunk_size# 如果当前cpu使用率超过80%,就把该数据重新加到任务里# 获取当前CPU使用率cpu_usage = psutil.cpu_percent(interval=1, percpu=False)print(f"当前cpu利用率:{cpu_usage}")KAFKA_LOGGER.info(f"当前cpu利用率:{cpu_usage}")# 获取所有GPU的信息gpus = GPUtil.getGPUs()isGpuSuffiencent = True# 判断CPU使用率是否达到阈值if cpu_usage <= threshold_cpu_usage or isGpuSuffiencent:# 解析音频地址wavPath = getWav(url)print(f"mp3 url={wavPath}")# 不存在再去生成# 异步处理方法,解析音频这块可以忽略,也可以直接用视频地址if(not os.path.exists(wavPath)):(ffmpeg.input(url).output(wavPath, acodec='mp3').global_args('-loglevel', 'quiet').run())# 使用whisper处理音频text = process_audio_with_whisperx(wavPath, chunk_size)end_time_single = time.time()# 创建任务并添加到事件循环中,通知业务方asyncio.run(callback_task(request, text))print(f"视频地址:{url}, 函数执行耗时: {end_time_single - start_time_single}秒")KAFKA_LOGGER.info(f"视频地址:{url}, 函数执行耗时: {end_time_single - start_time_single}秒")# 清理临时文件os.remove(wavPath)else:print(f"当前cpu已超限,该视频重新加入队列:{url}")KAFKA_LOGGER.info(f"当前cpu已超限,该视频重新加入队列:{url}")# 暂停5秒time.sleep(5)# 重新加到队列里# 将处理任务添加到后台任务中,以便不阻塞主线程background_tasks.add_task(process_video_whisperx, request, background_tasks)except Exception as ex:print(f"sync_process_video error: {str(ex)}")KAFKA_LOGGER.error(f"sync_process_video error: {ex}")return textloop = asyncio.get_running_loop()# 使用线程池运行同步函数,避免阻塞异步事件循环return await loop.run_in_executor(executor, sync_process_video_whisperx, request)
# 获取文件路径
def getWav(input_video):try:# 判断系统是windows还是linuxoperating_system = platform.system()# 判断操作系统类型if operating_system == 'Windows':print("当前系统是Windows")audio_path = "C:\\Users\\xxx\\Downloads\\"else :audio_path = "/tmp/"# 从原始路径中获取文件名filename = os.path.basename(input_video)# 生成新文件的完整路径filename_without_extension = os.path.splitext(filename)[0]# 使用ffmpeg-python提取音频new_filename = os.path.join(audio_path, filename_without_extension) + ".mp3"except Exception as ex1:print("getWav ex:", str(ex1))return new_filename
# 音频解析
def process_audio_with_whisperx(audio_file_path: str, chunk_size: int) -> str:text = video_process_whisperX(audio_file_path, chunk_size)return text
# 异步回调
async def callback_task(request: VideoRequest, text: str):# 创建任务并添加到事件循环中task = asyncio.create_task(callback(request, text))# 等待任务完成await task
# 回调请求方法
async def callback(request: VideoRequest, text: str):# 目标URLurl = request.callback_url# JSON格式的参数data = {'id': request.id,'text': text,# 添加更多键值对...}# 设置一些键值对timed_map.set(request.path, json.dumps(data), timeout=1800)# 设置请求头,告诉服务器我们发送的是JSON数据headers = {'Content-Type': 'application/json'}# 设置超时时间,这里设置为5秒timeout = 5.0# 发送POST请求response = requests.post(url, data=json.dumps(data), headers=headers, timeout=timeout)print(f"url:{url},data: {json.dumps(data)},headers:{headers},response:{response}")# 检查请求是否成功if response.status_code == 200:# 请求成功,处理响应内容print("请求成功")print(response.json()) # 如果响应内容是JSON格式,可以直接解析else:# 请求失败,打印错误信息print(f"请求失败,状态码:{response.status_code}")print(response.text) # 打印响应的文本内容
# 启动应用
if __name__ == "__main__":register_nacos()uvicorn.run(app, host="0.0.0.0", port=5000)
whisperX
import whisperx
from whisperx.asr import FasterWhisperPipeline
import time
import torch
import gc
import osENV = os.environ.get('ENV', 'development')
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
if ENV == 'production':batch_size = 16compute_type = "float16"model_name = "large-v2"
else:# reduce if low on GPU membatch_size = 4# compute_type = "float16" # change to "int8" if low on GPU mem (may reduce accuracy)# change to "int8" if low on GPU mem (may reduce accuracy)compute_type = "int8"model_name = "medium"
class WhisperXProcessor:fast_model: FasterWhisperPipelinedef loadModel(self):# 1. Transcribe with original whisper (batched)self.fast_model = whisperx.load_model("medium", device.type, compute_type=compute_type)print("模型加载完成")def asr(self, filePath: str, chunk_size: int):print(f'asr start filePath:{filePath}')start = time.time()audio = whisperx.load_audio(filePath)result = self.fast_model.transcribe(audio, batch_size=batch_size, chunk_size = chunk_size)print(result)end = time.time()print('识别使用的时间:', end - start, 's')torch.cuda.empty_cache()gc.collect()return resultdef video_process_whisperX(audio_path, chunk_size):app = WhisperXProcessor()app.loadModel()text = app.asr(audio_path, chunk_size)return text
结果验证
发送请求
curl -XPOST 'http://localhost:5000/xxxx-video/whisperX' -H 'Content-Type: application/json' -d '{"id":1,"path":"https://vc16-bd1-pl-agv.autohome.com.cn/video-26/0A33363922E51BDE/2025-02-10/FC68CC971BB8B9A46F15C4841F4F2CE2-200-wm.mp4?key=F77E8D3251C4560FA47E36563A5D5668&time=1739187850","callback_url":"http://localhost:8088/xxx/demo/testParseVideo"}'
结果日志,2分钟的视频,大概用了60s。
文章参考
ASR强力模型「Whisper」:解密Whisper
Python实现语音识别(whisperX)