Netty Review - 服务端channel注册流程源码解析

文章目录

  • Pre
  • Netty主从Reactor线程模型
  • 服务端channel注册流程
  • 源码解读
    • 入口 serverBootstrap.bind(port)
    • 执行队列中的任务 : AbstractUnsafe#register0
    • 注册 doRegister()
  • 源码流程图

在这里插入图片描述

在这里插入图片描述


Pre

Netty Review - ServerBootstrap源码解析

Netty Review - NioServerSocketChannel源码分析


Netty主从Reactor线程模型

Netty 使用主从 Reactor 线程模型来处理并发连接和网络事件。

在 Netty 中,通常有两种类型的线程池:

  1. Boss 线程池:用于接受客户端连接请求,并将接受到的连接注册到 Worker 线程池的 EventLoop 中。Boss 线程池中的线程负责监听 ServerSocketChannel,并将接受到的连接分配给 Worker 线程池中的某个 EventLoop 处理。

  2. Worker 线程池:每个 Worker 线程池包含多个 EventLoop,每个 EventLoop 负责处理一组连接的读写和事件处理。当一个连接被注册到某个 Worker 线程池的 EventLoop 中时,该 EventLoop 将负责处理这个连接的所有事件,包括读取数据、写入数据、处理网络事件等。


主从 Reactor 线程模型的工作流程如下:

  1. 主线程池(Boss 线程池)负责监听 ServerSocketChannel 上的连接请求,并将接受到的连接请求分配给 Worker 线程池中的某个 EventLoop。

  2. Worker 线程池中的每个 EventLoop 都独立负责一组连接的读写和事件处理。当一个连接被注册到某个 EventLoop 上时,该 EventLoop 将会不断地轮询连接上是否有可读事件或可写事件,并在事件发生时进行相应的处理。

  3. 当有读写事件发生时,EventLoop 将调用对应的 ChannelHandler 进行处理。这些 ChannelHandler 可以进行数据解析、业务逻辑处理等操作。

  4. 处理完事件后,EventLoop 可能会将结果写回到连接中,或者关闭连接等。

通过主从 Reactor 线程模型,Netty 可以高效地处理大量的并发连接和网络事件,提高了网络应用程序的性能和可扩展性。


服务端channel注册流程

在Netty中,服务端Channel注册流程涉及以下几个关键步骤:

  1. 创建ServerBootstrap实例: 首先,需要创建一个ServerBootstrap实例,它是Netty提供的用于启动服务端的引导类。

  2. 配置ServerBootstrap: 使用ServerBootstrap实例,设置一系列参数,包括线程模型、Channel类型、处理器等。

  3. 绑定端口并启动服务: 调用ServerBootstrap的bind方法,指定端口并启动服务端。在bind方法内部,会进行以下操作:

    • 创建NioServerSocketChannel实例:用于表示服务端的Channel,内部封装了Java NIO中的ServerSocketChannel。

    • 初始化ChannelPipeline:为NioServerSocketChannel实例创建一个ChannelPipeline对象,用于管理ChannelHandler链。

    • 创建ChannelInitializer并添加到ChannelPipeline:ChannelInitializer是一个特殊的ChannelHandler,它用于在Channel注册到EventLoop之后初始化ChannelPipeline。在ChannelInitializer的initChannel方法中,可以向ChannelPipeline中添加自定义的ChannelHandler。

    • 获取EventLoopGroup并注册Channel:从ServerBootstrap中获取Boss EventLoopGroup,然后调用其register方法注册NioServerSocketChannel到EventLoop上。

  4. 注册Channel到EventLoop: 在调用register方法时,会将NioServerSocketChannel注册到Boss EventLoop上。在注册过程中,会执行以下操作:

    • 获取EventLoop:根据配置,从Boss EventLoopGroup中选择一个EventLoop。

    • 调用EventLoop的register方法:将NioServerSocketChannel注册到选定的EventLoop上。注册过程中,会创建一个NioServerSocketChannelUnsafe实例来处理注册过程,其中会调用底层的Java NIO方法将ServerSocketChannel注册到Selector上,并监听ACCEPT事件。

  5. 事件处理: 一旦NioServerSocketChannel注册到了EventLoop上,就会开始监听ACCEPT事件。当有新的连接接入时,会触发ACCEPT事件,EventLoop会调用相关的ChannelHandler进行处理,如调用ChannelInitializer的initChannel方法,添加用户自定义的ChannelHandler到新的连接的ChannelPipeline中。接着,新的连接就可以接受和处理客户端的请求了。

