Netty Reactor 模式解析

目录

Reactor 模式        

具体流程

配置 

初始化

NioEventLoop 

ServerBootstrapAcceptor 分发


Reactor 模式        

在刚学 Netty 的时候,我们肯定都很熟悉下面这张图,它就是单Reactor多线程模型。

在写Netty 服务端代码的时候,下面的代码时必不可少的,这是为什么呢?

public static void main(String[] args) {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup(4);ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new NettyServerHandler(), new NettyServerHandler2());System.out.println("netty server start...");bootstrap.bind(9000);
}

        在 Netty 里,EventLoopGroup 就是线程池,不论 bossGroup 还是 workerGroup,它们里面的线程都叫 EventLoop。

        EventLoopGroup 就是一个线程池,bossGroup 叫连接线程池,它一般只有一个线程,workerGroup 叫工作线程池,它一般会有多个线程。bossGroup 线程池里的线程专门监听客户端连接事件,监听是否有 SelectionKey.OP_ACCEPT 事件被触发,所以 1 个线程就够用了,当它监听到有客户端请求连接时,它会把这个连接交给 workerGroup 里的一个线程去处理,这个过程叫分发,这个工作线程会为这个客户端建立一个 NIOSocketChannel,并注册到这个工作线程绑定的IO多路复用选择器 Selector 里,一个Selector可以接受多个 NIOSocketChannel 的注册,所以一个工作线程可以处理多个客户端。   这就是Reactor 模式,一个工作线程可以处理多个客户端,比 Java 传统的一个客户端对应一个工作线程节约了很多线程,减少了大量线程创建,线程切换,线程销毁的开销,所以Netty 性能很好。

        上面短短的服务端代码做了很多工作,当它刚启动还没有客户端请求连接时,bossGroup 连接线程池里的一个线程 EventLoop 会初始化一个 NioServerSocketChannel ,并把这个Channel注册到这个EventLoop 持有的IO多路复用选择器Selector里,Selector 会监听Channel里的 SelectionKey.OP_ACCEPT 事件,一旦有客户端连接过来,它会通过下面代码获取到一个

SocketChannel ch = javaChannel().accept();

NioSocketChannel,并把这个 NioSocketChannel 注册到 workerGroup 工作线程池里的一个EventLoop 里,它使用了一个叫 ServerBootstrapAcceptor 的 ChannelInboundHandler接口类去完成这个过程,连接完成后,后续这个客户端和服务端的交互和数据读写都在这个 EventLoop 完成。

具体流程

        下面我们看一下代码,Netty 代码中使用了很多继承,在继承中可以把子类相同的部分代码提到父类去完成,很多子类生成初始化的时候,它会调用父类的构造方法去完成,这个要注意。

配置 

        下面的代码主要做一些启动器的配置,group(bossGroup, workerGroup) 会设置连接线程池和工作线程池,后面有连接事件或读事件过来要处理时,它会从这些线程池里取线程去执行;channel(NioServerSocketChannel.class) 指定要生成服务端Channel,它只会监听SelectionKey.OP_ACCEPT 事件,childHandler(new NettyServerHandler(), new NettyServerHandler2()) 是我们业务处理的逻辑。

bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new NettyServerHandler(), new NettyServerHandler2());

初始化

        bind() 会把 ServerBootstrapAcceptor 添加到 NioServerSocketChannel 的 pipeline ,它会处理连接;获取一个 bossGroup 线程池里的 EventLoop并和 NioServerSocketChannel  进行绑定。

bootstrap.bind(9000)
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> {public void bind(int inetPort) {doBind(new InetSocketAddress(inetPort));} // bind 流程private void doBind(final SocketAddress localAddress) {initAndRegister();// 让channel绑定的线程处理channel.eventLoop().execute(()->{// 绑定指定端口channel.bind(localAddress);});} // 初始化和注册final void initAndRegister() {init(channel);// 把 NioServerSocketChannel 注册到一个复杂连接事件的 EventLoop 的 Selector 里 group.register(channel);} // 把 ServerBootstrapAcceptor 添加到 NioServerSocketChannel 的 pipeline 里abstract void init(Channel channel);
}

