文章目录
- 一、FunASR
- 二、上代码(队列解决线程并发问题)
- 三、测试
一、FunASR
在我的另一个博客有介绍FunASR,并且进行了语者分离。不过最近FunASR自带了语者分离,挺好挺好。但是一直看社区,大家都用python写,会出现线程不安全问题;群里有大佬说使用多台服务器,感觉很浪费。
二、上代码(队列解决线程并发问题)
import copy
import json
import logging
import os
import queue
import threading
import time
import uuid

import numpy as np
import torch
from flask import Flask
from flask import request, jsonify
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks

app = Flask(__name__)

# Pool of idle ASR pipeline instances. Each request borrows one instance
# from the queue and returns it when done, which serializes access to any
# single instance and sidesteps the thread-safety issues of sharing one
# pipeline across Flask worker threads.
pipeline_queue = queue.Queue()

# Total number of pipeline instances ever created (idle + currently in use).
created_instances = 0
# Guards created_instances: create_pipelines runs in a background thread,
# so the counter must not be updated unsynchronized.
_created_instances_lock = threading.Lock()

# Hard cap on instances; each one occupies roughly 2 GB of GPU memory.
MAX_INSTANCES = 10

# logging.basicConfig(filename='app.log', level=logging.INFO)
# logger = logging.getLogger('info')
# ch = logging.StreamHandler()
# ch.setLevel(logging.INFO)
# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# ch.setFormatter(formatter)
# logger.addHandler(ch)


def flask_content_type(req) -> dict:
    """Parse the request payload according to its content type.

    Supports form-encoded / multipart POST and PUT bodies, raw JSON
    bodies, and GET query strings.

    Raises:
        Exception: if the body cannot be parsed or the HTTP method is
            unsupported.
    """
    if req.method == 'POST' or req.method == "PUT":
        if 'application/x-www-form-urlencoded' == req.content_type or 'form-data' in req.content_type:
            data = req.form
        elif req.data:
            # Fallback: treat an otherwise-unrecognized body as JSON.
            data = json.loads(req.data)
        else:
            raise Exception('无法解析')
    elif req.method == 'GET':
        data = req.args
    else:
        raise Exception('不支持的请求方式')
    # Shallow-copy so callers can mutate the result without touching the
    # (immutable) werkzeug MultiDict.
    return copy.copy(data)


def create_pipelines(num_pipelines):
    """Create `num_pipelines` ASR pipeline instances and enqueue them.

    Args:
        num_pipelines: how many instances to create. Size this to your
            GPU — each instance occupies about 2 GB of memory.
    """
    global created_instances
    for _ in range(num_pipelines):
        inference_pipeline = pipeline(
            task=Tasks.auto_speech_recognition,
            model='/root/autodl-tmp/models_from_modelscope/damo/speech_paraformer-large-vad-punc-spk_asr_nat-zh-cn',
            model_revision='v0.0.2',
            vad_model='/root/autodl-tmp/models_from_modelscope/damo/speech_fsmn_vad_zh-cn-16k-common-pytorch',
            punc_model='/root/autodl-tmp/models_from_modelscope/damo/punc_ct-transformer_cn-en-common-vocab471067-large')
        pipeline_queue.put(inference_pipeline)
        print("=========成功创建实例=========")
        # This runs in a background thread, so update the shared counter
        # under a lock.
        with _created_instances_lock:
            created_instances += 1
        print(f"=====队列现有空闲实例数量为{pipeline_queue.qsize()}========")
        print(f"=====实例总数量数量为{created_instances}========")
        print("============================")


def create_pipelines_thread(num_pipelines):
    """Start create_pipelines in a background thread and return the thread."""
    thread = threading.Thread(target=create_pipelines, args=(num_pipelines,))
    thread.start()
    return thread


def default_dump(obj):
    """Convert numpy classes to JSON serializable objects."""
    if isinstance(obj, (np.integer, np.floating, np.bool_)):
        return obj.item()
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    else:
        return obj


@app.route('/queue_size', methods=['GET'])
def get_queue_size():
    """Return the number of currently idle pipeline instances."""
    response = {'queue_size': pipeline_queue.qsize()}
    return jsonify(response), 200


@app.route('/created_instances', methods=['GET'])
def get_created_instances():
    """Return the total number of pipeline instances ever created."""
    response = {'created_instances': created_instances}
    return jsonify(response), 200


