1. 概述
1.1 粘包
发送 abc def,接收 abcdef
原因
- 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
- Nagle 算法:会造成粘包
1.2 半包
现象,发送 abcdef,接收 abc def
原因
- 应用层:接收方 ByteBuf 小于实际发送数据量
- MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包
- 滑动窗口: 发送方维护一个发送窗口,而接收方维护一个接收窗口。发送窗口的大小取决于接收方通知的窗口大小,而接收窗口的大小取决于系统资源和当前状态。
- Nagle 算法: 发送一个字节,也需要加入 tcp 头和 ip 头,也就是总字节数会使用 41 bytes,非常不经济。因此为了提高网络利用率,tcp 希望尽可能发送足够大的数据,这就是 Nagle 算法产生的缘由; 该算法是指发送端即使还有应该发送的数据,但如果这部分数据很少的话,则进行延迟发送
MSS 是最大段长度(maximum segment size),它是 MTU 刨去 tcp 头和 ip 头后剩余能够作为数据传输的字节数,链路层对一次能够发送的最大数据有限制,这个限制称之为 MTU(maximum transmission unit),不同的链路设备的 MTU 值也有所不同, MSS 的值在三次握手时通知对方自己 MSS 的值,然后在两者之间选择一个小值作为 MSS
1.3 解决方案
- 短链接 发送完报文 就断开 然后重连 在发, 缺点是性能不好
- 每一条消息采用固定长度,缺点是浪费空间
- 每一条消息采用分隔符,例如 \n,缺点是不是分隔符的\n字符需要转义
- 每一条消息发消息长度+消息,根据消息中的长度读取后面的消息
2 代码实例
2.1 粘包/半包实例
2.1.1 服务器
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class FrameDecoderServer {void start() {// 创建两个EventLoopGroup,一个用于接收连接(boss),一个用于处理连接(worker)NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup();try {// 配置服务器引导程序ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class); // 指定使用的通道类型serverBootstrap.group(boss, worker); // 关联EventLoopGroup// 配置子通道处理器serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 添加处理日志的处理器ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));// 添加自定义的处理连接激活和断开的处理器ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("connected {}", ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug("disconnect {}", ctx.channel());super.channelInactive(ctx);}});// 添加字符串解码器ch.pipeline().addLast(new StringDecoder());// 添加处理实际业务逻辑的处理器ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("Client received: " + msg);}});}});// 绑定端口并等待服务器启动ChannelFuture channelFuture = serverBootstrap.bind(8080);log.debug("{} binding...", channelFuture.channel());channelFuture.sync();log.debug("{} bound...", channelFuture.channel());// 等待服务器关闭channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {// 关闭EventLoopGroupboss.shutdownGracefully();worker.shutdownGracefully();log.debug("stopped");}}/*** 程序入口点。* 创建FrameDecoderServer实例并启动服务器。** @param args 命令行参数*/public static void main(String[] args) {new FrameDecoderServer().start();}
}
2.1.2 粘包客户端
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class FrameDecoderClient {public static void main(String[] args) {// 创建一个 NIO 事件循环组,用于处理客户端的 I/O 事件。NioEventLoopGroup worker = new NioEventLoopGroup();try {// 配置客户端启动器,设置通道类型、事件循环组和处理器。Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class); // 指定使用 NIO Socket 通道。bootstrap.group(worker); // 设置事件循环组。bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connected...");// 添加自定义的入站处理程序,负责在通道激活时发送数据。ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {/*** 当通道变得可写时,发送数据到服务器。* @param ctx 通道上下文,用于分配缓冲区和写入数据。* @throws Exception 如果操作过程中发生异常。*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");// 循环10次,发送相同的数据。for (int i = 0; i < 10; i++) {// 分配一个缓冲区,并写入固定长度的数据。ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes("11111111".getBytes());// 写入缓冲区并刷新通道,确保数据被发送。ctx.writeAndFlush(buffer);}}});}});// 连接到服务器并等待连接完成。ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();// 等待通道关闭。channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {// 记录中断异常。log.error("client error", e);} finally {// 关闭事件循环组,释放资源。worker.shutdownGracefully();}}
}
运行截图:
2.1.2 半包客户端
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class FrameDecoderClient {public static void main(String[] args) {// 创建一个 NIO 事件循环组,用于处理客户端的 I/O 事件。NioEventLoopGroup worker = new NioEventLoopGroup();try {// 配置客户端启动器,设置通道类型、事件循环组和处理器。Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class); // 指定使用 NIO Socket 通道。bootstrap.group(worker); // 设置事件循环组。bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connected...");// 添加自定义的入站处理程序,负责在通道激活时发送数据。ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {/*** 当通道变得可写时,发送数据到服务器。* @param ctx 通道上下文,用于分配缓冲区和写入数据。* @throws Exception 如果操作过程中发生异常。*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");// 循环10次,发送相同的数据。for (int i = 0; i < 1; i++) {// 分配一个缓冲区,并写入固定长度的数据。ByteBuf buffer = ctx.alloc().buffer();StringBuilder str = new StringBuilder();for (int j = 0; j < 10000; j++) {str.append("1");}buffer.writeBytes(str.toString().getBytes());// 写入缓冲区并刷新通道,确保数据被发送。ctx.writeAndFlush(buffer);}}});}});// 连接到服务器并等待连接完成。ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();// 等待通道关闭。channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {// 记录中断异常。log.error("client error", e);} finally {// 关闭事件循环组,释放资源。worker.shutdownGracefully();}}
}
3. 解决方案
3.1 固定长度
每一条消息采用固定长度,缺点是浪费空间
3.1.1 服务端代码
ch.pipeline().addLast(new FixedLengthFrameDecoder(8));
固定长度8字节
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class FixedLengthFrameDecoderServer {void start() {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("connected {}", ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug("disconnect {}", ctx.channel());super.channelInactive(ctx);}});ch.pipeline().addLast(new FixedLengthFrameDecoder(8));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("Client received: " + msg);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080);log.debug("{} binding...", channelFuture.channel());channelFuture.sync();log.debug("{} bound...", channelFuture.channel());channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();log.debug("stoped");}}public static void main(String[] args) {new FixedLengthFrameDecoderServer().start();}}
3.1.2 客户端
客户端代码同2.1.2
3.1.3
运行截图:
3.1.3 FixedLengthFrameDecoder 源码
FixedLengthFrameDecoder decode 解码时,每次读取固定长度
protected Object decode(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {if (in.readableBytes() < frameLength) {return null;} else {return in.readRetainedSlice(frameLength);}}
看看怎么调用的
io.netty.handler.codec.ByteToMessageDecoder#channelRead
callDecode(ctx, cumulation, out);
io.netty.handler.codec.ByteToMessageDecoder#callDecode// 读取所有数据
while (in.isReadable())// 可读数据小于定长 break
if (outSize == out.size()) {if (oldInputLength == in.readableBytes()) {break;}
io.netty.handler.codec.ByteToMessageDecoder#decodeRemovalReentryProtection
调用
io.netty.handler.codec.FixedLengthFrameDecoder#decode`
3.2 分隔符
每一条消息采用分隔符,例如 \n,缺点是不是分隔符的\n字符需要转义
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
3.2.1 服务端代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class LineBasedFrameDecoderServer {void start() {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("connected {}", ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug("disconnect {}", ctx.channel());super.channelInactive(ctx);}});ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("Client received: " + msg);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080);log.debug("{} binding...", channelFuture.channel());channelFuture.sync();log.debug("{} bound...", channelFuture.channel());channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();log.debug("stoped");}}public static void main(String[] args) {new LineBasedFrameDecoderServer().start();}}
3.2.2 客户端代码
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class FrameDecoderClient {public static void main(String[] args) {// 创建一个 NIO 事件循环组,用于处理客户端的 I/O 事件。NioEventLoopGroup worker = new NioEventLoopGroup();try {// 配置客户端启动器,设置通道类型、事件循环组和处理器。Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class); // 指定使用 NIO Socket 通道。bootstrap.group(worker); // 设置事件循环组。bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connected...");// 添加自定义的入站处理程序,负责在通道激活时发送数据。ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {/*** 当通道变得可写时,发送数据到服务器。* @param ctx 通道上下文,用于分配缓冲区和写入数据。* @throws Exception 如果操作过程中发生异常。*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");// 循环10次,发送相同的数据。for (int i = 0; i < 10; i++) {// 分配一个缓冲区,并写入固定长度的数据。ByteBuf buffer = ctx.alloc().buffer();StringBuilder str = new StringBuilder();for (int j = 0; j < 9; j++) {str.append("1");}str.append("\n");buffer.writeBytes(str.toString().getBytes());// 写入缓冲区并刷新通道,确保数据被发送。ctx.writeAndFlush(buffer);}}});}});// 连接到服务器并等待连接完成。ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();// 等待通道关闭。channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {// 记录中断异常。log.error("client error", e);} finally {// 关闭事件循环组,释放资源。worker.shutdownGracefully();}}
}
3.2.3 运行截图
3.2.4 LineBasedFrameDecoder#decode
ByteProcessor.FIND_LF 就是 ‘\n’
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {final int eol = findEndOfLine(buffer);if (!discarding) {if (eol >= 0) {final ByteBuf frame;final int length = eol - buffer.readerIndex();final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;if (length > maxLength) {buffer.readerIndex(eol + delimLength);fail(ctx, length);return null;}if (stripDelimiter) {frame = buffer.readRetainedSlice(length);buffer.skipBytes(delimLength);} else {frame = buffer.readRetainedSlice(length + delimLength);}return frame;} else {final int length = buffer.readableBytes();if (length > maxLength) {discardedBytes = length;buffer.readerIndex(buffer.writerIndex());discarding = true;offset = 0;if (failFast) {fail(ctx, "over " + discardedBytes);}}return null;}} else {if (eol >= 0) {final int length = discardedBytes + eol - buffer.readerIndex();final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;buffer.readerIndex(eol + delimLength);discardedBytes = 0;discarding = false;if (!failFast) {fail(ctx, length);}} else {discardedBytes += buffer.readableBytes();buffer.readerIndex(buffer.writerIndex());// We skip everything in the buffer, we need to set the offset to 0 again.offset = 0;}return null;}}private void fail(final ChannelHandlerContext ctx, int length) {fail(ctx, String.valueOf(length));}private void fail(final ChannelHandlerContext ctx, String length) {ctx.fireExceptionCaught(new TooLongFrameException("frame length (" + length + ") exceeds the allowed maximum (" + maxLength + ')'));}private int findEndOfLine(final ByteBuf buffer) {int totalLength = buffer.readableBytes();int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteProcessor.FIND_LF);if (i >= 0) {offset = 0;if (i > 0 && buffer.getByte(i - 1) == '\r') {i--;}} else {offset = totalLength;}return i;}
3.3 长度域解码器
每一条消息发消息长度+消息,根据消息中的长度读取后面的消息
// 添加基于长度字段的帧解码器,用于解码客户端发送的数据// 解码器配置:最大帧长度1000,长度字段起始位置0,长度字段长度2字节,不进行长度调整,跳过2个字节ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1000, 0, 2, 0, 2));
https://www.cnblogs.com/motianlong/p/14465098.html
3.3.1 服务端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
/*** 该类实现了基于长度字段的帧解码器服务器。*/
public class LengthFieldBasedFrameDecoderServer {/*** 启动服务器。*/void start() {// 创建Boss线程组和Worker线程组NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup();try {// 配置服务器引导程序ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 添加日志处理器ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));// 添加自定义的ChannelInboundHandlerAdapter,处理连接激活和不活跃事件ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("connected {}", ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug("disconnect {}", ctx.channel());super.channelInactive(ctx);}});// 添加基于长度字段的帧解码器,用于解码客户端发送的数据// 解码器配置:最大帧长度1000,长度字段起始位置0,长度字段长度2字节,不进行长度调整,跳过2个字节ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1000, 0, 2, 0, 2));// 添加字符串解码器,将字节转换为字符串ch.pipeline().addLast(new StringDecoder());// 添加自定义的ChannelInboundHandler,处理读取到的字符串消息ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("Client received: " + msg);}});}});// 绑定端口并等待连接ChannelFuture channelFuture = serverBootstrap.bind(8080);log.debug("{} binding...", channelFuture.channel());channelFuture.sync();log.debug("{} bound...", channelFuture.channel());// 等待服务器关闭channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {// 关闭线程组boss.shutdownGracefully();worker.shutdownGracefully();log.debug("stopped");}}/*** 程序入口。* @param args 命令行参数*/public static void main(String[] args) {new LengthFieldBasedFrameDecoderServer().start();}}
3.3.2 客户端代码
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class FrameDecoderClient {public static void main(String[] args) {// 创建一个 NIO 事件循环组,用于处理客户端的 I/O 事件。NioEventLoopGroup worker = new NioEventLoopGroup();try {// 配置客户端启动器,设置通道类型、事件循环组和处理器。Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class); // 指定使用 NIO Socket 通道。bootstrap.group(worker); // 设置事件循环组。bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connected...");// 添加自定义的入站处理程序,负责在通道激活时发送数据。ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {/*** 当通道变得可写时,发送数据到服务器。* @param ctx 通道上下文,用于分配缓冲区和写入数据。* @throws Exception 如果操作过程中发生异常。*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");// 循环10次,发送相同的数据。for (int i = 0; i < 10; i++) {// 分配一个缓冲区,并写入固定长度的数据。ByteBuf buffer = ctx.alloc().buffer();StringBuilder str = new StringBuilder();for (int j = 0; j < 9; j++) {str.append("1");}buffer.writeShort(9);buffer.writeBytes(str.toString().getBytes());// 写入缓冲区并刷新通道,确保数据被发送。ctx.writeAndFlush(buffer);}}});}});// 连接到服务器并等待连接完成。ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();// 等待通道关闭。channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {// 记录中断异常。log.error("client error", e);} finally {// 关闭事件循环组,释放资源。worker.shutdownGracefully();}}
}