NioEventLoop 

        现在要说一下 NioEventLoop,它拥有一个IO多路复用选择器 Selector,这个线程会在一个死循环里工作,永远也会停止;这个线程它会先执行一下 selector.select(1000),阻塞监听1秒,看看有没有Channel有事件过来,有就去处理任务,没有就等待1秒钟再超时放弃,再看看自己的任务队列有没有可执行的任务,有就去处理任务,没有就继续进行死循环,继续执行 selector.select(1000)。无论是连接线程还是工作线程都这样处理,因为它们共用了这套逻辑。

public class NioEventLoop extends SingleThreadEventLoop {@Overrideprotected void run() {for (;;) {try {select();} catch (IOException e) {e.printStackTrace();}try {# 处理事件processSelectedKeys();} finally {runAllTasks();}}}private void select() throws IOException {// 拿到多路复用器Selector selector = this.selector;for (;;) {// 等待,简化固定1秒int selectedKeys = selector.select(1000);// 如果有事件发生或当前有任务跳出循环if (selectedKeys != 0 || hasTasks()) {break;}}}
}

像下面这种 channel.eventLoop().execute(Runnable), 它也只是把 Runnable 加入到任务处理队列,稍后执行。

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> { private void doBind(final SocketAddress localAddress) {...channel.eventLoop().execute(()->{ channel.bind(localAddress);});}  
}
public abstract class SingleThreadEventExecutor implements Executor {    // 待执行任务队列private final Queue<Runnable> taskQueue;   @Overridepublic void execute(Runnable task) {// 把任务添加到 EventLoop 的任务队列,EventLoop 是 SingleThreadEventExecutor 的子类addTask(task);// 执行 EventLoop 的 run 逻辑startThread();}
}

