之前我们介绍了用特殊分隔符来分割每个报文。但是,如果传输的数据中恰好包含这个特殊分隔符,该报文就会被错误地拆分成多个。为了进一步避免这个问题,还有一种解决方案:在两端的 ChannelPipeline 中加入 FixedLengthFrameDecoder,按固定长度来划分报文,这样同样可以解决粘包、半包的问题。
1.Server
package splicing.fixed;import constant.Constant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;import java.net.InetSocketAddress;public class FixedLengthEchoServer {public static final String RESPONSE = "Welcome to Netty";public static void main(String[] args) throws InterruptedException {FixedLengthEchoServer fixedLengthEchoServer = new FixedLengthEchoServer();System.out.println("服务器即将启动");fixedLengthEchoServer.start();}public void start() throws InterruptedException {FixedLengthServerHandler serverHandler = new FixedLengthServerHandler();EventLoopGroup group = new NioEventLoopGroup();// 线程组try {// 服务端启动必须ServerBootstrap b = new ServerBootstrap();b.group(group)// 将线程组传入.channel(NioServerSocketChannel.class) // 指定使用NIO进行网络传输.localAddress(new InetSocketAddress(Constant.DEFAULT_PORT)) // 指定服务器监听端口// 服务端每接收到一个连接请求,就会新启一个Socket通信,也就是channel// 所以下面这段代码的作用就是为这个子channel增加handler.childHandler(new ChannelInitializerImp());// 异步绑定到服务器,sync()会阻塞直到完成ChannelFuture f = b.bind().sync();System.out.println("服务器启动完成,等待客户端的连接和数据.....");// 阻塞直到服务器的channel关闭f.channel().closeFuture().sync();} finally {group.shutdownGracefully().sync();}}private static class ChannelInitializerImp extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new FixedLengthFrameDecoder(FixedLengthEchoClient.REQUEST.length()));ch.pipeline().addLast(new FixedLengthServerHandler());}}
}
package splicing.fixed;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;import java.util.concurrent.atomic.AtomicInteger;public class FixedLengthServerHandler extends ChannelInboundHandlerAdapter {private AtomicInteger counter = new AtomicInteger(0);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in = (ByteBuf) msg;String request = in.toString(CharsetUtil.UTF_8);System.out.println("Server Accept[" + request +" ] and the counter is :" + counter.incrementAndGet());ctx.writeAndFlush(Unpooled.copiedBuffer(FixedLengthEchoServer.RESPONSE.getBytes()));
// super.channelRead(ctx, msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();
// super.exceptionCaught(ctx, cause);}
}
2.Client
package splicing.fixed;import constant.Constant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;import java.net.InetSocketAddress;public class FixedLengthEchoClient {public static final String REQUEST = "Mark,zhuge,zhouyu,fox,loulan";private final String host;public FixedLengthEchoClient(String host) {this.host = host;}public void start() throws InterruptedException {EventLoopGroup group = new NioEventLoopGroup();try {// 客户端启动必须Bootstrap b = new Bootstrap();b.group(group)// 将线程组传入.channel(NioSocketChannel.class) // 指定使用NIO进行网络传输.remoteAddress(new InetSocketAddress(host, Constant.DEFAULT_PORT)).handler(new ChannelInitializerImp());ChannelFuture f = b.connect().sync();System.out.println("已连接到服务器....");f.channel().closeFuture().sync();} finally {group.shutdownGracefully().sync();}}private static class ChannelInitializerImp extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new FixedLengthFrameDecoder(FixedLengthEchoServer.RESPONSE.length()));ch.pipeline().addLast(new FixedLengthClientHandler());}}public static void main(String[] args) throws InterruptedException {new FixedLengthEchoClient(Constant.DEFAULT_SERVER_IP).start();}
}
package splicing.fixed;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;import java.util.concurrent.atomic.AtomicInteger;public class FixedLengthClientHandler extends SimpleChannelInboundHandler<ByteBuf> {private AtomicInteger counter = new AtomicInteger(0);@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {System.out.println("client Accept [" + msg.toString(CharsetUtil.UTF_8)+ "] and the counter is :" + counter.incrementAndGet());}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ByteBuf msg = null;for (int i = 0; i < 100; i++) {msg = Unpooled.buffer(FixedLengthEchoClient.REQUEST.length());msg.writeBytes(FixedLengthEchoClient.REQUEST.getBytes());ctx.writeAndFlush(msg);}
// super.channelActive(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();
// super.exceptionCaught(ctx, cause);}
}
3.结果展示
这种方案的限制在于:必须提前知道客户端和服务端各自要发送的报文长度,并且这个长度是固定的、不能改变;一旦报文长度发生变化,仍然会产生粘包、半包的问题。那么有人可能会想,有没有更好的解决方案?别着急,下一期我们接着分析。