【stomp实战】搭建一套websocket推送平台

前面几个小节我们已经学习了stomp协议,了解了stomp客户端的用法,并且搭建了一个小的后台demo,前端页面通过html页面与后端服务建立WebSocket连接。发送消息给后端服务。后端服务将消息内容原样送回。通过这个demo我们学习了前端stomp客户端的用法,同时也学到了如何通过spring-boot来搭建一个websocket平台。
本节我们将继续深入,搭建一套可用于生产的websocket平台。

基本介绍

websocket连接推送服务包含两个服务,websocket-connector和websocket-gateway。

在这里插入图片描述
架构如上图
websocket-connector

  • 作为和客户端建立websocket连接的服务,负责消息的接收和推送

websocket-gateway

  • 作为后台服务,提供http接口给其他微服务,其他微服务可以通过http接口发送消息给指定的用户

使用说明

通过下面的步骤来进行调试

  1. 分别启动项目websocket-connector和websocket-gateway
  2. 访问下面接口,获取某个用户的token
    下面示例是获取用户1001的token
curl -X POST -H  "Accept:*/*" -H  "Content-Type:application/json" -d "{\"userId\":\"1001\"}" "http://localhost:8081/api/ws/token/get"
  1. 打开websocket-connector调试页面http://localhost:8080/index.html
    将上一个接口获取到的token作为参数,与服务器建立连接
    在这里插入图片描述

  2. 通过页面的send按钮,发送一条消息给服务器,同时服务器会将此消息回复给前端页面。参考上图

  3. 通过websocket-gateway的接口,发送用户单播消息

curl -X POST -H  "Accept:*/*" -H  "Content-Type:application/json" -d "{\"appCode\":\"test2\",\"messageData\":{\"body\":\"single message\",\"headers\":{}},\"topic\":\"/user/topic/single/hello\",\"userIds\":[1001]}" "http://localhost:8081/api/ws/message/single/send"

在这里插入图片描述
在这里插入图片描述

前端页面可以收到该消息的推送
6.通过websocket-gateway的接口,发送广播消息

curl -X POST -H  "Accept:*/*" -H  "Content-Type:application/json" -d "{\"appCode\":\"test1\",\"messageData\":{\"body\":\"hello board message1\",\"headers\":{}},\"topic\":\"/topic/boardCast/hello\"}" "http://localhost:8081/api/ws/message/boardCast/send"

在这里插入图片描述
在这里插入图片描述

主要代码分析

前端代码

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>STOMP</title>
</head>
<body onload="disconnect()">
<h1 id="tip">消息发布订阅测试页</h1>
请输入token:<input type="text" id="token" placeholder=""> <br>
<button onclick="connect()" id="connect">connect</button>
<button onclick="disconnect()" id="disconnect">disconnect</button><p>输入消息: <span id="msg"></span></p>
<input type="text" id="content" placeholder=""> <br>
<button onclick="send()">send</button><ul id="ul">回答消息<p id="answer"></p>单播消息<p id="single"></p>广播消息<p id="board"></p>
</ul>
<script src="sockjs.min.js"></script>
<script src="stomp.min.js"></script>
<script>var stompClient = null;var endpoint = "/ws";//断开连接function disconnect() {if (stompClient != null) {stompClient.disconnect();}setConnect(false);console.log("disconnected");}//建立连接function connect() {var token = document.getElementById("token").value;if (token === '' || token === undefined) {alert("请输入token");return;}//连接请求头里面,设置好我们提前获取好的tokenvar headers = {token: token};var url = endpoint;var socket = new SockJS(url);stompClient = Stomp.over(socket);//建立连接stompClient.connect(headers, function (msg) {setConnect(true);console.log("connected:" + msg);//订阅了三个topic//订阅用户消息topic1stompClient.subscribe("/user/topic/answer", function (response) {createElement("answer", response.body);});//订阅用户消息topic2stompClient.subscribe("/user/topic/single/hello", function (response) {createElement("single", response.body);});//订阅广播消息topicstompClient.subscribe("/topic/boardCast/hello", function (response) {createElement("board", response.body);});});}//主动发送消息给服务器,对应的后端topic为/app/echofunction send() {var value = document.getElementById("content").value;var msg = {msgType: 1,content: value};stompClient.send("/app/echo", {}, JSON.stringify(msg));}function createElement(eleId, msg) {var singleP = document.getElementById(eleId);var p = document.createElement("p");p.style.wordWrap = "break-word";p.appendChild(document.createTextNode(msg));singleP.appendChild(p);}function setConnect(connected) {document.getElementById("connect").disabled = connected;document.getElementById("disconnect").disabled = !connected;}
</script>
</body>
</html>

