Spring Cloud + Nacos 集成Netty Socket.IO

        项目需要集成实时消息通讯,所以尝试在项目中集成websocket。技术上选择了Socket.io,前/后端统一使用此开源项目来实现需求。

一、版本

spring cloud:  2022.0.4

注册中心: nacos 

Netty-Socket.io : 2.0.9

<dependency><groupId>com.corundumstudio.socketio</groupId><artifactId>netty-socketio</artifactId><version>${netty-socketio.version}</version>
</dependency>

前端:vue3、socket.io-client【 4.7.4】

二、关键代码

socket event handler

@Component
@Slf4j
public class NettySocketEventHandler {@Autowiredprivate SocketIOServer socketIoServer;@Autowiredprivate SocketClientService socketClientService;@Value("${socketio.application.name}")private String serverName;@Value("${socketio.reg-server}")private String host;@Autowiredprivate NacosDiscoveryProperties nacosDiscoveryProperties;private void start() throws Exception {//注册到Nacos里registerNamingService(serverName, String.valueOf(socketIoServer.getConfiguration().getPort()));}/*** 注册到 nacos 服务中** @param nettyName netty服务名称* @param nettyPort netty服务端口*/private void registerNamingService(String nettyName, String nettyPort) {try {log.info("-------------- register socket server  {}  {}", nettyName, nettyPort);NamingService namingService = NamingFactory.createNamingService(nacosDiscoveryProperties.getServerAddr());// 注册到nacosInstance instance = new Instance();instance.setIp(host);instance.setPort(socketIoServer.getConfiguration().getPort());instance.setServiceName(nettyName);instance.setWeight(1.0);Map<String, String> map = new HashMap<>();map.put("preserved.register.source", "SPRING_CLOUD");instance.setMetadata(map);namingService.registerInstance(nettyName, instance);} catch (Exception e) {throw new RuntimeException(e);}}@PostConstructprivate void autoStartup() {try {socketIoServer.start();start();log.info("-------------- start socket server  ----------");} catch (Exception ex) {log.error("SocketIOServer启动失败", ex);}}@PreDestroyprivate void autoStop() {socketIoServer.stop();}//socket事件消息接收入口@OnEvent(value = MessageConstant.SOCKET_EVENT_NAME) //value值与前端自行商定public void onEvent(SocketIOClient client, AckRequest ackRequest, SendMessageDTO data) {
//            client.sendEvent("message_event", "已成功接收数据"); //向前端发送接收数据成功标识log.info("socket  event  {}", JSON.toJSONString(data));}//socket添加@OnDisconnect事件,客户端断开连接时调用,刷新客户端信息@OnDisconnectpublic void onDisconnect(SocketIOClient client) {String userId = client.getHandshakeData().getSingleUrlParam("userId");UUID sessionId = client.getSessionId();log.info("socket  Disconnect {} {}", userId, sessionId);socketClientService.deleteSessionClientByUserId(userId, sessionId);log.info("socket  Disconnect {} {}", userId, sessionId);client.disconnect();}//socket添加connect事件,当客户端发起连接时调用@OnConnectpublic void onConnect(SocketIOClient client) {
//        log.info("socket  onConnect  {}", JSON.toJSONString(client));if (client != null) {HandshakeData client_mac = client.getHandshakeData();String userId = client_mac.getSingleUrlParam("userId");// 处理业务} else {log.error("客户端为空");}}}

socket client service

@Component
public class SocketClientService {private static ConcurrentHashMap<String, HashMap<UUID, SocketIOClient>> concurrentHashMap = new ConcurrentHashMap<>();/*** 保存客户端实例,发送消息时使用** @param userId         用户ID* @param sessionId      用户连接的session,可能存在多个页面连接* @param socketIOClient 客户的实例*/public void saveClient(String userId, UUID sessionId, SocketIOClient socketIOClient) {HashMap<UUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(userId);if (sessionIdClientCache == null) {sessionIdClientCache = new HashMap<>();}sessionIdClientCache.put(sessionId, socketIOClient);concurrentHashMap.put(userId, sessionIdClientCache);}/*** 获取用户的客户端实例** @param userId 用户的ID* @return HashMap<UUID, SocketIOClient>*/public HashMap<UUID, SocketIOClient> getUserClient(String userId) {return concurrentHashMap.get(userId);}/*** 获取所有客户端,不区分用户** @return 集合*/public Collection<HashMap<UUID, SocketIOClient>> getAllClient() {return concurrentHashMap.values();}/*** 删除用户的某个页面的连接** @param userId    用户ID* @param sessionId 页面的sessionID*/public void deleteSessionClientByUserId(String userId, UUID sessionId) {if(concurrentHashMap.get(userId) != null){concurrentHashMap.get(userId).remove(sessionId);}}/*** 删除用户的所有连接的实例** @param userId 用户的ID*/public void deleteUserCacheByUserId(String userId) {concurrentHashMap.remove(userId);}
}

socket config

@Data
@Configuration
@ConfigurationProperties(prefix = "socketio")
public class SocketIOConfig {private String host;private Integer port;private int bossCount;private int workCount;private boolean allowCustomRequests;private int upgradeTimeout;private int pingTimeout;private int pingInterval;@Beanpublic SocketIOServer socketIOServer() {SocketConfig socketConfig = new SocketConfig();socketConfig.setTcpNoDelay(true);socketConfig.setSoLinger(0);com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();config.setSocketConfig(socketConfig);config.setHostname(host);config.setPort(port);config.setBossThreads(bossCount);config.setWorkerThreads(workCount);config.setAllowCustomRequests(allowCustomRequests);config.setUpgradeTimeout(upgradeTimeout);config.setPingTimeout(pingTimeout);config.setPingInterval(pingInterval);return new SocketIOServer(config);}@Beanpublic SpringAnnotationScanner springAnnotationScanner() {return new SpringAnnotationScanner(socketIOServer());}
}

nacos 里的网关的配置【关键:StripPrefix 需要是0,否则长连接,并不能连接上】

