即时通讯增加Redis渠道

情况说明

在本地和服务器分别启动im服务,当本地发送消息时,会发现服务器上并没有收到消息

初版im只支持单机版,不支持分布式的情况。此次针对该情况对项目进行优化,文档中贴出的代码非完整代码,可自行查看参考资料[2]

代码结构调整

本次调整需要增加一个redis的渠道,为了方便后续再进行渠道的增加,对现有代码结构进行调整

  • IBaseSendExecutor

渠道扩充接口,后续再增加渠道都可以实现该接口

package com.example.im.infra.executor.send;/*** @author PC* 通信处理*/
public interface IBaseSendExecutor {/*** 获取通信类型,预置的有默认和redis** @return 通讯类型*/String getCommunicationType();/*** 发送给指定人** @param sendUserName 发送人* @param message      消息*/void sendToUser(String sendUserName, String message);/*** 发送给全部人** @param sendUserName 发送人* @param message      消息*/void sendToAll(String sendUserName, String message);
}
  • AbstractBaseSendExecutor

通信处理抽象类,将一些预定义的渠道所需要的公有方法提取出来

package com.example.im.infra.executor.send;import com.example.im.config.WebSocketProperties;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;/*** @author PC*/
public abstract class AbstractBaseSendExecutor implements IBaseSendExecutor {protected WebSocketProperties webSocketProperties;@Autowiredpublic void setWebSocketProperties(WebSocketProperties webSocketProperties) {this.webSocketProperties = webSocketProperties;}/*** 获取接收人信息** @param sendUserName 发送人* @param message      消息* @return 接收人列表*/protected List<String> getReceiverName(String sendUserName, String message) {if (!StringUtils.contains(message, webSocketProperties.getReceiverSeparator())) {return new ArrayList<>();}String[] names = StringUtils.split(message, webSocketProperties.getReceiverSeparator());return Stream.of(names).skip(1).filter(receiver ->!(webSocketProperties.getReceiverExcludesHimselfFlag() && StringUtils.equals(sendUserName, receiver))).collect(Collectors.toList());}/*** 根据配置处理发送的信息** @param message 原消息* @return 被处理后的消息*/protected String generatorMessage(String message) {return BooleanUtils.isTrue(webSocketProperties.getExcludeReceiverInfoFlag()) ?StringUtils.substringBefore(message, webSocketProperties.getReceiverSeparator()) : message;}
}
  • DefaultSendExecutor

原有消息发送逻辑

package com.example.im.infra.executor.send;import com.example.im.endpoint.WebSocketEndpoint;
import com.example.im.infra.constant.ImConstants;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;/*** @author PC* 默认执行*/
@Component
public class DefaultSendExecutor extends AbstractBaseSendExecutor {private final static Logger logger = LoggerFactory.getLogger(DefaultSendExecutor.class);private TaskExecutor taskExecutor;@Autowiredpublic void setTaskExecutor(TaskExecutor taskExecutor) {this.taskExecutor = taskExecutor;}@Overridepublic String getCommunicationType() {return ImConstants.CommunicationType.DEFAULT;}@Overridepublic void sendToUser(String sendUserName, String message) {List<String> receiverNameList = getReceiverName(sendUserName, message);CountDownLatch countDownLatch = new CountDownLatch(receiverNameList.size());Set<String> notOnlineReceiverSet = ConcurrentHashMap.newKeySet();Set<String> finalNotOnlineReceiverSet = notOnlineReceiverSet;receiverNameList.forEach(receiverName -> taskExecutor.execute(() -> {try {if (WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.containsKey(receiverName)) {WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.get(receiverName).getSession().getBasicRemote().sendText(generatorMessage(message));} else {finalNotOnlineReceiverSet.add(receiverName);}} catch (IOException ioException) {logger.error("send error:" + ioException);} finally {countDownLatch.countDown();}}));try {countDownLatch.await();} catch (InterruptedException interruptedException) {logger.error("error.countDownLatch.await");}notOnlineReceiverSet = notOnlineReceiverSet.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toSet());if (CollectionUtils.isNotEmpty(notOnlineReceiverSet)) {logger.info("not online number is " + notOnlineReceiverSet.size());logger.info("The user : {} is not online", String.join(",", notOnlineReceiverSet));}}@Overridepublic void sendToAll(String sendUserName, String message) {for (Map.Entry<String, WebSocketEndpoint> webSocketEndpointEntry : WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.entrySet()) {taskExecutor.execute(() -> {if (webSocketProperties.getReceiverExcludesHimselfFlag() && StringUtils.equals(sendUserName, webSocketEndpointEntry.getKey())) {return;}try {webSocketEndpointEntry.getValue().getSession().getBasicRemote().sendText(generatorMessage(message));} catch (IOException ioException) {logger.error("send error:" + ioException);}});}}
}
  • SendExecutorFactory

