目录
一、代码示例
二、执行说明
(一) 调用任务执行接口
(二) 监控任务进度
实现功能:
- 注册后台任务(如:邮件发送、文件处理等异步场景,不影响接口返回)
- 监控后台任务执行进度(进度条功能)
- 支持根据任务ID查询对应任务进度
一、代码示例
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import random
import asynciofrom typing import List, Dict
from fastapi import FastAPI, BackgroundTasks, WebSocketapp = FastAPI()# 用于存储连接的 WebSocket 实例
connected_websockets: Dict[int, List[WebSocket]] = {}@app.websocket("/ws/{task_id}/")
async def websocket_endpoint(websocket: WebSocket, task_id: int):"""WebSocket路由,用于接收任务进度"""await websocket.accept()connected_websockets.setdefault(task_id, []).append(websocket)try:while True:await websocket.receive_text()except:connected_websockets[task_id].remove(websocket)@app.post("/task/{task_id}/")
async def start_task(background_tasks: BackgroundTasks, task_id: int):"""注册后台任务"""background_tasks.add_task(process_task, task_id=task_id)return {"task_id": task_id}async def process_task(task_id):"""处理任务的后台任务"""progress = 0while progress < 100:await asyncio.sleep(1)progress += random.randint(1, 10)progress = min(progress, 100)for ws in connected_websockets[task_id]:await ws.send_json({"task_id": task_id, "progress": progress})await asyncio.sleep(1)# 启动应用
if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)
二、执行说明
(一) 调用任务执行接口
- 启动服务后,访问:http://127.0.0.1:8000/docs
POST
请求:http://127.0.0.1:8000/task/1/,指定任务ID为1
(二) 监控任务进度
- 安装
websocket
请求工具:npm install -g wscat
- 终端输入
wscat -c ws://127.0.0.1:8000/ws/1/
,监控任务ID为1
的执行进度