Netty
- 自定义消息协议的实现逻辑
- 自定义编码器
- 心跳机制
- 实现客户端发送心跳包
自定义消息协议的实现逻辑
消息协议:这一次消息需要包含两个部分,即消息长度和消息内容本身。
自定义消息编码器︰消息编码器将客户端发送的消息转换成遵守消息协议的消息,即包含消息长度和消息内容的消息
自定义消息解码器∶消息解码器根据消息协议的消息长度,来获得指定长度的消息内容。
自定义编码器
自定义消息协议:
//自定义消息协议
public class MessageProtocal {//消息的长度private int length;//消息的内容private byte[] content;public int getLength() {return length;}public void setLength(int length) {this.length = length;}public byte[] getContent() {return content;}public void setContent(byte[] content) {this.content = content;}
}
客户端基本代码
public class NettyClient {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup(1);Bootstrap bootstrap = new Bootstrap();//设置相关的参数bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//添加处理器,分包编码器pipeline.addLast(new MessageEncoder());//添加具体的业务处理器pipeline.addLast(new NettyMessageClientHandler());}});System.out.println("客户端启动了");ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();channelFuture.channel().closeFuture().sync();group.shutdownGracefully();}
}
客户端业务代码
public class NettyMessageClientHandler extends SimpleChannelInboundHandler<MessageProtocal> {//连接通道创建后要向服务端发送消息@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {for(int i=0;i<200;i++){String msg = "西安科技大学";//创建消息协议对象MessageProtocal messageProtocal = new MessageProtocal();messageProtocal.setLength(msg.getBytes(StandardCharsets.UTF_8).length);messageProtocal.setContent(msg.getBytes(StandardCharsets.UTF_8));//发送协议对象,注意此时ctx只能发送Bytebuf数据,因此需要用编码器把它编码成Bytebuf数据ctx.writeAndFlush(messageProtocal);}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception {}
}
自定义编码器
public class MessageEncoder extends MessageToByteEncoder<MessageProtocal> {@Overrideprotected void encode(ChannelHandlerContext ctx, MessageProtocal msg, ByteBuf out) throws Exception {out.writeInt(msg.getLength());out.writeBytes(msg.getContent());}
}
服务端基本代码
public class NettyServer {public static void main(String[] args) throws Exception {EventLoopGroup boosGroup = new NioEventLoopGroup(1);EventLoopGroup workGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(boosGroup,workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//添加解码器pipeline.addLast(new MessageDecoder());pipeline.addLast(new NettyMessageServerHandler());}});System.out.println("Netty的服务端启动了");ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();channelFuture.channel().closeFuture().sync();boosGroup.shutdownGracefully();workGroup.shutdownGracefully();}
}
自定义解码器
//自定义解码器代码
public class MessageDecoder extends ByteToMessageDecoder {int length = 0;//ctx//in:客户端发送来的MessageProtocol编码后的ByteBuf数据//out:out里的数据会被放行到下一个handler把解码出来的MessageProtocol放到out里面@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println("ByteBuf:"+in);//获得前面的4个字节的数据 == 描述实际内容的长度if(in.readableBytes()>=4){//ByteBuf里面可能有MessageProtocol数据if(length==0){length = in.readInt();}//length = 15if(in.readableBytes()<length){//说明数据还没到齐,等待下一次调用decodeSystem.out.println("当前数据量不够,继续等待");return;}//可读数据量>=length ==> 意味着这一次的MessageProtocol的内容已经到齐了//创建了一个指定length长度的字节数组byte[] content = new byte[length];//把ByteBuf里面的指定长度的数据读到content数组中in.readBytes(content);//创建协议MessageProtocol对象赋值MessageProtocal messageProtocal = new MessageProtocal();messageProtocal.setLength(length);messageProtocal.setContent(content);out.add(messageProtocal);length=0;}}
}
服务端业务处理代码
public class NettyMessageServerHandler extends SimpleChannelInboundHandler<MessageProtocal> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception {System.out.println("---服务器收到的数据---");System.out.println("消息的长度:"+msg.getLength());System.out.println("消息的内容:"+new String(msg.getContent(), StandardCharsets.UTF_8));}
}
运行结果:
心跳机制
在分布式系统中,心跳机制常常在注册中心组件中提及,比如Zookeeper、Eureka、Nacos等,通过维护客户端的心跳,来判断客户端是否正常在线。如果客户端达到超时次数等预设的条件时,服务端将释放客户端的连接资源。
试想一下,当我们一个用来写数据的通道,它虽然没有下线,但这个通道长时间都不写数据了,是不是我们可以利用心跳机制,关闭此类通道及其对应的客户端
实现客户端发送心跳包
客户端基本代码
public class NettyClient {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup(1);Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioServerSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//添加编解码器pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyClientHandler());}});System.out.println("客户端启动了");ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",9090).sync();//模拟向服务端发送心跳数据String packet = "heartbeat packet";Random random = new Random();Channel channel = channelFuture.channel();while (channel.isActive()){//随机的事件来实现时间间隔等待int num = random.nextInt(10);Thread.sleep(num*1000);channel.writeAndFlush(packet);}group.shutdownGracefully();}
}
客户端拦截器
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {System.out.println("客户端收到的数据"+s);}
}
IdleStateHandler类描述三种空闲状态
读空闲:在指定时间间隔内没有从Channel中读到数据,将会创建状态为READER_IDLE的IdleStateEvent对象。
写空闲︰在指定时间间隔内没有数据写入到Channel中,将会创建状态为WRITER_IDLE的ldleStateEvent对象。
读写空闲:在指定时间间隔内Channel中没有发生读写操作,将会创建状态为ALL_IDLE的ldleStateEvent对象。
服务端基本代码
public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringEncoder());pipeline.addLast(new StringDecoder());//超时状态处理器会在服务端发现有超过3秒没有没有发生读操作的话会触发超时事件//创建出IdleStateEvent对象,将该对象交给下一个Handlerpipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS));//HeartbeatServerHandler必领重写userEventTriggered方法,用来做具体的超时的业务处理pipeline.addLast(new HeartbeatServerHandler());}});System.out.println("Netty服务端启动了");ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();channelFuture.channel().closeFuture().sync();bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}
}
服务端业务代码
public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {int readIdleTimes = 0;@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {System.out.println("服务端收到的心跳"+s);channelHandlerContext.writeAndFlush("服务端已经收到了心跳");}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent)evt;switch (event.state()){case READER_IDLE:readIdleTimes++;break;case WRITER_IDLE:System.out.println("写超时");break;case ALL_IDLE:System.out.println("读写超时");break;}if(readIdleTimes>3){System.out.println("读超时超过三次,关闭连接");ctx.writeAndFlush("超时关闭");ctx.channel().close();}}
}