基于springboot websocket和okhttp实现消息中转

1、业务介绍

消息源服务的消息不能直接推给用户侧,用户与中间服务建立websocket连接,中间服务再与源服务建立websocket连接,源服务的消息推给中间服务,中间服务再将消息推送给用户。流程如下图:
在这里插入图片描述
此例中我们定义中间服务A的端口为8082,消息源头服务B的端口为8081,方便阅读下面代码。
说明:此例子只实现了中间服务的转发,连接的关闭等其他逻辑并没有完善,如需要请自行完善;

2、中间服务实现

中间服务即为上图的中间服务A,由于中间服务既要发送(发给用户端)消息,又要接收(从消息源服务B接收)消息,故服务A分为服务端与客户端。
服务A的websocket服务端我们使用springboot websocket实现,客户端使用okhttp实现;会话缓存暂使用内存缓存(实际项目中可置于其他缓存中)
中间服务所需依赖为:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>
<dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId><version>4.2.2</version>
</dependency>

缓存类:

public class WSCache {//存储客户端session信息, {会话id:ws_session}public static Map<String, Session> clients = new ConcurrentHashMap<>();//存储把不同用户的客户端session信息集合 {userId, [会话id1,会话id2,会话id3,会话id4]}public static Map<String, Set<String>> connection = new ConcurrentHashMap<>();
}

自定义消息类:

@Accessors(chain = true)
@Data
public class MsgInfo {private String massage;//为userId,用于从缓存中获取对应用户的websocket sessionprivate String userKey;
}

2.1 中间服务A的客户端:

客户端也可以使用springboot websocket,当下我们选择使用okhttp实现。

