默认情况下,Fask以多线程模式运行,每个请求都落在一个新线程上。
SSE:基于HTTP的协议,用于实现服务器向客户端推送实时数据。使用长轮询机制,客户端通过HTTP连接向服务器发送请求,并保持该连接打开,服务器可以随时向客户端推送新的数据。
SSE协议使用简单的文本格式,数据通过纯文本的消息流进行传输,每个消息以"data:"开头,以两个换行符"\n\n"结尾,如果传递的数据中有字典要使用变量传递。
对比webSocket
连接:
SSE 可使用普通的HTTP连接通信,不需要特殊协议或握手过程。
WebSocket使用自定义协议,需要通过握手过程建立连接。
数据传输:
SSE只支持服务器向客户端的单向数据传输,即服务器可以主动向客户端推送数据。
WebSocket支持全双工通信,服务器和客户端可以同时发送和接收数据。
兼容性:
SSE在大多数现代浏览器中得到支持,某些旧版本的浏览器中可能不被完全支持。
WebSocket在大多数现代浏览器中得到广泛支持,包括旧版本的浏览器。
在使用flask框架做大模型问答,实现SSE模型的输出,可尝试flask-sse包。
flask-sse
基于 Python 的 Flask 微框架扩展,GitCode - 全球开发者的开源社区,开源代码托管平台
- flask_sse.py 包含了扩展的核心逻辑,用于处理SSE。
- tests 目录提供了单元测试案例,确保功能正确性。
- examples 包含可直接运行的例子,帮助理解如何集成到Flask应用中。
使用该包是需要选配置redis,用于存储待发送的消息,
通过自定义CallBackHandler实现流式输出
自定义异步输出Handler:
class AsyncChainStreamHandler(StreamingStdOutCallbackHandler):def __init__(self):self.tokens = []self.finish = False # 结束后要改为Trueasync def on_llm_new_token(self, token: str, **kwargs):print(token, end="| ")self.tokens.append(token)def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:self.finish = 1def on_llm_error(self, error: Exception, **kwargs: Any) -> None:print(str(error))self.tokens.append(str(error))async def generate_tokens(self):while not self.finish or self.tokens:if self.tokens:data = self.tokens.pop(0)print(f"data: {data}, tokens length: {len(self.tokens)}")yield dataelse:pass
class ChainStreamHandler(StreamingStdOutCallbackHandler):def __init__(self):self.tokens = []self.finish = False # 结束后要改为Truedef on_llm_new_token(self, token: str, **kwargs):print(token, end="| ")self.tokens.append(token)def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:self.finish = 1def on_llm_error(self, error: Exception, **kwargs: Any) -> None:print(str(error))self.tokens.append(str(error))def generate_tokens(self):while not self.finish or self.tokens:if self.tokens:data = self.tokens.pop(0)print(f"data: {data}, tokens length: {len(self.tokens)}")yield dataelse:pass
使用原有的Handler:
from langchain.callbacks import StreamingStdOutCallbackHandler, AsyncIteratorCallbackHandler
使用langchain声明大模型时调用
handler = StreamingStdOutCallbackHandler() # on_llm_new_token 默认控制台输出 # handler = AsyncChainStreamHandler() model = ChatOpenAI(streaming=stream,verbose=True,callbacks=[handler],openai_api_key="EMPTY",openai_api_base="http://ip:8090/v1", # 不同于requests请求的地址model_name="Qwen1.5-32B-Chat",temperature=0.7,max_tokens=4096 )
返回:return Response(handler.generate_tokens(), mimetype='text/event-stream')
参考:
https://flask-sse.readthedocs.io/en/latest/api.html#flask_sse.ServerSentEventsBlueprint
如何支持Flask-SSE访问控制-腾讯云开发者社区-腾讯云
大模型平台都在用的SSE协议是怎么样的?-51CTO.COM
Flask-SSE 教程:实时事件流的简易实现-CSDN博客
Flask框架进阶-Flask流式输出和受访配置--纯净详解版-CSDN博客
https://zhuanlan.zhihu.com/p/645053138