        当 NioServerSocketChannel.accept() 监听到一个客户端连接,它会把这个 NIOSocketChannel 通过 pipeline 处理,最终被 ServerBootstrapAcceptor 所处理,

public class NioServerSocketChannel extends AbstractNioMessageChannel {@Overrideprotected int doReadMessages(List<Object> buf) {SocketChannel ch = null;try {ch = javaChannel().accept();} catch (IOException e) {}if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}return 0;}
}
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {@Overridepublic void read() {final ChannelPipeline pipeline = pipeline();doReadMessages(readBuf);int size = readBuf.size();for (int i = 0; i < size; i ++) {pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();}protected abstract int doReadMessages(List<Object> buf);
}

ServerBootstrapAcceptor 分发

        ServerBootstrapAcceptor 管理 workerGroup 里的所有工作线程和所有的业务处理代码 ChannelHandler,ServerBootstrapAcceptor 会把所有的 ChannelHandler 放到刚刚监听得到的 NIOSocketChannel 里的 pipeline 里,并从 workerGroup 里选择一个 EventLoop 工作线程把NIOSocketChannel 注册到该 EventLoop 拥有的IO多路复用选择器 Selector 里去,这就完成了分发,它已经处理了连接,后续这个 NIOSocketChannel 里的所有读写事件都会被 Selector 监听到,并被该 EventLoop 工作线程所处理。

private static class ServerBootstrapAcceptor implements ChannelInboundHandler {// 工作线程池,即 workerGroup private final EventLoopGroup childGroup;// 业务操作 Handlerprivate final ChannelHandler[] childHandlers;private ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler[] childHandlers) {this.childGroup = childGroup;this.childHandlers = childHandlers;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {final Channel child = (Channel) msg;// 完成 pipeline 责任链模式的组装for (ChannelHandler childHandler : childHandlers) {child.pipeline().addLast(childHandler);}// 把Channel 注册到 SelectorchildGroup.register(child);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// 略}}
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/646182.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何使用docker实现越权漏洞-webug靶场搭建(超详解)

越权漏洞-webug靶场搭建 1.打开docker systemctl start docker 2.查找webug docker search webug 3.拉取docker.io/area39/webug 镜像 docker pull docker.io/area39/webug 4.查看镜像 docker images 5.创建容器 docker run -d -p 8080:80 --name webug docker.io/area39/we…

Database history tablesupgraded

zabbix升级到6之后&#xff0c;配置安装完成会有一个红色输出&#xff0c;但是不影响zabbix使用&#xff0c;出于强迫症&#xff0c;找到了该问题的解决方法。 Database history tables upgraded: No. Support for the old numeric type is deprecated. Please upgrade to nume…

Qt5编译qextserialport(Qt5.14.2+VS2017)

1、qextserialport库下载 (1)github GitHub - qextserialport/qextserialport: Automatically exported from code.google.com/p/qextserialport (2) code.google https://code.google.com/archive/p/qextserialport/downloads 我下载的是最新版qextserialport-1.2rc.zip ​…

手拉手JavaFX UI控件与springboot3+FX桌面开发

目录 javaFx文本 javaFX颜色 字体 Label标签 Button按钮 //按钮单击事件 鼠标、键盘事件 //(鼠标)双击事件 //键盘事件 单选按钮RadioButton 快捷键、键盘事件 CheckBox复选框 ChoiceBox选择框 Text文本 TextField(输入框)、TextArea文本域 //过滤 (传入一个参数&a…

短剧小程序分销系统开发:创新与机遇的融合

一、引言 随着移动互联网的快速发展&#xff0c;短剧作为一种新兴的娱乐形式&#xff0c;正逐渐成为人们生活中的一部分。短剧小程序分销系统的开发&#xff0c;不仅为短剧的传播提供了新的渠道&#xff0c;同时也为相关产业带来了新的商业机会。本文将探讨短剧小程序分销系统…

web架构师编辑器内容-图层拖动排序功能的开发

新的学习方法 用手写简单方法实现一个功能然后用比较成熟的第三方解决方案即能学习原理又能学习第三方库的使用 从两个DEMO开始 Vue Draggable Next: Vue Draggable NextReact Sortable HOC: React Sortable HOC 列表排序的三个阶段 拖动开始&#xff08;dragstart&#x…

[BJDCTF2020]ZJCTF,不过如此(特详解)

php特性 1.先看代码&#xff0c;提示了next.php&#xff0c;绕过题目的要求去回显next.php 2.可以看到要求存在text内容而且text内容强等于后面的字符串&#xff0c;而且先通过这个if才能执行下面的file参数。 3.看到用的是file_get_contents()函数打开text。想到用data://协…

缓存高并发问题

Redis 做缓存虽减轻了 DBMS 的压力&#xff0c;减小了 RT&#xff0c;但在高并发情况下也是可能会出现各种问题的。 缓存穿透 当用户访问的数据既不在缓存也不在数据库中时&#xff0c;就会导致每个用户查询都会“穿透”缓存“直抵”数据库。这种情况就称为缓存穿透。当高度发…

python222网站实战(SpringBoot+SpringSecurity+MybatisPlus+thymeleaf+layui)-后台管理主页面实现

锋哥原创的SpringbootLayui python222网站实战&#xff1a; python222网站实战课程视频教程&#xff08;SpringBootPython爬虫实战&#xff09; ( 火爆连载更新中... )_哔哩哔哩_bilibilipython222网站实战课程视频教程&#xff08;SpringBootPython爬虫实战&#xff09; ( 火…

程序员手把手教你参与开源!拿捏!

一、前言 有一些同学提问&#xff0c;希望在自己的简历上增加一些有含金量的项目经历&#xff0c;最好能够去参与一些开源项目的开发&#xff0c;但由于对一个庞大的开源项目缺乏认知&#xff0c;难以着手。同时也担心自己能力不足&#xff0c;不知道自己写的代码是否会被接纳。…

flutter 五点一点四:MaterialApp Theme 给你一堆颜色看看

ColorScheme colorScheme, // 拥有30种颜色(这个数可能过几个版本会变化吧)&#xff0c;可用于配置大多数组件的颜色。 A set of 30 colors based on the[Material spec] that can be used to configure the color properties of most components.Color canvasColor, // Mater…

五分钟学会接口自动化测试框架

今天&#xff0c;我们来聊聊接口自动化测试。 接口自动化测试是什么&#xff1f;如何开始&#xff1f;接口自动化测试框架如何搭建&#xff1f; 自动化测试 自动化测试&#xff0c;这几年行业内的热词&#xff0c;也是测试人员进阶的必备技能&#xff0c;更是软件测试未来发…

05.Elasticsearch应用(五)

Elasticsearch应用&#xff08;五&#xff09; 1.Mapping介绍 Mapping是对索引库中文档的约束&#xff0c;类似于数据表结构&#xff0c;作用如下&#xff1a; 定义索引中的字段的名称定义字段的数据类型&#xff0c;例如字符串&#xff0c;数字&#xff0c;布尔等字段&…

FreeRFTOS中的临界段(代码)

前言 本片文章记录我学习FreeRTOS中的“临界段”知识点&#xff0c;同时也希望我的分享能给你带来帮助 目录 前言 一、临界段&#xff08;临界区&#xff09; 二、任务级临界段代码 三、中断级临界段代码保护 四、结语 一、临界段&#xff08;临界区&#xff09; 在Fr…

仅使用 Python 创建的 Web 应用程序(前端版本)第06章_登录页面

从本章开始,我们将创建每个页面。 本栏的例子 可以访问这里, WTS 首先是登录页面。 完成后的图像如下 创建过程如下 No类型内容1Model创建继承BaseDataModel的数据类User、Session2MockDB创建用户表并添加管理员/成员用户3Service创建AuthAPIClient、UserAPIClient4Page定义…

程序员必备的20个学习网站

今天好学编程小编整理了20个程序员必备的学习网站&#xff0c;此篇对于新手程序员比较有用&#xff0c;技术老鸟们也可以查缺补漏。话不多说&#xff0c;纯纯干货呈上&#xff0c;赶紧点个赞收藏&#xff0c;以后会用得上&#xff01; 技术网站类 1、博客园 一个面向开发者的…

SpringBoot 3.1.7 集成Kafka 3.5.0

一、背景 写这边篇文章的目的&#xff0c;是记录我在集成kafka客户端遇到的一些问题&#xff0c;文章会记录整个接入的过程&#xff0c;其中会遇到几个坑&#xff0c;如果需要最终版本&#xff0c;直接看最后一节就行了&#xff0c;感觉Spring-Kafka的文档太少了&#xff0c;如…

【github】使用github action 拉取国外docker镜像

使用github action 拉取国外docker镜像 k8s部署经常用到国外镜像&#xff0c;如果本地无法拉取可以考虑使用github action环境 github action的ci服务器在国外&#xff0c;不受中国防火墙影响github action 自带docker命令运行时直接将你仓库代码拉取下来 步骤 你的国内dock…

React16源码: React中的unwindWork的源码实现

unwindWork 1 &#xff09;概述 在 renderRoot 的 throw Exception 里面, 对于被捕获到错误的组件进行了一些处理并且向上去寻找能够处理这些异常的组件&#xff0c;比如说 class component 里面具有getDerivedStateFromError 或者 componentDidCatch 这样的生命周期方法这个c…

QT发生弹出警告窗口

QTC开发程序弹出警告窗口&#xff0c;如上图 实施代码&#xff1a; #include <QMessageBox> int main() {// 在发生错误的地方QMessageBox::critical(nullptr, "错误", "发生了一个错误&#xff0c;请检查您的操作。");}上面的文字可以更改&#x…