即时通讯代码优化

在线用户逻辑修复

在进行测试时,发现当前代码有个问题,如果test1在服务器进行连接,本地的test2给test1发消息,虽然test1能收到服务器上的信息,但是本地服务日志中会报teset1不在线,需要对该种情况进行修复

修复方案:使用redis存储在线用户

  • WebSocketEndpoint

利用setBit记录登录用户,key为用户名的hashcode,即便有可能冲突,但是概率较小,可以接受。

package com.example.im.endpoint;import com.example.im.app.service.WebSocketMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;/*** @author PC*/
@Component
@ServerEndpoint("/ws")
public class WebSocketEndpoint {private final static Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);public static final ConcurrentHashMap<Integer, WebSocketEndpoint> WEB_SOCKET_ENDPOINT_MAP = new ConcurrentHashMap<>();/*** redis中用户SessionMap*/public static final String ONLINE_USER = "cus:ws:online-user";private Session session;private static WebSocketMessageService webSocketMessageService;private static RedisTemplate<String, String> redisTemplate;@Autowiredpublic void setWebSocketMessageService(WebSocketMessageService webSocketMessageService) {WebSocketEndpoint.webSocketMessageService = webSocketMessageService;}@Autowiredpublic void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {WebSocketEndpoint.redisTemplate = redisTemplate;}/*** 打开ws连接** @param session 会话*/@OnOpenpublic void onOpen(Session session) {//连接成功String userName = getUserName(session);logger.info("The connection is successful:" + getUserName(session));this.session = session;//是有hash冲突的可能性的,不过触发概率很低,可以忽略int hashCode = userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, true);WEB_SOCKET_ENDPOINT_MAP.put(hashCode, this);}/*** 断开ws连接** @param session 会话*/@OnClosepublic void onClose(Session session) {String userName = getUserName(session);int hashCode = userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, false);WEB_SOCKET_ENDPOINT_MAP.remove(hashCode);//断开连接logger.info("Disconnect:" + userName);}/*** 接收到的消息** @param message 消息内容*/@OnMessagepublic void onMessage(String message, Session session) {//接收消息String sendUserName = getUserName(session);webSocketMessageService.sendMessage(sendUserName, message);}private String getUserName(Session session) {return Optional.ofNullable(session.getRequestParameterMap().get("userName")).orElse(new ArrayList<>()).stream().findFirst().orElse("anonymous_users");}public Session getSession() {return session;}public void setSession(Session session) {this.session = session;}
}
  • DefaultSendExecutor

适配改变类型后的WEB_SOCKET_ENDPOINT_MAP,并调整代码结构

package com.example.im.endpoint;import com.example.im.app.service.WebSocketMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;/*** @author PC*/
@Component
@ServerEndpoint("/ws")
public class WebSocketEndpoint {private final static Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);public static final ConcurrentHashMap<Integer, WebSocketEndpoint> WEB_SOCKET_ENDPOINT_MAP = new ConcurrentHashMap<>();/*** redis中用户SessionMap*/public static final String ONLINE_USER = "cus:ws:online-user";private Session session;private static WebSocketMessageService webSocketMessageService;private static RedisTemplate<String, String> redisTemplate;@Autowiredpublic void setWebSocketMessageService(WebSocketMessageService webSocketMessageService) {WebSocketEndpoint.webSocketMessageService = webSocketMessageService;}@Autowiredpublic void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {WebSocketEndpoint.redisTemplate = redisTemplate;}/*** 打开ws连接** @param session 会话*/@OnOpenpublic void onOpen(Session session) {//连接成功String userName = getUserName(session);logger.info("The connection is successful:" + getUserName(session));this.session = session;//是有hash冲突的可能性的,不过触发概率很低,可以忽略int hashCode = userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, true);WEB_SOCKET_ENDPOINT_MAP.put(hashCode, this);}/*** 断开ws连接** @param session 会话*/@OnClosepublic void onClose(Session session) {String userName = getUserName(session);int hashCode = userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, false);WEB_SOCKET_ENDPOINT_MAP.remove(hashCode);//断开连接logger.info("Disconnect:" + userName);}/*** 接收到的消息** @param message 消息内容*/@OnMessagepublic void onMessage(String message, Session session) {//接收消息String sendUserName = getUserName(session);webSocketMessageService.sendMessage(sendUserName, message);}private String getUserName(Session session) {return Optional.ofNullable(session.getRequestParameterMap().get("userName")).orElse(new ArrayList<>()).stream().findFirst().orElse("anonymous_users");}public Session getSession() {return session;}public void setSession(Session session) {this.session = session;}
}

重复代码提取

