一个java项目中,如何使用sse协议,构造一个chatgpt的流式对话接口

前言
如何注册chatGPT,怎么和它交互,本文就不讲了;因为网上教程一大堆,而且你要使用的话,通常会再包一个算法服务,用来做一些数据训练和过滤处理之类的,业务服务基本不会直接与原生chatGPT交互。
而下面阐述的,就是业务服务与算法服务的交互。

业务需求-需要实现什么样的功能

想要一个类似与AI问答助手的机器人,可以实现根据某些场景对话提问的功能

  1. 可以直接提问,类似直接使用chatGPT,只不过这个提问的过程会做一些业务通用处理,比如问答数据的归纳反馈、敏感词过滤等等。
  2. 也可以给它喂一篇论文,喂一批近期的资讯,或者是一本小说之类的,根据指定的上下文去进行问答(这种场景需要先投递数据建立相关索引)
  3. ai的回答要求和chatGPT一样保持流式返回(也就是一个字一个字,一边生成一边返回,而不是等整个回答生成完之后一股脑返回)

剖析

重点是流式,这里我们预设算法侧已经有了一个流式返回的接口,整体的交互如下图所示
在这里插入图片描述
下面分别介绍几个关键节点的数据交互设计,仅供参考

q1

简述:页面发送问答数据给业务服务端

{"chatId": 233,"question": "这篇论文有几个论点?"
}
  • 这里的chatId可以理解为一个对话框id,业务服务端可以根据这个来进行问答归类、批量删除收藏、问答上下文查询等操作。
  • question就是问题的内容

这里需要注意就是,交互数据格式尽可能简单、易拓展,有些产品的页面交互设计的非常复杂,什么历史问答、角色信息之类的,套了一层又一层,其实很多都没必要的,这样前端组装起来也麻烦,也不利于数据的管理与后期功能的拓展。

q2

简述:就是业务服务根据前端传来的问题和所属的对话框,把相应的上下文查询出来(甚至可以前端维护一个是否发送上下文的开关,更动态一点),包装成算法服务所需要的问题数据,发给它。

{"chatId":  233,"userName":  "张三","messageKey":  "0a795f6a-a029-435f-8d67-6f6f8e078cfe","message":  "这篇论文有几个论点?","chatHistory":  [{"messageKey":  "0a795f6a-a029-435f-8d67-6f6f8e07dasd","question":  "这篇论文的作者是谁","answer":  "这篇论文的作者是李四博士。"}],"callbackUrl":  "http://xxx/chat/question/callback"
}
  • 上述的messageKey就是一个消息的key,用以常规的接口调试
  • chatHistory就是历史问答记录,即上下文,众所周知chatGPT带不带上下文,回答的结果可能截然不同
  • callbackUrl是业务定义的一个回调接口,用来回调一些算法侧异步生产的信息,比如原文的定位信息、根据当前问题生成的推荐问答等,这些和流式的回答是不会一起返回的,所以额外提供一个接口来接收。

q3和a3

这两步不详述(主要我也不是研究算法模型的哈哈,不是很清楚细节)
我们只需要定义好a2返回的结果即可

a2

简述:主要是算法侧返回给业务服务的同步的流式回答,同时还可能有异步的额外信息的回调(q2的callbackUrl来接收)。所以a2的返回结果分为两个response
response1:同步的流式回答,一般在2-7s内返回第一个字符

data:data:data:... // 省略一些输出
data:data:

流式问答的规范可以参考:流式接口协议规范

response2:异步的拓展信息(可有可无)

{"messageKey":"0a795f6a-a029-435f-8d67-6f6f8e078cfe", //必传 回调的消息key,每次问答唯一"expand":{"recommendedQuestions":[ // 推荐问题"这篇论文的主要论点是什么?"],"originalIndex":[{"sourceId":3432,"text":"首先第一个论点是......","textIndex":90}]}
}

a1

简述:a1要返回的格式很好理解,就是把a2中的两个response组合在一起,需要注意的有几点

  1. a2的response2不一定有,需要设置超时策略,且需要在流式回答最后输出
  2. a2中的response1是流式,response2不是;但输出到a1的时候,需要保证都在流中
  3. 最好需要约定一些event来作为标识符
event:messageKey // 消息key事件data:0a795f6a-a029-435f-8d67-6f6f8e078cfeevent:answer // 流式回答开始事件data:"在论文"data:"中"data:","data:"我"data:"们"data:"一"data:"共"
...
data:"几"data:"个"data:"论"data:"点"event:endTimedata:2024-02-27 17:05:24event:expand // 拓展信息开始事件,此处等待15s超时data:{"recommendedQuestions":["这篇论文的主要论点是什么"],"originalIndex":{"sourceId":32133,"text":"首先第一个论点是......","textIndex":90}}

代码

代码省略了一些无关紧要的业务特有的部分,只保留通用的部分
工具类:SSEUtils,用来操作SSE客户端

import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** description** @author luhui* @date 2024/1/25*/
@Slf4j
public class SSEUtils {/*** timeout 30min*/private static final Long DEFAULT_TIME_OUT = 30 * 60 * 1000L;/*** 订阅表*/private static final Map<String, EvaEmitter> EMITTER_MAP = new ConcurrentHashMap<>();public static final String MSG_DATA_PREFIX = "data:";public static final String MSG_EVENT_PREFIX = "event:";/*** description: 创建流** @param messageKey 本次问答的消息key* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter* @author luhui* @date 2024/2/23 17:09*/public static EvaEmitter getEmitter(String messageKey) {if (null == messageKey || "".equals(messageKey)) {return null;}EvaEmitter emitter = EMITTER_MAP.get(messageKey);if (null == emitter) {emitter = new EvaEmitter(DEFAULT_TIME_OUT);EMITTER_MAP.put(messageKey, emitter);}return emitter;}/*** description: 发消息** @param messageKey 本次问答的消息key* @param msg        消息* @author luhui* @date 2024/2/23 17:09*/public static void pushMsg(String messageKey, String msg) throws IOException {EvaEmitter emitter = EMITTER_MAP.get(messageKey);if (null != emitter) {emitter.send(EvaEmitter.event().data(msg));}}public static void pushEvent(String messageKey, String eventDesc) throws IOException{EvaEmitter emitter = EMITTER_MAP.get(messageKey);if (null != emitter) {emitter.send(EvaEmitter.event().name(eventDesc));}}/*** description: 关闭流** @param messageKey 本次问答的消息key* @author luhui* @date 2024/2/23 17:08*/public static void closeEmitter(String messageKey) {EvaEmitter emitter = EMITTER_MAP.get(messageKey);if (null != emitter) {try {emitter.complete();EMITTER_MAP.remove(messageKey);} catch (Exception e) {e.printStackTrace();}}}
}

工具类:SSEClient ,用来获取SSE流

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;/*** description** @author luhui* @date 2024/1/25*/
@Slf4j
public class SSEClient {// timeoutpublic static Integer DEFAULT_TIME_OUT = 60 * 1000;/*** 获取SSE输入流*/public static InputStream getSseInputStream(String urlPath, JSONObject param, int timeoutMill) {HttpURLConnection urlConnection = null;try {urlConnection = getHttpURLConnection(urlPath, timeoutMill);putData(urlConnection, param);InputStream inputStream = urlConnection.getInputStream();return new BufferedInputStream(inputStream);} catch (IOException e) {e.printStackTrace();}return null;}/*** 读流数据*/public static void readStream(InputStream is, MsgHandler msgHandler) throws IOException {BufferedReader reader = new BufferedReader(new InputStreamReader(is));try {String line = "";while ((line = reader.readLine()) != null) {if ("".equals(line)) {continue;}msgHandler.handleMsg(line);}} catch (Exception e) {e.printStackTrace();// 目前这里抛出的显式异常来自与用户手动关闭的连接,此时服务端与算法端的连接也捕获并关闭,无需存储} finally {// 服务器端主动关闭时,客户端手动关闭reader.close();is.close();}}private static HttpURLConnection getHttpURLConnection(String urlPath, int timeoutMill) throws IOException {URL url = new URL(urlPath);HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();urlConnection.setDoOutput(true);urlConnection.setDoInput(true);urlConnection.setUseCaches(false);urlConnection.setRequestMethod("POST");urlConnection.setRequestProperty("Connection", "Keep-Alive");urlConnection.setRequestProperty("Charset", "UTF-8");urlConnection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");urlConnection.setRequestProperty("accept", "text/event-stream");// 读过期时间urlConnection.setReadTimeout(timeoutMill);return urlConnection;}public static void putData(HttpURLConnection connection, JSONObject jsonStr) throws IOException {byte[] writebytes = jsonStr.toJSONString().getBytes();connection.setRequestProperty("Content-Length", String.valueOf(writebytes.length));DataOutputStream wr = new DataOutputStream(connection.getOutputStream());wr.write(jsonStr.toJSONString().getBytes());wr.flush();wr.close();}/*** 消息处理接口*/public interface MsgHandler {void handleMsg(String line) throws IOException;}
}

工具类:EvaEmitter,用来封装一些流信息

import cn.hutool.core.date.DateTime;
import com.alibaba.fastjson.JSONObject;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;/*** description EvaEmitter** @author luhui* @date 2024/02/22*/
@Data
public class EvaEmitter extends SseEmitter {public EvaEmitter(Long timeout) {super(timeout);}@ApiModelProperty("版本id")private Long versionId;@ApiModelProperty("用户问题")private String question;@ApiModelProperty("唯一消息key")private String messageKey;@ApiModelProperty("当前用户")private Long currentUid;@ApiModelProperty("当前用户名")private String currentUserName;@ApiModelProperty("项目id")private Long projectId;@ApiModelProperty("ai回答")private String aiAnswer;@ApiModelProperty("拓展信息")private JSONObject expand;@ApiModelProperty("错误信息")private JSONObject error;@ApiModelProperty("提问开始时间")private DateTime startTime;public JSONObject getHistory() {JSONObject history = new JSONObject();history.put("question", question);history.put("answer", aiAnswer);history.put("expand", expand);history.put("error", error);return history;}
}

具体的chat交互方法

	String messageKey = UUID.randomUUID().toString();EvaEmitter emitter = SSEUtils.getEmitter(messageKey);emitter.setProjectId(111);// 初始化相关字段sseService.chatTransfer(messageKey);
    @Async@Overridepublic void chatTransfer(String messageKey) {EvaEmitter emitter = SSEUtils.getEmitter(messageKey);// 正式参数JSONObject params = new JSONObject(true);params.put("versionId", emitter.getVersionId().toString());params.put("userName", emitter.getCurrentUserName());params.put("messageKey", emitter.getMessageKey());params.put("message", emitter.getQuestion());params.put("chatHistory", chatHistoryService.getChatHistory(emitter));params.put("callbackUrl", gateway + "/xxxchat/question/callback");InputStream inputStream = SSEClient.getSseInputStream(aiChatUrl, params, SSEClient.DEFAULT_TIME_OUT);try {StringBuilder answer = new StringBuilder();SSEUtils.pushEvent(messageKey, "messageKey");SSEUtils.pushMsg(messageKey, messageKey);SSEUtils.pushEvent(messageKey, "answer");AtomicReference<Boolean> sdkError = new AtomicReference<>(false);SSEClient.readStream(inputStream, line -> {log.info("messageKey:{}, chatTransfer:{}", emitter.getMessageKey(), line);String message = "";if (sdkError.get()) {String errorStr = line.split(SSEUtils.MSG_DATA_PREFIX)[1].trim();if (StringUtils.isNotBlank(errorStr)) {// 做一些错误处理message = "算法未知错误,请稍后再试";emitter.setError(message);}} else if (line.contains(SSEUtils.MSG_DATA_PREFIX)) {message = line.split(SSEUtils.MSG_DATA_PREFIX)[1].trim();} else if (line.contains(SSEUtils.MSG_EVENT_PREFIX)) {sdkError.set(true);} else {message = "";}if (StringUtils.isNotBlank(message)) {answer.append(message.replaceAll("\"", ""));SSEUtils.pushMsg(messageKey, message);}});emitter.setAiAnswer(answer.toString());// 保存当前问答消息,自行实现ChatHistoryEntity message = chatHistoryService.saveHistory(messageKey);SSEUtils.pushEvent(messageKey, "endTime");SSEUtils.pushMsg(messageKey, DateUtil.formatDateTime(message.getEndTime()));SSEUtils.pushEvent(messageKey, "expand");chatHistoryService.pushExpand(messageKey);} catch (IllegalStateException | IOException e) {log.error("pushMsg error, web端流已被关闭");} catch (Exception e) {e.printStackTrace();} finally {// 消息发送完或者出现异常的话,存储当前的消息,然后关闭流try {chatHistoryService.saveHistory(messageKey);} catch (Exception e) {e.printStackTrace();} finally {SSEUtils.closeEmitter(messageKey);}}}
	@Override@Retryable(value = Exception.class, maxAttempts = 6, backoff = @Backoff(delay = 500, multiplier = 2))public void pushExpand(String messageKey) throws IOException {// 如果异步的拓展信息,即a2中的response2回调成功的话,会存储到这里Object expandObj = redisService.hGet(RedisConstants.CHAT_AI_RECOMMENDED_QUESTIONS, messageKey);if (expandObj == null) {log.error("未获取到相关拓展信息, 稍后重试");throw new RuntimeException("未获取到相关拓展信息");} else {JSONObject expand = JSONObject.parseObject(expandObj.toString());EvaEmitter emitter = SSEUtils.getEmitter(messageKey);emitter.setExpand(expand);SSEUtils.pushMsg(messageKey, expand.toJSONString());log.info("messageKey:{}, chatTransfer:{}", emitter.getMessageKey(), expand);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/2183.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenCV-基于阴影勾勒的图纸清晰度增强算法

作者&#xff1a;翟天保Steven 版权声明&#xff1a;著作权归作者所有&#xff0c;商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处 实现原理 大家在工作和学习中&#xff0c;无论是写报告还是论文&#xff0c;经常有截图的需求&#xff0c;比如图表、图纸等&…

使用 Docker 部署 TailChat 开源即时通讯平台

1&#xff09;介绍 TailChat 官网&#xff1a; https://tailchat.msgbyte.com/ 作者&#xff1a;https://www.moonrailgun.com/about/ GitHub &#xff1a; https://github.com/msgbyte/tailchat TailChat 是一款插件化易拓展的开源 IM 应用。可拓展架构赋予 Tailchat 无限可能…

【前端】vue的基础知识及开发指引

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、Vue是什么二、学习 Vue.js 的基础知识三、熟悉 Vue.js 的生态系统四、掌握常用工具和库五、实践和项目开发六、 持续学习和跟进 前言 随着开发语言及人工智…

使用Docker搭建本地Nexus私有仓库

0-1开始Java语言编程之路 一、Ubuntu下Java语言环境搭建 二、Ubuntu下Docker环境安装 三、使用Docker搭建本地Nexus Maven私有仓库 四、Ubuntu下使用VisualStudioCode进行Java开发 你需要Nexus Java应用编译构建的一种主流方式就是通过Maven, Maven可以很方便的管理Java应用的…

全国832个贫困县名单及精准扶贫脱贫(摘帽名单)数据(2016-2020.11)

01、数据简介 自党的十八大以来&#xff0c;我国脱贫攻坚战取得了举世瞩目的伟大胜利。经过全党全国各族人民的共同努力&#xff0c;现行标准下9899万农村贫困人口全部脱贫&#xff0c;832个贫困县全部摘帽&#xff0c;12.8万个贫困村全部出列&#xff0c;区域性整体贫困得到解…

金融风控信用评分卡建模(Kaggle give me credit数据集)

1 数据预处理数据 数据来源于Kaggle的Give Me Some Credit&#xff0c;包括25万条个人财务情况的样本数据 1.1 导包读数据 import pandas as pd import numpy as np import matplotlib.pyplot as plt from sklearn.ensemble import RandomForestRegressor import seaborn as …

Excel图表智能排序

实例需求&#xff1a;表格中的多个图表如下图左侧所示&#xff0c;对于表格进行排序时&#xff0c;希望第一列中的图表跟随相应数据。 方法1&#xff1a; Sub SortTableWithChart()Dim oSht As Worksheet, RowCnt As Long, ColCnt As LongDim arrData, i As Long, oCht As Cha…

基于STM32CubeMX的嵌入式开发基础

内部没有上拉电阻&#xff0c;外部就要加一个 上拉或者下拉电阻&#xff0c;最基本上的作用是将状态不确定的信号通过一个电阻将其稳定在高电平或低电平 上拉下拉其实起的是稳定电平的作用 问题&#xff1a;单片机的外围电路设计及程序编写大多是以低电平有效来驱动电路的&…

【主流电商API接口数据采集】聚合电商API接口平台:让数据成为生产力!

API接口接入测试||文档 随着数字化商业时代的到来&#xff0c;API接口已成为电商资源连接利器&#xff0c;也是全球传统互联网企业转型的基础。 2021年 Google Cloud 研究显示&#xff0c;全球互联网企业近3/4的企业持续投入数字化转型&#xff0c;2/3的企业在持续增加投入&a…

轻松学会微信小程序开发(一)

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

Java进阶-Stream流

概述 在Java8中&#xff0c;得益于lambda所带来的函数式编程&#xff0c;引入了一个全新的Stream流的概念目的&#xff1a;用于简化集合和数组操作的api 案例 需求&#xff1a;创建一个集合存储多个字符串元素&#xff0c;将集合中所有以“z”开头的元素存储到新的集合中&am…

Torch 模型 感受野可视化

前言&#xff1a;感受野是卷积神经网络 (CNN) 中一个重要的概念&#xff0c;它表示 CNN 每一层输出的特征图上的像素点在输入图像上映射的区域。感受野的大小和形状直接影响到网络对输入图像的感知范围和精度&#xff0c;进而调整网络结构、卷积核大小和步长等参数&#xff0c;…

javaweb-maven

前端HTML,CSS,JS,Vue&#xff0c;Element&#xff0c;Nginx最后去复习&#xff0c; Java开发工程师 主要学习方向是服务端 所以进入javaweb的服务端的第一个知识点 maven 什么是maven 用于管理和构建java项目的工具 maven的官方网站 Maven – Welcome to Apache Maven …

Flink面试(1)

1.Flink 的并行度的怎么设置的&#xff1f; Flink设置并行度的几种方式 1.代码中设置setParallelism() 全局设置&#xff1a; 1 env.setParallelism(3);  算子设置&#xff08;部分设置&#xff09;&#xff1a; 1 sum(1).setParallelism(3) 2.客户端CLI设置&#xff0…

邀请全球创作者参与 The Sandbox 创作者训练营

作为首屈一指的元宇宙平台之一&#xff0c;The Sandbox 的使命是成为全球创作者的中心。随着我们对 Game Maker 的不断改进、旨在激发创作者灵感的定期 Game Jams、革命性的 "创作者挑战 "以及众多其他活动的开展&#xff0c;我们见证了大量个人加入我们充满活力的创…

opencv_5_图像像素的算术操作

方法1&#xff1a;调用库函数 void ColorInvert::mat_operator(Mat& image) { Mat dst; Mat m Mat::zeros(image.size(), image.type()); m Scalar(2, 2, 2); multiply(image, m, dst); m1 Scalar(50,50, 50); //divide(image, m, dst); //add(im…

WordPress social-warfare插件XSS和RCE漏洞【CVE-2019-9978】

WordPress social-warfare插件XSS和RCE漏洞 ~~ 漏洞编号 : CVE-2019-9978 影响版本 : WordPress social-warfare < 3.5.3 漏洞描述 : WordPress是一套使用PHP语言开发的博客平台&#xff0c;该平台支持在PHP和MySQL的服务器上架设个人博客网站。social-warfare plugin是使用…

AIGC元年大模型发展现状手册

零、AIGC大模型概览 AIGC大模型在人工智能领域取得了重大突破&#xff0c;涵盖了LLM大模型、多模态大模型、图像生成大模型以及视频生成大模型等四种类型。这些模型不仅拓宽了人工智能的应用范围&#xff0c;也提升了其处理复杂任务的能力。a.) LLM大模型通过深度学习和自然语…

MSR是个什么寄存器

MSR 这种寄存器专门用于调试、程序执行跟踪、计算机性能监控、简化软件编程、电源控制等等各种实验性功能。 什么是 MSR MSR 的概念是不易理解&#xff0c;所以这一节只说一些 MSR 的外在&#xff0c;比如形容和指令等&#xff0c;然后展开说说&#xff0c;看完整篇文章你应该…

计算机视觉 CV 八股分享 [自用](更新中......)

目录 一、深度学习中解决过拟合方法 二、深度学习中解决欠拟合方法 三、梯度消失和梯度爆炸 解决梯度消失的方法 解决梯度爆炸的方法 四、神经网络权重初始化方法 五、梯度下降法 六、BatchNorm 七、归一化方法 八、卷积 九、池化 十、激活函数 十一、预训练 十二…