Netty自定义消息协议的实现逻辑处理粘包拆包、心跳机制

Netty

  • 自定义消息协议的实现逻辑
    • 自定义编码器
  • 心跳机制
    • 实现客户端发送心跳包

自定义消息协议的实现逻辑

在这里插入图片描述
消息协议:这一次消息需要包含两个部分,即消息长度和消息内容本身。
自定义消息编码器︰消息编码器将客户端发送的消息转换成遵守消息协议的消息,即包含消息长度和消息内容的消息
自定义消息解码器∶消息解码器根据消息协议的消息长度,来获得指定长度的消息内容。

自定义编码器

自定义消息协议:

//自定义消息协议
public class MessageProtocal {//消息的长度private int length;//消息的内容private byte[] content;public int getLength() {return length;}public void setLength(int length) {this.length = length;}public byte[] getContent() {return content;}public void setContent(byte[] content) {this.content = content;}
}

客户端基本代码

public class NettyClient {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup(1);Bootstrap bootstrap = new Bootstrap();//设置相关的参数bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//添加处理器,分包编码器pipeline.addLast(new MessageEncoder());//添加具体的业务处理器pipeline.addLast(new NettyMessageClientHandler());}});System.out.println("客户端启动了");ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();channelFuture.channel().closeFuture().sync();group.shutdownGracefully();}
}

客户端业务代码

public class NettyMessageClientHandler extends SimpleChannelInboundHandler<MessageProtocal> {//连接通道创建后要向服务端发送消息@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {for(int i=0;i<200;i++){String msg = "西安科技大学";//创建消息协议对象MessageProtocal messageProtocal = new MessageProtocal();messageProtocal.setLength(msg.getBytes(StandardCharsets.UTF_8).length);messageProtocal.setContent(msg.getBytes(StandardCharsets.UTF_8));//发送协议对象,注意此时ctx只能发送Bytebuf数据,因此需要用编码器把它编码成Bytebuf数据ctx.writeAndFlush(messageProtocal);}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception {}
}

自定义编码器

public class MessageEncoder extends MessageToByteEncoder<MessageProtocal> {@Overrideprotected void encode(ChannelHandlerContext ctx, MessageProtocal msg, ByteBuf out) throws Exception {out.writeInt(msg.getLength());out.writeBytes(msg.getContent());}
}

服务端基本代码

public class NettyServer {public static void main(String[] args) throws Exception {EventLoopGroup boosGroup = new NioEventLoopGroup(1);EventLoopGroup workGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(boosGroup,workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//添加解码器pipeline.addLast(new MessageDecoder());pipeline.addLast(new NettyMessageServerHandler());}});System.out.println("Netty的服务端启动了");ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();channelFuture.channel().closeFuture().sync();boosGroup.shutdownGracefully();workGroup.shutdownGracefully();}
}

自定义解码器

//自定义解码器代码
public class MessageDecoder extends ByteToMessageDecoder {int length = 0;//ctx//in:客户端发送来的MessageProtocol编码后的ByteBuf数据//out:out里的数据会被放行到下一个handler把解码出来的MessageProtocol放到out里面@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println("ByteBuf:"+in);//获得前面的4个字节的数据 == 描述实际内容的长度if(in.readableBytes()>=4){//ByteBuf里面可能有MessageProtocol数据if(length==0){length = in.readInt();}//length = 15if(in.readableBytes()<length){//说明数据还没到齐,等待下一次调用decodeSystem.out.println("当前数据量不够,继续等待");return;}//可读数据量>=length ==> 意味着这一次的MessageProtocol的内容已经到齐了//创建了一个指定length长度的字节数组byte[] content = new byte[length];//把ByteBuf里面的指定长度的数据读到content数组中in.readBytes(content);//创建协议MessageProtocol对象赋值MessageProtocal messageProtocal = new MessageProtocal();messageProtocal.setLength(length);messageProtocal.setContent(content);out.add(messageProtocal);length=0;}}
}

服务端业务处理代码

public class NettyMessageServerHandler extends SimpleChannelInboundHandler<MessageProtocal> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception {System.out.println("---服务器收到的数据---");System.out.println("消息的长度:"+msg.getLength());System.out.println("消息的内容:"+new String(msg.getContent(), StandardCharsets.UTF_8));}
}

运行结果:
在这里插入图片描述


心跳机制

在分布式系统中,心跳机制常常在注册中心组件中提及,比如Zookeeper、Eureka、Nacos等,通过维护客户端的心跳,来判断客户端是否正常在线。如果客户端达到超时次数等预设的条件时,服务端将释放客户端的连接资源。
试想一下,当我们一个用来写数据的通道,它虽然没有下线,但这个通道长时间都不写数据了,是不是我们可以利用心跳机制,关闭此类通道及其对应的客户端

在这里插入图片描述

实现客户端发送心跳包

客户端基本代码

public class NettyClient {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup(1);Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioServerSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//添加编解码器pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyClientHandler());}});System.out.println("客户端启动了");ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",9090).sync();//模拟向服务端发送心跳数据String packet = "heartbeat packet";Random random = new Random();Channel channel = channelFuture.channel();while (channel.isActive()){//随机的事件来实现时间间隔等待int num = random.nextInt(10);Thread.sleep(num*1000);channel.writeAndFlush(packet);}group.shutdownGracefully();}
}

客户端拦截器

public class NettyClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {System.out.println("客户端收到的数据"+s);}
}

