WebSocket的那些事(5-Spring STOMP支持之连接外部消息代理)

目录

  • 一、序言
  • 二、开启RabbitMQ外部消息代理
  • 三、代码示例
    • 1、Maven依赖项
    • 2、相关实体
    • 3、自定义用户认证拦截器
    • 4、Websocket外部消息代理配置
    • 5、ChatController
    • 6、前端页面chat.html
  • 四、测试示例
    • 1、群聊、私聊、后台定时推送测试
    • 2、登录RabbitMQ控制台查看队列信息
  • 五、结语

一、序言

上节我们在 WebSocket的那些事(4-Spring中的STOMP支持详解) 中详细说明了通过Spring内置消息代理结合STOMP子协议进行Websocket通信,以及相关注解的使用及原理。

但是Spring内置消息代理会有一些限制,比如只支持STOMP协议的一部分命令,像acksreceipts命令都是不支持的,还有由于内置消息代理把消息存储在内存,当应用不可用时,客户端也就订阅不到到后台推送的消息。

这节我们将会使用支持STOMP协议的外部消息代理(RabbitMQ)进行Websocket通信。


二、开启RabbitMQ外部消息代理

服务端路由发送消息以及客户端订阅消息都要通过STOMP协议与RabbitMQ进行交互,由于RabbitMQ默认没有启动STOMP插件,因此我们需要先启用该插件。

rabbitmq-plugins enable rabbitmq_stomp

启动该插件后,RabbitMQ中STOMP适配器默认会监听61613端口,如果是云服务器,需要把该端口在安全组中放开。

关于该插件说明请参考:RabbitMQ中STOMP插件说明。


三、代码示例

我们在 WebSocket的那些事(4-Spring中的STOMP支持详解)中写了一个简单的聊天Demo示例,下面我们对该聊天Demo示例进行改造,将Spring内置消息代理替换成RabbitMQ外部消息代理。

1、Maven依赖项

服务端和客户端与外部消息代理都是通过TCP进行通信,Spring底层默认使用的是NettyReactor,因此需要引入相关依赖项。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-reactor-netty</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>

2、相关实体

(1) 请求消息参数

@Data
public class WebSocketMsgDTO {private String name;private String content;
}

(2) 响应消息内容

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class WebSocketMsgVO {private String content;
}

(3) 自定义认证用户信息

@Data
@AllArgsConstructor
@NoArgsConstructor
public class StompAuthenticatedUser implements Principal {/*** 用户唯一ID*/private String userId;/*** 用户昵称*/private String nickName;/*** 用于指定用户消息推送的标识* @return*/@Overridepublic String getName() {return this.userId;}}

3、自定义用户认证拦截器

@Slf4j
public class UserAuthenticationChannelInterceptor implements ChannelInterceptor {private static final String USER_ID = "User-ID";private static final String USER_NAME = "User-Name";@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);// 如果是连接请求,记录userIdif (StompCommand.CONNECT.equals(accessor.getCommand())) {String userID = accessor.getFirstNativeHeader(USER_ID);String username = accessor.getFirstNativeHeader(USER_NAME);log.info("Stomp User-Related headers found, userID: {}, username:{}", userID, username);accessor.setUser(new StompAuthenticatedUser(userID, username));}return message;}}

4、Websocket外部消息代理配置

Spring中与外部消息代理通信的中间方被称之为Broker Relay,它会维护一个系统共享的单一TCP连接和外部消息代理进行通信,该TCP连接仅仅适用于服务端,用来发送消息,而不是接收消息,通过Broker RelaysystemLoginsystemPasscode属性可以设置该连接的认证信息。

Broker Relay也会为每个连接的Websocket客户端创建一个TCP连接,该连接用来接收消息,通过clientLoginclientPasscode属性可以设置连接的认证信息。

