本篇为Netty系列的最后一篇,按照惯例会简单介绍一些Netty相关核心源码。
1、Netty启动源码分析
代码就使用最初的Netty服务器案例,在bind这一行打上断点,观察启动的全过程:
由于某些方法的调用链过深,节约篇幅,不会一张张地截图,只会对最终结果或者关键部分进行说明分析:
doBind 是一个重点方法,其中包含了:
- initAndRegister:初始化ServerSocketChannel并将ServerSocketChannel注册到selector的方法
- doBind0:ServerSocketChannel绑定端口号的方法
1.1、initAndRegister
首先会调用channelFactory工厂类的方法得到一个Channel
相当于NIO中的:
ServerSocketChannel ssc = ServerSocketChannel.open();
然后进入init方法,关键点在于,利用刚刚得到的channel对象,创建了一个流水线,并且添加了一个ChannelInitializer 处理器,监听初始化事件,在初始化事件中,使用eventLoop所在的NIO线程,提交一个任务,向pipeline中新增一个ServerBootstrapAcceptor用于处理新连接。
真正的调用时机是在AbstractChannel的register0中pipeline.invokeHandlerAddedIfNeeded();方法调用时被执行
然后进入.register(channel):
经过一系列的调用链,最终会进入AbstractChannel的register 方法:
关键点:首先会判断当前线程是否是NIO线程,此时是主线程,所以会进入else分支:
在try代码块中,会进行线程切换,由NIO线程负责注册。
我们选择NIO线程,进入register0方法,在register0方法中,有三个关键方法doRegister() 、pipeline.invokeHandlerAddedIfNeeded(); 和safeSetSuccess(promise);
1.1.1、 doRegister()
上图中框出的这一行代码,相当于NIO中的
SelectionKey sscKey = ssc.register(selector, 0, attach);
pipeline.invokeHandlerAddedIfNeeded(); 该方法被执行时会回调ServerBootstrap中init方法的p.addLast,
1.1.2、safeSetSuccess(promise);
该方法是给主线程的final ChannelFuture regFuture 结果。参数中的promise和主线程的regFuture 是同一个。
1.2、doBind0
doBind0实际上是主线程注册的一个regFuture监听回调对象中的方法。当initAndRegister 返回结果后,才会触发回调对象中的operationComplete方法:
在方法内部依旧是保证任务由NIO所在线程执行:
经过一系列的调用,找到了AbstractChannel中的bind方法,bind方法中又有两个重点:
doBind进行端口号绑定
对应NIO中的:
ssc.bind(new InetSocketAddress(8080));
然后会进入if代码块判断,如果目前ServerSocketChannel处于Active状态,就触发流水线上所有的active事件。
最后定位到AbstractChannel中的doBeginRead方法,在方法中注册一个接受连接事件:
相当于NIO中的
sscKey.interestOps(SelectionKey.OP_ACCEPT);
小结:
Netty的启动流程大致可以分为三部分:
- 创建ServerSocketChannel对象。
- 将ServerSocketChannel对象注册到selector上。
- ServerSocketChannel进行端口绑定。
其中,创建ServerSocketChannel对象是由主线程进行的,在将ServerSocketChannel对象注册到selector上时,会进行线程切换,由NIO线程去完成注册以及后续的端口绑定。
在创建ServerSocketChannel对象后,会向ServerSocketChannel的流水线上先注册一个ChannelInitializer事件,加入acceptor handler,但是是在第二步注册后调用流水线的invokeHandlerAddedIfNeeded触发。
端口绑定的方法dobind是regFuture的回调,第二步注册后会向promise中存放结果,由NIO所在线程执行端口绑定,绑定完成后触发NioServerSocketChannel的active事件,设置关注连接事件。
2、EventLoop源码分析
在翻源码之前,我们简单地复习一下什么是EventLoop:
EventLoop是一个不断循环的线程,用于处理所有注册到其上的事件。每一个Channel在创建时会被分配到一个EventLoop上,并且与其绑定,后续该Channel的所有事件都由这个EventLoop进行处理。
EventLoop既可以处理IO事件,也可以处理普通事件或定时事件。
我们重点看它的NioEventLoop实现,NioEventLoop 主要由selector,线程,任务队列组成。
2.1、selector何时被创建
NioEventLoop 有两个selector,可以理解成selector是经过封装优化的,而unwrappedSelector是原始的selector。
它们是在构造方法中被初始化 :
2.2、NioEventLoop 的NIO线程何时启动?
通过下面的案例代码,观察NIO线程启动的时机:
public class TestEventLoop {public static void main(String[] args) {EventLoop eventLoop = new NioEventLoopGroup().next();eventLoop.execute(()->{System.out.println("test");});}
}
进入execute方法:
首先if代码块会检查任务对象是否为空。
然后通过inEventLoop(); 方法检查当前线程是否是NIO线程,此时false。
进入startThread()方法,第一次的state必然和ST_NOT_STARTED相等,进入最外层的if块。如果此时没有其他线程修改状态,则通过第二个if块中的CAS操作将状态修改为2,并且进入doStartThread()方法
doStartThread()方法是启动NIO线程的核心方法:
在 SingleThreadEventExecutor.this.run();中,会根据不同的事件执行对应的操作:
如果没有任务,会进入SelectStrategy.SELECT分支,陷入阻塞。
NIO线程是懒加载的,只有在执行execute方法时才会被创建。
2.3、提交普通任务会不会结束select阻塞?
在select方法内部,会调用有时限的阻塞方法,默认时间是1000ms,在这个期间如果被唤醒则会解除阻塞。
在提交任务的execute中,有一个wakeup方法,我们选择它的NIO实现:
如果不是当前NIO线程的任务,并且CAS成功(因为唤醒操作只需要调用一次wakeup方法,如果多个线程同时调用多次和调用一次的效果是一样的,多次调用影响性能。),才会调用唤醒方法:
2.4、循环时什么时候会进入SelectStrategy.SELECT分支?
进入SelectStrategy.SELECT分支的情况是没有任务:
如果有任务会调用selectNowSupplier的get()方法返回一个selectNow()
selectNow() 方法的作用是立刻查看selector上有无IO事件,如果有则会将IO事件也一起拿到,如果没有就返回0。
2.5、NIO空轮询bug的体现以及Netty的解决方法
什么是NIO的空轮询bug?指的是selector.select(timeoutMillis); 没有到超时时间,期间也没有任务或者事件,但是NIO线程没有在这一行陷入阻塞,而是不断地进行空循环。
这种bug主要是出现在linux环境下,在Netty框架中对其进行了解决:
关键点在于引入了一个计数器,每循环一次计数器+1
当设置了阈值并且循环的次数超过了阈值,就可以认为发生了这个bug,会调用selectRebuildSelector 方法重建一个selector,并且将原有的key以及事件复制过去
阈值的默认值是512次,或者通过参数进行设置:
2.6、ioRatio的作用
在NioEventLoop的run方法中有一段关于处理IO事件和普通事件的逻辑:
其中涉及到了ioRatio,它在成员变量中的默认值是50。
如果它的值为100,则会执行所有的IO事件和普通事件。
否则会对执行普通任务的时间进行计算,用当前时间 - IO事件发生前的当前时间 = IO事件的消耗时间。 假设为4s,然后用 4 * (100 - 50)/ 50 = 4s,得到普通任务的执行时间也为4s,如果在规定的时间内没有执行完普通任务,则会停止执行。
2.7、selectedKeys优化
在创建selector时,会通过反射将 Selector 实现类中的就绪事件集合替换为 SelectedSelectionKeySet
SelectedSelectionKeySet 底层为数组实现,可以提高遍历性能:
这一行是取出附件,在将ServerSocketChannel绑定到selector时,附件对象是Channel。
final Object a = k.attachment();
满足下面的if代码块判断,会进入processSelectedKey(k, (AbstractNioChannel) a); 方法,在这个方法里会根据不同的事件类型做出判断并且执行:
3、accpet源码分析
在原先的NIO中,一旦有事件发生,则会执行以下的代码逻辑:
//1 阻塞直到事件发生
selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) { //2 拿到一个事件SelectionKey key = iter.next();//3 如果是 accept 事件if (key.isAcceptable()) {//4 执行 acceptSocketChannel channel = serverSocketChannel.accept();channel.configureBlocking(false);//5 关注 read 事件channel.register(selector, SelectionKey.OP_READ);}// ...
}
我们来看一下上面的代码在Netty中的实现过程:
接着2.7中的代码,进入最后一个分支的read方法:
选择AbstractNioMessageChannel 实现,关键代码有以下三处
3.1、doReadMessages(readBuf)
我们选择NioServerSocketChannel的实现:
在SocketChannel ch = SocketUtils.accept(javaChannel()); 这一行代码中,会得到一个SocketChannel,相当于:
SocketChannel channel = serverSocketChannel.accept();
然后会把这个SocketChannel封装在一个NioSocketChannel对象中,并且存放在参数中的list,然后返回:
在NioSocketChannel父类的构造方法中也会设置channel为非阻塞,相当于:
channel.configureBlocking(false);
3.2、allocHandle.incMessagesRead(localRead);
这个方法的主要作用是接受客户端的连接。
3.3、pipeline.fireChannelRead(readBuf.get(i));
这个方法的作用是触发 read 事件,让 pipeline 上的 handler 处理,触发的是ServerBootstrapAcceptor 上的channelRead 事件:
主要看try中的代码:
又回到了register方法中,不过这次线程切换是从NIO的Boss切换到worker:
切换到worker线程:
进入doRegister
注册事件,相当于:
channel.register(selector, 0);
最后关注read事件:
一路跳转到doBeginRead中,执行关注read事件的逻辑:
大致流程和accpet类似,最大的区别是由Worker线程完成。
4、read源码分析
当客户端连接上服务器并且触发了一次write操作时,服务器首先会触发连接操作:
跳过,下一次会触发读取操作:
config.getAllocator(); 会分配一个ByteBufAllocator
得到byteBuf:
在循环中进行读取的逻辑: