SpringBoot集成netty实现websocket通信

实现推送消息给指定的用户

一、依赖


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springboot-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>netty</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.87.Final</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.6.1</version></dependency></dependencies>
</project>

二、属性文件和启动类

server:port: 8088

三、Controller接口

@RestController
@RequestMapping("/push")
public class TestController{@AutowiredPushMsgService pushMsgService
}

四、PushMsgService

public interface PushMsgService{//推送给指定用户void pushMsgToOne(String userId,String msg);//推送给所有用户void pushMsgToAll(String msg);
}
@Service
public class PushMsgServiceImpl implements PushMsgService{@Overridepublic void pushMsgToOne(String userId, String msg){Channel channel = NettyConfig.getChannel(userId);if (Objects.isNull(channel)) {throw new RuntimeException("未连接socket服务器");}channel.writeAndFlush(new TextWebSocketFrame(msg));}@Overridepublic void pushMsgToAll(String msg){NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));}
}

五、NettyConfig

public class NettyConfig{//定义全局channel,管理所有的channelprivate static volatile ChannelGroup channelGroup = null;//存放请求ID与channel的对应关系private static volatile ConcurrentHashMap<String, Channel> channelMap = null;//定义两把锁private static final Object lock1 = new Object();private static final Object lock2 = new Object();public static ChannelGroup getChannelGroup() {if (null == channelGroup) {synchronized (lock1) {if (null == channelGroup) {channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);}}}return channelGroup;}public static ConcurrentHashMap<String, Channel> getChannelMap() {if (null == channelMap) {synchronized (lock2) {if (null == channelMap) {channelMap = new ConcurrentHashMap<>();}}}return channelMap;}public static Channel getChannel(String userId) {if (null == channelMap) {return getChannelMap().get(userId);}return channelMap.get(userId);}
}

六、netty server

