【图解IO与Netty系列】Netty源码解析——事件循环

Netty源码解析——事件循环

  • Netty事件循环
  • 源码解析
    • select()
    • processSelectedKeys()
      • NioMessageUnsafe#read()
      • NioByteUnsafe#read()
    • runAllTasks()

Netty事件循环

当Netty服务端启动起来以后,就可以接受客户端发送的请求,接收到客户端发来的请求后就会有事件就绪,有事件就绪就会在Netty的事件循环中被监听到并处理,我们下面看看Netty事件循环的逻辑。

在这里插入图片描述

Netty中的NIO事件循环的代码位于NioEventLoop的run()方法内部,这个方法总体是一个for(;;)死循环,然后for循环里头依次执行的三个重要方法分别是:

  1. select():调用Selector#select()方法监听注册到Selector上的Channel,等待Channel关注的事件就绪。
  2. processSelectedKeys():处理就绪事件,会调用ChannelPipeline处理,ChannelPipeline又会通过责任链模式调用里面的ChannelHandler处理。
  3. runAllTasks():处理NioEventLoop中的taskQueue的异步任务。

这就是Netty事件循环的大体逻辑,下面我们进入代码解析。

源码解析

select()

    private int select(long deadlineNanos) throws IOException {if (deadlineNanos == NONE) {return selector.select();}long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);}

在NioEventLoop的事件循环中,首先是select()方法的调用,select()方法里面就是调用NioEventLoop对应的Selector的select()方法,阻塞当前线程,监听注册到Selector的Channel,等待事件就绪。

在这里插入图片描述

当有事件就绪时,当前线程就会解阻塞,然后调用processSelectedKeys()方法处理就绪事件。

processSelectedKeys()

NioEventLoop的 processSelectedKeys()方法会进入processSelectedKey(SelectionKey k, AbstractNioChannel ch)方法。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();try {int readyOps = k.readyOps();...if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (...) {...}}

无论是accept事件,还是read事件时,都是调用unsafe.read()方法。

在这里插入图片描述

只是这里的Unsafe的类型有可能不一样,如果是NioServerSocketChannel的话,那么Unsafe的类型就是NioMessageUnsafe,是在创建NioServerSocketChannel时就创建好的,我们看一下NioMessageUnsafe的read()方法。

NioMessageUnsafe#read()

    private final class NioMessageUnsafe extends AbstractNioUnsafe {private final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {...doReadMessages(readBuf);...int size = readBuf.size();for (int i = 0; i < size; i ++) {...pipeline.fireChannelRead(readBuf.get(i));}...}}

重要方法就两个doReadMessages(readBuf)和pipeline.fireChannelRead(readBuf.get(i)),其他代码全部省略不看。

在这里插入图片描述

我们先看doReadMessages(readBuf)

@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = SocketUtils.accept(javaChannel());...buf.add(new NioSocketChannel(this, ch));...}

SocketUtils.accept(javaChannel())里面是调用ServerSocketChannel的accept()方法,返回一个SocketChannel。

然后new NioSocketChannel(this, ch)把返回的SocketChannel包装成NioSocketChannel,放入buf中,这个buf就是外面read()方法的readBuf。

在这里插入图片描述

NioSocketChannel的构造方法调用父类AbstractNioByteChannel的构造方法:

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {super(parent, ch, SelectionKey.OP_READ);}

可以看到指定关注的事件类型是read事件,再看看AbstractNioByteChannel的父类AbstractNioChannel的构造方法

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp;try {ch.configureBlocking(false);} catch (...) {...}}

继续调用父类的构造方法,然后保存了NioSocketChannel和关注的事件类型OP_READ,设置NioSocketChannel为非阻塞。

再看AbstractNioChannel的父类AbstractChannel的构造方法

    protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}

