文章目录
- 前言
- 服务框架结构
- app.py
- run_app_server.sh
- routers/upload.py
- routers/query_result.py
- utils/utils.py
- utils/tasks.py
- 总结
前言
本文讲述了如何使用fastapi搭建一个属于自己的服务,整个服务使用fastapi框架搭建,celery管理任务队列,slowapi限制请求的频率,主要技术点就是这么几个,其实实践起来也是很简单的,那么我们就开始吧!
服务框架结构
首先给出整个项目的目录结构如下,并对各个目录、文件进行讲解:
app/
├── app.py
├── run_app_server.sh
├── routers/
│ ├── main_router.py
│ └── result_router.py
└── utils/
├── tasks.py
└── utils.py
根目录: app/ 是根目录,包含核心文件和子目录
app.py:主程序入口文件。
run_app_server.sh:运行服务的脚本。
子目录:
routers/:放置路由相关的模块。main_router.py:主路由逻辑。result_router.py:查询结果相关路由。
utils/:放置工具类模块。tasks.py:主要任务处理模块。utils.py:其他通用函数模块。
这种组织结构便于扩展,文件划分清晰,适合中小型 Python 项目。
app.py
首先是app.py,它是基于 FastAPI 构建的 Web 服务的主程序入口。以下是它的代码:
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from routers import upload, query_result
from slowapi.errors import RateLimitExceededapp = FastAPI()
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)# 注册路由
app.include_router(upload.router, tags=["Main Feature"])
app.include_router(query_result.router, tags=["Result Query"])# 根路由,测试服务是否正常运行
@app.get("/")
@limiter.limit("5/second") # 设置访问限制
async def root(request: Request):return {"message": "Welcome to My Service"}
初始化 FastAPI 应用:
创建一个 FastAPI 应用实例:app = FastAPI(),这是整个服务的核心对象,用于管理路由、请求、响应以及中间件等。
集成访问限制(Rate Limiting)
使用 slowapi 库和 Limiter 实现对客户端请求的访问频率限制,防止滥用或过载。
Limiter 的配置:key_func=get_remote_address:通过客户端 IP 地址(或远程地址)来判断访问来源。app.state.limiter = limiter:将访问限制功能挂载到应用状态中。
异常处理:注册 RateLimitExceeded 异常处理器,确保当请求超出限制时返回错误信息。
注册路由:
通过 app.include_router() 注册不同功能模块的路由。upload.router:处理与上传和转录相关的逻辑。query_result.router:处理查询结果的相关逻辑。
使用 tags 为路由分组(便于生成文档时分类)。
根路由:
定义根路径 / 的处理逻辑,用于测试服务是否正常运行。
使用访问限制 @limiter.limit("5/second"),确保每秒最多允许 5 次访问。
返回的 JSON 响应:{"message": "Welcome to the Non-real-time Speech Transcription Service"}
run_app_server.sh
这个脚本用于启动基于 FastAPI 的 Web 服务应用,以下是脚本的功能解析和作用:
#!/bin/bash
# run_app_server.sh# 启动 FastAPI 应用
gunicorn -w 4 -k uvicorn.workers.UvicornWorker app:app --bind 0.0.0.0:6022 --timeout 3000 --reload
Gunicorn 配置:
gunicorn:Python 的一个高性能 WSGI HTTP 服务器,用于运行 Web 应用,能够很好地管理并发请求。
-w 4:指定使用 4 个工作进程,适合利用多核 CPU 的能力来提升性能,主要作用是提升服务的并发量。
-k uvicorn.workers.UvicornWorker:设置 Gunicorn 使用 Uvicorn 的 Worker,专门支持 ASGI 框架(如 FastAPI)。
app:app:第一个 app:表示 Python 文件名(app.py)。第二个 app:表示 FastAPI 实例对象名(在 app.py 中定义的 app 对象)。
--bind 0.0.0.0:6022:指定服务绑定的 IP 地址和端口:0.0.0.0:监听所有网卡接口上的请求。6022:监听的端口号。
--timeout 3000:设置超时时间为 3000 秒,适合处理可能耗时较长的任务(例如文件上传或复杂计算)。
--reload:开启代码热重载,适合开发环境,代码变更后服务会自动重启,正式环境一般不设置这个参数。
如何运行:
# 赋予脚本执行权限(如果未设置权限):
chmod +x run_app_server.sh#运行脚本:
./run_app_server.sh
优势:
使用 Gunicorn + Uvicorn Worker 提供高效的 ASGI 服务。
多工作进程(-w)提升服务并发能力。
可扩展性强,支持多种部署场景(本地或生产环境)。
routers/upload.py
这个脚本是一个基于 FastAPI 的路由模块,这里以音频转写任务为例子,提供一个音频文件上传和转写任务的 API(/upload)。它包含了后台任务管理、参数校验、Redis 数据存储、访问限制等功能。
from fastapi import APIRouter, BackgroundTasks, HTTPException, File, UploadFile, Form, Depends, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
import uuidfrom utils.utils import save_info_to_redis, save_audio, estimate_time, validate_params
from utils.tasks import process_transcription_taskrouter = APIRouter()# 初始化 Limiter
limiter = Limiter(key_func=get_remote_address)@router.post("/upload")
@limiter.limit("20/second") # 设置访问频率限制
async def upload_transcription(request: Request,params: dict = Depends(validate_params),
):# 生成唯一的order_idorder_id = str(uuid.uuid1())params['order_id'] = order_id# 保存音频audio_path = save_audio(order_id, params['audio_bytes'])params['audio_path'] = audio_path# 预估转写时间task_estimate_time = estimate_time(params.get('roleType'), params.get('duration'))params['taskEstimateTime'] = task_estimate_time# 预设转写结果和状态params['transcribe_result'] = Noneparams['status'] = 0 # 0表示处理中,1表示已完成,-1表示失败params['code'] = '100024'params['descInfo'] = 'Transcription in progress, please check results later.'# 将本次订单的相关信息存入redis数据库save_info_to_redis(params)# 添加后台转写任务process_transcription_task.delay(**params)return {"code": "000000","descInfo": "Success","content": {"orderId": params.get('order_id'), "taskEstimateTime": round(task_estimate_time, 2)}}
模块初始化
router = APIRouter()创建了一个 APIRouter 实例,用于定义该模块的路由。后续的路由(如 /upload)都会注册到该路由器上,可以集成到主应用中。limiter = Limiter(key_func=get_remote_address)初始化 访问限制器,通过客户端 IP 地址(get_remote_address)限制每秒请求频率,保护服务免受高频访问的影响。
/upload 路由
@router.post("/upload")
@limiter.limit("20/second")定义一个 POST 请求的路由 /upload。设置访问限制:每秒最多允许 20 次请求。
API 功能逻辑
参数校验与解析params: dict = Depends(validate_params)使用 FastAPI 的依赖注入机制,通过 validate_params 函数对请求参数进行校验。params 是解析后的字典,包含上传音频任务所需的关键信息(如角色类型、音频时长、音频字节流等)。
生成订单 ID
order_id = str(uuid.uuid1())
params['order_id'] = order_id为当前任务生成一个唯一的订单 ID,用于任务跟踪。
保存音频文件
audio_path = save_audio(order_id, params['audio_bytes'])
params['audio_path'] = audio_path调用 save_audio 函数保存音频文件,并将文件路径存入 params。
预估转写时间
task_estimate_time = estimate_time(params.get('roleType'), params.get('duration'))
params['taskEstimateTime'] = task_estimate_time根据角色类型(roleType)和音频时长(duration)估算转写任务的执行时间。
初始化任务状态
params['transcribe_result'] = None
params['status'] = 0
params['code'] = '100024'
params['descInfo'] = 'Transcription in progress, please check results later.'设置任务的初始状态为处理中(status = 0),转写结果为空。
存储任务信息到 Redis
save_info_to_redis(params)将任务的所有信息存入 Redis,以便后续查询和更新。
后台异步任务
process_transcription_task.delay(**params)调用 Celery 的 delay() 方法,将任务提交到后台队列中,异步处理转写逻辑。
返回响应
return {"code": "000000","descInfo": "Success","content": {"orderId": params.get('order_id'), "taskEstimateTime": round(task_estimate_time, 2)}
}返回任务提交成功的响应,包含订单 ID 和预估的任务时间。
功能总结
路由功能:提供音频文件上传接口。
访问限制:限制请求频率(每秒 20 次),保障服务稳定性。
任务管理:校验参数,生成任务唯一标识。保存音频文件并估算处理时间。初始化任务状态并存储到 Redis。提交转写任务到后台(使用 Celery 或类似的任务队列)。
响应信息:返回成功的订单 ID 和任务估算时间。
使用场景
该路由可用于 非实时语音转写服务,适合以下场景:批量上传音频文件:需要排队处理任务。异步任务管理:用户提交后可稍后查询转写结果。高并发环境:通过限流机制和后台任务队列,防止服务过载。
routers/query_result.py
这个脚本是一个基于 FastAPI 的路由模块,提供了查询转写任务结果的 API(/getResult)。该模块通过 Redis 存储任务数据,并根据任务状态返回相应信息。
from fastapi import APIRouter, HTTPException, Form, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
import redisfrom utils.utils import raise_http_exceptionrouter = APIRouter()# Redis客户端模拟
redis_client = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True)# 初始化 Limiter
limiter = Limiter(key_func=get_remote_address)@router.post("/getResult")
@limiter.limit("50/second") # 设置访问限制
async def get_result(request: Request, orderId: str = Form(...)):# 检验工单是否存在if not redis_client.exists(orderId):raise_http_exception(404, "Order ID unknown", '100020')# 模拟获取转录结果task_data = redis_client.hgetall(orderId)orderResult = {"lattice": task_data.get('transcribe_result'),}# 如果status为0,说明工单还在处理中,返回timeRequired字段orderInfo = {"status": task_data.get('status'),"orderId": task_data.get('order_id'),"originalDuration": task_data.get('duration'),"timeRequired": 1000,}if task_data.get('status') != 0:del orderInfo['timeRequired']return {"code": task_data.get('code'),"descInfo": task_data.get('descInfo'),"content": {"orderResult": orderResult, "orderInfo": orderInfo}}
模块初始化
router = APIRouter()创建 APIRouter 实例,用于定义该模块的路由。redis_client = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True)初始化 Redis 客户端,连接到 Redis 服务的 db=1 数据库。decode_responses=True:保证从 Redis 获取的数据是字符串格式。limiter = Limiter(key_func=get_remote_address)初始化访问限制器,通过 get_remote_address 以客户端 IP 作为请求限制的依据。
/getResult 路由
@router.post("/getResult")
@limiter.limit("50/second")定义一个 POST 请求的路由 /getResult。设置访问限制:每秒最多允许 50 次请求。
API 功能逻辑
参数接收与校验orderId: str = Form(...)从请求的表单数据中接收 orderId 参数。Form(...) 表示该参数为必填项。检查订单是否存在if not redis_client.exists(orderId):raise_http_exception(404, "Order ID unknown", '100020')使用 Redis 的 exists() 方法检查 orderId 是否存在。如果订单 ID 不存在,调用 raise_http_exception 抛出 HTTP 404 异常,并附带错误代码 100020。获取订单数据task_data = redis_client.hgetall(orderId)从 Redis 中获取以哈希表存储的任务数据。返回的数据是一个字典,包含转录结果、状态、订单 ID 等信息。组织返回数据转录结果:orderResult = {"lattice": task_data.get('transcribe_result'),}返回转录结果(可能为空,表示任务未完成)。订单信息:orderInfo = {"status": task_data.get('status'),"orderId": task_data.get('order_id'),"originalDuration": task_data.get('duration'),"timeRequired": 1000,}if task_data.get('status') != 0:del orderInfo['timeRequired']返回任务状态、订单 ID 和音频原始时长。如果任务正在处理中(status=0),返回预估所需时间字段 timeRequired。如果任务已完成或失败,则移除 timeRequired 字段。构建响应return {"code": task_data.get('code'),"descInfo": task_data.get('descInfo'),"content": {"orderResult": orderResult, "orderInfo": orderInfo}}返回包含任务状态码(code)、描述信息(descInfo)、转录结果和订单信息的完整响应。
使用场景
该路由用于查询音频转写任务的执行状态和结果,适合以下场景:任务进度查询:用户提交任务后,通过订单 ID 查询任务是否完成。如果任务还在处理中,返回 timeRequired,提示用户稍后再试。结果获取:如果任务已完成,返回转录结果 lattice。高并发支持:使用 Redis 提供快速查询能力,支持高并发访问。设置访问限制(每秒 50 次),保护服务免受过载。
utils/utils.py
这个脚本提供了一些通用的工具函数和依赖,用于处理非实时语音转写服务的参数验证、Redis 数据操作以及音频存储。它是 FastAPI 应用中的辅助模块,主要用于支持上传接口和其他功能。
from fastapi import APIRouter, BackgroundTasks, HTTPException, File, UploadFile, Form, Depends, Request
from typing import Optional
import redis
import os
import reredis_order_data = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True)# 通用函数,简化异常处理逻辑,提高代码可读性和一致性。
def raise_http_exception(status_code: int, desc_info: str, code: str):"""封装 HTTPException 的通用函数"""raise HTTPException(status_code=status_code,detail={"descInfo": desc_info,"code": code,},)# 自定义依赖项函数,用于验证参数,必传参数Pydantic(FastAPI用于数据验证的库)在后台自动验证
async def validate_params(audioFile: UploadFile = File(...),fileName: str = Form(...),fileSize: int = Form(...),duration: int = Form(...),callbackUrl: Optional[str] = Form(None),hotWord: Optional[str] = Form(None),hotWordId: Optional[int] = Form(None),roleType: Optional[int] = Form(0),audioMode: Optional[str] = Form('fileStream'),audioUrl: Optional[str] = Form(None),standardWav: Optional[int] = Form(0),
):# 验证文件名是否符合要求,系统会自动检验必传参数是否传递,不需要手动验证file_root, file_extension = os.path.splitext(fileName)if file_extension not in ['.pcm', '.mp3', '.wav']:raise_http_exception(400, "File name error, only support pcm, wav, mp3", '100001')# 验证文件大小是否符合要求file_size = audioFile.file.seek(0, 2)audioFile.file.seek(0) # 重置文件指针if fileSize != file_size:raise_http_exception(400, "File size error", '100002')# 其他的验证逻辑# 估算转写需要的时间,单位毫秒
def estimate_time(roleType: int, duration: int):pass# 保存音频
def save_audio(order_id, audio_bytes):pass# 保存信息到redis
def save_info_to_redis(params):pass
utils/tasks.py
这个脚本定义了一个基于 Celery 的异步任务,用于处理语音转写请求,并通过 Redis 更新工单状态和结果。代码中集成了 HTTP 请求、Redis 数据库操作以及错误处理逻辑。
from celery import Celery
import httpx
import redis
import time
import jsonredis_order_data = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True)
redis_hotWord_data = redis.StrictRedis(host='localhost', port=6379, db=2, decode_responses=True)app = Celery('transcription_tasks', broker='redis://localhost:6379/3') # 没有设置backend,因为不需要返回结果,只需要异步执行任务@app.task
def process_transcription_task(**params):try:# 获取参数,urlurl = "http://ip:port/transcribe"if params.get('hotWordId') is not None:hotword = redis_hotWord_data.get(params.get('hotWordId'))else:hotword = params.get('hotWord')files = {'file': params.get('audio_bytes')}data = {'roleType': params.get('roleType'),'file_extension': params.get('file_extension'),'standardWav': params.get('standardWav'),'hotWord': hotword,}if hotword is None:del data['hotWord']start = time.time()print("++++++++开始转写")with httpx.Client(timeout=httpx.Timeout(float(params['taskEstimateTime'] / 1000) * 2)) as client: # timeout设置为任务预估时间的两倍,防止超时 AsyncClientresponse = client.post(url, data=data, files=files)print(f"++++++++转写耗时:{time.time() - start}")print("++++++++转写成功")# 更新转写工单信息redis_order_data.hset(params.get('order_id'),mapping={"status": 1, "transcribe_result": json.dumps(response.json()), "code": "000000","descInfo": "Success"})except Exception as e:# 处理错误情况print('++++++++发生报错')redis_order_data.hset(params.get('order_id'),mapping={"status": -1, "transcribe_result": json.dumps(None), "code": "100018","descInfo": f"An unexpected error occurred: {e}"})
Celery 初始化及启动
# 初始化
app = Celery('transcription_tasks', broker='redis://localhost:6379/3')任务队列名称:transcription_tasks。消息代理(Broker):使用 Redis(第 3 个数据库)作为任务调度系统的消息队列。未设置 Backend:不需要任务执行的返回结果,只是异步执行操作。# 启动
celery -A utils.tasks worker -l info --concurrency=16concurrency=16表示最大的并发量是16,如果设置为1,当前的服务就是串行执行,只有当前一个任务完成时才能执行下一个任务。utils.tasks是tasks的相对路径,之所以要加上utils是因为要使用相对路径服务才能找到对应的tasks。
异步任务定义
@app.task
def process_transcription_task(**params):任务名称:process_transcription_task。任务目的:处理非实时语音转写,包括文件上传、转写请求和结果存储。任务参数:params 是一个动态的字典,包含上传接口中收集的所有参数信息。
任务流程
1. 热词加载
if params.get('hotWordId') is not None:hotword = redis_hotWord_data.get(params.get('hotWordId'))
else:hotword = params.get('hotWord')优先从 Redis 中根据 hotWordId 加载热词数据。如果 hotWordId 为空,使用用户上传的 hotWord。2. HTTP 请求准备files = {'file': params.get('audio_bytes')}data = {'roleType': params.get('roleType'),'file_extension': params.get('file_extension'),'standardWav': params.get('standardWav'),'hotWord': hotword,}if hotword is None:del data['hotWord']音频文件:通过 audio_bytes 传递。请求体数据:包含 roleType(角色类型)、文件扩展名(file_extension)、是否标准化(standardWav)等信息。如果 hotWord 为空,则从数据中移除对应字段。3. 发送转写请求with httpx.Client(timeout=httpx.Timeout(float(params['taskEstimateTime'] / 1000) * 2)) as client:response = client.post(url, data=data, files=files)HTTP 客户端:使用 httpx.Client 发送 POST 请求。超时时间:设置为预估任务时间的两倍,防止任务因延迟而超时。请求 URL:假设是一个转写服务的接口(http://ip:port/transcribe)。4. 处理成功响应redis_order_data.hset(params.get('order_id'),mapping={"status": 1, "transcribe_result": json.dumps(response.json()), "code": "000000","descInfo": "Success"})更新 Redis 中的工单信息:status:设置为 1,表示任务完成。transcribe_result:存储转写结果(JSON 格式)。code 和 descInfo:标记为成功。5. 错误处理except Exception as e:redis_order_data.hset(params.get('order_id'),mapping={"status": -1, "transcribe_result": json.dumps(None), "code": "100018","descInfo": f"An unexpected error occurred: {e}"})捕获所有异常,更新 Redis:status:设置为 -1,表示任务失败。transcribe_result:结果为空。code 和 descInfo:标记为失败,并记录错误信息。
关键特性
1. 异步任务处理通过 Celery 实现异步任务分发和执行,解耦了任务处理逻辑和主程序。提高系统的并发能力,特别适合需要耗时的任务(如音频转写)。
2. 状态管理使用 Redis 存储工单的状态信息,支持快速读取和更新。工单状态分为:0:处理中。1:处理完成。-1:处理失败。
3. 动态超时时间根据任务预估时间设置超时时间,提高了接口调用的稳定性和灵活性。
总结
该服务以高效、模块化和异步化的架构设计,满足了非实时语音转写的需求,并具有良好的可扩展性和稳定性。