即时通讯代码优化

在线用户逻辑修复

在进行测试时,发现当前代码有个问题,如果test1在服务器进行连接,本地的test2给test1发消息,虽然test1能收到服务器上的信息,但是本地服务日志中会报teset1不在线,需要对该种情况进行修复

修复方案:使用redis存储在线用户

  • WebSocketEndpoint

利用setBit记录登录用户,key为用户名的hashcode,即便有可能冲突,但是概率较小,可以接受。

package com.example.im.endpoint;import com.example.im.app.service.WebSocketMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;/*** @author PC*/
@Component
@ServerEndpoint("/ws")
public class WebSocketEndpoint {private final static Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);public static final ConcurrentHashMap<Integer, WebSocketEndpoint> WEB_SOCKET_ENDPOINT_MAP = new ConcurrentHashMap<>();/*** redis中用户SessionMap*/public static final String ONLINE_USER = "cus:ws:online-user";private Session session;private static WebSocketMessageService webSocketMessageService;private static RedisTemplate<String, String> redisTemplate;@Autowiredpublic void setWebSocketMessageService(WebSocketMessageService webSocketMessageService) {WebSocketEndpoint.webSocketMessageService = webSocketMessageService;}@Autowiredpublic void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {WebSocketEndpoint.redisTemplate = redisTemplate;}/*** 打开ws连接** @param session 会话*/@OnOpenpublic void onOpen(Session session) {//连接成功String userName = getUserName(session);logger.info("The connection is successful:" + getUserName(session));this.session = session;//是有hash冲突的可能性的,不过触发概率很低,可以忽略int hashCode = userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, true);WEB_SOCKET_ENDPOINT_MAP.put(hashCode, this);}/*** 断开ws连接** @param session 会话*/@OnClosepublic void onClose(Session session) {String userName = getUserName(session);int hashCode = userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, false);WEB_SOCKET_ENDPOINT_MAP.remove(hashCode);//断开连接logger.info("Disconnect:" + userName);}/*** 接收到的消息** @param message 消息内容*/@OnMessagepublic void onMessage(String message, Session session) {//接收消息String sendUserName = getUserName(session);webSocketMessageService.sendMessage(sendUserName, message);}private String getUserName(Session session) {return Optional.ofNullable(session.getRequestParameterMap().get("userName")).orElse(new ArrayList<>()).stream().findFirst().orElse("anonymous_users");}public Session getSession() {return session;}public void setSession(Session session) {this.session = session;}
}
  • DefaultSendExecutor

适配改变类型后的WEB_SOCKET_ENDPOINT_MAP,并调整代码结构

package com.example.im.endpoint;import com.example.im.app.service.WebSocketMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;/*** @author PC*/
@Component
@ServerEndpoint("/ws")
public class WebSocketEndpoint {private final static Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);public static final ConcurrentHashMap<Integer, WebSocketEndpoint> WEB_SOCKET_ENDPOINT_MAP = new ConcurrentHashMap<>();/*** redis中用户SessionMap*/public static final String ONLINE_USER = "cus:ws:online-user";private Session session;private static WebSocketMessageService webSocketMessageService;private static RedisTemplate<String, String> redisTemplate;@Autowiredpublic void setWebSocketMessageService(WebSocketMessageService webSocketMessageService) {WebSocketEndpoint.webSocketMessageService = webSocketMessageService;}@Autowiredpublic void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {WebSocketEndpoint.redisTemplate = redisTemplate;}/*** 打开ws连接** @param session 会话*/@OnOpenpublic void onOpen(Session session) {//连接成功String userName = getUserName(session);logger.info("The connection is successful:" + getUserName(session));this.session = session;//是有hash冲突的可能性的,不过触发概率很低,可以忽略int hashCode = userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, true);WEB_SOCKET_ENDPOINT_MAP.put(hashCode, this);}/*** 断开ws连接** @param session 会话*/@OnClosepublic void onClose(Session session) {String userName = getUserName(session);int hashCode = userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, false);WEB_SOCKET_ENDPOINT_MAP.remove(hashCode);//断开连接logger.info("Disconnect:" + userName);}/*** 接收到的消息** @param message 消息内容*/@OnMessagepublic void onMessage(String message, Session session) {//接收消息String sendUserName = getUserName(session);webSocketMessageService.sendMessage(sendUserName, message);}private String getUserName(Session session) {return Optional.ofNullable(session.getRequestParameterMap().get("userName")).orElse(new ArrayList<>()).stream().findFirst().orElse("anonymous_users");}public Session getSession() {return session;}public void setSession(Session session) {this.session = session;}
}

重复代码提取