        # socket    - id: socket-serviceuri: lb://socket-servicepredicates:- Path=/socket.io/**filters:- StripPrefix=0

socket.io的配置

socketio:application: name: socket-servicereg-server: 127.0.0.1   host: 127.0.0.1port: 16001
# 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器maxFramePayloadLength: 1048576
# 设置http交互最大内容长度maxHttpContentLength: 1048576
# socket连接数大小(如只监听一个端口boss线程组为1即可)bossCount: 1workCount: 100allowCustomRequests: true
# 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间upgradeTimeout: 100000
# Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件pingTimeout: 6000000
# Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔pingInterval: 25000

三、问题

     1、socket.io与其它微服务在同一个web容器里,这时候是2个端口。所以socket.io另注册了一个服务名。

     2、解决分布式的问题。我是采用了后端增加消息中间件来分发。

有问题可以私信我。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/738217.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】InfiniBand驱动mlx4_register_interface函数

一、讲解 mlx4_register_interface函数是Mellanox InfiniBand驱动程序的一部分&#xff0c;这个函数的作用是注册一个新的接口(intf)到InfiniBand设备。这允许不同的子系统&#xff0c;如以太网或存储&#xff0c;能够在同一个硬件设备上注册它们各自需要的接口&#xff0c;在…

编程笔记 html5cssjs 008 HTML图片 名画欣赏

编程笔记 html5&css&js 008 HTML图片 名画欣赏 一、代码二、解释 这段HTML代码定义了一个网页&#xff0c;展示了名画欣赏的内容。主要包括页面的标题、样式定义和主体内容。其中&#xff0c;样式定义使用了CSS来控制页面的布局和外观&#xff0c;主体内容使用了结构化…

ASP.Net实现玩具管理(三层架构,两项数据相乘)

目录 演示功能&#xff1a; 点击启动生成页面 步骤&#xff1a; 1、建文件 ​编辑 2、添加引用关系 3、根据数据库中的列写Models下的XueshengModels类 4、DAL下的DBHelper&#xff08;对数据库进行操作&#xff09; 5、DAL数据访问层下的service文件 6、BLL业务逻辑层…

华为HCIE实验题库哪里有?Cloud相关证书咋样?

华为HCIE认证的含金量很高&#xff0c;这除了是因为华为自身的影响力之外&#xff0c;也是因为HCIE的考试难度大。 HCIE的实验考试长达八个小时&#xff0c;考的是实际操作和论述&#xff0c;要想拿下HCIE实验考试&#xff0c;不断练习是十分关键的。 而华为HCIE实验的题库哪…

通讯协议制定之交互方式、步骤介绍

文章目录 通讯协议制定之交互方式、步骤介绍1. 前言2. 通讯协议发送类型2.1 周期发送2.2 事件发送 3. 通讯协议数据包类型3.1 握手3.2 心跳3.1 数据包 4. 小结 通讯协议制定之交互方式、步骤介绍 1. 前言 通讯协议又称通信规程&#xff0c;是指通信双方对数据传送控制的一种约…

【Spring Boot 3】读取resource文件

【Spring Boot 3】读取resource文件 背景介绍开发环境开发步骤及源码工程目录结构总结背景 软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花费或…

如何部署Python Flask并实现远程访问本地搭建web站点【内网穿透】

文章目录 前言1. 安装部署Flask并制作SayHello问答界面2. 安装Cpolar内网穿透3. 配置Flask的问答界面公网访问地址4. 公网远程访问Flask的问答界面 前言 Flask是一个Python编写的Web微框架&#xff0c;让我们可以使用Python语言快速实现一个网站或Web服务&#xff0c;本期教程…

导出微软浏览器收藏的网页,并查看网页保存的登录密码

导出微软Edge浏览器收藏夹&#xff08;书签&#xff09;的步骤如下&#xff1a; 打开Microsoft Edge浏览器。右键点击浏览器收藏栏上的任意位置或使用快捷键Ctrl Shift O打开收藏夹管理页面。在收藏夹管理页面中&#xff0c;通常你会看到右上角或菜单区域有一个“…”或者三…

Axios中每次发送post请求前都会发送options请求

今天写前端的时候&#xff0c;发现每次post请求都会失败&#xff0c; 反复调试过后发现axios在每次发送post请求前都发送了options请求&#xff0c; 在网络搜罗了一大圈&#xff0c; 发现了原因是因为web页面发送了请求给vue后&#xff0c; vue再请求后端过程中发生了跨域&…

【算法】一维前缀和以及二维前缀和

目录 一维前缀和适用场景示例 二维前缀和适用场景一种情况另一种情况示例 一维前缀和 适用场景 求一段区间的和。 比如有一个数列 &#xff1a; 如果我们要求 [l,r]即某个区间内的数组和的时候&#xff0c;思路就是每遍历一个元素就进行求和&#xff0c;记录下加到al时的和…

Skywalking

1、简介 Skywalking是由国内开源爱好者吴晟开源并提交到Apache孵化器的开源项目&#xff0c; 2017年12月SkyWalking成为Apache国内首个个人孵化项目&#xff0c; 2019年4月17日SkyWalking从Apache基金会的孵化器毕业成为顶级项目&#xff0c; 目前SkyWalking支持Java、 .Net、 …

广告主投放系统从设计到实践

在当今数字广告行业中&#xff0c;广告主投放系统扮演着至关重要的角色。它是连接广告主和广告媒体之间的桥梁&#xff0c;帮助广告主实现广告投放目标并获得可观的回报。本篇博客文章将深入探讨广告主投放系统的设计和实践过程&#xff0c;并分享一些关键的经验和最佳实践。 …

flink的分组聚合、over聚合、窗口聚合对比

【背景】 flink有几种聚合&#xff0c;使用上是有一些不同&#xff0c;需要加以区分&#xff1a; 分组聚合&#xff1a;group agg over聚合&#xff1a;over agg 窗口聚合&#xff1a;window agg 省流版&#xff1a; 触发计算时机 结果流类型 状态大小 分组聚合group ag…

使用OpenCV实现两张图像融合在一起

简单介绍 图像融合技术是一种结合多个不同来源或不同传感器捕获的同一场景的图像数据&#xff0c;以生成一幅更全面、更高质量的单一图像的过程。这种技术广泛应用于遥感、医学影像分析、计算机视觉等多个领域。常见的图像融合技术包括基于像素级、特征级和决策级的融合方法&a…

基与HTML5的塔防游戏设计与实现

目 录 摘 要 I Abstract II 引 言 1 1 项目背景与相关技术 3 1.1 背景与发展简介 3 1.2 HTML5技术及其优势 4 1.3 JavaScript开发的优势与劣势 4 1.4 CSS样式表在开发中的用处 5 1.5 本章小结 6 2 系统分析 7 2.1 需求分析 7 2.2 问题分析 7 2.3 流程设计 7 2.3 功能分析 8 2.…

数据分析入门,深入浅出的数据分析

时下正值大数据与人工智能高速发展的时刻&#xff0c;相信很多对数据分析感兴趣的朋友想要转行。很多朋友选择从事数据分析&#xff0c;主要是看到这个岗位的发展前景和薪资待遇。 但是一些小伙伴并不知道数据分析到底是做什么的&#xff1f;需要用到哪些知识&#xff1f; 为…

【Flink SQL】Flink SQL 基础概念:SQL Table 运行环境、基本概念及常用 API

Flink SQL 基础概念&#xff1a;SQL & Table 运行环境、基本概念及常用 API 1.SQL & Table 简介及运行环境1.1 简介1.2 SQL 和 Table API 运行环境依赖 2.SQL & Table 的基本概念及常用 API2.1 一个 SQL / Table API 任务的代码结构2.2 SQL 上下文&#xff1a;Tabl…

linux部署服务相关基础操作:磁盘挂载、jdk安装、docker安装、docker-compose环境安装、mysql、redis、jenkins等

磁盘挂载 1、运行mount查看数据盘挂载信息。返回结果中没有/dev/vdb1的信息。 2、运行fdisk -l查看数据盘分区信息。 3、格式化磁盘 mkfs -t ext4 /dev/vdb3.1、 (格式化后这一步跳过)运行cat /etc/fstab查看数据盘分区/dev/vdb1原有的挂载点名称。 4、运行mkdir /data重新…

[python3] 责任链模式

责任链模式&#xff08;Chain of Responsibility Pattern&#xff09;是一种行为设计模式&#xff0c;它允许多个对象都有机会处理请求&#xff0c;从而避免请求的发送者和接收者之间的耦合关系。请求沿着链传递&#xff0c;直到有一个对象处理它为止。 下面是一个简单的 Pyth…

Linux---多线程(上)

一、线程概念 线程是比进程更加轻量化的一种执行流 / 线程是在进程内部执行的一种执行流线程是CPU调度的基本单位&#xff0c;进程是承担系统资源的基本实体 在说线程之前我们来回顾一下进程的创建过程&#xff0c;如下图 那么以进程为参考&#xff0c;我们该如何去设计创建一个…