背景
接上一篇《LLM大模型统一封装接口解决方案》架构确定后,流式方案非常规请求,需要特殊处理。
本解决方案就是针对上一篇中所需要的流式(打字机效果进行编码)
什么是SSE
SSE(Server-Sent Events,服务器发送事件)是一种基于HTTP的服务器到客户端的单向通信技术,用于实现服务器向客户端推送数据的功能。SSE协议标准由HTML5规范定义,并且其定义被包含在HTML Living Standard中。
SSE允许服务器通过HTTP连接向客户端发送数据,而无需客户端发起请求。这使得SSE非常适合于实时通信或推送通知给客户端的应用程序,例如实时股票报价、即时通讯、实时监控等场景。
基本上,SSE由以下要素组成:
- 服务器:负责向客户端发送事件流的HTTP服务器。
- 客户端:通过浏览器中的EventSource API与服务器建立连接,接收服务器发送的事件。
- 事件流(Event Stream):服务器向客户端发送的数据流,格式为纯文本,使用一种特定的格式进行编码,例如MIME类型为"text/event-stream"。
SSE的优点包括简单易用、实现方便、跨浏览器支持良好等。然而,它也有一些限制,例如不能支持双向通信,与WebSocket相比,SSE的实时性稍逊一筹。
Java框架说明
pom 文件引入的核心依赖包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>aip.com</groupId><artifactId>aip-com</artifactId><version>0.0.1</version><name>aip-com</name><description>aip com project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>io.reactivex.rxjava2</groupId><artifactId>rxjava</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
Java后端核心代码
本方法是标准的SSE协议标准
private final ExecutorService executorService = Executors.newFixedThreadPool(5);/*** 会话请求** @return String*/@PostMapping(value = "/completions", consumes = MediaType.APPLICATION_JSON_VALUE)@Operation(summary = "会话请求")public SseEmitter completions(@RequestBody CompletionRequest completionRequest) {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);SseEmitter emitter = new SseEmitter();executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {// 向客户端发送事件emitter.send(SseEmitter.event().name("message").data(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build())));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;/*** 会话请求** @return String*/@GetMapping(value = "/stream")@Operation(summary = "会话请求")public SseEmitter stream() {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);SseEmitter emitter = new SseEmitter();executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {// 向客户端发送事件emitter.send(SseEmitter.event().name("message").data(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build())));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;
Flux 和 Flowable 对比
Flux 和 Flowable 都是响应式编程库中的数据流类型,用于处理异步和基于事件的流式数据。它们分别来自于不同的库,Flux 是 Reactor 库的一部分,而 Flowable 则是 RxJava 库的一部分。以下是它们之间的一些区别:
-
库的来源:
- Flux 来自于 Reactor 库,是 Reactor 的核心组件之一,React的核心模块用于基于反应式流规范处理数据流。
- Flowable 来自于 RxJava 库,是 RxJava 的核心类之一,RxJava 是 Java 平台的反应式扩展库,用于处理异步和基于事件的编程。
-
背压策略:
- Flux 默认采用背压策略为 BUFFER,可以通过 onBackpressureBuffer、onBackpressureDrop、onBackpressureLatest 等方法来指定不同的背压策略。
- Flowable 默认也是支持背压的,但是相比 Flux,Flowable 提供了更多的背压策略,如 BUFFER、DROP、LATEST、ERROR、MISSING。
-
反应式规范:
- Flux 遵循 Reactor 库的反应式流规范,使用 Mono 和 Flux 来表示异步流和单个结果。
- Flowable 遵循 RxJava 库的反应式流规范,使用 Observable 和 Flowable 来表示异步流和单个结果。
-
生态系统:
- Reactor 生态系统主要用于基于 Reactor 的应用程序。
- RxJava 生态系统则更广泛,它是 ReactiveX 的一部分,支持多种语言和平台,并有许多衍生项目。
总的来说,Flux 和 Flowable 在概念上很相似,都用于处理异步和基于事件的流式数据,但它们来自于不同的库,并且有一些细微的区别,如背压策略和生态系统支持。您可以根据项目需求选择适合的库和数据流类型。
Java后端Flowable方式
本方法是Flowable方式,非标准流式规则
/*** 会话请求** @return String*/@GetMapping(value = "/stream")@Operation(summary = "会话请求")public Flowable<String> stream() {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);Flowable<String> typingFlow = Flowable.create(emitter -> {executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {emitter.onNext(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build()));Thread.sleep(1000);}emitter.onComplete();} catch (Exception e) {}});}, BackpressureStrategy.BUFFER);return typingFlow;}
Java后端Flux方式
本方法是Flux方式,非标准流式规则
/*** 会话请求** @return String*/@GetMapping(value = "/stream")@Operation(summary = "会话请求")public Flux<String> stream() {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);Flux<String> typingFlow = Flux.create(emitter -> {executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {emitter.next(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build()));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {}});}, FluxSink.OverflowStrategy.BUFFER);return typingFlow;}
}
HTML 客户端接收示例程序
function EventSourceGetRequest() SSE 默认方法,只支持GET请求,适合演示用途以及后端包装好服务
function fetchPostRequest() fetch POST 请求实现SSE,支持所有请求(POST,GET等)以及传递参数
sse.html 内容
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SEE Example</title><script>// SSE 默认方法,只支持GET请求function EventSourceGetRequest() {if(typeof(EventSource)!=="undefined"){var eventSource = new EventSource('http://127.0.0.1:8090/v1/chat/stream');eventSource.onmessage = function(event){document.getElementById('result').insertAdjacentHTML('beforeend', `${event.data}<br/><br/>`);console.log(event)};}else{document.getElementById("result").innerHTML="抱歉,你的浏览器不支持 server-sent 事件...";}}// fetch POST 请求实现SSEfunction fetchPostRequest() {fetch('http://127.0.0.1:8090/v1/chat/completions', {method: 'POST',headers: {'Content-Type': 'application/json'},body: JSON.stringify({}),}).then(response => {// 检查响应是否成功if (!response.ok) {throw new Error('Network response was not ok');}// 返回 ReadableStream 对象return response.body;}).then(stream => {// 创建一个新的文本解码器const decoder = new TextDecoder();// 获取一个 reader 对象const reader = stream.getReader();let chunk = ''// 逐块读取数据function read() {reader.read().then(({ done, value }) => {if (done) {document.getElementById('result').insertAdjacentHTML('beforeend', `${chunk}<hr/>`);console.log('Stream has ended');return;}// 将数据块转换为字符串并显示const tmp = decoder.decode(value, { stream: true });if (tmp.startsWith('event:') && chunk!='') {document.getElementById('result').insertAdjacentHTML('beforeend', `${chunk}<hr/>`);chunk = tmp}else{chunk = chunk + tmp}// 继续读取下一块数据read();});}// 开始读取数据read();}).catch(error => {// 处理错误console.error('There was a problem with the fetch operation:', error);});}// EventSourceGetRequest();fetchPostRequest();</script>
</head>
<body><h1>SEE result</h1><div id="result"></div>
</body>
</html>
- 标准SSE示例
- 扩展SSE