fastapi操作异步和同步请求
- 声明:异步“请求” 和 异步“方法调用” 的区别
- 【关键点】
- 1、同步、异步方法 + 同步阻塞
- 1.1 仅同步请求的并发
- 测试
- 1.2 仅异步请求的并发
- 测试
- 1.3 同步请求 和 异步请求 的并发
- 2、异步方法阻塞的解决方案
- 2.1 使用线程池执行同步阻塞
- 2.2 使用await 异步等待
- 2.2.1 异步方法内部使用同步阻塞
- 2.2.2 异步方法内部使用异步阻塞
声明:异步“请求” 和 异步“方法调用” 的区别
问题:
如果使用uvicorn单进程、单线程启动fastapi服务,其中代码里:
@app.get("/bb")
async def read_item():await long_running_task(4)result = await long_running_task(4)return {"result": result}
这时,/bb的单次请求时,执行await long_running_task(4)任务先等待4s,结束后再次执行long_running_task(4)又是4s等待,总共就是8s,也就是在方法体内的await是串行的。
那当多次请求/aa时:
@app.get("/aa")
async def read_item():result = await long_running_task()return {"result": result}
这时,fastapi 会把2个异步请求分别转化为task,而不是coroutine了,并把task提交到事件循环中,即每个请求都是1个独立的任务
!
事件循环会并发地处理这些任务,因此每个请求的 await long_running_task(4) 调用会并发执行。
因此,每个请求的响应时长是 4 秒,而不是顺序累加成 8 秒。
【关键点】
1、 方法体内多个await :在同一个方法体内,不管有多少个“await ” ,它们都共享同一个任务。
await是并发 or 串行取决于在await执行前,是否存在多个已启动的task。【具体详见: 异步编程下await的理解】
2、多个请求并发处理:当多个请求并发时,每个请求会被包装成一个独立的任务,并提交到事件循环中。事件循环会并发地处理这些任务,因此每个请求的await 调用会并发执行。
1、同步、异步方法 + 同步阻塞
from fastapi import FastAPI, Query, Request, Response
import uvicorn
import time
import sys
import os
import asynciosys.path.append(os.getcwd())
app = FastAPI(title="bbb")
#***************************************1、同步阻塞操作... ***************************************
### def不用async修饰:同步方法+同步阻塞 ###
@app.get("/aa")
def aa():print("aa")time.sleep(3) # 同步阻塞3sprint("aaa")return "aaaaaa"### 异步方法+同步阻塞 ###
@app.get("/bb")
async def bb(): ''''虽然bb用async关键字修饰成了一个异步函数(协程),但该方法内部使用了同步阻塞,所以导致它的异步能力失效,变成同步方法!!'''print("bb")time.sleep(8) # 是一个同步阻塞操作。它会阻塞当前线程,直到指定的秒数过去。print("bbb")return "bbbbbb"#***************************************2、非阻塞操作... ***************************************
### 异步方法 + 无阻塞 ###
@app.get("/hello")
async def hello(con: str = Query(..., description="输入字符串")):print("hello")con = con + "**** 好样的!"return {"message": con}### 同步方法 + 无阻塞 ###
@app.get("/hello2")
def hello2(con: str = Query(..., description="输入字符串")):con = con + "**** 好样的222222!"return {"message": con}if __name__ == "__main__":uvicorn.run(app,host="0.0.0.0",port=9351)
1.1 仅同步请求的并发
fastapi 在处理同步请求时,会把他们放入底层维护的工作线程池中,每个请求由AnyIO worker thread(工作线程)执行。
测试
/aa 和 /hello2 并发:
/aa耗时3.02s, /hello2耗时5ms。
结论:/aa 的
阻塞不会影响
/hello2 的及时返回。
1.2 仅异步请求的并发
fastapi 在处理异步请求时,会把他们放入Main Thread的协程event_loop中。如果一个异步方法里存在同步阻塞,那他会导致后请求的异步方法也被阻塞!
因为所有的异步都在1个event_loop中~~
测试
/bb 和 /hello 并发:
/hello要等/bb执行完才会结束,总体耗时8s。
结论:/bb 的
阻塞会影响
/hello 的及时返回。
1.3 同步请求 和 异步请求 的并发
(1)/aa稍早于/bb
通过调用堆栈可以看到:
/aa即使阻塞也不会阻塞/bb的请求,两者可同步执行方法体!
但是,不知道为什么有时候/aa要等到/bb响应结束也才return, 也就是/aa的总响应时长不总是3s附近,会出现return bbbbbb结束后才收到return aaaaaa,导致/aa的响应时长大于8s!
执行结果:
aa
bb
aaa
bbb
(2)/bb稍早于/aa
通过调用堆栈可以看到:
/bb阻塞会导致/aa不能立即执行,而是要等待/bb完全执行完才会进入/aa的方法体!
执行结果:
bb
bbb
aa
aaa
2、异步方法阻塞的解决方案
2.1 使用线程池执行同步阻塞
from fastapi import FastAPI, Query, Request, Response
import uvicorn
from concurrent.futures import ThreadPoolExecutor
import time
import sys
import os
import asynciosys.path.append(os.getcwd())
app = FastAPI(title="bbb")threadpool=ThreadPoolExecutor(max_workers=3)############## 【ok】放线程池中:不会阻塞主线程~ ##################
@app.get("/ver2")
async def ver2(request:Request):msg=request.query_params.get("msg")loop=asyncio.get_event_loop() # 拿到主线程的事件循环(事件循环可以看做“方法间”的异步)task={"msg":msg}def handle_task():print("task recieved:", task["msg"])result=task["msg"].lower()time.sleep(8)return result# 在主线程的事件循环里等待异步结果,因为事件循环是针对“所有方法间”的,所以是在主线程里result=await loop.run_in_executor(threadpool, handle_task) print("task ends:", result, asyncio.get_event_loop)return Response(result)if __name__ == "__main__":uvicorn.run(app,host="0.0.0.0",port=9351)
2.2 使用await 异步等待
2.2.1 异步方法内部使用同步阻塞
【方法间】:A、B、C、D四个请求在异步执行,遵循单线程下的协程异步;
【方法内】:存在同步阻塞,导致遇到阻塞时不会主动让出线程,必须等待当前方法整体执行完(包括先阻塞结束,再执行该方法体内阻塞后的代码)才会让出线程。
2.2.2 异步方法内部使用异步阻塞
【方法间】:不变
【方法内】:异步阻塞,当某个请求调用执行到异步调用代码(await)时,会主动让出线程,失去抢占线程的资格,由其他异步方法抢占。
(1)抢占到线程的协程(异步请求下的异步方法)继续这种操作,即遇到异步阻塞就让出线程;
(2)一旦某个方法异步阻塞结束,它就会恢复重新抢占线程的资格;
(3)当某个方法体全部执行完后,就独立返回,不受其他未完成的方法影响。
from fastapi import FastAPI
import uvicorn
# from concurrent.futures import ThreadPoolExecutor
import time
import sys
import os
import asynciosys.path.append(os.getcwd())
app = FastAPI(title="ttt")@app.get("/a")
async def A(a: int):# 任务1a = a+10print(f"A任务1:{str(a)}")# 任务2a = a+1print(f"A任务2:{str(a)}")# 任务3# time.sleep(8) # 同步阻塞8sawait asyncio.sleep(8)print(f"A任务3:8s睡好了")# 任务4a = a+1print(f"A任务4:{str(a)}")@app.get("/b")
async def B(a: int):# 任务1a = a+20print(f"B任务1:{str(a)}")# 任务2a = a+1print(f"B任务2:{str(a)}")# 任务3# time.sleep(4) # 同步阻塞4sawait asyncio.sleep(4)print(f"B任务3:4s睡好了")# 任务4a = a+1print(f"B任务4:{str(a)}")@app.get("/c")
async def C(a: int):# 任务1a = a+30print(f"C任务1:{str(a)}")# 任务2a = a+1print(f"C任务2:{str(a)}")# 任务3a = a+1print(f"C任务3:{str(a)}")# 任务4a = a+1print(f"C任务4:{str(a)}")@app.get("/d")
async def D(a: int):# 任务1a = a+40print(f"D任务1:{str(a)}")# 任务2a = a+1print(f"D任务2:{str(a)}")# 任务3a = a+1print(f"D任务3:{str(a)}")# 任务4a = a+1print(f"D任务4:{str(a)}")if __name__ == "__main__":uvicorn.run(app,host="0.0.0.0",port=9351)