SpringBoot+Netty+Websocket实现消息推送

这样一个需求:把设备异常的状态每10秒推送到页面并且以弹窗弹出来,这个时候用Websocket最为合适,今天主要是后端代码展示。

添加依赖

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.36.Final</version>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

定义netty端口号

websocket:netty:port: 8888path: /websocket

netty服务器

@Slf4j
@Component
public class NettyServer {/*** netty服务端口号*/@Value("${websocket.netty.port}")private int port;/*** netty事件辅助组*/private EventLoopGroup bossGroup;/*** netty事件工作组*/private EventLoopGroup workGroup;/*** 管道配置*/private final CustomChannelInitializer channelInitializer;public NettyServer(CustomChannelInitializer channelInitializer) {this.channelInitializer = channelInitializer;}/*** netty服务初始化*/@PostConstructpublic void start() {new Thread(() -> {bossGroup = new NioEventLoopGroup();workGroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();//bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作bootstrap.group(bossGroup, workGroup);//设置NIO类型的channelbootstrap.channel(NioServerSocketChannel.class);//设置监听端口bootstrap.localAddress(new InetSocketAddress(port));//设置管道bootstrap.childHandler(channelInitializer);try {ChannelFuture channelFuture = bootstrap.bind().sync();log.info("Netty服务启动成功,开启监听:{}", channelFuture.channel().localAddress());//对关闭通道进行监听channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("Netty服务启动失败!", e);throw new RuntimeException(e);}}).start();}}

Netty配置

管理全局Channel以及用户对应的channel(推送消息)

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;import java.util.concurrent.ConcurrentHashMap;/*** @version 1.0.0* @description 业务类*/
public class NettyConfig {/*** 定义全局单利channel组 管理所有channel*/private static volatile ChannelGroup channelGroup = null;/*** 存放请求ID与channel的对应关系*/private static volatile ConcurrentHashMap<String, Channel> channelMap = null;/*** 定义两把锁*/private static final Object lock1 = new Object();private static final Object lock2 = new Object();public static ChannelGroup getChannelGroup() {if (null == channelGroup) {synchronized (lock1) {if (null == channelGroup) {channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);}}}return channelGroup;}public static ConcurrentHashMap<String, Channel> getChannelMap() {if (null == channelMap) {synchronized (lock2) {if (null == channelMap) {channelMap = new ConcurrentHashMap<>();}}}return channelMap;}public static Channel getChannel(String userId) {if (null == channelMap) {return getChannelMap().get(userId);}return channelMap.get(userId);}
}

管道配置

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** @version 1.0.0* @description Netty管道配置类*/
@Component
public class CustomChannelInitializer extends ChannelInitializer<SocketChannel> {/*** webSocket协议名*/private static final String WEBSOCKET_PROTOCOL = "WebSocket";/*** websocket服务地址*/@Value("${websocket.path:/websocket}")private String websocketPath;private final CustomChannelHandler channelHandler;public CustomChannelInitializer(CustomChannelHandler channelHandler) {this.channelHandler = channelHandler;}@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 设置管道ChannelPipeline pipeline = socketChannel.pipeline();// 流水线管理通道中的处理程序(Handler),用来处理业务// webSocket协议本身是基于http协议的,所以这边也要使用http编解码器pipeline.addLast(new HttpServerCodec());pipeline.addLast(new ObjectEncoder());// 以块的方式来写的处理器pipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new HttpObjectAggregator(8192));pipeline.addLast(new WebSocketServerProtocolHandler(websocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));// 自定义的handler,处理业务逻辑pipeline.addLast(channelHandler);}
}

自定义CustomChannelHandler

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.ruoyi.common.utils.StringUtils;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @version 1.0.0* @description Netty管道handler类*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class CustomChannelHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {/*** 一旦连接,第一个被执行*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());// 添加到channelGroup 通道组NettyConfig.getChannelGroup().add(ctx.channel());}/*** 读取数据*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {log.info("服务器收到消息:{}", msg.text());// 获取用户ID,关联channelJSONObject jsonObject = JSONUtil.parseObj(msg.text());String uid = jsonObject.getStr("uid");if(StringUtils.isNotEmpty(uid)){NettyConfig.getChannelMap().put(uid, ctx.channel());// 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户IDAttributeKey<String> key = AttributeKey.valueOf("userId");ctx.channel().attr(key).setIfAbsent(uid);// 回复消息ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦"));}}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info("用户下线了:{}", ctx.channel().id().asLongText());// 删除通道NettyConfig.getChannelGroup().remove(ctx.channel());removeUserId(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("异常:{}", cause.getMessage());super.exceptionCaught(ctx,cause);// 删除通道NettyConfig.getChannelGroup().remove(ctx.channel());removeUserId(ctx);ctx.close();}/*** 删除用户与channel的对应关系*/private void removeUserId(ChannelHandlerContext ctx) {AttributeKey<String> key = AttributeKey.valueOf("userId");String userId = ctx.channel().attr(key).get();if(StringUtils.isNotEmpty(userId)){NettyConfig.getChannelMap().remove(userId);}}
}

推送消息接口及实现类

public interface PushMsgService {/*** 推送给指定用户*/void pushMsgToOne(String group, String msg);/*** 推送给所有用户*/void pushMsgToAll(String msg);
}

实现接口

@Service
public class PushMsgServiceImpl implements PushMsgService {@Overridepublic void pushMsgToOne(String group, String msg) {Channel channel = NettyConfig.getChannel(group);if (Objects.isNull(channel)) {throw new RuntimeException("未连接socket服务器");}channel.writeAndFlush(new TextWebSocketFrame(msg));}@Overridepublic void pushMsgToAll(String msg) {NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));}
}

具体的controller层接口