在增加了stream和redis渠道后,消息监听处的代码有大段重复,可以进行提取处理

  • MessageInfoUtil
package com.example.im.infra.executor.send.util;import com.example.im.infra.executor.send.DefaultSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;/*** @author PC* MessageInfo工具类*/
@Component
public class MessageInfoUtil {private final static Logger logger = LoggerFactory.getLogger(MessageInfoUtil.class);private DefaultSendExecutor defaultSendExecutor;@Autowiredpublic void setDefaultSendExecutor(DefaultSendExecutor defaultSendExecutor) {this.defaultSendExecutor = defaultSendExecutor;}public MessageInfo sendMessageByMessageInfoByte(byte[] messageInfoByte) {String messageJson = new String(messageInfoByte, StandardCharsets.UTF_8);MessageInfo messageInfo = JsonUtils.toObjectByTypeReference(messageJson, new TypeReference<MessageInfo>() {});switch (messageInfo.getScopeOfSending()) {case USER:defaultSendExecutor.sendToUser(messageInfo.getSendUserName(), messageInfo.getMessage());break;case ALL:defaultSendExecutor.sendToAll(messageInfo.getSendUserName(), messageInfo.getMessage());break;default://一般来说不会出现该情况,除非用户覆盖了ScopeOfSending,后续可以开个扩展发送范围的口子logger.warn("invalid sending range:" + messageInfo.getScopeOfSending().getScopeCode());break;}return messageInfo;}
}
  • RedisMessageListener
package com.example.im.infra.executor.send.redis;import com.example.im.infra.executor.send.util.MessageInfoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;/*** @author PC* redis监听*/
@Component
public class RedisMessageListener implements MessageListener {private final static Logger logger = LoggerFactory.getLogger(RedisMessageListener.class);private MessageInfoUtil messageInfoUtil;@Autowiredpublic void setMessageInfoUtil(MessageInfoUtil messageInfoUtil) {this.messageInfoUtil = messageInfoUtil;}@Overridepublic void onMessage(Message message, byte[] pattern) {logger.debug("send redis info");messageInfoUtil.sendMessageByMessageInfoByte(message.getBody());}
}
  • StreamMessageListener
package com.example.im.infra.executor.send.stream;import com.example.im.infra.executor.send.util.MessageInfoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.util.function.Function;/*** @author PC* 消息队列监听*/
@Component
public class StreamMessageListener {private final static Logger logger = LoggerFactory.getLogger(StreamSendExecutor.class);private MessageInfoUtil messageInfoUtil;@Autowiredpublic void setMessageInfoUtil(MessageInfoUtil messageInfoUtil) {this.messageInfoUtil = messageInfoUtil;}@Beanpublic Function<Flux<Message<byte[]>>, Mono<Void>> listener() {logger.debug("send stream info");return messageInfoFlux -> messageInfoFlux.map(message -> messageInfoUtil.sendMessageByMessageInfoByte(message.getPayload())).then();}
}

未用到的bean不加载

当未用到某些渠道时,无需进行相关配置,如使用了redis渠道,yml中就无需配置kafka信息

Redis渠道

调整config文件,当渠道非redis时对redis渠道相关bean不进行加载

  • WebSocketConfig
package com.example.im.config;import com.example.im.infra.handle.ImRejectExecutionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;import javax.annotation.Resource;/*** @author PC*/
@Configuration
@EnableWebSocket
public class WebSocketConfig {@Resourceprivate WebSocketProperties webSocketProperties;@Beanpublic ServerEndpointExporter serverEndpoint() {return new ServerEndpointExporter();}/**** 配置线程池* @return 线程池*/@Beanpublic TaskExecutor taskExecutor() {WebSocketProperties.ExecutorProperties executorProperties = webSocketProperties.getExecutorProperties();ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 设置核心线程数executor.setCorePoolSize(executorProperties.getCorePoolSize());// 设置最大线程数executor.setMaxPoolSize(executorProperties.getMaxPoolSize());// 设置队列容量executor.setQueueCapacity(executorProperties.getQueueCapacity());// 设置线程活跃时间(秒)executor.setKeepAliveSeconds(executorProperties.getKeepAliveSeconds());// 设置默认线程名称executor.setThreadNamePrefix("im-");// 设置拒绝策略executor.setRejectedExecutionHandler(new ImRejectExecutionHandler());// 等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown(true);return executor;}
}
  • RedisConfiguration

缓存用到了redis,RedisTemplate需要加载

package com.example.im.config;import com.example.im.infra.executor.send.redis.RedisMessageListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @author PC*/
@Configuration
public class RedisConfiguration {/*** redisTemplate配置** @return RedisTemplate*/@Beanpublic RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();redisTemplate.setKeySerializer(stringRedisSerializer);redisTemplate.setValueSerializer(stringRedisSerializer);redisTemplate.setStringSerializer(stringRedisSerializer);redisTemplate.setDefaultSerializer(stringRedisSerializer);redisTemplate.setHashKeySerializer(stringRedisSerializer);redisTemplate.setHashValueSerializer(stringRedisSerializer);redisTemplate.setConnectionFactory(connectionFactory);return redisTemplate;}@Bean@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 订阅一个或多个频道container.addMessageListener(listenerAdapter, new PatternTopic("redis-websocket-*"));return container;}@Bean@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")MessageListenerAdapter listenerAdapter(RedisMessageListener redisMessageListener) {return new MessageListenerAdapter(redisMessageListener);}
}
  • RedisMessageListener
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")
public class RedisMessageListener implements MessageListener 
  • RedisSendExecutor
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")
public class RedisSendExecutor extends AbstractBaseSendExecutor 

Kafka渠道