/*** Websocket连接外部消息代理配置* @author Nick Liu* @date 2023/9/6*/
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketExternalMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void configureClientInboundChannel(ChannelRegistration registration) {// 拦截器配置registration.interceptors(new UserAuthenticationChannelInterceptor());}@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/websocket") // WebSocket握手端口.addInterceptors(new HttpSessionHandshakeInterceptor()).setAllowedOriginPatterns("*") // 设置跨域.withSockJS(); // 开启SockJS回退机制}@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {registry.setApplicationDestinationPrefixes("/app") // 发送到服务端目的地前缀.enableStompBrokerRelay("/topic") // 开启外部消息代理,指定消息订阅前缀.setRelayHost("localhost") // 外部消息代理Host.setRelayPort(61613) // 外部消息代理STOMP端口.setSystemLogin("admin")  // 共享系统连接用户名,该连接主要用来发送消息.setSystemPasscode("admin") // 共享系统连接密码,该连接主要用来发送消息.setClientLogin("admin") // 客户端连接用户名,该连接主要用来接收消息.setClientPasscode("admin") // 客户端连接密码,该连接主要用来接收消息.setVirtualHost("/stomp"); // RabbitMQ虚拟主机}
}

备注:我们可以为服务端与客户端的连接设置不同的用户,针对客户端连接用户进行权限管控,保证系统的安全性,在这里为了方便测试我们统一用一个用户。

5、ChatController

STOMP协议并没有规定消息代理必须支持哪种类型的Destinations(目的地),但是RabbitMQ STOMP适配器只支持一些指定的目的地类型,如下图:
在这里插入图片描述

  • /exchange:指定交换机和路由key,发送和订阅来自队列的消息。
  • /queue:发送和订阅受STOMP网关管理的队列的消息,最多只有一个订阅者能到消息。
  • /amq/queue:发送和订阅不受STOMP网关管理的队列的消息。
  • /topic:发送和订阅来自临时或者持久Topic的消息,多个订阅者都能接收到消息。
  • /temp-queue/:发送和订阅来自临时队列的消息。

参考文档见:RabbitMQ中STOMP插件说明。

在下面的示例中,我们选用了/topic的开头的消息发送和订阅前缀,目的地格式只能为/topic/{routing-key}routing-key不能有斜杠,否则会报错。