@app.route('/add_pipeline/<int:num>', methods=['GET'])
def add_pipeline_queue(num):
    """Synchronously create `num` additional pipeline instances.

    Refuses when the request would push the total past MAX_INSTANCES
    (GPU memory budget).
    """
    # BUG FIX: check the requested total, not just the current count, so
    # one request cannot blow past the cap.
    if created_instances + num > MAX_INSTANCES:
        return jsonify({'error': f"现有实例数量为{created_instances},无法再添加"})
    print("=========开始创建实例=========")
    print(f"=====队列现有空闲实例数量为{pipeline_queue.qsize()}========")
    thread = create_pipelines_thread(num)
    # Block until creation finishes so the response reflects the new state.
    thread.join()
    print("=========实例创建结束=========")
    print(pipeline_queue.qsize())
    return jsonify({'success': f"队列现有空闲实例数量为{pipeline_queue.qsize()}!现有实例数量为{created_instances}"})


@app.route('/', methods=['POST'])
def result_test():
    """Echo endpoint: parse and return the request payload."""
    dates = flask_content_type(request).copy()
    print(dates)
    return jsonify({'success': dates})


@app.route('/transcribe', methods=['POST'])
def transcribe():
    """Transcribe an uploaded audio file or a remote `file_url`.

    Accepts either a multipart `audio_file` upload (.wav/.mp3, saved to
    a temp file and deleted afterwards) or a form/JSON body containing
    `file_url`.
    """
    print("======队列剩余======")
    print(pipeline_queue.qsize())
    if 'audio_file' in request.files:
        audio_file = request.files['audio_file']
        file_ext = os.path.splitext(audio_file.filename)[1]
        if file_ext.lower() not in ['.wav', '.mp3']:
            return jsonify({'error': str('Error: Audio file must be either .wav or .mp3')}), 500
        # BUG FIX: pre-bind so the finally-clause cleanup cannot raise
        # NameError when an exception fires before the path is assigned.
        temp_file_path = None
        try:
            temp_dir_path = 'temp'
            os.makedirs(temp_dir_path, exist_ok=True)
            # Unique name so concurrent uploads never collide.
            unique_filename = str(uuid.uuid4()) + file_ext
            temp_file_path = os.path.join(temp_dir_path, unique_filename)
            audio_file.save(temp_file_path)
            return start_asr(temp_file_path)
        except Exception as e:
            return jsonify({'error': str(e)}), 500
        finally:
            # Only remove the temp file if it was actually created.
            if temp_file_path is not None and os.path.exists(temp_file_path):
                os.remove(temp_file_path)
    else:
        dates = flask_content_type(request).copy()
        return start_asr(dates['file_url'])


def start_asr(temp_file_path):
    """Borrow a pipeline instance, transcribe, and always return it.

    Args:
        temp_file_path: local path or URL of the audio to transcribe.

    Returns:
        A JSON string with the ASR result plus a `transform_time` field,
        or a (jsonify(error), 500) tuple on failure.
    """
    start_time = time.time()
    # Blocks until an instance is free — this is the back-pressure that
    # makes concurrent requests safe.
    inference_pipeline = pipeline_queue.get()
    try:
        # BUG FIX: the inference call must be INSIDE the try block —
        # otherwise an exception here would leak the instance (it would
        # never be put back on the queue).
        asr_result = inference_pipeline(audio_in=temp_file_path,
                                        batch_size_token=5000,
                                        batch_size_token_threshold_s=40,
                                        max_single_segment_time=6000)
        asr_result["transform_time"] = time.time() - start_time
        # Serialize manually so numpy scalars/arrays survive (default_dump).
        result = json.dumps(asr_result, ensure_ascii=False, default=default_dump)
        return result
    except Exception as e:
        print(str(e))
        return jsonify({'error': str(e)}), 500
    finally:
        # Always return the borrowed instance, success or failure.
        pipeline_queue.put(inference_pipeline)


def start_flask_app(port):
    """Pre-create pipeline instances asynchronously, then run Flask.

    app.run() blocks serving requests, so instance creation is kicked
    off in a background thread beforehand. With 32 GB of GPU memory,
    5 instances are created by default (more can be added via
    /add_pipeline/<num>).
    """
    print("============================")
    print("======开始异步创建实例=========")
    print("============================")
    try:
        # Best-effort: free cached GPU memory before loading models.
        torch.cuda.empty_cache()
    except Exception as e:
        print(e)
    create_pipelines_thread(5)
    app.run(port=port)


if __name__ == '__main__':
    start_flask_app(9501)
三、测试
第二节已经给出完整的示例了,所以我就不测了,我都上生产了,你们自己用postman或者代码试一下吧!有问题再联系我18956043585(微信同号)
兄弟们,这是我的解决方案,欢迎交流