解决websocket集群的session共享问题

在websocket中,服务端主要使用的是session打交道,但是由于session无法实现序列化,不能存储到redis这些中间存储里面,因此这里我们只能把session存储在本地的内存中,那么如果是集群的话,我们如何实现session准确的发送消息呢,其实就是session共享。在websocket中,其实是无法做到session共享的,目前通用的解决方案都是通过消息中间件,实现消息的发布与订阅,也就是每一个服务端实例都订阅某个消息队列的topic,根据对应的sessionid来判断是否在本地存储,如果在本地通过sessionid找到了session,则给客户端发送消息,如果在本地找不到对应的session,那么就直接把这条消息丢弃掉。具体的如下图所示:

 这里的图来自于网上,网上大多都是基于redis做发布与订阅,在真实的环境中,我们一般用kafka或者rocketmq等。根据上面的图示,我们介绍下整个流程:

1、我们同时有A,B,C,D四个websocket服务端,同时订阅消息队列的topic: test8
2、我们发送一条消息a1到消息队列的topic:test8
3、此时A,B,C,D四个websocket服务端都会收到这条消息a1
4、A根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,如果没有直接放弃掉此条消息。
5、B根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,如果没有直接放弃掉此条消息。
6、C根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,如果没有直接放弃掉此条消息。
7、D根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,结果找到有对应的session,此时我们就把这条消息发送给=这个session。
8、客户端就收到了对应的消息。

一、创建一个公共的map,用来存放session

package com.websocket.utils;import java.util.concurrent.ConcurrentHashMap;import javax.websocket.Session;import org.springframework.stereotype.Component;@Component
public class OnlineSessionCache {private ConcurrentHashMap<Integer, Session> onlines = new ConcurrentHashMap<Integer, Session>();public void setUserSession(Integer userId, Session session) {onlines.put(userId, session);}public Session getUserSession(Integer userId) {return onlines.get(userId);}public void removeUserSession(Integer userId) {onlines.remove(userId);}public ConcurrentHashMap<Integer, Session> getAllSession() {return this.onlines;}}

二、在websocket连接和关闭的时候,把session关闭掉

@OnOpenpublic void onOpen(Session session,EndpointConfig config) {this.session = session;log.info("当前session id : {}  登录进来了", session.getId());OnlineCalUtils.addOnlineCount();onlineSessionCache.setUserSession(Integer.valueOf(session.getId()), session);log.info("存储session了多少个session:{}", onlineSessionCache.getAllSession().size());log.info("有新连接加入!当前在线人数为 :{} ", getOnlineCount());}
@OnClosepublic void onClose() {OnlineCalUtils.subOnlineCount();log.info("有一连接关闭!当前在线人数为: {}", getOnlineCount());onlineSessionCache.removeUserSession(Integer.valueOf(this.session.getId()));log.info("当前session id : {}  退出去了");}

三、编写一个接口,用来给指定的用户发送消息

package com.websocket.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import com.websocket.model.ChatModel;
import com.websocket.producer.RocketProducer;
import com.websocket.utils.ChatModelUtils;import lombok.extern.slf4j.Slf4j;@RestController
@Slf4j
public class ChatMsgController {@Autowiredprivate RocketProducer rocketProducer;@RequestMapping("/sendToSimpleUser")public String sendToSimpleUser(Integer fromUserId,Integer toUserId) {ChatModel model = ChatModelUtils.createNewChatModel(fromUserId, toUserId, "手动发送消息");rocketProducer.sendDirectMessage(model);return "成功";}}

这里我们是把消息直接发送给了rocketmq里面,发送者代码如下:

package com.websocket.producer;import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import com.alibaba.fastjson.JSON;
import com.websocket.model.ChatModel;@Component
public class RocketProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendDirectMessage(ChatModel message) {String msg = JSON.toJSONString(message);rocketMQTemplate.syncSend("test8", msg);}}

四、编写消费者,获取mq的消息,并且发送消息给对应的session