@Slf4j
@Controller
@RequiredArgsConstructor
public class ChatController {private final SimpUserRegistry simpUserRegistry;private final SimpMessagingTemplate simpMessagingTemplate;/*** 模板引擎为Thymeleaf,需要加上spring-boot-starter-thymeleaf依赖,* @return*/@GetMapping("/page/chat")public ModelAndView turnToChatPage() {return new ModelAndView("chat");}/*** 群聊消息处理* 这里我们通过@SendTo注解指定消息目的地为"/topic/chat/group",如果不加该注解则会自动发送到"/topic" + "/chat/group"* @param webSocketMsgDTO 请求参数,消息处理器会自动将JSON字符串转换为对象* @return 消息内容,方法返回值将会广播给所有订阅"/topic/chat/group"的客户端*/@MessageMapping("/chat/group")@SendTo("/topic/chat-group")public WebSocketMsgVO groupChat(WebSocketMsgDTO webSocketMsgDTO) {log.info("Group chat message received: {}", FastJsonUtils.toJsonString(webSocketMsgDTO));String content = String.format("来自[%s]的群聊消息: %s", webSocketMsgDTO.getName(), webSocketMsgDTO.getContent());return WebSocketMsgVO.builder().content(content).build();}/*** 私聊消息处理* 这里我们通过@SendToUser注解指定消息目的地为"/topic/chat/private",发送目的地默认会拼接上"/user/"前缀* 实际发送目的地为"/user/topic/chat/private"* @param webSocketMsgDTO 请求参数,消息处理器会自动将JSON字符串转换为对象* @return 消息内容,方法返回值将会基于SessionID单播给指定用户*/@MessageMapping("/chat/private")@SendToUser("/topic/chat-private")public WebSocketMsgVO privateChat(WebSocketMsgDTO webSocketMsgDTO) {log.info("Private chat message received: {}", FastJsonUtils.toJsonString(webSocketMsgDTO));String content = "私聊消息回复:" + webSocketMsgDTO.getContent();return WebSocketMsgVO.builder().content(content).build();}/*** 定时消息推送,这里我们会列举所有在线的用户,然后单播给指定用户。* 通过SimpMessagingTemplate实例可以在任何地方推送消息。*/@Scheduled(fixedRate = 10 * 1000)public void pushMessageAtFixedRate() {log.info("当前在线人数: {}", simpUserRegistry.getUserCount());if (simpUserRegistry.getUserCount() <= 0) {return;}// 这里的Principal为StompAuthenticatedUser实例Set<StompAuthenticatedUser> users = simpUserRegistry.getUsers().stream().map(simpUser -> StompAuthenticatedUser.class.cast(simpUser.getPrincipal())).collect(Collectors.toSet());users.forEach(authenticatedUser -> {String userId = authenticatedUser.getUserId();String nickName = authenticatedUser.getNickName();WebSocketMsgVO webSocketMsgVO = new WebSocketMsgVO();webSocketMsgVO.setContent(String.format("定时推送的私聊消息, 接收人: %s, 时间: %s", nickName, LocalDateTime.now()));log.info("开始推送消息给指定用户, userId: {}, 消息内容:{}", userId, FastJsonUtils.toJsonString(webSocketMsgVO));simpMessagingTemplate.convertAndSendToUser(userId, "/topic/chat-push", webSocketMsgVO);});}}

6、前端页面chat.html

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head><meta charset="UTF-8"><title>chat</title><script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.6.1/sockjs.min.js"></script><script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script><script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.6.4/jquery.min.js"></script><style>#mainWrapper {width: 600px;margin: auto;}</style>
</head>
<body>
<div id="mainWrapper"><div><label for="username" style="margin-right: 5px">姓名:</label><input id="username" type="text"/></div><div id="msgWrapper"><p style="vertical-align: top">发送的消息:</p><textarea id="msgSent" style="width: 600px;height: 100px"></textarea><p style="vertical-align: top">收到的群聊消息:</p><textarea id="groupMsgReceived" style="width: 600px;height: 100px"></textarea><p style="vertical-align: top">收到的私聊消息:</p><textarea id="privateMsgReceived" style="width: 600px;height: 200px"></textarea></div><div style="margin-top: 5px;"><button onclick="connect()">连接</button><button onclick="sendGroupMessage()">发送群聊消息</button><button onclick="sendPrivateMessage()">发送私聊消息</button><button onclick="disconnect()">断开连接</button></div>
</div>
<script type="text/javascript">$(() => {$('#msgSent').val('');$("#groupMsgReceived").val('');$("#privateMsgReceived").val('');});let stompClient = null;// 连接服务器const connect = () => {const header = {"User-ID": new Date().getTime().toString(), "User-Name": $('#username').val()};const ws = new SockJS('http://localhost:8080/websocket');stompClient = Stomp.over(ws);stompClient.connect(header, () => subscribeTopic());}// 订阅主题const subscribeTopic = () => {alert("连接成功!");// 订阅广播消息stompClient.subscribe('/topic/chat-group', function (message) {console.log(`Group message received : ${message.body}`);const resp = JSON.parse(message.body);const previousMsg = $("#groupMsgReceived").val();$("#groupMsgReceived").val(`${previousMsg}${resp.content}\n`);});// 订阅单播消息stompClient.subscribe('/user/topic/chat-private', message => {console.log(`Private message received : ${message.body}`);const resp = JSON.parse(message.body);const previousMsg = $("#privateMsgReceived").val();$("#privateMsgReceived").val(`${previousMsg}${resp.content}\n`);});// 订阅定时推送的单播消息stompClient.subscribe(`/user/topic/chat-push`, message => {console.log(`Private message received : ${message.body}`);const resp = JSON.parse(message.body);const previousMsg = $("#privateMsgReceived").val();$("#privateMsgReceived").val(`${previousMsg}${resp.content}\n`);});};// 断连const disconnect = () => {stompClient.disconnect(() => {$("#msgReceived").val('Disconnected from WebSocket server');});}// 发送群聊消息const sendGroupMessage = () => {const msg = {name: $('#username').val(), content: $('#msgSent').val()};stompClient.send('/app/chat/group', {}, JSON.stringify(msg));}// 发送私聊消息const sendPrivateMessage = () => {const msg = {name: $('#username').val(), content: $('#msgSent').val()};stompClient.send('/app/chat/private', {}, JSON.stringify(msg));}
</script>
</body>
</html>

四、测试示例

1、群聊、私聊、后台定时推送测试

启动应用程序,日志打印显示系统连接建立成功,如下:

在这里插入图片描述

打开浏览器访问http://localhost:8080/page/chat可进入聊天页,同时打开两个窗口访问。
在这里插入图片描述


在这里插入图片描述

2、登录RabbitMQ控制台查看队列信息

在这里插入图片描述
可以看到所有消息都发送到了amq.topic交换机上(Topic类型), RabbitMQ会为每个连接的客户端创建3个队列。

因为我们在ChatController中定义了三个目的地,Routing Key分别是/topic/chat-group/topic/chat-private/topic/chat-push。群聊消息目的地/topic/chat-group绑定了两个队列,用于实现广播订阅,其它两个Routing Key分别绑定到了不同的队列上,实现唯一订阅。


五、结语

下一节我们将会详细说明RabbitMQ STOMP适配器支持的各种消息目的地类型的区别以及适用场景。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/74207.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第4章_瑞萨MCU零基础入门系列教程之瑞萨 MCU 源码设计规范

本教程基于韦东山百问网出的 DShanMCU-RA6M5开发板 进行编写&#xff0c;需要的同学可以在这里获取&#xff1a; https://item.taobao.com/item.htm?id728461040949 配套资料获取&#xff1a;https://renesas-docs.100ask.net 瑞萨MCU零基础入门系列教程汇总&#xff1a; ht…

deepin V23通过flathub安装steam畅玩游戏

deepin V23缺少32位库&#xff0c;在星火商店安装的steam,打开报错&#xff0c;无法使用&#xff01; 通过flathub网站安装steam,可以正常使用&#xff0c;详细教程如下&#xff1a; flathub网址&#xff1a;主页 | Flathub 注意&#xff1a;flathub下载速度慢&#xff0c;只…

Redis从基础到进阶篇(四)----性能调优、分布式锁与缓存问题

目录 一、Redis 集群演变 1.1 ReplicationSentinel*高可用 1.2 ProxyReplicationSentinel(仅仅了解) 1.3 Redis Cluster 集群 (重点&#xff09; 1.3.1 Redis-cluster架构图 1.3.2 工作原理 1.3.3 主从切换 1.3.4 副本漂移 1.3.5 分片漂移 二、Redis版本历史&#xf…

ODC现已开源:与开发者共创企业级的数据库协同开发工具

OceanBase 开发者中心&#xff08;OceanBase Developer Center&#xff0c;以下简称 ODC&#xff09;是一款开源的数据库开发和数据库管理协同工具&#xff0c;从首个版本上线距今已经发展了三年有余&#xff0c;ODC 逐步由一款专为 OceanBase 打造的开发者工具演进成为支持多数…

xCode14.3.1运行MonkeyDev出现“Executable Not Found“的解决办法

安装MonkeyDev遇到的坑 环境&#xff1a;Xcode Version 14.3.1 (14E300c) 错误提示 is not a valid path to an executable file. 报错 /Users/xxxx//Library/Developer/Xcode/DerivedData/MonTest-ccparhdyzjuqhjdergwrngpfwwoh/Build/Products/Debug-iphoneos/MonTest.app…

go-zerogo web集成redis实战

前言 上一篇&#xff1a;go-zero&go web集成JWT和cobra命令行工具实战 从零开始基于go-zero搭建go web项目实战-03集成redis实战 源码仓库地址 源码 https://gitee.com/li_zheng/treasure-box golang redis 客户端 Go-Redis 地址&#xff1a; GitHub: https://github.…

Maven 知识点总结

文章目录 Maven1、Maven 坐标2、Maven 仓库3、Maven 依赖依赖配置依赖范围依赖调解原则排除依赖 4、Maven 生命周期5、Maven 聚合与继承 Maven Maven是一个项目管理工具&#xff0c;它包含了项目对象模型&#xff08;POM&#xff1a;Project Object Model&#xff09;&#xf…

windows系统docker中将vue项目网站部署在nginx上

一、首先在windows系统上下载并安装docker&#xff0c;要下载windows版本 https://www.docker.com/products/docker-desktop/ PS&#xff1a;安装过程中需要WSL&#xff0c;我的是win11系统&#xff0c;直接提示了我安装就可以下一步了。其他windows系统版本我不知道是否需要单…

简化转换器:使用您理解的单词进行最先进的 NLP — 第 1 部分 — 输入

一、说明 变形金刚是一种深度学习架构&#xff0c;为人工智能的发展做出了杰出贡献。这是人工智能和整个技术领域的一个重要阶段&#xff0c;但也有点复杂。截至今天&#xff0c;变形金刚上有很多很好的资源&#xff0c;那么为什么要再制作一个呢&#xff1f;两个原因&#xff…

# Spring MVC与RESTful API:如何设计高效的Web接口

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

数学建模--K-means聚类的Python实现

目录 1.算法流程简介 2.1.K-mean算法核心代码 2.2.K-mean算法效果展示 3.1.肘部法算法核心代码 3.2.肘部法算法效果展示 1.算法流程简介 #k-means聚类方法 """ k-means聚类算法流程: 1.K-mean均值聚类的方法就是先随机选择k个对象作为初始聚类中心. 2.这…

AI伦理:科技发展中的人性之声

文章目录 AI伦理的关键问题1. 隐私问题2. 公平性问题3. 自主性问题4. 伦理教育问题 隐私问题的拓展分析数据收集和滥用隐私泄露和数据安全 公平性问题的拓展分析历史偏见和算法模型可解释性 自主性问题的拓展分析自主AI决策伦理框架 伦理教育的拓展分析伦理培训 结论 &#x1f…

vue学习之基本用法

1. 前期准备 安装vs code IDE&#xff0c;vs code 安装 插件 open in brower新建 vue-learning 文件夹vs code IDE打开文件夹 2. 基本用法 创建demo1.html文件,内容如下 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8&qu…

华为数据管理——《华为数据之道》

数据分析与开发 元数据是描述数据的数据&#xff0c;用于打破业务和IT之间的语言障碍&#xff0c;帮助业务更好地理解数据。 元数据是数据中台的重要的基础设施&#xff0c;元数据治理贯彻数据产生、加工、消费的全过程&#xff0c;沉淀了数据资产&#xff0c;搭建了技术和业务…

Tomcat配置ssl、jar包

Tomcat配置ssl 部署tomcat服务&#xff0c;项目做到用https访问&#xff0c;使用nginx去做&#xff0c;访问任意一个子网站&#xff0c;都是https 或者 医美项目需要 上传jdk 456 tomcat war包 [nginx-stable] namenginx stable repo baseurlhttp://nginx.org/packages/…

“内存炸弹”DDOS拒绝服务攻击

Windows平台演示 最早的内存炸弹是 zip 炸弹&#xff0c;也称为死亡 zip&#xff0c;它是一种恶意计算机文件&#xff0c;旨在使读取该文件的程序崩溃或瘫痪。zip 炸弹不会劫持程序的操作&#xff0c;而是利用解压缩压缩文件所需的时间、磁盘空间或内存。 zip 炸弹的一个示例…

关于 RK3568的linux系统killed用户应用进程(用户现象为崩溃) 的解决方法

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/132710642 红胖子网络科技博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软硬…

Linux权限的概念和管理

Linux权限的概念和管理 1. Linux权限的概念2. Linux权限管理2.1 文件访问者的分类&#xff08;人&#xff09;2.2 文件类型和访问权限&#xff08;事物属性&#xff09;2.2.1 文件类型2.2.2 基本权限 2.3 文件权限值的表示方法2.4文件访问权限的相关设置方法1. chmod&#xff0…

ESP32用作经典蓝牙串口透传模块与手机进行串口通信

ESP32用作经典蓝牙串口透传模块与手机进行串口通信 简介ESP32开发板Arduino程序手机与ESP32开发板进行蓝牙串口透传通信总结 简介 ESP32-WROOM-32模组集成了双模蓝牙包括传统蓝牙&#xff08;BR/EDR&#xff09;、低功耗蓝牙&#xff08;BLE&#xff09;和 Wi-Fi&#xff0c;具…

linux修改最大线程数却未生效的原因

可能是没有重新对新文件进行编译 更改一个进程所能创建的最大进程数之前 更改一个进程所能创建的最大进程数之后 测试代码 #include <iostream> #include <unistd.h> #include <sys/wait.h> #include <string.h> #include <stdio.h> #include…