WebSocket的那些事(5-Spring中STOMP连接外部消息代理)

目录

  • 一、序言
  • 二、开启RabbitMQ外部消息代理
  • 三、代码示例
    • 1、Maven依赖项
    • 2、相关实体
    • 3、自定义用户认证拦截器
    • 4、Websocket外部消息代理配置
    • 5、ChatController
    • 6、前端页面chat.html
  • 四、测试示例
    • 1、群聊、私聊、后台定时推送测试
    • 2、登录RabbitMQ控制台查看队列信息
  • 五、结语

一、序言

上节我们在 WebSocket的那些事(4-Spring中的STOMP支持详解) 中详细说明了通过Spring内置消息代理结合STOMP子协议进行Websocket通信,以及相关注解的使用及原理。

但是Spring内置消息代理会有一些限制,比如只支持STOMP协议的一部分命令,像acksreceipts命令都是不支持的,还有由于内置消息代理把消息存储在内存,当应用不可用时,客户端也就订阅不到到后台推送的消息。

这节我们将会使用支持STOMP协议的外部消息代理(RabbitMQ)进行Websocket通信。


二、开启RabbitMQ外部消息代理

服务端路由发送消息以及客户端订阅消息都要通过STOMP协议与RabbitMQ进行交互,由于RabbitMQ默认没有启动STOMP插件,因此我们需要先启用该插件。

rabbitmq-plugins enable rabbitmq_stomp

启动该插件后,RabbitMQ中STOMP适配器默认会监听61613端口,如果是云服务器,需要把该端口在安全组中放开。

关于该插件说明请参考:RabbitMQ中STOMP插件说明。


三、代码示例

我们在 WebSocket的那些事(4-Spring中的STOMP支持详解)中写了一个简单的聊天Demo示例,下面我们对该聊天Demo示例进行改造,将Spring内置消息代理替换成RabbitMQ外部消息代理。

1、Maven依赖项

服务端和客户端与外部消息代理都是通过TCP进行通信,Spring底层默认使用的是NettyReactor,因此需要引入相关依赖项。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-reactor-netty</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>

2、相关实体

(1) 请求消息参数

@Data
public class WebSocketMsgDTO {private String name;private String content;
}

(2) 响应消息内容

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class WebSocketMsgVO {private String content;
}

(3) 自定义认证用户信息

@Data
@AllArgsConstructor
@NoArgsConstructor
public class StompAuthenticatedUser implements Principal {/*** 用户唯一ID*/private String userId;/*** 用户昵称*/private String nickName;/*** 用于指定用户消息推送的标识* @return*/@Overridepublic String getName() {return this.userId;}}

3、自定义用户认证拦截器

@Slf4j
public class UserAuthenticationChannelInterceptor implements ChannelInterceptor {private static final String USER_ID = "User-ID";private static final String USER_NAME = "User-Name";@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);// 如果是连接请求,记录userIdif (StompCommand.CONNECT.equals(accessor.getCommand())) {String userID = accessor.getFirstNativeHeader(USER_ID);String username = accessor.getFirstNativeHeader(USER_NAME);log.info("Stomp User-Related headers found, userID: {}, username:{}", userID, username);accessor.setUser(new StompAuthenticatedUser(userID, username));}return message;}}

4、Websocket外部消息代理配置

Spring中与外部消息代理通信的中间方被称之为Broker Relay,它会维护一个系统共享的单一TCP连接和外部消息代理进行通信,该TCP连接仅仅适用于服务端,用来发送消息,而不是接收消息,通过Broker RelaysystemLoginsystemPasscode属性可以设置该连接的认证信息。

Broker Relay也会为每个连接的Websocket客户端创建一个TCP连接,该连接用来接收消息,通过clientLoginclientPasscode属性可以设置连接的认证信息。