通过以上流程,服务端Channel在Netty中的注册过程就完成了,它可以接受客户端的连接,并将连接注册到EventLoop上进行事件处理。


源码解读

当我们梳理完

Netty Review - ServerBootstrap源码解析

Netty Review - NioServerSocketChannel源码分析

接下来让我们从下面这一行代码开始

ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
channelFuture.channel().closeFuture().sync();

这段代码用于启动服务端并阻塞当前线程直到服务端关闭。

  1. serverBootstrap.bind(9000):调用serverBootstrapbind()方法绑定端口9000,并返回一个ChannelFuture对象,表示绑定操作的异步结果。

  2. .sync():调用sync()方法阻塞当前线程,直到绑定操作完成。这样做是为了确保服务端在端口绑定完成后再继续执行后续代码。

  3. channelFuture.channel().closeFuture().sync():获取channelFuture中的channel(),然后调用其closeFuture()方法获取一个表示关闭操作的ChannelFuture对象。接着,再次调用sync()方法阻塞当前线程,直到关闭操作完成。这样做是为了让当前线程一直等待直到服务端关闭。

入口 serverBootstrap.bind(port)

这段代码是bind(int inetPort)方法的实现。

/*** Create a new {@link Channel} and bind it.*/
public ChannelFuture bind(int inetPort) {// 调用bind方法,传入一个InetSocketAddress对象,该对象使用指定的端口号创建return bind(new InetSocketAddress(inetPort));
}

创建一个新的Channel并绑定到指定的端口。


doBind(final SocketAddress localAddress)

这段代码是doBind(final SocketAddress localAddress)方法的实现。

private ChannelFuture doBind(final SocketAddress localAddress) {// 初始化并注册Channel,并返回一个ChannelFuturefinal ChannelFuture regFuture = initAndRegister();// 获取注册完成的Channelfinal Channel channel = regFuture.channel();// 如果注册过程中发生异常,则直接返回注册的ChannelFutureif (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// 如果注册已经完成,则创建一个新的ChannelPromise,并执行绑定操作ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// 如果注册尚未完成,则创建一个PendingRegistrationPromise,并添加一个监听器等待注册完成后再执行绑定操作final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// 如果注册过程中发生异常,则直接设置失败状态promise.setFailure(cause);} else {// 注册成功后执行绑定操作promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
}

这段代码的作用是执行绑定操作,并返回一个与绑定操作相关的ChannelFuture。


initAndRegister()

