Netty自定义消息协议的实现逻辑处理粘包拆包、心跳机制

Netty

  • 自定义消息协议的实现逻辑
    • 自定义编码器
  • 心跳机制
    • 实现客户端发送心跳包

自定义消息协议的实现逻辑

在这里插入图片描述
消息协议:这一次消息需要包含两个部分,即消息长度和消息内容本身。
自定义消息编码器︰消息编码器将客户端发送的消息转换成遵守消息协议的消息,即包含消息长度和消息内容的消息
自定义消息解码器∶消息解码器根据消息协议的消息长度,来获得指定长度的消息内容。

自定义编码器

自定义消息协议:

//自定义消息协议
public class MessageProtocal {//消息的长度private int length;//消息的内容private byte[] content;public int getLength() {return length;}public void setLength(int length) {this.length = length;}public byte[] getContent() {return content;}public void setContent(byte[] content) {this.content = content;}
}

客户端基本代码

public class NettyClient {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup(1);Bootstrap bootstrap = new Bootstrap();//设置相关的参数bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//添加处理器,分包编码器pipeline.addLast(new MessageEncoder());//添加具体的业务处理器pipeline.addLast(new NettyMessageClientHandler());}});System.out.println("客户端启动了");ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();channelFuture.channel().closeFuture().sync();group.shutdownGracefully();}
}

客户端业务代码

public class NettyMessageClientHandler extends SimpleChannelInboundHandler<MessageProtocal> {//连接通道创建后要向服务端发送消息@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {for(int i=0;i<200;i++){String msg = "西安科技大学";//创建消息协议对象MessageProtocal messageProtocal = new MessageProtocal();messageProtocal.setLength(msg.getBytes(StandardCharsets.UTF_8).length);messageProtocal.setContent(msg.getBytes(StandardCharsets.UTF_8));//发送协议对象,注意此时ctx只能发送Bytebuf数据,因此需要用编码器把它编码成Bytebuf数据ctx.writeAndFlush(messageProtocal);}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception {}
}

自定义编码器

public class MessageEncoder extends MessageToByteEncoder<MessageProtocal> {@Overrideprotected void encode(ChannelHandlerContext ctx, MessageProtocal msg, ByteBuf out) throws Exception {out.writeInt(msg.getLength());out.writeBytes(msg.getContent());}
}

服务端基本代码

public class NettyServer {public static void main(String[] args) throws Exception {EventLoopGroup boosGroup = new NioEventLoopGroup(1);EventLoopGroup workGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(boosGroup,workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//添加解码器pipeline.addLast(new MessageDecoder());pipeline.addLast(new NettyMessageServerHandler());}});System.out.println("Netty的服务端启动了");ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();channelFuture.channel().closeFuture().sync();boosGroup.shutdownGracefully();workGroup.shutdownGracefully();}
}

自定义解码器

//自定义解码器代码
public class MessageDecoder extends ByteToMessageDecoder {int length = 0;//ctx//in:客户端发送来的MessageProtocol编码后的ByteBuf数据//out:out里的数据会被放行到下一个handler把解码出来的MessageProtocol放到out里面@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println("ByteBuf:"+in);//获得前面的4个字节的数据 == 描述实际内容的长度if(in.readableBytes()>=4){//ByteBuf里面可能有MessageProtocol数据if(length==0){length = in.readInt();}//length = 15if(in.readableBytes()<length){//说明数据还没到齐,等待下一次调用decodeSystem.out.println("当前数据量不够,继续等待");return;}//可读数据量>=length ==> 意味着这一次的MessageProtocol的内容已经到齐了//创建了一个指定length长度的字节数组byte[] content = new byte[length];//把ByteBuf里面的指定长度的数据读到content数组中in.readBytes(content);//创建协议MessageProtocol对象赋值MessageProtocal messageProtocal = new MessageProtocal();messageProtocal.setLength(length);messageProtocal.setContent(content);out.add(messageProtocal);length=0;}}
}

服务端业务处理代码

public class NettyMessageServerHandler extends SimpleChannelInboundHandler<MessageProtocal> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception {System.out.println("---服务器收到的数据---");System.out.println("消息的长度:"+msg.getLength());System.out.println("消息的内容:"+new String(msg.getContent(), StandardCharsets.UTF_8));}
}

运行结果:
在这里插入图片描述


心跳机制

在分布式系统中,心跳机制常常在注册中心组件中提及,比如Zookeeper、Eureka、Nacos等,通过维护客户端的心跳,来判断客户端是否正常在线。如果客户端达到超时次数等预设的条件时,服务端将释放客户端的连接资源。
试想一下,当我们一个用来写数据的通道,它虽然没有下线,但这个通道长时间都不写数据了,是不是我们可以利用心跳机制,关闭此类通道及其对应的客户端

在这里插入图片描述

实现客户端发送心跳包

客户端基本代码

public class NettyClient {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup(1);Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioServerSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//添加编解码器pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyClientHandler());}});System.out.println("客户端启动了");ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",9090).sync();//模拟向服务端发送心跳数据String packet = "heartbeat packet";Random random = new Random();Channel channel = channelFuture.channel();while (channel.isActive()){//随机的事件来实现时间间隔等待int num = random.nextInt(10);Thread.sleep(num*1000);channel.writeAndFlush(packet);}group.shutdownGracefully();}
}

客户端拦截器

public class NettyClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {System.out.println("客户端收到的数据"+s);}
}

