Netty核心原理剖析与RPC实践16-20

Netty核心原理剖析与RPC实践16-20

16 IO 加速:与众不同的 Netty 零拷贝技术

今天的课程我们继续讨论 Netty 实现高性能的另一个高阶特性——零拷贝。零拷贝是一个耳熟能详的词语,在 Linux、Kafka、RocketMQ 等知名的产品中都有使用,通常用于提升 I/O 性能。而且零拷贝也是面试过程中的高频问题,那么你知道零拷贝体现在哪些地方吗?Netty 的零拷贝技术又是如何实现的呢?接下来我们就针对 Netty 零拷贝特性进行详细地分析。

传统 Linux 中的零拷贝技术

在介绍 Netty 零拷贝特性之前,我们有必要学习下传统 Linux 中零拷贝的工作原理。所谓零拷贝,就是在数据操作时,不需要将数据从一个内存位置拷贝到另外一个内存位置,这样可以减少一次内存拷贝的损耗,从而节省了 CPU 时钟周期和内存带宽。

我们模拟一个场景,从文件中读取数据,然后将数据传输到网络上,那么传统的数据拷贝过程会分为哪几个阶段呢?具体如下图所示。

Drawing 0.png

从上图中可以看出,从数据读取到发送一共经历了四次数据拷贝,具体流程如下:

  1. 当用户进程发起 read() 调用后,上下文从用户态切换至内核态。DMA 引擎从文件中读取数据,并存储到内核态缓冲区,这里是第一次数据拷贝
  2. 请求的数据从内核态缓冲区拷贝到用户态缓冲区,然后返回给用户进程。第二次数据拷贝的过程同时,会导致上下文从内核态再次切换到用户态。
  3. 用户进程调用 send() 方法期望将数据发送到网络中,此时会触发第三次线程切换,用户态会再次切换到内核态,请求的数据从用户态缓冲区被拷贝到 Socket 缓冲区。
  4. 最终 send() 系统调用结束返回给用户进程,发生了第四次上下文切换。第四次拷贝会异步执行,从 Socket 缓冲区拷贝到协议引擎中。

说明:DMA(Direct Memory Access,直接内存存取)是现代大部分硬盘都支持的特性,DMA 接管了数据读写的工作,不需要 CPU 再参与 I/O 中断的处理,从而减轻了 CPU 的负担。

传统的数据拷贝过程为什么不是将数据直接传输到用户缓冲区呢?其实引入内核缓冲区可以充当缓存的作用,这样就可以实现文件数据的预读,提升 I/O 的性能。但是当请求数据量大于内核缓冲区大小时,在完成一次数据的读取到发送可能要经历数倍次数的数据拷贝,这就造成严重的性能损耗。

接下来我们介绍下使用零拷贝技术之后数据传输的流程。重新回顾一遍传统数据拷贝的过程,可以发现第二次和第三次拷贝是可以去除的,DMA 引擎从文件读取数据后放入到内核缓冲区,然后可以直接从内核缓冲区传输到 Socket 缓冲区,从而减少内存拷贝的次数。

在 Linux 中系统调用 sendfile() 可以实现将数据从一个文件描述符传输到另一个文件描述符,从而实现了零拷贝技术。在 Java 中也使用了零拷贝技术,它就是 NIO FileChannel 类中的 transferTo() 方法,transferTo() 底层就依赖了操作系统零拷贝的机制,它可以将数据从 FileChannel 直接传输到另外一个 Channel。transferTo() 方法的定义如下:

public abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException;

FileChannel#transferTo() 的使用也非常简单,我们直接看如下的代码示例,通过 transferTo() 将 from.data 传输到 to.data(),等于实现了文件拷贝的功能。

public void testTransferTo() throws IOException {RandomAccessFile fromFile = new RandomAccessFile("from.data", "rw");FileChannel fromChannel = fromFile.getChannel();RandomAccessFile toFile = new RandomAccessFile("to.data", "rw");FileChannel toChannel = toFile.getChannel();long position = 0;long count = fromChannel.size();fromChannel.transferTo(position, count, toChannel);
}

在使用了 FileChannel#transferTo() 传输数据之后,我们看下数据拷贝流程发生了哪些变化,如下图所示:

Drawing 1.png

比较大的一个变化是,DMA 引擎从文件中读取数据拷贝到内核态缓冲区之后,由操作系统直接拷贝到 Socket 缓冲区,不再拷贝到用户态缓冲区,所以数据拷贝的次数从之前的 4 次减少到 3 次。

但是上述的优化离达到零拷贝的要求还是有差距的,能否继续减少内核中的数据拷贝次数呢?在 Linux 2.4 版本之后,开发者对 Socket Buffer 追加一些 Descriptor 信息来进一步减少内核数据的复制。如下图所示,DMA 引擎读取文件内容并拷贝到内核缓冲区,然后并没有再拷贝到 Socket 缓冲区,只是将数据的长度以及位置信息被追加到 Socket 缓冲区,然后 DMA 引擎根据这些描述信息,直接从内核缓冲区读取数据并传输到协议引擎中,从而消除最后一次 CPU 拷贝。

Drawing 2.png

通过上述 Linux 零拷贝技术的介绍,你也许还会存在疑问,最终使用零拷贝之后,不是还存在着数据拷贝操作吗?其实从 Linux 操作系统的角度来说,零拷贝就是为了避免用户态和内核态之间的数据拷贝。无论是传统的数据拷贝还是使用零拷贝技术,其中有 2 次 DMA 的数据拷贝必不可少,只是这 2 次 DMA 拷贝都是依赖硬件来完成,不需要 CPU 参与。所以,在这里我们讨论的零拷贝是个广义的概念,只要能够减少不必要的 CPU 拷贝,都可以被称为零拷贝。

Netty 的零拷贝技术

介绍完传统 Linux 的零拷贝技术之后,我们再来学习下 Netty 中的零拷贝如何实现。Netty 中的零拷贝和传统 Linux 的零拷贝不太一样。Netty 中的零拷贝技术除了操作系统级别的功能封装,更多的是面向用户态的数据操作优化,主要体现在以下 5 个方面:

  • 堆外内存,避免 JVM 堆内存到堆外内存的数据拷贝。
  • CompositeByteBuf 类,可以组合多个 Buffer 对象合并成一个逻辑上的对象,避免通过传统内存拷贝的方式将几个 Buffer 合并成一个大的 Buffer。
  • 通过 Unpooled.wrappedBuffer 可以将 byte 数组包装成 ByteBuf 对象,包装过程中不会产生内存拷贝。
  • ByteBuf.slice 操作与 Unpooled.wrappedBuffer 相反,slice 操作可以将一个 ByteBuf 对象切分成多个 ByteBuf 对象,切分过程中不会产生内存拷贝,底层共享一个 byte 数组的存储空间。
  • Netty 使用 FileRegion 实现文件传输,FileRegion 底层封装了 FileChannel#transferTo() 方法,可以将文件缓冲区的数据直接传输到目标 Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝,这属于操作系统级别的零拷贝。

下面我们从以上 5 个方面逐一进行介绍。

堆外内存

如果在 JVM 内部执行 I/O 操作时,必须将数据拷贝到堆外内存,才能执行系统调用。这是所有 VM 语言都会存在的问题。那么为什么操作系统不能直接使用 JVM 堆内存进行 I/O 的读写呢?主要有两点原因:第一,操作系统并不感知 JVM 的堆内存,而且 JVM 的内存布局与操作系统所分配的是不一样的,操作系统并不会按照 JVM 的行为来读写数据。第二,同一个对象的内存地址随着 JVM GC 的执行可能会随时发生变化,例如 JVM GC 的过程中会通过压缩来减少内存碎片,这就涉及对象移动的问题了。

Netty 在进行 I/O 操作时都是使用的堆外内存,可以避免数据从 JVM 堆内存到堆外内存的拷贝。

CompositeByteBuf

CompositeByteBuf 是 Netty 中实现零拷贝机制非常重要的一个数据结构,CompositeByteBuf 可以理解为一个虚拟的 Buffer 对象,它是由多个 ByteBuf 组合而成,但是在 CompositeByteBuf 内部保存着每个 ByteBuf 的引用关系,从逻辑上构成一个整体。比较常见的像 HTTP 协议数据可以分为头部信息 header消息体数据 body,分别存在两个不同的 ByteBuf 中,通常我们需要将两个 ByteBuf 合并成一个完整的协议数据进行发送,可以使用如下方式完成:

ByteBuf httpBuf = Unpooled.buffer(header.readableBytes() + body.readableBytes());
httpBuf.writeBytes(header);
httpBuf.writeBytes(body);

可以看出,如果想实现 header 和 body 这两个 ByteBuf 的合并,需要先初始化一个新的 httpBuf,然后再将 header 和 body 分别拷贝到新的 httpBuf。合并过程中涉及两次 CPU 拷贝,这非常浪费性能。如果使用 CompositeByteBuf 如何实现类似的需求呢?如下所示:

CompositeByteBuf httpBuf = Unpooled.compositeBuffer();
httpBuf.addComponents(true, header, body);

CompositeByteBuf 通过调用 addComponents() 方法来添加多个 ByteBuf,但是底层的 byte 数组是复用的,不会发生内存拷贝。但对于用户来说,它可以当作一个整体进行操作。那么 CompositeByteBuf 内部是如何存放这些 ByteBuf,并且如何进行合并的呢?我们先通过一张图看下 CompositeByteBuf 的内部结构:

Drawing 3.png

从图上可以看出,CompositeByteBuf 内部维护了一个 Components 数组。在每个 Component 中存放着不同的 ByteBuf,各个 ByteBuf 独立维护自己的读写索引,而 CompositeByteBuf 自身也会单独维护一个读写索引。由此可见,Component 是实现 CompositeByteBuf 的关键所在,下面看下 Component 结构定义:

private static final class Component {final ByteBuf srcBuf; // 原始的 ByteBuffinal ByteBuf buf; // srcBuf 去除包装之后的 ByteBufint srcAdjustment; // CompositeByteBuf 的起始索引相对于 srcBuf 读索引的偏移int adjustment; // CompositeByteBuf 的起始索引相对于 buf 的读索引的偏移int offset; // Component 相对于 CompositeByteBuf 的起始索引位置int endOffset; // Component 相对于 CompositeByteBuf 的结束索引位置// 省略其他代码
}

为了方便理解上述 Component 中的属性含义,我同样以 HTTP 协议中 header 和 body 为示例,通过一张图来描述 CompositeByteBuf 组合后其中 Component 的布局情况,如下所示:

Drawing 4.png

从图中可以看出,header 和 body 分别对应两个 ByteBuf,假设 ByteBuf 的内容分别为 “header” 和 “body”,那么 header ByteBuf 中 offset~endOffset 为 0~6,body ByteBuf 对应的 offset~endOffset 为 0~10。由此可见,Component 中的 offset 和 endOffset 可以表示当前 ByteBuf 可以读取的范围,通过 offset 和 endOffset 可以将每一个 Component 所对应的 ByteBuf 连接起来,形成一个逻辑整体。

此外 Component 中 srcAdjustment 和 adjustment 表示 CompositeByteBuf 起始索引相对于 ByteBuf 读索引的偏移。初始 adjustment = readIndex - offset,这样通过 CompositeByteBuf 的起始索引就可以直接定位到 Component 中 ByteBuf 的读索引位置。当 header ByteBuf 读取 1 个字节,body ByteBuf 读取 2 个字节,此时每个 Component 的属性又会发生什么变化呢?如下图所示。

Drawing 5.png

至此,CompositeByteBuf 的基本原理我们已经介绍完了,关于具体 CompositeByteBuf 数据操作的细节在这里就不做展开了,有兴趣的同学可以自己深入研究 CompositeByteBuf 的源码。

Unpooled.wrappedBuffer 操作

介绍完 CompositeByteBuf 之后,再来理解 Unpooled.wrappedBuffer 操作就非常容易了,Unpooled.wrappedBuffer 同时也是创建 CompositeByteBuf 对象的另一种推荐做法。

Unpooled 提供了一系列用于包装数据源的 wrappedBuffer 方法,如下所示:

Drawing 6.png

Unpooled.wrappedBuffer 方法可以将不同的数据源的一个或者多个数据包装成一个大的 ByteBuf 对象,其中数据源的类型包括 byte[]、ByteBuf、ByteBuffer。包装的过程中不会发生数据拷贝操作,包装后生成的 ByteBuf 对象和原始 ByteBuf 对象是共享底层的 byte 数组。

ByteBuf.slice 操作

ByteBuf.slice 和 Unpooled.wrappedBuffer 的逻辑正好相反,ByteBuf.slice 是将一个 ByteBuf 对象切分成多个共享同一个底层存储的 ByteBuf 对象。

ByteBuf 提供了两个 slice 切分方法:

public ByteBuf slice();
public ByteBuf slice(int index, int length);

假设我们已经有一份完整的 HTTP 数据,可以通过 slice 方法切分获得 header 和 body 两个 ByteBuf 对象,对应的内容分别为 “header” 和 “body”,实现方式如下:

ByteBuf httpBuf = ...
ByteBuf header = httpBuf.slice(0, 6);
ByteBuf body = httpBuf.slice(6, 4);

通过 slice 切分后都会返回一个新的 ByteBuf 对象,而且新的对象有自己独立的 readerIndex、writerIndex 索引,如下图所示。由于新的 ByteBuf 对象与原始的 ByteBuf 对象数据是共享的,所以通过新的 ByteBuf 对象进行数据操作也会对原始 ByteBuf 对象生效。

图片8.png

文件传输 FileRegion

在 Netty 源码的 example 包中,提供了 FileRegion 的使用示例,以下代码片段摘自 FileServerHandler.java。

@Override
public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {RandomAccessFile raf = null;long length = -1;try {raf = new RandomAccessFile(msg, "r");length = raf.length();} catch (Exception e) {ctx.writeAndFlush("ERR: " + e.getClass().getSimpleName() + ": " + e.getMessage() + '\n');return;} finally {if (length < 0 && raf != null) {raf.close();}}ctx.write("OK: " + raf.length() + '\n');if (ctx.pipeline().get(SslHandler.class) == null) {// SSL not enabled - can use zero-copy file transfer.ctx.write(new DefaultFileRegion(raf.getChannel(), 0, length));} else {// SSL enabled - cannot use zero-copy file transfer.ctx.write(new ChunkedFile(raf));}ctx.writeAndFlush("\n");
}

从 FileRegion 的使用示例可以看出,Netty 使用 FileRegion 实现文件传输的零拷贝。FileRegion 的默认实现类是 DefaultFileRegion,通过 DefaultFileRegion 将文件内容写入到 NioSocketChannel。那么 FileRegion 是如何实现零拷贝的呢?我们通过源码看看 FileRegion 到底使用了什么黑科技。

public class DefaultFileRegion extends AbstractReferenceCounted implements FileRegion {private final File f; // 传输的文件private final long position; // 文件的起始位置private final long count; // 传输的字节数private long transferred; // 已经写入的字节数private FileChannel file; // 文件对应的 FileChannel    @Overridepublic long transferTo(WritableByteChannel target, long position) throws IOException {long count = this.count - position;if (count < 0 || position < 0) {throw new IllegalArgumentException("position out of range: " + position +" (expected: 0 - " + (this.count - 1) + ')');}if (count == 0) {return 0L;}if (refCnt() == 0) {throw new IllegalReferenceCountException(0);}open();long written = file.transferTo(this.position + position, count, target);if (written > 0) {transferred += written;} else if (written == 0) {validate(this, position);}return written;}    // 省略其他代码
}

从源码可以看出,FileRegion 其实就是对 FileChannel 的包装,并没有什么特殊操作,底层使用的是 JDK NIO 中的 FileChannel#transferTo() 方法实现文件传输,所以 FileRegion 是操作系统级别的零拷贝,对于传输大文件会很有帮助。

到此为止,Netty 相关的零拷贝技术都已经介绍完了,可以看出 Netty 对于 ByteBuf 做了更多精进的设计和优化。

总结

零拷贝是网络编程中一种常用的技术,可以用于优化网络数据传输的性能。本文介绍了操作系统 Linux 和 Netty 中的零拷贝技术,Netty 除了支持操作系统级别的零拷贝,更多提供了面向用户态的零拷贝特性,主要体现在 5 个方面:堆外内存、CompositeByteBuf、Unpooled.wrappedBuffer、ByteBuf.slice 以及 FileRegion。以操作系统的角度来看,零拷贝是一个广义的概念,可以认为只要能够减少不必要的 CPU 拷贝,都可以理解为是零拷贝。

最后,留一个思考题,使用具备零拷贝特性的 transfer() 方法拷贝文件,一定会比传统 I/O 的方式更高效吗?


17 源码篇:从 Linux 出发深入剖析服务端启动流程

通过前几章课程的学习,我们已经对 Netty 的技术思想和基本原理有了初步的认识,从今天这节课开始我们将正式进入 Netty 核心源码学习的课程。希望能够通过源码解析的方式让你更加深入理解 Netty 的精髓,如 Netty 的设计思想、工程技巧等,为之后继续深入研究 Netty 打下坚实的基础。

在课程开始之前,我想分享一下关于源码学习的几点经验和建议。第一,很多同学在开始学习源码时面临的第一个问题就是不知道从何下手,这个时候一定不能对着源码毫无意义地四处翻看。建议你可以通过 Hello World 或者 TestCase 作为源码学习的入口,然后再通过 Debug 断点的方式调试并跑通源码。第二,阅读源码一定要有全局观。首先要把握源码的主流程,避免刚开始陷入代码细节的死胡同。第三,源码一定要反复阅读,让自己每一次读都有不同的收获。我们可以通过画图、注释的方式帮助自己更容易理解源码的核心流程,方便后续的复习和回顾。

作为源码解析的第一节课,我们将深入分析 Netty 服务端的启动流程。启动服务的过程中我们可以了解到 Netty 各大核心组件的关系,这将是学习 Netty 源码一个非常好的切入点,让我们一起看看 Netty 的每个零件是如何运转起来的吧。

说明:本文参考的 Netty 源码版本为 4.1.42.Final。

从 Echo 服务器示例入手

在《引导器作用:客户端和服务端启动都要做些什么?》的课程中,我们介绍了如何使用引导器搭建服务端的基本框架。在这里我们实现了一个最简单的 Echo 服务器,用于调试 Netty 服务端启动的源码。

public class EchoServer {public void startEchoServer(int port) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)) // 设置ServerSocketChannel 对应的 Handler.childHandler(new ChannelInitializer<SocketChannel>() { // 设置 SocketChannel 对应的 Handler@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new FixedLengthFrameDecoder(10));ch.pipeline().addLast(new ResponseSampleEncoder());ch.pipeline().addLast(new RequestSampleHandler());}});ChannelFuture f = b.bind(port).sync();f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

