Netty中的各个关键时间点(一)

前置说明

本文主要记录Netty中的一些主要的关键时间点,是理解Netty 和 事件驱动的关键。也是阅读Netty源码的指导。

代码来源:Netty 4.1.77

本文涉及到的角色:

角色:主线程,主Reactor线程(bossGroup中的EventLoop), 从Reactor线程(workGroup中的EventLoop),结合Reactor编码模型理解

PS:Netty中的主要事件节点不断完善中,看看后面要不要拆成多篇。。。

主Reactor线程的启动时间点

主线程调用ServerBootStrap.bind方法,会通过io.netty.bootstrap.AbstractBootstrap#initAndRegister方法,先初始化NioServerSocktChannel,然后把NioServerSocktChannel注册到主Reactor线程组中的一个主Reactor线程上

    final ChannelFuture initAndRegister() {Channel channel = null;try {// 创建和初始化channel = channelFactory.newChannel();init(channel);} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}//注册ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}return regFuture;}

最终会调用该主Reactor线程的注册方法进行注册:io.netty.channel.AbstractChannel.AbstractUnsafe#register,

        @Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {...//检查相关代码忽略AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {//当前是Main线程调用,所以提交任务到ServerChannel对应的Reactor线程eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {异常处理忽略。。。}}}

由于是主线程调用的该方法,所以会通过io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable) -> io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable, boolean) 提交一个异步任务到该主Reactor线程,

private void execute(Runnable task, boolean immediate) {boolean inEventLoop = inEventLoop();addTask(task); //把注册任务提交到主Reactor线程的任务队列if (!inEventLoop) {//当前是Main线程,需要启动主Reactor线程startThread();if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {}if (reject) {reject();}}}if (!addTaskWakesUp && immediate) {wakeup(inEventLoop);}}

然后主线程调用io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread方法启动主Reactor线程;

    private void doStartThread() {assert this.thread == null;this.executor.execute(new Runnable() {public void run() {SingleThreadEventExecutor.this.thread = Thread.currentThread();...try {...SingleThreadEventExecutor.this.run();...

在该io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread方法中,是通过io.netty.util.concurrent.ThreadPerTaskExecutor#execute方法来提交任务,启动一个线程的,

public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;...public void execute(Runnable command) {this.threadFactory.newThread(command).start();}
}

并且会把该线程赋值到主Reactor线程的io.netty.util.concurrent.SingleThreadEventExecutor#thread属性中,其实实际上的主Reactor线程应该是io.netty.util.concurrent.ThreadPerTaskExecutor#execute方法启动的线程;

此时,主Reactor线程就开始执行io.netty.channel.nio.NioEventLoop#run方法,进行主Reactor的核心逻辑了。

NioServerSocketChannel注册到主Reactor线程:

在上面的【主Reactor线程的启动时间点】流程中,我们知道,SeverBootStrap.bind方法会提交一个注册任务到主Reactor线程,主Reactor线程启动后,就会处理这个注册任务,该任务的核心代码为:io.netty.channel.AbstractChannel.AbstractUnsafe#register0 -> io.netty.channel.nio.AbstractNioChannel#doRegister

    @Overrideprotected void doRegister() throws Exception {boolean selected = false;for (;;) {try {//调用JDK中ServerSocketChannel的注册方法,把当前ServerSocketChannel注册到主Reactor线程中的selector上;//同时把Netty中的NioServerSocketChannel作为attachment,后面在通过JDK selector.select()获取事件的时候,可以获取到NioServerSocketChannel对象,方便执行Netty的操作selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {...异常处理忽略}}}

这里最终调用的是JDK中ServerSocketChannel的注册方法,把当前ServerSocketChannel注册到主Reactor线程中的selector上;

注意这里java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object)的参数:

att:这里把Netty中的NioServerSocketChannel作为attachment,后面在通过JDK selector.select()获取事件的时候,可以获取到NioServerSocketChannel对象,方便执行Netty的操作。

ops:0, 这里注册ServerSocketChannel的时候,感兴趣的操作是0,而OP_ACCETP对应的值是16,

java.nio.channels.SelectionKey

public static final int OP_ACCEPT = 1 << 4; 

Netty这里并没有注册感兴趣的事件。OP_ACCETP事件的添加另有他处。

NioServerSocketChannel注册OP_ACCEPT

在ServerBootStrap.bind方法,首先会通过io.netty.bootstrap.AbstractBootstrap#initAndRegister方法,先初始化NioServerSocktChannel,然后把NioServerSocktChannel注册到主Reactor线程组中的一个主Reactor线程上的selector上,此时没有关注IO事件,因为java.nio.channels.SelectableChannel#register注册的ops参数为0,不代表任何IO事件。

NioServerSocketChannel异常注册的时候,bind方法还提交了一个bind任务到主Reactor线程。

    private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister(); //创建、初始化、异步注册NioServerSocketChannelfinal Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);// 给异步注册任务添加Listener,在异步注册完成后,执行该Listener,通过doBind0向主Reactor线程提交bind任务regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;}}private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}

给异步注册任务添加Listener,在异步注册完成后,执行该Listener,向主Reactor线程提交bind任务。

该bind任务,最终会调用io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise),在NioServerSocketChannel的pipeline中传播bind事件,该事件是一个outbound事件,会从tail开始,从后往前查找覆盖了bind方法的AbstractChannelHandlerContext,执行对应ChannelHandler的bind方法,最终会执行到HeadContext的bind方法:io.netty.channel.DefaultChannelPipeline.HeadContext#bind

        @Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise);}

这里我们重点关注bind的整体执行主体行为:

通过unsafe.bind执行底层JDK的bind操作;然后向主Reactor线程提交ChannelActive事件,在pipeline中传播ChannelActive事件

 @Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {检查代码忽略boolean wasActive = isActive(); //此时wasActive为falsetry {doBind(localAddress); //调用底层JDK方法bind} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}if (!wasActive && isActive()) {//doBind完成后,isActive为trueinvokeLater(new Runnable() {@Overridepublic void run() {//向主Reactor线程提交ChannelActive事件pipeline.fireChannelActive();}});}safeSetSuccess(promise);}@Overridepublic boolean isActive() {// As java.nio.ServerSocketChannel.isBound() will continue to return true even after the channel was closed// we will also need to check if it is open.return isOpen() && javaChannel().socket().isBound();}

pipeline中ChannelActive事件的传播:

ChannelActive事件是inBound事件,会从当前ChannelHandlerContext向后查找覆盖了channelActive的ChannelHandler,进行处理

主要处理逻辑在HeadContext中,

        @Overridepublic void channelActive(ChannelHandlerContext ctx) {//pipeline中继续向后传播channelActive事件ctx.fireChannelActive();//如果是autoRead 则自动触发read事件传播//在read回调函数中 触发OP_ACCEPT注册readIfIsAutoRead();}@Overridepublic Channel read() {pipeline.read();return this;}@Overridepublic void read(ChannelHandlerContext ctx) {unsafe.beginRead();}

然后通过unsafe.beginRead注册OP_ACCEPT

    @Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();//这里readInterestOp就是NIOServerSocketChannel创建的时候传入的OP_ACCEPT,super(null, channel, SelectionKey.OP_ACCEPT);if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);}}

此时,才完成了对OP_ACCEPT IO事件的关注监听。

主Reactor线程获取客户端连接时间:

当有客户端完成了三次握手后,服务端Socket会收到OP_ACCEPT事件,在主Reactor的核心逻辑io.netty.channel.nio.NioEventLoop#run方法中,会处理该OP_ACCEPT事件。

io.netty.channel.nio.NioEventLoop#processSelectedKeys -> io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {//获取Channel的底层操作类Unsafefinal AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {......如果SelectionKey已经失效则关闭对应的Channel......}try {//获取IO就绪事件int readyOps = k.readyOps();//处理Connect事件if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();//移除对Connect事件的监听,否则Selector会一直通知ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);//触发channelActive事件处理Connect事件unsafe.finishConnect();}//处理Write事件if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}//处理Read事件或者Accept事件if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}

然后会调用io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read方法进行底层操作,调用io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages方法,然后通过io.netty.util.internal.SocketUtils#accept方法,调用JDK中的java.nio.channels.ServerSocketChannel#accept获取客户端SocketChannel,并放入io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#readBuf容器中存放

然后遍历readBuf容器中的NioSocketChannel,通过NioServerSocketChannel#pipeline传播ChannelRead事件,最后会在io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead的方法中处理,向从Reactor线程组注册NioSocketChannel,触发从Reactor线程的启动。(流程和主Reactor线程启动调用)

从Reactor线程启动时间:

在上面【主Reactor获取客户端连接】的时候,会向从Reactor线程注册,这里会提交任务触发从Reactor线程的启动。

NioSocketChannel添加关注事件:OP_READ

在【主Reactor线程获取客户端连接时间】流程中,我们知道NioServerSocketChannel向主Reactor线程注册的时候,会在pipeline中传播ChannelRead事件,ServerBootstrap.ServerBootstrapAcceptor#channelRead的方法会处理ChannelRead,向从Reactor线程组注册NioSocketChannel,触发从Reactor线程的启动

        @Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {...try {// 选择一个从Reactor提交注册任务childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {...}});} catch (Throwable t) {forceClose(child, t);}}//io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)@Overridepublic ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;}// io.netty.channel.AbstractChannel.AbstractUnsafe#register

这里的注册处理顺序与【NioServerSocketChannel注册OP_ACCEPT】顺序一致,且在调用JDK的时候,感兴趣的时间也是0;然后开始传播pipeline.fireChannelActive();

io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive处理ChannelActive事件,

        @Overridepublic void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();readIfIsAutoRead();}

然后,io.netty.channel.Channel.Unsafe#beginRead -> io.netty.channel.nio.AbstractNioChannel#doBeginRead 

    @Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {// readInterestOp 在NioServerSocketChannel和NIOSocketChannel中不一样selectionKey.interestOps(interestOps | readInterestOp);}}

这里与【NioServerSocketChannel注册OP_ACCEPT】的差异在于readInterestOp的值,这里是NIOSocketChannel,super(parent, ch, SelectionKey.OP_READ);

至此,OP_READ事件就可以监听了

从Reactor线程读取客户端数据时间

从Reactor线程向取客户端写数据时间

PS:耗时良久啊。相当于又看了几遍源码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/31337.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux连接工具MobaXterm详细使用教程

目录 一、MobaXterm的下载 1、访问官网 2、下载便携版 3、启动MobaXterm 二、MobaXterm基本使用设置 1、新建会话 2、使用ssh连接第一个会话 3、设置主密码 4、主界面 5、sftp文件上传下载 6、文件拖拽的上传下载 7.右键粘贴 8、查看服务器监测信息​编辑 9、个…

进军韩国5G市场!移远通信5G模组RG500L-EU率先获得KT、LGU+认证

近日&#xff0c;移远通信工规级5G模组RG500L-EU再传喜讯&#xff0c;率先通过了韩国两大运营商KT和LGU的严格认证。​在此之前&#xff0c;该模组已顺利通过KC认证&#xff08;韩国法规认证&#xff09;&#xff0c;此次再获运营商认证表明&#xff0c;RG500L-EU已完全满足韩国…

LeetCode LCP 61. 气温变化趋势

别怕麻烦&#xff0c;模拟题有时候就是要多写一些条件&#xff08;或者你思维很活跃找出规律&#xff09;&#xff0c;代码如下&#xff1a; class Solution { public:int temperatureTrend(vector<int>& temperatureA, vector<int>& temperatureB) {int …

斗地主游戏

找了个斗地主的项目&#xff0c;github项目地址&#xff1a;https://github.com/zhuang0/BoYaDDZ/tree/master/BoyaDDZ。在此基础上做些修改和优化&#xff0c;为了方便国内访问&#xff0c;上传到gitee上。 gitee地址: https://gitee.com/zhagnjinaaaa/android-ddz v0.0.1解决…

Linux基础篇

Linux 本文章是在B站的尚课听的&#xff0c;但是由于版本较老&#xff0c;而且是以centOS学习Linux&#xff0c;由于CentOS在10天后就不再更新&#xff0c;被抛弃了&#xff0c;痛定思痛&#xff0c;及时停课。但是又不想浪费笔记&#xff0c;前来保存一下。 文章目录 Linux前…

个人介绍~

摘要 大家好&#xff0c;很高兴以这种方式见到大家。本篇文章可能会很长&#xff0c;如果您不喜欢长文章或者笔者&#xff0c;就早早出门右拐&#xff08;不喜勿喷&#xff09;。本篇会持续更新&#xff0c;记录个人从大学生涯到毕业后求职&#xff0c;工作&#xff0c;个人爱好…

mindspore打卡第三课模型定义和训练全流程

python import mindspore from mindspore import nn, ops class Network(nn.Cell): ### 先定义类合参数 需要初始化实例 def __init__(self): super().__init__() self.flatten nn.Flatten() self.dense_relu_sequential nn.SequentialCell(…

Jacob代码编写/部署的注意事项集

注意事项一&#xff1a; 慎用 ComThread.InitSTA(); ComThread.Release(); 因为经常会在 ComThread.Release(); 发生阻塞&#xff0c;导致程序一直卡在这里&#xff0c;不能被调用&#xff1b; 建议不要使用这个初始化和释放线程的代码&#xff0c;看似很高级&#xff0c;其…

Swift Combine — zip和combineLatest的理解与使用

Publisher 上还有一些其他的操作&#xff0c;比如 zip 和 combineLatest&#xff0c;能让我们在时序上对控制多个 Publisher 的结果进行类似 and 和 or 的合并&#xff0c;它们在构建复杂 Publisher 逻辑时也十分有用。 zip Publisher 中的 zip 和 Sequence 的 zip 相类似&am…

iOS政策解读之一丨App提交审核前注意事项必知

大家好&#xff0c;我是小编阿文。欢迎您关注我们&#xff0c;经常分享有关Android出海&#xff0c;iOS出海&#xff0c;App市场政策实时更新&#xff0c;互金市场投放策略&#xff0c;最新互金新闻资讯等文章&#xff0c;期待与您共航世界之海。 iOS企业出海所面临的主要挑战…

路由优先级

在网络管理中&#xff0c;“路由策略”和“策略路由”是两种不同的概念。路由策略通常指的是传统的路由协议 和静态路由等机制的优先级&#xff0c;而策略路由&#xff08;Policy-Based Routing, PBR&#xff09;则允许管理员基于特定 的策略&#xff08;如源地址、目标地址、…

高速异地组网怎么办理?

在当今信息化时代&#xff0c;跨地域的远程办公、远程教育、远程医疗等需求越来越多。而高速异地组网作为一种解决不同地区之间快速组建局域网的方法&#xff0c;被广泛应用。本文将介绍一款异地组网内网穿透产品——【天联】&#xff0c;并提供其办理流程。 【天联】组网是什…

JMeter详解

一、线程组 作用:线程组就是控制Imeter用于执行测试的一组用户 位置:右键点击测试计划’-->添加 -->线程(用户)--> 线程组 特点: 模拟多人操作线程组可以添加多个&#xff0c;多个线程组可以并行或串行取样器(请求)和逻辑控制器必须依赖线程组才能使用线程组下可以…

[广搜BFS] Pots

描述 You are given two pots, having the volume of A and B liters respectively. The following operations can be performed: FILL(i) fill the pot i (1 ≤ i ≤ 2) from the tap;DROP(i) empty the pot i to the drain;POUR(i,j) pour from pot i to po…

java -jar

java [JVM参数] -jar [jar文件路径] [应用参数&#xff0c;包括springboot特定参数]* JVM 参数必须在-jar之前 * 应用参数一般在jar文件路径之后java -jar excel.jar java -jar -Dloader.pathlibx /path/to/yourApp.jar java -jar -Dloader.pathlibx /path/to/yourApp.jar --…

Behind the Code:Polkadot 如何实现全球协作与去中心化治理?

2024 年 6 月 16 日&#xff0c;《Behind the Code: Web3 Thinkers》第二季第二集上线。本集中&#xff0c;ChaosDAO 联合创始人兼 Novasama Technologies 首席财务官 Leemo 深入探讨了 Polkadot 生态系统中的全球协作力量&#xff0c;以及这种协作如何推动去中心化治理的创新与…

聊聊语法糖

语法糖&#xff08;Syntactic sugar&#xff09;是指编程语言中添加的某种语法&#xff0c;这种语法对语言的功能没有影响&#xff0c;但更方便程序员使用&#xff0c;并能增加程序的可读性&#xff0c;减少代码出错的机会。 历史&#xff1a; 语法糖这一术语是由英国计算机科…

C++:你用过MultiIndex容器吗?

作为C开发者&#xff0c;我们对键值容器非常熟悉&#xff0c;例如std::set、std::map、std::unordered_map等。这些容器以其强大的功能和高效的性能&#xff0c;成为我们处理数据存储和检索任务时的得力助手。但是你用过多键容器&#xff08;MultiIndex&#xff09;吗&#xff…

关于团队生存的小讨论

大家好&#xff0c;我是阿赵。   今天出门上班的时候&#xff0c;在电梯里面看到了信乐团的海报&#xff0c;信乐团要来我家附近开演唱会了。可惜&#xff0c;是没有了信的信乐团。   我以前读大学的时候&#xff0c;组建过自己的乐队&#xff0c;所以对当时很多乐队都非常…

在 macOS 上使用 Homebrew 安装和配置 Python 及 Tk 库

在 macOS 上&#xff0c;系统自带的 /usr/bin/python3 版本较旧&#xff0c;且直接升级系统自带的 Python 版本可能会影响系统稳定性。因此&#xff0c;推荐使用 Homebrew 来安装和管理 Python 及其相关库。本文将详细介绍如何通过 Homebrew 安装和配置 Python 3 及 Tk 库&…