在这里插入图片描述
IdleStateHandler类描述三种空闲状态
读空闲:在指定时间间隔内没有从Channel中读到数据,将会创建状态为READER_IDLE的IdleStateEvent对象。
写空闲︰在指定时间间隔内没有数据写入到Channel中,将会创建状态为WRITER_IDLE的ldleStateEvent对象。
读写空闲:在指定时间间隔内Channel中没有发生读写操作,将会创建状态为ALL_IDLE的ldleStateEvent对象。

服务端基本代码

public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringEncoder());pipeline.addLast(new StringDecoder());//超时状态处理器会在服务端发现有超过3秒没有没有发生读操作的话会触发超时事件//创建出IdleStateEvent对象,将该对象交给下一个Handlerpipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS));//HeartbeatServerHandler必领重写userEventTriggered方法,用来做具体的超时的业务处理pipeline.addLast(new HeartbeatServerHandler());}});System.out.println("Netty服务端启动了");ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();channelFuture.channel().closeFuture().sync();bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}
}

服务端业务代码

public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {int readIdleTimes = 0;@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {System.out.println("服务端收到的心跳"+s);channelHandlerContext.writeAndFlush("服务端已经收到了心跳");}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent)evt;switch (event.state()){case READER_IDLE:readIdleTimes++;break;case WRITER_IDLE:System.out.println("写超时");break;case ALL_IDLE:System.out.println("读写超时");break;}if(readIdleTimes>3){System.out.println("读超时超过三次,关闭连接");ctx.writeAndFlush("超时关闭");ctx.channel().close();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/19041.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux第一个小程序-进度条(缓冲区概念)

1.\r和\n C语言中有很多字符 a.可显字符 b.控制字符 对于回车其实有两个动作&#xff0c;首先换行&#xff0c;在将光标指向最左侧 \r &#xff1a;回车 \n&#xff1a;换行 下面举个例子&#xff1a; 把\n去掉会怎样 什么都没输出。为什么&#xff1f; 2.缓冲区概念 观察下两个…

网工内推 | 网络安全工程师,最高15K,有高温补贴

01 超圣信华 招聘岗位&#xff1a;网络安全工程师 职责描述&#xff1a; 1. 负责网络安全产品的售前沟通交流、现状调研、方案设计、产品测试、产品选型和招投标等工作。 2. 负责网络安全集成项目的实施管理、项目交付文档编制以及项目验收等工作。 3. 负责网络安全产品的售后…

在云服务器上,clone github时报Connection timed outexit code: 128

文章目录 问题解决方案 问题 在执行pip install安装依赖时&#xff0c;需要clone github代码&#xff0c;此时报了Connection timed out&exit code: 128错误&#xff0c;原因是访问超时了&#xff0c;此时需要使用代理 fatal: unable to access https://github.com/hugg…

【MATLAB第62期】基于MATLAB的PSO-NN、BBO-NN、前馈神经网络NN回归预测对比

【MATLAB第62期】基于MATLAB的PSO-NN、BBO-NN、前馈神经网络NN回归预测对比 一、数据设置 1、7输入1输出 2、103行样本 3、80个训练样本&#xff0c;23个测试样本 二、效果展示 NN训练集数据的R2为&#xff1a;0.73013 NN测试集数据的R2为&#xff1a;0.23848 NN训练集数据的…

解决Mysql报错2003 (HY000): Can‘t connect to MySQL server on ‘localhost‘ (10061)

1.找到mysql文件夹&#xff0c;将my,ini文件放入bin文件夹 2.管理员模式打开cmd 3.输入netstat -ano查看端口占用情况&#xff0c;这里我已经开启mysql应用&#xff0c;所以会有3306&#xff0c;如果没有开启是不会有的 4.输入sc delete mysql&#xff0c;删除mysql服务 5.将…

接口自动化报告,生成本地服务并自动打开时失败

错误原因&#xff1a; 端口号被占用 首先可以在cmd中调出命令窗口然后执行命令netstat -ano就可以查看所有活动的链接&#xff0c;找到被占用的端口号 1、通过命令taskkill /f /t /im "进程名称" &#xff0c;根据进程的名称杀掉所有的进程。或者taskkill /f /t /p…

嵌入式开发的学习内容和技能包括:

. 熟悉C语言编程 掌握基础电子知识&#xff0c;如数字电路、模拟电路和单片机 .熟练掌握嵌入式操作系统的原理、内核架构和应用&#xff0c;如Linux、RTOS等 了解各种外设接口及其驱动程序开发&#xff0c;如SPI、I2C、USART等 熟悉常用的嵌入式开发工具和软件工程流程&#…

如何通过 5 步激活策略扩大用户群

假设&#xff0c;你现在是一个“深藏功与名”的增长黑客。前期你表现非常好&#xff0c;做了一些拉新实验&#xff0c;每天都有上千用户进入到产品。团队成员和家人朋友都非常开心你们的产品增长终于有了起色。 然而&#xff0c;如果你不重视拉新&#xff08;acquisition&…

ES6系列之let、const、箭头函数使用的坑

变量提升块级作用域的重要性箭头函数this的指向rest参数和arguments 1.ECMAScript与Js的关系 2.Babel转码器 Babel是一个广泛使用的ES6转码器&#xff0c;可以将ES6代码转为ES5代码&#xff0c;从而在老版本的浏览器执行。这意味着&#xff0c;你可以用ES6的方式编写程序&…

【微服务】springboot整合redis哨兵集群使用详解

目录 一、前言 二、环境准备 三、安装redis 3.1 前置准备 3.1.1 下载安装包 3.1.2 准备依赖环境 3.1.3 上传并解压包 3.2 执行安装 四、搭建redis主从集群 4.1 环境准备 4.2 搭建过程 4.2.1 创建实例文件目录 4.2.2 修改redis.conf配置文件 4.2.3 拷贝配置文件 4…

喜讯! WorkPlus入选中国信通院数字产品“2023全景图”!

“2023数字生态发展大会”暨中国信通院“铸基计划” WorkPlus喜讯 7月27日&#xff0c;中国信息通信研究院&#xff08;下称“中国信通院”&#xff09;主办的“2023数字生态发展大会”暨中国信通院“铸基计划”年中会议在京召开&#xff0c;大会全面地总结了“铸基计划”上半…

Linux系统安装部署MongoDB完整教程(图文详解)

前言&#xff1a;本期给大家分享一下目前最新Linux系统安装部署MongoDB完整教程&#xff0c;我的服务器采用的是Centos7&#xff0c;在部署之前我重装了我的服务器&#xff0c;目的是为了干净整洁的给大家演示我是如何一步步的操作的&#xff0c;整体部署还是挺简洁&#xff0c…

JavaScript 手撕大厂面试题数组扁平化以及增加版本 plus

前言 现在的前端面试手撕题是一个必要环节&#xff0c;有点时候八股回答的不错但是手撕题没写出来就会让面试官印象分大减&#xff0c;很可能就挂了… 概念 数组的扁平化其实就是将一个多层嵌套的数组转换为只有一层的数组 比如&#xff1a; [1, [2, [3, [4, 5]]]] > [1…

机器学习十大经典算法

机器学习算法是计算机科学和人工智能领域的关键组成部分&#xff0c;它们用于从数据中学习模式并作出预测或做出决策。本文将为大家介绍十大经典机器学习算法&#xff0c;其中包括了线性回归、逻辑回归、支持向量机、朴素贝叶斯、决策树等算法&#xff0c;每种算法都在特定的领…

Docker 安装 MySQL

目录 一、查看 MySQL 版本 二、拉取 MySQL 镜像 三、查看本地镜像 四、运行容器 五、停止和启动容器 六、列出正在运行的容器 七、进入容器 八、登录MySQL 一、查看 MySQL 版本 访问 MySQL 镜像库地址&#xff1a;https://hub.docker.com/_/mysql?tabtags 。 可以通…

ODIN_1靶机详解

ODIN_1靶机复盘 下载地址&#xff1a;https: //download.vulnhub.com/odin/odin.ova 靶场很简单&#xff0c;一会儿就打完了。 靶场说明里提醒说加一个dns解析。 我们在/etc/hosts加一条解析 就能正常打开网站了&#xff0c;要么网站打开css是乱的。 这里看到结尾就猜测肯定…

2023年性价比电脑硬件主机推荐|电脑党必备硬件选购攻略

在自主搭建电脑变得流行且显卡价格飙升的这个时代&#xff0c;我想给大家推荐一款特别的产品——NUC&#xff08;Next Unit of Computing&#xff09;。 NUC是Intel所推出的一种「ITX台式机」&#xff0c;截止目前已经迭代了很多型号&#xff0c;比如之前我买过的猛兽峡谷&…

贝锐蒲公英:没有公网IP,多分支企业如何高效远程访问OA系统?

贝锐蒲公英&#xff1a;没有公网IP&#xff0c;多分支企业、移动办公人员如何高效远程访问OA系统&#xff1f; 国内某大型美妆公司&#xff0c;旗下产品覆盖美容护肤品、彩妆、美容仪器、健康食品、SPA美容会所及等多类服务&#xff0c;致力于为客户提供高品质的产品和完善的服…

2.6 伽马校正 一、Gamma校正

一、Gamma校正 颜色空间 通用&#xff1a;sRGB 电影&#xff1a;DCI-P3 电视&#xff1a;Rec-709、PAL等 印刷&#xff1a;CMYK、Adobe RGB 传递函数 我们知道了颜色的颜色值&#xff0c;要在电子设备上显示&#xff0c;就要把它转换为视频信号&#xff0c;传递函数就是用…

SOLIDWORKS中的弹簧设计指南

SOLIDWORKS是一款广泛使用的三维计算机辅助设计软件&#xff0c;可以用于设计各种机械零件和组件&#xff0c;包括弹簧。在SOLIDWORKS中设计弹簧需要注意一些关键点&#xff0c;本文将为您介绍SOLIDWORKS中的弹簧设计指南。 1. 弹簧类型 按受力性质&#xff0c;弹簧类型包括压…