创建一个Unsafe,然后初始化了ChannelPipeline。newUnsafe()方法会进入NioSocketChannel的newUnsafe()方法,创建的时NioSocketChannelUnsafe类型的Unsafe,因此NioSocketChannel的Unsafe类型就是NioSocketChannelUnsafe。

    protected AbstractNioUnsafe newUnsafe() {return new NioSocketChannelUnsafe();}

NioSocketChannel的构造方法总结起来就是做了5件事:

  1. 创建并保存NioSocketChannelUnsafe
  2. 创建并保存ChannelPipeline
  3. 保存SocketChannel
  4. 保存关注的事件类型OP_READ
  5. 设置SocketChannel为非阻塞

在这里插入图片描述

然后回到unsafe.read()方法,接下来执行的代码pipeline.fireChannelRead(readBuf.get(i))就是调用触发就绪事件的Channel对应的ChannelPipeline的fireChannelRead(…)方法,触发ChannelPipeline中每个处理入站事件的ChannelHandler的入站事件处理。

ChannelPipeline的fireChannelRead(…)方法会从头到尾以责任链的处理方式调用每个ChannelInboundHandler类型的channelRead(…)方法。NioServerSocketChannel的ChannelPipeline中的ChannelHandler是ServerBootstrapAcceptor。我们看看ServerBootstrapAcceptor的channelRead(…)方法。

        public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);try {childGroup.register(child).addListener(new ChannelFutureListener() {...});} catch (...) {...}}

这里的参数msg的类型是NioSocketChannel,就是上面放入buf中的NioSocketChannel,调用pipeline.fireChannelRead(readBuf.get(i))方法时readBuf.get(i)就是中buf中取出NioSocketChannel作为参数传进来。

然后child.pipeline().addLast(childHandler)这一行这里的childHandler是我们定义的ChannelInitializer,这里放入NioSocketChannel的ChannelPipeline中,当NioSocketChannel里面的SocketChannel被注册到Selector之后,会触发ChannelInitializer的调用,初始化ChannelPipeline。

然后childGroup.register(child)就是把这个NioSocketChannel注册到workerGroup中的其中一个NioEventLoop上,也就是把NioSocketChannel中的SocketChannel注册到workerGroup中的其中一个NioEventLoop的Selector上。

在这里插入图片描述

这里NioSocketChannel注册的细节跟NioServerSocketChannel的注册是一样的,上一篇文章已经分析过,这里就不重复了。

NioSocketChannel注册好之后,就可以接收客户端发来的数据,于是又有read事件触发,此时调用的unsafe.read(),就是NioSocketChannelUnsafe的父类NioByteUnsafe的read()方法了。

NioByteUnsafe#read()

        @Overridepublic final void read() {...byteBuf = allocHandle.allocate(allocator);doReadBytes(byteBuf);...pipeline.fireChannelRead(byteBuf);...}

就是通过allocator分配一个ByteBuf,然后把Channel中的数据读取到ByteBuf中,然后调用pipeline.fireChannelRead(byteBuf)触发ChannelPipeline的处理,然后ChannelPipeline中的ChannelHandler就会处理byteBuf中的数据,这里的ChannelPipeline中的ChannelHandler就是我们定义的ChannelInitializer组装到ChannelPipeline中的ChannelHandler了。

在这里插入图片描述

runAllTasks()

runAlllTasks方法其实就是从NioEventLoop的taskQueue中不停的取出task并执行。

    protected boolean runAllTasks(long timeoutNanos) {...Runnable task = pollTask();...for (;;) {safeExecute(task);...task = pollTask();if (task == null) {...break;}}...}

可以看到,就是在一个for循环中,pollTask()方法取出task,然后在下一轮循环中调用safeExecute(task)去执行,safeExecute(task)里面就是调用task.run()方法直接执行,没什么好看的。如果pollTask()方法取出的task为null,那么就break结束循环。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/857472.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机网络 交换机的VLAN配置

一、理论知识 1.VLAN的定义 ①VLAN虚拟局域网&#xff0c;是一种通过将局域网内的设备逻辑地而不是物理地划分成一个个网段从而实现虚拟工作组的技术。 ②IEEE于1999年颁布了用以标准化VLAN实现方案的802.1Q协议标准草案。 ③VLAN技术允许网络管理者将一个物理的LAN逻辑地划…