final ChannelFuture initAndRegister() {Channel channel = null;try {// 使用channelFactory创建一个新的Channel实例channel = channelFactory.newChannel();// 对新创建的Channel进行初始化init(channel);} catch (Throwable t) {if (channel != null) {// 如果初始化过程中发生异常,关闭Channelchannel.unsafe().closeForcibly();// 创建一个新的ChannelPromise,并设置失败状态return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// 如果channel为null,则创建一个FailedChannel实例,并设置失败状态return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}// 使用ChannelConfig的EventLoopGroup进行注册ChannelFuture regFuture = config().group().register(channel);// 如果注册过程中发生异常,则关闭Channelif (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// 返回注册的ChannelFuturereturn regFuture;
}

创建一个新的Channel实例并对其进行初始化,然后使用EventLoopGroup将其注册到事件循环中。最后返回一个与注册操作相关的ChannelFuture。


channelFactory.newChannel()

channelFactory.newChannel() 中的实现,请移步 Netty Review - NioServerSocketChannel源码分析

init(channel)

@Override
void init(Channel channel) throws Exception {// 设置Channel的选项final Map<ChannelOption<?>, Object> options = options0();synchronized (options) {setChannelOptions(channel, options, logger);}// 设置Channel的属性final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}// 获取Channel的PipelineChannelPipeline p = channel.pipeline();// 复制当前的子组、子处理器、子选项和子属性final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));}// 向Channel的Pipeline中添加一个ChannelInitializerp.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {// 添加用户配置的处理器到Pipeline中pipeline.addLast(handler);}// 在Channel的事件循环中执行ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// 添加一个ServerBootstrapAcceptor到Pipeline中,用于接收新连接pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}

init()方法的作用是初始化Channel,设置Channel的选项和属性,然后向ChannelPipeline中添加一个ChannelInitializer,该InitializerChannel的事件循环中执行,并向Pipeline中添加一个ServerBootstrapAcceptor,用于接收新连接。


config().group().register(channel)

在这里插入图片描述

    @Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}

在这里插入图片描述

在这里插入图片描述

next()

io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser#next
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {// 使用原子整数来维护索引,以确保在多线程环境中安全地获取下一个 EventExecutor 实例。private final AtomicInteger idx = new AtomicInteger();// 存储所有可用的 EventExecutor 实例的数组。private final EventExecutor[] executors;// 构造方法,接收一个 EventExecutor 实例数组作为参数,并将其存储在 executors 成员变量中。PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}// 选择下一个要使用的 EventExecutor 实例。// 通过对索引进行按位与操作(idx.getAndIncrement() & executors.length - 1),// 来确保索引始终在 executors 数组的有效范围内。// 由于 executors.length 必须是 2 的幂次方,因此使用按位与运算(&)可以有效地实现取模操作,// 从而将索引限制在数组长度范围内。@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}
}

在这里插入图片描述

@Override
public ChannelFuture register(final ChannelPromise promise) {// 检查传入的 promise 参数是否为 null,如果为 null,则抛出 NullPointerException 异常。ObjectUtil.checkNotNull(promise, "promise");// 获取与该 promise 关联的 Channel 实例,通过 unsafe() 方法获取 Channel 的 Unsafe 实例,然后调用 register() 方法注册 Channel。// 这里调用的是 unsafe() 方法,表示使用一种不安全的方式直接注册 Channel,而不经过 EventLoop 的事件循环。// register() 方法将 Channel 注册到当前 EventLoop,由 EventLoop 负责管理该 Channel。promise.channel().unsafe().register(this, promise);// 返回传入的 promise 对象,即注册 Channel 的异步结果。return promise;
}

AbstractUnsafe#register

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 检查 eventLoop 参数是否为 null,如果为 null,则抛出 NullPointerException 异常。if (eventLoop == null) {throw new NullPointerException("eventLoop");}// 检查 Channel 是否已经注册到某个 EventLoop,如果已经注册,则设置 promise 的失败状态,并返回。if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}// 检查传入的 eventLoop 是否与当前 Channel 兼容,如果不兼容,则设置 promise 的失败状态,并返回。if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}// 将当前 Channel 的 eventLoop 属性设置为传入的 eventLoop。AbstractChannel.this.eventLoop = eventLoop;// 判断当前线程是否在 eventLoop 的事件循环中,如果是,则直接调用 register0() 方法进行注册。if (eventLoop.inEventLoop()) {register0(promise);} else {// 如果当前线程不在 eventLoop 的事件循环中,则通过 eventLoop.execute() 方法提交一个任务,让 eventLoop 执行注册操作。try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {// 如果无法将注册任务提交给 eventLoop 执行,则强制关闭 Channel,并设置 promise 的失败状态。logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
}
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 检查 eventLoop 参数是否为 null,如果为 null,则抛出 NullPointerException 异常。if (eventLoop == null) {throw new NullPointerException("eventLoop");}// 检查 Channel 是否已经注册到某个 EventLoop,如果已经注册,则设置 promise 的失败状态,并返回。if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}// 检查传入的 eventLoop 是否与当前 Channel 兼容,如果不兼容,则设置 promise 的失败状态,并返回。if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}// 将当前 Channel 的 eventLoop 属性设置为传入的 eventLoop。AbstractChannel.this.eventLoop = eventLoop;// 判断当前线程是否在 eventLoop 的事件循环中,如果是,则直接调用 register0() 方法进行注册。if (eventLoop.inEventLoop()) {register0(promise);} else {// 如果当前线程不在 eventLoop 的事件循环中,则通过 eventLoop.execute() 方法提交一个任务,让 eventLoop 执行注册操作。try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {// 如果无法将注册任务提交给 eventLoop 执行,则强制关闭 Channel,并设置 promise 的失败状态。logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
}

进入到异步这里

 try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {....}

eventLoop.execute

在这里插入图片描述

