Python基于Quart框架实现SSE数据传输
- 前言
- SSE简介
- 理论分析
- 代码实现
前言
在类似Chatgpt的应用中要实现数据的流式传输,模仿实现打字机效果,SSE是不二之选。传统的Flask框架不能满足异步处理的要求,没有异步处理就很难实现实时交互的需求,因此全新的Quart框架出现,但是Quart框架并没有原生好用的SSE类,官网只给出了如下的封装:
SSE简介
Server-Sent Events (SSE) 是一种基于 HTTP 的协议,服务器可以使用它来向客户端推送实时更新。在人工智能(AI)领域,SSE 的重要性主要体现在以下几个方面:
-
实时交互:在某些 AI 应用中,如聊天机器人、实时推荐系统等,需要服务器能够实时地向客户端推送新的信息或更新。SSE 提供了一种有效的方式来实现这种实时交互。
-
异步处理:AI 模型的计算过程可能会耗费一定的时间,特别是在处理大量数据或复杂模型时。通过 SSE,服务器可以在计算完成后立即将结果推送给客户端,而无需让客户端等待整个计算过程。
-
减少网络负载:相比于传统的轮询方式,SSE 可以大大减少网络请求的数量,从而降低服务器的负载。这对于处理大量实时数据的 AI 应用来说尤其重要。
-
易于实现和使用:SSE 是基于 HTTP 的,因此在大多数编程语言和框架中都很容易实现。此外,由于它是一种标准协议,因此客户端(如浏览器)通常也提供了对 SSE 的原生支持。
理论分析
Server-Sent Events (SSE) 是一种服务器向客户端推送数据的技术,它基于 HTTP 协议,允许服务器单向地向客户端发送事件。以下是 SSE 的数据传输格式和通信过程原理:
数据传输格式
SSE 使用纯文本格式发送数据,每个事件由一系列以换行符分隔的字段组成。每个字段都以字段名开始,后跟一个冒号和字段值。例如:
data: This is the first message.data: This is the second message.
这里,data
是一个字段,表示事件的数据。你也可以使用其他字段,如 event
(指定事件类型)和 id
(指定事件 ID)。
通信过程原理
-
建立连接:客户端通过发送一个 GET 请求到服务器上的某个 URL 来建立连接。这个请求的
Accept
头部字段应该设置为text/event-stream
,以告诉服务器客户端希望使用 SSE。 -
发送事件:一旦连接建立,服务器就可以开始发送事件。每个事件都是一个独立的消息,由一系列字段组成。服务器可以在任何时候发送事件,不需要客户端的请求。
-
接收事件:客户端通过监听
message
事件来接收服务器发送的事件。每当服务器发送一个事件,message
事件就会被触发,事件的数据可以通过事件对象的data
属性获取。 -
断开和重新连接:如果连接被断开,客户端会自动尝试重新连接。你可以通过设置
retry
字段来控制重新连接的时间间隔。如果服务器发送了一个带有id
字段的事件,那么在重新连接时,客户端会发送一个Last-Event-ID
头部,值为最后一个接收到的事件的 ID,这样服务器就可以知道客户端接收到哪些事件。
因此需要定义如下四种内容:
- data:数据内容
- event:事件类型,一般是message
- id:事件编号
- retry:断开重传的时间
代码实现
from dataclasses import dataclass
from quart import make_response, json, Response@dataclass
class ServerSentEvent:"""Server Sent Event服务器服务端作用:将文本数据变成数据流传向客户端数据格式:data: string 传输数据内容, 公有变量event: string 传输事件类型, 私有变量id: string 事件id, 私有变量retry: int 断开重连时间, 私有变量"""_data: str_event: str = None_id: int = 0_retry: int = 0def encode(self):"""将数据转换成SSE的传输格式"""message = f"data: {self._data}"if self._event is not None:message = f"{message}\nevent: {self._event}"if self._id is not None:message = f"{message}\nid: {self._id}"if self._retry is not None:message = f"{message}\nretry: {self._retry}"message = f"{message}\n\n"return messageasync def response_sse(chat_generator):"""发送请求的响应chat_generator: generatorreturn: response"""async def send_events():"""将数据编码成SSE传输的格式进行传输"""# 遍历chat_generator获取其中的字符串内容for data in chat_generator:print("data in generator:"+data)event = ServerSentEvent(data)encoded_event = event.encode()yield encoded_event# 返回响应数据response = await make_response(send_events(),{'Content-Type': 'text/event-stream','Cache-Control': 'no-cache','Transfer-Encoding': 'chunked','Connection': 'keep-alive',},)response.timeout = Nonereturn response