MySQL存储管理(一):删数据

从表中删除数据 从表中删除数据&#xff0c;也即是delete过程。 什么是表空间 表空间可以看做是InnoDB存储引擎逻辑结构的最高层&#xff0c;所有的数据都存放在表空间中。默认情况下&#xff0c;InnoDB存储引擎有一个共享表空间idbdata1&#xff0c;即所有数据都存放在这个表…

无限滚动表格

纵向无限滚动 单元格内部横向滚动 <!--* Description: 横向、纵向滚动表格* Author: liyanfeng liyanfenghopewind.com* Date: 2024-06-15 16:06:57* LastEditors: liyanfeng liyanfenghopewind.com* LastEditTime: 2024-06-20 17:15:37* FilePath: \plus-ui\src\componen…

SEO是什么?SEO相关发展历史

一、SEO是什么意思&#xff1f; SEO&#xff08;Search Engine Optimization&#xff09;&#xff0c;翻译成中文就是“搜索引擎优化”。简单来讲&#xff0c;seo是指自然搜索结果下获得的网站流量的技术&#xff0c;是可以不用花钱就可以让自己的网站有好的排名&#xff0c;也…

C语言:生命周期和作用域,static和extern

关键字static与extern 1.作用域&#xff08;scope&#xff09;&#xff1a;代码中能够访问到变量的范围&#xff08;变量可以被使用的文本区间&#xff09;。&#xff08;分为全局作用域和局部作用域&#xff09; ☺全局作用域&#xff1a;在整个程序中都能访问的变量。通常…

C语言入门系列:数据类型转换

文章目录 一&#xff0c;自动类型转换1&#xff0c;赋值运算1.1&#xff0c;浮点数赋值给整型变量-不安全1.2&#xff0c;整数赋值给浮点数变量-安全1.3&#xff0c;窄类型赋值给宽类型-安全1.4&#xff0c;宽类型赋值给窄类型-不安全 2&#xff0c;混合类型的运算2.1&#xff…

Ubuntu24使用kubeadm部署高可用K8S集群

Ubuntu24使用kubeadm部署高可用K8S集群 使用kubeadm部署一个k8s集群&#xff0c;3个master1个worker节点。 1. 环境信息 操作系统&#xff1a;ubuntu24.04内存: 2GBCPU: 2网络: 能够互访&#xff0c;能够访问互联网 hostnameip备注k8s-master1192.168.0.51master1k8s-maste…

20.Cargo和Crates.io

标题 一、采用发布配置自定义构建1.1 默认配置1.2 修改配置项 二、将crate发布到Crates.io2.1 编写文档注释2.2 常用&#xff08;文档注释&#xff09;部分2.3 文档注释作用测试2.4 为包含注释的项添加文档注释2.5 使用pub use导出公有API2.6 创建Crates.io账号2.7 发布2.8 版本…

基于STM32的智能停车场管理系统

目录 引言环境准备智能停车场管理系统基础代码实现&#xff1a;实现智能停车场管理系统 4.1 车位检测模块4.2 数据处理与分析4.3 控制系统实现4.4 用户界面与数据可视化应用场景&#xff1a;智能停车场管理与优化问题解决方案与优化收尾与总结 1. 引言 智能停车场管理系统通…

Linux常用命令(17)—pastesortcomm命令(有相关截图)

写在前面&#xff1a; 最近在学习Linux命令&#xff0c;记录一下学习Linux常用命令的过程&#xff0c;方便以后复习。仅供参考&#xff0c;若有不当的地方&#xff0c;恳请指正。如果对你有帮助&#xff0c;欢迎点赞&#xff0c;关注&#xff0c;收藏&#xff0c;评论&#xf…

仿中波本振电路的LC振荡器电路实验