在增加了stream和redis渠道后,消息监听处的代码有大段重复,可以进行提取处理

  • MessageInfoUtil
package com.example.im.infra.executor.send.util;import com.example.im.infra.executor.send.DefaultSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;/*** @author PC* MessageInfo工具类*/
@Component
public class MessageInfoUtil {private final static Logger logger = LoggerFactory.getLogger(MessageInfoUtil.class);private DefaultSendExecutor defaultSendExecutor;@Autowiredpublic void setDefaultSendExecutor(DefaultSendExecutor defaultSendExecutor) {this.defaultSendExecutor = defaultSendExecutor;}public MessageInfo sendMessageByMessageInfoByte(byte[] messageInfoByte) {String messageJson = new String(messageInfoByte, StandardCharsets.UTF_8);MessageInfo messageInfo = JsonUtils.toObjectByTypeReference(messageJson, new TypeReference<MessageInfo>() {});switch (messageInfo.getScopeOfSending()) {case USER:defaultSendExecutor.sendToUser(messageInfo.getSendUserName(), messageInfo.getMessage());break;case ALL:defaultSendExecutor.sendToAll(messageInfo.getSendUserName(), messageInfo.getMessage());break;default://一般来说不会出现该情况,除非用户覆盖了ScopeOfSending,后续可以开个扩展发送范围的口子logger.warn("invalid sending range:" + messageInfo.getScopeOfSending().getScopeCode());break;}return messageInfo;}
}
  • RedisMessageListener
package com.example.im.infra.executor.send.redis;import com.example.im.infra.executor.send.util.MessageInfoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;/*** @author PC* redis监听*/
@Component
public class RedisMessageListener implements MessageListener {private final static Logger logger = LoggerFactory.getLogger(RedisMessageListener.class);private MessageInfoUtil messageInfoUtil;@Autowiredpublic void setMessageInfoUtil(MessageInfoUtil messageInfoUtil) {this.messageInfoUtil = messageInfoUtil;}@Overridepublic void onMessage(Message message, byte[] pattern) {logger.debug("send redis info");messageInfoUtil.sendMessageByMessageInfoByte(message.getBody());}
}
  • StreamMessageListener
package com.example.im.infra.executor.send.stream;import com.example.im.infra.executor.send.util.MessageInfoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.util.function.Function;/*** @author PC* 消息队列监听*/
@Component
public class StreamMessageListener {private final static Logger logger = LoggerFactory.getLogger(StreamSendExecutor.class);private MessageInfoUtil messageInfoUtil;@Autowiredpublic void setMessageInfoUtil(MessageInfoUtil messageInfoUtil) {this.messageInfoUtil = messageInfoUtil;}@Beanpublic Function<Flux<Message<byte[]>>, Mono<Void>> listener() {logger.debug("send stream info");return messageInfoFlux -> messageInfoFlux.map(message -> messageInfoUtil.sendMessageByMessageInfoByte(message.getPayload())).then();}
}

未用到的bean不加载

当未用到某些渠道时,无需进行相关配置,如使用了redis渠道,yml中就无需配置kafka信息

Redis渠道

调整config文件,当渠道非redis时对redis渠道相关bean不进行加载

  • WebSocketConfig
package com.example.im.config;import com.example.im.infra.handle.ImRejectExecutionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;import javax.annotation.Resource;/*** @author PC*/
@Configuration
@EnableWebSocket
public class WebSocketConfig {@Resourceprivate WebSocketProperties webSocketProperties;@Beanpublic ServerEndpointExporter serverEndpoint() {return new ServerEndpointExporter();}/**** 配置线程池* @return 线程池*/@Beanpublic TaskExecutor taskExecutor() {WebSocketProperties.ExecutorProperties executorProperties = webSocketProperties.getExecutorProperties();ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 设置核心线程数executor.setCorePoolSize(executorProperties.getCorePoolSize());// 设置最大线程数executor.setMaxPoolSize(executorProperties.getMaxPoolSize());// 设置队列容量executor.setQueueCapacity(executorProperties.getQueueCapacity());// 设置线程活跃时间(秒)executor.setKeepAliveSeconds(executorProperties.getKeepAliveSeconds());// 设置默认线程名称executor.setThreadNamePrefix("im-");// 设置拒绝策略executor.setRejectedExecutionHandler(new ImRejectExecutionHandler());// 等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown(true);return executor;}
}
  • RedisConfiguration

缓存用到了redis,RedisTemplate需要加载

package com.example.im.config;import com.example.im.infra.executor.send.redis.RedisMessageListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @author PC*/
@Configuration
public class RedisConfiguration {/*** redisTemplate配置** @return RedisTemplate*/@Beanpublic RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();redisTemplate.setKeySerializer(stringRedisSerializer);redisTemplate.setValueSerializer(stringRedisSerializer);redisTemplate.setStringSerializer(stringRedisSerializer);redisTemplate.setDefaultSerializer(stringRedisSerializer);redisTemplate.setHashKeySerializer(stringRedisSerializer);redisTemplate.setHashValueSerializer(stringRedisSerializer);redisTemplate.setConnectionFactory(connectionFactory);return redisTemplate;}@Bean@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 订阅一个或多个频道container.addMessageListener(listenerAdapter, new PatternTopic("redis-websocket-*"));return container;}@Bean@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")MessageListenerAdapter listenerAdapter(RedisMessageListener redisMessageListener) {return new MessageListenerAdapter(redisMessageListener);}
}
  • RedisMessageListener
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")
public class RedisMessageListener implements MessageListener 
  • RedisSendExecutor
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")
public class RedisSendExecutor extends AbstractBaseSendExecutor 

Kafka渠道

  • StreamMessageListener
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "stream")
public class StreamMessageListener 
  • StreamSendExecutor
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "stream")
public class StreamSendExecutor extends AbstractBaseSendExecutor

参考资料

[1].im项目

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/883112.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-20

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-20 目录 文章目录 计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-20目录1. FLARE: Faithful Logic-Aided Reasoning and Exploration摘要研究背景问题与挑战如何解决创新点算法模型实验效果重要数…

多线程进阶——线程池的实现

什么是池化技术 池化技术是一种资源管理策略&#xff0c;它通过重复利用已存在的资源来减少资源的消耗&#xff0c;从而提高系统的性能和效率。在计算机编程中&#xff0c;池化技术通常用于管理线程、连接、数据库连接等资源。 我们会将可能使用的资源预先创建好&#xff0c;…

Ubuntu22.04虚拟机安装

一、安装介质下载&#xff1a; 在官网下载安装镜像&#xff0c;下载地址https://releases.ubuntu.com/22.04/ubuntu-22.04.5-live-server-amd64.iso 二、操作系统安装&#xff1a; step 1:进入ubuntu的安装界面&#xff0c;直接回车安装。 step 2:选择语言&#xff0c;直接回…

liunx线程互斥

临界资源和临界区 临界资源&#xff1a;多线程执行流共享的资源就叫临界资源。 临界区&#xff1a;每个线程中&#xff0c;访问临界区的代码&#xff0c;就叫临界区。 互斥&#xff1a;任何时候&#xff0c;互斥保证只有一个执行流进入临界区&#xff0c;访问临界资源&#…

NXP Smart Access Car-车用产品整合应用

在汽车技术不断进步的今天&#xff0c;智能化已成为汽车行业发展的主要趋势之一。本次研讨会将深入探讨NXP的Smart Access Car技术&#xff0c;说明如何通过NXP 产品设计提升汽车的安全性、便利性和使用者体验。研讨会将涵盖NXP MCU/NFC等方面的最新解决方案&#xff0c;并探讨…

Qt调用Yolov11导出的Onnx分类模型开发分类检测软件

软件视频地址:视频地址 代码开源地址 之前用Python配合YOLOV11开发一个了分类训练软件&#xff0c;软件只要准备好数据&#xff0c;然后导入就可以训练数据&#xff0c;训练完成后还可以验证&#xff0c;测试&#xff0c;但是要真正落地&#xff0c;还是有点欠缺。配合YOLOV1…

入门数据结构JAVADS——如何通过遍历顺序构建二叉树

目录 前言 构建二叉树的前提&#xff1a; 为什么需要两个不同类型的遍历&#xff1a; 前序遍历 中序遍历 我们的算法思路如下: 举例&#xff1a; 代码实现 后序遍历 中序遍历 结尾 前言 入门数据结构JAVA DS——二叉树的介绍 (构建,性质,基本操作等) (1)-CSDN博客 在上…

我毕业后的8年嵌入式工作

2015年毕业&#xff0c;2016年工作到现在已经过了8个年头&#xff0c;借着征文&#xff0c;做个简单的回顾与总结。 2015年从广州番禺职业技术学院毕业&#xff0c;学的是嵌入式技术与应用&#xff0c;我的下一届学弟学妹变物联网了&#xff0c;算是绝版专业了吧。出来后谨遵校…

07 设计模式-结构型模式-桥接模式

桥接&#xff08;Bridge&#xff09;是用于把抽象化与实现化解耦&#xff0c;使得二者可以独立变化。这种类型的设计模式属于结构型模式&#xff0c;它通过提供抽象化和实现化之间的桥接结构&#xff0c;来实现二者的解耦。 这种模式涉及到一个作为桥接的接口&#xff0c;使得…

JAVA单列集合

List系列集合:添加的元素是 有序、可重复、有索引 Set系列集合:添加的元素是 无序、不重复、无索引 Collection Collection是单列集合的接口&#xff0c;它的功能是全部单列集合都可以继承使用的 public boolean add(E e) 把给定的对象添加到当前集合中 public void …

Spring MVC(下)

博主主页: 码农派大星. 数据结构专栏:Java数据结构 数据库专栏:MySQL数据库 JavaEE专栏:JavaEE 关注博主带你了解更多JavaEE知识 目录 1.响应 1.1 返回静态页面 1.2 返回数据ResponseBody 1.3 返回HTML代码⽚段 1.4 返回JSON 1.5 设置状态码 1.6 设置Header 2 . …

【文献及模型、制图分享】基于国际湿地城市视角的常德市湿地保护修复成效与归因分析及其政策启示

文献介绍 《湿地公约》提出的“国际湿地城市”认证是促进湿地保护修复的新举措。以国际湿地城市常德市为例&#xff0c;基于2000—2022年15 m空间分辨率湿地分类数据&#xff0c;监测常德市湿地保护修复逐年动态变化&#xff0c;定量分析湿地保护修复驱动因素的重要性和贡献率…

K8s中TSL证书如何续期

TSL是什么 K8s中的作用是什么&#xff1f; 在 Kubernetes&#xff08;K8s&#xff09;中&#xff0c;TSL 指的是 Transport Layer Security&#xff0c;也就是传输层安全协议。它是用来保护在网络上传输的数据的安全性和隐私性。 TSL 在 Kubernetes 中的作用包括&#xff1a;…

第1讲(ASP.NET Core 6 Web Api 开发入门):第一个Web Api项目

一、运行模板项目 二、验证模板项目的api 法1&#xff1a;直接在网页上进行验证api 法2&#xff1a;通过命令行验证api 复制下图的Curl语句&#xff0c;打开命令行进行粘贴。&#xff08;对于windows系统&#xff0c;需要把换成"&#xff0c;再去掉所有的/&#xff0c;最…

一文了解AOSP是什么?

一文了解AOSP是什么&#xff1f; AOSP基本信息 基本定义 AOSP是Android Open Source Project的缩写&#xff0c;这是一个由Google维护的完全免费和开放的操作系统开发项目。它是Android系统的核心基础&#xff0c;提供了构建移动操作系统所需的基本组件。 主要特点 完全开源…

【景观生态学实验】实验一 ArcGIS地理数据处理及制图基础

实验目的 1.掌握ArcGIS软件基本操作&#xff1a;通过实验操作与学习&#xff0c;熟练掌握ArcGIS软件相关的基本操作&#xff0c;包括界面熟悉、工具栏使用、数据的加载和保存、基本数据处理操作等; 2.掌握如何使用ArcGIS进行影像拼接及裁剪&#xff1a;通过实验操作与学习&am…

传知代码-ChatGPT多模态命名实体识别

代码以及视频讲解 本文所涉及所有资源均在传知代码平台可获取 ChatGPT辅助细化知识增强&#xff01; 多模态命名实体识别&#xff08;MNER&#xff09;最近引起了广泛关注。 用户在社交媒体上生成大量非结构化内容&#xff0c;主要由图像和文本组成。这些帖子具有与社交媒体相…

GISBox vs CesiumLab:哪款GIS工具更适合你的项目?

在地理信息系统&#xff08;GIS&#xff09;领域&#xff0c;越来越多的用户开始关注GIS工具箱的选择&#xff0c;其中GISBox和CesiumLab是两款备受推崇的产品。那么&#xff0c;哪一款更适合你的需求呢&#xff1f;本文将从功能、使用体验和应用场景等方面&#xff0c;对GISBo…

产品如何实现3D展示?具体步骤如下

产品实现3D展示主要依赖于先进的3D建模与展示技术。以下是产品实现3D展示的具体步骤和方法&#xff1a; 一、3D建模 使用专业的3D建模软件&#xff0c;如Blender、Maya、3ds Max等&#xff0c;这些软件提供了丰富的建模工具和材质编辑器&#xff0c;能够创建出高精度的3D模型…

Python基于amazon/chronos-t5-base的预训练模型离线对时间系列数据的未来进行预测

Python基于预训练模型对时间系列数据的未来进行预测 导入库 %matplotlib inline import matplotlib.pyplot as plt import numpy as np import pandas as pd import torch from chronos import ChronosPipeline from tqdm.auto import tqdm from autogluon.timeseries import…