我们以引导器 ServerBootstrap 为切入点,开始深入分析 Netty 服务端的启动流程。在服务端启动之前,需要配置 ServerBootstrap 的相关参数,这一步大致可以分为以下几个步骤:

  • 配置 EventLoopGroup 线程组;
  • 配置 Channel 的类型;
  • 设置 ServerSocketChannel 对应的 Handler;
  • 设置网络监听的端口;
  • 设置 SocketChannel 对应的 Handler;
  • 配置 Channel 参数。

配置 ServerBootstrap 参数的过程非常简单,把参数值保存在 ServerBootstrap 定义的成员变量里就可以了。我们可以看下 ServerBootstrap 的成员变量定义,基本与 ServerBootstrap 暴露出来的配置方法是一一对应的。如下所示,我以注释的形式说明每个成员变量对应的调用方法。

volatile EventLoopGroup group; // group()
volatile EventLoopGroup childGroup; // group()
volatile ChannelFactory<? extends C> channelFactory; // channel()
volatile SocketAddress localAddress; // localAddress
Map<ChannelOption<?>, Object> childOptions = new ConcurrentHashMap<ChannelOption<?>, Object>(); // childOption()
volatile ChannelHandler childHandler; // childHandler()
ServerBootstrapConfig config = new ServerBootstrapConfig(this);

关于 ServerBootstrap 如何为每个成员变量保存参数的过程,我们就不一一展开了,你可以理解为这部分工作只是一个前置准备,课后你可以自己跟进下每个方法的源码。今天我们核心聚焦在 b.bind().sync() 这行代码,bind() 才是真正进行服务器端口绑定和启动的入口,sync() 表示阻塞等待服务器启动完成。接下来我们对 bind() 方法进行展开分析。

在开始源码分析之前,我们带着以下几个问题边看边思考:

  • Netty 自己实现的 Channel 与 JDK 底层的 Channel 是如何产生联系的?
  • ChannelInitializer 这个特殊的 Handler 处理器的作用是什么?
  • Pipeline 初始化的过程是什么样的?

服务端启动全过程

首先我们来看下 ServerBootstrap 中 bind() 方法的源码实现:

public ChannelFuture bind() {validate();SocketAddress localAddress = this.localAddress;if (localAddress == null) {throw new IllegalStateException("localAddress not set");}return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {promise.setFailure(cause);} else {promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
}

由此可见,doBind() 方法是我们需要分析的重点。我们再一起看下 doBind() 具体做了哪些事情:

  1. 调用 initAndRegister() 初始化并注册 Channel,同时返回一个 ChannelFuture 实例 regFuture,所以我们可以猜测出 initAndRegister() 是一个异步的过程。
  2. 接下来通过 regFuture.cause() 方法判断 initAndRegister() 的过程是否发生异常,如果发生异常则直接返回。
  3. regFuture.isDone() 表示 initAndRegister() 是否执行完毕,如果执行完毕则调用 doBind0() 进行 Socket 绑定。如果 initAndRegister() 还没有执行结束,regFuture 会添加一个 ChannelFutureListener 回调监听,当 initAndRegister() 执行结束后会调用 operationComplete(),同样通过 doBind0() 进行端口绑定。

doBind() 整个实现结构非常清晰,其中 initAndRegister() 负责 Channel 初始化和注册,doBind0() 用于端口绑定。这两个过程最为重要,下面我们分别进行详细的介绍。

服务端 Channel 初始化及注册

initAndRegister() 方法顾名思义,主要负责初始化和注册的相关工作,我们具体看下它的源码实现:

final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel(); // 创建 Channelinit(channel); // 初始化 Channel} catch (Throwable t) {if (channel != null) {channel.unsafe().closeForcibly();return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}ChannelFuture regFuture = config().group().register(channel); // 注册 Channelif (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}return regFuture;
}

initAndRegister() 可以分为三步:创建 Channel、初始化 Channel 和注册 Channel,接下来我们一步步进行拆解分析。

创建服务端 Channel

首先看下创建 Channel 的过程,直接跟进 channelFactory.newChannel() 的源码。

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {private final Constructor<? extends T> constructor;public ReflectiveChannelFactory(Class<? extends T> clazz) {ObjectUtil.checkNotNull(clazz, "clazz");try {this.constructor = clazz.getConstructor();} catch (NoSuchMethodException e) {throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +" does not have a public non-arg constructor", e);}}@Overridepublic T newChannel() {try {return constructor.newInstance(); // 反射创建对象} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);}}// 省略其他代码
}

在前面 Echo 服务器的示例中,我们通过 channel(NioServerSocketChannel.class) 配置 Channel 的类型,工厂类 ReflectiveChannelFactory 是在该过程中被创建的。从 constructor.newInstance() 我们可以看出,ReflectiveChannelFactory 通过反射创建出 NioServerSocketChannel 对象,所以我们重点需要关注 NioServerSocketChannel 的构造函数。

public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER)); 
}
public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT); // 调用父类方法config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {try {return provider.openServerSocketChannel(); // 创建 JDK 底层的 ServerSocketChannel} catch (IOException e) {throw new ChannelException("Failed to open a server socket.", e);}
}

SelectorProvider 是 JDK NIO 中的抽象类实现,通过 openServerSocketChannel() 方法可以用于创建服务端的 ServerSocketChannel。而且 SelectorProvider 会根据操作系统类型和版本的不同,返回不同的实现类,具体可以参考 DefaultSelectorProvider 的源码实现:

public static SelectorProvider create() {String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));if (osname.equals("SunOS"))return createProvider("sun.nio.ch.DevPollSelectorProvider");if (osname.equals("Linux"))return createProvider("sun.nio.ch.EPollSelectorProvider");return new sun.nio.ch.PollSelectorProvider();
}

在这里我们只讨论 Linux 操作系统的场景,在 Linux 内核 2.6版本及以上都会默认采用 EPollSelectorProvider。如果是旧版本则使用 PollSelectorProvider。对于目前的主流 Linux 平台而言,都是采用 Epoll 机制实现的。

创建完 ServerSocketChannel,我们回到 NioServerSocketChannel 的构造函数,接着它会通过 super() 依次调用到父类的构造进行初始化工作,最终我们可以定位到 AbstractNioChannel 和 AbstractChannel 的构造函数:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);// 省略其他代码try {ch.configureBlocking(false);} catch (IOException e) {// 省略其他代码}
}
protected AbstractChannel(Channel parent) {this.parent = parent;id = newId(); // Channel 全局唯一 id unsafe = newUnsafe(); // unsafe 操作底层读写pipeline = newChannelPipeline(); // pipeline 负责业务处理器编排
}

首先调用 AbstractChannel 的构造函数创建了三个重要的成员变量,分别为 id、unsafe、pipeline。id 表示全局唯一的 Channel,unsafe 用于操作底层数据的读写操作,pipeline 负责业务处理器的编排。初始化状态,pipeline 的内部结构只包含头尾两个节点,如下图所示。三个核心成员变量创建好之后,会回到 AbstractNioChannel 的构造函数,通过 ch.configureBlocking(false) 设置 Channel 是非阻塞模式。

netty17图.png

创建服务端 Channel 的过程我们已经讲完了,简单总结下其中几个重要的步骤:

  1. ReflectiveChannelFactory 通过反射创建 NioServerSocketChannel 实例;
  2. 创建 JDK 底层的 ServerSocketChannel;
  3. 为 Channel 创建 id、unsafe、pipeline 三个重要的成员变量;
  4. 设置 Channel 为非阻塞模式。
初始化服务端 Channel

回到 ServerBootstrap 的 initAndRegister() 方法,继续跟进用于初始化服务端 Channel 的 init() 方法源码:

void init(Channel channel) {setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger); // 设置 Socket 参数setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0))); // 保存用户自定义属性ChannelPipeline p = channel.pipeline();// 获取 ServerBootstrapAcceptor 的构造参数final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions =childOptions.entrySet().toArray(newOptionArray(0));final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));// 添加特殊的 Handler 处理器p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}

init() 方法的源码比较长,我们依然拆解成两个部分来看:

第一步,设置 Socket 参数以及用户自定义属性。在创建服务端 Channel 时,Channel 的配置参数保存在 NioServerSocketChannelConfig 中,在初始化 Channel 的过程中,Netty 会将这些参数设置到 JDK 底层的 Socket 上,并把用户自定义的属性绑定在 Channel 上。

第二步,添加特殊的 Handler 处理器。首先 ServerBootstrap 为 Pipeline 添加了一个 ChannelInitializer,ChannelInitializer 是实现了 ChannelHandler 接口的匿名类,其中 ChannelInitializer 实现的 initChannel() 方法用于添加 ServerSocketChannel 对应的 Handler。然后 Netty 通过异步 task 的方式又向 Pipeline 一个处理器 ServerBootstrapAcceptor,从 ServerBootstrapAcceptor 的命名可以看出,这是一个连接接入器,专门用于接收新的连接,然后把事件分发给 EventLoop 执行,在这里我们先不做展开。此时服务端的 pipeline 内部结构又发生了变化,如下图所示。

图片1.png

思考一个问题,为什么需要 ChannelInitializer 处理器呢?ServerBootstrapAcceptor 的注册过程为什么又需要封装成异步 task 呢?因为我们在初始化时,还没有将 Channel 注册到 Selector 对象上,所以还无法注册 Accept 事件到 Selector 上,所以事先添加了 ChannelInitializer 处理器,等待 Channel 注册完成后,再向 Pipeline 中添加 ServerBootstrapAcceptor 处理器。

服务端 Channel 初始化的过程已经结束了。整体流程比较简单,主要是设置 Socket 参数以及用户自定义属性,并向 Pipeline 中添加了两个特殊的处理器。接下来我们继续分析,如何将初始化好的 Channel 注册到 Selector 对象上?

注册服务端 Channel

回到 initAndRegister() 的主流程,创建完服务端 Channel 之后,继续一层层跟进 register() 方法的源码:

// MultithreadEventLoopGroup#register
public ChannelFuture register(Channel channel) {return next().register(channel); // 选择一个 eventLoop 注册
}
// AbstractChannel#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 省略其他代码AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) { // Reactor 线程内部调用register0(promise);} else { // 外部线程调用try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {// 省略其他代码}}
}

Netty 会在线程池 EventLoopGroup 中选择一个 EventLoop 与当前 Channel 进行绑定,之后 Channel 生命周期内的所有 I/O 事件都由这个 EventLoop 负责处理,如 accept、connect、read、write 等 I/O 事件。可以看出,不管是 EventLoop 线程本身调用,还是外部线程用,最终都会通过 register0() 方法进行注册:

private void register0(ChannelPromise promise) {try {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister(); // 调用 JDK 底层的 register() 进行注册neverRegistered = false;registered = true;pipeline.invokeHandlerAddedIfNeeded(); // 触发 handlerAdded 事件safeSetSuccess(promise);pipeline.fireChannelRegistered(); // 触发 channelRegistered 事件// 此时 Channel 还未注册绑定地址,所以处于非活跃状态if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive(); // Channel 当前状态为活跃时,触发 channelActive 事件} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {// 省略其他代码}
}

register0() 主要做了四件事:调用 JDK 底层进行 Channel 注册、触发 handlerAdded 事件、触发 channelRegistered 事件、Channel 当前状态为活跃时,触发 channelActive 事件。我们对它们逐一进行分析。

首先看下 JDK 底层注册 Channel 的过程,对应 doRegister() 方法的实现逻辑。

protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); // 调用 JDK 底层的 register() 进行注册return;} catch (CancelledKeyException e) {// 省略其他代码}}
}
public final SelectionKey register(Selector sel, int ops,Object att)throws ClosedChannelException
{synchronized (regLock) {// 省略其他代码SelectionKey k = findKey(sel);if (k != null) {k.interestOps(ops);k.attach(att);}if (k == null) {synchronized (keyLock) {if (!isOpen())throw new ClosedChannelException();k = ((AbstractSelector)sel).register(this, ops, att);addKey(k);}}return k;}
}

javaChannel().register() 负责调用 JDK 底层,将 Channel 注册到 Selector 上,register() 的第三个入参传入的是 Netty 自己实现的 Channel 对象,调用 register() 方法会将它绑定在 JDK 底层 Channel 的 attachment 上。这样在每次 Selector 对象进行事件循环时,Netty 都可以从返回的 JDK 底层 Channel 中获得自己的 Channel 对象。

完成 Channel 向 Selector 注册后,接下来就会触发 Pipeline 一系列的事件传播。在事件传播之前,用户自定义的业务处理器是如何被添加到 Pipeline 中的呢?答案就在pipeline.invokeHandlerAddedIfNeeded() 当中,我们重点看下 handlerAdded 事件的处理过程。invokeHandlerAddedIfNeeded() 方法的调用层次比较深,推荐你结合上述 Echo 服务端示例,使用 IDE Debug 的方式跟踪调用栈,如下图所示。

图片2.png

我们首先抓住 ChannelInitializer 中的核心源码,逐层进行分析。

// ChannelInitializer
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {if (initChannel(ctx)) {removeState(ctx);}}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.add(ctx)) {try {initChannel((C) ctx.channel()); // 调用 ChannelInitializer 实现的 initChannel() 方法} catch (Throwable cause) {exceptionCaught(ctx, cause);} finally {ChannelPipeline pipeline = ctx.pipeline();if (pipeline.context(this) != null) {pipeline.remove(this); // 将 ChannelInitializer 自身从 Pipeline 中移出}}return true;}return false;
}

可以看出 ChannelInitializer 首先会调用 initChannel() 抽象方法,然后 Netty 会把 ChannelInitializer 自身从 Pipeline 移出。其中 initChannel() 抽象方法是在哪里实现的呢?这就要跟踪到 ServerBootstrap 之前的 init() 方法,其中有这么一段代码:

p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}
});

在前面我们已经分析了 initChannel() 方法的实现逻辑,首先向 Pipeline 中添加 ServerSocketChannel 对应的 Handler,然后通过异步 task 的方式向 Pipeline 添加 ServerBootstrapAcceptor 处理器。其中有一个点不要混淆,handler() 方法是添加到服务端的Pipeline 上,而 childHandler() 方法是添加到客户端的 Pipeline 上。所以对应 Echo 服务器示例中,此时被添加的是 LoggingHandler 处理器。

因为添加 ServerBootstrapAcceptor 是一个异步过程,需要 EventLoop 线程负责执行。而当前 EventLoop 线程正在执行 register0() 的注册流程,所以等到 register0() 执行完之后才能被添加到 Pipeline 当中。完成 initChannel() 这一步之后,ServerBootstrapAcceptor 并没有被添加到 Pipeline 中,此时 Pipeline 的内部结构变化如下图所示。
图片3.png

我们回到 register0() 的主流程,接着向下分析。channelRegistered 事件是由 fireChannelRegistered() 方法触发,沿着 Pipeline 的 Head 节点传播到 Tail 节点,并依次调用每个 ChannelHandler 的 channelRegistered() 方法。然而此时 Channel 还未注册绑定地址,所以处于非活跃状态,所以并不会触发 channelActive 事件。

执行完整个 register0() 的注册流程之后,EventLoop 线程会将 ServerBootstrapAcceptor 添加到 Pipeline 当中,此时 Pipeline 的内部结构又发生了变化,如下图所示。
图片4.png

整个服务端 Channel 注册的流程我们已经讲完,注册过程中 Pipeline 结构的变化值得你再反复梳理,从而加深理解。目前服务端还是不能工作的,还差最后一步就是进行端口绑定,我们继续向下分析。

端口绑定

回到 ServerBootstrap 的 bind() 方法,我们继续跟进端口绑定 doBind0() 的源码。

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {assertEventLoop();// 省略其他代码boolean wasActive = isActive();try {doBind(localAddress); // 调用 JDK 底层进行端口绑定} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive(); // 触发 channelActive 事件}});}safeSetSuccess(promise);
}

bind() 方法主要做了两件事,分别为调用 JDK 底层进行端口绑定;绑定成功后并触发 channelActive 事件。下面我们逐一进行分析。

首先看下调用 JDK 底层进行端口绑定的 doBind() 方法:

protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}
}

Netty 会根据 JDK 版本的不同,分别调用 JDK 底层不同的 bind() 方法。我使用的是 JDK8,所以会调用 JDK 原生 Channel 的 bind() 方法。执行完 doBind() 之后,服务端 JDK 原生的 Channel 真正已经完成端口绑定了。

完成端口绑定之后,Channel 处于活跃 Active 状态,然后会调用 pipeline.fireChannelActive() 方法触发 channelActive 事件。我们可以一层层跟进 fireChannelActive() 方法,发现其中比较重要的部分:

// DefaultChannelPipeline#channelActive
public void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();readIfIsAutoRead();
}
// AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp); // 注册 OP_ACCEPT 事件到服务端 Channel 的事件集合}
}

可以看出,在执行完 channelActive 事件传播之后,会调用 readIfIsAutoRead() 方法触发 Channel 的 read 事件,而它最终调用到 AbstractNioChannel 中的 doBeginRead() 方法,其中 readInterestOp 参数就是在前面初始化 Channel 所传入的 SelectionKey.OP_ACCEPT 事件,所以 OP_ACCEPT 事件会被注册到 Channel 的事件集合中。

到此为止,整个服务端已经真正启动完毕。我们总结一下服务端启动的全流程,如下图所示。
图片5.png

  • 创建服务端 Channel:本质是创建 JDK 底层原生的 Channel,并初始化几个重要的属性,包括 id、unsafe、pipeline 等。
  • 初始化服务端 Channel:设置 Socket 参数以及用户自定义属性,并添加两个特殊的处理器 ChannelInitializer 和 ServerBootstrapAcceptor。
  • 注册服务端 Channel:调用 JDK 底层将 Channel 注册到 Selector 上。
  • 端口绑定:调用 JDK 底层进行端口绑定,并触发 channelActive 事件,把 OP_ACCEPT 事件注册到 Channel 的事件集合中。

加餐:服务端如何处理客户端新建连接

Netty 服务端完全启动后,就可以对外工作了。接下来 Netty 服务端是如何处理客户端新建连接的呢?主要分为四步:

  1. Boss NioEventLoop 线程轮询客户端新连接 OP_ACCEPT 事件;
  2. 构造 Netty 客户端 NioSocketChannel;
  3. 注册 Netty 客户端 NioSocketChannel 到 Worker 工作线程中;
  4. 注册 OP_READ 事件到 NioSocketChannel 的事件集合。

下面我们对每个步骤逐一进行简单的介绍。

Netty 中 Boss NioEventLoop 专门负责接收新的连接,关于 NioEventLoop 的核心源码我们下节课会着重介绍,在这里我们只先了解基本的处理流程。当客户端有新连接接入服务端时,Boss NioEventLoop 会监听到 OP_ACCEPT 事件,源码如下所示:

// NioEventLoop#processSelectedKey
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();
}

NioServerSocketChannel 所持有的 unsafe 是 NioMessageUnsafe 类型,我们看下 NioMessageUnsafe.read() 方法中做了什么事。

public void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {            do {int localRead = doReadMessages(readBuf);  // while 循环不断读取 Buffer 中的数据if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i)); // 传播读取事件}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete(); // 传播读取完毕事件// 省略其他代码} finally {if (!readPending && !config.isAutoRead()) {removeReadOp();}}
}

可以看出 read() 方法的核心逻辑就是通过 while 循环不断读取数据,然后放入 List 中,这里的数据其实就是新连接。需要重点跟进一下 NioServerSocketChannel 的 doReadMessages() 方法。

protected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;
}

这时就开始执行第二个步骤:构造 Netty 客户端 NioSocketChannel。Netty 先通过 JDK 底层的 accept() 获取 JDK 原生的 SocketChannel,然后将它封装成 Netty 自己的 NioSocketChannel。新建 Netty 的客户端 Channel 的实现原理与上文中我们讲到的创建服务端 Channel 的过程是类似的,只是服务端 Channel 的类型是 NioServerSocketChannel,而客户端 Channel 的类型是 NioSocketChannel。NioSocketChannel 的创建同样会完成几件事:创建核心成员变量 id、unsafe、pipeline;注册 SelectionKey.OP_READ 事件;设置 Channel 的为非阻塞模式;新建客户端 Channel 的配置。

成功构造客户端 NioSocketChannel 后,接下来会通过 pipeline.fireChannelRead() 触发 channelRead 事件传播。对于服务端来说,此时 Pipeline 的内部结构如下图所示。
图片6.png

上文中我们提到了一种特殊的处理器 ServerBootstrapAcceptor,在这里它就发挥了重要的作用。channelRead 事件会传播到 ServerBootstrapAcceptor.channelRead() 方法,channelRead() 会将客户端 Channel 分配到工作线程组中去执行。具体实现如下:

public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;// 在客户端 Channel 中添加 childHandler,childHandler 是用户在启动类中通过 childHandler() 方法指定的child.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);try {// 注册客户端 ChannelchildGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}
}

ServerBootstrapAcceptor 开始就把 msg 强制转换为 Channel。难道不会有其他类型的数据吗?因为 ServerBootstrapAcceptor 是服务端 Channel 中一个特殊的处理器,而服务端 Channel 的 channelRead 事件只会在新连接接入时触发,所以这里拿到的数据都是客户端新连接。

ServerBootstrapAcceptor 通过 childGroup.register() 方法会完成第三和第四两个步骤,将 NioSocketChannel 注册到 Worker 工作线程中,并注册 OP_READ 事件到 NioSocketChannel 的事件集合。在注册过程中比较有意思的一点是,它会调用 pipeline.fireChannelRegistered() 方法传播 channelRegistered 事件,然后再调用 pipeline.fireChannelActive() 方法传播 channelActive 事件。兜了一圈,这又会回到之前我们介绍的 readIfIsAutoRead() 方法,此时它会将 SelectionKey.OP_READ 事件注册到 Channel 的事件集合。

关于服务端如何处理客户端新建连接的具体源码,我在此就不继续展开了。这里留一个小任务,建议你亲自动手分析下 childGroup.register() 的相关源码,从而加深对服务端启动以及新连接处理流程的理解。有了服务端启动源码分析的基础,再去理解客户端新建连接的过程会相对容易很多。

总结

本节课我们深入分析了 Netty 服务端启动的全流程,对其中涉及的核心组件有了基本的认识。Netty 服务端启动的相关源码层次比较深,推荐大家在读源码的时候,可以先把主体流程梳理清楚,开始时先不用纠结具体的方法是用来做什么,自顶而下先画出完整的调用链路图(如下图所示),然后再逐一击破。
图片7.png

下节课,我们将学习 Netty 最核心的 Reactor 线程模型的源码,推荐你把两节课放在一起再进行复习,可以解答你目前不少的疑问,如异步 task 是如何封装并执行的?事件注册之后是如何被处理的?


18 源码篇:解密 Netty Reactor 线程模型

通过第一章 Netty 基础课程的学习,我们知道 Reactor 线程模型是 Netty 实现高性能的核心所在,在 Netty 中 EventLoop 是 Reactor 线程模型的核心处理引擎,那么 EventLoop 到底是如何实现的呢?又是如何保证高性能和线程安全性的呢?今天这节课让我们一起一探究竟。

说明:本文参考的 Netty 源码版本为 4.1.42.Final。

Reactor 线程执行的主流程

在《事件调度层:为什么 EventLoop 是 Netty 的精髓》的课程中,我们介绍了 EventLoop 的概貌,因为 Netty 是基于 NIO 实现的,所以推荐使用 NioEventLoop 实现,我们再次通过 NioEventLoop 的核心入口 run() 方法回顾 Netty Reactor 线程模型执行的主流程,并以此为基础继续深入研究 NioEventLoop 的逻辑细节。

protected void run() {for (;;) {try {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:case SelectStrategy.SELECT:select(wakenUp.getAndSet(false)); // 轮询 I/O 事件if (wakenUp.get()) {selector.wakeup();}default:}} catch (IOException e) {rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys(); // 处理 I/O 事件} finally {runAllTasks(); // 处理所有任务}} else {final long ioStartTime = System.nanoTime();try {processSelectedKeys(); // 处理 I/O 事件} finally {final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 处理完 I/O 事件,再处理异步任务队列}}} catch (Throwable t) {handleLoopException(t);}try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}
}

NioEventLoop 的 run() 方法是一个无限循环,没有任何退出条件,在不间断循环执行以下三件事情,可以用下面这张图形象地表示。

Lark20201216-164824.png

  • 轮询 I/O 事件(select):轮询 Selector 选择器中已经注册的所有 Channel 的 I/O 事件。
  • 处理 I/O 事件(processSelectedKeys):处理已经准备就绪的 I/O 事件。
  • 处理异步任务队列(runAllTasks):Reactor 线程还有一个非常重要的职责,就是处理任务队列中的非 I/O 任务。Netty 提供了 ioRatio 参数用于调整 I/O 事件处理和任务处理的时间比例。

下面我们对 NioEventLoop 的三个步骤进行详细的介绍。

轮询 I/O 事件

我们首先聚焦在轮询 I/O 事件的关键代码片段:

case SelectStrategy.CONTINUE:continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}

NioEventLoop 通过核心方法 select() 不断轮询注册的 I/O 事件。当没有 I/O 事件产生时,为了避免 NioEventLoop 线程一直循环空转,在获取 I/O 事件或者异步任务时需要阻塞线程,等待 I/O 事件就绪或者异步任务产生后才唤醒线程。NioEventLoop 使用 wakeUp 变量表示是否唤醒 selector,Netty 在每一次执行新的一轮循环之前,都会将 wakeUp 设置为 false。

Netty 提供了选择策略 SelectStrategy 对象,它用于控制 select 循环行为,包含 CONTINUE、SELECT、BUSY_WAIT 三种策略,因为 NIO 并不支持 BUSY_WAIT,所以 BUSY_WAIT 与 SELECT 的执行逻辑是一样的。在 I/O 事件循环的过程中 Netty 选择使用何种策略,具体的判断依据如下:

// DefaultSelectStrategy#calculateStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
// NioEventLoop#selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {@Overridepublic int get() throws Exception {return selectNow();}
}
// NioEventLoop#selectNow
int selectNow() throws IOException {try {return selector.selectNow();} finally {if (wakenUp.get()) {selector.wakeup();}}
}

如果当前 NioEventLoop 线程存在异步任务,会通过 selectSupplier.get() 最终调用到 selectNow() 方法,selectNow() 是非阻塞,执行后立即返回。如果存在就绪的 I/O 事件,那么会走到 default 分支后直接跳出,然后执行 I/O 事件处理 processSelectedKeys 和异步任务队列处理 runAllTasks 的逻辑。所以在存在异步任务的场景,NioEventLoop 会优先保证 CPU 能够及时处理异步任务。

当 NioEventLoop 线程的不存在异步任务,即任务队列为空,返回的是 SELECT 策略, 就会调用 select(boolean oldWakenUp) 方法,接下来我们看看 select() 内部是如何实现的:

private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // 计算 select 阻塞操作的最后截止时间long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();if (nextWakeupTime != normalizedDeadlineNanos) {nextWakeupTime = normalizedDeadlineNanos;}for (;;) {// ------ 1. 检测 select 阻塞操作是否超过截止时间 ------long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// ------ 2. 轮询过程中如果有任务产生,中断本次轮询if (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}// ------ 3. select 阻塞等待获取 I/O 事件 ------int selectedKeys = selector.select(timeoutMillis);selectCnt ++;if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {break;}if (Thread.interrupted()) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because " +"Thread.currentThread().interrupt() was called. Use " +"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");}selectCnt = 1;break;}// ------ 4. 解决臭名昭著的 JDK epoll 空轮询 Bug ------long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {selector = selectRebuildSelector(selectCnt);selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}}
}

Netty 为了解决臭名昭著的 JDK epoll 空轮询 Bug,造成整个 select() 方法是相对比较复杂的,我把它划分成四个部分逐一拆解来看。

第一步,检测 select 阻塞操作是否超过截止时间。 在进入无限循环之前,Netty 首先记录了当前时间 currentTimeNanos 以及定时任务队列中最近待执行任务的执行时间 selectDeadLineNanos,Netty 中定时任务队列是按照延迟时间从小到大进行排列的,通过调用 delayNanos(currentTimeNanos) 方法可以获得第一个待执行定时任务的延迟时间。然后代码会进入无限循环。首先判断 currentTimeNanos 是否超过 selectDeadLineNanos 0.5ms 以上,如果超过说明当前任务队列中有定时任务需要立刻执行,所以此时会退出无限循环。退出之前如果从未执行过 select 操作,那么会立即一次非阻塞的 selectNow 操作。那么这里有一个疑问,为什么会留出 0.5ms 的时间窗口呢?在任务队列为空的情况下,可能 select 操作没有获得到任何 I/O 事件就立即停止阻塞返回。

其中有一点容易混淆,Netty 的任务队列包括普通任务、定时任务以及尾部任务,hasTask() 判断的是普通任务队列和尾部队列是否为空,而 delayNanos(currentTimeNanos) 方法获取的是定时任务的延迟时间。

第二步,轮询过程中及时处理产生的任务。 Netty 为了保证任务能够及时执行,会立即一次非阻塞的 selectNow 操作后,立即跳出循环回到事件循环的主流程,确保接下来能够优先执行 runAllTasks。

第三步,select 阻塞等待获取 I/O 事件。 执行 select 阻塞操作,说明任务队列已经为空,而且第一个待执行定时任务还没有到达任务执行的截止时间,需要阻塞等待 timeoutMillis 的超时时间。假设一种极端情况,如果定时任务的截止时间非常久,那么 select 操作岂不是会一直阻塞造成 Netty 无法工作?所以 Netty 在外部线程添加任务的时候,可以唤醒 select 阻塞操作,具体源码如下:

// SingleThreadEventExecutor#execute
public void execute(Runnable task) {// 省略其他代码if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop); }
}
// NioEventLoop#wakeup
protected void wakeup(boolean inEventLoop) {// 如果是外部线程,设置 wakenUp 为true,则唤醒 select 阻塞操作if (!inEventLoop && wakenUp.compareAndSet(false, true)) {selector.wakeup(); }
}

selector.wakeup() 操作的开销是非常大的,所以 Netty 并不是每次都直接调用,在每次调用之前都会先执行 wakenUp.compareAndSet(false, true),只有设置成功之后才会执行 selector.wakeup() 操作。

第四步,解决臭名昭著的 JDK epoll 空轮询 Bug。 在之前的课程中已经初步介绍了 Netty 的解决方案,在这里结合整体 select 操作我们再做一次回顾。实际上 Netty 并没有从根源上解决该问题,而是巧妙地规避了这个问题。Netty 引入了计数变量 selectCnt,用于记录 select 操作的次数,如果事件轮询时间小于 timeoutMillis,并且在该时间周期内连续发生超过 SELECTOR_AUTO_REBUILD_THRESHOLD(默认512) 次空轮询,说明可能触发了 epoll 空轮询 Bug。Netty 通过重建新的 Selector 对象,将异常的 Selector 中所有的 SelectionKey 会重新注册到新建的 Selector,重建完成之后异常的 Selector 就可以废弃了。

NioEventLoop 轮询 I/O 事件 select 的过程已经讲完了,我们简单总结 select 过程所做的事情。select 操作也是一个无限循环,在事件轮询之前检查任务队列是否为空,确保任务队列中待执行的任务能够及时执行。如果任务队列中已经为空,然后执行 select 阻塞操作获取等待获取 I/O 事件。Netty 通过引入计数器变量,并统计在一定时间窗口内 select 操作的执行次数,识别出可能存在异常的 Selector 对象,然后采用重建 Selector 的方式巧妙地避免了 JDK epoll 空轮询的问题。

处理 I/O 事件

通过 select 过程我们已经获取到准备就绪的 I/O 事件,接下来就需要调用 processSelectedKeys() 方法处理 I/O 事件。在开始处理 I/O 事件之前,Netty 通过 ioRatio 参数控制 I/O 事件处理和任务处理的时间比例,默认为 ioRatio = 50。如果 ioRatio = 100,表示每次都处理完 I/O 事件后,会执行所有的 task。如果 ioRatio < 100,也会优先处理完 I/O 事件,再处理异步任务队列。所以不论如何 processSelectedKeys() 都是先执行的,接下来跟进下 processSelectedKeys() 的源码:

private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());}
}

处理 I/O 事件时有两种选择,一种是处理 Netty 优化过的 selectedKeys,另外一种是正常的处理逻辑。根据是否设置了 selectedKeys 来判断使用哪种策略,这两种策略使用的 selectedKeys 集合是不一样的。Netty 优化过的 selectedKeys 是 SelectedSelectionKeySet 类型,而正常逻辑使用的是 JDK HashSet 类型。下面我们逐一介绍两种策略的实现。

1. processSelectedKeysPlain