websocket-connector端的代码

会话的鉴权及连接建立

@Slf4j
@Component
public class WebSocketInboundInterceptor implements ChannelInterceptor {@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);if (accessor == null) {return message;}//建立连接if (Objects.equals(accessor.getCommand(), StompCommand.CONNECT)) {connect(message, accessor);}return message;}/*** 建立会话** @param message* @param accessor*/private void connect(Message<?> message, StompHeaderAccessor accessor) {String token = accessor.getFirstNativeHeader(WsConstants.TOKEN_HEADER);if (StringUtils.isEmpty(token)) {throw new MessageDeliveryException("token missing!");}String userId = TokenUtil.parseToken(token);if (StringUtils.isEmpty(userId)) {throw new MessageDeliveryException("userId missing!");}String simpleSessionId = (String) message.getHeaders().get(SimpMessageHeaderAccessor.SESSION_ID_HEADER);UserSession userSession = new UserSession();userSession.setSimpleSessionId(simpleSessionId);userSession.setUserId(userId);userSession.setCreateTime(LocalDateTime.now());//关联用户的会话。通过msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg); 此方法,可以发送给用户消息accessor.setUser(new UserSessionPrincipal(userSession));}
}

如何接收客户端发送的消息

@Slf4j
@Controller
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class StompController {private final SimpMessageSendingOperations msgOperations;private final SimpUserRegistry simpUserRegistry;/*** 回音消息,将用户发来的消息内容加上 Echo 前缀后推送回客户端*/@MessageMapping("/echo")public void echo(Principal principal, Msg msg) {String username = principal.getName();msg.setContent("Echo: " + msg.getContent());msgOperations.convertAndSendToUser(username, "/topic/answer", msg);int userCount = simpUserRegistry.getUserCount();int sessionCount = simpUserRegistry.getUser(username).getSessions().size();log.info("当前本系统总在线人数: {}, 当前用户: {}, 该用户的客户端连接数: {}", userCount, username, sessionCount);}
}

消费rabbitMQ推送的单播和广播消息

@Component
@Slf4j
public class MessageReceiveConsumer {private final Gson gson;private final ReceivedMessageHandler receivedMessageHandler;public MessageReceiveConsumer(Gson gson, ReceivedMessageHandler receivedMessageHandler) {this.gson = gson;this.receivedMessageHandler = receivedMessageHandler;}@RabbitListener(bindings = @QueueBinding(value = @Queue(),exchange = @Exchange(value = WsConstants.SINGLE_EXCHANGE, type = ExchangeTypes.FANOUT)))public void handleSingleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {String body = new String(message.getBody(), StandardCharsets.UTF_8);SingleMessage singleMessage = gson.fromJson(body, SingleMessage.class);receivedMessageHandler.handleSingleMessage(singleMessage);channel.basicAck(tag, false);}@RabbitListener(bindings = @QueueBinding(value = @Queue(),exchange = @Exchange(value = WsConstants.BOARD_CAST_EXCHANGE, type = ExchangeTypes.FANOUT)))public void handleBoardCastMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {String body = new String(message.getBody(), StandardCharsets.UTF_8);BoardCastMessage boardCastMessage = gson.fromJson(body, BoardCastMessage.class);receivedMessageHandler.handleBoardCastMessage(boardCastMessage);channel.basicAck(tag, false);}
}

