Socket粘包问题终极解决方案—Netty版(2W字)!

作者 | 王磊

来源 | Java中文社群(ID:javacn666)

转载请联系授权(微信ID:GG_Stone)

上一篇我们写了《Socket粘包问题的3种解决方案》,但没想到评论区竟然炸了。介于大家的热情讨论,以及不同的反馈意见,本文就来做一个扩展和延伸,试图找到问题的最优解,以及消息通讯的最优解决方案。

在正式开始之前,我们先对上篇评论中的几个典型问题做一个简单的回复,不感兴趣的朋友可直接划过。

问题一:TCP存在粘包问题吗?

先说答案:TCP 本身并没有粘包和半包一说,因为 TCP 本质上只是一个传输控制协议(Transmission Control Protocol,TCP),它是一种面向连接的、可靠的、基于字节流的传输层通信协议,由 IETF 的 RFC 793 定义。

所谓的协议本质上是一个约定,就好比 Java 编程约定使用驼峰命名法一样,约定的意义是为了让通讯双方,能够正常的进行消息互换的,那粘包和半包问题又是如何产生的呢?

这是因为在 TCP 的交互中,数据是以字节流的形式进行传输的,而“流”的传输是没有边界的,因为没有边界所以就不能区分消息的归属,从而就会产生粘包和半包问题(粘包和半包的定义,详见上一篇)。所以说 TCP 协议本身并不存在粘包和半包问题,只是在使用中如果不能有效的确定流的边界就会产生粘包和半包问题。

问题二:分隔符是最优解决方案?

坦白的说,经过评论区大家的耐心“开导”,我也意识到了以结束符作为最终的解决方案存在一定的局限性,比如当一条消息中间如果出现了结束符就会造成半包的问题,所以如果是复杂的字符串要对内容进行编码和解码处理,这样才能保证结束符的正确性。

问题三:Socket 高效吗?

这个问题的答案是否定的,其实上文在开头已经描述了应用场景:「传统的 Socket 编程」,学习它的意义就在于理解更早期更底层的一些知识,当然作为补充本文会提供更加高效的消息通讯方案——Netty 通讯。


聊完了以上问题,接下来咱们先来补充一下上篇文章中提到的,将消息分为消息头和消息体的代码实现。

一、封装消息头和消息体

在开始写服务器端和客户端之前,咱们先来编写一个消息的封装类,使用它可以将消息封装成消息头和消息体,如下图所示:消息头中存储消息体的长度,从而确定了消息的边界,便解决粘包和半包问题。

1.消息封装类

消息的封装类中提供了两个方法:一个是将消息转换成消息头 + 消息体的方法,另一个是读取消息头的方法,具体实现代码如下:

/*** 消息封装类*/
class SocketPacket {// 消息头存储的长度(占 8 字节)static final int HEAD_SIZE = 8;/*** 将协议封装为:协议头 + 协议体* @param context 消息体(String 类型)* @return byte[]*/public byte[] toBytes(String context) {// 协议体 byte 数组byte[] bodyByte = context.getBytes();int bodyByteLength = bodyByte.length;// 最终封装对象byte[] result = new byte[HEAD_SIZE + bodyByteLength];// 借助 NumberFormat 将 int 转换为 byte[]NumberFormat numberFormat = NumberFormat.getNumberInstance();numberFormat.setMinimumIntegerDigits(HEAD_SIZE);numberFormat.setGroupingUsed(false);// 协议头 byte 数组byte[] headByte = numberFormat.format(bodyByteLength).getBytes();// 封装协议头System.arraycopy(headByte, 0, result, 0, HEAD_SIZE);// 封装协议体System.arraycopy(bodyByte, 0, result, HEAD_SIZE, bodyByteLength);return result;}/*** 获取消息头的内容(也就是消息体的长度)* @param inputStream* @return*/public int getHeader(InputStream inputStream) throws IOException {int result = 0;byte[] bytes = new byte[HEAD_SIZE];inputStream.read(bytes, 0, HEAD_SIZE);// 得到消息体的字节长度result = Integer.valueOf(new String(bytes));return result;}
}

2.编写客户端

接下来我们来定义客户端,在客户端中我们添加一组待发送的消息,随机给服务器端发送一个消息,实现代码如下:

