使用Redis发布订阅模式实现 Session共享

其实并不是实现session共享,而是通过redis的发布订阅,让所有集群的服务器,都让自己的session发送一下消息。比如说userId在第35台服务器上, 有100台服务器,那么第1台服务器收到消息,需要通知userId,不是找到第35台服务器,而是通知所有的服务器,给userId发条消息,其他99台服务器没有userId,那就发送不成功!

1、配置redis

package com.kakarote.crm.config;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kakarote.crm.constant.RedisConstants;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;import java.time.Duration;@Configuration
public class CrmTemplateConfig {@Value("${spring.redis.host}")private String redisHost;@Value("${spring.redis.port}")private int redisPort;@Value("${spring.redis.password}")private String redisHasrdpwd;@Value("${spring.redis.database}")private Integer database;@Bean(name = "crmRedisTemplate")public RedisTemplate redisTemplate() {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory(database, redisHost, redisPort, redisHasrdpwd));template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new GenericJackson2JsonRedisSerializer());template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());return template;}public RedisConnectionFactory connectionFactory(int database, String hostName, int port, String password) {RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();configuration.setHostName(hostName);configuration.setPort(port);if (StringUtils.isNotBlank(password)) {configuration.setPassword(password);}if (database != 0) {configuration.setDatabase(database);}GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();genericObjectPoolConfig.setMaxIdle(10);genericObjectPoolConfig.setMinIdle(10);genericObjectPoolConfig.setMaxTotal(100);genericObjectPoolConfig.setMaxWaitMillis(3000);LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder().commandTimeout(Duration.ofMillis(8000)).poolConfig(genericObjectPoolConfig).build();LettuceConnectionFactory lettuce = new LettuceConnectionFactory(configuration, clientConfig);lettuce.afterPropertiesSet();return lettuce;}/*** Redis消息监听器容器* 这个容器加载了RedisConnectionFactory和消息监听器* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理** @return redis消息监听容器*/@Bean@SuppressWarnings("all")public RedisMessageListenerContainer container(RedisMessageListener listener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 监听所有库的key过期事件container.setConnectionFactory(connectionFactory(database, redisHost, redisPort, redisHasrdpwd));// 所有的订阅消息,都需要在这里进行注册绑定,new PatternTopic(TOPIC_NAME1)表示发布的主题信息// 可以添加多个 messageListener,配置不同的通道container.addMessageListener(listener, new PatternTopic(RedisConstants.WEBSOCKET_REDIS_TOPIC));/*** 设置序列化对象* 特别注意:1. 发布的时候需要设置序列化;订阅方也需要设置序列化*         2. 设置序列化对象必须放在[加入消息监听器]这一步后面,否则会导致接收器接收不到消息*/Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);seria.setObjectMapper(objectMapper);container.setTopicSerializer(seria);return container;}
}

2、配置RedisMessageListener

