需求:需要对文心一言的流式接口数据进行处理 增加属性
return ResponseEntity.ok().header("Access-Control-Allow-Origin", "*").contentType(org.springframework.http.MediaType.TEXT_EVENT_STREAM).cacheControl(org.springframework.http.CacheControl.noCache()).body(outputStream -> {try (Response response = client.newCall(request).execute();ResponseBody responseBody = response.body();InputStream inputStream = responseBody.byteStream()) {if (!response.isSuccessful()) {throw new IOException("Failed to fetch streaming data, HTTP error code: " + response.code());}byte[] buffer = new byte[4096];int bytesRead;while ((bytesRead = inputStream.read(buffer)) != -1) {/*outputStream.write(buffer, 0, bytesRead);outputStream.flush();*/String data = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);String[] lines = data.split("\n");for (String line : lines) {if (line.startsWith("data: ")) {// 记录流输出结果,用于后续持久化com.zbIntel.integration.wenxin.entity.ChatResponse bean =JSONUtil.parseObj(line.substring(6).getBytes(StandardCharsets.UTF_8)).toBean(com.zbIntel.integration.wenxin.entity.ChatResponse.class);bean.setSessionId(req.getSessionId());// 序列化bean并发送String serializedBean = JSONUtil.toJsonStr(bean);outputStream.write(("data: " + serializedBean + "\n\n").getBytes(StandardCharsets.UTF_8));outputStream.flush();log.info("返回数据:{}", bean);String content = bean.getResult();if (bean.getIs_end()) {isEndflag.set(Boolean.TRUE);}// 记录流输出结果,用于后续持久化respContent.append(content);}}}} catch (IOException e) {log.error("Error during streaming data: ", e);outputStream.write(("{" +" \"error_code\": 112," +" \"error_msg\": \"" + e.getMessage() + "\"" +"}").getBytes(StandardCharsets.UTF_8));outputStream.flush();}if(isEndflag.get()) {// 构造回复数据对象,持久化String respContentStr = respContent.toString();SessionChatRecordEntity replyRecord = new SessionChatRecordEntity(finalAskRecord.getSessionId(), Role.ASSISTANT.name,respContentStr, ChatGPTApi.getMessageTokenNum(respContentStr));sessionChatRecordService.saveBatch(ImmutableList.of(finalAskRecord, replyRecord));// 刷新缓存chatService.refreshWindowRecordCache(finalAskRecord.getSessionId());}});
主要时这段:
// 序列化bean并发送
String serializedBean = JSONUtil.toJsonStr(bean);
outputStream.write(("data: " + serializedBean + "\n\n").getBytes(StandardCharsets.UTF_8));
outputStream.flush();
返回的结果:
增加了 sessionId的属性