/*** 客户端*/
class MySocketClient {public static void main(String[] args) throws IOException {// 启动 Socket 并尝试连接服务器Socket socket = new Socket("127.0.0.1", 9093);// 发送消息合集(随机发送一条消息)final String[] message = {"Hi,Java.", "Hi,SQL~", "关注公众号|Java中文社群."};// 创建协议封装对象SocketPacket socketPacket = new SocketPacket();try (OutputStream outputStream = socket.getOutputStream()) {// 给服务器端发送 10 次消息for (int i = 0; i < 10; i++) {// 随机发送一条消息String msg = message[new Random().nextInt(message.length)];// 将内容封装为:协议头+协议体byte[] bytes = socketPacket.toBytes(msg);// 发送消息outputStream.write(bytes, 0, bytes.length);outputStream.flush();}}}
}

3.编写服务器端

服务器端我们使用线程池来处理每个客户端的业务请求,实现代码如下:

/*** 服务器端*/
class MySocketServer {public static void main(String[] args) throws IOException {// 创建 Socket 服务器端ServerSocket serverSocket = new ServerSocket(9093);// 获取客户端连接Socket clientSocket = serverSocket.accept();// 使用线程池处理更多的客户端ThreadPoolExecutor threadPool = new ThreadPoolExecutor(100, 150, 100,TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));threadPool.submit(() -> {// 客户端消息处理processMessage(clientSocket);});}/*** 客户端消息处理* @param clientSocket*/private static void processMessage(Socket clientSocket) {// Socket 封装对象SocketPacket socketPacket = new SocketPacket();// 获取客户端发送的消息对象try (InputStream inputStream = clientSocket.getInputStream()) {while (true) {// 获取消息头(也就是消息体的长度)int bodyLength = socketPacket.getHeader(inputStream);// 消息体 byte 数组byte[] bodyByte = new byte[bodyLength];// 每次实际读取字节数int readCount = 0;// 消息体赋值下标int bodyIndex = 0;// 循环接收消息头中定义的长度while (bodyIndex <= (bodyLength - 1) &&(readCount = inputStream.read(bodyByte, bodyIndex, bodyLength)) != -1) {bodyIndex += readCount;}bodyIndex = 0;// 成功接收到客户端的消息并打印System.out.println("接收到客户端的信息:" + new String(bodyByte));}} catch (IOException ioException) {System.out.println(ioException.getMessage());}}
}

以上程序的执行结果如下:从上述结果可以看出,消息通讯正常,客户端和服务器端的交互中并没有出现粘包和半包的问题。

二、使用 Netty 实现高效通讯

以上的内容都是针对传统 Socket 编程的,但要实现更加高效的通讯和连接对象的复用就要使用 NIO(Non-Blocking IO,非阻塞 IO)或者 AIO(Asynchronous IO,异步非阻塞 IO)了。

传统的 Socket 编程是 BIO(Blocking IO,同步阻塞 IO),它和 NIO 和 AIO  的区别如下:

  • BIO 来自传统的 java.io 包,它是基于流模型实现的,交互的方式是同步、阻塞方式,也就是说在读入输入流或者输出流时,在读写动作完成之前,线程会一直阻塞在那里,它们之间的调用是可靠的线性顺序。它的优点就是代码比较简单、直观;缺点就是 IO 的效率和扩展性很低,容易成为应用性能瓶颈。

  • NIO 是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,可以构建多路复用的、同步非阻塞 IO 程序,同时提供了更接近操作系统底层高性能的数据操作方式。

  • AIO 是 Java 1.7 之后引入的包,是 NIO 的升级版本,提供了异步非堵塞的 IO 操作方式,因此人们叫它 AIO(Asynchronous IO),异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。

PS:AIO 可以看作是 NIO 的升级,它也叫 NIO 2。

传统 Socket 的通讯流程:NIO 的通讯流程:

使用 Netty 替代传统 NIO 编程

NIO 的设计思路虽然很好,但它的代码编写比较麻烦,比如 Buffer 的使用和 Selector 的编写等。并且在面对断线重连、包丢失和粘包等复杂问题时手动处理的成本都很大,因此我们通常会使用 Netty 框架来替代传统的 NIO。

Netty 是什么?

Netty 是一个异步、事件驱动的用来做高性能、高可靠性的网络应用框架,使用它可以快速轻松地开发网络应用程序,极大的简化了网络编程的复杂度。

Netty 主要优点有以下几个:

  1. 框架设计优雅,底层模型随意切换适应不同的网络协议要求;

  2. 提供很多标准的协议、安全、编码解码的支持;

  3. 简化了 NIO 使用中的诸多不便;