发送渠道工厂

package com.example.im.infra.executor.send;import com.example.im.config.WebSocketProperties;
import com.example.im.infra.executor.config.ExecutorConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Optional;/*** @author PC* 发送逻辑工厂*/
@Component
public class SendExecutorFactory {private final WebSocketProperties webSocketProperties;private ExecutorConfiguration executorConfiguration;@Autowiredpublic SendExecutorFactory(WebSocketProperties webSocketProperties) {this.webSocketProperties = webSocketProperties;}@Autowiredpublic void setExecutorConfiguration(ExecutorConfiguration executorConfiguration) {this.executorConfiguration = executorConfiguration;}public void onMessage(String sendUserName, String message) {IBaseSendExecutor iBaseSendExecutor = Optional.ofNullable(executorConfiguration.getBaseSendExecutorMap().get(webSocketProperties.getCommunicationType())).orElse(new DefaultSendExecutor());//包含@发给指定人,否则发给全部人if (StringUtils.contains(message, webSocketProperties.getReceiverSeparator())) {iBaseSendExecutor.sendToUser(sendUserName, message);} else {iBaseSendExecutor.sendToAll(sendUserName, message);}}
}
  • ExecutorConfiguration

加载

package com.example.im.infra.executor.config;import com.example.im.infra.executor.send.IBaseSendExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;/*** @author PC* Executor配置*/
@Component
public class ExecutorConfiguration implements ApplicationContextAware {private final static Logger logger = LoggerFactory.getLogger(ExecutorConfiguration.class);private Map<String, IBaseSendExecutor> baseSendExecutorMap = new HashMap<>(16);private static ApplicationContext applicationContext;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {ExecutorConfiguration.applicationContext = applicationContext;//加载IBaseSendExecutor实现类this.initBaseSendExecutor(applicationContext);}/*** 加载IBaseSendExecutor实现类* 如果一个服务的发送渠道是固定的,可以使用@Bean搭配@ConditionalOnProperty的方式* 但是考虑到后续可能会有一个服务不同发送渠道的场景,采用当前加载方式** @param applicationContext 上下文*/private void initBaseSendExecutor(ApplicationContext applicationContext) {logger.info("Start loading IBaseSendExecutor");Map<String, IBaseSendExecutor> baseSendExecutorMap = applicationContext.getBeansOfType(IBaseSendExecutor.class);for (Map.Entry<String, IBaseSendExecutor> iBaseSendExecutorEntry : baseSendExecutorMap.entrySet()) {String communicationType = iBaseSendExecutorEntry.getValue().getCommunicationType();this.baseSendExecutorMap.put(communicationType, iBaseSendExecutorEntry.getValue());logger.info("initBaseSendExecutor>>>>>>>communicationType:{},className:{}", communicationType, iBaseSendExecutorEntry.getValue().getClass().getName());}logger.info("IBaseSendExecutor loading is complete");}public static ApplicationContext getApplicationContext() {return applicationContext;}public Map<String, IBaseSendExecutor> getBaseSendExecutorMap() {return baseSendExecutorMap;}public void setBaseSendExecutorMap(Map<String, IBaseSendExecutor> baseSendExecutorMap) {this.baseSendExecutorMap = baseSendExecutorMap;}
}

添加redis通信渠道

  • pom.xml
<dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-redis</artifactId>
</dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId>
</dependency>
  • application.yml
server:port: 18080
cus:ws:exclude-receiver-info-flag: truereceiver-excludes-himself-flag: truecommunication-type: redis
spring:redis:host: 127.0.0.1port: 6379username: rootpassword: rootdatabase: ${SPRING_REDIS_DATABASE:1}# Redis连接超时时间connect-timeout: ${SPRING_REDIS_CONNECT_TIMEOUT:2000}# Redis读取超时时间timeout: ${SPRING_REDIS_READ_TIMEOUT:5000}lettuce:pool:# 资源池中最大连接数# 默认8,-1表示无限制;可根据服务并发redis情况及服务端的支持上限调整max-active: ${SPRING_REDIS_POOL_MAX_ACTIVE:50}# 资源池运行最大空闲的连接数# 默认8,-1表示无限制;可根据服务并发redis情况及服务端的支持上限调整,一般建议和max-active保持一致,避免资源伸缩带来的开销max-idle: ${SPRING_REDIS_POOL_MAX_IDLE:50}# 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)# 默认 -1 表示永不超时,设置5秒max-wait: ${SPRING_REDIS_POOL_MAX_WAIT:5000}
  • RedisSendExecutor

redis发送

package com.example.im.infra.executor.send.redis;import com.example.im.infra.constant.ImConstants;
import com.example.im.infra.executor.send.AbstractBaseSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.executor.send.dto.ScopeOfSendingEnum;
import com.example.im.infra.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;/*** @author PC* redis执行*/
@Component
public class RedisSendExecutor extends AbstractBaseSendExecutor {private final static Logger logger = LoggerFactory.getLogger(RedisSendExecutor.class);private RedisTemplate<String, String> redisTemplate;@Autowiredpublic void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {this.redisTemplate = redisTemplate;}@Overridepublic String getCommunicationType() {return ImConstants.CommunicationType.REDIS;}@Overridepublic void sendToUser(String sendUserName, String message) {MessageInfo messageInfo = new MessageInfo();messageInfo.setSendUserName(sendUserName);messageInfo.setMessage(message);messageInfo.setScopeOfSending(ScopeOfSendingEnum.USER);logger.debug("send to user redis websocket, channel is " + "redis-websocket");redisTemplate.convertAndSend("redis-websocket-user", JsonUtils.toJson(messageInfo));}@Overridepublic void sendToAll(String sendUserName, String message) {MessageInfo messageInfo = new MessageInfo();messageInfo.setSendUserName(sendUserName);messageInfo.setMessage(message);messageInfo.setScopeOfSending(ScopeOfSendingEnum.ALL);logger.debug("send to all redis websocket, channel is " + "redis-websocket");redisTemplate.convertAndSend("redis-websocket-all", JsonUtils.toJson(messageInfo));}
}
  • RedisMessageListener

redis监听

package com.example.im.infra.executor.send.redis;import com.example.im.infra.executor.send.DefaultSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;/*** @author PC* redis监听*/
@Component
public class RedisMessageListener implements MessageListener {private final static Logger logger = LoggerFactory.getLogger(RedisMessageListener.class);private DefaultSendExecutor defaultSendExecutor;@Autowiredpublic void setDefaultSendExecutor(DefaultSendExecutor defaultSendExecutor) {this.defaultSendExecutor = defaultSendExecutor;}@Overridepublic void onMessage(Message message, byte[] pattern) {//消息内容String messageJson = new String(message.getBody(), StandardCharsets.UTF_8);MessageInfo messageInfo = JsonUtils.toObjectByTypeReference(messageJson, new TypeReference<MessageInfo>() {});switch (messageInfo.getScopeOfSending()) {case USER:defaultSendExecutor.sendToUser(messageInfo.getSendUserName(), messageInfo.getMessage());break;case ALL:defaultSendExecutor.sendToAll(messageInfo.getSendUserName(), messageInfo.getMessage());break;default://一般来说不会出现该情况,除非用户覆盖了ScopeOfSending,后续可以开个扩展发送范围的口子logger.warn("invalid sending range:" + messageInfo.getScopeOfSending().getScopeCode());break;}}
}

测试

本地服务发送消息

服务器接收到了消息

常见问题

打包报错

执行mvn clean packages打包时出现以下错误

[ERROR] contextLoads  Time elapsed: 0.001 s  <<< ERROR!
java.lang.IllegalStateException: Failed to load ApplicationContext
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'serverEndpoint' defined in class path resource [c
om/example/im/config/WebSocketConfig.class]: Invocation of init method failed; nested exception is java.lang.IllegalStateException: javax.websocket.server.ServerContainer not available
Caused by: java.lang.IllegalStateException: javax.websocket.server.ServerContainer not available

查看ServerContainer接口,发现其有两个接口实现类,其中有一个是test包的

将其排除后即可正常打包

jar包启动时no main manifest attribute问题

需将pom的plugin标签中的skip标签删除或设置为false

参考资料

[1].初版im文档

[2].im项目地址

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/56232.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker安装ocserv教程(效果极佳)

本章教程,介绍如何在Debain系统上安装ocserv。安装方式是使用Docker方式部署。 一、安装Docker curl -sSL https://file.ewbang.com/docker/debian/install_docker.sh -o install_docker.sh && bash install_docker.sh二、拉取镜像 docker pull tommylau/ocserv

Jsoup在Java中:解析京东网站数据

对于电商网站如京东来说&#xff0c;其页面上的数据包含了丰富的商业洞察。对于开发者而言&#xff0c;能够从这些网站中提取有价值的信息&#xff0c;进行分析和应用&#xff0c;无疑是一项重要的技能。本文将介绍如何使用Java中的Jsoup库来解析京东网站的数据。 Jsoup简介 …

Linux部署redis保姆级教程

一、版本说明 Redis版本号(本文的版本号是6.2.12)的第二位如果是偶数,代表稳定版本,如果是奇数,代表非稳定版本。 所有历史版本下载地址:Index of /releases/ 二、基于压缩包安装(推荐) 2.1安装依赖 2.1.1安装gcc: yum -y install gcc 2.1.2验证gcc是否安装成功:(…

Linux--多路转接之epoll

上一篇:Linux–多路转接之select epoll epoll 是 Linux 下多路复用 I/O 接口 select/poll 的增强版本&#xff0c;它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统 CPU 利用率。它是 Linux 下多路复用 API 的一个选择&#xff0c;相比 select 和 poll&#xff0c…

DevExpress WPF v24.1新版亮点:PDF查看器、富文本编辑器功能升级

DevExpress WPF拥有120个控件和库&#xff0c;将帮助您交付满足甚至超出企业需求的高性能业务应用程序。通过DevExpress WPF能创建有着强大互动功能的XAML基础应用程序&#xff0c;这些应用程序专注于当代客户的需求和构建未来新一代支持触摸的解决方案。 DevExpress WPF控件日…

1971. 寻找图中是否存在路径

有一个具有 n 个顶点的 双向 图&#xff0c;其中每个顶点标记从 0 到 n - 1&#xff08;包含 0 和 n - 1&#xff09;。图中的边用一个二维整数数组 edges 表示&#xff0c;其中 edges[i] [ui, vi] 表示顶点 ui 和顶点 vi 之间的双向边。 每个顶点对由 最多一条 边连接&#x…

Vue3 学习笔记(一)Vue3 介绍及环境部署

一、Vue.js 简介 1、Vue.js 是什么&#xff1f; Vue.js&#xff08;读音 /vjuː/, 类似于 view&#xff09; 是一套构建用户界面的渐进式框架。Vue 只关注视图层&#xff0c; 采用自底向上增量开发的设计。Vue 的目标是通过尽可能简单的 API 实现响应的数据绑定和组合的视图组件…

性能工具之JMeter 通过Java API生成 BeanShell PreProcessor 脚本

文章目录 一、前言二、实现代码三、代码示例四、最后 一、前言 对于上一篇文章&#xff08;性能工具之 HAR 格式化转换JMeter JMX 脚本文件&#xff09;还是有点问题。大家在使用的情况需要注意。 如果多个接口相同 path 路径且不同参数进行查询如&#xff1a; 上面接口如果…

【前端】如何制作一个自己的网页(15)

有关后代选择器的具体解释&#xff1a; 后代选择器 后代选择器使用时&#xff0c;需要以空格将多个选择器间隔开。 比如&#xff0c;这里p span&#xff0c;表示只设置p元素内&#xff0c;span元素的样式。 <style> /* 使用后代选择器设置样式 */ p span { …

java--多态(详解)

目录 一、概念二、多态实现的条件三、向上转型和向下转型3.1 向上转型3.2 向下转型 四、重写和重载五、理解多态5.1练习&#xff1a;5.2避免在构造方法中调用重写的方法&#xff1a; 欢迎来到权权的博客~欢迎大家对我的博客提出指导这是我的博客主页&#xff1a;点击 一、概念…

Java毕业设计 基于SpringBoot发卡平台

Java毕业设计 基于SpringBoot发卡平台 这篇博文将介绍一个基于SpringBoot发卡平台&#xff0c;适合用于Java毕业设计。 功能介绍 首页 图片轮播 商品介绍 商品详情 提交订单 文章教程 文章详情 查询订单  查看订单卡密 客服   后台管理 登录 个人信息 修改密码 管…

Selenium爬虫技术:如何模拟鼠标悬停抓取动态内容

介绍 在当今数据驱动的世界中&#xff0c;抓取动态网页内容变得越来越重要&#xff0c;尤其是像抖音这样的社交平台&#xff0c;动态加载的评论等内容需要通过特定的方式来获取。传统的静态爬虫方法难以处理这些由JavaScript生成的动态内容&#xff0c;Selenium爬虫技术则是一…

字典如何与选择器一起使用

背景&#xff1a;开发过程中会遇到某些字段需要做成下拉框。如下图&#xff1a; 组件 | Element里有select选择器这个组件可以实现下拉框的效果 我们可能会想到创一个辅助表来存储这些下拉数据像这样 这样虽然能实现&#xff0c;但是在实际开发中是不合理的&#xff0c;如果有…

个税自然人扣缴客户端数据的备份与恢复(在那个文件夹)

一&#xff0c;软件能够正常打开&#xff0c;软件中的备份与恢复功能 1&#xff0c;备份 您按照下面的方法备份一下哦~ 进入要备份的自然人软件&#xff0c;点击左侧系统设置→→系统管理→→备份恢复&#xff1b; 在备份设置里&#xff0c;点击“备份到选择路径”&#xff0c;…

WebGL编程指南 - 颜色与纹理续

设置纹理坐标&#xff08;initVertexBuffers()&#xff09; 从缓冲区到 attribute 变量的流程&#xff1a; // 顶点坐标 function initVertexBuffers(gl) {// 数据准备let verticesTexCoords new Float32Array([// 顶点坐标&#xff0c;纹理坐标-0.5, 0.5, 0.0, 1.0, -0.5, …

图像异常检测评估指标-分类性能

图像异常检测评估指标-分类性能 1. 混淆矩阵 混淆矩阵包括4个用于衡量分类算法性能的基本数值 四个字母代表的含义是&#xff1a;P&#xff08;Positive&#xff09;代表算法将样本预测为正类&#xff0c;N&#xff08;Negative&#xff09;代表算法将样本预测为负类&#xf…

ST7789读取ID错误新思路(以STC32G为例)

1.前言 前两天刚把ST7789写入搞定&#xff0c;这两天想折腾一下读取。最开始是读ID&#xff0c;先是用厂家送的程序&#xff0c;程序里面用的是模拟I8080协议&#xff0c;一切正常。后来我用STC32G的内置LCM模块&#xff0c;发现读取不出来。更神奇的是ID读不出来&#xff0c;…

[项目详解][boost搜索引擎#2] 建立index | 安装分词工具cppjieba | 实现倒排索引

目录 编写建立索引的模块 Index 1. 设计节点 2.基本结构 3.(难点) 构建索引 1. 构建正排索引&#xff08;BuildForwardIndex&#xff09; 2.❗构建倒排索引 3.1 cppjieba分词工具的安装和使用 3.2 引入cppjieba到项目中 倒排索引代码 本篇文章&#xff0c;我们将继续项…

【C++指南】类和对象(四):类的默认成员函数——全面剖析 : 拷贝构造函数

引言 拷贝构造函数是C中一个重要的特性&#xff0c;它允许一个对象通过另一个已创建好的同类型对象来初始化。 了解拷贝构造函数的概念、作用、特点、规则、默认行为以及如何自定义实现&#xff0c;对于编写健壮和高效的C程序至关重要。 C类和对象系列文章&#xff0c;可点击下…

GitLab+Jenkins 实现 Webhook 自动化触发构建

在持续集成和持续部署&#xff08;CI/CD&#xff09;过程中&#xff0c;如何实现代码提交后自动触发构建&#xff1f;今天&#xff0c;我们将通过GitLab与Jenkins的集成&#xff0c;利用Webhook实现自动化触发构建&#xff0c;为你的开发流程注入高效能量&#xff01; 在每次代…