@Override
public void execute(Runnable task) {// 检查任务是否为nullif (task == null) {throw new NullPointerException("task");}// 判断当前线程是否在EventLoop的事件循环中boolean inEventLoop = inEventLoop();// 将任务添加到任务队列中addTask(task);// 如果当前线程不在EventLoop的事件循环中if (!inEventLoop) {// 启动一个新的线程来执行任务startThread();// 如果EventLoop已经被关闭if (isShutdown()) {boolean reject = false;try {// 检查任务是否可以从任务队列中移除if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {// 任务队列不支持移除操作,直接忽略异常,希望在任务完全终止之前能够拾取到任务// 最坏的情况下,在终止时进行记录}// 如果需要拒绝执行该任务if (reject) {reject();}}}// 如果不需要唤醒EventLoop来处理任务if (!addTaskWakesUp && wakesUpForTask(task)) {// 如果当前线程不在EventLoop的事件循环中,则唤醒EventLoop来处理任务wakeup(inEventLoop);}
}

addTask(task);

/*** 将任务添加到任务队列中,如果实例在关闭之前被关闭,则抛出{@link RejectedExecutionException}。*/
protected void addTask(Runnable task) {// 检查任务是否为nullif (task == null) {throw new NullPointerException("task");}// 如果无法将任务添加到队列中,则拒绝执行该任务if (!offerTask(task)) {reject(task);}
}

startThread();

private void startThread() {// 如果线程状态为未启动if (state == ST_NOT_STARTED) {// 尝试将状态从未启动更改为已启动if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {try {// 启动线程doStartThread();} catch (Throwable cause) {// 如果启动线程时发生异常,则将状态重置为未启动,并抛出异常STATE_UPDATER.set(this, ST_NOT_STARTED);PlatformDependent.throwException(cause);}}}
}

doStartThread()

private void doStartThread() {// 确保线程为空assert thread == null;// 在执行器上执行一个新的 Runnableexecutor.execute(new Runnable() {@Overridepublic void run() {// 将当前线程设置为执行线程thread = Thread.currentThread();// 如果标记为已中断,则中断线程if (interrupted) {thread.interrupt();}boolean success = false;// 更新上次执行时间updateLastExecutionTime();try {// 运行单线程事件执行器的主要逻辑SingleThreadEventExecutor.this.run();// 执行成功success = true;} catch (Throwable t) {// 捕获并记录执行期间的异常logger.warn("Unexpected exception from an event executor: ", t);} finally {// 循环直到能够安全关闭for (;;) {int oldState = state;// 尝试将状态更改为正在关闭if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// 检查是否在循环结束时调用了 confirmShutdown()if (success && gracefulShutdownStartTime == 0) {if (logger.isErrorEnabled()) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +"be called before run() implementation terminates.");}}try {// 运行所有剩余任务和关闭钩子for (;;) {if (confirmShutdown()) {break;}}} finally {try {// 清理资源cleanup();} finally {// 移除线程上的所有 FastThreadLocals,因为线程即将终止,并通知未来。// 用户可能会在未来上阻塞,一旦解除阻塞,JVM 可能会终止并开始卸载类。// 详情请参阅 https://github.com/netty/netty/issues/6596。FastThreadLocal.removeAll();// 设置执行器状态为终止STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);// 释放线程锁threadLock.release();// 如果任务队列不为空,则记录警告日志if (!taskQueue.isEmpty()) {if (logger.isWarnEnabled()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}}// 设置终止未来为成功terminationFuture.setSuccess(null);}}}}});
}

SingleThreadEventExecutor.this.run()

在这里插入图片描述

@Override
protected void run() {// for死循环for (;;) {try {try {// 根据选择策略计算下一步的操作switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:// 继续循环,不进行任何操作continue;case SelectStrategy.BUSY_WAIT:// 忙等待策略,由于 NIO 不支持忙等待,因此执行 select 操作// 不会直接执行 select 方法,而是会先设置 wakenUp 为 false,然后执行 select 方法// 如果 wakenUp 为 true,则再次唤醒 Selector// 这样做是为了减少唤醒 Selector 的开销// 但是存在一个竞态条件:如果 wakenUp 在 select 之前设置为 true,则会导致不必要的唤醒// 因此在 select 之后需要再次判断 wakenUp 是否为 true,如果是则再次唤醒 Selectorselect(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}// 继续执行后续操作default:}} catch (IOException e) {// 如果在这里收到 IOException,则表示 Selector 出现问题,需要重建 Selector 并重试rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {// 如果 ioRatio 为 100%,则优先处理所有的 IO 事件,然后再执行所有任务try {processSelectedKeys();} finally {// 确保始终运行所有任务runAllTasks();}} else {// 如果 ioRatio 不为 100%,则按比例处理 IO 事件和任务final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// 确保始终运行所有任务final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {// 处理循环中的异常handleLoopException(t);}// 即使循环处理中抛出异常,也始终处理关闭try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {// 处理关闭过程中的异常handleLoopException(t);}}
}

select(wakenUp.getAndSet(false))

private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) {// 如果超时时间小于等于 0,则立即进行非阻塞的 selectNow 操作if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// 如果有任务并且 wakenUp 的值为 true,则立即进行非阻塞的 selectNow 操作// 这是为了确保任务在 select 操作之前已经被执行if (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}// 执行阻塞的 select 操作,并记录选择的次数int selectedKeys = selector.select(timeoutMillis);selectCnt++;// 如果 select 操作返回了结果,或者已经被唤醒,或者有任务待执行,则立即退出循环if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {break;}// 如果当前线程被中断,则重置选择的键并退出循环if (Thread.interrupted()) {selectCnt = 1;break;}// 更新当前时间,并判断是否需要继续循环long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// 如果 select 次数超过阈值,则重建 Selectorselector = selectRebuildSelector(selectCnt);selectCnt = 1;break;}currentTimeNanos = time;}// 如果 select 次数超过了预设的阈值,则输出警告日志if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}} catch (CancelledKeyException e) {// 取消键异常通常是无害的,只输出调试日志即可logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}
}

int selectedKeys = selector.select(timeoutMillis);

在这里插入图片描述

熟悉的NIO代码。


processSelectedKeys()

private void processSelectedKeys() {// 如果使用了优化的 selectedKeys 集合,则调用优化过的处理方法if (selectedKeys != null) {processSelectedKeysOptimized();} else {// 否则,调用普通的处理方法并传入 selector.selectedKeys() 作为参数processSelectedKeysPlain(selector.selectedKeys());}
}
private void processSelectedKeysOptimized() {// 遍历优化过的 selectedKeys 集合for (int i = 0; i < selectedKeys.size; ++i) {// 获取当前索引处的 SelectionKeyfinal SelectionKey k = selectedKeys.keys[i];// 将数组中的该元素置为 null,以便在 Channel 关闭后可以被垃圾回收selectedKeys.keys[i] = null;// 获取 SelectionKey 对应的附件final Object a = k.attachment();// 如果附件是 AbstractNioChannel 类型,则调用 processSelectedKey 方法处理if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {// 否则,认为附件是 NioTask 类型,将其转换为 NioTask<SelectableChannel> 并调用 processSelectedKey 方法处理@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}// 如果需要再次进行 select,则重置 selectedKeys 集合并再次进行 selectif (needsToSelectAgain) {// 将数组中的剩余元素置为 null,以便在 Channel 关闭后可以被垃圾回收selectedKeys.reset(i + 1);// 再次进行 select 操作selectAgain();// 重置索引为 -1,使循环从 0 开始i = -1;}}
}

在这里插入图片描述

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {// 获取与 Channel 相关联的 NioUnsafe 对象final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();// 如果 SelectionKey 不再有效,则关闭 Channelif (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// 如果 Channel 实现抛出异常,表示没有事件循环,我们忽略此异常// 因为我们只想确定 ch 是否注册到此事件循环,因此具有关闭 ch 的权限return;}// 只有在 ch 仍然注册到此 EventLoop 时才关闭 ch// 如果 ch 已从事件循环中注销,则 SelectionKey 可能作为注销过程的一部分被取消注册// 但是通道仍然健康且不应关闭。// 详见 https://github.com/netty/netty/issues/5125if (eventLoop != this || eventLoop == null) {return;}// 关闭 Channelunsafe.close(unsafe.voidPromise());return;}try {int readyOps = k.readyOps();// 如果 OP_CONNECT 位被设置,则调用 finishConnect() 完成连接if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// 移除 OP_CONNECT 位,否则 Selector.select(..) 将总是立即返回而不阻塞// 详见 https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);// 完成连接unsafe.finishConnect();}// 先处理 OP_WRITE,因为我们可能能够写入一些排队的缓冲区,从而释放内存if ((readyOps & SelectionKey.OP_WRITE) != 0) {// 调用 forceFlush() 方法,该方法还会负责在没有剩余可写内容时清除 OP_WRITE 位ch.unsafe().forceFlush();}// 如果 OP_READ 或 OP_ACCEPT 被设置,或者 readyOps 为 0,则进行读取操作if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {// 如果出现 CancelledKeyException 异常,则关闭 Channelunsafe.close(unsafe.voidPromise());}
}

在这里插入图片描述


runAllTasks()

/*** 从任务队列中获取并运行所有任务。* * @return 如果至少运行了一个任务,则返回 true*/
protected boolean runAllTasks() {// 断言当前线程在事件循环中assert inEventLoop();boolean fetchedAll;boolean ranAtLeastOne = false;do {// 从计划任务队列中获取任务fetchedAll = fetchFromScheduledTaskQueue();// 运行任务队列中的所有任务if (runAllTasksFrom(taskQueue)) {ranAtLeastOne = true;}} while (!fetchedAll); // 继续处理,直到获取所有计划任务。// 如果至少运行了一个任务,则更新最后执行时间if (ranAtLeastOne) {lastExecutionTime = ScheduledFutureTask.nanoTime();}// 在运行完所有任务后执行的操作afterRunningAllTasks();return ranAtLeastOne;
}
/*** 从传入的任务队列中运行所有任务。** @param taskQueue 要轮询和执行所有任务的任务队列。** @return 如果至少执行了一个任务,则返回 true。*/
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {// 从任务队列中轮询出一个任务Runnable task = pollTaskFrom(taskQueue);if (task == null) {return false;}for (;;) {// 执行任务safeExecute(task);// 继续轮询下一个任务task = pollTaskFrom(taskQueue);if (task == null) {return true;}}
}
/*** 从任务队列中轮询出一个任务。** @param taskQueue 要轮询的任务队列。** @return 任务队列中的下一个任务,如果没有任务则返回 null。*/
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {for (;;) {// 从任务队列中轮询出一个任务Runnable task = taskQueue.poll();// 如果轮询出来的任务是 WAKEUP_TASK,则继续轮询下一个任务if (task == WAKEUP_TASK) {continue;}return task;}
}

取出来的任务就是
在这里插入图片描述


执行队列中的任务 : AbstractUnsafe#register0

private void register0(ChannelPromise promise) {try {// 检查通道是否仍然打开,因为在注册调用之外的时间内,通道可能已关闭// 设置 ChannelPromise 为不可取消状态,以确保注册成功后无法取消// 并且检查通道是否仍然打开if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}// 判断是否是第一次注册boolean firstRegistration = neverRegistered;// 执行具体的注册逻辑doRegister();// 标记通道已经注册过neverRegistered = false;registered = true;// 在实际通知 promise 之前,确保先调用 handlerAdded(...) 方法。这是必要的,因为用户可能已经通过管道触发了事件。pipeline.invokeHandlerAddedIfNeeded();// 设置注册成功,并通知 ChannelPromisesafeSetSuccess(promise);// 触发 ChannelRegistered 事件,通知管道上下文pipeline.fireChannelRegistered();// 只有在通道尚未激活并且已经注册过一次时才触发 channelActive 事件,避免重复触发if (isActive()) {if (firstRegistration) {// 如果是第一次注册,则触发 channelActive 事件pipeline.fireChannelActive();} else if (config().isAutoRead()) {// 如果通道已经注册过且设置了自动读取,则重新开始读取,以便处理传入数据beginRead();}}} catch (Throwable t) {// 发生异常时,直接关闭通道以避免 FD 泄漏closeForcibly();closeFuture.setClosed();// 设置注册失败,并通知 ChannelPromisesafeSetFailure(promise, t);}
}

注册 doRegister()

在这里插入图片描述

doRegister();

@Override
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// 尝试将通道注册到 EventLoop 的 Selector 上,关注的事件为 0,表示不关注任何事件selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// 强制 Selector 立即执行 select 操作,因为 "canceled" SelectionKey 可能仍然被缓存,尚未移除,因为尚未调用 Select.select(..) 操作。eventLoop().selectNow();selected = true;} else {// 我们在之前已经强制执行了一次选择操作,但是 SelectionKey 仍然因为某种原因被缓存了,可能是 JDK 的 bugthrow e;}}}
}
@Override
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// 尝试将通道注册到 EventLoop 的 Selector 上,关注的事件为 0,表示不关注任何事件selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// 强制 Selector 立即执行 select 操作,因为 "canceled" SelectionKey 可能仍然被缓存,尚未移除,因为尚未调用 Select.select(..) 操作。eventLoop().selectNow();selected = true;} else {// 我们在之前已经强制执行了一次选择操作,但是 SelectionKey 仍然因为某种原因被缓存了,可能是 JDK 的 bugthrow e;}}}
}