@Slf4j
public class CommonWSClient extends WebSocketListener {/*** websocket连接建立** @param webSocket* @param response*/@Overridepublic void onOpen(WebSocket webSocket, Response response) {super.onOpen(webSocket, response);log.info("客户端连接建立:{}", response.body().string());}/*** 收到消息* @param webSocket* @param text*/@Overridepublic void onMessage(WebSocket webSocket, String text) {super.onMessage(webSocket, text);log.info("okhttp receive=>{}", text);//todo 收到源(8081)的消息,取到对应userId的消息,并将消息通过本地server发送给用户ObjectMapper mapper = new ObjectMapper();try {MsgInfo msgInfo = mapper.readValue(text, MsgInfo.class);Set<String> strings = WSCache.connection.get(msgInfo.getUserKey());if(!CollectionUtils.isEmpty(strings)){for (String sid : strings) {Session session = WSCache.clients.get(sid);session.getBasicRemote().sendText(msgInfo.getMassage());}}} catch (Exception e) {e.printStackTrace();//throw new RuntimeException(e);}}@Overridepublic void onMessage(WebSocket webSocket, ByteString bytes) {super.onMessage(webSocket, bytes);}@Overridepublic void onClosing(WebSocket webSocket, int code, String reason) {super.onClosing(webSocket, code, reason);log.info("okhttp socket closing.");}@Overridepublic void onClosed(WebSocket webSocket, int code, String reason) {super.onClosed(webSocket, code, reason);log.info("okhttp socket closed.");}@Overridepublic void onFailure(WebSocket webSocket, Throwable t, Response response) {super.onFailure(webSocket, t, response);if (response == null) {log.error("okhttp onFailure, response is null.");return;}try {log.error("okhttp onFailure, code: {}, errmsg: {}", response.code(), response.body().string());} catch (IOException e) {log.warn("okhttp onFailure failed, error: {}", e.getMessage());}}}

2.2 中间服务A的服务端:

websocket服务:

@Slf4j
@Component
@ServerEndpoint("/notice/{userId}")
public class WebSocketServer {//会话idprivate String sid = null;//建立连接的用户idprivate String userId;/*** @description: 当与用户端连接成功时,执行该方法* @PathParam 获取ServerEndpoint路径中的占位符信息类似 控制层的 @PathVariable注解**/@OnOpenpublic String onOpen(Session session, @PathParam("userId") String userId){this.sid = UUID.randomUUID().toString();this.userId = userId;WSCache.clients.put(this.sid,session);//判断该用户是否存在会话信息,不存在则添加Set<String> clientSet = WSCache.connection.get(userId);if (CollectionUtils.isEmpty(clientSet)){clientSet = new HashSet<>();clientSet.add(this.sid);}else {clientSet.add(this.sid);}WSCache.connection.put(userId,clientSet);log.info("用户{}与本地(8082)server建立连接", this.userId);//todo 本地client与源server(8081)连接Request requestRemote = new Request.Builder().url("ws://127.0.0.1:8081/api/notice/" + userId).build();OkHttpClient webSocketClientRemote = new OkHttpClient.Builder().build();WebSocket localClientRemote = webSocketClientRemote.newWebSocket(requestRemote, new CommonWSClient());log.info("本地server创建本地client,且本地client与远程(8082)server连接成功");return userId + "与本地server连接";}/*** @description: 当连接失败时,执行该方法**/@OnClosepublic void onClose(){WSCache.clients.remove(this.sid);System.out.println(this.sid+"连接断开");}/*** @description: 当收到client发送的消息时,执行该方法**/@OnMessagepublic void onMessage(String message, Session session) {System.out.println("-----------收到来自用户:" + this.userId + "的信息   " + message);}/*** @description: 当连接发生错误时,执行该方法**/@OnErrorpublic void onError(Throwable error){System.out.println("error--------系统错误");error.printStackTrace();}
}

websocket配置类:

@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter(){return new ServerEndpointExporter();}
}

3、消息源服务

消息源服务B只需要websocket服务用来发送消息即可,其实现与中间服务A的服务端相同。
服务:

@Slf4j
@Component
@ServerEndpoint("/notice/{userId}")
public class WebSocketServer {//存储客户端session信息, {会话id:ws_session}public static Map<String, Session> clients = new ConcurrentHashMap<>();//存储把不同用户的客户端session信息集合 {userId, [会话id1,会话id2,会话id3,会话id4]}public static Map<String, Set<String>> connection = new ConcurrentHashMap<>();//会话idprivate String sid = null;//建立连接的用户idprivate String userId;/*** @description: 当与客户端的websocket连接成功时,执行该方法* @PathParam 获取ServerEndpoint路径中的占位符信息类似 控制层的 @PathVariable注解**/@OnOpenpublic void onOpen(Session session, @PathParam("userId") String userId){log.info("onOpen-->session.getRequestParameterMap():{}", session.getRequestParameterMap());this.sid = UUID.randomUUID().toString();this.userId = userId;clients.put(this.sid,session);//判断该用户是否存在会话信息,不存在则添加Set<String> clientSet = connection.get(userId);if (clientSet == null){clientSet = new HashSet<>();connection.put(userId,clientSet);}clientSet.add(this.sid);System.out.println(this.userId + "用户建立连接," + this.sid+"连接开启!");}/*** @description: 当连接失败时,执行该方法**/@OnClosepublic void onClose(){clients.remove(this.sid);System.out.println(this.sid+"连接断开");}/*** @description: 当收到客户端发送的消息时,执行该方法**/@OnMessagepublic void onMessage(String message, Session session) {System.out.println("-----------收到来自用户:" + this.userId + "的信息   " + message);//自定义消息实体MsgInfo msgInfo = new MsgInfo().setUserKey(this.userId).setMassage("服务端-" + System.currentTimeMillis() + ":已收到用户" +this.userId + "的信息: " + message);sendMessageByUserId(this.userId,  msgInfo);}/*** @description: 当连接发生错误时,执行该方法**/@OnErrorpublic void onError(Throwable error){System.out.println("error--------系统错误");error.printStackTrace();}/*** @description: 通过userId向用户发送信息* 该类定义成静态可以配合定时任务实现定时推送**/public static void sendMessageByUserId(String userId, MsgInfo msgInfo){if (!StringUtils.isEmpty(userId)) {Set<String> clientSet = connection.get(userId);//用户是否存在客户端连接if (Objects.nonNull(clientSet)) {Iterator<String> iterator = clientSet.iterator();while (iterator.hasNext()) {String sid = iterator.next();Session session = clients.get(sid);//向每个会话发送消息if (Objects.nonNull(session)){try {//同步发送数据,需要等上一个sendText发送完成才执行下一个发送ObjectMapper mapper = new ObjectMapper();session.getBasicRemote().sendText(mapper.writeValueAsString(msgInfo));} catch (Exception e) {e.printStackTrace();}}}}}}@Scheduled(cron = "0/10 * * * * ?")public void testSendMessageByCron(){log.info("-----------模拟消息开始发送--------------");//模拟两个用户100和200MsgInfo msg100 = new MsgInfo().setUserKey("100").setMassage("这是8081发给用户100的消息" + System.currentTimeMillis());sendMessageByUserId("100", msg100);MsgInfo msg200 = new MsgInfo().setUserKey("200").setMassage("这是8081发给用户200的消息" + System.currentTimeMillis());sendMessageByUserId("200", msg200);}
}

4、测试

我们使用: wss在线测试工具进行测试;
1、 打开两个该工具窗口,分别模拟用户100和用户200,这两个用户都连接中间服务A(端口8082的服务);
用户100
用户200
2、分别启动消息源服务B和中间服务A
此时在服务B控制台我们可以看到:
在这里插入图片描述
我们模拟的消息发送已经在给用户100和用户200发送,因为我们的用户100和用户200均没有与中间服务A建立连接,故此时测试界面看不到消息;
当我们在用户100的模拟界面点击“开启连接”后,可以在右侧看到发给用户100的模拟消息:
在这里插入图片描述

之后我们再打开用户200的连接:
在这里插入图片描述

好了,到这里就结束了,有任何问题请积极指出,此例子只是个例子,并未经受任何生产的测试,欢迎讨论沟通:)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/32824.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解析PDF文件中的图片为文本

