SpringBoot集成websocket(3)|(websocket调用websocket采用回调方式实现数据互传)
文章目录
- SpringBoot集成websocket(3)|(websocket调用websocket采用回调方式实现数据互传)
- @[TOC]
- 前言
- 一、websocket服务端依赖引入
- 二、websocket服务代码实现
- 1.WebSocketConfig配置
- 2.WebSocketServer服务实现
- 3.ChatClient4Chat连接工具实现
- 3.WebSocketClient连接第三方客户端实现
- 总结
文章目录
- SpringBoot集成websocket(3)|(websocket调用websocket采用回调方式实现数据互传)
- @[TOC]
- 前言
- 一、websocket服务端依赖引入
- 二、websocket服务代码实现
- 1.WebSocketConfig配置
- 2.WebSocketServer服务实现
- 3.ChatClient4Chat连接工具实现
- 3.WebSocketClient连接第三方客户端实现
- 总结
章节
第一章链接: SpringBoot集成websocket(1)|(websocket客户端实现)
第二章链接: SpringBoot集成websocket(2)|(websocket服务端实现以及websocket中转实现)
前言
本节主要介绍的是springboot实现websocket的客户端服务端,以及客户端与服务端的数据互传。以下为伪代码,业务逻辑删除导致不能直接拷贝运行,大家可以参考其中的思路实现。
一、websocket服务端依赖引入
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
二、websocket服务代码实现
1.WebSocketConfig配置
springboot接入websocket需要启用对应的配置
@Configuration
@EnableWebSocket
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter(){return new ServerEndpointExporter();}
}
2.WebSocketServer服务实现
springboot提供对外的websocket接口实现
@Component
@Data
@Slf4j
@ServerEndpoint(value = "/v1/chat")
public class DocChatServer {public final static String CHAT_ERR_MSG_FORMAT = "{\"header\":{\"code\":10001,\"message\":\"参数格式不对\",\"sId\":\"%s\",\"status\":2}}";@Autowiredprivate void setOriginMessageSender() {// 初始化注入bean 隐藏掉了}@OnOpenpublic void OnOpen(Session session) {log.debug("chat websocket open ");}@OnClosepublic void OnClose() {log.debug("chat websocket close ");}@OnMessagepublic void OnMessage(Session session, String message) {SearchDocParamVo param = null;log.debug("ApiRequest = {}", message);// 参数校验try {JSONObject jsonObject = JSONObject.parseObject(message);// todo 参数校验} catch (Exception e) {String errMsg = String.format(CHAT_ERR_MSG_FORMAT, session.getId());log.error("chat请求参数格式不会:{},异常:{}", errMsg, e);send(session, errMsg);return;}// todo 业务处理List<ChatRecord.Source> sources = Lists.newArrayList();;String prompt = "";// 谈话接口queryChat(session, param, prompt, sources);}/*** 执行谈话** @param session* @param param* @param prompt* @param sources*/private void queryChat(Session session, SearchDocParamVo param, String prompt, List<ChatRecord.Source> sources) {// todo 业务处理 。。。// 消息发送try {boolean b = this.sendChatMessage(session, param, sources, texts);if (!b) {List<Text> textsTry = Lists.newArrayList();Text build1 = Text.builder().role("user").content("请更具自己的理解回答问题:" + param.getContent()).build();textsTry.add(build1);this.sendChatMessage(session, param, sources, textsTry);}} catch (Exception e) {log.error("发送消息异常:{}", e.getMessage());}}/*** 收到谈话响应数据处理** @param session* @param param* @param sources* @param texts* @return*/private boolean sendChatMessage(Session session, SearchDocParamVo param, List<ChatRecord.Source> sources, List<Text> texts) {ChatClient4Chat planetClient4Chat = new ChatClient4Chat(websocketConfigConst);try {planetClient4Chat.send(param, texts, new ApiResponseObserver() {public void onReceive(String message) {// 收到远程websocket服务响应的数据}public void onError(Throwable throwable) {log.error("收到错误:{}", throwable);}public void onCompleted() {log.error("收到结束");}});// 以下是业务逻辑 可忽略for (int i = 0; i < 100; i++) {if (planetClient4Chat.isHasCheck()) {log.debug("has check");return planetClient4Chat.isSuccess();} else {Thread.sleep(500);}}} catch (Exception e) {log.error("发送消息异常:{}", e.getMessage());}return true;}public void send(Session session, String msg) {synchronized (session) {if (!session.isOpen()) {log.error("客户端连接关闭,数据不发送:{}", msg);return;}try {session.getBasicRemote().sendText(msg);} catch (IOException ex) {log.error("传递消息给客户端异常:{}", ex.getMessage());}}}public int getStatus(String message) {int status = -1;try {//todo 业务逻辑return choices.getStatus();} catch (Exception e) {log.error("数据中提取status异常:{}", e);}return status;}@OnErrorpublic void onerror(Session session, Throwable throwable) {log.error("chat连接异常关闭:远程主机强迫关闭了一个现有的连接:{}", throwable);}}
3.ChatClient4Chat连接工具实现
springboot提供对中间衔接工具了,连接第三饭websocket接口
实现代码如下
@Slf4j
@Getter
@Setter
public class ChatClient4Chat {private static WebsocketConfigConst websocketConfigConst;private StringBuilder stringBuilder;private boolean hasCheck;private boolean success;private Queue<String> queue;private ChatChatServer sparkChatServer;ChatClient4Chat(WebsocketConfigConst websocketConfigConst) {this.websocketConfigConst = websocketConfigConst;this.stringBuilder = new StringBuilder();this.hasCheck = false;this.success = true;this.queue = new LinkedList<String>();}/*** 执行聊天** @param param*/public void send(SearchDocParamVo param, List<Text> texts, ApiResponseObserver apiResponseObserver) {// 获取连接ChatChatServer chatServer = (ChatChatServer ) getWebSocketClient(apiResponseObserver);if (chatServer != null && chatServer.isOpen()) {this.sparkChatServer = chatServer;// 消息发送try {chatServer.send(SparkHand.initParam(param, texts, websocketConfigConst.type, websocketConfigConst.appid, websocketConfigConst.token));} catch (Exception e) {log.error("发送消息异常:{}", e.getMessage());}} else {log.error("接口连接未打开");}}public void close() {// 获取连接if (sparkChatServer != null && sparkChatServer.isOpen()) {sparkChatServer.close();} else {log.error("接口连接未打开,关闭异常");}}private void waitConnect() {try {Thread.sleep(50);} catch (InterruptedException e) {log.error("等待连接异常");}}private WebSocketClient getWebSocketClient(ApiResponseObserver apiResponseObserver) {WebSocketClient client = new SparkChatServer(websocketConfigConst.chaturl, apiResponseObserver);client.connect();waitConnect();return client;}
}
ApiResponseObserver 是一个定义的接口,规范一些方法
public interface ApiResponseObserver extends ResponseObservable<String> {
}public interface ResponseObservable<T> {void onReceive(T response);void onError(Throwable throwable);void onCompleted();
}
3.WebSocketClient连接第三方客户端实现
springboot提供对第三方websocket连接的客户端
实现代码如下
@Slf4j
public class SparkChatServer extends WebSocketClient {private ApiResponseObserver apiResponseObserver;public SparkChatServer(URI serverUri, ApiResponseObserver apiResponseObserver) {super(serverUri);this.apiResponseObserver = apiResponseObserver;}@Overridepublic void onOpen(ServerHandshake serverHandshake) {log.debug("chat 服务连接成功");}@Overridepublic void onMessage(String message) {log.debug("收到chat数据{}", message);apiResponseObserver.onReceive(message);}@Overridepublic void onClose(int i, String s, boolean b) {log.debug("退出chat连接");}@Overridepublic void onError(Exception e) {log.error("chat连接出现异常:{}", e);}
}
总结
本文主要介绍websocket客户端、服务端的实现,同时通过连接工具中转websocket请求参数,捉到实时同步,以及数据收集。代码为伪代码,删除了实际使用当中的业务逻辑,介绍的是实现实录,大家可以参考。
第一章链接: SpringBoot集成websocket(1)|(websocket客户端实现)
第二章链接: SpringBoot集成websocket(2)|(websocket服务端实现以及websocket中转实现)