/*** Websocket连接外部消息代理配置* @author Nick Liu* @date 2023/9/6*/
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketExternalMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void configureClientInboundChannel(ChannelRegistration registration) {// 拦截器配置registration.interceptors(new UserAuthenticationChannelInterceptor());}@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/websocket") // WebSocket握手端口.addInterceptors(new HttpSessionHandshakeInterceptor()).setAllowedOriginPatterns("*") // 设置跨域.withSockJS(); // 开启SockJS回退机制}@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {registry.setApplicationDestinationPrefixes("/app") // 发送到服务端目的地前缀.enableStompBrokerRelay("/topic") // 开启外部消息代理,指定消息订阅前缀.setRelayHost("localhost") // 外部消息代理Host.setRelayPort(61613) // 外部消息代理STOMP端口.setSystemLogin("admin")  // 共享系统连接用户名,该连接主要用来发送消息.setSystemPasscode("admin") // 共享系统连接密码,该连接主要用来发送消息.setClientLogin("admin") // 客户端连接用户名,该连接主要用来接收消息.setClientPasscode("admin") // 客户端连接密码,该连接主要用来接收消息.setVirtualHost("/stomp"); // RabbitMQ虚拟主机}
}

备注:我们可以为服务端与客户端的连接设置不同的用户,针对客户端连接用户进行权限管控,保证系统的安全性,在这里为了方便测试我们统一用一个用户。

5、ChatController

STOMP协议并没有规定消息代理必须支持哪种类型的Destinations(目的地),但是RabbitMQ STOMP适配器只支持一些指定的目的地类型,如下图:
在这里插入图片描述

  • /exchange:指定交换机和路由key,发送和订阅来自队列的消息。
  • /queue:发送和订阅受STOMP网关管理的队列的消息,最多只有一个订阅者能到消息。
  • /amq/queue:发送和订阅不受STOMP网关管理的队列的消息。
  • /topic:发送和订阅来自临时或者持久Topic的消息,多个订阅者都能接收到消息。
  • /temp-queue/:发送和订阅来自临时队列的消息。

参考文档见:RabbitMQ中STOMP插件说明。

在下面的示例中,我们选用了/topic的开头的消息发送和订阅前缀,目的地格式只能为/topic/{routing-key}routing-key不能有斜杠,否则会报错。