解析PDF文件中的图片为文本 1 介绍 解析PDF文件中的图片&#xff0c;由两种思路&#xff0c;一种是自己读取PDF文件中的图片&#xff0c;然后用OCR解析&#xff0c;例如&#xff1a;使用PyMuPDF读取pdf文件&#xff0c;再用PaddleOCR或者Tesseract-OCR识别文字。另一种使用第…

信息学奥赛初赛天天练-32-CSP-J2022基础题-中缀表达式、后缀表达式、哈夫曼编码、连通图、邻居矩阵、完全二叉树、数组存储

PDF文档公众号回复关键字:20240623 2022 CSP-J 选择题 单项选择题&#xff08;共15题&#xff0c;每题2分&#xff0c;共计30分&#xff1a;每题有且仅有一个正确选项&#xff09; 6.对表达式a(b-c)*d的前缀表达式为( ),其中 、- 、 * 是运算符。 A. * a - bcd B. a * - …

【从0实现React18】 (四) 如何触发更新 带你了解react触发更新的流程以及更新后如何触发render

常见的触发更新的方式 创建 React 应用的根对象 ReactDOM.creatRoot().render()&#xff1b;类组件 this.setState()&#xff1b;函数组件 useState useEffect&#xff1b; 我们希望实现一套统一的更新机制&#xff0c;他的特点是&#xff1a; 兼容上述触发更新的方式方便后续…

c++学习-----内存管理

1. C/C内存分布 我们先来看下面的一段代码和相关问题 答案揭晓&#xff1a; 这里很多人会误认为*char2在常量区&#xff0c;这其实是错误的 因为&#xff1a; 首先在内存字符常量区分配一块内存空间放下”abcd\0”&#xff0c;然后在栈中分配一块连续的内存空间&#xff0c;…

SQL-Python

师从黑马程序员 数据库介绍 数据库就是存储数据的库 数据组织&#xff1a;库->表->数据 数据库和SQL的关系 MySQL的基础命令 SQL基础 SQL语言的分类 SQL的语法特征 DDL-库管理 show DATABASES;use sys;SELECT database();CREATE DATABASE test CHARSET utf-8;SHOW D…

学习C++第二天

1.缺省参数 缺省参数的概念&#xff1a; 缺省参数是声明或定义函数时为函数的参数指定一个缺省值。在调用该函数时&#xff0c;如果没有指定实参则采用该形参的缺省值&#xff0c;否则使用指定的实参。 void show(int a 10) {cout << a << endl; }int main() {sho…

ubuntu18.04 编译HBA 并实例运行

HBA是一个激光点云层级式的全局优化的程序&#xff0c;他的论文题目是&#xff1a;HBA: A Globally Consistent and Efficient Large-Scale LiDAR Mapping Module&#xff0c;对应的github地址是&#xff1a;HKU-Mars-Lab GitHub 学习本博客&#xff0c;可以学到gtsam安装&am…

提升Python技能的七个函数式编程技巧