首先看下正常的处理逻辑 processSelectedKeysPlain 的源码:

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {if (selectedKeys.isEmpty()) {return;}Iterator<SelectionKey> i = selectedKeys.iterator();for (;;) {final SelectionKey k = i.next();final Object a = k.attachment();i.remove();if (a instanceof AbstractNioChannel) {// I/O 事件由 Netty 负责处理processSelectedKey(k, (AbstractNioChannel) a);} else {// 用户自定义任务@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (!i.hasNext()) {break;}if (needsToSelectAgain) {selectAgain();selectedKeys = selector.selectedKeys();if (selectedKeys.isEmpty()) {break;} else {i = selectedKeys.iterator();}}}
}

Netty 会遍历依次处理已经就绪的 SelectionKey,SelectionKey 上面可以挂载 attachment。再根据 attachment 属性可以判断 SelectionKey 的类型,SelectionKey 的类型可能是 AbstractNioChannel 和 NioTask,这两种类型对应的处理方式也是不同的,AbstractNioChannel 类型由 Netty 框架负责处理,NioTask 是用户自定义的 task,一般不会是这种类型。我们着重看下 AbstractNioChannel 的处理场景,跟进 processSelectedKey() 的源码:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) { // 检查 Key 是否合法final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {return;}if (eventLoop != this || eventLoop == null) {return;}unsafe.close(unsafe.voidPromise()); // Key 不合法,直接关闭连接return;}try {int readyOps = k.readyOps();// 处理连接事件if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// 处理可写事件if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}// 处理可读事件if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}
}

从上述源码可知,processSelectedKey 一共处理了 OP_CONNECT、OP_WRITE、OP_READ 三个事件,我们分别了解下这三个事件的处理过程。

OP_CONNECT 连接建立事件。表示 TCP 连接建立成功, Channel 处于 Active 状态。处理 OP_CONNECT 事件首先将该事件从事件集合中清除,避免事件集合中一直存在连接建立事件,然后调用 unsafe.finishConnect() 方法通知上层连接已经建立。可以跟进 unsafe.finishConnect() 的源码发现会底层调用的 pipeline().fireChannelActive() 方法,这时会产生一个 Inbound 事件,然后会在 Pipeline 中进行传播,依次调用 ChannelHandler 的 channelActive() 方法,通知各个 ChannelHandler 连接建立成功。

  • OP_WRITE,可写事件。表示上层可以向 Channel 写入数据,通过执行 ch.unsafe().forceFlush() 操作,将数据冲刷到客户端,最终会调用 javaChannel 的 write() 方法执行底层写操作。
  • OP_READ,可读事件。表示 Channel 收到了可以被读取的新数据。Netty 将 READ 和 Accept 事件进行了统一的封装,都通过 unsafe.read() 进行处理。unsafe.read() 的逻辑可以归纳为几个步骤:从 Channel 中读取数据并存储到分配的 ByteBuf;调用 pipeline.fireChannelRead() 方法产生 Inbound 事件,然后依次调用 ChannelHandler 的 channelRead() 方法处理数据;调用 pipeline.fireChannelReadComplete() 方法完成读操作;最终执行 removeReadOp() 清除 OP_READ 事件。

我们再次回到 processSelectedKeysPlain 的主流程,接下来会判断 needsToSelectAgain 决定是否需要重新轮询。如果 needsToSelectAgain == true,会调用 selectAgain() 方法进行重新轮询,该方法会将 needsToSelectAgain 再次置为 false,然后调用 selectorNow() 后立即返回。

我们回顾一下 Reactor 线程的主流程,会发现每次在处理 I/O 事件之前,needsToSelectAgain 都会被设置为 false,那么在什么场景下 needsToSelectAgain 会再次设置为 true 呢?我们通过查找变量的引用,最后定位到 AbstractChannel#doDeregister。该方法的作用是将 Channel 从当前注册的 Selector 对象中移除,方法内部可能会把 needsToSelectAgain 设置为 true,具体源码如下:

protected void doDeregister() throws Exception {eventLoop().cancel(selectionKey());
}
void cancel(SelectionKey key) {key.cancel();cancelledKeys ++;// 当取消的 Key 超过默认阈值 256,needsToSelectAgain 设置为 trueif (cancelledKeys >= CLEANUP_INTERVAL) {cancelledKeys = 0;needsToSelectAgain = true;}
}

当 Netty 在处理 I/O 事件的过程中,如果发现超过默认阈值 256 个 Channel 从 Selector 对象中移除后,会将 needsToSelectAgai 设置为 true,重新做一次轮询操作,从而确保 keySet 的有效性。

2. processSelectedKeysOptimized

介绍完正常的 I/O 事件处理 processSelectedKeysPlain 之后,回过头我们再来分析 Netty 优化的 processSelectedKeysOptimized 就会轻松很多,Netty 是否采用 SelectedSelectionKeySet 类型的优化策略由 DISABLE_KEYSET_OPTIMIZATION 参数决定。那么到底 SelectedSelectionKeySet 是如何进行优化的呢?我们继续跟进下 processSelectedKeysOptimized 的源码:

private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];selectedKeys.keys[i] = null;final Object a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {selectedKeys.reset(i + 1);selectAgain();i = -1;}}
}

可以发现 processSelectedKeysOptimized 与 processSelectedKeysPlain 的代码结构非常相似,其中最重要的一点就是 selectedKeys 的遍历方式是不同的,所以还是需要看下 SelectedSelectionKeySet 的源码一探究竟。

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {SelectionKey[] keys;int size;SelectedSelectionKeySet() {keys = new SelectionKey[1024];}@Overridepublic boolean add(SelectionKey o) {if (o == null) {return false;}keys[size++] = o;if (size == keys.length) {increaseCapacity();}return true;}// 省略其他代码
}

因为 SelectedSelectionKeySet 内部使用的是 SelectionKey 数组,所以 processSelectedKeysOptimized 可以直接通过遍历数组取出 I/O 事件,相比 JDK HashSet 的遍历效率更高。SelectedSelectionKeySet 内部通过 size 变量记录数据的逻辑长度,每次执行 add 操作时,会把对象添加到 SelectionKey[] 尾部。当 size 等于 SelectionKey[] 的真实长度时,SelectionKey[] 会进行扩容。相比于 HashSet,SelectionKey[] 不需要考虑哈希冲突的问题,所以可以实现 O(1) 时间复杂度的 add 操作。

那么 SelectedSelectionKeySet 是什么时候生成的呢?通过查找 SelectedSelectionKeySet 的引用定位到 NioEventLoop#openSelector 方法,摘录核心源码片段如下:

private SelectorTuple openSelector() {// 省略其他代码final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {@Overridepublic Object run() {try {Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);long publicSelectedKeysFieldOffset =PlatformDependent.objectFieldOffset(publicSelectedKeysField);if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);return null;}}// 省略其他代码} catch (NoSuchFieldException e) {return e;} catch (IllegalAccessException e) {return e;}}});    // 省略其他代码
}

Netty 通过反射的方式,将 Selector 对象内部的 selectedKeys 和 publicSelectedKeys 替换为 SelectedSelectionKeySet,原先 selectedKeys 和 publicSelectedKeys 这两个字段都是 HashSet 类型。这真是很棒的一个小技巧,对于 JDK 底层的优化一般是很少见的,Netty 在细节优化上追求极致的精神值得我们学习。

到这里,Reactor 线程主流程的第二步。处理 I/O 事件 processSelectedKeys 已经讲完了,简单总结一下 processSelectedKeys 的要点。处理 I/O 事件时有两种选择,一种是处理 Netty 优化过的 selectedKeys,另外一种是正常的处理逻辑,两种策略的处理逻辑是相似的,都是通过获取 SelectionKey 上挂载的 attachment 判断 SelectionKey 的类型,不同的 SelectionKey 的类型又会调用不同的处理方法,然后通过 Pipeline 进行事件传播。Netty 优化过的 selectedKeys 是使用数组存储的 SelectionKey,相比于 JDK 的 HashSet 遍历效率更高效。processSelectedKeys 还做了更多的优化处理,如果发现超过默认阈值 256 个 Channel 从 Selector 对象中移除后,会重新做一次轮询操作,以确保 keySet 的有效性。

处理异步任务队列

继续分析 Reactor 线程主流程的最后一步,处理异步任务队列 runAllTasks。为什么 Netty 能够保证 Channel 的操作都是线程安全的呢?这要归功于 Netty 的任务机制。下面我们从任务添加和任务执行两个方面介绍 Netty 的任务机制。

  • 任务添加

NioEventLoop 内部有两个非常重要的异步任务队列,分别为普通任务队列和定时任务队列。NioEventLoop 提供了 execute() 和 schedule() 方法用于向不同的队列中添加任务,execute() 用于添加普通任务,schedule() 方法用于添加定时任务。

首先我们看下如何添加普通任务。NioEventLoop 继承自 SingleThreadEventExecutor,SingleThreadEventExecutor 提供了 execute() 用于添加普通任务,源码如下:

public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();addTask(task);if (!inEventLoop) {startThread();if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {}if (reject) {reject();}}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}
}
protected void addTask(Runnable task) {if (task == null) {throw new NullPointerException("task");}if (!offerTask(task)) {reject(task);}
}
final boolean offerTask(Runnable task) {if (isShutdown()) {reject();}return taskQueue.offer(task);
}

我们一步步跟进 addTask(task),发现最后是将任务添加到了 taskQueue,SingleThreadEventExecutor 中 taskQueue 就是普通任务队列。taskQueue 默认使用的是 Mpsc Queue,可以理解为多生产者单消费者队列,关于 Mpsc Queue 我们会有一节课程单独介绍,在这里不详细展开。此外,在任务处理的场景下,inEventLoop() 始终是返回 true,始终都是在 Reactor 线程内执行,既然在 Reactor 线程内都是串行执行,可以保证线程安全,那为什么还需要 Mpsc Queue 呢?我们继续往下看。

这里举一种很常见的场景,比如在 RPC 业务线程池里处理完业务请求后,可以根据用户请求拿到关联的 Channel,将数据写回客户端。那么对于外部线程调用 Channel 的相关方法 Netty 是如何操作的呢?我们一直跟进下 channel.write() 的源码:

// #AbstractChannel#write
public ChannelFuture write(Object msg) {return pipeline.write(msg);
}
// AbstractChannelHandlerContext#write
private void write(Object msg, boolean flush, ChannelPromise promise) {// 省略其他代码final AbstractChannelHandlerContext next = findContextOutbound(flush ?(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);final Object m = pipeline.touch(msg, next);EventExecutor executor = next.executor();if (executor.inEventLoop()) { // Reactor 线程内部调用if (flush) {next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} else { // 外部线程调用会走到该分支final AbstractWriteTask task;if (flush) {task = WriteAndFlushTask.newInstance(next, m, promise);}  else {task = WriteTask.newInstance(next, m, promise);}if (!safeExecute(executor, task, promise, m)) {task.cancel();}}
}
// AbstractChannelHandlerContext#safeExecute
private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {try {executor.execute(runnable);return true;} catch (Throwable cause) {try {promise.setFailure(cause);} finally {if (msg != null) {ReferenceCountUtil.release(msg);}}return false;}
}

如果是 Reactor 线程发起调用 channel.write() 方法,inEventLoop() 返回 true,此时直接在 Reactor 线程内部直接交由 Pipeline 进行事件处理。如果是外部线程调用,那么会走到 else 分支,此时会将写操作封装成一个 WriteTask,然后通过 safeExecute() 执行,可以发现 safeExecute() 就是调用的 SingleThreadEventExecutor#execute() 方法,最终会将任务添加到 taskQueue 中。因为多个外部线程可能会并发操作同一个 Channel,这时候 Mpsc Queue 就可以保证线程的安全性。

接下来我们再分析定时任务的添加过程。与普通任务类似,定时任务也会有 Reactor 线程内和外部线程两种场景,我们直接跟进到 AbstractScheduledEventExecutor#schedule() 源码的深层,发现如下核心代码:

private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {if (inEventLoop()) { // Reactor 线程内部scheduledTaskQueue().add(task.setId(nextTaskId++));} else { // 外部线程executeScheduledRunnable(new Runnable() {@Overridepublic void run() {scheduledTaskQueue().add(task.setId(nextTaskId++));}}, true, task.deadlineNanos());}return task;
}
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {if (scheduledTaskQueue == null) {scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(SCHEDULED_FUTURE_TASK_COMPARATOR,11);}return scheduledTaskQueue;
}
void executeScheduledRunnable(Runnable runnable,@SuppressWarnings("unused") boolean isAddition,@SuppressWarnings("unused") long deadlineNanos) {execute(runnable);
}

AbstractScheduledEventExecutor 中 scheduledTaskQueue 就是定时任务队列,可以看到 scheduledTaskQueue 的默认实现是优先级队列 DefaultPriorityQueue,这样可以方便队列中的任务按照时间进行排序。但是 DefaultPriorityQueue 是非线程安全的,如果是 Reactor 线程内部调用,因为是串行执行,所以不会有线程安全问题。如果是外部线程添加定时任务,我们发现 Netty 把添加定时任务的操作又再次封装成一个任务交由 executeScheduledRunnable() 处理,而 executeScheduledRunnable() 中又再次调用了普通任务的 execute() 的方法,巧妙地借助普通任务场景中 Mpsc Queue 解决了外部线程添加定时任务的线程安全问题。

  • 任务执行

介绍完 Netty 中不同任务的添加过程,回过头我们再来分析 Reactor 线程是如何执行这些任务的呢?通过 Reactor 线程主流程的分析,我们知道处理异步任务队列有 runAllTasks() 和 runAllTasks(long timeoutNanos) 两种实现,第一种会处理所有任务,第二种是带有超时时间来处理任务。之所以设置超时时间是为了防止 Reactor 线程处理任务时间过长而导致 I/O 事件阻塞,我们着重分析下 runAllTasks(long timeoutNanos) 的源码:

protected boolean runAllTasks(long timeoutNanos) {fetchFromScheduledTaskQueue(); // 1. 合并定时任务到普通任务队列// 2. 从普通任务队列中取出任务并处理Runnable task = pollTask();if (task == null) {afterRunningAllTasks();return false;}// 计算任务处理的超时时间final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;long runTasks = 0;long lastExecutionTime;for (;;) {safeExecute(task); // 执行任务runTasks ++;// 每执行 64 个任务检查一下是否超时if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();if (lastExecutionTime >= deadline) {break;}}task = pollTask(); // 继续取出下一个任务if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();break;}}// 3. 收尾工作afterRunningAllTasks();this.lastExecutionTime = lastExecutionTime;return true;
}

异步任务处理 runAllTasks 的过程可以分为三步:合并定时任务到普通任务队列,然后从普通任务队列中取出任务并处理,最后进行收尾工作。我们分别看看三个步骤都是如何实现的。

第一步,合并定时任务到普通任务队列,对应的实现是 fetchFromScheduledTaskQueue() 方法。

private boolean fetchFromScheduledTaskQueue() {if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {return true;}long nanoTime = AbstractScheduledEventExecutor.nanoTime();for (;;) {Runnable scheduledTask = pollScheduledTask(nanoTime); // 从定时任务队列中取出截止时间小于等于当前时间的定时任务if (scheduledTask == null) {return true;}if (!taskQueue.offer(scheduledTask)) {// 如果普通任务队列已满,把定时任务放回scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);return false;}}
}
protected final Runnable pollScheduledTask(long nanoTime) {assert inEventLoop();Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();// 如果定时任务的 deadlineNanos 小于当前时间就取出if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {return null;}scheduledTaskQueue.remove();return scheduledTask;
}

定时任务只有满足截止时间 deadlineNanos 小于当前时间,才可以取出合并到普通任务。由于定时任务是按照截止时间 deadlineNanos 从小到大排列的,所以取出的定时任务不满足合并条件,那么定时任务队列中剩下的所有任务都不会满足条件,合并操作完成并退出。

第二步,从普通任务队列中取出任务并处理,可以回过头再看 runAllTasks(long timeoutNanos) 第二部分的源码,我已经用注释标明。真正处理任务的 safeExecute() 非常简单,就是直接调用的 Runnable 的 run() 方法。因为异步任务处理是有超时时间的,所以 Netty 采取了定时检测的策略,每执行 64 个任务的时候就会检查一下是否超时,这也是出于对性能的折中考虑,如果异步队列中有大量的短时间任务,每一次执行完都检测一次超时性能会有所降低。

第三步,收尾工作,对应的是 afterRunningAllTasks() 方法实现。

protected void afterRunningAllTasks() {runAllTasksFrom(tailTasks);
}
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {Runnable task = pollTaskFrom(taskQueue);if (task == null) {return false;}for (;;) {safeExecute(task);task = pollTaskFrom(taskQueue);if (task == null) {return true;}}
}

这里的尾部队列 tailTasks 相比于普通任务队列优先级较低,可以理解为是收尾任务,在每次执行完 taskQueue 中任务后会去获取尾部队列中任务执行。可以看出 afterRunningAllTasks() 就是把尾部队列 tailTasks 里的任务以此取出执行一遍。尾部队列并不常用,一般用于什么场景呢?例如你想对 Netty 的运行状态做一些统计数据,例如任务循环的耗时、占用物理内存的大小等等,都可以向尾部队列添加一个收尾任务完成统计数据的实时更新。

到这里,Netty 处理异步任务队列的流程就讲完了,再做一个简单的总结。异步任务主要分为普通任务和定时任务两种,在任务添加和任务执行时,都需要考虑 Reactor 线程内和外部线程两种情况。外部线程添加定时任务时,Netty 巧妙地借助普通任务的 Mpsc Queue 解决多线程并发操作时的线程安全问题。Netty 执行任务之前会将满足条件的定时任务合并到普通任务队列,由普通任务队列统一负责执行,并且每执行 64 个任务的时候就会检查一下是否超时。

总结

Reactor 线程模型是 Netty 最核心的内容,本节课我也花了大量的篇幅对其进行讲解。NioEventLoop 作为 Netty Reactor 线程的实现,它的设计原理是非常精妙的,值得我们反复阅读和思考。我们始终需要记住 NioEventLoop 的无限循环中所做的三件事:轮询 I/O 事件,处理 I/O 事件,处理异步任务队列。

关于 Netty Reactor 线程模型经常会遇到几个高频的面试问题,读完本节课之后你是否都已经清楚了呢?

  • Netty 的 NioEventLoop 是如何实现的?它为什么能够保证 Channel 的操作是线程安全的?
  • Netty 如何解决 JDK epoll 空轮询 Bug?
  • NioEventLoop 是如何实现无锁化的?

欢迎你在评论区留言,期待看到你分享关于 Reactor 线程模型更多的认识和思考。


19 源码篇:一个网络请求在 Netty 中的旅程

通过前面两节源码课程的学习,我们知道 Netty 在服务端启动时会为创建 NioServerSocketChannel,当客户端新连接接入时又会创建 NioSocketChannel,不管是服务端还是客户端 Channel,在创建时都会初始化自己的 ChannelPipeline。如果把 Netty 比作成一个生产车间,那么 Reactor 线程无疑是车间的中央管控系统,ChannelPipeline 可以看作是车间的流水线,将原材料按顺序进行一步步加工,然后形成一个完整的产品。本节课我将带你完整梳理一遍网络请求在 Netty 中的处理流程,从而加深对前两节课内容的理解,并着重讲解 ChannelPipeline 的工作原理。

说明:本文参考的 Netty 源码版本为 4.1.42.Final。

事件处理机制回顾

首先我们以服务端接入客户端新连接为例,并结合前两节源码课学习的知识点,一起复习下 Netty 的事件处理流程,如下图所示。

Drawing 0.png

Netty 服务端启动后,BossEventLoopGroup 会负责监听客户端的 Accept 事件。当有客户端新连接接入时,BossEventLoopGroup 中的 NioEventLoop 首先会新建客户端 Channel,然后在 NioServerSocketChannel 中触发 channelRead 事件传播,NioServerSocketChannel 中包含了一种特殊的处理器 ServerBootstrapAcceptor,最终通过 ServerBootstrapAcceptor 的 channelRead() 方法将新建的客户端 Channel 分配到 WorkerEventLoopGroup 中。WorkerEventLoopGroup 中包含多个 NioEventLoop,它会选择其中一个 NioEventLoop 与新建的客户端 Channel 绑定。

完成客户端连接注册之后,就可以接收客户端的请求数据了。当客户端向服务端发送数据时,NioEventLoop 会监听到 OP_READ 事件,然后分配 ByteBuf 并读取数据,读取完成后将数据传递给 Pipeline 进行处理。一般来说,数据会从 ChannelPipeline 的第一个 ChannelHandler 开始传播,将加工处理后的消息传递给下一个 ChannelHandler,整个过程是串行化执行。

在前面两节课中,我们介绍了服务端如何接收客户端新连接,以及 NioEventLoop 的工作流程,接下来我们重点介绍 ChannelPipeline 是如何实现 Netty 事件驱动的,这样 Netty 整个事件处理流程已经可以串成一条主线。

Pipeline 的初始化

我们知道 ChannelPipeline 是在创建 Channel 时被创建的,它是 Channel 中非常重要的一个成员变量。回到 AbstractChannel 的构造函数,以此为切入点,我们一起看下 ChannelPipeline 是如何一步步被构造出来的。

// AbstractChannel
protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();
}
// AbstractChannel#newChannelPipeline
protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);
}
// DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise =  new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;
}