在这里插入图片描述
IdleStateHandler类描述三种空闲状态
读空闲:在指定时间间隔内没有从Channel中读到数据,将会创建状态为READER_IDLE的IdleStateEvent对象。
写空闲︰在指定时间间隔内没有数据写入到Channel中,将会创建状态为WRITER_IDLE的ldleStateEvent对象。
读写空闲:在指定时间间隔内Channel中没有发生读写操作,将会创建状态为ALL_IDLE的ldleStateEvent对象。

服务端基本代码

public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringEncoder());pipeline.addLast(new StringDecoder());//超时状态处理器会在服务端发现有超过3秒没有没有发生读操作的话会触发超时事件//创建出IdleStateEvent对象,将该对象交给下一个Handlerpipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS));//HeartbeatServerHandler必领重写userEventTriggered方法,用来做具体的超时的业务处理pipeline.addLast(new HeartbeatServerHandler());}});System.out.println("Netty服务端启动了");ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();channelFuture.channel().closeFuture().sync();bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}
}

服务端业务代码

public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {int readIdleTimes = 0;@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {System.out.println("服务端收到的心跳"+s);channelHandlerContext.writeAndFlush("服务端已经收到了心跳");}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent)evt;switch (event.state()){case READER_IDLE:readIdleTimes++;break;case WRITER_IDLE:System.out.println("写超时");break;case ALL_IDLE:System.out.println("读写超时");break;}if(readIdleTimes>3){System.out.println("读超时超过三次,关闭连接");ctx.writeAndFlush("超时关闭");ctx.channel().close();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/19041.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

pandas调整文件列顺序

使用loc索引器&#xff0c;可以传入一个列序列表给loc索引器来重新排列列顺序。例如&#xff1a;dfdf[[col3,col2,col1]]&#xff0c;这将col3列置于第一列&#xff0c;col2列置于第二列&#xff0c;col1列置于第三列。使用loc整数位置选择器&#xff0c;dfdf.iloc[:,[2,1,0]]使…

spring boot 拦截器例子

在Spring Boot中&#xff0c;拦截器是通过实现HandlerInterceptor接口来实现的。它允许你在请求到达控制器方法之前和之后执行自定义的逻辑。下面我将为你提供一个简单的Spring Boot拦截器的例子。 假设我们有一个简单的控制器类UserController&#xff0c;其中有两个请求处理…

Linux第一个小程序-进度条(缓冲区概念)

1.\r和\n C语言中有很多字符 a.可显字符 b.控制字符 对于回车其实有两个动作&#xff0c;首先换行&#xff0c;在将光标指向最左侧 \r &#xff1a;回车 \n&#xff1a;换行 下面举个例子&#xff1a; 把\n去掉会怎样 什么都没输出。为什么&#xff1f; 2.缓冲区概念 观察下两个…

微信小程序性能优化

一、提高小程序速度 优化小程序的速度是最基本的需求之一&#xff0c;因为流畅的使用体验对于用户来说非常重要。可以采取以下措施来提高小程序的速度&#xff1a; 压缩代码 编写高效的代码是提高小程序速度的关键之一&#xff0c;开发者可以使用一些工具来对代码进行压缩&…

网工内推 | 网络安全工程师,最高15K,有高温补贴

01 超圣信华 招聘岗位&#xff1a;网络安全工程师 职责描述&#xff1a; 1. 负责网络安全产品的售前沟通交流、现状调研、方案设计、产品测试、产品选型和招投标等工作。 2. 负责网络安全集成项目的实施管理、项目交付文档编制以及项目验收等工作。 3. 负责网络安全产品的售后…

Vue中对对象内容调用的Demo

目录 1.对象作为数据&#xff1a; 2.对象数组 在Vue中&#xff0c;你可以通过对象的键来调用对象中的各个部分的内容。下面是一些使用Vue调用对象各部分内容的示例&#xff1a; 1.对象作为数据&#xff1a; 如果你在Vue实例的数据中有一个对象&#xff0c;你可以使用点语法来…

C#中i++和++i的底层原理

一&#xff1a;前言 我们都知道&#xff0c;i是先取值&#xff0c;后计算。i是先计算&#xff0c;后取值。下面说下它的底层原理 运算符优先级与运算顺序&#xff1a; 运算符的优先级只是影响了表达式中的结合顺序&#xff0c;不会影响运算顺序&#xff0c;运算顺序永远都是从…

在云服务器上,clone github时报Connection timed outexit code: 128

文章目录 问题解决方案 问题 在执行pip install安装依赖时&#xff0c;需要clone github代码&#xff0c;此时报了Connection timed out&exit code: 128错误&#xff0c;原因是访问超时了&#xff0c;此时需要使用代理 fatal: unable to access https://github.com/hugg…

【MATLAB第62期】基于MATLAB的PSO-NN、BBO-NN、前馈神经网络NN回归预测对比

【MATLAB第62期】基于MATLAB的PSO-NN、BBO-NN、前馈神经网络NN回归预测对比 一、数据设置 1、7输入1输出 2、103行样本 3、80个训练样本&#xff0c;23个测试样本 二、效果展示 NN训练集数据的R2为&#xff1a;0.73013 NN测试集数据的R2为&#xff1a;0.23848 NN训练集数据的…

Python tqdm的两种用法【教程】

Python tqdm的两种用法 本文记录一下在学习深度强化学习过程中遇到tqdm库显示进度条的用法&#xff0c;以供大家交流。 注意本文使用的tqdm均是使用的tqdm库中的同名tqdm方法&#xff0c;应该按照如下方式导入 from tqdm import tqdmCatologue Python tqdm的两种用法1. 基于可…

在Spring Boot框架中使用拦截器实现URL限制

限制URL列表的JSON格式可以根据您的需求进行定义。以下是一个示例&#xff1a; { "restrictions": [ { "url": "/api/endpoint1", "params": { "param1": "value1", "param2": "value2" } },…

RUST 有哪些整型?

在Rust中&#xff0c;有以下几种整型数据类型&#xff1a; i8 &#xff1a;有符号8位整型&#xff0c;取值范围为-128到127。u8 &#xff1a;无符号8位整型&#xff0c;取值范围为0到255。i16 &#xff1a;有符号16位整型&#xff0c;取值范围为-32768到32767。u16 &#xff1…

CUDA并行编程

并行编程 参考 1. pthread 求素数 // PrimesThreads.c // PrimesThreads.c// threads-based program to find the number of primes between 2 and n; // uses the Sieve of Eratosthenes, deleting all multiples of 2, all // multiples of 3, all multiples of 5, etc./…

淘宝开放平台API接口用法介绍

淘宝是中国最大的电子商务平台之一&#xff0c;其开放平台API接口为开发者提供了强大的数据支持。在本篇文章中&#xff0c;我们将从多个方面对淘宝开放平台API接口进行详细阐述。 一、API概述 淘宝开放平台提供了丰富的API接口&#xff0c;涵盖了商品、店铺、交易、物流、用…

使用Update修改不报错但是修改不成功

使用Update修改不报错但是修改不成功 sql执行后 后台日志说影响行数为0。然后发现是id不存在。 因为项目中使用自动生成ID&#xff0c;使用的是雪花算法&#xff0c;长度超过了前端js可处理长度&#xff0c;所以当后台id传到前台后会丢失精度&#xff0c;导致无法匹配到要修改…

解决Mysql报错2003 (HY000): Can‘t connect to MySQL server on ‘localhost‘ (10061)

1.找到mysql文件夹&#xff0c;将my,ini文件放入bin文件夹 2.管理员模式打开cmd 3.输入netstat -ano查看端口占用情况&#xff0c;这里我已经开启mysql应用&#xff0c;所以会有3306&#xff0c;如果没有开启是不会有的 4.输入sc delete mysql&#xff0c;删除mysql服务 5.将…

接口自动化报告,生成本地服务并自动打开时失败

错误原因&#xff1a; 端口号被占用 首先可以在cmd中调出命令窗口然后执行命令netstat -ano就可以查看所有活动的链接&#xff0c;找到被占用的端口号 1、通过命令taskkill /f /t /im "进程名称" &#xff0c;根据进程的名称杀掉所有的进程。或者taskkill /f /t /p…

嵌入式开发的学习内容和技能包括:

. 熟悉C语言编程 掌握基础电子知识&#xff0c;如数字电路、模拟电路和单片机 .熟练掌握嵌入式操作系统的原理、内核架构和应用&#xff0c;如Linux、RTOS等 了解各种外设接口及其驱动程序开发&#xff0c;如SPI、I2C、USART等 熟悉常用的嵌入式开发工具和软件工程流程&#…

如何通过 5 步激活策略扩大用户群

假设&#xff0c;你现在是一个“深藏功与名”的增长黑客。前期你表现非常好&#xff0c;做了一些拉新实验&#xff0c;每天都有上千用户进入到产品。团队成员和家人朋友都非常开心你们的产品增长终于有了起色。 然而&#xff0c;如果你不重视拉新&#xff08;acquisition&…

ES6系列之let、const、箭头函数使用的坑

变量提升块级作用域的重要性箭头函数this的指向rest参数和arguments 1.ECMAScript与Js的关系 2.Babel转码器 Babel是一个广泛使用的ES6转码器&#xff0c;可以将ES6代码转为ES5代码&#xff0c;从而在老版本的浏览器执行。这意味着&#xff0c;你可以用ES6的方式编写程序&…