文章目录 📖 介绍 📖🏡 演示环境 🏡📒 文章内容 📒📝 递归📝 结构化模式匹配📝 不变性📝 纯函数📝 高阶函数📝 函数组合📝 惰性求值⚓️ 相关链接 ⚓️📖 介绍 📖 在现代编程中,虽然Python并不是一门纯粹的函数式编程语言,但函数式编程(Funct…

Linux C/C++ socket函数

目录 socket函数 函数原型 头文件 功能 返回值 参数 错误码 socket函数 函数原型 int socket(int domain, int type, int protocol); 头文件 #include <sys/types.h> #include <sys/socket.h> 功能 创建一个用于通信的端点&#xff0c;并返回一个文件描述符…

登录安全分析报告:链家地产

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造成亏损无底洞 …

力扣刷题 杨辉三角(使用c++ vector解法)

杨辉三角 题目描述示例1示例2提示:代码 题目描述 给定一个非负整数 numRows&#xff0c;生成「杨辉三角」的前 numRows 行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方的数的和。 示例1 输入: numRows 5 输出: [[1],[1,1],[1,2,1],[1,3,3,1],[1,4,6,4,1]] 示例2 …

上位机图像处理和嵌入式模块部署(mcu和swd接口)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 最近学习mcu的时候&#xff0c;接触了不少调试器&#xff0c;这里面有daplink、st-link v2、j-link v9。虽然模块的形状可能不太一样&#xff0c;但…

基于I2C协议的AHT20温湿度传感器的数据采集

一、I2C总线通信协议 软件I2C 软件I2C&#xff0c;也称为模拟I2C或bit-bang I2C&#xff0c;是一种通过微控制器的通用输入输出&#xff08;GPIO&#xff09;引脚来模拟I2C总线通信的方式。它不依赖于专门的硬件I2C接口&#xff0c;而是通过编程控制GPIO引脚的电平状态来实现I…

快去复习吧+++常用算法及参考算法 递推法++穷举法++排序(冒泡、选择)++查找(顺序、折半)++字符串处理++方程求根++无穷级数求和

接上&#xff1a;常用算法及参考算法 &#xff08;1&#xff09;累加 &#xff08;2&#xff09;累乘 &#xff08;3&#xff09;素数 &#xff08;4&#xff09;最大公约数 &#xff08;5&#xff09;最值问题 &#xff08;6&#xff09;迭代法 常用算法及参考算法 7. 递推法…

Vue-观察器(watch)的定义方式引发组件初始值没有渲染成功问题(已解决)

问题描述&#xff1a;在测试环境发现一个问题&#xff0c;打开一张表单的时候&#xff0c;所有字段都成功赋上了值&#xff0c;唯独一个人员组件的值&#xff08;出差人员&#xff09;没有带出&#xff0c;而接口返回的数据是正常的&#xff0c;也就是说不是后端接口的问题&…

JVM专题七:JVM垃圾回收机制

JVM专题六&#xff1a;JVM的内存模型中&#xff0c;我们介绍了JVM内存主要分哪些区域&#xff0c;这些区域分别是干什么的&#xff0c;同时也举了个例子&#xff0c;在运行过程种各个区域数据是怎样流转的。细心的小伙伴可能发现一个问题&#xff0c;在介绍完方法弹栈以后就没有…

指令微调数据集构建方法

指令微调&#xff08;Instruction Tuning&#xff09;&#xff0c;是指使用自然语言形式的数据对预训练后的大语言模型进行参数微调&#xff0c;在一些文章中也称为有监督微调&#xff08;Supervised Fine-tuning&#xff0c;SFT&#xff09;或多任务提示训练&#xff08;Multi…

CARLA自动驾驶模拟器基础

CARLA 使用服务器-客户端架构运行&#xff0c;其中 CARLA 服务器运行模拟并由客户端向其发送指令。客户端代码使用 API 与服务器进行通信。要使用 Python API&#xff0c;您必须通过 PIP 安装该模块&#xff1a; pip3 install carla-simulator # Python 3World and client 客…

React18中各种Hooks用法总结( 内附案例讲解)

React中各种Hooks用法总结 内附案例讲解 一、useState useState 是一个 React Hook&#xff0c;它允许你向组件添加一个 状态变量。 import React, { FC, memo, useState } from react import { MainContainer } from ./style interface IProps {children?: React.ReactNo…

FRP内网穿透及多级代理的使用

目录 0、前言 1、场景介绍 2、环境准备 2.1 下载frp 2.2 配置一台VPS 2.3 socks5客户端 2.5 网络环境准备 3、Frp设置 3.1 一层代理 3.1 二层代理 4、Frp总结 0、前言 FRP是比较老牌的也是比较流行的反向代理、内网穿透软件。FRP用途和使用场景可以看官方文档&#xff0c;…