   /*** 获取弹框网关状态*/@GetMapping("/upKnxNetworkLink/{uid}")public void upKnxNetworkLink(@PathVariable String uid){KnxNetworkLinkInfo knxNetworkLinkInfo =new KnxNetworkLinkInfo();knxNetworkLinkInfo.setStatus("0");List<KnxNetworkLinkInfo>knxNetworkLinkInfoList=knxNetworkLinkInfoService.queryList(knxNetworkLinkInfo);JSONArray array= JSONArray.parseArray(JSON.toJSONString(knxNetworkLinkInfoList));pushMsgService.pushMsgToOne(uid,array.toJSONString());}

使用postman测试Websocket推送
在这里插入图片描述
连接Websocket
在这里插入图片描述
在开一个窗口测试发送消息的接口
在这里插入图片描述
发送过后在回到连接Websocket窗口
在这里插入图片描述
前端需要做一个定时访问发送消息的接口,每发一次就会往前端推送一次数据。
参考:Springboot + netty +websocket 实现推送消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/223506.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用Go汇编实现一个快速排序算法

本代码全网首发&#xff0c;使用Go plan9 windows arm64汇编&#xff0c;实现基础版快速排序算法。 未引入随机因子的快速排序的普通Go代码长这样。 func QuickSort(arr []int) {if len(arr) < 1 {return}base, l, r : arr[0], 0, len(arr)-1for i : 1; i < r; {if arr…

Vue H5项目,怎么引入uni.webview sdk,调用uni postMessage实现手机蓝牙连接打印功能(uniapp)

前言 目前公司Vue H5项目&#xff0c;用webview打包成APP&#xff0c;现产品提出这样打包出来的app运行较慢&#xff0c;需要用uniapp方式&#xff08;即使用HBuilder编辑器来打包H5&#xff09;来打包&#xff0c;那需要的基座就不是安卓的基座而是uniapp的基座&#xff0c;而…

扭矩法、屈服点法哪个比较高效?——SunTorque智能扭矩系统

在机械制造和维修领域&#xff0c;拧紧螺栓和螺母是一项重要的操作。拧紧方法的合理选择和使用&#xff0c;对于确保机械设备的稳定性和安全性具有至关重要的作用。本文SunTorque智能扭矩系统将介绍两种最常用的拧紧方法&#xff0c;并探讨它们的轴力范围计算方法。一、扭矩法 …

电信网关配置管理系统后台 upload.php 文件上传漏洞复现

0x01 产品简介 中国电信集团有限公司(英文名称“China Telecom”、简称“中国电信”)成立于2000年9月,是中国特大型国有通信企业、上海世博会全球合作伙伴。 0x02 漏洞概述 电信网关配置管理系统后台 /manager/teletext/material/upload.php 接口存在文件上传漏洞,攻击者…

【数电笔记】56-消抖开关

目录 说明&#xff1a; 1. 按键抖动形成的原因 2. 按键消抖的方法 3. 用与非RS触发器构成消抖开关&#xff08;硬件消抖&#xff09; 说明&#xff1a; 笔记配套视频来源&#xff1a;B站本系列笔记并未记录所有章节&#xff0c;只对个人认为重要章节做了笔记&#xff1b;标…

【稳定检索|投稿优惠】2024年艺术鉴赏与社会科学教育国际会议(ICAASSE 2024)

2024年艺术鉴赏与社会科学教育国际会议(ICAASSE 2024) 2024 International Conference on Art Appreciation and Social Science Education(ICAASSE) 一、【会议简介】 2024年艺术鉴赏与社会科学教育国际会议(ICAASSE 2024)&#xff0c;这场学术盛宴&#xff0c;将于2024年2月1…

郝斌C语言自学教程笔记

赫斌C语言——笔记目录 c语言编程预备知识流程控制函数变量指针结构体位运算符 前段时间康哥看我C语言基础不牢,推荐我学习郝斌老师的C语言课程&#xff0c;花2周看完之后发现确实是目前所看的C语言课程中最好的&#xff0c;不仅非常适合入门&#xff0c;而且对即使学了几年C语…

怒斥以色列后突发心脏病倒地,土耳其议员抢救无效身亡!

这两天互联网上热传一段视频&#xff0c;说的就是土耳其议员在议会演讲时突然倒地晕厥&#xff0c;两天后就去世了。这可真是让人震惊啊&#xff01; 据说这位议员是土耳其反对党幸福党的&#xff0c;名字叫比特梅兹。他在议会发表批评以色列的言论时&#xff0c;情绪过于激动…

安装2023最新版Java SE 21.0.1来开发Java应用程序

安装2023最新版Java SE 21.0.1来开发Java应用程序 Install the latest version of Java SE 21.01 to Develop Java Applications By JacksonML 本文简要介绍如何下载和安装2023年最新版Java Development Kit (简称JDK&#xff0c;即Java开发工具包标准版&#xff09;21.0.1&…

长尾问题之LDAM

做法&代码&公式 step1: 全连接层的权重W和特征向量X都归一化,相乘 W * X P (得到各个类别的概率) # 定义权重&#xff0c;初始化 weight nn.Parameter(torch.FloatTensor(num_classes, num_features)) weight.data.uniform_(-1, 1).renorm_(2, 1, 1e-5).mul_(1e5)#…

Java 线程的基本概念

创建和运行线程 方法一&#xff0c;直接使用 Thread // 创建线程对象 Thread t new Thread() {public void run() {// 要执行的任务}};// 启动线程 t.start();例如&#xff1a; // 构造方法的参数是给线程指定名字&#xff0c;推荐 Thread t1 new Thread("t1") …

网络安全——SQL注入实验

一、实验目的要求&#xff1a; 二、实验设备与环境&#xff1a; 三、实验原理&#xff1a; 四、实验步骤&#xff1a; 五、实验现象、结果记录及整理&#xff1a; 六、分析讨论与思考题解答&#xff1a; 七、实验截图&#xff1a; 一、实验目的要求&#xff1a; 1、…

《Cadence 16.6电路设计与仿真从入门到精通》——1.4 Cadence SPB 16.6的启动

《Cadence 16.6电路设计与仿真从入门到精通》——1.4 Cadence SPB 16.6的启动  2017-05-027334 版权 简介: 本节书摘来自异步社区《Cadence 16.6电路设计与仿真从入门到精通》一书中的第1章,第1.4节,作者: 王超 , 胡仁喜等 更多章节内容可以访问云栖社区“异步社区”公…

《PCL多线程加速处理》-滤波-统计滤波

《PCL多线程加速处理》-滤波-统计滤波 一、效果展示二、实现方式三、代码一、效果展示 提升速度随着点云越多效果越明显 二、实现方式 1、原始的统计滤波实现方式 #include <pcl/filters/statistical_outlier_removal.h>pcl::PointCloud<pcl::PointXYZ

使用 Python 使用贝叶斯神经网络从理论到实践

一、说明 在本文中&#xff0c;我们了解了如何构建一个机器学习模型&#xff0c;该模型结合了神经网络的强大功能&#xff0c;并且仍然保持概率方法进行预测。为了做到这一点&#xff0c;我们可以构建所谓的贝叶斯神经网络。 这个想法不是优化神经网络的损失&#xff0…

MySQL如何进行Sql优化

&#xff08;1&#xff09;客户端发送一条查询语句到服务器&#xff1b; &#xff08;2&#xff09;服务器先查询缓存&#xff0c;如果命中缓存&#xff0c;则立即返回存储在缓存中的数据&#xff1b; &#xff08;3&#xff09;未命中缓存后&#xff0c;MySQL通过关键字将SQ…

基于CNN+数据增强+残差网络Resnet50的少样本高准确度猫咪种类识别—深度学习算法应用(含全部工程源码)+数据集+模型(三)

系列文章目录 基于CNN数据增强残差网络Resnet50的少样本高准确度猫咪种类识别—深度学习算法应用(含全部工程源码)数据集模型&#xff08;一&#xff09; 基于CNN数据增强残差网络Resnet50的少样本高准确度猫咪种类识别—深度学习算法应用(含全部工程源码)数据集模型&#xf…

cytoscapejs获取被点击节点位置,并在该节点附近进行双击展示弹窗

获取节点位置 event.target.renderedPosition()其中event是cytoscapejs监听事件中自带的参数 实现 HTML <div id"cy" style"width: 100%; height: 550px; position: relative"><div id"pop-window">进入详情页</div></d…

day33-37-SpringBootV12(整合Spring,SpringMVC,Mybatis,日志,api测试等框架)

ssm spring --> applicationContext.xml配置文件 springmvc --> springmvc.xml配置文件 mybatis —> mybatis-config.xml配置文件 —> springboot优化了之前的框架配置,思想是约定大于配置 一、引言 1.1 初始化配置 为了使用SSM框架去开发&#xff0c;准备SSM…

UDP报文格式详解

✏️✏️✏️各位看官好&#xff0c;今天给大家分享的是 传输层的另外一个重点协议——UDP。 清风的CSDN博客 &#x1f6e9;️&#x1f6e9;️&#x1f6e9;️希望我的文章能对你有所帮助&#xff0c;有不足的地方还请各位看官多多指教&#xff0c;大家一起学习交流&#xff0…