建立了两个exchange分别来接收消息。这里 @QueueBinding(value = @Queue(),通过这种方式建立的队列,队列名是spring取的一个随机名称,如下图所示
在这里插入图片描述

调用客户端工具类,发送消息给客户端

@Slf4j
@Component
public class ReceivedMessageHandler {private final WsMessageClient wsMessageClient;public ReceivedMessageHandler(WsMessageClient wsMessageClient) {this.wsMessageClient = wsMessageClient;}public void handleSingleMessage(SingleMessage singleMessage) {wsMessageClient.sendToUsers(singleMessage.getUserIds(), singleMessage);}public void handleBoardCastMessage(BoardCastMessage boardCastMessage) {wsMessageClient.boardCast(boardCastMessage);}
}

websocket-gateway 申请token接口

通过该接口,生成用户的jwtToken,在客户端建立连接时需要此token,不然无法建立连接

public class TokenController {@PostMapping("/get")public String getToken(@RequestBody @Validated TokenRequest tokenRequest) {return TokenUtil.generateToken(tokenRequest.getUserId());}
}

websocket-gateway 发送消息的接口

websocket-gateway暴露发送消息的接口给业务服务

public class MessageController {private final MessageSendService messageSendService;public MessageController(MessageSendService messageSendService) {this.messageSendService = messageSendService;}@PostMapping("/single/send")public Boolean singleSend(@RequestBody SingleMessage singleMessage) {return messageSendService.singleSend(singleMessage);}@PostMapping("/boardCast/send")public Boolean boardCastSend(@RequestBody BoardCastMessage boardCastMessage) {return messageSendService.boardCastSend(boardCastMessage);}}

通过该http接口,最终会调用rabbitmq,发送消息给websocket-connector服务

项目地址

更多项目代码直接看一下源码吧
https://gitee.com/syk1234/websocket-services

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/3008.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

剑指 Offer 03.:数组中重复的数字

剑指 Offer 03. 数组中重复的数字 找出数组中重复的数字。 在一个长度为 n 的数组 nums 里的所有数字都在 0&#xff5e;n-1 的范围内。数组中某些数字是重复的&#xff0c;但不知道有几个数字重复了&#xff0c;也不知道每个数字重复了几次。请找出数组中任意一个重复的数字。…

DIN特征加权、POSO特征增强、SENET特征选择

本文转自&#xff1a;DIN、POSO、SENet 聊聊推荐模型中常用的Attention-腾讯云开发者社区-腾讯云 一、前言 聊起模型结构的时候&#xff0c;经常听做推荐的同学说&#xff1a; "这里加了个self-attention" "类似于一个SENet" "一个魔改的POSO"…

[Visual Studio 报错] error 找不到指定的 SDK“Microsoft

[Visual Studio 2022 报错] error : 找不到指定的 SDK“Microsoft.NET.Sdk.Web” 问题描述&#xff1a; 在新电脑上安装了VS2022&#xff0c;打开现有项目的解决方案后的时候报了这个错&#xff0c;所有projet文件都加载失败,如图所示&#xff1a; 报错分析及解决 打开项目配…

经验丰富也被裁了,失业快2年找不到工作?

前几天徐工说&#xff0c;他有个邻居&#xff0c;最近逮到他总是要跟他扯上几句。 原因是徐工一直是做嵌入式开发&#xff0c;而他一直做纯软件开发&#xff0c;具体不知道做后端还是前端。 他说&#xff0c;他至少有半年没上班了&#xff0c;之前在一家龙头物流公司上班。 碰上…

STM32 HAL库F103系列之DAC实验(二)

DAC输出正弦波实验 实验简要 1&#xff0c;功能描述 通过DAC1通道1(PA4)输出正弦波&#xff0c;然后通过DS100示波器查看波形 2&#xff0c;使用定时器7 TRGO事件触发转换 TEN1位置1、TSEL1[2:0]010 3&#xff0c;关闭输出缓冲 BOFF1位置1 4&#xff0c;使用DMA模式 DMAE…

SLMs之Phi-3:Phi-3的简介、安装和使用方法、案例应用之详细攻略

SLMs之Phi-3&#xff1a;Phi-3的简介、安装和使用方法、案例应用之详细攻略 导读&#xff1a;2024年4月23日&#xff0c;微软发布Phi-3&#xff0c;这是微软推出的一款新的开源AI模型家族Phi-3。背景痛点&#xff1a;小语言模型(SLM)尽管规模不大&#xff0c;但在语言理解、代码…

盲盒商城小程序(有米就出)

一款前端采用uniapp&#xff0c;后端采用Django框架开发的小程序&#xff0c;包含后台管理&#xff0c;如有人需要可联系演示功能&#xff08;个人开发&#xff0c;可商用/学习&#xff09;。 部分截图如下&#xff1a;

文件摆渡:安全、高效的摆渡系统助力提升效率

很多组织和企业都会通过网络隔离的方式来保护内部的数据&#xff0c;网络隔离可以是物理隔离&#xff0c;也可以是逻辑隔离&#xff0c;如使用防火墙、VPN、DMZ等技术手段来实现&#xff0c;隔离之后还会去寻找文件摆渡方式&#xff0c;来保障日常的业务和经营需求。 进行网络隔…

数据库变更时,OceanBase如何自动生成回滚 SQL

背景 在开发中&#xff0c;数据的变更与维护工作一般较频繁。当我们执行数据库的DML操作时&#xff0c;必须谨慎考虑变更对数据可能产生的后果&#xff0c;以及变更是否能够顺利执行。若出现意外数据丢失、操作失误或语法错误等情况&#xff0c;我们必须迅速将数据库恢复到变更…

jsp+springboot+java二手车交易管理系统258u6

设计而成的系统要有以下目标&#xff1a;管理员和用户能够跳转到不同的页面当中。因此要把系统的目标设置为如下几项&#xff1a; (1) 系统在操作上不能过于复杂。 (2) 用户对应着不同的角色 (3) 设计完成的数据库要有能够处理并发和安全的作用 (4) 设计完成的管理…

亚马逊云科技提高企业生产力神器Amazon Q评测分析

一年一度的全球云计算春晚&#xff0c;亚马逊云科技Re:invent在2023年11月27于Vegas震撼来袭&#xff0c;其中最令人关注的就是CEO Adam在Keynote中分享的内容。其中一个新内容就是提升生产力神器: Amazon Q&#xff0c;可以说它重新定义了企业的工作模式。那具体它神在哪里呢&…

Python构建学生信息管理系统:网站路由补充和首次运行

在之前的内容中&#xff0c;我们已经完成了学生信息管理系统&#xff08;Student Information Management System, SIMS&#xff09;的需求分析、环境搭建、数据库创建、项目结构的初始化&#xff0c;以及运行。正常做下来的朋友&#xff0c;会发现项目运行后输入http://127.0.…

vulfocus靶场thinkphp命令执行cve-2018-1002015

thinkPHP 5.0.x版本和5.1.x版本中存在远程代码执行漏洞&#xff0c;该漏洞源于ThinkPHP在获取控制器名时未对用户提交的参数进行严格的过滤。远程攻击者可通过输入‘&#xff3c;’字符的方式调用任意方法利用该漏洞执行代码 开启靶场&#xff1a; 使用工具&#xff1a; think…

使用微软Phi-3-mini模型快速创建生成式AI应用

微软Phi-3大语言模型是微软研究院推出的新一代系列先进的小语言模型。Phi-3系列包括phi-3-mini、phi-3-small和phi-3-medium三个不同规模的版本。这些模型在保持较小的参数规模的同时&#xff0c;通过精心设计的训练数据集和优化的算法&#xff0c;实现了与大型模型相媲美的语言…

Edge下载文件提示无法安全下载的解决方法

问题描述&#xff1a;最近Edge在下载文件时总是提示&#xff1a;无法安全下载&#xff0c;本文记录一下解决方法。 提示截图&#xff1a; 解决方式一&#xff1a; 1. 点击下图红框的三个点&#xff0c;选择保留 2. 选择仍然保留 解决方式二&#xff1a; 第一种方式每下载一次…

✅为什么MySQL默认使用RR隔离级别?

对于数据库的默认隔离级别&#xff0c;Oracle默认的隔离级别是 RC&#xff0c;而MySQL默认的隔离级别是 RR。 那么&#xff0c;你知道为什么Oracle选择RC作为默认级别&#xff0c;而MySQL要选择RR作为默认的隔离级别吗&#xff1f; Oracle的隔离级别 Oracle支持ANSI/ISO SQL…

HTB靶场 Perfection

端口 打开了ssh和http服务 访问 Perfection靶机的网站 是一个根据权重计算总成绩的网站 Wappalyzer查看网页用的什么编写搭建的 抓包看一下是怎么工作的 发送,&#xff0c;返回的结果 如果我在 类别 后面多加一句命令 就会出现提示 恶意输入阻止 大概率有命令注入 通过插件…

2024最新版JavaScript逆向爬虫教程-------基础篇之JavaScript密码学以及CryptoJS各种常用算法的实现

目录 一、密码学介绍1.1 为什么要学密码学?1.2 密码学里面学哪一些 二、字符编码三、位运算四、Hex 编码与 Base64 编码4.1 Hex 编码4.2 Base64 编码 五、消息摘要算法5.1 简介5.2 JS中的MD5、SHA、HMAC、SM3 六、对称加密算法6.1 介绍6.2 加密模式和填充方式6.3 CryptoJS 中D…

元宇宙虚拟空间的角色状态更新(七)

前言 该文章主要讲元宇宙虚拟空间的角色状态更新&#xff0c;基本核心技术点 角色状态更新 对角色设置一个位置判断&#xff08;从中心点向下投射一射线确定角色的位置&#xff09; character.feetRaycast(); feetRaycast的start获取碰撞体的位置&#xff0c;end射线结束的…

Linux驱动开发:掌握SPI通信机制

目录标题 1、SPI简介2、SPI通信机制3、Linux内核中的SPI支持4、SPI核心API5、SPI控制器驱动6、SPI设备驱动 7、编写SPI设备驱动8、调试SPI驱动 在Linux驱动开发中&#xff0c;串行外设接口(SPI)是一种常见的高速全双工通信协议&#xff0c;用于连接处理器和各种外设。本文将深入…