文章目录
- Pre
- Netty主从Reactor线程模型
- 服务端channel注册流程
- 源码解读
- 入口 serverBootstrap.bind(port)
- 执行队列中的任务 : AbstractUnsafe#register0
- 注册 doRegister()
- 源码流程图
Pre
Netty Review - ServerBootstrap源码解析
Netty Review - NioServerSocketChannel源码分析
Netty主从Reactor线程模型
Netty 使用主从 Reactor 线程模型来处理并发连接和网络事件。
在 Netty 中,通常有两种类型的线程池:
-
Boss 线程池:用于接受客户端连接请求,并将接受到的连接注册到 Worker 线程池的 EventLoop 中。Boss 线程池中的线程负责监听 ServerSocketChannel,并将接受到的连接分配给 Worker 线程池中的某个 EventLoop 处理。
-
Worker 线程池:每个 Worker 线程池包含多个 EventLoop,每个 EventLoop 负责处理一组连接的读写和事件处理。当一个连接被注册到某个 Worker 线程池的 EventLoop 中时,该 EventLoop 将负责处理这个连接的所有事件,包括读取数据、写入数据、处理网络事件等。
主从 Reactor 线程模型的工作流程如下:
-
主线程池(Boss 线程池)负责监听 ServerSocketChannel 上的连接请求,并将接受到的连接请求分配给 Worker 线程池中的某个 EventLoop。
-
Worker 线程池中的每个 EventLoop 都独立负责一组连接的读写和事件处理。当一个连接被注册到某个 EventLoop 上时,该 EventLoop 将会不断地轮询连接上是否有可读事件或可写事件,并在事件发生时进行相应的处理。
-
当有读写事件发生时,EventLoop 将调用对应的 ChannelHandler 进行处理。这些 ChannelHandler 可以进行数据解析、业务逻辑处理等操作。
-
处理完事件后,EventLoop 可能会将结果写回到连接中,或者关闭连接等。
通过主从 Reactor 线程模型,Netty 可以高效地处理大量的并发连接和网络事件,提高了网络应用程序的性能和可扩展性。
服务端channel注册流程
在Netty中,服务端Channel注册流程涉及以下几个关键步骤:
-
创建ServerBootstrap实例: 首先,需要创建一个ServerBootstrap实例,它是Netty提供的用于启动服务端的引导类。
-
配置ServerBootstrap: 使用ServerBootstrap实例,设置一系列参数,包括线程模型、Channel类型、处理器等。
-
绑定端口并启动服务: 调用ServerBootstrap的bind方法,指定端口并启动服务端。在bind方法内部,会进行以下操作:
-
创建NioServerSocketChannel实例:用于表示服务端的Channel,内部封装了Java NIO中的ServerSocketChannel。
-
初始化ChannelPipeline:为NioServerSocketChannel实例创建一个ChannelPipeline对象,用于管理ChannelHandler链。
-
创建ChannelInitializer并添加到ChannelPipeline:ChannelInitializer是一个特殊的ChannelHandler,它用于在Channel注册到EventLoop之后初始化ChannelPipeline。在ChannelInitializer的initChannel方法中,可以向ChannelPipeline中添加自定义的ChannelHandler。
-
获取EventLoopGroup并注册Channel:从ServerBootstrap中获取Boss EventLoopGroup,然后调用其register方法注册NioServerSocketChannel到EventLoop上。
-
-
注册Channel到EventLoop: 在调用register方法时,会将NioServerSocketChannel注册到Boss EventLoop上。在注册过程中,会执行以下操作:
-
获取EventLoop:根据配置,从Boss EventLoopGroup中选择一个EventLoop。
-
调用EventLoop的register方法:将NioServerSocketChannel注册到选定的EventLoop上。注册过程中,会创建一个NioServerSocketChannelUnsafe实例来处理注册过程,其中会调用底层的Java NIO方法将ServerSocketChannel注册到Selector上,并监听ACCEPT事件。
-
-
事件处理: 一旦NioServerSocketChannel注册到了EventLoop上,就会开始监听ACCEPT事件。当有新的连接接入时,会触发ACCEPT事件,EventLoop会调用相关的ChannelHandler进行处理,如调用ChannelInitializer的initChannel方法,添加用户自定义的ChannelHandler到新的连接的ChannelPipeline中。接着,新的连接就可以接受和处理客户端的请求了。
通过以上流程,服务端Channel在Netty中的注册过程就完成了,它可以接受客户端的连接,并将连接注册到EventLoop上进行事件处理。
源码解读
当我们梳理完
Netty Review - ServerBootstrap源码解析
Netty Review - NioServerSocketChannel源码分析
接下来让我们从下面这一行代码开始
ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
channelFuture.channel().closeFuture().sync();
这段代码用于启动服务端并阻塞当前线程直到服务端关闭。
-
serverBootstrap.bind(9000)
:调用serverBootstrap
的bind()
方法绑定端口9000,并返回一个ChannelFuture
对象,表示绑定操作的异步结果。 -
.sync()
:调用sync()
方法阻塞当前线程,直到绑定操作完成。这样做是为了确保服务端在端口绑定完成后再继续执行后续代码。 -
channelFuture.channel().closeFuture().sync()
:获取channelFuture
中的channel()
,然后调用其closeFuture()
方法获取一个表示关闭操作的ChannelFuture
对象。接着,再次调用sync()
方法阻塞当前线程,直到关闭操作完成。这样做是为了让当前线程一直等待直到服务端关闭。
入口 serverBootstrap.bind(port)
这段代码是bind(int inetPort)
方法的实现。
/*** Create a new {@link Channel} and bind it.*/
public ChannelFuture bind(int inetPort) {// 调用bind方法,传入一个InetSocketAddress对象,该对象使用指定的端口号创建return bind(new InetSocketAddress(inetPort));
}
创建一个新的Channel并绑定到指定的端口。
doBind(final SocketAddress localAddress)
这段代码是doBind(final SocketAddress localAddress)
方法的实现。
private ChannelFuture doBind(final SocketAddress localAddress) {// 初始化并注册Channel,并返回一个ChannelFuturefinal ChannelFuture regFuture = initAndRegister();// 获取注册完成的Channelfinal Channel channel = regFuture.channel();// 如果注册过程中发生异常,则直接返回注册的ChannelFutureif (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// 如果注册已经完成,则创建一个新的ChannelPromise,并执行绑定操作ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// 如果注册尚未完成,则创建一个PendingRegistrationPromise,并添加一个监听器等待注册完成后再执行绑定操作final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// 如果注册过程中发生异常,则直接设置失败状态promise.setFailure(cause);} else {// 注册成功后执行绑定操作promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
}
这段代码的作用是执行绑定操作,并返回一个与绑定操作相关的ChannelFuture。
initAndRegister()
final ChannelFuture initAndRegister() {Channel channel = null;try {// 使用channelFactory创建一个新的Channel实例channel = channelFactory.newChannel();// 对新创建的Channel进行初始化init(channel);} catch (Throwable t) {if (channel != null) {// 如果初始化过程中发生异常,关闭Channelchannel.unsafe().closeForcibly();// 创建一个新的ChannelPromise,并设置失败状态return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// 如果channel为null,则创建一个FailedChannel实例,并设置失败状态return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}// 使用ChannelConfig的EventLoopGroup进行注册ChannelFuture regFuture = config().group().register(channel);// 如果注册过程中发生异常,则关闭Channelif (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// 返回注册的ChannelFuturereturn regFuture;
}
创建一个新的Channel实例并对其进行初始化,然后使用EventLoopGroup将其注册到事件循环中。最后返回一个与注册操作相关的ChannelFuture。
channelFactory.newChannel()
channelFactory.newChannel() 中的实现,请移步 Netty Review - NioServerSocketChannel源码分析
init(channel)
@Override
void init(Channel channel) throws Exception {// 设置Channel的选项final Map<ChannelOption<?>, Object> options = options0();synchronized (options) {setChannelOptions(channel, options, logger);}// 设置Channel的属性final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}// 获取Channel的PipelineChannelPipeline p = channel.pipeline();// 复制当前的子组、子处理器、子选项和子属性final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));}// 向Channel的Pipeline中添加一个ChannelInitializerp.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {// 添加用户配置的处理器到Pipeline中pipeline.addLast(handler);}// 在Channel的事件循环中执行ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// 添加一个ServerBootstrapAcceptor到Pipeline中,用于接收新连接pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}
init()
方法的作用是初始化Channel
,设置Channel
的选项和属性,然后向Channel
的Pipeline
中添加一个ChannelInitializer
,该Initializer
在Channel
的事件循环中执行,并向Pipeline
中添加一个ServerBootstrapAcceptor
,用于接收新连接。
config().group().register(channel)
@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}
next()
io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser#next
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {// 使用原子整数来维护索引,以确保在多线程环境中安全地获取下一个 EventExecutor 实例。private final AtomicInteger idx = new AtomicInteger();// 存储所有可用的 EventExecutor 实例的数组。private final EventExecutor[] executors;// 构造方法,接收一个 EventExecutor 实例数组作为参数,并将其存储在 executors 成员变量中。PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}// 选择下一个要使用的 EventExecutor 实例。// 通过对索引进行按位与操作(idx.getAndIncrement() & executors.length - 1),// 来确保索引始终在 executors 数组的有效范围内。// 由于 executors.length 必须是 2 的幂次方,因此使用按位与运算(&)可以有效地实现取模操作,// 从而将索引限制在数组长度范围内。@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}
}
@Override
public ChannelFuture register(final ChannelPromise promise) {// 检查传入的 promise 参数是否为 null,如果为 null,则抛出 NullPointerException 异常。ObjectUtil.checkNotNull(promise, "promise");// 获取与该 promise 关联的 Channel 实例,通过 unsafe() 方法获取 Channel 的 Unsafe 实例,然后调用 register() 方法注册 Channel。// 这里调用的是 unsafe() 方法,表示使用一种不安全的方式直接注册 Channel,而不经过 EventLoop 的事件循环。// register() 方法将 Channel 注册到当前 EventLoop,由 EventLoop 负责管理该 Channel。promise.channel().unsafe().register(this, promise);// 返回传入的 promise 对象,即注册 Channel 的异步结果。return promise;
}
AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 检查 eventLoop 参数是否为 null,如果为 null,则抛出 NullPointerException 异常。if (eventLoop == null) {throw new NullPointerException("eventLoop");}// 检查 Channel 是否已经注册到某个 EventLoop,如果已经注册,则设置 promise 的失败状态,并返回。if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}// 检查传入的 eventLoop 是否与当前 Channel 兼容,如果不兼容,则设置 promise 的失败状态,并返回。if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}// 将当前 Channel 的 eventLoop 属性设置为传入的 eventLoop。AbstractChannel.this.eventLoop = eventLoop;// 判断当前线程是否在 eventLoop 的事件循环中,如果是,则直接调用 register0() 方法进行注册。if (eventLoop.inEventLoop()) {register0(promise);} else {// 如果当前线程不在 eventLoop 的事件循环中,则通过 eventLoop.execute() 方法提交一个任务,让 eventLoop 执行注册操作。try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {// 如果无法将注册任务提交给 eventLoop 执行,则强制关闭 Channel,并设置 promise 的失败状态。logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
}
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 检查 eventLoop 参数是否为 null,如果为 null,则抛出 NullPointerException 异常。if (eventLoop == null) {throw new NullPointerException("eventLoop");}// 检查 Channel 是否已经注册到某个 EventLoop,如果已经注册,则设置 promise 的失败状态,并返回。if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}// 检查传入的 eventLoop 是否与当前 Channel 兼容,如果不兼容,则设置 promise 的失败状态,并返回。if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}// 将当前 Channel 的 eventLoop 属性设置为传入的 eventLoop。AbstractChannel.this.eventLoop = eventLoop;// 判断当前线程是否在 eventLoop 的事件循环中,如果是,则直接调用 register0() 方法进行注册。if (eventLoop.inEventLoop()) {register0(promise);} else {// 如果当前线程不在 eventLoop 的事件循环中,则通过 eventLoop.execute() 方法提交一个任务,让 eventLoop 执行注册操作。try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {// 如果无法将注册任务提交给 eventLoop 执行,则强制关闭 Channel,并设置 promise 的失败状态。logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
}
进入到异步这里
try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {....}
eventLoop.execute
@Override
public void execute(Runnable task) {// 检查任务是否为nullif (task == null) {throw new NullPointerException("task");}// 判断当前线程是否在EventLoop的事件循环中boolean inEventLoop = inEventLoop();// 将任务添加到任务队列中addTask(task);// 如果当前线程不在EventLoop的事件循环中if (!inEventLoop) {// 启动一个新的线程来执行任务startThread();// 如果EventLoop已经被关闭if (isShutdown()) {boolean reject = false;try {// 检查任务是否可以从任务队列中移除if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {// 任务队列不支持移除操作,直接忽略异常,希望在任务完全终止之前能够拾取到任务// 最坏的情况下,在终止时进行记录}// 如果需要拒绝执行该任务if (reject) {reject();}}}// 如果不需要唤醒EventLoop来处理任务if (!addTaskWakesUp && wakesUpForTask(task)) {// 如果当前线程不在EventLoop的事件循环中,则唤醒EventLoop来处理任务wakeup(inEventLoop);}
}
addTask(task);
/*** 将任务添加到任务队列中,如果实例在关闭之前被关闭,则抛出{@link RejectedExecutionException}。*/
protected void addTask(Runnable task) {// 检查任务是否为nullif (task == null) {throw new NullPointerException("task");}// 如果无法将任务添加到队列中,则拒绝执行该任务if (!offerTask(task)) {reject(task);}
}
startThread();
private void startThread() {// 如果线程状态为未启动if (state == ST_NOT_STARTED) {// 尝试将状态从未启动更改为已启动if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {try {// 启动线程doStartThread();} catch (Throwable cause) {// 如果启动线程时发生异常,则将状态重置为未启动,并抛出异常STATE_UPDATER.set(this, ST_NOT_STARTED);PlatformDependent.throwException(cause);}}}
}
doStartThread()
private void doStartThread() {// 确保线程为空assert thread == null;// 在执行器上执行一个新的 Runnableexecutor.execute(new Runnable() {@Overridepublic void run() {// 将当前线程设置为执行线程thread = Thread.currentThread();// 如果标记为已中断,则中断线程if (interrupted) {thread.interrupt();}boolean success = false;// 更新上次执行时间updateLastExecutionTime();try {// 运行单线程事件执行器的主要逻辑SingleThreadEventExecutor.this.run();// 执行成功success = true;} catch (Throwable t) {// 捕获并记录执行期间的异常logger.warn("Unexpected exception from an event executor: ", t);} finally {// 循环直到能够安全关闭for (;;) {int oldState = state;// 尝试将状态更改为正在关闭if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// 检查是否在循环结束时调用了 confirmShutdown()if (success && gracefulShutdownStartTime == 0) {if (logger.isErrorEnabled()) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +"be called before run() implementation terminates.");}}try {// 运行所有剩余任务和关闭钩子for (;;) {if (confirmShutdown()) {break;}}} finally {try {// 清理资源cleanup();} finally {// 移除线程上的所有 FastThreadLocals,因为线程即将终止,并通知未来。// 用户可能会在未来上阻塞,一旦解除阻塞,JVM 可能会终止并开始卸载类。// 详情请参阅 https://github.com/netty/netty/issues/6596。FastThreadLocal.removeAll();// 设置执行器状态为终止STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);// 释放线程锁threadLock.release();// 如果任务队列不为空,则记录警告日志if (!taskQueue.isEmpty()) {if (logger.isWarnEnabled()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}}// 设置终止未来为成功terminationFuture.setSuccess(null);}}}}});
}
SingleThreadEventExecutor.this.run()
@Override
protected void run() {// for死循环for (;;) {try {try {// 根据选择策略计算下一步的操作switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:// 继续循环,不进行任何操作continue;case SelectStrategy.BUSY_WAIT:// 忙等待策略,由于 NIO 不支持忙等待,因此执行 select 操作// 不会直接执行 select 方法,而是会先设置 wakenUp 为 false,然后执行 select 方法// 如果 wakenUp 为 true,则再次唤醒 Selector// 这样做是为了减少唤醒 Selector 的开销// 但是存在一个竞态条件:如果 wakenUp 在 select 之前设置为 true,则会导致不必要的唤醒// 因此在 select 之后需要再次判断 wakenUp 是否为 true,如果是则再次唤醒 Selectorselect(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}// 继续执行后续操作default:}} catch (IOException e) {// 如果在这里收到 IOException,则表示 Selector 出现问题,需要重建 Selector 并重试rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {// 如果 ioRatio 为 100%,则优先处理所有的 IO 事件,然后再执行所有任务try {processSelectedKeys();} finally {// 确保始终运行所有任务runAllTasks();}} else {// 如果 ioRatio 不为 100%,则按比例处理 IO 事件和任务final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// 确保始终运行所有任务final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {// 处理循环中的异常handleLoopException(t);}// 即使循环处理中抛出异常,也始终处理关闭try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {// 处理关闭过程中的异常handleLoopException(t);}}
}
select(wakenUp.getAndSet(false))
private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) {// 如果超时时间小于等于 0,则立即进行非阻塞的 selectNow 操作if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// 如果有任务并且 wakenUp 的值为 true,则立即进行非阻塞的 selectNow 操作// 这是为了确保任务在 select 操作之前已经被执行if (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}// 执行阻塞的 select 操作,并记录选择的次数int selectedKeys = selector.select(timeoutMillis);selectCnt++;// 如果 select 操作返回了结果,或者已经被唤醒,或者有任务待执行,则立即退出循环if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {break;}// 如果当前线程被中断,则重置选择的键并退出循环if (Thread.interrupted()) {selectCnt = 1;break;}// 更新当前时间,并判断是否需要继续循环long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// 如果 select 次数超过阈值,则重建 Selectorselector = selectRebuildSelector(selectCnt);selectCnt = 1;break;}currentTimeNanos = time;}// 如果 select 次数超过了预设的阈值,则输出警告日志if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}} catch (CancelledKeyException e) {// 取消键异常通常是无害的,只输出调试日志即可logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}
}
int selectedKeys = selector.select(timeoutMillis);
熟悉的NIO代码。
processSelectedKeys()
private void processSelectedKeys() {// 如果使用了优化的 selectedKeys 集合,则调用优化过的处理方法if (selectedKeys != null) {processSelectedKeysOptimized();} else {// 否则,调用普通的处理方法并传入 selector.selectedKeys() 作为参数processSelectedKeysPlain(selector.selectedKeys());}
}
private void processSelectedKeysOptimized() {// 遍历优化过的 selectedKeys 集合for (int i = 0; i < selectedKeys.size; ++i) {// 获取当前索引处的 SelectionKeyfinal SelectionKey k = selectedKeys.keys[i];// 将数组中的该元素置为 null,以便在 Channel 关闭后可以被垃圾回收selectedKeys.keys[i] = null;// 获取 SelectionKey 对应的附件final Object a = k.attachment();// 如果附件是 AbstractNioChannel 类型,则调用 processSelectedKey 方法处理if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {// 否则,认为附件是 NioTask 类型,将其转换为 NioTask<SelectableChannel> 并调用 processSelectedKey 方法处理@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}// 如果需要再次进行 select,则重置 selectedKeys 集合并再次进行 selectif (needsToSelectAgain) {// 将数组中的剩余元素置为 null,以便在 Channel 关闭后可以被垃圾回收selectedKeys.reset(i + 1);// 再次进行 select 操作selectAgain();// 重置索引为 -1,使循环从 0 开始i = -1;}}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {// 获取与 Channel 相关联的 NioUnsafe 对象final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();// 如果 SelectionKey 不再有效,则关闭 Channelif (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// 如果 Channel 实现抛出异常,表示没有事件循环,我们忽略此异常// 因为我们只想确定 ch 是否注册到此事件循环,因此具有关闭 ch 的权限return;}// 只有在 ch 仍然注册到此 EventLoop 时才关闭 ch// 如果 ch 已从事件循环中注销,则 SelectionKey 可能作为注销过程的一部分被取消注册// 但是通道仍然健康且不应关闭。// 详见 https://github.com/netty/netty/issues/5125if (eventLoop != this || eventLoop == null) {return;}// 关闭 Channelunsafe.close(unsafe.voidPromise());return;}try {int readyOps = k.readyOps();// 如果 OP_CONNECT 位被设置,则调用 finishConnect() 完成连接if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// 移除 OP_CONNECT 位,否则 Selector.select(..) 将总是立即返回而不阻塞// 详见 https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);// 完成连接unsafe.finishConnect();}// 先处理 OP_WRITE,因为我们可能能够写入一些排队的缓冲区,从而释放内存if ((readyOps & SelectionKey.OP_WRITE) != 0) {// 调用 forceFlush() 方法,该方法还会负责在没有剩余可写内容时清除 OP_WRITE 位ch.unsafe().forceFlush();}// 如果 OP_READ 或 OP_ACCEPT 被设置,或者 readyOps 为 0,则进行读取操作if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {// 如果出现 CancelledKeyException 异常,则关闭 Channelunsafe.close(unsafe.voidPromise());}
}
runAllTasks()
/*** 从任务队列中获取并运行所有任务。* * @return 如果至少运行了一个任务,则返回 true*/
protected boolean runAllTasks() {// 断言当前线程在事件循环中assert inEventLoop();boolean fetchedAll;boolean ranAtLeastOne = false;do {// 从计划任务队列中获取任务fetchedAll = fetchFromScheduledTaskQueue();// 运行任务队列中的所有任务if (runAllTasksFrom(taskQueue)) {ranAtLeastOne = true;}} while (!fetchedAll); // 继续处理,直到获取所有计划任务。// 如果至少运行了一个任务,则更新最后执行时间if (ranAtLeastOne) {lastExecutionTime = ScheduledFutureTask.nanoTime();}// 在运行完所有任务后执行的操作afterRunningAllTasks();return ranAtLeastOne;
}
/*** 从传入的任务队列中运行所有任务。** @param taskQueue 要轮询和执行所有任务的任务队列。** @return 如果至少执行了一个任务,则返回 true。*/
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {// 从任务队列中轮询出一个任务Runnable task = pollTaskFrom(taskQueue);if (task == null) {return false;}for (;;) {// 执行任务safeExecute(task);// 继续轮询下一个任务task = pollTaskFrom(taskQueue);if (task == null) {return true;}}
}
/*** 从任务队列中轮询出一个任务。** @param taskQueue 要轮询的任务队列。** @return 任务队列中的下一个任务,如果没有任务则返回 null。*/
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {for (;;) {// 从任务队列中轮询出一个任务Runnable task = taskQueue.poll();// 如果轮询出来的任务是 WAKEUP_TASK,则继续轮询下一个任务if (task == WAKEUP_TASK) {continue;}return task;}
}
取出来的任务就是
执行队列中的任务 : AbstractUnsafe#register0
private void register0(ChannelPromise promise) {try {// 检查通道是否仍然打开,因为在注册调用之外的时间内,通道可能已关闭// 设置 ChannelPromise 为不可取消状态,以确保注册成功后无法取消// 并且检查通道是否仍然打开if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}// 判断是否是第一次注册boolean firstRegistration = neverRegistered;// 执行具体的注册逻辑doRegister();// 标记通道已经注册过neverRegistered = false;registered = true;// 在实际通知 promise 之前,确保先调用 handlerAdded(...) 方法。这是必要的,因为用户可能已经通过管道触发了事件。pipeline.invokeHandlerAddedIfNeeded();// 设置注册成功,并通知 ChannelPromisesafeSetSuccess(promise);// 触发 ChannelRegistered 事件,通知管道上下文pipeline.fireChannelRegistered();// 只有在通道尚未激活并且已经注册过一次时才触发 channelActive 事件,避免重复触发if (isActive()) {if (firstRegistration) {// 如果是第一次注册,则触发 channelActive 事件pipeline.fireChannelActive();} else if (config().isAutoRead()) {// 如果通道已经注册过且设置了自动读取,则重新开始读取,以便处理传入数据beginRead();}}} catch (Throwable t) {// 发生异常时,直接关闭通道以避免 FD 泄漏closeForcibly();closeFuture.setClosed();// 设置注册失败,并通知 ChannelPromisesafeSetFailure(promise, t);}
}
注册 doRegister()
doRegister();
@Override
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// 尝试将通道注册到 EventLoop 的 Selector 上,关注的事件为 0,表示不关注任何事件selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// 强制 Selector 立即执行 select 操作,因为 "canceled" SelectionKey 可能仍然被缓存,尚未移除,因为尚未调用 Select.select(..) 操作。eventLoop().selectNow();selected = true;} else {// 我们在之前已经强制执行了一次选择操作,但是 SelectionKey 仍然因为某种原因被缓存了,可能是 JDK 的 bugthrow e;}}}
}
@Override
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// 尝试将通道注册到 EventLoop 的 Selector 上,关注的事件为 0,表示不关注任何事件selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// 强制 Selector 立即执行 select 操作,因为 "canceled" SelectionKey 可能仍然被缓存,尚未移除,因为尚未调用 Select.select(..) 操作。eventLoop().selectNow();selected = true;} else {// 我们在之前已经强制执行了一次选择操作,但是 SelectionKey 仍然因为某种原因被缓存了,可能是 JDK 的 bugthrow e;}}}
}
熟悉的NIO代码
javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
pipeline.invokeHandlerAddedIfNeeded();
pipeline.fireChannelRegistered();
pipeline.fireChannelActive();
源码流程图
图都给你画好了,戳这里