当 ChannelPipeline 初始化完成后,会构成一个由 ChannelHandlerContext 对象组成的双向链表,默认 ChannelPipeline 初始化状态的最小结构仅包含 HeadContext 和 TailContext 两个节点,如下图所示。

Drawing 1.png

HeadContext 和 TailContext 属于 ChannelPipeline 中两个特殊的节点,它们都继承自 AbstractChannelHandlerContext,根据源码看下 AbstractChannelHandlerContext 有哪些实现类,如下图所示。除了 HeadContext 和 TailContext,还有一个默认实现类 DefaultChannelHandlerContext,我们可以猜到 DefaultChannelHandlerContext 封装的是用户在 Netty 启动配置类中添加的自定义业务处理器,DefaultChannelHandlerContext 会插入到 HeadContext 和 TailContext 之间。

图片3.png

接着我们比较一下上述三种 AbstractChannelHandlerContext 实现类的内部结构,发现它们都包含当前 ChannelPipeline 的引用、处理器 ChannelHandler。有一点不同的是 HeadContext 节点还包含了用于操作底层数据读写的 unsafe 对象。对于 Inbound 事件,会先从 HeadContext 节点开始传播,所以 unsafe 可以看作是 Inbound 事件的发起者;对于 Outbound 事件,数据最后又会经过 HeadContext 节点返回给客户端,此时 unsafe 可以看作是 Outbound 事件的处理者。

接下来我们继续看下用户自定义的处理器是如何加入 ChannelPipeline 的双向链表的。

Pipeline 添加 Handler

在 Netty 客户端或者服务端启动时,就需要用户配置自定义实现的业务处理器。我们先看一段服务端启动类的代码片段:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new SampleInboundA());ch.pipeline().addLast(new SampleInboundB());ch.pipeline().addLast(new SampleOutboundA());ch.pipeline().addLast(new SampleOutboundB());}});

我们知道 ChannelPipeline 分为入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器,它们都会被 ChannelHandlerContext 封装,不管是哪种处理器,最终都是通过双向链表连接,代码示例中构成的 ChannelPipeline 的结构如下。

图片4.png

那么 ChannelPipeline 在添加 Handler 时是如何区分 Inbound 和 Outbound 类型的呢?我们一起跟进 ch.pipeline().addLast() 方法源码,定位到核心代码如下。

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {// 1. 检查是否重复添加 HandlercheckMultiplicity(handler);// 2. 创建新的 DefaultChannelHandlerContext 节点newCtx = newContext(group, filterName(name, handler), handler);// 3. 添加新的 DefaultChannelHandlerContext 节点到 ChannelPipelineaddLast0(newCtx);// 省略其他代码}// 4. 回调用户方法callHandlerAdded0(newCtx);return this;
}

addLast() 主要做了以下四件事:

  1. 检查是否重复添加 Handler。
  2. 创建新的 DefaultChannelHandlerContext 节点。
  3. 添加新的 DefaultChannelHandlerContext 节点到 ChannelPipeline。
  4. 回调用户方法。

前三个步骤通过 synchronized 加锁完成的,为了防止多线程并发操作 ChannelPipeline 底层双向链表。下面我们一步步进行拆解介绍。

首先在添加 Handler 时,ChannelPipeline 会检查该 Handler 有没有被添加过。如果一个非线程安全的 Handler 被添加到 ChannelPipeline 中,那么当多线程访问时会造成线程安全问题。Netty 具体检查重复性的逻辑由 checkMultiplicity() 方法实现:

private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;if (!h.isSharable() && h.added) {throw new ChannelPipelineException(h.getClass().getName() +" is not a @Sharable handler, so can't be added or removed multiple times.");}h.added = true;}
}

用户自定义实现的处理一般都继承于 ChannelHandlerAdapter,ChannelHandlerAdapter 中使用 added 变量标识该 Handler 是否被添加过。如果当前添加的 Handler 是非共享且已被添加过,那么就会抛出异常,否则将当前 Handler 标记为已添加。

h.isSharable() 用于判断 Handler 是否是共享的,所谓共享就是这个 Handler 可以被重复添加到不同的 ChannelPipeline 中,共享的 Handler 必须要确保是线程安全的。如果我们想实现一个共享的 Handler,只需要在 Handler 中添加 @Sharable 注解即可,如下所示:

@ChannelHandler.Sharable
public class SampleInBoundHandler extends ChannelInboundHandlerAdapter {}

接下来我们分析 addLast() 的第二步,创建新的 DefaultChannelHandlerContext 节点。在执行 newContext() 方法之前,会通过 filterName() 为 Handler 创建一个唯一的名称,一起先看下 Netty 生成名称的策略是怎样的。

private String filterName(String name, ChannelHandler handler) {if (name == null) {return generateName(handler);}checkDuplicateName(name);return name;
}
private String generateName(ChannelHandler handler) {Map<Class<?>, String> cache = nameCaches.get();Class<?> handlerType = handler.getClass();String name = cache.get(handlerType);if (name == null) {name = generateName0(handlerType);cache.put(handlerType, name);}if (context0(name) != null) {String baseName = name.substring(0, name.length() - 1);for (int i = 1;; i ++) {String newName = baseName + i;if (context0(newName) == null) {name = newName;break;}}}return name;
}
private static String generateName0(Class<?> handlerType) {return StringUtil.simpleClassName(handlerType) + "#0";
}

Netty 会使用 FastThreadLocal 缓存 Handler 和名称的映射关系,在为 Handler 生成默认名称的之前,会先从缓存中查找是否已经存在,如果不存在,会调用 generateName0() 方法生成默认名称后,并加入缓存。可以看出 Netty 生成名称的默认规则是 “简单类名#0”,例如 HeadContext 的默认名称为 “DefaultChannelPipeline$HeadContext#0”。

为 Handler 生成完默认名称之后,还会通过 context0() 方法检查生成的名称是否和 ChannelPipeline 已有的名称出现冲突,查重的过程很简单,就是对双向链表进行线性搜索。如果存在冲突现象,Netty 会将名称最后的序列号截取出来,一直递增直至生成不冲突的名称为止,例如 “简单类名#1” “简单类名#2” “简单类名#3” 等等。

接下来回到 newContext() 创建节点的流程,可以定位到 AbstractChannelHandlerContext 的构造函数:

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,String name, Class<? extends ChannelHandler> handlerClass) {this.name = ObjectUtil.checkNotNull(name, "name");this.pipeline = pipeline;this.executor = executor;this.executionMask = mask(handlerClass);ordered = executor == null || executor instanceof OrderedEventExecutor;
}

AbstractChannelHandlerContext 中有一个 executionMask 属性并不是很好理解,它其实是一种常用的掩码运算操作,看下 mask() 方法是如何生成掩码的呢?

private static int mask0(Class<? extends ChannelHandler> handlerType) {int mask = MASK_EXCEPTION_CAUGHT;try {if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {// 如果是 ChannelInboundHandler 实例,所有 Inbound 事件置为 1mask |= MASK_ALL_INBOUND;// 排除 Handler 不感兴趣的 Inbound 事件if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_REGISTERED;}if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_UNREGISTERED;}if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_ACTIVE;}if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_INACTIVE;}if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {mask &= ~MASK_CHANNEL_READ;}if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_READ_COMPLETE;}if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;}if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {mask &= ~MASK_USER_EVENT_TRIGGERED;}}if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {// 如果是 ChannelOutboundHandler 实例,所有 Outbound 事件置为 1mask |= MASK_ALL_OUTBOUND;// 排除 Handler 不感兴趣的 Outbound 事件if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,SocketAddress.class, ChannelPromise.class)) {mask &= ~MASK_BIND;}if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,SocketAddress.class, ChannelPromise.class)) {mask &= ~MASK_CONNECT;}if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {mask &= ~MASK_DISCONNECT;}if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {mask &= ~MASK_CLOSE;}if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {mask &= ~MASK_DEREGISTER;}if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {mask &= ~MASK_READ;}if (isSkippable(handlerType, "write", ChannelHandlerContext.class,Object.class, ChannelPromise.class)) {mask &= ~MASK_WRITE;}if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {mask &= ~MASK_FLUSH;}}if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {mask &= ~MASK_EXCEPTION_CAUGHT;}} catch (Exception e) {PlatformDependent.throwException(e);}return mask;
}

Netty 中分别有多种 Inbound 事件和 Outbound 事件,如 Inbound 事件有 channelRegistered、channelActive、channelRead 等等。Netty 会判断 Handler 的类型是否是 ChannelInboundHandler 的实例,如果是会把所有 Inbound 事件先置为 1,然后排除 Handler 不感兴趣的方法。同理,Handler 类型如果是 ChannelOutboundHandler,也是这么实现的。

那么如何排除 Handler 不感兴趣的事件呢?Handler 对应事件的方法上如果有 @Skip 注解,Netty 认为该事件是需要排除的。大部分情况下,用户自定义实现的 Handler 只需要关心个别事件,那么剩余不关心的方法都需要加上 @Skip 注解吗?Netty 其实已经在 ChannelHandlerAdapter 中默认都添加好了,所以用户如果继承了 ChannelHandlerAdapter,默认没有重写的方法都是加上 @Skip 的,只有用户重写的方法才是 Handler 关心的事件。

回到 addLast() 的主流程,接着需要将新创建的 DefaultChannelHandlerContext 节点添加到 ChannelPipeline 中,跟进 addLast0() 方法的源码。

private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev = tail.prev;newCtx.prev = prev;newCtx.next = tail;prev.next = newCtx;tail.prev = newCtx;
}

addLast0() 非常简单,就是向 ChannelPipeline 中双向链表的尾部插入新的节点,其中 HeadContext 和 TailContext 一直是链表的头和尾,新的节点被插入到 HeadContext 和 TailContext 之间。例如代码示例中 SampleOutboundA 被添加时,双向链表的结构变化如下所示。

Drawing 4.png

最后,添加完节点后,就到了回调用户方法,定位到 callHandlerAdded() 的核心源码:

final void callHandlerAdded() throws Exception {if (setAddComplete()) {handler().handlerAdded(this);}
}
final boolean setAddComplete() {for (;;) {int oldState = handlerState;if (oldState == REMOVE_COMPLETE) {return false;}        if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {return true;}}
}

Netty 会通过 CAS 修改节点的状态直至 REMOVE_COMPLETE 或者 ADD_COMPLETE,如果修改节点为 ADD_COMPLETE 状态,表示节点已经添加成功,然后会回调用户 Handler 中实现的 handlerAdded() 方法。

至此,Pipeline 添加 Handler 的实现原理我们已经讲完了,下面接着看下 Pipeline 删除 Handler 的场景。

Pipeline 删除 Handler

在《源码篇:从 Linux 出发深入剖析服务端启动流程》的课程中我们介绍了一种特殊的处理器 ChannelInitializer,ChannelInitializer 在服务端 Channel 注册完成之后会从 Pipeline 的双向链表中移除,我们一起回顾下这段代码:

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.add(ctx)) {try {initChannel((C) ctx.channel()); // 调用 ChannelInitializer 实现的 initChannel() 方法} catch (Throwable cause) {exceptionCaught(ctx, cause);} finally {ChannelPipeline pipeline = ctx.pipeline();if (pipeline.context(this) != null) {pipeline.remove(this); // 将 ChannelInitializer 自身从 Pipeline 中移出}}return true;}return false;
}

继续跟进 pipeline.remove() 的源码。

@Override
public final ChannelPipeline remove(ChannelHandler handler) {// 1. getContextOrDie 用于查找需要删除的节点remove(getContextOrDie(handler));return this;
}
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {assert ctx != head && ctx != tail;synchronized (this) {// 删除双向链表中的 Handler 节点atomicRemoveFromHandlerList(ctx);if (!registered) {callHandlerCallbackLater(ctx, false);return ctx;}EventExecutor executor = ctx.executor();if (!executor.inEventLoop()) {executor.execute(new Runnable() {@Overridepublic void run() {callHandlerRemoved0(ctx);}});return ctx;}}// 3. 回调用户函数callHandlerRemoved0(ctx);return ctx;
}

整个删除 Handler 的过程可以分为三步,分别为:

  1. 查找需要删除的 Handler 节点;
  2. 然后删除双向链表中的 Handler 节点;
  3. 最后回调用户函数。

我们对每一步逐一进行拆解。

第一步查找需要删除的 Handler 节点,我们自然可以想到通过遍历双向链表实现。一起看下 getContextOrDie() 方法的源码:

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);if (ctx == null) {throw new NoSuchElementException(handler.getClass().getName());} else {return ctx;}
}
public final ChannelHandlerContext context(ChannelHandler handler) {if (handler == null) {throw new NullPointerException("handler");}// 遍历双向链表查找AbstractChannelHandlerContext ctx = head.next;for (;;) {if (ctx == null) {return null;}// 如果 Handler 相同,返回当前的 Context 节点if (ctx.handler() == handler) { return ctx;}ctx = ctx.next;}
}

Netty 确实是从双向链表的头结点开始依次遍历,如果当前 Context 节点的 Handler 要被删除的 Handler 相同,那么便找到了要删除的 Handler,然后返回当前 Context 节点。

找到需要删除的 Handler 节点之后,接下来就是将节点从双向链表中删除,再跟进atomicRemoveFromHandlerList() 方法的源码:

private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {AbstractChannelHandlerContext prev = ctx.prev;AbstractChannelHandlerContext next = ctx.next;prev.next = next;next.prev = prev;
}

删除节点和添加节点类似,都是基本的链表操作,通过调整双向链表的指针即可实现。假设现在需要删除 SampleOutboundA 节点,我们以一幅图来表示删除时指针的变化过程,如下所示。

图片6.png

删除完节点之后,最后 Netty 会回调用户自定义实现的 handlerRemoved() 方法,回调的实现过程与添加节点时是类似的,在这里我就不赘述了。

到此为止,我们已经学会了 ChannelPipeline 内部结构的基本操作,只需要基本的链表操作就可以实现 Handler 节点的添加和删除,添加时通过掩码运算的方式排出 Handler 不关心的事件。 ChannelPipeline 是如何调度 Handler 的呢?接下来我们继续学习。