熟悉的NIO代码

javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

在这里插入图片描述

pipeline.invokeHandlerAddedIfNeeded();

pipeline.fireChannelRegistered();

pipeline.fireChannelActive();


源码流程图

在这里插入图片描述

图都给你画好了,戳这里


在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/682993.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TCP高频知识点

本篇文章主要讲述一下在面试过程中TCP的高频知识点 1.TCP三次握手流程图: 客户端发送一个SYN&#xff08;同步&#xff09;报文段给服务器&#xff0c;选择一个初始序列号&#xff0c;并设置SYN标志位为1。服务器接收到客户端的SYN报文段后&#xff0c;回复一个ACK&#xff08…

OJ刷题:杨氏矩阵【建议收藏】

看见这个题目&#xff0c;很多人的第一反应是遍历整个数组查找数字&#xff0c;但是这种方法不仅效率低&#xff0c;而且远远不能满足题目要求。下面介绍一种高效的查找方法&#xff1a; 代码实现&#xff1a; #include <stdio.h>int Yang_Find_Num(int arr[][3], int …

steam游戏搬砖项目靠谱吗?有没有风险?

作为一款fps射击游戏&#xff0c;csgo在近几年可谓是火出圈&#xff0c;作为一款全球竞技游戏&#xff0c;深受玩家喜爱追捧&#xff0c;玩家追求的就是公平公正&#xff0c;各凭本事&#xff0c;像其他游戏可能还会有皮肤等装备属性加成&#xff0c;在csgo里面是不存在的。 纯…

K8sGPT 的使用

K8sGPT 介绍 k8sgpt 是一个扫描 Kubernetes 集群、诊断和分类问题的工具。它将 SRE 经验编入其分析器中&#xff0c;并帮助提取最相关的信息&#xff0c;通过人工智能来丰富它。它还可以与 OpenAI、Azure、Cohere、Amazon Bedrock 和本地模型结合使用。 K8sGPT Github 地址 …

Vue插槽

Vue插槽 一、插槽-默认插槽1.作用2.需求3.问题4.插槽的基本语法5.代码示例6.总结 二、插槽-后备内容&#xff08;默认值&#xff09;1.问题2.插槽的后备内容3.语法4.效果5.代码示例 三、插槽-具名插槽1.需求2.具名插槽语法3.v-slot的简写4.代码示例5.总结 四、作用域插槽1.插槽…

安卓价值1-如何在电脑上运行ADB

ADB&#xff08;Android Debug Bridge&#xff09;是Android平台的调试工具&#xff0c;它是一个命令行工具&#xff0c;用于与连接到计算机的Android设备进行通信和控制。ADB提供了一系列命令&#xff0c;允许开发人员执行各种操作&#xff0c;包括但不限于&#xff1a; 1. 安…

在python中JSON数据格式的使用

什么是JSON&#xff1f; JSON是一种数据格式&#xff0c;由美国程序设计师DouglasCrockford创建的&#xff0c;JSON全名是JavaScript Object Notation,由JSON英文全文字义我们可以推敲JSON的缘由&#xff0c;最初是为JavaScript开发的。这种数据格式由于简单好用被大量应用在We…

DS Wannabe之5-AM Project: DS 30day int prep day15

Q1. What is Autoencoder? 自编码器是什么&#xff1f; 自编码器是一种特殊类型的神经网络&#xff0c;它通过无监督学习尝试复现其输入数据。它通常包含两部分&#xff1a;编码器和解码器。编码器压缩输入数据成为一个低维度的中间表示&#xff0c;解码器则从这个中间表示重…

数据库被人破解,删除数据,勒索

事情是这样的&#xff0c;我买了一台服务器自己部署项目玩儿玩儿&#xff0c;我的数据库运行在3306端口&#xff0c;密码没改&#xff0c;就是默认的123456&#xff0c;诡异的事情发生了&#xff0c;用了一段时间之后&#xff0c;数据库突然连接不上了&#xff0c;我一通操作猛…

Python爬虫——解析库安装(1)

