前置说明
本文主要记录Netty中的一些主要的关键时间点,是理解Netty 和 事件驱动的关键。也是阅读Netty源码的指导。
代码来源:Netty 4.1.77
本文涉及到的角色:
角色:主线程,主Reactor线程(bossGroup中的EventLoop), 从Reactor线程(workGroup中的EventLoop),结合Reactor编码模型理解
PS:Netty中的主要事件节点不断完善中,看看后面要不要拆成多篇。。。
主Reactor线程的启动时间点
主线程调用ServerBootStrap.bind方法,会通过io.netty.bootstrap.AbstractBootstrap#initAndRegister方法,先初始化NioServerSocktChannel,然后把NioServerSocktChannel注册到主Reactor线程组中的一个主Reactor线程上
final ChannelFuture initAndRegister() {Channel channel = null;try {// 创建和初始化channel = channelFactory.newChannel();init(channel);} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}//注册ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}return regFuture;}
最终会调用该主Reactor线程的注册方法进行注册:io.netty.channel.AbstractChannel.AbstractUnsafe#register,
@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {...//检查相关代码忽略AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {//当前是Main线程调用,所以提交任务到ServerChannel对应的Reactor线程eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {异常处理忽略。。。}}}
由于是主线程调用的该方法,所以会通过io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable) -> io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable, boolean) 提交一个异步任务到该主Reactor线程,
private void execute(Runnable task, boolean immediate) {boolean inEventLoop = inEventLoop();addTask(task); //把注册任务提交到主Reactor线程的任务队列if (!inEventLoop) {//当前是Main线程,需要启动主Reactor线程startThread();if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {}if (reject) {reject();}}}if (!addTaskWakesUp && immediate) {wakeup(inEventLoop);}}
然后主线程调用io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread方法启动主Reactor线程;
private void doStartThread() {assert this.thread == null;this.executor.execute(new Runnable() {public void run() {SingleThreadEventExecutor.this.thread = Thread.currentThread();...try {...SingleThreadEventExecutor.this.run();...
在该io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread方法中,是通过io.netty.util.concurrent.ThreadPerTaskExecutor#execute方法来提交任务,启动一个线程的,
public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;...public void execute(Runnable command) {this.threadFactory.newThread(command).start();}
}
并且会把该线程赋值到主Reactor线程的io.netty.util.concurrent.SingleThreadEventExecutor#thread属性中,其实实际上的主Reactor线程应该是io.netty.util.concurrent.ThreadPerTaskExecutor#execute方法启动的线程;
此时,主Reactor线程就开始执行io.netty.channel.nio.NioEventLoop#run方法,进行主Reactor的核心逻辑了。
NioServerSocketChannel注册到主Reactor线程:
在上面的【主Reactor线程的启动时间点】流程中,我们知道,SeverBootStrap.bind方法会提交一个注册任务到主Reactor线程,主Reactor线程启动后,就会处理这个注册任务,该任务的核心代码为:io.netty.channel.AbstractChannel.AbstractUnsafe#register0 -> io.netty.channel.nio.AbstractNioChannel#doRegister
@Overrideprotected void doRegister() throws Exception {boolean selected = false;for (;;) {try {//调用JDK中ServerSocketChannel的注册方法,把当前ServerSocketChannel注册到主Reactor线程中的selector上;//同时把Netty中的NioServerSocketChannel作为attachment,后面在通过JDK selector.select()获取事件的时候,可以获取到NioServerSocketChannel对象,方便执行Netty的操作selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {...异常处理忽略}}}
这里最终调用的是JDK中ServerSocketChannel的注册方法,把当前ServerSocketChannel注册到主Reactor线程中的selector上;
注意这里java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object)的参数:
att:这里把Netty中的NioServerSocketChannel作为attachment,后面在通过JDK selector.select()获取事件的时候,可以获取到NioServerSocketChannel对象,方便执行Netty的操作。
ops:0, 这里注册ServerSocketChannel的时候,感兴趣的操作是0,而OP_ACCETP对应的值是16,
java.nio.channels.SelectionKey
public static final int OP_ACCEPT = 1 << 4;
Netty这里并没有注册感兴趣的事件。OP_ACCETP事件的添加另有他处。
NioServerSocketChannel注册OP_ACCEPT
在ServerBootStrap.bind方法,首先会通过io.netty.bootstrap.AbstractBootstrap#initAndRegister方法,先初始化NioServerSocktChannel,然后把NioServerSocktChannel注册到主Reactor线程组中的一个主Reactor线程上的selector上,此时没有关注IO事件,因为java.nio.channels.SelectableChannel#register注册的ops参数为0,不代表任何IO事件。
NioServerSocketChannel异常注册的时候,bind方法还提交了一个bind任务到主Reactor线程。
private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister(); //创建、初始化、异步注册NioServerSocketChannelfinal Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);// 给异步注册任务添加Listener,在异步注册完成后,执行该Listener,通过doBind0向主Reactor线程提交bind任务regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;}}private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}
给异步注册任务添加Listener,在异步注册完成后,执行该Listener,向主Reactor线程提交bind任务。
该bind任务,最终会调用io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise),在NioServerSocketChannel的pipeline中传播bind事件,该事件是一个outbound事件,会从tail开始,从后往前查找覆盖了bind方法的AbstractChannelHandlerContext,执行对应ChannelHandler的bind方法,最终会执行到HeadContext的bind方法:io.netty.channel.DefaultChannelPipeline.HeadContext#bind
@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise);}
这里我们重点关注bind的整体执行主体行为:
通过unsafe.bind执行底层JDK的bind操作;然后向主Reactor线程提交ChannelActive事件,在pipeline中传播ChannelActive事件
@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {检查代码忽略boolean wasActive = isActive(); //此时wasActive为falsetry {doBind(localAddress); //调用底层JDK方法bind} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}if (!wasActive && isActive()) {//doBind完成后,isActive为trueinvokeLater(new Runnable() {@Overridepublic void run() {//向主Reactor线程提交ChannelActive事件pipeline.fireChannelActive();}});}safeSetSuccess(promise);}@Overridepublic boolean isActive() {// As java.nio.ServerSocketChannel.isBound() will continue to return true even after the channel was closed// we will also need to check if it is open.return isOpen() && javaChannel().socket().isBound();}
pipeline中ChannelActive事件的传播:
ChannelActive事件是inBound事件,会从当前ChannelHandlerContext向后查找覆盖了channelActive的ChannelHandler,进行处理
主要处理逻辑在HeadContext中,
@Overridepublic void channelActive(ChannelHandlerContext ctx) {//pipeline中继续向后传播channelActive事件ctx.fireChannelActive();//如果是autoRead 则自动触发read事件传播//在read回调函数中 触发OP_ACCEPT注册readIfIsAutoRead();}@Overridepublic Channel read() {pipeline.read();return this;}@Overridepublic void read(ChannelHandlerContext ctx) {unsafe.beginRead();}
然后通过unsafe.beginRead注册OP_ACCEPT
@Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();//这里readInterestOp就是NIOServerSocketChannel创建的时候传入的OP_ACCEPT,super(null, channel, SelectionKey.OP_ACCEPT);if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);}}
此时,才完成了对OP_ACCEPT IO事件的关注监听。
主Reactor线程获取客户端连接时间:
当有客户端完成了三次握手后,服务端Socket会收到OP_ACCEPT事件,在主Reactor的核心逻辑io.netty.channel.nio.NioEventLoop#run方法中,会处理该OP_ACCEPT事件。
io.netty.channel.nio.NioEventLoop#processSelectedKeys -> io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {//获取Channel的底层操作类Unsafefinal AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {......如果SelectionKey已经失效则关闭对应的Channel......}try {//获取IO就绪事件int readyOps = k.readyOps();//处理Connect事件if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();//移除对Connect事件的监听,否则Selector会一直通知ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);//触发channelActive事件处理Connect事件unsafe.finishConnect();}//处理Write事件if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}//处理Read事件或者Accept事件if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}
然后会调用io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read方法进行底层操作,调用io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages方法,然后通过io.netty.util.internal.SocketUtils#accept方法,调用JDK中的java.nio.channels.ServerSocketChannel#accept获取客户端SocketChannel,并放入io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#readBuf容器中存放
然后遍历readBuf容器中的NioSocketChannel,通过NioServerSocketChannel#pipeline传播ChannelRead事件,最后会在io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead的方法中处理,向从Reactor线程组注册NioSocketChannel,触发从Reactor线程的启动。(流程和主Reactor线程启动调用)
从Reactor线程启动时间:
在上面【主Reactor获取客户端连接】的时候,会向从Reactor线程注册,这里会提交任务触发从Reactor线程的启动。
NioSocketChannel添加关注事件:OP_READ
在【主Reactor线程获取客户端连接时间】流程中,我们知道NioServerSocketChannel向主Reactor线程注册的时候,会在pipeline中传播ChannelRead事件,ServerBootstrap.ServerBootstrapAcceptor#channelRead的方法会处理ChannelRead,向从Reactor线程组注册NioSocketChannel,触发从Reactor线程的启动
@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {...try {// 选择一个从Reactor提交注册任务childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {...}});} catch (Throwable t) {forceClose(child, t);}}//io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)@Overridepublic ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;}// io.netty.channel.AbstractChannel.AbstractUnsafe#register
这里的注册处理顺序与【NioServerSocketChannel注册OP_ACCEPT】顺序一致,且在调用JDK的时候,感兴趣的时间也是0;然后开始传播pipeline.fireChannelActive();
io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive处理ChannelActive事件,
@Overridepublic void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();readIfIsAutoRead();}
然后,io.netty.channel.Channel.Unsafe#beginRead -> io.netty.channel.nio.AbstractNioChannel#doBeginRead
@Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {// readInterestOp 在NioServerSocketChannel和NIOSocketChannel中不一样selectionKey.interestOps(interestOps | readInterestOp);}}
这里与【NioServerSocketChannel注册OP_ACCEPT】的差异在于readInterestOp的值,这里是NIOSocketChannel,super(parent, ch, SelectionKey.OP_READ);
至此,OP_READ事件就可以监听了
从Reactor线程读取客户端数据时间
从Reactor线程向取客户端写数据时间
PS:耗时良久啊。相当于又看了几遍源码