java模拟GPT流式问答

流式请求gpt并且流式推送相关前端页面

1)java流式获取gpt答案

1、读取文件流的方式

使用post请求数据,由于gpt是eventsource的方式返回数据,所以格式是data:,需要手动替换一下值

/**
org.apache.http.client.methods
**/
@SneakyThrowsprivate void chatStream(List<ChatParamMessagesBO> messagesBOList) {CloseableHttpClient httpclient = HttpClients.createDefault();HttpPost httpPost = new HttpPost("https://api.openai.com/v1/chat/completions");httpPost.setHeader("Authorization","xxxxxxxxxxxx");httpPost.setHeader("Content-Type","application/json; charset=UTF-8");ChatParamBO build = ChatParamBO.builder().temperature(0.7).model("gpt-3.5-turbo").messages(messagesBOList).stream(true).build();System.out.println(JsonUtils.toJson(build));httpPost.setEntity(new StringEntity(JsonUtils.toJson(build),"utf-8"));CloseableHttpResponse response = httpclient.execute(httpPost);try {HttpEntity entity = response.getEntity();if (entity != null) {InputStream inputStream = entity.getContent();BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));String line;while ((line = reader.readLine()) != null) {// 处理 event stream 数据try {
//                        System.out.println(line);ChatResultBO chatResultBO = JsonUtils.toObject(line.replace("data:", ""), ChatResultBO.class);String content = chatResultBO.getChoices().get(0).getDelta().getContent();log.info(content);//                        System.out.println(chatResultBO.getChoices().get(0).getMessage().getContent());} catch (Exception e) {
//                        e.printStackTrace();}}}} finally {response.close();}}

2、sse链接的方式获取数据

用到了okhttp

需要先引用相关maven:

        <dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId></dependency><dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp-sse</artifactId></dependency>
       // 定义see接口Request request = new Request.Builder().url("https://api.openai.com/v1/chat/completions").header("Authorization","xxx").post(okhttp3.RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"),param.toJSONString())).build();OkHttpClient okHttpClient = new OkHttpClient.Builder().connectTimeout(10, TimeUnit.MINUTES).readTimeout(10, TimeUnit.MINUTES)//这边需要将超时显示设置长一点,不然刚连上就断开,之前以为调用方式错误被坑了半天.build();// 实例化EventSource,注册EventSource监听器RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() {@Overridepublic void onOpen(EventSource eventSource, Response response) {log.info("onOpen");}@SneakyThrows@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {
//                log.info("onEvent");log.info(data);//请求到的数据}@Overridepublic void onClosed(EventSource eventSource) {log.info("onClosed");
//                emitter.complete();}@Overridepublic void onFailure(EventSource eventSource, Throwable t, Response response) {log.info("onFailure,t={},response={}",t,response);//这边可以监听并重新打开
//                emitter.complete();}});realEventSource.connect(okHttpClient);//真正开始请求的一步

2)流式推送答案

方法一:通过订阅式SSE/WebSocket

原理是先建立链接,然后不断发消息就可以

1、websocket

创建相关配置:


import javax.websocket.Session;import lombok.Data;/*** @description WebSocket客户端连接*/
@Data
public class WebSocketClient {// 与某个客户端的连接会话,需要通过它来给客户端发送数据private Session session;//连接的uriprivate String uri;}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
配置相关service
@Slf4j
@Component
@ServerEndpoint("/websocket/chat/{chatId}")
public class ChatWebsocketService {static final ConcurrentHashMap<String, List<WebSocketClient>> webSocketClientMap= new ConcurrentHashMap<>();private String chatId;/*** 连接建立成功时触发,绑定参数* @param session 与某个客户端的连接会话,需要通过它来给客户端发送数据* @param chatId 商户ID*/@OnOpenpublic void onOpen(Session session, @PathParam("chatId") String chatId){WebSocketClient client = new WebSocketClient();client.setSession(session);client.setUri(session.getRequestURI().toString());List<WebSocketClient> webSocketClientList = webSocketClientMap.get(chatId);if(webSocketClientList == null){webSocketClientList = new ArrayList<>();}webSocketClientList.add(client);webSocketClientMap.put(chatId, webSocketClientList);this.chatId = chatId;}/*** 收到客户端消息后调用的方法** @param message 客户端发送过来的消息*/@OnMessagepublic void onMessage(String message) {log.info("chatId = {},message = {}",chatId,message);// 回复消息this.chatStream(BaseUtil.newList(ChatParamMessagesBO.builder().content(message).role("user").build()));
//        this.sendMessage(chatId,message+"233");}/*** 连接关闭时触发,注意不能向客户端发送消息了* @param chatId*/@OnClosepublic void onClose(@PathParam("chatId") String chatId){webSocketClientMap.remove(chatId);}/*** 通信发生错误时触发* @param session* @param error*/@OnErrorpublic void onError(Session session, Throwable error) {System.out.println("发生错误");error.printStackTrace();}/*** 向客户端发送消息* @param chatId* @param message*/public void sendMessage(String chatId,String message){try {List<WebSocketClient> webSocketClientList = webSocketClientMap.get(chatId);if(webSocketClientList!=null){for(WebSocketClient webSocketServer:webSocketClientList){webSocketServer.getSession().getBasicRemote().sendText(message);}}} catch (IOException e) {e.printStackTrace();throw new RuntimeException(e.getMessage());}}/*** 流式调用查询gpt* @param messagesBOList* @throws IOException*/@SneakyThrowsprivate void chatStream(List<ChatParamMessagesBO> messagesBOList) {// TODO 和GPT的访问请求}
}
测试,postman建立链接

2、SSE

本质也是基于订阅推送方式

前端:
<!DOCTYPE html>
<html lang="en"><head><meta charset="UTF-8"><title>SseEmitter</title>
</head><body>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script>let source = null;// 用时间戳模拟登录用户//const id = new Date().getTime();const id = '7829083B42464C5B9C445A087E873C7D';if (window.EventSource) {// 建立连接source = new EventSource('http://172.28.54.27:8902/api/sse/connect?conversationId=' + id);setMessageInnerHTML("连接用户=" + id);/*** 连接一旦建立,就会触发open事件* 另一种写法:source.onopen = function (event) {}*/source.addEventListener('open', function(e) {setMessageInnerHTML("建立连接。。。");}, false);/*** 客户端收到服务器发来的数据* 另一种写法:source.onmessage = function (event) {}*/source.addEventListener('message', function(e) {//console.log(e);setMessageInnerHTML(e.data);});source.addEventListener("close", function (event) {// 在这里处理关闭事件console.log("Server closed the connection");// 可以选择关闭EventSource连接source.close();});/*** 如果发生通信错误(比如连接中断),就会触发error事件* 或者:* 另一种写法:source.onerror = function (event) {}*/source.addEventListener('error', function(e) {console.log(e);if (e.readyState === EventSource.CLOSED) {setMessageInnerHTML("连接关闭");} else {console.log(e);}}, false);} else {setMessageInnerHTML("你的浏览器不支持SSE");}// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据window.onbeforeunload = function() {//closeSse();};// 关闭Sse连接function closeSse() {source.close();const httpRequest = new XMLHttpRequest();httpRequest.open('GET', 'http://172.28.54.27:8902/api/sse/disconnection?conversationId=' + id, true);httpRequest.send();console.log("close");}// 将消息显示在网页上function setMessageInnerHTML(innerHTML) {document.getElementById('message').innerHTML += innerHTML + '<br/>';}
</script></html>
后端:
controller
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.Set;
import java.util.function.Consumer;import javax.annotation.Resource;import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;@Validated
@RestController
@RequestMapping("/api/sse")
@Slf4j
@RefreshScope  // 会监听变化实时变化值
public class SseController {@Resourceprivate SseBizService sseBizService;/*** 创建用户连接并返回 SseEmitter** @param conversationId 用户ID* @return SseEmitter*/@SneakyThrows@GetMapping(value = "/connect", produces = "text/event-stream; charset=utf-8")public SseEmitter connect(String conversationId) {// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitter sseEmitter = new SseEmitter(0L);// 注册回调sseEmitter.onCompletion(completionCallBack(conversationId));sseEmitter.onError(errorCallBack(conversationId));sseEmitter.onTimeout(timeoutCallBack(conversationId));log.info("创建新的sse连接,当前用户:{}", conversationId);sseBizService.addConnect(conversationId,sseEmitter);sseBizService.sendMsg(conversationId,"链接成功");
//        sseCache.get(conversationId).send(SseEmitter.event().reconnectTime(10000).data("链接成功"),MediaType.TEXT_EVENT_STREAM);return sseEmitter;}/*** 给指定用户发送信息  -- 单播*/@GetMapping(value = "/send", produces = "text/event-stream; charset=utf-8")public void sendMessage(String conversationId, String msg) {sseBizService.sendMsg(conversationId,msg);}/*** 移除用户连接*/@GetMapping(value = "/disconnection", produces = "text/event-stream; charset=utf-8")public void removeUser(String conversationId) {log.info("移除用户:{}", conversationId);sseBizService.deleteConnect(conversationId);}/*** 向多人发布消息   -- 组播* @param groupId 开头标识* @param message 消息内容*/public void groupSendMessage(String groupId, String message) {/* if (!BaseUtil.isNullOrEmpty(sseCache)) {*//*Set<String> ids = sseEmitterMap.keySet().stream().filter(m -> m.startsWith(groupId)).collect(Collectors.toSet());batchSendMessage(message, ids);*//*sseCache.forEach((k, v) -> {try {if (k.startsWith(groupId)) {v.send(message, MediaType.APPLICATION_JSON);}} catch (IOException e) {log.error("用户[{}]推送异常:{}", k, e.getMessage());removeUser(k);}});}*/}/*** 群发所有人   -- 广播*/public void batchSendMessage(String message) {/*sseCache.forEach((k, v) -> {try {v.send(message, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("用户[{}]推送异常:{}", k, e.getMessage());removeUser(k);}});*/}/*** 群发消息*/public void batchSendMessage(String message, Set<String> ids) {ids.forEach(userId -> sendMessage(userId, message));}/*** 获取当前连接信息*/
//    public List<String> getIds() {
//        return new ArrayList<>(sseCache.keySet());
//    }/*** 获取当前连接数量*/
//    public int getUserCount() {
//        return count.intValue();
//    }private Runnable completionCallBack(String userId) {return () -> {log.info("结束连接:{}", userId);removeUser(userId);};}private Runnable timeoutCallBack(String userId) {return () -> {log.info("连接超时:{}", userId);removeUser(userId);};}private Consumer<Throwable> errorCallBack(String userId) {return throwable -> {log.info("连接异常:{}", userId);removeUser(userId);};}
}
service
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;@Component
@Slf4j
@RefreshScope  // 会监听变化实时变化值
public class SseBizService {/*** * 当前连接数*/private AtomicInteger count = new AtomicInteger(0);/*** 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面*/private Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();/*** 添加用户* @author pengbin <pengbin>* @date 2023/9/11 11:37* @param* @return*/public void addConnect(String id,SseEmitter sseEmitter){sseCache.put(id, sseEmitter);// 数量+1count.getAndIncrement();}/*** 删除用户* @author pengbin <pengbin>* @date 2023/9/11 11:37* @param* @return*/public void deleteConnect(String id){sseCache.remove(id);// 数量+1count.getAndDecrement();}/*** 发送消息* @author pengbin <pengbin>* @date 2023/9/11 11:38* @param* @return*/@SneakyThrowspublic void sendMsg(String id, String msg){if(sseCache.containsKey(id)){sseCache.get(id).send(msg, MediaType.TEXT_EVENT_STREAM);}}}

方法二:SSE建立eventSource,使用完成后即刻销毁

前端:在接收到结束标识后立即销毁

/*** 客户端收到服务器发来的数据* 另一种写法:source.onmessage = function (event) {}*/source.addEventListener('message', function(e) {//console.log(e);setMessageInnerHTML(e.data);if(e.data == '[DONE]'){source.close();}});

后端:
 

@SneakyThrows@GetMapping(value = "/stream/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter completionsStream(@RequestParam String conversationId){//List<ChatParamMessagesBO> messagesBOList =new ArrayList();// 获取内容信息ChatParamBO build = ChatParamBO.builder().temperature(0.7).stream(true).model("xxxx").messages(messagesBOList).build();SseEmitter emitter = new SseEmitter();// 定义see接口Request request = new Request.Builder().url("xxx").header("Authorization","xxxx").post(okhttp3.RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"),JsonUtils.toJson(build))).build();OkHttpClient okHttpClient = new OkHttpClient.Builder().connectTimeout(10, TimeUnit.MINUTES).readTimeout(10, TimeUnit.MINUTES)//这边需要将超时显示设置长一点,不然刚连上就断开,之前以为调用方式错误被坑了半天.build();StringBuffer sb = new StringBuffer("");// 实例化EventSource,注册EventSource监听器RealEventSource realEventSource = null;realEventSource = new RealEventSource(request, new EventSourceListener() {@Overridepublic void onOpen(EventSource eventSource, Response response) {log.info("onOpen");}@SneakyThrows@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {log.info(data);//请求到的数据try {ChatResultBO chatResultBO = JsonUtils.toObject(data.replace("data:", ""), ChatResultBO.class);String content = chatResultBO.getChoices().get(0).getDelta().getContent();sb.append(content);emitter.send(SseEmitter.event().data(JsonUtils.toJson(ChatContentBO.builder().content(content).build())));} catch (Exception e) {
//                        e.printStackTrace();}if("[DONE]".equals(data)){emitter.send(SseEmitter.event().data(data));emitter.complete();log.info("result={}",sb);}}@Overridepublic void onClosed(EventSource eventSource) {log.info("onClosed,eventSource={}",eventSource);//这边可以监听并重新打开
//                emitter.complete();}@Overridepublic void onFailure(EventSource eventSource, Throwable t, Response response) {log.info("onFailure,t={},response={}",t,response);//这边可以监听并重新打开
//                emitter.complete();}});realEventSource.connect(okHttpClient);//真正开始请求的一步return emitter;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/103401.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WebDAV之π-Disk派盘 + 恒星播放器

想要拥有一款万能视频播放器,全能解码播放器,无需转码,支持所有格式的视频和音频,直接播放的播放器?那就选恒星播放器。 恒星播放器支持视频投屏,倍速播放,后台播放等功能,还能一键截图和录制gif动图。支持全格式超高清真4K解码,蓝光HDR低占用,支持ISO文件直出的播放…

CoT 的方式使用 LLM 设计测试用例实践

前期准备 import SparkApi import os from dotenv import load_dotenv, find_dotenv#以下密钥信息从控制台获取_=load_dotenv(find_dotenv()) appid = os.getenv("SPARK_APP_ID") api_secret=os.getenv("SPARK_APP_SECRET") api_key=os.getenv("SPAR…

[Spring] SpringMVC 简介(二)

目录 五、域对象共享数据 1、使用 ServletAPI 向 request 域对象共享数据 2、使用 ModelAndView 向 request 域对象共享数据 3、使用 Model、Map、ModelMap 向 request 域对象共享数据 4、向 session 域和 application 域共享数据 六、SpringMVC 的视图 1、ThymeleafVie…

springboot 定时任务

springboot 定时任务 在Spring Boot中&#xff0c;你可以使用Spring的Scheduled注解来创建定时任务。以下是一个简单的示例&#xff1a; 首先&#xff0c;你需要在Spring Boot的主类上添加EnableScheduling注解以启用定时任务功能。 import org.springframework.boot.Spring…

golang/云原生/Docker/DevOps/K8S/持续 集成/分布式/etcd 教程

3-6个月帮助学员掌握golang后端开发岗位必备技术点 教程时长: 150小时 五大核心专栏,原理源码案例分析项目实战直击工作岗位 golang&#xff1a;解决go语言编程问题 工程组件&#xff1a;解决golang工程化问题 分布式中间件&#xff1a;解决技术栈单一及分布式开发问题 云原生…

Excel 中使用数据透视图进行数据可视化

使用数据透视表&#xff08;PivotTable&#xff09;是在Excel中进行数据可视化的强大工具。下面将提供详细的步骤来使用数据透视表进行数据可视化。 **步骤一&#xff1a;准备数据** 首先&#xff0c;确保你有一个包含所需数据的Excel表格。数据应该按照一定的结构和格式组织…

使用Swift开发Framework遇到的问题及解决方法

文章目录 一、Swift 旧版本Xcode 打出来的framework 新版本不兼容问题 一、Swift 旧版本Xcode 打出来的framework 新版本不兼容问题 Cannot load module xxx built with SDK ihphoneos16.4 when using SDK iphoneos17.0:XXX/xxx.framework/Modules/xxx.swiftmodule/arm64-appl…

如何提升网站排名和用户体验:优化网站速度

网站的排名和用户满意度直接受到站点内容的加载速度影响深远。通过精心的网站优化&#xff0c;您不仅可以提高排名&#xff0c;还可以提供更出色的用户体验&#xff0c;尽管用户可能不会察觉到您的网站加载得更快&#xff0c;但这是一个非常有意义的改进。在这篇文章中&#xf…

【C++ Miscellany】如何在同一个程序中结合C++和C

请记住以下几个简单守则&#xff1a; 确定你的C和C编译器产出兼容的目标文件&#xff08;object files&#xff09;。将双方都使用的函数声明为extern “C”。如果可能&#xff0c;尽量在C中撰写main。总是以delete删除new返回的内存&#xff1b;总以free释放malloc返回的内存…

stm32学习笔记:EXIT中断

1、中断系统 中断系统是管理和执行中断的逻辑结构&#xff0c;外部中断是众多能产生中断的外设之一。 1.中断&#xff1a; 在主程序运行过程中&#xff0c;出现了特定的中断触发条件 (中断源&#xff0c;如对于外部中断来说可以是引脚发生了电平跳变&#xff0c;对于定时器来…

流式数据湖平台实战 | HudiSQL DML

本文介绍SparkSQL提供的几个数据操作语言(DML)操作,用于与Hudi表交互。这些操作包括插入、更新、合并和删除Hudi表中的数据。 1.Insert Into 使用INSERT INTO语句使用Spark SQL将数据添加到Hudi表中。以下是一些示例: INSERT INTO <table> SELECT <columns> F…

Web开发-拦截器介绍

目录 拦截器的主要作用场景&#xff1a;验证用户登录是否有效拦截Ajax请求拦截其他请求 拦截器的主要作用 完成请求参数的解析&#xff0c;将页面表单参数赋给值栈中相应属性&#xff0c;执行功能检验&#xff0c;程序异常调试等工作。在MVC设计模式的框架下&#xff0c;拦截器…

4WE6Y61B/CG24N9Z5L液压电磁阀

特点 1.直动式电磁铁操作方向滑阀作为标准类型; 2.安装面按DIN24 340 A型ISO4401和CETOP-RP 121H;3.电磁铁可任意旋转&#xff0c;线圈可拆卸的直流或交流湿式电磁铁; 4.可不放油液更换线圈; 5.可带有手动应急操作推杆&#xff0c;

打造高效的分布式爬虫系统:利用Scrapy框架实现

在大数据时代的今天&#xff0c;爬虫系统成为了获取和分析海量数据的重要工具。本文将介绍如何使用Scrapy框架来构建一个高效的分布式爬虫系统&#xff0c;以加速数据采集过程和提高系统的可扩展性。 Scrapy框架简介 Scrapy是一个基于Python的强大的开源网络爬虫框架&#xff…

Element UI之Button 按钮

Button 按钮 常用的操作按钮。 按需引入方式 如果是完整引入可跳过此步骤 import Vue from vue import { Button } from element-ui import element-ui/lib/theme-chalk/base.css import element-ui/lib/theme-chalk/button.css import element-ui/lib/theme-chalk/icon.cs…

在网络安全、爬虫和HTTP协议中的重要性和应用

1. Socks5代理&#xff1a;保障多协议安全传输 Socks5代理是一种功能强大的代理协议&#xff0c;支持多种网络协议&#xff0c;包括HTTP、HTTPS和FTP。相比之下&#xff0c;Socks5代理提供了更高的安全性和功能性&#xff0c;包括&#xff1a; 多协议支持&#xff1a; Socks5代…

G1 GC详解及设置

一、概述 G1 GC&#xff0c;全称Garbage-First Garbage Collector&#xff0c;在JDK1.7中引入了G1 GC&#xff0c;从JAVA 9开始&#xff0c;G1 GC是默认的GC算法。通过-XX:UseG1GC参数来启用。G1收集器是工作在堆内不同分区上的收集器&#xff0c;分区既可以是年轻代也可以是老…

androidx和v4包资源冲突解决方法

一、资源包会报如下错误&#xff1a; 错误类似 (androidx.core:core:1.10.0) 和 (com.android.support:support-compat:24.2.0) 表示资源重复&#xff0c;不知调用androidx包下面的&#xff0c;还是v4包下面的 Duplicate class android.support.v4.app.INotificationSideCha…

一图看懂CodeArts Inspector 三大特性,带你玩转漏洞管理服务

华为云漏洞管理服务CodeArts Inspector是面向软件研发和服务运维提供的一站式漏洞管理能力&#xff0c;通过持续评估系统和应用等资产&#xff0c;内置风险量化管理和在线风险分析处置能力&#xff0c;帮助组织快速感应和响应漏洞&#xff0c;并及时有效地完成漏洞修复工作&…

探索UI设计|栅格系统的深入分析和应用

界面排版太乱了。你知道网格系统的用途吗&#xff1f;网格系统困扰着许多初级网页设计师&#xff0c;就像一个谜。如果您对网格在设计中的应用有任何疑问&#xff0c;本文是为您量身定制的&#xff0c;并深入分析UI设计中网格系统的基本要素和优点。 什么是网格系统 网格系统…