package com.kakarote.crm.config;import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSONObject;
import com.kakarote.crm.constant.CrmConst;
import com.kakarote.crm.entity.BO.MessageDto;
import com.kakarote.crm.websocket.TransferCallWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RedisMessageListener implements MessageListener {@Autowiredprivate RedisTemplate<String, Object> crmRedisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {// 接收的topiclog.info("RedisMessageListener-接收到消息1,channel:" + new String(pattern));try {//序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)MessageDto messageDto = (MessageDto) crmRedisTemplate.getValueSerializer().deserialize(message.getBody());log.info("RedisMessageListener-接收到消息2,channel = {}, messageDto = {}", new String(pattern), messageDto);if(messageDto == null){log.info("RedisMessageListener-messageDto = null,无消息进行发送! message = {}", JSONUtil.toJsonStr(message));return;}if(CrmConst.NOTICE_MSG.equals(messageDto.getTitle())){JSONObject content = messageDto.getContent();String toUserId = content.getString("toUserId");String fromUserId = content.getString("fromUserId");JSONObject msg = content.getJSONObject("msg");String resp = TransferCallWebSocket.sendMsgByUserId(fromUserId, toUserId, JSONUtil.toJsonStr(msg));if(!resp.equals("success")){log.info("RedisMessageListener-发送弹框消息,resp = {},content = {}", resp, content);}}}catch (Exception e){log.info("RedisMessageListener-监听消息处理失败,失败原因 = {}, e = ", e.getMessage(), e);}}
}

3、静态类

/*** @description: 常量类* @dateTime: 2021/6/17 16:21*/
public class RedisConstants {/*** UTF-8 字符集*/public static final String UTF8 = "UTF-8";public final static String WEBSOCKET_REDIS_TOPIC = "websocket_topic";public static final String TRANSFER_NOTICE = "transferCallNotice";	public static final String NOTICE_MSG = "noticeMessage";
}

4、消息体

package com.kakarote.crm.entity.BO;import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@AllArgsConstructor
@NoArgsConstructor
@Data
public class MessageDto implements Serializable {private String data;private String title;private JSONObject content;
}

5、业务类像通道发送消息

    /*** 向通道发布消息*/public boolean convertAndSend(String channel, Object message) {if (StringUtil.isBlank(channel)) {return false;}try {crmRedisTemplate.convertAndSend(channel, message);log.info("发送消息成功,channel:{},message:{}", channel, message);return true;} catch (Exception e) {log.info("发送消息失败,channel:{},message:{}, 失败原因 = {}, e = ", channel, message, e.getMessage(), e);e.printStackTrace();}return false;}

6、websocket配置

@Configuration
@ComponentScan
@EnableAutoConfiguration
public class WebSocketConfiguration implements ServletContextInitializer {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}@Beanpublic TaskScheduler taskScheduler(){ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();taskScheduler.setPoolSize(10);taskScheduler.initialize();return taskScheduler;}@Overridepublic void onStartup(ServletContext servletContext) throws ServletException {servletContext.addListener(WebAppRootListener.class);servletContext.setInitParameter("org.apache.tomcat.websocket.textBufferSize","52428800");servletContext.setInitParameter("org.apache.tomcat.websocket.binaryBufferSize","52428800");}
}

7、websocket Controller类