  4. 社区非常活跃,很多开源框架中都使用了 Netty 框架,如 Dubbo、RocketMQ、Spark 等。

Netty 主要包含以下 3 个部分,如下图所示:

这 3 个部分的功能介绍如下。

1. Core 核心层

Core 核心层是 Netty 最精华的内容,它提供了底层网络通信的通用抽象和实现,包括可扩展的事件模型、通用的通信 API、支持零拷贝的 ByteBuf 等。

2. Protocol Support 协议支持层

协议支持层基本上覆盖了主流协议的编解码实现,如 HTTP、SSL、Protobuf、压缩、大文件传输、WebSocket、文本、二进制等主流协议,此外 Netty 还支持自定义应用层协议。Netty 丰富的协议支持降低了用户的开发成本,基于 Netty 我们可以快速开发 HTTP、WebSocket 等服务。

3. Transport Service 传输服务层

传输服务层提供了网络传输能力的定义和实现方法。它支持 Socket、HTTP 隧道、虚拟机管道等传输方式。Netty 对 TCP、UDP 等数据传输做了抽象和封装,用户可以更聚焦在业务逻辑实现上,而不必关系底层数据传输的细节。

Netty 使用

对 Netty 有了大概的认识之后,接下来我们用 Netty 来编写一个基础的通讯服务器,它包含两个端:服务器端和客户端,客户端负责发送消息,服务器端负责接收并打印消息,具体的实现步骤如下。

1.添加 Netty 框架

首先我们需要先添加 Netty 框架的支持,如果是 Maven 项目添加如下配置即可:

<!-- 添加 Netty 框架 -->
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.56.Final</version>
</dependency>
Netty 版本说明

Netty 的 3.x 和 4.x 为主流的稳定版本,而最新的 5.x 已经是放弃的测试版了,因此推荐使用 Netty 4.x 的最新稳定版。

2. 服务器端实现代码

按照官方的推荐,这里将服务器端的代码分为以下 3 个部分:

  • MyNettyServer:服务器端的核心业务代码;

  • ServerInitializer:服务器端通道(Channel)初始化;