@Slf4j
@Controller
@RequiredArgsConstructor
public class ChatController {private final SimpUserRegistry simpUserRegistry;private final SimpMessagingTemplate simpMessagingTemplate;/*** 模板引擎为Thymeleaf,需要加上spring-boot-starter-thymeleaf依赖,* @return*/@GetMapping("/page/chat")public ModelAndView turnToChatPage() {return new ModelAndView("chat");}/*** 群聊消息处理* 这里我们通过@SendTo注解指定消息目的地为"/topic/chat/group",如果不加该注解则会自动发送到"/topic" + "/chat/group"* @param webSocketMsgDTO 请求参数,消息处理器会自动将JSON字符串转换为对象* @return 消息内容,方法返回值将会广播给所有订阅"/topic/chat/group"的客户端*/@MessageMapping("/chat/group")@SendTo("/topic/chat-group")public WebSocketMsgVO groupChat(WebSocketMsgDTO webSocketMsgDTO) {log.info("Group chat message received: {}", FastJsonUtils.toJsonString(webSocketMsgDTO));String content = String.format("来自[%s]的群聊消息: %s", webSocketMsgDTO.getName(), webSocketMsgDTO.getContent());return WebSocketMsgVO.builder().content(content).build();}/*** 私聊消息处理* 这里我们通过@SendToUser注解指定消息目的地为"/topic/chat/private",发送目的地默认会拼接上"/user/"前缀* 实际发送目的地为"/user/topic/chat/private"* @param webSocketMsgDTO 请求参数,消息处理器会自动将JSON字符串转换为对象* @return 消息内容,方法返回值将会基于SessionID单播给指定用户*/@MessageMapping("/chat/private")@SendToUser("/topic/chat-private")public WebSocketMsgVO privateChat(WebSocketMsgDTO webSocketMsgDTO) {log.info("Private chat message received: {}", FastJsonUtils.toJsonString(webSocketMsgDTO));String content = "私聊消息回复:" + webSocketMsgDTO.getContent();return WebSocketMsgVO.builder().content(content).build();}/*** 定时消息推送,这里我们会列举所有在线的用户,然后单播给指定用户。* 通过SimpMessagingTemplate实例可以在任何地方推送消息。*/@Scheduled(fixedRate = 10 * 1000)public void pushMessageAtFixedRate() {log.info("当前在线人数: {}", simpUserRegistry.getUserCount());if (simpUserRegistry.getUserCount() <= 0) {return;}// 这里的Principal为StompAuthenticatedUser实例Set<StompAuthenticatedUser> users = simpUserRegistry.getUsers().stream().map(simpUser -> StompAuthenticatedUser.class.cast(simpUser.getPrincipal())).collect(Collectors.toSet());users.forEach(authenticatedUser -> {String userId = authenticatedUser.getUserId();String nickName = authenticatedUser.getNickName();WebSocketMsgVO webSocketMsgVO = new WebSocketMsgVO();webSocketMsgVO.setContent(String.format("定时推送的私聊消息, 接收人: %s, 时间: %s", nickName, LocalDateTime.now()));log.info("开始推送消息给指定用户, userId: {}, 消息内容:{}", userId, FastJsonUtils.toJsonString(webSocketMsgVO));simpMessagingTemplate.convertAndSendToUser(userId, "/topic/chat-push", webSocketMsgVO);});}}

6、前端页面chat.html

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head><meta charset="UTF-8"><title>chat</title><script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.6.1/sockjs.min.js"></script><script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script><script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.6.4/jquery.min.js"></script><style>#mainWrapper {width: 600px;margin: auto;}</style>
</head>
<body>
<div id="mainWrapper"><div><label for="username" style="margin-right: 5px">姓名:</label><input id="username" type="text"/></div><div id="msgWrapper"><p style="vertical-align: top">发送的消息:</p><textarea id="msgSent" style="width: 600px;height: 100px"></textarea><p style="vertical-align: top">收到的群聊消息:</p><textarea id="groupMsgReceived" style="width: 600px;height: 100px"></textarea><p style="vertical-align: top">收到的私聊消息:</p><textarea id="privateMsgReceived" style="width: 600px;height: 200px"></textarea></div><div style="margin-top: 5px;"><button onclick="connect()">连接</button><button onclick="sendGroupMessage()">发送群聊消息</button><button onclick="sendPrivateMessage()">发送私聊消息</button><button onclick="disconnect()">断开连接</button></div>
</div>
<script type="text/javascript">$(() => {$('#msgSent').val('');$("#groupMsgReceived").val('');$("#privateMsgReceived").val('');});let stompClient = null;// 连接服务器const connect = () => {const header = {"User-ID": new Date().getTime().toString(), "User-Name": $('#username').val()};const ws = new SockJS('http://localhost:8080/websocket');stompClient = Stomp.over(ws);stompClient.connect(header, () => subscribeTopic());}// 订阅主题const subscribeTopic = () => {alert("连接成功!");// 订阅广播消息stompClient.subscribe('/topic/chat-group', function (message) {console.log(`Group message received : ${message.body}`);const resp = JSON.parse(message.body);const previousMsg = $("#groupMsgReceived").val();$("#groupMsgReceived").val(`${previousMsg}${resp.content}\n`);});// 订阅单播消息stompClient.subscribe('/user/topic/chat-private', message => {console.log(`Private message received : ${message.body}`);const resp = JSON.parse(message.body);const previousMsg = $("#privateMsgReceived").val();$("#privateMsgReceived").val(`${previousMsg}${resp.content}\n`);});// 订阅定时推送的单播消息stompClient.subscribe(`/user/topic/chat-push`, message => {console.log(`Private message received : ${message.body}`);const resp = JSON.parse(message.body);const previousMsg = $("#privateMsgReceived").val();$("#privateMsgReceived").val(`${previousMsg}${resp.content}\n`);});};// 断连const disconnect = () => {stompClient.disconnect(() => {$("#msgReceived").val('Disconnected from WebSocket server');});}// 发送群聊消息const sendGroupMessage = () => {const msg = {name: $('#username').val(), content: $('#msgSent').val()};stompClient.send('/app/chat/group', {}, JSON.stringify(msg));}// 发送私聊消息const sendPrivateMessage = () => {const msg = {name: $('#username').val(), content: $('#msgSent').val()};stompClient.send('/app/chat/private', {}, JSON.stringify(msg));}
</script>
</body>
</html>

四、测试示例

1、群聊、私聊、后台定时推送测试

启动应用程序,日志打印显示系统连接建立成功,如下:

在这里插入图片描述

打开浏览器访问http://localhost:8080/page/chat可进入聊天页,同时打开两个窗口访问。
在这里插入图片描述


在这里插入图片描述

2、登录RabbitMQ控制台查看队列信息

可以看到所有消息都发送到了amq.topic交换机上(Topic类型), RabbitMQ会为每个连接的客户端创建3个队列。

因为我们在ChatController中定义了三个目的地,Routing Key分别是/topic/chat-group/topic/chat-private/topic/chat-push。群聊消息目的地/topic/chat-group绑定了两个队列,用于实现广播订阅,其它两个Routing Key分别绑定到了不同的队列上,实现唯一订阅。

在这里插入图片描述


五、结语

下一节我们将会详细说明RabbitMQ STOMP适配器支持的各种消息目的地类型的区别以及适用场景。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/71030.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

超图嵌入论文阅读2:超图神经网络

超图嵌入论文阅读2&#xff1a;超图神经网络 原文&#xff1a;Hypergraph Neural Networks ——AAAI2019&#xff08;CCF-A&#xff09; 源码&#xff1a;https://github.com/iMoonLab/HGNN 500star 概述 贡献&#xff1a;用于数据表示学习的超图神经网络 (HGNN) 框架&#xf…

【高阶数据结构】红黑树 {概念及性质;红黑树的结构;红黑树的实现;红黑树插入操作详细解释;红黑树的验证}

红黑树 一、红黑树的概念 红黑树&#xff08;Red Black Tree&#xff09; 是一种自平衡二叉查找树&#xff0c;在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是Red或Black。 通过对任何一条从根到叶子的路径上各个结点着色方式的限制&#xff0c;红黑树确保没有…

旅游APP外包开发注意事项

旅游类APP通常具有多种功能&#xff0c;以提供给用户更好的旅行体验。以下分享常见的旅游类APP功能以及在开发和使用这些APP时需要注意的问题&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 常见功能…

入栏需看——学习记忆

记忆方法千千种&#xff0c;本栏意在梳理其中道道来&#xff0c;旦有小得&#xff0c;肥肠幸耶。从不同角度分析学习记忆。 逻辑篇 有逻辑 用思维导图 思维导图记忆有逻辑的文本/内容 理论 巧记书本结构–思维导图 模仿 HCIE-Cloud Computing LAB备考第一步&#xff1a…

Python基于Mirai开发的QQ机器人保姆式教程(亲测可用)

在本教程中&#xff0c;我们将使用Python和Mirai来开发一个QQ机器人&#xff0c;本文提供了三个教学视频&#xff0c;包教包会&#xff0c;本文也很贴心贴了代码和相关文件。话不多说&#xff0c;直接开始教学。 目录 一、安装配置MIrai 图片验证码报错&#xff1a; 二、机器…

vue-cli中总提示组件没有正确注册

这里写目录标题 一、报错提示二、修改办法 一、报错提示 二、修改办法 <template><div><aside-component style"width: 15%"></aside-component></div> </template><script> import AsideComponent from /components/Asi…

NATAPP使用详细教程(免费隧道内网映射)

NATAPP - https://natapp.cn/tunnel/lists NATAPP 在开发时可能会有将自己开发的机器上的应用提供到公网上进行访问&#xff0c;但是并不想通过注册域名、搭建服务器&#xff1b;由此可以使用natapp&#xff08;内网穿透&#xff09; 购买免费隧道 修改隧道配置 看自己的web…

JAVA毕业设计096—基于Java+Springboot+Vue的在线教育系统(源码+数据库+18000字论文)

基于JavaSpringbootVue的在线教育系统(源码数据库18000字论文)096 一、系统介绍 本系统前后端分离 本系统分为管理员、用户两种角色(管理员角色权限可自行分配) 用户功能&#xff1a; 注册、登录、课程预告、在线课程观看、学习资料下载、学习文章预览、个人信息管理、消息…

【计算机视觉项目实战】中文场景识别

✨专栏介绍&#xff1a; 经过几个月的精心筹备&#xff0c;本作者推出全新系列《深入浅出OCR》专栏&#xff0c;对标最全OCR教程&#xff0c;具体章节如导图所示&#xff0c;将分别从OCR技术发展、方向、概念、算法、论文、数据集等各种角度展开详细介绍。 &#x1f468;‍&…

【Two Stream network (Tsn)】(二) 阅读笔记

贡献 将深度神经网络应用于视频动作识别的难点&#xff0c;是如何同时利用好静止图像上的 appearance information以及物体之间的运动信息motion information。本文主要有三点贡献&#xff1a; 1.提出了一种融合时间流和空间流的双流网络&#xff1b; 2.证明了直接在光流上训…

【C++精华铺】10.STL string模拟实现

1. 序言 STL&#xff08;标准模板库&#xff09;是一个C标准库&#xff0c;其中包括一些通用的算法、容器和函数对象。STL的容器是C STL库的重要组成部分&#xff0c;它们提供了一种方便的方式来管理同类型的对象。其中&#xff0c;STLstring是一种常用的字符串类型。 STLstrin…

Docker如何安装seafile

SQLite 方式 要在 Docker 中安装 Seafile&#xff0c;您可以按照以下步骤进行操作&#xff1a; 安装 Docker&#xff1a;确保您的系统上已经安装了 Docker。您可以根据您的操作系统类型&#xff0c;在官方网站上找到适合您系统的 Docker 版本并进行安装。 下载 Seafile 镜像&…

Unity设置TextMeshPro文本超出范围显示...

TextMtshPro文本超出范围&#xff0c;展示省略。选择Overflow为Ellipsis。

centroen 23版本换界面了

旧版本 新版本 没有与操作系统一起打包的ISO文件了&#xff0c;要么先安装系统&#xff0c;再安装Centreon&#xff0c;要么用pve导入OVF文件

Shell命令操作Linux文件系统

Shell命令操作Linux文件系统 文件夹介绍 文件夹常规命令 文件夹权限控制⭐ 文件类型和权限 修改文件权限 移动、复制、删除文件夹 文件夹介绍 Linux文件系统是计算机操作系统中的一个关键组成部分&#xff0c;它用于管理和组织计算机上的数据和信息。先到根目录&#xf…

支付宝使用OceanBase的历史库实践分享

为解决因业务增长引发的数据库存储空间问题&#xff0c;支付宝基于 OceanBase 数据库启动了历史库项目&#xff0c;通过历史数据归档、过期数据清理、异常数据回滚&#xff0c;实现了总成本降低 80%。 历史数据归档&#xff1a;将在线库&#xff08;SSD 磁盘&#xff09;数据归…

STDF-Viewer 解析工具说明

一、简介 1. 概述 STDF&#xff08;Standard Test Data Format&#xff09;&#xff08;标准测试数据格式&#xff09;是半导体测试行业的最主要的数据格式&#xff0c;包含了summary信息和所有测试项的测试结果&#xff1b;是半导体行业芯片测试数据的存储规范。 在半导体行业…

解决Nacos服务器连接问题:一次完整的排查经验分享

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

【笔试强训选择题】Day35.习题(错题)解析

作者简介&#xff1a;大家好&#xff0c;我是未央&#xff1b; 博客首页&#xff1a;未央.303 系列专栏&#xff1a;笔试强训选择题 每日一句&#xff1a;人的一生&#xff0c;可以有所作为的时机只有一次&#xff0c;那就是现在&#xff01;&#xff01; 文章目录 前言 一、Da…