数据在 Pipeline 中的运转

我们知道,根据数据的流向,ChannelPipeline 分为入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器。Inbound 事件和 Outbound 事件的传播方向相反,Inbound 事件的传播方向为 Head -> Tail,而 Outbound 事件传播方向是 Tail -> Head。今天我们就以客户端和服务端请求-响应的场景,深入研究 ChannelPipeline 的事件传播机制。

Inbound 事件传播

当客户端向服务端发送数据时,服务端是如何接收的呢?回顾下之前我们所学习的 Netty Reactor 线程模型,首先 NioEventLoop 会不断轮询 OP_ACCEPT 和 OP_READ 事件,当事件就绪时,NioEventLoop 会及时响应。首先定位到 NioEventLoop 中源码的入口:

// NioEventLoop#processSelectedKey
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();
}

可以看出 unsafe.read() 会触发后续事件的处理,有一点需要避免混淆,在服务端 Channel 和客户端 Channel 中绑定的 unsafe 对象是不一样的,因为服务端 Channel 只关心如何接收客户端连接,而客户端 Channel 需要关心数据的读写。这里我们重点分析一下客户端 Channel 读取数据的过程,跟进 unsafe.read() 的源码:

public final void read() {final ChannelConfig config = config();// 省略其他代码final ChannelPipeline pipeline = pipeline();final ByteBufAllocator allocator = config.getAllocator();final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;try {do {byteBuf = allocHandle.allocate(allocator); // 分配 ByteBufallocHandle.lastBytesRead(doReadBytes(byteBuf)); // 将 Channel 中的数据读到 ByteBuf 中if (allocHandle.lastBytesRead() <= 0) {byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;if (close) {readPending = false;}break;}allocHandle.incMessagesRead(1);readPending = false;pipeline.fireChannelRead(byteBuf); // 传播 ChannelRead 事件byteBuf = null;} while (allocHandle.continueReading());allocHandle.readComplete();pipeline.fireChannelReadComplete(); // 传播 readComplete 事件if (close) {closeOnRead(pipeline);}} catch (Throwable t) {handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {if (!readPending && !config.isAutoRead()) {removeReadOp();}}
}

Netty 会不断从 Channel 中读取数据到分配的 ByteBuf 中,然后通过 pipeline.fireChannelRead() 方法触发 ChannelRead 事件的传播,fireChannelRead() 是我们需要重点分析的对象。

// DefaultChannelPipeline
public final ChannelPipeline fireChannelRead(Object msg) {AbstractChannelHandlerContext.invokeChannelRead(head, msg);return this;
}
// AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) { // 当前在 Reactor 线程内部,直接执行next.invokeChannelRead(m);} else {executor.execute(new Runnable() { // 如果是外部线程,则提交给异步任务队列@Overridepublic void run() {next.invokeChannelRead(m);}});}
}

Netty 首先会以 Head 节点为入参,直接调用一个静态方法 invokeChannelRead()。如果当前是在 Reactor 线程内部,会直接执行 next.invokeChannelRead() 方法。如果是外部线程发起的调用,Netty 会把 next.invokeChannelRead() 调用封装成异步任务提交到任务队列。通过之前对 NioEventLoop 源码的学习,我们知道这样可以保证执行流程全部控制在当前 NioEventLoop 线程内部串行化执行,确保线程安全性。我们抓住核心逻辑 next.invokeChannelRead() 继续跟进。

// AbstractChannelHandlerContext
private void invokeChannelRead(Object msg) {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelRead(msg);}
}

可以看出,当前 ChannelHandlerContext 节点会取出自身对应的 Handler,执行 Handler 的 channelRead 方法。此时当前节点是 HeadContext,所以 Inbound 事件是从 HeadContext 节点开始进行传播的,看下 HeadContext.channelRead() 是如何实现的。

// HeadContext
public void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.fireChannelRead(msg);
}
// AbstractChannelHandlerContext
public ChannelHandlerContext fireChannelRead(final Object msg) {// 找到下一个节点,执行 invokeChannelReadinvokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);return this;
}

我们发现 HeadContext.channelRead() 并没有做什么特殊操作,而是直接通过 fireChannelRead() 方法继续将读事件继续传播下去。接下来 Netty 会通过 findContextInbound(MASK_CHANNEL_READ), msg) 找到 HeadContext 的下一个节点,然后继续执行我们之前介绍的静态方法 invokeChannelRead(),从而进入一个递归调用的过程,直至某个条件结束。以上 channelRead 的执行过程我们可以梳理成一幅流程图:

Drawing 6.png

Netty 是如何判断 InboundHandler 是否关心 channelRead 事件呢?这就涉及findContextInbound(MASK_CHANNEL_READ), msg) 中的一个知识点,和上文中我们介绍的 executionMask 掩码运算是息息相关的。首先看下 findContextInbound() 的源码:

private AbstractChannelHandlerContext findContextInbound(int mask) {AbstractChannelHandlerContext ctx = this;do {ctx = ctx.next;} while ((ctx.executionMask & mask) == 0);return ctx;
}

MASK_CHANNEL_READ 的值为 1 << 5,表示 channelRead 事件所在的二进制位已被置为 1。在代码示例中,SampleInboundA 是我们添加的 Inbound 类型的自定义处理器,它所对应的 executionMask 掩码和 MASK_CHANNEL_READ 进行与运算的结果如果不为 0,表示 SampleInboundA 对 channelRead 事件感兴趣,需要触发执行 SampleInboundA 的 channelRead() 方法。

Inbound 事件在上述递归调用的流程中什么时候能够结束呢?有以下两种情况:

  1. 用户自定义的 Handler 没有执行 fireChannelRead() 操作,则在当前 Handler 终止 Inbound 事件传播。
  2. 如果用户自定义的 Handler 都执行了 fireChannelRead() 操作,Inbound 事件传播最终会在 TailContext 节点终止。

接下来,我们着重看下 TailContext 节点做了哪些工作。