  • ServerHandler:服务器端接收到信息之后的处理逻辑。

PS:Channel 字面意思为“通道”,它是网络通信的载体。Channel 提供了基本的 API 用于网络 I/O 操作,如 register、bind、connect、read、write、flush 等。Netty 自己实现的 Channel 是以 JDK NIO Channel 为基础的,相比较于 JDK NIO,Netty 的 Channel 提供了更高层次的抽象,同时屏蔽了底层 Socket 的复杂性,赋予了 Channel 更加强大的功能,你在使用 Netty 时基本不需要再与 Java Socket 类直接打交道。

服务器端的实现代码如下:

// 定义服务器的端口号
static final int PORT = 8007;/*** 服务器端*/
static class MyNettyServer {public static void main(String[] args) {// 创建一个线程组,用来负责接收客户端连接EventLoopGroup bossGroup = new NioEventLoopGroup();// 创建另一个线程组,用来负责 I/O 的读写EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 创建一个 Server 实例(可理解为 Netty 的入门类)ServerBootstrap b = new ServerBootstrap();// 将两个线程池设置到 Server 实例b.group(bossGroup, workerGroup)// 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器).channel(NioServerSocketChannel.class)// 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类).childHandler(new ServerInitializer());// 绑定端口并且进行同步ChannelFuture future = b.bind(PORT).sync();// 对关闭通道进行监听future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {// 资源关闭bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}/*** 服务端通道初始化*/
static class ServerInitializer extends ChannelInitializer<SocketChannel> {// 字符串编码器和解码器private static final StringDecoder DECODER = new StringDecoder();private static final StringEncoder ENCODER = new StringEncoder();// 服务器端连接之后的执行器(自定义的类)private static final ServerHandler SERVER_HANDLER = new ServerHandler();/*** 初始化通道的具体执行方法*/@Overridepublic void initChannel(SocketChannel ch) {// 通道 Channel 设置ChannelPipeline pipeline = ch.pipeline();// 设置(字符串)编码器和解码器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);// 服务器端连接之后的执行器,接收到消息之后的业务处理pipeline.addLast(SERVER_HANDLER);}
}/*** 服务器端接收到消息之后的业务处理类*/
static class ServerHandler extends SimpleChannelInboundHandler<String> {/*** 读取到客户端的消息*/@Overridepublic void channelRead0(ChannelHandlerContext ctx, String request) {if (!request.isEmpty()) {System.out.println("接到客户端的消息:" + request);}}/*** 数据读取完毕*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}/*** 异常处理,打印异常并关闭通道*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

3.客户端实现代码

客户端的代码实现也是分为以下 3 个部分:

  • MyNettyClient:客户端核心业务代码;

  • ClientInitializer:客户端通道初始化;

  • ClientHandler:接收到消息之后的处理逻辑。

客户端的实现代码如下:

/*** 客户端*/
static class MyNettyClient {public static void main(String[] args) {// 创建事件循环线程组(客户端的线程组只有一个)EventLoopGroup group = new NioEventLoopGroup();try {// Netty 客户端启动对象Bootstrap b = new Bootstrap();// 设置启动参数b.group(group)// 设置通道类型.channel(NioSocketChannel.class)// 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类).handler(new ClientInitializer());// 连接服务器端并同步通道Channel ch = b.connect("127.0.0.1", 8007).sync().channel();// 发送消息ChannelFuture lastWriteFuture = null;// 给服务器端发送 10 条消息for (int i = 0; i < 10; i++) {// 发送给服务器消息lastWriteFuture = ch.writeAndFlush("Hi,Java.");}// 在关闭通道之前,同步刷新所有的消息if (lastWriteFuture != null) {lastWriteFuture.sync();}} catch (InterruptedException e) {e.printStackTrace();} finally {// 释放资源group.shutdownGracefully();}}
}/*** 客户端通道初始化类*/
static class ClientInitializer extends ChannelInitializer<SocketChannel> {// 字符串编码器和解码器private static final StringDecoder DECODER = new StringDecoder();private static final StringEncoder ENCODER = new StringEncoder();// 客户端连接成功之后业务处理private static final ClientHandler CLIENT_HANDLER = new ClientHandler();/*** 初始化客户端通道*/@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 设置(字符串)编码器和解码器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);// 客户端连接成功之后的业务处理pipeline.addLast(CLIENT_HANDLER);}
}/*** 客户端连接成功之后的业务处理*/
static class ClientHandler extends SimpleChannelInboundHandler<String> {/*** 读取到服务器端的消息*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) {System.err.println("接到服务器的消息:" + msg);}/*** 异常处理,打印异常并关闭通道*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

从以上代码可以看出,我们代码实现的功能是,客户端给服务器端发送 10 条消息。

编写完上述代码之后,我们就可以启动服务器端和客户端了,启动之后,它们的执行结果如下:从上述结果中可以看出,虽然客户端和服务器端实现了通信,但在 Netty 的使用中依然存在粘包的问题,服务器端一次收到了 10 条消息,而不是每次只收到一条消息,因此接下来我们要解决掉 Netty 中的粘包问题。

三、解决 Netty 粘包问题

在 Netty 中,解决粘包问题的常用方案有以下 3 种:

  1. 设置固定大小的消息长度,如果长度不足则使用空字符弥补,它的缺点比较明显,比较消耗网络流量,因此不建议使用;

  2. 使用分隔符来确定消息的边界,从而避免粘包和半包问题的产生;

  3. 将消息分为消息头和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息之后才算是读到了一个完整的消息。

接下来我们分别来看后两种推荐的解决方案。

1.使用分隔符解决粘包问题

在 Netty 中提供了 DelimiterBasedFrameDecoder 类用来以特殊符号作为消息的结束符,从而解决粘包和半包的问题。

它的核心实现代码是在初始化通道(Channel)时,通过设置 DelimiterBasedFrameDecoder 来分隔消息,需要在客户端和服务器端都进行设置,具体实现代码如下。

服务器端核心实现代码如下:

/*** 服务端通道初始化*/
static class ServerInitializer extends ChannelInitializer<SocketChannel> {// 字符串编码器和解码器private static final StringDecoder DECODER = new StringDecoder();private static final StringEncoder ENCODER = new StringEncoder();// 服务器端连接之后的执行器(自定义的类)private static final ServerHandler SERVER_HANDLER = new ServerHandler();/*** 初始化通道的具体执行方法*/@Overridepublic void initChannel(SocketChannel ch) {// 通道 Channel 设置ChannelPipeline pipeline = ch.pipeline();// 19 行:设置结尾分隔符【核心代码】(参数1:为消息的最大长度,可自定义;参数2:分隔符[此处以换行符为分隔符])pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));// 设置(字符串)编码器和解码器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);// 服务器端连接之后的执行器,接收到消息之后的业务处理pipeline.addLast(SERVER_HANDLER);}
}

核心代码为第 19 行,代码中已经备注了方法的含义,这里就不再赘述。

客户端的核心实现代码如下:

/*** 客户端通道初始化类*/
static class ClientInitializer extends ChannelInitializer<SocketChannel> {// 字符串编码器和解码器private static final StringDecoder DECODER = new StringDecoder();private static final StringEncoder ENCODER = new StringEncoder();// 客户端连接成功之后业务处理private static final ClientHandler CLIENT_HANDLER = new ClientHandler();/*** 初始化客户端通道*/@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 17 行:设置结尾分隔符【核心代码】(参数1:为消息的最大长度,可自定义;参数2:分隔符[此处以换行符为分隔符])pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));// 设置(字符串)编码器和解码器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);// 客户端连接成功之后的业务处理pipeline.addLast(CLIENT_HANDLER);}
}

完整的服务器端和客户端的实现代码如下:

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;public class NettyExample {// 定义服务器的端口号static final int PORT = 8007;/*** 服务器端*/static class MyNettyServer {public static void main(String[] args) {// 创建一个线程组,用来负责接收客户端连接EventLoopGroup bossGroup = new NioEventLoopGroup();// 创建另一个线程组,用来负责 I/O 的读写EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 创建一个 Server 实例(可理解为 Netty 的入门类)ServerBootstrap b = new ServerBootstrap();// 将两个线程池设置到 Server 实例b.group(bossGroup, workerGroup)// 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器).channel(NioServerSocketChannel.class)// 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类).childHandler(new ServerInitializer());// 绑定端口并且进行同步ChannelFuture future = b.bind(PORT).sync();// 对关闭通道进行监听future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {// 资源关闭bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}/*** 服务端通道初始化*/static class ServerInitializer extends ChannelInitializer<SocketChannel> {// 字符串编码器和解码器private static final StringDecoder DECODER = new StringDecoder();private static final StringEncoder ENCODER = new StringEncoder();// 服务器端连接之后的执行器(自定义的类)private static final ServerHandler SERVER_HANDLER = new ServerHandler();/*** 初始化通道的具体执行方法*/@Overridepublic void initChannel(SocketChannel ch) {// 通道 Channel 设置ChannelPipeline pipeline = ch.pipeline();// 设置结尾分隔符pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));// 设置(字符串)编码器和解码器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);// 服务器端连接之后的执行器,接收到消息之后的业务处理pipeline.addLast(SERVER_HANDLER);}}/*** 服务器端接收到消息之后的业务处理类*/static class ServerHandler extends SimpleChannelInboundHandler<String> {/*** 读取到客户端的消息*/@Overridepublic void channelRead0(ChannelHandlerContext ctx, String request) {if (!request.isEmpty()) {System.out.println("接到客户端的消息:" + request);}}/*** 数据读取完毕*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}/*** 异常处理,打印异常并关闭通道*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}}/*** 客户端*/static class MyNettyClient {public static void main(String[] args) {// 创建事件循环线程组(客户端的线程组只有一个)EventLoopGroup group = new NioEventLoopGroup();try {// Netty 客户端启动对象Bootstrap b = new Bootstrap();// 设置启动参数b.group(group)// 设置通道类型.channel(NioSocketChannel.class)// 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类).handler(new ClientInitializer());// 连接服务器端并同步通道Channel ch = b.connect("127.0.0.1", PORT).sync().channel();// 发送消息ChannelFuture lastWriteFuture = null;// 给服务器端发送 10 条消息for (int i = 0; i < 10; i++) {// 发送给服务器消息lastWriteFuture = ch.writeAndFlush("Hi,Java.\n");}// 在关闭通道之前,同步刷新所有的消息if (lastWriteFuture != null) {lastWriteFuture.sync();}} catch (InterruptedException e) {e.printStackTrace();} finally {// 释放资源group.shutdownGracefully();}}}/*** 客户端通道初始化类*/static class ClientInitializer extends ChannelInitializer<SocketChannel> {// 字符串编码器和解码器private static final StringDecoder DECODER = new StringDecoder();private static final StringEncoder ENCODER = new StringEncoder();// 客户端连接成功之后业务处理private static final ClientHandler CLIENT_HANDLER = new ClientHandler();/*** 初始化客户端通道*/@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 设置结尾分隔符pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));// 设置(字符串)编码器和解码器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);// 客户端连接成功之后的业务处理pipeline.addLast(CLIENT_HANDLER);}}/*** 客户端连接成功之后的业务处理*/static class ClientHandler extends SimpleChannelInboundHandler<String> {/*** 读取到服务器端的消息*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) {System.err.println("接到服务器的消息:" + msg);}/*** 异常处理,打印异常并关闭通道*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}}
}

最终的执行结果如下图所示:从上述结果中可以看出,Netty 可以正常使用了,它已经不存在粘包和半包问题了。

2.封装消息解决粘包问题

此解决方案的核心是将消息分为消息头 + 消息体,在消息头中保存消息体的长度,从而确定一条消息的边界,这样就避免了粘包和半包问题了,它的实现过程如下图所示:在 Netty 中可以通过 LengthFieldPrepender(编码)和 LengthFieldBasedFrameDecoder(解码)两个类实现消息的封装。和上一个解决方案类似,我们需要分别在服务器端和客户端通过设置通道(Channel)来解决粘包问题。

服务器端的核心代码如下:

/*** 服务端通道初始化*/
static class ServerInitializer extends ChannelInitializer<SocketChannel> {// 字符串编码器和解码器private static final StringDecoder DECODER = new StringDecoder();private static final StringEncoder ENCODER = new StringEncoder();// 服务器端连接之后的执行器(自定义的类)private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler();/*** 初始化通道的具体执行方法*/@Overridepublic void initChannel(SocketChannel ch) {// 通道 Channel 设置ChannelPipeline pipeline = ch.pipeline();// 18 行:消息解码:读取消息头和消息体pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));// 20 行:消息编码:将消息封装为消息头和消息体,在消息前添加消息体的长度pipeline.addLast(new LengthFieldPrepender(4));// 设置(字符串)编码器和解码器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);// 服务器端连接之后的执行器,接收到消息之后的业务处理pipeline.addLast(SERVER_HANDLER);}
}

其中核心代码是 18 行和 20 行,通过 LengthFieldPrepender 实现编码(将消息打包成消息头 + 消息体),通过 LengthFieldBasedFrameDecoder 实现解码(从封装的消息中取出消息的内容)。

LengthFieldBasedFrameDecoder 的参数说明如下:

  • 参数 1:maxFrameLength - 发送的数据包最大长度;

  • 参数 2:lengthFieldOffset - 长度域偏移量,指的是长度域位于整个数据包字节数组中的下标;

  • 参数 3:lengthFieldLength - 长度域自己的字节数长度;

  • 参数 4:lengthAdjustment – 长度域的偏移量矫正。如果长度域的值,除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,那么,就需要进行矫正。矫正的值为:包长 - 长度域的值 – 长度域偏移 – 长度域长;

  • 参数 5:initialBytesToStrip – 丢弃的起始字节数。丢弃处于有效数据前面的字节数量。比如前面有 4 个节点的长度域,则它的值为 4。

LengthFieldBasedFrameDecoder(1024,0,4,0,4) 的意思是:数据包最大长度为 1024,长度域占首部的四个字节,在读数据的时候去掉首部四个字节(即长度域)。

客户端的核心实现代码如下:

/*** 客户端通道初始化类*/
static class ClientInitializer extends ChannelInitializer<SocketChannel> {// 字符串编码器和解码器private static final StringDecoder DECODER = new StringDecoder();private static final StringEncoder ENCODER = new StringEncoder();// 客户端连接成功之后业务处理private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler();/*** 初始化客户端通道*/@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 消息解码:读取消息头和消息体pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));// 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度pipeline.addLast(new LengthFieldPrepender(4));// 设置(字符串)编码器和解码器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);// 客户端连接成功之后的业务处理pipeline.addLast(CLIENT_HANDLER);}
}

完整的服务器端和客户端的实现代码如下:

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;/*** 通过封装 Netty 来解决粘包*/
public class NettyExample {// 定义服务器的端口号static final int PORT = 8007;/*** 服务器端*/static class MyNettyServer {public static void main(String[] args) {// 创建一个线程组,用来负责接收客户端连接EventLoopGroup bossGroup = new NioEventLoopGroup();// 创建另一个线程组,用来负责 I/O 的读写EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 创建一个 Server 实例(可理解为 Netty 的入门类)ServerBootstrap b = new ServerBootstrap();// 将两个线程池设置到 Server 实例b.group(bossGroup, workerGroup)// 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器).channel(NioServerSocketChannel.class)// 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类).childHandler(new NettyExample.ServerInitializer());// 绑定端口并且进行同步ChannelFuture future = b.bind(PORT).sync();// 对关闭通道进行监听future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {// 资源关闭bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}/*** 服务端通道初始化*/static class ServerInitializer extends ChannelInitializer<SocketChannel> {// 字符串编码器和解码器private static final StringDecoder DECODER = new StringDecoder();private static final StringEncoder ENCODER = new StringEncoder();// 服务器端连接之后的执行器(自定义的类)private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler();/*** 初始化通道的具体执行方法*/@Overridepublic void initChannel(SocketChannel ch) {// 通道 Channel 设置ChannelPipeline pipeline = ch.pipeline();// 消息解码:读取消息头和消息体pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));// 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度pipeline.addLast(new LengthFieldPrepender(4));// 设置(字符串)编码器和解码器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);// 服务器端连接之后的执行器,接收到消息之后的业务处理pipeline.addLast(SERVER_HANDLER);}}/*** 服务器端接收到消息之后的业务处理类*/static class ServerHandler extends SimpleChannelInboundHandler<String> {/*** 读取到客户端的消息*/@Overridepublic void channelRead0(ChannelHandlerContext ctx, String request) {if (!request.isEmpty()) {System.out.println("接到客户端的消息:" + request);}}/*** 数据读取完毕*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}/*** 异常处理,打印异常并关闭通道*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}}/*** 客户端*/static class MyNettyClient {public static void main(String[] args) {// 创建事件循环线程组(客户端的线程组只有一个)EventLoopGroup group = new NioEventLoopGroup();try {// Netty 客户端启动对象Bootstrap b = new Bootstrap();// 设置启动参数b.group(group)// 设置通道类型.channel(NioSocketChannel.class)// 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类).handler(new NettyExample.ClientInitializer());// 连接服务器端并同步通道Channel ch = b.connect("127.0.0.1", PORT).sync().channel();// 发送消息ChannelFuture lastWriteFuture = null;// 给服务器端发送 10 条消息for (int i = 0; i < 10; i++) {// 发送给服务器消息lastWriteFuture = ch.writeAndFlush("Hi,Java.\n");}// 在关闭通道之前,同步刷新所有的消息if (lastWriteFuture != null) {lastWriteFuture.sync();}} catch (InterruptedException e) {e.printStackTrace();} finally {// 释放资源group.shutdownGracefully();}}}/*** 客户端通道初始化类*/static class ClientInitializer extends ChannelInitializer<SocketChannel> {// 字符串编码器和解码器private static final StringDecoder DECODER = new StringDecoder();private static final StringEncoder ENCODER = new StringEncoder();// 客户端连接成功之后业务处理private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler();/*** 初始化客户端通道*/@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 消息解码:读取消息头和消息体pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));// 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度pipeline.addLast(new LengthFieldPrepender(4));// 设置(字符串)编码器和解码器pipeline.addLast(DECODER);pipeline.addLast(ENCODER);// 客户端连接成功之后的业务处理pipeline.addLast(CLIENT_HANDLER);}}/*** 客户端连接成功之后的业务处理*/static class ClientHandler extends SimpleChannelInboundHandler<String> {/*** 读取到服务器端的消息*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) {System.err.println("接到服务器的消息:" + msg);}/*** 异常处理,打印异常并关闭通道*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}}
}

以上程序的执行结果为:

四、总结

本文提供了传统 Socket 通讯将消息分为消息头和消息体的具体代码实现,然而传统的 Socket 在性能和复用性上表现一般,为了更加高效的实现通讯,我们可以使用 Netty 框架来替代传统的 Socket 和 NIO 编程,但 Netty 在使用时依然会出现粘包的问题,于是我们提供了两种最常见的解决方案:通过分隔符或将封装消息的解决方案,其中最后一种解决方案的使用更加广泛。

参考 & 鸣谢

《Netty 核心原理剖析与 RPC 实践》


往期推荐

Socket粘包问题的3种解决方案,最后一种最完美!


文件写入的6种方法,这种方法性能最好


SpringBoot集成Google开源图片处理框架,贼好用!


关注我,每天陪你进步一点点!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/545047.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java高质量代码之 — 泛型与反射

在Java5后推出了泛型,使我们在编译期间操作集合或类时更加的安全,更方便代码的阅读,而让身为编译性语言的Java提供动态性的反射技术,更是在框架开发中大行其道,从而让Java活起来,下面看一下在使用泛型和反射需要注意和了解的事情 1.Java的泛型是类型擦除的 Java中的泛型是…

Redis 消息队列的三种方案(List、Streams、Pub/Sub)

现如今的互联网应用大都是采用 分布式系统架构 设计的&#xff0c;所以 消息队列 已经逐渐成为企业应用系统 内部通信 的核心手段&#xff0c;它具有 低耦合、可靠投递、广播、流量控制、最终一致性 等一系列功能。当前使用较多的 消息队列 有 RabbitMQ、RocketMQ、ActiveMQ、K…

c struct 对齐_C中的struct大小| 填充,结构对齐

c struct 对齐What we know is that size of a struct is the sum of all the data members. Like for the following struct, 我们知道的是&#xff0c; 结构的大小是所有数据成员的总和 。 对于以下结构&#xff0c; struct A{int a;int* b;char c;char *d;};Size of the st…

超3000岗位!腾讯产业互联网新年大扩招!

虽然离春节仅剩 1 个月的时间&#xff0c;大厂依旧没有停止招人。就在上周&#xff0c;腾讯官宣新年大扩招&#xff0c;放出 3000 多个岗位需求&#xff01;我们查看了腾讯的招聘数据发现&#xff0c;除了大量招聘运营人员&#xff0c;你猜&#xff0c;他们还在批量招聘什么岗位…

骚操作,IDEA防止写代码沉迷插件 !

当初年少懵懂&#xff0c;那年夏天填志愿选专业&#xff0c;父母听其他长辈说选择计算机专业好。从那以后&#xff0c;我的身上就有了计院深深的烙印。从寝室到机房&#xff0c;从机房到图书馆&#xff0c;C、C、Java、只要是想写点自己感兴趣的东西&#xff0c;一坐就是几个小…

css属性 content

对css一直没有很系统得学习过,练习得也不是很多,纯小白.今天在写一个页面的时候,遇到一个问题,就是如何让外面的盒子适应里面的盒子大小,完美地把小盒子包在里面. 由于里面是一个列表 ul,为了让元素横排,我使用了float:right这个属性,所以列表悬浮了.如图: 其实当然可以直接给外…

一文汇总 JDK 5 到 JDK 15 中的牛逼功能!

前言JDK 16 马上就要发布啦&#xff08;预计 2021.3.16 日发布&#xff09;&#xff0c;所以在发布之前&#xff0c;让我们先来回顾一下 JDK 5-15 的新特性吧&#xff0c;大家一起学起来~Java 5 新特性1. 泛型泛型本质是参数化类型&#xff0c;解决不确定具体对象类型的问题。L…

Tomcat 6.0 简介

本片翻译来自&#xff1a;http://tomcat.apache.org/tomcat-6.0-doc/introduction.html 介绍 无论是开发者还是tomcat管理员在使用前都需要了解一些必要的信息&#xff0c;本篇简单的介绍tomcat中的一些术语和概念。比如context是web应用的意思。CATALINA_HOME 在文档中&#x…

Docker部署SpringBoot的两种方法,后一种一键部署超好用!

作者 | LemonSquash来源 | cnblogs.com/npeng/p/14267007.html1.手工方式1.1.准备Springboot jar项目将项目打包成jar1.2.编写DockerfileFROM java:8 VOLUME /tmp ADD elk-web-1.0-SNAPSHOT.jar elk.jar EXPOSE 8080 ENTRYPOINT ["java","-Djava.security.egdfi…

UISwitch 添加 标签

给UISwitch添加一个标签。左右滑动时候出现开关标签内容。 代码&#xff1a; // // UISwitchJGLabel.h // JGSwitch // // Created by sl on 15/4/11. // Copyright (c) 2015年 Madordie. All rights reserved. // // // 说明&#xff1a; // 1.给UISwitch添加开关标…

爱了!蚂蚁开源的“SpringBoot”框架,新增了这6项功能...

SOFABoot 是蚂蚁金服开源的基于 Spring Boot 的研发框架&#xff0c;它在 Spring Boot 的基础上&#xff0c;提供了诸如 Readiness Check&#xff0c;类隔离&#xff0c;日志空间隔离等等能力。在增强了 Spring Boot 的同时&#xff0c;SOFABoot 提供了让用户可以在 Spring Boo…

PUC的完整形式是什么?

PUC&#xff1a;大学预科/污染控制/个人解锁码 (PUC: Pre University Course / Pollution Under Control / Personal Unlock Code) 1)PUC&#xff1a;大学预科课程 (1) PUC: Pre University Course) PUC is an abbreviation of the Pre University Course. It alludes to an in…

过滤器VS拦截器的4个区别,看完豁然开朗!

Spring的拦截器与Servlet的Filter有相似之处&#xff0c;比如二者都是AOP编程思想的体现&#xff0c;都能实现权限检查、日志记录等。但它们之间又有很大区别&#xff0c;所以本文磊哥就带大家全面了解一下什么是过滤器&#xff1f;什么是拦截器&#xff1f;以及二者有什么区别…

分布式ID生成的9种方法,特好用!

前言业务量小于500W或数据容量小于2G的时候单独一个mysql即可提供服务&#xff0c;再大点的时候就进行读写分离也可以应付过来。但当主从同步也扛不住的是就需要分表分库了&#xff0c;但分库分表后需要有一个唯一ID来标识一条数据&#xff0c;数据库的自增ID显然不能满足需求&…

8051 管脚定义_8051微控制器的引脚说明

8051 管脚定义8051微控制器的引脚说明 (Pin Description of 8051 Microcontroller) Pins from 1-8 1-8针 Port 1: The pins in this port are bi-directional and can be used for input and output. The pins are individually controlled; some are used for input while ot…

android 事件分发

2019独角兽企业重金招聘Python工程师标准>>> 文章来源于CSDN http://blog.csdn.net/lanhuzi9999/article/details/26515421 转载于:https://my.oschina.net/lhjtianji/blog/398998

对象复制的7种方法,还是Spring的最好用!

日常编程中&#xff0c;我们会经常会碰到对象属性复制的场景&#xff0c;就比如下面这样一个常见的三层 MVC 架构。当我们在上面的架构下编程时&#xff0c;我们通常需要经历对象转化&#xff0c;将业务请求流程经历三层机构后需要把 DTO 转为DO然后在数据库中保存。当需要从数…

Java中的Switch都支持String了,为什么不支持long?

来源 | jitwxs.cn/6f3eddff.html我们知道 Java Switch 支持byte、short、int 类型&#xff0c;在 JDK 1.5 时&#xff0c;支持了枚举类型&#xff0c;在 JDK 1.7 时&#xff0c;又支持了 String类型。那么它为什么就不能支持 long 类型呢&#xff0c;明明它跟 byte、short、int…

什么是WebSocket,以及如何在Python中使用它?

什么是WebSocket&#xff1f; (What is WebSocket?) WebSocket is a communications protocol which provides a full-duplex communication channel over a single TCP connection. WebSocket protocol is standardized by the IETF as RFC 6455. WebSocket是一种通信协议&am…

final的8个小细节,听说只有高手才知道!你知道几个?

final关键字是一个常用的关键字&#xff0c;可以修饰变量、方法、类&#xff0c;用来表示它修饰的类、方法和变量不可改变&#xff0c;下面就聊一下使用 final 关键字的一些小细节。细节一、final 修饰类成员变量和实例成员变量的赋值时机对于类变量&#xff1a;声明变量的时候…