  • StreamMessageListener
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "stream")
public class StreamMessageListener 
  • StreamSendExecutor
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "stream")
public class StreamSendExecutor extends AbstractBaseSendExecutor

参考资料

[1].im项目

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/883112.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FastDFS扩容操作

FastDFS扩容操作 3.FastDFS 扩容操作3.1 迁移到全新的FastDFS3.1.1 部署全新的FastDFS3.1.2 Storage连接即将被迁移的Tracker3.1.3 启动Storage节点3.1.4 在老FastDFS服务器上查看同步进程3.1.5 停止storage节点3.1.6 Storage连接全新的的Tracker3.1.7 修改.data_init_flag文件…

百度SEO前10关键词排名波动跟用户行为反馈有很大关系

大家好&#xff0c;我是林汉文&#xff08;谷歌SEO专家&#xff09;&#xff0c;在百度SEO优化中&#xff0c;网站的排名并非一成不变&#xff0c;尤其是前10名的位置&#xff0c;更是动态变化。很多站长可能会发现&#xff0c;有时明明内容质量不错&#xff0c;外链也稳定&…

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-20

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-20 目录 文章目录 计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-20目录1. FLARE: Faithful Logic-Aided Reasoning and Exploration摘要研究背景问题与挑战如何解决创新点算法模型实验效果重要数…

多线程进阶——线程池的实现

什么是池化技术 池化技术是一种资源管理策略&#xff0c;它通过重复利用已存在的资源来减少资源的消耗&#xff0c;从而提高系统的性能和效率。在计算机编程中&#xff0c;池化技术通常用于管理线程、连接、数据库连接等资源。 我们会将可能使用的资源预先创建好&#xff0c;…

Ubuntu22.04虚拟机安装

一、安装介质下载&#xff1a; 在官网下载安装镜像&#xff0c;下载地址https://releases.ubuntu.com/22.04/ubuntu-22.04.5-live-server-amd64.iso 二、操作系统安装&#xff1a; step 1:进入ubuntu的安装界面&#xff0c;直接回车安装。 step 2:选择语言&#xff0c;直接回…

liunx线程互斥

临界资源和临界区 临界资源&#xff1a;多线程执行流共享的资源就叫临界资源。 临界区&#xff1a;每个线程中&#xff0c;访问临界区的代码&#xff0c;就叫临界区。 互斥&#xff1a;任何时候&#xff0c;互斥保证只有一个执行流进入临界区&#xff0c;访问临界资源&#…

iframe里放的视频,如何采用纯css适配

步骤1&#xff1a;设置包含iframe的父元素 首先&#xff0c;确保iframe的父容器具有一个适当的宽高比。通过为父容器设置一个相对定位和一定的宽度和高度&#xff0c;你可以控制它的尺寸。 <div class"video-container"><iframe src"https://www.exa…

NXP Smart Access Car-车用产品整合应用

在汽车技术不断进步的今天&#xff0c;智能化已成为汽车行业发展的主要趋势之一。本次研讨会将深入探讨NXP的Smart Access Car技术&#xff0c;说明如何通过NXP 产品设计提升汽车的安全性、便利性和使用者体验。研讨会将涵盖NXP MCU/NFC等方面的最新解决方案&#xff0c;并探讨…

Qt调用Yolov11导出的Onnx分类模型开发分类检测软件

软件视频地址:视频地址 代码开源地址 之前用Python配合YOLOV11开发一个了分类训练软件&#xff0c;软件只要准备好数据&#xff0c;然后导入就可以训练数据&#xff0c;训练完成后还可以验证&#xff0c;测试&#xff0c;但是要真正落地&#xff0c;还是有点欠缺。配合YOLOV1…

Qt 中实现进程保护的方法

引言 在开发桌面应用程序时&#xff0c;确保应用程序的稳定运行和防止非法关闭是非常重要的。本文将介绍如何在 Qt 框架中实现进程保护&#xff0c;以提高应用程序的稳定性和安全性。 什么是进程保护&#xff1f; 进程保护是一种确保应用程序持续运行的机制。它可以防止应用程序…

入门数据结构JAVADS——如何通过遍历顺序构建二叉树

目录 前言 构建二叉树的前提&#xff1a; 为什么需要两个不同类型的遍历&#xff1a; 前序遍历 中序遍历 我们的算法思路如下: 举例&#xff1a; 代码实现 后序遍历 中序遍历 结尾 前言 入门数据结构JAVA DS——二叉树的介绍 (构建,性质,基本操作等) (1)-CSDN博客 在上…

我毕业后的8年嵌入式工作

2015年毕业&#xff0c;2016年工作到现在已经过了8个年头&#xff0c;借着征文&#xff0c;做个简单的回顾与总结。 2015年从广州番禺职业技术学院毕业&#xff0c;学的是嵌入式技术与应用&#xff0c;我的下一届学弟学妹变物联网了&#xff0c;算是绝版专业了吧。出来后谨遵校…

07 设计模式-结构型模式-桥接模式

桥接&#xff08;Bridge&#xff09;是用于把抽象化与实现化解耦&#xff0c;使得二者可以独立变化。这种类型的设计模式属于结构型模式&#xff0c;它通过提供抽象化和实现化之间的桥接结构&#xff0c;来实现二者的解耦。 这种模式涉及到一个作为桥接的接口&#xff0c;使得…

redis的string是怎么实现的

Redis 的 String 类型是最基本的数据类型&#xff0c;底层通过多种方式实现&#xff0c;能够存储字符、整数、浮点数等各种形式的值。String 数据结构的实现基于 Redis 的简单动态字符串&#xff08;SDS&#xff09;&#xff0c;同时在处理不同的数据类型时也进行了优化。 1. …

实用AI工具推荐

在当今数字化时代&#xff0c;AI工具已经成为提升工作效率的重要助手。 以下是一些实用AI工具的推荐&#xff0c;它们能在不同领域帮助你提高生产力&#xff1a; ChatGPT&#xff1a;由OpenAI开发&#xff0c;擅长文本生成、撰写文章、回答问题和编程辅助&#xff0c;支持多语言…

JAVA单列集合

List系列集合:添加的元素是 有序、可重复、有索引 Set系列集合:添加的元素是 无序、不重复、无索引 Collection Collection是单列集合的接口&#xff0c;它的功能是全部单列集合都可以继承使用的 public boolean add(E e) 把给定的对象添加到当前集合中 public void …

AI学习指南深度学习篇-自注意力机制(Self-Attention Mechanism)

AI学习指南深度学习篇—自注意力机制&#xff08;Self-Attention Mechanism&#xff09; 在深度学习的研究领域&#xff0c;自注意力机制&#xff08;Self-Attention Mechanism&#xff09;作为一种创新的模型结构&#xff0c;已成为了神经网络领域的一个重要组成部分&#xf…

附录章节:SQL标准与方言对比

目录 1. SQL标准 2. 常见的SQL方言及其特性 3. 对比总结 附录 B: 常见错误及解决方案 1. 语法错误 2. 数据类型不匹配 3. 索引缺失 4. 存储过程执行失败 5. 锁定问题 6. 性能瓶颈 附录 C: 进一步阅读资源 1. 书籍 2. 在线资源 3. 视频课程 1. SQL标准 定义: SQL…

SpringBoot 事务管理 @Transactional

Spring JDBC的事务管理 事务&#xff08;Transaction&#xff09;&#xff1a;是关系型数据库中一种能够保障多个写操作&#xff08;增、删、改&#xff09;要么全部成功&#xff0c;要么全部失败的机制。 在基于Spring JDBC的项目中&#xff0c;只需要在业务方法上添加Trans…

Java基于数据库的分布式可重入锁(带等待时间和过期时间)

文章目录 技术背景介绍代码实现数据库表结构尝试获取锁续约阻塞式获取锁解锁检查锁是否过期或者释放 使用示例优化方案 项目代码 技术背景介绍 一般分布式锁使用最方便的就是使用redis实现&#xff0c;因为他自带超时过期机制、发布订阅模式、高吞吐高性能的优势&#xff0c;…