public void channelRead(ChannelHandlerContext ctx, Object msg) {onUnhandledInboundMessage(ctx, msg);
}
protected void onUnhandledInboundMessage(Object msg) {try {logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. " +"Please check your pipeline configuration.", msg);} finally {ReferenceCountUtil.release(msg);}
}

可以看出 TailContext 只是日志记录了丢弃的 Inbound 消息,并释放 ByteBuf 做一个兜底保护,防止内存泄漏。

到此为止,Inbound 事件的传播流程已经介绍完了,Inbound 事件在 ChannelPipeline 中的传播方向是 Head -> Tail。Netty 会从 ChannelPipeline 中找到对传播事件感兴趣的 Inbound 处理器,执行事件回调方法,然后继续向下一个节点传播,整个事件传播流程是一个递归调用的过程。

Outbound 事件传播

分析完 Inbound 事件的传播流程之后,再学习 Outbound 事件传播就会简单很多。Outbound 事件传播的方向是从 Tail -> Head,与 Inbound 事件的传播方向恰恰是相反的。Outbound 事件最常见的就是写事件,执行 writeAndFlush() 方法时就会触发 Outbound 事件传播。我们直接从 TailContext 跟进 writeAndFlush() 源码:

@Override
public final ChannelFuture writeAndFlush(Object msg) {return tail.writeAndFlush(msg);
}

继续跟进 tail.writeAndFlush() 的源码,最终会定位到 AbstractChannelHandlerContext 中的 write 方法。该方法是 writeAndFlush 的核心逻辑,具体源码如下。

private void write(Object msg, boolean flush, ChannelPromise promise) {// ...... 省略部分非核心代码 ......    // 找到 Pipeline 链表中下一个 Outbound 类型的 ChannelHandler 节点final AbstractChannelHandlerContext next = findContextOutbound(flush ?(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);final Object m = pipeline.touch(msg, next);EventExecutor executor = next.executor();// 判断当前线程是否是 NioEventLoop 中的线程if (executor.inEventLoop()) {if (flush) {// 因为 flush == true,所以流程走到这里next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} else {final AbstractWriteTask task;if (flush) {task = WriteAndFlushTask.newInstance(next, m, promise);}  else {task = WriteTask.newInstance(next, m, promise);}if (!safeExecute(executor, task, promise, m)) {task.cancel();}}
}

在《数据传输:writeAndFlush 处理流程剖析》的课程中,我们已经对 write() 方法做了深入分析,这里抛开其他技术细节,我们只分析 Outbound 事件传播的过程。

假设我们在代码示例中 SampleOutboundB 调用了 writeAndFlush() 方法,那么 Netty 会调用 findContextOutbound() 方法找到 Pipeline 链表中下一个 Outbound 类型的 ChannelHandler,对应上述代码示例中下一个 Outbound 节点是 SampleOutboundA,然后调用 next.invokeWriteAndFlush(m, promise),我们跟进去:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {if (invokeHandler()) {invokeWrite0(msg, promise);invokeFlush0();} else {writeAndFlush(msg, promise);}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {try {((ChannelOutboundHandler) handler()).write(this, msg, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}
}

我们发现,invokeWriteAndFlush() 方法最终会它会执行下一个 ChannelHandler 节点的 write 方法。一般情况下,用户在实现 outBound 类型的 ChannelHandler 时都会继承 ChannelOutboundHandlerAdapter,一起看下它的 write() 方法是如何处理 outBound 事件的。

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {ctx.write(msg, promise);
}

ChannelOutboundHandlerAdapter.write() 只是调用了 AbstractChannelHandlerContext 的 write() 方法,是不是似曾相识?与之前介绍的 Inbound 事件处理流程类似,此时流程又回到了 AbstractChannelHandlerContext 中重复执行 write 方法,继续寻找下一个 Outbound 节点,也是一个递归调用的过程。

编码器是用户经常需要自定义实现的处理器,然而为什么用户的编码器里并没有重写 write(),只是重写一个 encode() 方法呢?在《Netty 如何实现自定义通信协议》课程中,我们所介绍的 MessageToByteEncoder 源码,用户自定义的编码器基本都会继承 MessageToByteEncoder,MessageToByteEncoder 重写了 ChanneOutboundHandler 的 write() 方法,其中会调用子类实现的 encode 方法完成数据编码,这里我们不再赘述了。

那么 OutBound 事件什么时候传播结束呢?也许你已经猜到了,OutBound 事件最终会传播到 HeadContext 节点。所以 HeadContext 节点既是 Inbound 处理器,又是 OutBound 处理器,继续看下 HeadContext 是如何拦截和处理 write 事件的。

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {unsafe.write(msg, promise);
}

HeadContext 最终调用了底层的 unsafe 写入数据,数据在执行 write() 方法时,只会写入到一个底层的缓冲数据结构,然后等待 flush 操作将数据冲刷到 Channel 中。关于 write 和 flush 是如何操作缓存数据结构的,快去复习一遍《数据传输:writeAndFlush 处理流程剖析》吧,将知识点形成一个完整的体系。

到此为止,outbound 事件传播也介绍完了,它的传播方向是 Tail -> Head,与 Inbound 事件的传播是相反的。MessageToByteEncoder 是用户在实现编码时经常用到的一个抽象类,MessageToByteEncoder 中已经重写了 ChanneOutboundHandler 的 write() 方法,大部分情况下用户只需要重写 encode() 即可。

异常事件传播

在《服务编排层:Pipeline 如何协调各类 Handler》中,我们已经初步介绍了 Netty 实现统一异常拦截和处理的最佳实践,首先回顾下异常拦截器的简单实现。

public class ExceptionHandler extends ChannelDuplexHandler {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {if (cause instanceof RuntimeException) {System.out.println("Handle Business Exception Success.");}}
}

异常处理器 ExceptionHandler 一般会继承 ChannelDuplexHandler,ChannelDuplexHandler 既是一个 Inbound 处理器,又是一个 Outbound 处理器。ExceptionHandler 应该被添加在自定义处理器的尾部,如下图所示:

图片8.png

那么异常处理器 ExceptionHandler 什么时候被执行呢?我们分别从 Inbound 异常事件传播和 Outbound 异常事件传播两种场景进行分析。

首先看下 Inbound 异常事件的传播。还是从数据读取的场景入手,发现 Inbound 事件传播的时候有异常处理的相关逻辑,我们再一起重新分析下数据读取环节的源码。

// AbstractChannelHandlerContext
private void invokeChannelRead(Object msg) {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelRead(msg);}
}
// AbstractChannelHandlerContext
private void notifyHandlerException(Throwable cause) {// 省略其他代码invokeExceptionCaught(cause);
}
// AbstractChannelHandlerContext
private void invokeExceptionCaught(final Throwable cause) {if (invokeHandler()) {try {handler().exceptionCaught(this, cause); // 调用 Handler 实现的 exceptionCaught 方法} catch (Throwable error) {// 省略其他代码}} else {fireExceptionCaught(cause);}
}

如果 SampleInboundA 在读取数据时发生了异常,invokeChannelRead 会捕获异常,并执行 notifyHandlerException() 方法进行异常处理。我们一步步跟进,发现最终会调用 Handler 的 exceptionCaught() 方法,所以用户可以通过重写 exceptionCaught() 实现自定义的异常处理。

我们知道,统一异常处理器 ExceptionHandler 是在 ChannelPipeline 的末端,SampleInboundA 并没有重写 exceptionCaught() 方法,那么 SampleInboundA 产生的异常是如何传播到 ExceptionHandler 中呢?用户实现的 Inbound 处理器一般都会继承 ChannelInboundHandlerAdapter 抽象类,果然我们在 ChannelInboundHandlerAdapter 中发现了 exceptionCaught() 的实现:

// ChannelInboundHandlerAdapter
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {ctx.fireExceptionCaught(cause);
}
// AbstractChannelHandlerContext
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);return this;
}

ChannelInboundHandlerAdapter 默认调用 fireExceptionCaught() 方法传播异常事件,而 fireExceptionCaught() 执行时会先调用 findContextInbound() 方法找到下一个对异常事件关注的 Inbound 处理器,然后继续向下传播异常。所以这里应该明白为什么统一异常处理器 ExceptionHandler 为什么需要添加在 ChannelPipeline 的末端了吧?这样 ExceptionHandler 可以接收所有 Inbound 处理器发生的异常。

接下来,我们分析 Outbound 异常事件传播。你可能此时就会有一个疑问,Outbound 事件的传播方向与 Inbound 事件是相反的,为什么统一异常处理器 ExceptionHandler 没有添加在 ChannelPipeline 的头部呢?我们通过 writeAndFlush() 的调用过程再来一探究竟。

// AbstractChannelHandlerContext
private void invokeFlush0() {try {((ChannelOutboundHandler) handler()).flush(this);} catch (Throwable t) {notifyHandlerException(t);}
}

我们发现,flush 发送数据时如果发生异常,那么异常也会被捕获并交由同样的 notifyHandlerException() 方法进行处理。因为 notifyHandlerException() 方法中会向下寻找 Inbound 处理器,此时又会回到 Inbound 异常事件的传播流程。所以说,异常事件的传播方向与 Inbound 事件几乎是一样的,最后一定会传播到统一异常处理器 ExceptionHandler。

到这里,整个异常事件的传播过程已经分析完了。你需要记住的是,异常事件的传播顺序与 ChannelHandler 的添加顺序相同,会依次向后传播,与 Inbound 事件和 Outbound 事件无关。

总结

这节点我们学习了数据在 Netty 中的完整处理流程,其中重点分析了数据是如何在 ChannelPipeline 中流转的。我们做一个知识点总结:

  • ChannelPipeline 是双向链表结构,包含 ChannelInboundHandler 和 ChannelOutboundHandler 两种处理器。
  • Inbound 事件和 Outbound 事件的传播方向相反,Inbound 事件的传播方向为 Head -> Tail,而 Outbound 事件传播方向是 Tail -> Head。
  • 异常事件的处理顺序与 ChannelHandler 的添加顺序相同,会依次向后传播,与 Inbound 事件和 Outbound 事件无关。

再整体回顾下 ChannelPipeline 中事件传播的实现原理:

  • Inbound 事件传播从 HeadContext 节点开始,Outbound 事件传播从 TailContext 节点开始。
  • AbstractChannelHandlerContext 抽象类中实现了一系列 fire 和 invoke 方法,如果想让事件想下传播,只需要调用 fire 系列的方法即可。fire 和 invoke 的系列方法结合 findContextInbound() 和 findContextOutbound() 可以控制 Inbound 和 Outbound 事件的传播方向,整个过程是一个递归调用。

20 技巧篇:Netty 的 FastThreadLocal 究竟比 ThreadLocal 快在哪儿?

在前面几篇源码解析的课程中,我们都有在源码中发现 FastThreadLocal 的身影。顾名思义,Netty 作为高性能的网络通信框架,FastThreadLocal 是比 JDK 自身的 ThreadLocal 性能更高的通信框架。FastThreadLocal 到底比 ThreadLocal 快在哪里呢?这节课我们就一起来探索 FastThreadLocal 高性能的奥秘。

说明:本文参考的 Netty 源码版本为 4.1.42.Final。

JDK ThreadLocal 基本原理

JDK ThreadLocal 不仅是高频的面试知识点,而且在日常工作中也是常用一种工具,所以首先我们先学习下 Java 原生的 ThreadLocal 的实现原理,可以帮助我们更好地对比和理解 Netty 的 FastThreadLocal。

如果你需要变量在多线程之间隔离,或者在同线程内的类和方法中共享,那么 ThreadLocal 大显身手的时候就到了。ThreadLocal 可以理解为线程本地变量,它是 Java 并发编程中非常重要的一个类。ThreadLocal 为变量在每个线程中都创建了一个副本,该副本只能被当前线程访问,多线程之间是隔离的,变量不能在多线程之间共享。这样每个线程修改变量副本时,不会对其他线程产生影响。

接下来我们通过一个例子看下 ThreadLocal 如何使用:

public class ThreadLocalTest {private static final ThreadLocal<String> THREAD_NAME_LOCAL = ThreadLocal.withInitial(() -> Thread.currentThread().getName());private static final ThreadLocal<TradeOrder> TRADE_THREAD_LOCAL = new ThreadLocal<>();public static void main(String[] args) {for (int i = 0; i < 2; i++) {int tradeId = i;new Thread(() -> {TradeOrder tradeOrder = new TradeOrder(tradeId, tradeId % 2 == 0 ? "已支付" : "未支付");TRADE_THREAD_LOCAL.set(tradeOrder);System.out.println("threadName: " + THREAD_NAME_LOCAL.get());System.out.println("tradeOrder info:" + TRADE_THREAD_LOCAL.get());}, "thread-" + i).start();}}static class TradeOrder {long id;String status;public TradeOrder(int id, String status) {this.id = id;this.status = status;}@Overridepublic String toString() {return "id=" + id + ", status=" + status;}}
}

在上述示例中,构造了 THREAD_NAME_LOCAL 和 TRADE_THREAD_LOCAL 两个 ThreadLocal 变量,分别用于记录当前线程名称和订单交易信息。ThreadLocal 是可以支持泛型的,THREAD_NAME_LOCAL 和 TRADE_THREAD_LOCAL 存放 String 类型和 TradeOrder 对象类型的数据,你可以通过 set()/get() 方法设置和读取 ThreadLocal 实例。一起看下示例代码的运行结果:

threadName: thread-0
threadName: thread-1
tradeOrder info:id=1, status=未支付
tradeOrder info:id=0, status=已支付

可以看出 thread-1 和 thread-2 虽然操作的是同一个 ThreadLocal 对象,但是它们取到了不同的线程名称和订单交易信息。那么一个线程内如何存在多个 ThreadLocal 对象,每个 ThreadLocal 对象是如何存储和检索的呢?

接下来我们看看 ThreadLocal 的实现原理。既然多线程访问 ThreadLocal 变量时都会有自己独立的实例副本,那么很容易想到的方案就是在 ThreadLocal 中维护一个 Map,记录线程与实例之间的映射关系。当新增线程和销毁线程时都需要更新 Map 中的映射关系,因为会存在多线程并发修改,所以需要保证 Map 是线程安全的。那么 JDK 的 ThreadLocal 是这么实现的吗?答案是 NO。因为在高并发的场景并发修改 Map 需要加锁,势必会降低性能。JDK 为了避免加锁,采用了相反的设计思路。以 Thread 入手,在 Thread 中维护一个 Map,记录 ThreadLocal 与实例之间的映射关系,这样在同一个线程内,Map 就不需要加锁了。示例代码中线程 Thread 和 ThreadLocal 的关系可以用以下这幅图表示。

Drawing 0.png

那么在 Thread 内部,维护映射关系的 Map 是如何实现的呢?从源码中可以发现 Thread 使用的是 ThreadLocal 的内部类 ThreadLocalMap,所以 Thread、ThreadLocal 和 ThreadLocalMap 之间的关系可以用下图表示:

Drawing 1.png

为了更加深入理解 ThreadLocal,了解 ThreadLocalMap 的内部实现是非常有必要的。ThreadLocalMap 其实与 HashMap 的数据结构类似,但是 ThreadLocalMap 不具备通用性,它是为 ThreadLocal 量身定制的。

ThreadLocalMap 是一种使用线性探测法实现的哈希表,底层采用数组存储数据。如下图所示,ThreadLocalMap 会初始化一个长度为 16 的 Entry 数组,每个 Entry 对象用于保存 key-value 键值对。与 HashMap 不同的是,Entry 的 key 就是 ThreadLocal 对象本身,value 就是用户具体需要存储的值。

Drawing 2.png

当调用 ThreadLocal.set() 添加 Entry 对象时,是如何解决 Hash 冲突的呢?这就需要我们了解线性探测法的实现原理。每个 ThreadLocal 在初始化时都会有一个 Hash 值为 threadLocalHashCode,每增加一个 ThreadLocal, Hash 值就会固定增加一个魔术 HASH_INCREMENT = 0x61c88647。为什么取 0x61c88647 这个魔数呢?实验证明,通过 0x61c88647 累加生成的 threadLocalHashCode 与 2 的幂取模,得到的结果可以较为均匀地分布在长度为 2 的幂大小的数组中。有了 threadLocalHashCode 的基础,下面我们通过下面的表格来具体讲解线性探测法是如何实现的。

图片2.png

为了便于理解,我们采用一组简单的数据模拟 ThreadLocal.set() 的过程是如何解决 Hash 冲突的。

  1. threadLocalHashCode = 4,threadLocalHashCode & 15 = 4;此时数据应该放在数组下标为 4 的位置。下标 4 的位置正好没有数据,可以存放。
  2. threadLocalHashCode = 19,threadLocalHashCode & 15 = 4;但是下标 4 的位置已经有数据了,如果当前需要添加的 Entry 与下标 4 位置已存在的 Entry 两者的 key 相同,那么该位置 Entry 的 value 将被覆盖为新的值。我们假设 key 都是不相同的,所以此时需要向后移动一位,下标 5 的位置没有冲突,可以存放。
  3. threadLocalHashCode = 33,threadLocalHashCode & 15 = 3;下标 3 的位置已经有数据,向后移一位,下标 4 位置还是有数据,继续向后查找,发现下标 6 没有数据,可以存放。

ThreadLocal.get() 的过程也是类似的,也是根据 threadLocalHashCode 的值定位到数组下标,然后判断当前位置 Entry 对象与待查询 Entry 对象的 key 是否相同,如果不同,继续向下查找。由此可见,ThreadLocal.set()/get() 方法在数据密集时很容易出现 Hash 冲突,需要 O(n) 时间复杂度解决冲突问题,效率较低。

下面我们再聊聊 ThreadLocalMap 中 Entry 的设计原理。Entry 继承自弱引用类 WeakReference,Entry 的 key 是弱引用,value 是强引用。在 JVM 垃圾回收时,只要发现了弱引用的对象,不管内存是否充足,都会被回收。那么为什么 Entry 的 key 要设计成弱引用呢?我们试想下,如果 key 都是强引用,当 ThreadLocal 不再使用时,然而 ThreadLocalMap 中还是存在对 ThreadLocal 的强引用,那么 GC 是无法回收的,从而造成内存泄漏。

虽然 Entry 的 key 设计成了弱引用,但是当 ThreadLocal 不再使用被 GC 回收后,ThreadLocalMap 中可能出现 Entry 的 key 为 NULL,那么 Entry 的 value 一直会强引用数据而得不到释放,只能等待线程销毁。那么应该如何避免 ThreadLocalMap 内存泄漏呢?ThreadLocal 已经帮助我们做了一定的保护措施,在执行 ThreadLocal.set()/get() 方法时,ThreadLocal 会清除 ThreadLocalMap 中 key 为 NULL 的 Entry 对象,让它还能够被 GC 回收。除此之外,当线程中某个 ThreadLocal 对象不再使用时,立即调用 remove() 方法删除 Entry 对象。如果是在异常的场景中,记得在 finally 代码块中进行清理,保持良好的编码意识。

关于 JDK 的 ThreadLocal 的基本原理我们已经介绍完了,既然 ThreadLocal 已经非常成熟,而且在日常开发中也被广泛使用,Netty 为什么还要自己实现一个 FastThreadLocal 呢?性能真的比 ThreadLocal 高很多吗?我们接下来一起一探究竟。

FastThreadLocal 为什么快

FastThreadLocal 的实现与 ThreadLocal 非常类似,Netty 为 FastThreadLocal 量身打造了 FastThreadLocalThread 和 InternalThreadLocalMap 两个重要的类。下面我们看下这两个类是如何实现的。

FastThreadLocalThread 是对 Thread 类的一层包装,每个线程对应一个 InternalThreadLocalMap 实例。只有 FastThreadLocal 和 FastThreadLocalThread 组合使用时,才能发挥 FastThreadLocal 的性能优势。首先看下 FastThreadLocalThread 的源码定义:

public class FastThreadLocalThread extends Thread {private InternalThreadLocalMap threadLocalMap;// 省略其他代码
}

可以看出 FastThreadLocalThread 主要扩展了 InternalThreadLocalMap 字段,我们可以猜测到 FastThreadLocalThread 主要使用 InternalThreadLocalMap 存储数据,而不再是使用 Thread 中的 ThreadLocalMap。所以想知道 FastThreadLocalThread 高性能的奥秘,必须要了解 InternalThreadLocalMap 的设计原理。

上文中我们讲到了 ThreadLocal 的一个重要缺点,就是 ThreadLocalMap 采用线性探测法解决 Hash 冲突性能较慢,那么 InternalThreadLocalMap 又是如何优化的呢?首先一起看下 InternalThreadLocalMap 的内部构造。

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {private static final int DEFAULT_ARRAY_LIST_INITIAL_CAPACITY = 8;private static final int STRING_BUILDER_INITIAL_SIZE;private static final int STRING_BUILDER_MAX_SIZE;public static final Object UNSET = new Object();private BitSet cleanerFlags;    private InternalThreadLocalMap() {super(newIndexedVariableTable());}private static Object[] newIndexedVariableTable() {Object[] array = new Object[32];Arrays.fill(array, UNSET);return array;}    public static int nextVariableIndex() {int index = nextIndex.getAndIncrement();if (index < 0) {nextIndex.decrementAndGet();throw new IllegalStateException("too many thread-local indexed variables");}return index;}// 省略其他代码
}
class UnpaddedInternalThreadLocalMap {static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();static final AtomicInteger nextIndex = new AtomicInteger();    Object[] indexedVariables;UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {this.indexedVariables = indexedVariables;}// 省略其他代码
}

从 InternalThreadLocalMap 内部实现来看,与 ThreadLocalMap 一样都是采用数组的存储方式。但是 InternalThreadLocalMap 并没有使用线性探测法来解决 Hash 冲突,而是在 FastThreadLocal 初始化的时候分配一个数组索引 index,index 的值采用原子类 AtomicInteger 保证顺序递增,通过调用 InternalThreadLocalMap.nextVariableIndex() 方法获得。然后在读写数据的时候通过数组下标 index 直接定位到 FastThreadLocal 的位置,时间复杂度为 O(1)。如果数组下标递增到非常大,那么数组也会比较大,所以 FastThreadLocal 是通过空间换时间的思想提升读写性能。下面通过一幅图描述 InternalThreadLocalMap、index 和 FastThreadLocal 之间的关系。

Drawing 3.png

通过上面 FastThreadLocal 的内部结构图,我们对比下与 ThreadLocal 有哪些区别呢?FastThreadLocal 使用 Object 数组替代了 Entry 数组,Object[0] 存储的是一个Set<FastThreadLocal<?>> 集合,从数组下标 1 开始都是直接存储的 value 数据,不再采用 ThreadLocal 的键值对形式进行存储。

假设现在我们有一批数据需要添加到数组中,分别为 value1、value2、value3、value4,对应的 FastThreadLocal 在初始化的时候生成的数组索引分别为 1、2、3、4。如下图所示。

Drawing 4.png

至此,我们已经对 FastThreadLocal 有了一个基本的认识,下面我们结合具体的源码分析 FastThreadLocal 的实现原理。

FastThreadLocal 源码分析

在讲解源码之前,我们回过头看下上文中的 ThreadLocal 示例,如果把示例中 ThreadLocal 替换成 FastThread,应当如何使用呢?

public class FastThreadLocalTest {private static final FastThreadLocal<String> THREAD_NAME_LOCAL = new FastThreadLocal<>();private static final FastThreadLocal<TradeOrder> TRADE_THREAD_LOCAL = new FastThreadLocal<>();public static void main(String[] args) {for (int i = 0; i < 2; i++) {int tradeId = i;String threadName = "thread-" + i;new FastThreadLocalThread(() -> {THREAD_NAME_LOCAL.set(threadName);TradeOrder tradeOrder = new TradeOrder(tradeId, tradeId % 2 == 0 ? "已支付" : "未支付");TRADE_THREAD_LOCAL.set(tradeOrder);System.out.println("threadName: " + THREAD_NAME_LOCAL.get());System.out.println("tradeOrder info:" + TRADE_THREAD_LOCAL.get());}, threadName).start();}}
}

可以看出,FastThreadLocal 的使用方法几乎和 ThreadLocal 保持一致,只需要把代码中 Thread、ThreadLocal 替换为 FastThreadLocalThread 和 FastThreadLocal 即可,Netty 在易用性方面做得相当棒。下面我们重点对示例中用得到 FastThreadLocal.set()/get() 方法做深入分析。

首先看下 FastThreadLocal.set() 的源码:

public final void set(V value) {if (value != InternalThreadLocalMap.UNSET) { // 1. value 是否为缺省值InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // 2. 获取当前线程的 InternalThreadLocalMapsetKnownNotUnset(threadLocalMap, value); // 3. 将 InternalThreadLocalMap 中数据替换为新的 value} else {remove();}
}

FastThreadLocal.set() 方法虽然入口只有几行代码,但是内部逻辑是相当复杂的。我们首先还是抓住代码主干,一步步进行拆解分析。set() 的过程主要分为三步:

  1. 判断 value 是否为缺省值,如果等于缺省值,那么直接调用 remove() 方法。这里我们还不知道缺省值和 remove() 之间的联系是什么,我们暂且把 remove() 放在最后分析。
  2. 如果 value 不等于缺省值,接下来会获取当前线程的 InternalThreadLocalMap。
  3. 然后将 InternalThreadLocalMap 中对应数据替换为新的 value。

首先我们看下 InternalThreadLocalMap.get() 方法,源码如下:

public static InternalThreadLocalMap get() {Thread thread = Thread.currentThread();if (thread instanceof FastThreadLocalThread) { // 当前线程是否为 FastThreadLocalThread 类型return fastGet((FastThreadLocalThread) thread);} else {return slowGet();}
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); // 获取 FastThreadLocalThread 的 threadLocalMap 属性if (threadLocalMap == null) {thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());}return threadLocalMap;
}
private static InternalThreadLocalMap slowGet() {ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; InternalThreadLocalMap ret = slowThreadLocalMap.get(); // 从 JDK 原生 ThreadLocal 中获取 InternalThreadLocalMapif (ret == null) {ret = new InternalThreadLocalMap();slowThreadLocalMap.set(ret);}return ret;
}

InternalThreadLocalMap.get() 逻辑很简单,为了帮助你更好地理解,下面使用一幅图描述 InternalThreadLocalMap 的获取方式。

Drawing 5.png

如果当前线程是 FastThreadLocalThread 类型,那么直接通过 fastGet() 方法获取 FastThreadLocalThread 的 threadLocalMap 属性即可。如果此时 InternalThreadLocalMap 不存在,直接创建一个返回。关于 InternalThreadLocalMap 的初始化在上文中已经介绍过,它会初始化一个长度为 32 的 Object 数组,数组中填充着 32 个缺省对象 UNSET 的引用。

那么 slowGet() 又是什么作用呢?从代码分支来看,slowGet() 是针对非 FastThreadLocalThread 类型的线程发起调用时的一种兜底方案。如果当前线程不是 FastThreadLocalThread,内部是没有 InternalThreadLocalMap 属性的,Netty 在 UnpaddedInternalThreadLocalMap 中保存了一个 JDK 原生的 ThreadLocal,ThreadLocal 中存放着 InternalThreadLocalMap,此时获取 InternalThreadLocalMap 就退化成 JDK 原生的 ThreadLocal 获取。

获取 InternalThreadLocalMap 的过程已经讲完了,下面看下 setKnownNotUnset() 如何将数据添加到 InternalThreadLocalMap 的。

private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {if (threadLocalMap.setIndexedVariable(index, value)) { // 1. 找到数组下标 index 位置,设置新的 valueaddToVariablesToRemove(threadLocalMap, this); // 2. 将 FastThreadLocal 对象保存到待清理的 Set 中}
}

setKnownNotUnset() 主要做了两件事:

  1. 找到数组下标 index 位置,设置新的 value。
  2. 将 FastThreadLocal 对象保存到待清理的 Set 中。

首先我们看下第一步 threadLocalMap.setIndexedVariable() 的源码实现:

public boolean setIndexedVariable(int index, Object value) {Object[] lookup = indexedVariables;if (index < lookup.length) {Object oldValue = lookup[index]; lookup[index] = value; // 直接将数组 index 位置设置为 value,时间复杂度为 O(1)return oldValue == UNSET;} else {expandIndexedVariableTableAndSet(index, value); // 容量不够,先扩容再设置值return true;}
}

indexedVariables 就是 InternalThreadLocalMap 中用于存放数据的数组,如果数组容量大于 FastThreadLocal 的 index 索引,那么直接找到数组下标 index 位置将新 value 设置进去,事件复杂度为 O(1)。在设置新的 value 之前,会将之前 index 位置的元素取出,如果旧的元素还是 UNSET 缺省对象,那么返回成功。

如果数组容量不够了怎么办呢?InternalThreadLocalMap 会自动扩容,然后再设置 value。接下来看看 expandIndexedVariableTableAndSet() 的扩容逻辑:

private void expandIndexedVariableTableAndSet(int index, Object value) {Object[] oldArray = indexedVariables;final int oldCapacity = oldArray.length;int newCapacity = index;newCapacity |= newCapacity >>>  1;newCapacity |= newCapacity >>>  2;newCapacity |= newCapacity >>>  4;newCapacity |= newCapacity >>>  8;newCapacity |= newCapacity >>> 16;newCapacity ++;Object[] newArray = Arrays.copyOf(oldArray, newCapacity);Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);newArray[index] = value;indexedVariables = newArray;
}

上述代码的位移操作是不是似曾相识?我们去翻阅下 JDK HashMap 中扩容的源码,其中有这么一段代码:

static final int tableSizeFor(int cap) {int n = cap - 1;n |= n >>> 1;n |= n >>> 2;n |= n >>> 4;n |= n >>> 8;n |= n >>> 16;return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

可以看出 InternalThreadLocalMap 实现数组扩容几乎和 HashMap 完全是一模一样的,所以多读源码还是可以给我们很多启发的。InternalThreadLocalMap 以 index 为基准进行扩容,将数组扩容后的容量向上取整为 2 的次幂。然后将原数组内容拷贝到新的数组中,空余部分填充缺省对象 UNSET,最终把新数组赋值给 indexedVariables。

为什么 InternalThreadLocalMap 以 index 为基准进行扩容,而不是原数组长度呢?假设现在初始化了 70 个 FastThreadLocal,但是这些 FastThreadLocal 从来没有调用过 set() 方法,此时数组还是默认长度 32。当第 index = 70 的 FastThreadLocal 调用 set() 方法时,如果按原数组容量 32 进行扩容 2 倍后,还是无法填充 index = 70 的数据。所以使用 index 为基准进行扩容可以解决这个问题,但是如果 FastThreadLocal 特别多,数组的长度也是非常大的。

回到 setKnownNotUnset() 的主流程,向 InternalThreadLocalMap 添加完数据之后,接下就是将 FastThreadLocal 对象保存到待清理的 Set 中。我们继续看下 addToVariablesToRemove() 是如何实现的。

private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); // 获取数组下标为 0 的元素Set<FastThreadLocal<?>> variablesToRemove;if (v == InternalThreadLocalMap.UNSET || v == null) {variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); // 创建 FastThreadLocal 类型的 Set 集合threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); // 将 Set 集合填充到数组下标 0 的位置} else {variablesToRemove = (Set<FastThreadLocal<?>>) v; // 如果不是 UNSET,Set 集合已存在,直接强转获得 Set 集合}variablesToRemove.add(variable); // 将 FastThreadLocal 添加到 Set 集合中
}

variablesToRemoveIndex 是采用 static final 修饰的变量,在 FastThreadLocal 初始化时 variablesToRemoveIndex 被赋值为 0。InternalThreadLocalMap 首先会找到数组下标为 0 的元素,如果该元素是缺省对象 UNSET 或者不存在,那么会创建一个 FastThreadLocal 类型的 Set 集合,然后把 Set 集合填充到数组下标 0 的位置。如果数组第一个元素不是缺省对象 UNSET,说明 Set 集合已经被填充,直接强转获得 Set 集合即可。这就解释了 InternalThreadLocalMap 的 value 数据为什么是从下标为 1 的位置开始存储了,因为 0 的位置已经被 Set 集合占用了。

为什么 InternalThreadLocalMap 要在数组下标为 0 的位置存放一个 FastThreadLocal 类型的 Set 集合呢?这时候我们回过头看下 remove() 方法。

public final void remove() {remove(InternalThreadLocalMap.getIfSet());
}
public static InternalThreadLocalMap getIfSet() {Thread thread = Thread.currentThread();if (thread instanceof FastThreadLocalThread) {return ((FastThreadLocalThread) thread).threadLocalMap();}return slowThreadLocalMap.get();
}
public final void remove(InternalThreadLocalMap threadLocalMap) {if (threadLocalMap == null) {return;}Object v = threadLocalMap.removeIndexedVariable(index); // 删除数组下标 index 位置对应的 valueremoveFromVariablesToRemove(threadLocalMap, this); // 从数组下标 0 的位置取出 Set 集合,并删除当前 FastThreadLocalif (v != InternalThreadLocalMap.UNSET) {try {onRemoval((V) v); // 空方法,用户可以继承实现} catch (Exception e) {PlatformDependent.throwException(e);}}
}

在执行 remove 操作之前,会调用 InternalThreadLocalMap.getIfSet() 获取当前 InternalThreadLocalMap。有了之前的基础,理解 getIfSet() 方法就非常简单了,如果是 FastThreadLocalThread 类型,直接取 FastThreadLocalThread 中 threadLocalMap 属性。如果是普通线程 Thread,从 ThreadLocal 类型的 slowThreadLocalMap 中获取。

找到 InternalThreadLocalMap 之后,InternalThreadLocalMap 会从数组中定位到下标 index 位置的元素,并将 index 位置的元素覆盖为缺省对象 UNSET。接下来就需要清理当前的 FastThreadLocal 对象,此时 Set 集合就派上了用场,InternalThreadLocalMap 会取出数组下标 0 位置的 Set 集合,然后删除当前 FastThreadLocal。最后 onRemoval() 方法起到什么作用呢?Netty 只是留了一处扩展,并没有实现,用户需要在删除的时候做一些后置操作,可以继承 FastThreadLocal 实现该方法。

至此,FastThreadLocal.set() 的完成过程已经讲完了,接下来我们继续 FastThreadLocal.get() 方法的实现就易如反掌拉。FastThreadLocal.get() 的源码实现如下:

public final V get() {InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();Object v = threadLocalMap.indexedVariable(index); // 从数组中取出 index 位置的元素if (v != InternalThreadLocalMap.UNSET) {return (V) v;}return initialize(threadLocalMap); // 如果获取到的数组元素是缺省对象,执行初始化操作
}
public Object indexedVariable(int index) {Object[] lookup = indexedVariables;return index < lookup.length? lookup[index] : UNSET;
}
private V initialize(InternalThreadLocalMap threadLocalMap) {V v = null;try {v = initialValue();} catch (Exception e) {PlatformDependent.throwException(e);}threadLocalMap.setIndexedVariable(index, v);addToVariablesToRemove(threadLocalMap, this);return v;
}

首先根据当前线程是否是 FastThreadLocalThread 类型找到 InternalThreadLocalMap,然后取出从数组下标 index 的元素,如果 index 位置的元素不是缺省对象 UNSET,说明该位置已经填充过数据,直接取出返回即可。如果 index 位置的元素是缺省对象 UNSET,那么需要执行初始化操作。可以看到,initialize() 方法会调用用户重写的 initialValue 方法构造需要存储的对象数据,如下所示。

private final FastThreadLocal<String> threadLocal = new FastThreadLocal<String>() {@Overrideprotected String initialValue() {return "hello world";}
};

构造完用户对象数据之后,接下来就会将它填充到数组 index 的位置,然后再把当前 FastThreadLocal 对象保存到待清理的 Set 中。整个过程我们在分析 FastThreadLocal.set() 时都已经介绍过,就不再赘述了。

到此为止,FastThreadLocal 最核心的两个方法 set()/get() 我们已经分析完了。下面有两个问题我们再深入思考下。

  1. FastThreadLocal 真的一定比 ThreadLocal 快吗?答案是不一定的,只有使用FastThreadLocalThread 类型的线程才会更快,如果是普通线程反而会更慢。
  2. FastThreadLocal 会浪费很大的空间吗?虽然 FastThreadLocal 采用的空间换时间的思路,但是在 FastThreadLocal 设计之初就认为不会存在特别多的 FastThreadLocal 对象,而且在数据中没有使用的元素只是存放了同一个缺省对象的引用,并不会占用太多内存空间。

总结

本节课我们对比介绍了 ThreadLocal 和 FastThreadLocal,简单总结下 FastThreadLocal 的优势。

  • 高效查找。FastThreadLocal 在定位数据的时候可以直接根据数组下标 index 获取,时间复杂度 O(1)。而 JDK 原生的 ThreadLocal 在数据较多时哈希表很容易发生 Hash 冲突,线性探测法在解决 Hash 冲突时需要不停地向下寻找,效率较低。此外,FastThreadLocal 相比 ThreadLocal 数据扩容更加简单高效,FastThreadLocal 以 index 为基准向上取整到 2 的次幂作为扩容后容量,然后把原数据拷贝到新数组。而 ThreadLocal 由于采用的哈希表,所以在扩容后需要再做一轮 rehash。
  • 安全性更高。JDK 原生的 ThreadLocal 使用不当可能造成内存泄漏,只能等待线程销毁。在使用线程池的场景下,ThreadLocal 只能通过主动检测的方式防止内存泄漏,从而造成了一定的开销。然而 FastThreadLocal 不仅提供了 remove() 主动清除对象的方法,而且在线程池场景中 Netty 还封装了 FastThreadLocalRunnable,FastThreadLocalRunnable 最后会执行 FastThreadLocal.removeAll() 将 Set 集合中所有 FastThreadLocal 对象都清理掉,

FastThreadLocal 体现了 Netty 在高性能方面精益求精的设计精神,FastThreadLocal 仅仅是其中的冰山一角,下节课我们继续探索 Netty 中其他高效的数据结构技巧。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/780108.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uniapp开发App——登陆流程 判断是否登陆,是,进入首页,否,跳转到登录页

一、登陆流程 文字描述&#xff1a;用户进入App&#xff0c;之后就是判断该App是否有用户登陆过&#xff0c;如果有&#xff0c;直接进入首页&#xff0c;否则跳转到登陆页&#xff0c;登陆成功后&#xff0c;进入首页。 流程图如下&#xff1a; 二、在uniapp项目中代码实现 实…

代码随想录算法训练营第二十四天(回溯1)|77. 组合(JAVA)

文章目录 回溯理论基础概念类型回溯模板 77. 组合解题思路源码 回溯理论基础 概念 回溯是递归的副产品&#xff0c;本质上是一种穷举 回溯解决的问题可以抽象为一种树形结构 类型 回溯主要用来解决以下问题 组合问题&#xff1a;N个数里面按一定规则找出k个数的集合切割问…

如何使用Docker打包构建Java项目然后部署发布?

前言 今天我们来讲下如何使用Docker打包构建Java项目并且完成部署发布。 前期准备&#xff0c;需要安装好docker。 以及一个需要安装好Maven的镜像&#xff0c;可以参考下面的文章。 构建一个包含mvn命令的Java 17基础镜像-CSDN博客 一、打包构建Java项目镜像 1、创建Jav…

【力扣】300. 最长递增子序列(DFS+DP两种方法实现)

目录 题目传送最长递增子序列[DFS 方法]DFS方法思路图思路简述代码大家可以自行考虑有没有优化的方法 最长递增子序列[DP]方法DP方法思路图思路简述代码方案 题目传送 原题目链接 最长递增子序列[DFS 方法] DFS方法思路图 思路简述 对于序列中的每一个数字只有选择和不选择两…

电脑开机慢怎么办,电脑开机慢解决方法

新电脑呢&#xff0c;开机速度特别快。有很多人会感觉到电脑拿回家以后&#xff0c;一按开机键&#xff0c;六秒七秒&#xff0c;这个电脑就启动起来了&#xff0c;但是用一段时间以后呢&#xff0c;他会明显感觉到这个开机的速度呢&#xff0c;会特别慢&#xff0c;可能从六秒…

OSX-02-Mac OS应用开发系列课程大纲和章节内容设计

本节笔者会详细介绍下本系统专题的大纲&#xff0c;以及每个专题章节的组织结构。这样读者会有一个全局的概念。 在开始前还是在再介绍一下下面这个框架图&#xff0c;因为比较重要&#xff0c;在这里再冗余介绍一下。开发Apple公司相关产品的软件时&#xff0c;主要有两个框架…

kubernetes(K8S)学习(六):K8S之Dashboard图形界面

K8S之Dashboard图形界面 一、Dashboard简介二、k8s安装Dashboard(1)下载Dashboard镜像&#xff08;可选&#xff09;(2)根据yaml文件创建资源(3)查看资源(4)生成登录需要的token(5)使用火狐 / 搜狗浏览器访问&#xff08;个人用的搜狗&#xff09; 一、Dashboard简介 官网&…

【最新版RabbitMQ3.13】Linux安装基于源码构建的RabbitMQ教程

前言 linux环境 安装方式有三种&#xff0c;我们这里使用源码安装 Linux下rpm、yum和源码三种安装方式简介 个人语雀首发教程&#xff1a;https://www.yuque.com/wzzz/java/kl2zn22b42svsc6b csdn地址: https://blog.csdn.net/u013625306/article/details/137151862 安装版本…

Netty核心原理剖析与RPC实践6-10

Netty核心原理剖析与RPC实践6-10 06-粘包拆包问题&#xff1a;如何获取一个完整的网络包 本节课开始我们将学习 Netty 通信过程中的编解码技术。编解码技术这是实现网络通信的基础&#xff0c;让我们可以定义任何满足业务需求的应用层协议。在网络编程中&#xff0c;我们经常…

3D人体姿态估计项目 | 从2D视频中通过检测人体关键点来估计3D人体姿态实现

项目应用场景 人体姿态估计是关于图像或视频中人体关节的 2D 或 3D 定位。一般来说&#xff0c;这个过程可以分为两个部分&#xff1a;(1) 2D 视频中的 2D 关键点检测&#xff1b;(2) 根据 2D 关键点进行 3D 位姿估计。这个项目使用 Detectron2 从任意的 2D 视频中检测 2D 关节…

vue2处理跨域问题

vue中访问springboot中的RestController中的服务 &#xff08;vue.config.js不生效-CSDN博客&#xff09; 1、创建项目 使用vue init webpack my_frontend 创建vue项目 在HelloWorld.vue文件中添加内容&#xff1a; HelloWorld.vue 文件内容&#xff1a; <template>&…

LLMs之Mistral:Mistral 7B v0.2的简介、安装和使用方法、案例应用之详细攻略

LLMs之Mistral&#xff1a;Mistral 7B v0.2的简介、安装和使用方法、案例应用之详细攻略 导读&#xff1a;Mistral AI首个7B模型发布于2023年9月&#xff0c;在基准测试中超越Llama 2 13B&#xff0c;一下子声名大振。Mistral 7B v0.2对应的指令调优版本Mistral-7B-Instruct-v0…

JS数组练习

查找、筛选 Code <script>// 筛选>10的数组中元素var num1 [10, 2, 5, 0, 11, 121, 3, 0];var num2 [];var j 0;for (var i 0; i < num1.length; i) {// 法1// if (num1[i] > 10) {// num2[j] num1[i];// j;// }// 法2// if (num1[i] > 10) {/…

目前现货黄金行情技术分析

目前行情黄金可以投资吗&#xff1f;不论是黄金价格怎么波动&#xff0c;总是有投资者问这个问题&#xff0c;原因是他们搞不清现状&#xff0c;弄不懂当前的市场形势&#xff0c;对于技术分析和基本分析的方法也不甚了解&#xff0c;因此缺乏对未来行情判断的能力。下面我们就…

环境温度对测量平板有什么影响

环境温度可以对测量平板有影响。温度变化可以导致平板的尺寸发生变化。根据热膨胀原理&#xff0c;当环境温度升高时&#xff0c;平板的尺寸会扩大&#xff1b;当环境温度降低时&#xff0c;平板的尺寸会缩小。这种尺寸变化可能会导致测量结果的误差。因此&#xff0c;在测量平…

AI计算平台设计方案:901-基于3U VPX的图像数据AI计算平台

一、产品概述 设备基于3U VPX的导冷结构&#xff0c;集成FPGA接口预处理卡&#xff0c;GPU板卡、飞腾ARM处理卡&#xff0c;实现光纤、差分电口或者Camera link的图像接入&#xff0c;FPGA信号预处理&#xff0c;GPU AI计算&#xff0c;飞腾ARM的采集管理存储。 二、系统…

网安播报 | GitHub遭遇严重的供应链“投毒”攻击,影响GG平台

1、GitHub遭遇严重的供应链“投毒”攻击&#xff0c;影响GG平台 多年来&#xff0c;威胁行为者一直在使用多种策略、技术和程序 &#xff08;TTP&#xff09;&#xff0c;包括劫持 GitHub账户、分发恶意 Python 包、使用虚假的 Python 基础设施以及社会工程进行攻击&#xff0c…

jupyter lab使用虚拟环境

python -m ipykernel install --name 虚拟环境名 --display-name 虚拟环境名然后再启动jupyter lab就行了

计算机视觉的应用25-关于Deeplab系列语义分割模型的应用场景,以及空洞卷积的介绍

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下计算机视觉的应用25-关于Deeplab系列语义分割模型的应用场景&#xff0c;以及空洞卷积的介绍。Deeplab是Google研发的一系列深度学习模型&#xff0c;主要用于图像语义分割任务&#xff0c;其在众多应用场景中展现出…

设计模式之代理模式精讲

代理模式&#xff08;Proxy Pattern&#xff09;也叫委托模式&#xff0c;是一个使用率非常高的模式&#xff0c;比如我们在Spring中经常使用的AOP&#xff08;面向切面编程&#xff09;。 概念&#xff1a;为其他对象提供一种代理以控制对这个对象的访问。 代理类和实际的主题…