目录 1.lxml安装2.Beautiful Soup安装3.pyquery 的安装 我创建了一个社区&#xff0c;欢迎大家一起学习交流。社区名称&#xff1a;Spider学习交流 注&#xff1a;该系列教程已经默认用户安装了Pycharm和Anaconda&#xff0c;未安装的可以参考我之前的博客有将如何安装。同时默…

【开源】SpringBoot框架开发企业项目合同信息系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 合同审批模块2.3 合同签订模块2.4 合同预警模块2.5 数据可视化模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 合同审批表3.2.2 合同签订表3.2.3 合同预警表 四、系统展示五、核心代码5.1 查询合同…

Linux多线程[二]

引入知识 进程在线程内部执行是OS的系统调度单位。 内核中针对地址空间&#xff0c;有一种特殊的结构&#xff0c;VM_area_struct。这个用来控制虚拟内存中每个malloc等申请的空间&#xff0c;来区别每个malloc的是对应的堆区哪一段。OS可以做到资源的精细度划分。 对于磁盘…

嵌入式软件设计入门:从零开始学习嵌入式软件设计

&#xff08;本文为简单介绍&#xff0c;个人观点仅供参考&#xff09; 首先,让我们了解一下嵌入式软件的定义。嵌入式软件是指运行在嵌入式系统中的特定用途软件,它通常被用来控制硬件设备、处理实时数据和实现特定功能。与桌面应用程序相比,嵌入式软件需要具备更高的实时性、…

反无人机系统技术分析,无人机反制技术理论基础,无人机技术详解

近年来&#xff0c;经过大疆、parrot、3d robotics等公司不断的努力&#xff0c;具有强大功能的消费级无人机价格不断降低&#xff0c;操作简便性不断提高&#xff0c;无人机正快速地从尖端的军用设备转入大众市场&#xff0c;成为普通民众手中的玩具。 然而&#xff0c;随着消…

Python算法题集_翻转二叉树

Python算法题集_翻转二叉树 题226&#xff1a;翻转二叉树1. 示例说明2. 题目解析- 题意分解- 优化思路- 测量工具 3. 代码展开1) 标准求解【DFS递归】2) 改进版一【BFS迭代&#xff0c;节点循环】3) 改进版二【BFS迭代&#xff0c;列表循环】 4. 最优算法 本文为Python算法题集…

Spring Boot 笔记 019 创建接口_文件上传

1.1 创建阿里OSS bucket OSS Java SDK 兼容性和示例代码_对象存储(OSS)-阿里云帮助中心 (aliyun.com) 1.2 编写工具类 package com.geji.utils;import com.aliyun.oss.ClientException; import com.aliyun.oss.OSS; import com.aliyun.oss.OSSClientBuilder; import com.aliyun…

加速创新如何先从创意管理开始?

文章详细介绍了什么是创意管理以及它在组织中的重要性和最佳实践。创意管理是指在组织内捕捉、组织、评估和实施创意的过程。它通过建立一个结构化的系统&#xff0c;从员工、客户或其他利益相关者那里收集创意&#xff0c;并系统地审查和选择最有前景的创意进行进一步的开发或…

算法学习——LeetCode力扣回溯篇3

算法学习——LeetCode力扣回溯篇3 491. 非递减子序列 491. 非递减子序列 - 力扣&#xff08;LeetCode&#xff09; 描述 给你一个整数数组 nums &#xff0c;找出并返回所有该数组中不同的递增子序列&#xff0c;递增子序列中 至少有两个元素 。你可以按 任意顺序 返回答案。…

2024龙年特别篇 -- 魔法指针 之 指针变量的意义 指针运算

学习完指针变量&#xff1a;链接后&#xff0c; 我们继续学习指针变量的应用 目录 程序展示 原始方式 指针变量方式 代码对比 指针运算 指针-整数 用指针打印数组内容 使用指针打印1-10中的奇数 指针-指针 指针的关系运算 程序展示 打印一个有10个元素的数组&am…

语言与科技创新(大语言模型对科技创新的影响)

1.科技创新中的语言因素 科技创新中的语言因素至关重要&#xff0c;具体体现在以下几个方面&#xff1a; 科技文献交流&#xff1a; 英语作为全球科学研究的通用语言&#xff0c;极大地推动了科技成果的国际传播与合作。科学家们在发表论文、报告研究成果时&#xff0c;大多选…