@ServerEndpoint("/crmDzhWebsocket/transferWebsocket/{userId}")
@Component
@Slf4j
public class TransferCallWebSocket {/*** 当前在线连接数*/private static AtomicInteger onlineCount = new AtomicInteger(0);/*** 用来存放每个客户端对应的 WebSocketServer 对象*/private static final ConcurrentHashMap<String, Session> webSocketMap = new ConcurrentHashMap<>();/*** 与某个客户端的连接会话,需要通过它来给客户端发送数据*/private Session session;/*** 接收 userId*/private String userIdKey = "";/*** 连接建立成功调用的方法*/@OnOpenpublic void onOpen(Session session, @PathParam("userId") String userId) {this.session = session;this.userIdKey = userId;if (webSocketMap.containsKey(userId)) {webSocketMap.remove(userId);webSocketMap.put(userId, session);} else {webSocketMap.put(userId, session);addOnlineCount();}log.info("转接通知用户连接:" + userId + ",当前总在线人数为:" + getOnlineCount());try {sendMessage("success");} catch (IOException e) {log.error("转接通知用户:" + userId + ",网络异常!!!!!!");log.info("转接通知用户连接:" + userId + ",网络异常!!!!!!");}}/*** 连接关闭调用的方法*/@OnClosepublic void onClose() {if (webSocketMap.containsKey(userIdKey)) {webSocketMap.remove(userIdKey);subOnlineCount();}log.info("转接通知用户退出:" + userIdKey + ",当前总在线人数为:" + getOnlineCount());}/*** 收到客户端消息后调用的方法** @param message 客户端发送过来的消息*/@OnMessagepublic void onMessage(String message, Session session) {try {if ("ping".equals(message)) {webSocketMap.get(this.userIdKey).getBasicRemote().sendText("pong");return;}log.info("this.userIdKey = {}, message = {}", this.userIdKey, message);} catch (IOException e) {log.error("转接通知发送消息失败,失败原因 = {}, e = ", e.getMessage(), e);e.printStackTrace();}}public static String sendMsgByUserId(String fromUserId, String toUserId, String msg) throws IOException {if(webSocketMap.get(toUserId) != null){try {webSocketMap.get(toUserId).getBasicRemote().sendText(msg);return "success";}catch (Exception e){log.error("发送消息失败,fromUserId = {}, toUserId = {}", fromUserId, toUserId);return e.getMessage();}}return "userId:" + toUserId + "当前不在会话中";}/*** 发生错误时调用** @param session* @param error*/@OnErrorpublic void onError(Session session, Throwable error) {log.info("用户错误:" + session.getId() + ",原因:" + error.getMessage());}/*** 实现服务器主动推送*/public void sendMessage(String message) throws IOException {this.session.getBasicRemote().sendText(message);}public static synchronized AtomicInteger getOnlineCount() {return onlineCount;}public static synchronized void addOnlineCount() {TransferCallWebSocket.onlineCount.getAndIncrement();}public static synchronized void subOnlineCount() {TransferCallWebSocket.onlineCount.getAndDecrement();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/114215.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue 组件封装 综合案例2

vue 组件封装 综合案例2 main.js import Vue from vue import App from ./App.vueVue.config.productionTip false//封装全局指令 focus Vue.directive(focus, {// 指令所在的dom元素&#xff0c;被插入到页面中时触发inserted(el) {el.focus();} })new Vue({render: h >…

MySQL 三大日志(bin log、redo log、undo log)

redo log redo log (重做日志) 是 InnoDB 存储引擎独有的&#xff0c;它让 MySQL有了崩溃恢复的能力&#xff0c;是事务中实现 持久化的重要操作 比如 MySQL 实例宕机了&#xff0c;重启时&#xff0c;InnoDB 存储引擎会使用 redo log 恢复数据&#xff0c;保证数据的持久性与…

设计模式——七大原则详解

目录 设计模式单一职责原则应用实例注意事项和细节 接口隔离原则应用实例 依赖倒转&#xff08;倒置&#xff09;原则基本介绍实例代码依赖关系传递的三种方式注意事项和细节 里氏替换原则基本介绍实例代码 开闭原则基本介绍实例代码 迪米特法则基本介绍实例代码注意事项和细节…

golang笔记17--编译调试go源码

golang笔记17--编译调试go源码 前置条件编译源码在 fmt 包中加自定义函数说明 当前go语言越来越流行了&#xff0c;各大厂商都有加大go工程师的需求&#xff0c;作为go语言的学习者&#xff0c;我们除了要了解如何使用go语言外&#xff0c;也有必要了解一下如何编译、调试go源码…

解决XXLJOB重复执行问题--Redis加锁+注解+AOP

基于Redis加锁注解AOP解决JOB重复执行问题 现象解决方案自定义注解定义AOP策略redis 加锁实践 现象 线上xxljob有时候会遇到同一个任务在调度的时候重复执行&#xff0c;如下图&#xff1a; 线上JOB服务运行了2个实例&#xff0c;有时候会重复调度到同一个实例&#xff0c;有…

交换机端口灯常亮 端口up状态 服务器设置ip交换机获取不到服务器网卡mac地址 不能通信

环境: 深信服防火墙 8.0.75 AF-2000-FH2130B-SC S6520X-24ST-SI交换机 version 7.1.070, Release 6530P02 问题描述: 交换机一个vlan下有3台服务器,连接端口2、3、4,2和3连接的服务器正常,交换机3端口灯常亮 端口up状态 服务器自动获取不了地址,改为手动设置ip后,交…

力扣labuladong——一刷day01

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、21. 合并两个有序链表二、力扣86. 分隔链表三、力扣23. 合并 K 个升序链表四、力扣删除链表的倒数第 N 个结点 前言 一、21. 合并两个有序链表 /*** Defin…

微信小程序 onLoad和onShow的区别

在微信小程序中&#xff0c;onLoad() 和 onShow() 是两个常用的生命周期函数&#xff0c;用于监听页面的加载和显示事件。这两个函数的区别如下&#xff1a; 触发时机 onLoad() 函数只会在页面加载时触发一次&#xff0c;而 onShow() 函数每次页面显示时都会被触发。因此&#…

Xubuntu16.04系统中安装create_ap创建无线AP

1.背景说明 在Xubuntu16.04系统的设备上安装无线WIFI模块后&#xff0c;想通过设备自身的无线AP&#xff0c;进行和外部设备的连接&#xff0c;需要安装create_ap软件&#xff0c;并设置无线AP的名称和密码&#xff0c;并设置为开机自启动。 create_ap是一个用于在Linux系统上创…

Addressable使用指南

1、基础用法就不再赘述了&#xff0c;重要的属性配置&#xff1a; Disable Catalog Update on Startup&#xff1a;禁用时在初始化Addressables的时候自动更新远程的catalog&#xff08;启用后可以通过代码 Addressables.CheckForCatalogUpdates()更新&#xff09; Use…

开源贡献难吗?

本文整理自字节跳动 Flink SQL 技术负责人李本超在 CommunityOverCode Asia 2023 上的 Keynote 演讲&#xff0c;李本超根据自己在开源社区的贡献经历&#xff0c;基于他在贡献开源社区过程中的一些小故事和思考&#xff0c;如何克服困难&#xff0c;在开源社区取得突破&#x…

BetaFlight飞控AOCODAF435V2MPU6500固件编译

BetaFlight飞控AOCODAF435V2MPU6500固件编译 1. 源由2. 准备2.1 板子2.2 代码2.3 工具 3. 配置修改4. 编译4.1 获取代码4.2 获取配置4.3 编译固件4.4 DFU烧录4.5 版本核对 5. 总结 1. 源由 刚拿到一块Aocoda F405V2 (MPU6500) AT32F435飞控板(替换主控芯片)。 Aocoda-RC F40…

金融机器学习方法:K-均值算法

目录 1.算法介绍 2.算法原理 3.python实现示例 1.算法介绍 K均值聚类算法是机器学习和数据分析中常用的无监督学习方法之一&#xff0c;主要用于数据的分类。它的目标是将数据划分为几个独特的、互不重叠的子集或“集群”&#xff0c;以使得同一集群内的数据点彼此相似&…

tomcat 服务器

tomcat 服务器 tomcat: 是一个开源的web应用服务器。区别nginx&#xff0c;nginx主要处理静态页面&#xff0c;那么动态请求&#xff08;连接数据库&#xff0c;动态页面&#xff09;并不是nginx的长处&#xff0c;动态的请求会交给tomcat进行处理。 nginx-----转发动态请求-…

【5G PHY】5G SS/PBCH块介绍(一)

博主未授权任何人或组织机构转载博主任何原创文章&#xff0c;感谢各位对原创的支持&#xff01; 博主链接 本人就职于国际知名终端厂商&#xff0c;负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作&#xff0c;目前牵头6G算力网络技术标准研究。 博客…

使用crul库和R语言的下载器程序

以下是一个使用crul库和R语言的下载器程序&#xff0c;用于从下载音频。此程序使用了jshk.com.cn/get_proxy的代码。 // 导入必要的库 import ("fmt""github.com/cjlapa/crul""io""net/http""net/url""os" )// 主…

在 Python 中使用 Pillow 进行图像处理【3/4】

第三部分 一、腐蚀和膨胀 您可以查看名为 的图像文件dot_and_hole.jpg&#xff0c;您可以从本教程链接的存储库中下载该文件&#xff1a; 该二值图像的左侧显示黑色背景上的白点&#xff0c;而右侧显示纯白色部分中的黑洞。 侵蚀是从图像边界去除白色像素的过程。您可以通过使用…

运算符重载的三种实现方法

一、重载为一般函数 格式&#xff1a;返回类型 operator 运算符(参数列表) struct Complex{//定义一个复数结构&#xff1a;包括实部与虚部两部分 double real;//实部 double imag;//虚部 }; Complex operator(Complex c1,Complex c2){//对加法运算的重载&#xff1a;将运算符…

vue重修之路由【上】

文章目录 单页应用程序: SPA - Single Page Application路由简介Vue Reouter简介VueRouter的使用&#xff08;52&#xff09;组件的存放目录问题组件分类存放目录 路由的封装抽离 单页应用程序: SPA - Single Page Application 单页面应用(SPA): 所有功能在 一个html页面 上 单…

python调用astra进行人脸检测(使用CascadeClassifier)

1、简述 方法&#xff1a;使用opecv中&#xff0c;CascadeClassifier 级联分类器实现人脸检测&#xff0c;CascadeClassifier就是opencv下objdetect模块中用来做目标检测的级联分类器的一个类&#xff0c;它可以帮助我们检测例如车牌、眼睛、人脸等物体。它的大概原理就是判别…