手里正好有一套中波收音机套件的中周。用它来测试一下LC振荡器&#xff0c;电路如下&#xff1a; 用的是两只中频放大的中周&#xff0c;初步测试是用的中周自带的瓷管电容&#xff0c;他们应该都是谐振在465k附近。后续测试再更换电容测试。 静态电流&#xff0c;0.5到1mA。下…

malloc和new的本质区别

目录 一、结论 二、示例 1.实现类T 2.用malloc分配类T的内存空间 3.用new分配类T的内存空间 一、结论 malloc 和 new 都是用于在运行时动态分配内存的机制。但它们之间存在一些本质的区别&#xff0c;主要是在使用方面&#xff0c;现在我们直接说结论&#xff0c;然后在通过…

ArcGIS与Excel分区汇总统计三调各地类面积!数据透视表与汇总统计!

​ 点击下方全系列课程学习 点击学习—>ArcGIS全系列实战视频教程——9个单一课程组合系列直播回放 点击学习——>遥感影像综合处理4大遥感软件ArcGISENVIErdaseCognition 01 需求说明 介绍一下ArcGIS与Excel统计分区各地类的三调地类面积。 ArcGIS统计分析不会&#x…

Unity客户端的Http通讯实战

背景知识 在Unity游戏开发中&#xff0c;一个常见场景是&#xff0c;后端扔过来一个Swagger后端接口网页&#xff0c;需要你使用对应的接口对应的接口发送和接收数据&#xff0c;如图所示为发起Get请求&#xff1a; 我们可以通过点击Try it out按钮直接在网页上测试收发数据&a…

spring整合openAI大模型之Spring AI

文章目录 一、SpringAI简介1.什么是SpringAI2.SpringAI支持的大模型类型&#xff08;1&#xff09;聊天模型&#xff08;2&#xff09;文本到图像模型&#xff08;3&#xff09;转录&#xff08;音频到文本&#xff09;模型&#xff08;4&#xff09;嵌入模型&#xff08;5&…

Guava-EventBus 源码解析

EventBus 采用发布订阅者模式的实现方式&#xff0c;它实现了泛化的注册方法以及泛化的方法调用,另外还考虑到了多线程的问题,对多线程使用时做了一些优化&#xff0c;观察者模式都比较熟悉&#xff0c;这里会简单介绍一下&#xff0c;重点介绍的是如何泛化的进行方法的注册以及…

dial tcp 10.96.0.1:443: connect: no route to host

1、创建Pod一直不成功&#xff0c;执行kubectl describe pod runtime-java-c8b465b98-47m82 查看报错 Warning FailedCreatePodSandBox 2m17s kubelet Failed to create pod sandbox: rpc error: code Unknown desc failed to setup network for…

数据挖掘与分析——数据预处理

数据探索 波士顿房价数据集&#xff1a;卡内基梅隆大学收集&#xff0c;StatLib库&#xff0c;1978年&#xff0c;涵盖了麻省波士顿的506个不同郊区的房屋数据。 一共含有506条数据。每条数据14个字段&#xff0c;包含13个属性&#xff0c;和一个房价的平均值。 数据读取方法…

昨天gitee网站访问不了,开始以为电脑哪里有问题了

昨天gitee网站下午访问不了&#xff0c;开始以为是什么毛病。 结果同样的网络&#xff0c;手机是可以访问的。 当然就ping www.gitee.com 结果也下面那样是正常的 以为是好的&#xff0c;但就是访问www.gitee.com也是不行&#xff0c;后来用阿里云的服务器curl访问是下面情况&…

LabVIEW机器视觉在质量控制中的应用

基于LabVIEW的机器视觉系统在质量控制中应用广泛&#xff0c;通过图像采集、处理和分析&#xff0c;自动检测产品缺陷、测量尺寸和识别标记&#xff0c;提高生产效率和产品质量。下面介绍LabVIEW机器视觉系统在质量控制中的实现方法、应用场景及其优势。 项目背景 在现代制造业…