背景
最近由于工作要求需要使用Springboot搭建一个流式响应服务,即客户端发送一次请求,服务端需要多次响应才能返回完整的数据。使用场景就是与chatGPT对话,你问一个问题,页面会逐字将结果打印出来。
下面我在SpringBoot中可以简单的实现一下这种场景需求,即SSE(Server-Sent Events)模式
前端请求实现方式
目前前端的请求实现方式有两种,一个是采用EventSource实现,这种实现方式不支持自定义的请求头,也就没有办法再请求头部中增加Token这样的用户身份验证信息。并且该方式只支持GET请求方式。所以这种实现方式只适用于,不需要验证用户身份并且请求参数内容少的情况下。
若要传输更多的参数信息或者在请求头中增加自定义内容建议使用AbortController实现
若传输过程中链接断开,EventSource可以实现自动重新链接,AbortController不能实现自动重新链接。
使用EventSource实现
// 建立连接let source = new EventSource('http://localhost:8080/sse/connect/' + userId);/*** 连接一旦建立,就会触发open事件* 另一种写法:source.onopen = function (event) {}*/source.addEventListener('open', function (e) {console.log("建立连接。。。");}, false);/*** 客户端收到服务器发来的数据* 另一种写法:source.onmessage = function (event) {}*/source.addEventListener('message', function (e) {console.log(e.data);});/*** 如果发生通信错误(比如连接中断),就会触发error事件* 或者:* 另一种写法:source.onerror = function (event) {}*/source.addEventListener('error', function (e) {if (e.readyState === EventSource.CLOSED) {console.log("连接关闭");} else {console.log(e);}}, false);
使用AbortController实现
<template><div><input v-model="name" placeholder="Enter your name"><button @click="sendPost">Send POST request</button><button @click="stopGenerating">Stop Generating</button><button @click="restartGenerating">Restart Generating</button><pre>{{ response }}</pre></div>
</template><script>
export default {data() {return {name: '',response: '',controller: new AbortController(),isStopped: false}},methods: {async sendPost() {this.controller = new AbortController()this.response = ''this.isStopped = falseconst response = await fetch('http://127.0.0.1:5000/stream', {method: 'POST',headers: { 'Content-Type': 'application/json' },body: JSON.stringify({ name: this.name }),signal: this.controller.signal})const reader = response.body.getReader()while (true) {if (this.isStopped) breakconst { done, value } = await reader.read()if (done) breakthis.response += new TextDecoder().decode(value)}
},stopGenerating() {this.controller.abort()this.isStopped = true},restartGenerating() {this.controller = new AbortController()this.sendPost()}}
}
</script>
后端响应实现方式
使用SseEmitter实现
@RequestMapping(value = "/talkeAbouttestSseEmitter")public SseEmitter talkeAbouttestSseEmitter(HttpServletResponse response, @RequestBody JSONObject object) throws IOException {SseEmitter emitter = new SseEmitter();logger.info("【prompt内容】:{}", object.getString("prompt"));String str = " 什么是爱而不得? \n" +"东边日出西边雨,道是无晴却有晴。\n" +"他朝若是同淋雪,此生也算共白头。\n" +"我本将心向明月,奈何明月照沟渠。\n" +"此时相望不相闻,愿逐月华流照君。\n" +"衣带渐宽终不悔,为伊消得人憔悴。\n" +"此情可待成追忆,只是当时已惘然。\n" +"人生若只如初见,何事西风悲画扇。\n" +"曾经沧海难为水,除却巫山不是云。\n" +"何当共剪西窗烛,却话巴山夜雨时。\n" +"天长地久有时尽,此恨绵绵无绝期。\n" +"\n";response.setHeader("Content-Type", "text/event-stream");response.setContentType("text/event-stream");response.setCharacterEncoding("UTF-8");response.setHeader("Pragma", "no-cache");new Thread(() -> {
// // 响应流try {for (int i = 0; i < str.length(); i++) {// 指定事件标识 event: 这个为固定格式emitter.send(String.valueOf(str.charAt(i)));Thread.sleep(100);}emitter.send("stop");emitter.complete(); // Complete the SSE connection} catch (IOException e) {e.printStackTrace();}}).start();return emitter;}
使用HttpServlet实现
@RequestMapping(value = "/talkeAbouttestEvent")public void talkeAbouttestEvent(HttpServletResponse response, @Param("prompt") String prompt) throws IOException {logger.info("【prompt内容】:{}", prompt);String str = " 什么是爱而不得? \n" +"东边日出西边雨,道是无晴却有晴。\n" +"他朝若是同淋雪,此生也算共白头。\n" +"我本将心向明月,奈何明月照沟渠。\n" +"此时相望不相闻,愿逐月华流照君。\n" +"衣带渐宽终不悔,为伊消得人憔悴。\n" +"此情可待成追忆,只是当时已惘然。\n" +"人生若只如初见,何事西风悲画扇。\n" +"曾经沧海难为水,除却巫山不是云。\n" +"何当共剪西窗烛,却话巴山夜雨时。\n" +"天长地久有时尽,此恨绵绵无绝期。\n" +"\n";// 响应流response.setHeader("Content-Type", "text/event-stream");response.setContentType("text/event-stream");response.setCharacterEncoding("UTF-8");response.setHeader("Pragma", "no-cache");try {// 指定事件标识 event: 这个为固定格式response.getWriter().write("event:open\n");response.getWriter().flush();for (int i = 0; i < str.length(); i++) {// 指定事件标识 event: 这个为固定格式
// response.getWriter().write("event:msg\n");// 格式:data: + 数据 + 2个回车response.getWriter().write("data:{\"content\":\""+ String.valueOf(str.charAt(i)).getBytes(StandardCharsets.UTF_8) + "\"}\n\n");response.getWriter().flush();Thread.sleep(100);}// 指定事件标识 event: 这个为固定格式response.getWriter().write("event:error\n");response.getWriter().flush();
// response.getWriter().close();} catch (IOException | InterruptedException e) {e.printStackTrace();} finally {}}
后端请求实现方式
/*** ** @param url* @param json* @return*/public static BufferedReader sendJsonPostResveEventStream(String url, String json) {PrintWriter out = null;BufferedReader in = null;BufferedReader reader = null;try {log.info("sendPost - {}", url);log.info("json - {}", json);URL realUrl = new URL(url);HttpURLConnection conn = (HttpURLConnection) realUrl.openConnection();conn.setRequestMethod("POST");conn.setDoOutput(true);conn.setDoInput(true);conn.setUseCaches(false);conn.setRequestProperty("Connection", "Keep-Alive");conn.setRequestProperty("Charset", "UTF-8");conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");conn.setRequestProperty("accept", "application/json");if (json != null && !json.equals("")) {byte[] writebytes = json.getBytes();conn.setRequestProperty("Content-Length", String.valueOf(writebytes.length));OutputStream outwritestream = conn.getOutputStream();outwritestream.write(json.getBytes());outwritestream.flush();outwritestream.close();conn.getResponseCode();}if (conn.getResponseCode() == 200) {reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));return reader;}} catch (ConnectException e) {log.error("调用HttpUtils.sendPost ConnectException, url=" + url + ",param=" + json, e);} catch (SocketTimeoutException e) {log.error("调用HttpUtils.sendPost SocketTimeoutException, url=" + url + ",param=" + json, e);} catch (IOException e) {log.error("调用HttpUtils.sendPost IOException, url=" + url + ",param=" + json, e);} catch (Exception e) {log.error("调用HttpsUtil.sendPost Exception, url=" + url + ",param=" + json, e);} finally {try {if (out != null) {out.close();}if (in != null) {in.close();}} catch (IOException ex) {log.error("调用in.close Exception, url=" + url + ",param=" + json, ex);}}return null;}
后端请求然后以事件流的方式发送给前端
@PostMapping(value = "/talkeAbout", produces = "text/event-stream")public void talkeAbout(HttpServletResponse response, @RequestBody JSONObject object) throws IOException {response.setHeader("Content-Type", "text/event-stream");response.setContentType("text/event-stream");response.setCharacterEncoding("UTF-8");response.setHeader("Pragma", "no-cache");talkeAboutToXinference(object.getString("prompt"), response);}public void talkeAboutToXinference(String msg, HttpServletResponse response) throws IOException {String json = CHAT_PRARAM.replace("user_talke_about", msg);BufferedReader reader = HttpUtils.sendJsonPostResveEventStream("http://localhost/chat" + CHAT_CHAT_COMPLETIONS, json);if (reader == null) return;String line = "";while ((line = reader.readLine()) != null) {response.getWriter().write(line +"\n");response.getWriter().flush();}response.getWriter().close();}