package com.websocket.producer;import javax.websocket.Session;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import com.alibaba.fastjson.JSON;
import com.websocket.model.ChatModel;
import com.websocket.product.SocketServerProduct;
import com.websocket.utils.OnlineSessionCache;import lombok.extern.slf4j.Slf4j;@Component
@Slf4j
@RocketMQMessageListener(topic = "test8", consumerGroup = "${chat.group.groupname}")
public class RocketConsumer implements RocketMQListener<String>{@Autowiredprivate OnlineSessionCache onlineSessionCache;@Autowiredprivate SocketServerProduct socketServerProduct;@Value("${chat.group.groupname}")private String groupName;@Overridepublic void onMessage(String message) {log.info("监听到的topic是:{}  groupname是:{}","test8",groupName);ChatModel model = JSON.parseObject(message, ChatModel.class);Integer userId = model.getToUserId();Session session = onlineSessionCache.getUserSession(userId);if (null != session) {log.info("找到了对应的session,准备回复消息");socketServerProduct.sendMessage(session, model.getMessage());}else {log.info("没有找到对应的session,准备丢弃");}}
}

以上就是一个完整的关于websocket服务端集群关于session共享的解决方案。

WebSocket服务端数据推送及心跳机制(Spring Boot + VUE)_websocket心跳机制-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/195053.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端笔记(三)CSS 盒子模型

结构伪类选择器 基本的结构伪类选择器 可以根据元素的结构关系来查找元素 比如列标签 li&#xff0c;使用 li:first-child { background-color: green; }就可以选中第一个该标签。 <!DOCTYPE html> <html lang"en"> <head><meta charset&q…

智慧能源:数字孪生压缩空气储能管控平台

压缩空气储能在解决可再生能源不稳定性和提供可靠能源供应方面具有重要的优势。压缩空气储能&#xff0c;是指在电网负荷低谷期将电能用于压缩空气&#xff0c;在电网负荷高峰期释放压缩空气推动汽轮机发电的储能方式。通过提高能量转换效率、增加储能密度、快速启动和调节能力…

如何知道B站各分区直播数据趋势?

随着短视频时代的来临&#xff0c;直播行业也越来越火爆&#xff0c;很多博主开启直播之路&#xff0c;B站也顺应了时代发展所需&#xff0c;在直播板块投入颇多&#xff0c;那么在B站开直播&#xff0c;我们应该如何知晓B站每个分区的直播数据情况呢&#xff1f; 借用第三方数…

MySQL练习题,学生成绩查询练习题,附带答案

题目 (一) 新建以下几个表 student(学生表)&#xff1a; snosnamesexdeptbirthagePhone 其中约束如下&#xff1a; &#xff08;1&#xff09; 学号不能存在相同的 sno int auto_increment primary key &#xff08;2&#xff09; 名字为非空 sname varchar(20) not nu…

Excel如何设置在未打印时显示虚线打印时不显示虚线

记得之前分享过一个BOM表模板&#xff0c;但是在我打印时&#xff0c;发现明明是留空白的地方却打印出来的虚线 后来&#xff0c;看了自己的页面布局&#xff0c;原来是网格线设置错误了 当我设置为查看时显示网格线&#xff0c;打印时不显示网格线&#xff0c;这样就正常了

苹果配件妙控鼠标、键盘、触控板值得入手吗

大家好&#xff0c;我是极智视界&#xff0c;欢迎关注我的公众号&#xff0c;获取我的更多前沿科技分享 邀您加入我的知识星球「极智视界」&#xff0c;星球内有超多好玩的项目实战源码和资源下载&#xff0c;链接&#xff1a;https://t.zsxq.com/0aiNxERDq 苹果的优质和成功绝…

STM32存储左右互搏 SPI总线读写FRAM MB85RS16

STM32存储左右互搏 I2C总线读写FRAM MB85RS16 在中低容量存储领域&#xff0c;除了FLASH的使用&#xff0c;&#xff0c;还有铁电存储器FRAM的使用&#xff0c;相对于FLASH&#xff0c;FRAM写操作时不需要预擦除&#xff0c;所以执行写操作时可以达到更高的速度&#xff0c;其…

40 mysql join 的实现

前言 join 是一个我们经常会使用到的一个 用法 我们这里 看一看各个场景下面的 join 的相关处理 测试数据表如下, 两张测试表, tz_test, tz_test03, 表结构 一致 CREATE TABLE tz_test (id int(11) unsigned NOT NULL AUTO_INCREMENT,field1 varchar(128) DEFAULT NULL,fi…

BGP多跳及BGP4+

一、知识补充 1、BGP4 传统BGP-4只管理IPV4路由信息&#xff0c;对于使用其它网络程协议 (若IPV6等)的应用末给予支持。IETF对BGP-4扩展&#xff0c;提出BGP4&#xff0c;可以提供对IPV6、IPX和MPLS VPN的支持 (简单说: 扩展IPV6协议栈支持)。 2、全互联 在上一篇博文中提…

leetcode - 矩阵区域和

1314. 矩阵区域和 - 力扣&#xff08;LeetCode&#xff09; 给你一个 m x n 的矩阵 mat 和一个整数 k &#xff0c;请你返回一个矩阵 answer &#xff0c;其中每个 answer[i][j] 是所有满足下述条件的元素 mat[r][c] 的和&#xff1a; i - k < r < i k, j - k < c …

备忘录模式 rust和java的实现

文章目录 备忘录模式介绍实现javarustrust仓库 备忘录模式 备忘录&#xff08;Memento&#xff09;模式的定义&#xff1a;在不破坏封装性的前提下&#xff0c;捕获一个对象的内部状态&#xff0c;并在该对象之外保存这个状态&#xff0c;以便以后当需要时能将该对象恢复到原先…

【5G PHY】5G NR 如何计算资源块的数量?

博主未授权任何人或组织机构转载博主任何原创文章&#xff0c;感谢各位对原创的支持&#xff01; 博主链接 本人就职于国际知名终端厂商&#xff0c;负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作&#xff0c;目前牵头6G算力网络技术标准研究。 博客…

严蔚敏数据结构p17(2.19)——p18(2.24) (c语言代码实现)

目录 2.19已知线性表中的元素以值递增有序排列,并以单链表作存储结构。试写一高效的算法,删除表中所有值大于 mink 且小于 maxk 的元素(若表中存在这样的元素&#xff09;同时释放被删结点空间,并分析你的算法的时间复杂度(注意:mink 和 maxk 是给定的个参变量,它们的值可以和表…

Hdoop学习笔记(HDP)-Part.12 安装HDFS

目录 Part.01 关于HDP Part.02 核心组件原理 Part.03 资源规划 Part.04 基础环境配置 Part.05 Yum源配置 Part.06 安装OracleJDK Part.07 安装MySQL Part.08 部署Ambari集群 Part.09 安装OpenLDAP Part.10 创建集群 Part.11 安装Kerberos Part.12 安装HDFS Part.13 安装Ranger …

继阿里云、滴滴、语雀后,腾讯视频也出现重大系统故障

昨晚&#xff0c;许多网友报告称腾讯视频出现了网络故障&#xff0c;具体表现为首页无法加载内容、VIP 用户无法观看会员视频等问题。 针对这一问题&#xff0c;腾讯视频回应称&#xff1a;目前腾讯视频遇到了暂时的技术问题&#xff0c;正在紧急修复中&#xff0c;各项功能正在…

在项目根目录未找到 app.json

这个问题就是我们在编译后的app.json文件找不到&#xff0c;路径出现了问题 首先看dist下我们该文件的路径 所以我们需要将该路径配置到我们project.config.json文件中去 在这里新加下面这行代码就可以了&#xff0c; "miniprogramRoot": "dist/dev/mp-weixi…

C语言扫雷游戏

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、扫雷游戏的分析和设计1.1扫雷游戏的功能说明1.2数据结构的分析1.3文件结构设计 二、扫雷游戏的代码实现总结 前言 详细介绍扫雷游戏的思路和实现过程。 一…

基于springboot + vue体育馆使用预约平台

qq&#xff08;2829419543&#xff09;获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;springboot 前端&#xff1a;采用vue技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xf…

自动提交日志脚本(6)浏览器抓包日志提交的数据

主要完成 do_logigdo_write_log 通过python的requests库post数据上传&#xff0c;因为是公司的系统我就展示抓包了&#xff0c;不展示怎么写了。 这边用日志暂存的页面做展示。 步骤 打开对应的页面&#xff0c;再打开浏览器的开发人员工具【一般是按f12】点击暂存按钮&…

每周一算法:背包问题(三)多重背包

多重背包 有 N N N件物品和一个容量是 M M M的背包。第 i i i种物品最多有 s i s_i si​件&#xff0c;每件的体积是 v i v_i vi​&#xff0c;价值是 w i w_i wi​。 求解将哪些物品装入背包&#xff0c;可使这些物品的总体积不超过背包容量&#xff0c;且总价值最大。 输…