@Component
public class NettyServer {static final Logger log = LoggerFactory.getLogger(NettyServer.class);/*** 端口号*/@Value("${webSocket.netty.port:8889}")int port;EventLoopGroup bossGroup;EventLoopGroup workGroup;@AutowiredProjectInitializer nettyInitializer;@PostConstructpublic void start() throws InterruptedException {new Thread(() -> {bossGroup = new NioEventLoopGroup();workGroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();// bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作bootstrap.group(bossGroup, workGroup);// 设置NIO类型的channelbootstrap.channel(NioServerSocketChannel.class);// 设置监听端口bootstrap.localAddress(new InetSocketAddress(port));// 设置管道bootstrap.childHandler(nettyInitializer);// 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功ChannelFuture channelFuture = null;try {channelFuture = bootstrap.bind().sync();log.info("Server started and listen on:{}", channelFuture.channel().localAddress());// 对关闭通道进行监听channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}).start();}/*** 释放资源*/@PreDestroypublic void destroy() throws InterruptedException {if (bossGroup != null) {bossGroup.shutdownGracefully().sync();}if (workGroup != null) {workGroup.shutdownGracefully().sync();}}
}

七、ProjectInitializer初始化,设置websocket handler

@Component
public class ProjectInitializer extends ChannelInitializer<SocketChannel> {/*** webSocket协议名*/static final String WEBSOCKET_PROTOCOL = "WebSocket";/*** webSocket路径*/@Value("${webSocket.netty.path:/webSocket}")String webSocketPath;@AutowiredWebSocketHandler webSocketHandler;@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 设置管道ChannelPipeline pipeline = socketChannel.pipeline();// 流水线管理通道中的处理程序(Handler),用来处理业务// webSocket协议本身是基于http协议的,所以这边也要使用http编解码器pipeline.addLast(new HttpServerCodec());pipeline.addLast(new ObjectEncoder());// 以块的方式来写的处理器pipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new HttpObjectAggregator(8192));pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));// 自定义的handler,处理业务逻辑pipeline.addLast(webSocketHandler);}
}

八、WebSocketHandler

@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {private static final Logger log = LoggerFactory.getLogger(NettyServer.class);/*** 一旦连接,第一个被执行*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());// 添加到channelGroup 通道组NettyConfig.getChannelGroup().add(ctx.channel());}/*** 读取数据*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {log.info("服务器收到消息:{}", msg.text());// 获取用户ID,关联channelJSONObject jsonObject = JSONUtil.parseObj(msg.text());String uid = jsonObject.getStr("uid");NettyConfig.getChannelMap().put(uid, ctx.channel());// 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户IDAttributeKey<String> key = AttributeKey.valueOf("userId");ctx.channel().attr(key).setIfAbsent(uid);// 回复消息ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦"));}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info("用户下线了:{}", ctx.channel().id().asLongText());// 删除通道NettyConfig.getChannelGroup().remove(ctx.channel());removeUserId(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("异常:{}", cause.getMessage());// 删除通道NettyConfig.getChannelGroup().remove(ctx.channel());removeUserId(ctx);ctx.close();}/*** 删除用户与channel的对应关系*/private void removeUserId(ChannelHandlerContext ctx) {AttributeKey<String> key = AttributeKey.valueOf("userId");String userId = ctx.channel().attr(key).get();NettyConfig.getChannelMap().remove(userId);}
}

测试:

postman创建websocket连接 ws://127.0.0.1:8889/webSocket,并发送消息{‘uid’:‘sss’}给服务端
在这里插入图片描述
打开浏览器,给用户sss推送消息 http://127.0.0.1:8088/push/sss
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/741962.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

武汉儿童医院变电所电力运维平台系统的设计及应用

彭姝麟 Acrelpsl 1 引言 2015年国务院发布《中共中央、国务院关于进一步深化电力体制改革的若干意见》&#xff08;中发[2015]9号&#xff09;&#xff0c;简称“电改9号文”。而本次新电改的重点是“三放开一独立三强化”&#xff1a;输配以外的经营性电价放开、售电业务放开…

法规解读 | 坚持总体国家安全观,新修订的《保守国家秘密法》今年5月1日起施行!

2024年2月27日&#xff0c;第十四届全国人大常委会第八次会议表决通过新修订的《中华人民共和国保守国家秘密法》&#xff08;以下简称保密法&#xff09;&#xff0c;自2024年5月1日起施行。 本次保密法修订坚持总体国家安全观&#xff0c;统筹发展与安全。 一方面吸收了一些工…

政务云安全风险分析与解决思路探讨

1.1概述 为了掌握某市政务网站的网络安全整体情况&#xff0c;在相关监管机构授权后&#xff0c;我们组织人员抽取了某市78个政务网站进行安全扫描&#xff0c;通过安全扫描&#xff0c;对该市政务网站的整体安全情况进行预估。 1.2工具扫描结果 本次利用漏洞扫描服务VSS共扫…

app逆向-ratel框架-sekiro框架的安装使用

文章目录 一、前言二、初次尝试三、原⽣APP的使⽤四、ratel框架结合sekiro框架使用 一、前言 sekiro主要支持多节点的程序调用&#xff0c;所以他归属于RPC&#xff08;Remote Procedure Call&#xff09;框架&#xff1a;API管理、鉴权、分布式、负载均衡、跨语言 开源文档&…

如何在群晖NAS部署WPS容器并实现无公网IP远程访问本地office软件

文章目录 1. 拉取WPS Office镜像2. 运行WPS Office镜像容器3. 本地访问WPS Office4. 群晖安装Cpolar5. 配置WPS Office远程地址6. 远程访问WPS Office小结 7. 固定公网地址 wps-office是一个在Linux服务器上部署WPS Office的镜像。它基于WPS Office的Linux版本&#xff0c;通过…

【C语言】指针详解2

&#x1f451;个人主页&#xff1a;啊Q闻 &#x1f387;收录专栏&#xff1a;《C语言》 &#x1f389;道阻且长&#xff0c;行则将至 前言 这篇博客分享的指针部分为与数组有关的指针知识&#xff0c;包括一位数组和二维数组 指针详解1的博客 【C语言】指针…

算法思想总结:双指针算法

一、移动零 . - 力扣&#xff08;LeetCode&#xff09; 移动零 该题重要信息&#xff1a;1、保持非0元素的相对位置。2、原地对数组进行操作 思路&#xff1a;双指针算法 class Solution { public:void moveZeroes(vector<int>& nums){int nnums.size();for(int cur…

【Linux】Shell编程【二】

目录 Shell流程控制条件测试注意事项示例[ condition ]与[[ condition ]]的区别 if条件单分支语法示例1&#xff1a;统计根分区使用率示例2&#xff1a;创建目录 双分支if条件语句语法案例1&#xff1a;备份mysql数据库案例2&#xff1a;判断apache是否启动&#xff0c;如果没有…

网络学习:9个计算机的“网络层”知识点

目录 一、IP 地址 1.1 分类表示法&#xff1a; 1.1.1 分类表示地址的其他说明 1.2 无分类编址 CIDR 二、IP 数据报文格式 Q: IP 报文里有什么&#xff1f;可以不按顺序或者字节来讲一讲 三、 路由概念 3.1 路由表 3.2 路由网络匹配 3.3 ARP 解析 3.4 RARP 逆地址解析…

考试题库:华为HCIA-Datacom易错题⑦(含答案解析)

华为认证HCIA-Datacom易错题举例和答案分析。 1、现有一台交换机通过某端口与一个指定端口相连&#xff0c;但是该端口不转发任何报文&#xff0c;却可以通过接收BPDU来监听网络变化&#xff0c;那么该端口的角色应该是&#xff08; &#xff09;。 A、Designated端口 B、Al…

分布式搜索elasticsearch(1)

1.初识elasticsearch 1.1.了解ES 1.1.1.elasticsearch的作用 elasticsearch是一款非常强大的开源搜索引擎&#xff0c;具备非常多强大功能&#xff0c;可以帮助我们从海量数据中快速找到需要的内容 例如&#xff1a; 在GitHub搜索代码 在电商网站搜索商品 在百度搜索答案…

c++函数SetConsoleTextAttribute

前言 正文 1.作用&#xff1a; 2.函数格式(重点)&#xff1a; 3.参数(重点)&#xff1a; 前言 实用(真的) 正文 1.作用&#xff1a; 更改cmd的背景色与字体颜色 2.函数格式(重点)&#xff1a; SetConsoleTextAttribute(GetStdHandle(STD_OUTPUT_HANDLE),10进制参数); …

第14届环境与农业工程国际会议(ICEAE 2024)即将召开!

2024年第14届环境与农业工程国际会议&#xff08;ICEAE 2024&#xff09;将于6月7日至9日在泰国曼谷召开。本次会议旨在促进环境与农业工程的研究和开发活动&#xff0c;共同探讨领域内最新以及具有根本性的进展突破。热忱欢迎从事相关领域研究的专家&#xff0c;学者和专业技术…

iStoreOS系统内安装HomeAssistant服务

iStoreOS系统内安装HomeAssistant服务 1. HomeAssistant服务 HomeAssistant是一款基于Python的开源智能家居系统&#xff0c;简称HA。 HomeAssistant可以方便地连接各种外部设备&#xff0c;如智能设备、摄像头、邮件、短消息和云服务等&#xff0c;其成熟的可连接组件有近千…

【Twinmotion】Twinmotion导入UE5

步骤 1. 在虚幻商城中安装“Datasmith Twinmotion导入器插件” 安装“面向虚幻引擎的Twinmotion内容” 2. 打开虚幻引擎&#xff0c;在插件中搜索“twinmotion”&#xff0c;勾选如下两个插件&#xff0c;然后重启虚幻引擎 3. 打开Twinmotion&#xff0c;随便添加一个物体 导出…

【阿里云系列】-ACK的Java应用POD无法访问云数据库Redis

问题介绍 如下图所示&#xff0c;是ACK集群的POD访问阿里云的云数据库Redis&#xff0c;如何实现访问呢 配置步骤 要实现ACK集群内的所有POD都可以访问云数据库Redis&#xff0c;则需要在Redsi的白名单里增加源IP或网段&#xff0c;如下图所示 注意&#xff1a; 以上添加…

网络套接字-UDP服务器

一 预备知识 1 端口号和进程id 主机间的数据传输本质是两个进程在通信&#xff0c;就像是我们打开抖音刷视频&#xff0c;视频不是都保存在手机上的&#xff0c;而是服务器发送给你的&#xff0c;这里就是用到了网络。 那如何保证把数据给指定进程呢? 就是用端口号去标识主机中…

Pytorch学习 day13(完整的模型训练步骤)

步骤一&#xff1a;定义神经网络结构 注意&#xff1a;由于一次batch_size的大小为64&#xff0c;表示一次放入64张图片&#xff0c;且Flatten()只会对单张图片的全部通道做拉直操作&#xff0c;也就是不会将batch_size合并&#xff0c;但是一张图片有3个通道&#xff0c;在Ma…

YOLOv9改进项目|关于本周更新计划的说明24/3/12

目前售价售价59.9&#xff0c;改进点30个 专栏地址&#xff1a; 专栏介绍&#xff1a;YOLOv9改进系列 | 包含深度学习最新创新&#xff0c;主力高效涨点&#xff01;&#xff01;&#xff01; 日期&#xff1a;24/3/12 本周更新计划说明&#xff1a; 1. 更新华为Gold YOLO中的…

读西游记第一回:西游记世界格局

天地之数&#xff1a; 元&#xff1a;十二万九千六百岁&#xff08;129600年&#xff09; 1元12会&#xff1a;子、丑、寅、卯、巳、午、未、申、酉、戌、亥。每会18000年。与12地支对应。 亥会期&#xff1a;前5400年混沌期&#xff0c;后5400年&#xff0c;盘古开天辟地&am…