深入理解Netty以及为什么项目中要使用?(六)Netty核心组件实例

调度器详解

前面我们讲过NIO多路复用的设计模式之Reactor模型,Reactor模型的主要思想就是把网络连接、事件分发、任务处理的职责进行分离,并且通过引入多线程来提高Reactor模型中的吞吐量。其中包括三种Reactor模型

  • 单线程单Reactor模型

  • 多线程单Reactor模型

  • 多线程多Reactor模型

在Netty中,可以非常轻松的实现上述三种线程模型,并且Netty推荐使用主从多线程模型,这样就可以轻松的实现成千上万的客户端连接的处理。在海量的客户端并发请求中,主从多线程模型可以通过增加SubReactor线程数量,充分利用多核能力提升系统吞吐量。

Reactor模型的运行机制分为四个步骤:

  • 连接注册,Channel建立后,注册到Reactor线程中的Selector选择器

  • 事件轮询,轮询Selector选择器中已经注册的所有Channel的I/O事件

  • 事件分发,为准备就绪的I/O事件分配相应的处理线程

  • 任务处理,Reactor线程还负责任务队列中的非I/O任务,每个Worker线程从各自维护的任务队列中取出任务异步执行。

EventLoop事件循环

在Netty中,Reactor模型的事件处理器是使用EventLoop来实现的,一个EventLoop对应一个线程,EventLoop内部维护了一个Selector和taskQueue,分别用来处理网络IO事件以及内部任务

EventLoop基本应用

下面这段代码表示EventLoop,分别实现Selector注册以及普通任务提交功能。

public class EventLoopExample {public static void main(String[] args) {EventLoopGroup group=new NioEventLoopGroup(2);System.out.println(group.next()); //输出第一个NioEventLoopSystem.out.println(group.next()); //输出第二个NioEventLoopSystem.out.println(group.next()); //由于只有两个,所以又会从第一个开始//获取一个事件循环对象NioEventLoopgroup.next().register(); //注册到selector上group.next().submit(()->{System.out.println(Thread.currentThread().getName()+"-----");});}
}

EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程,其内部会维护一个selector和taskQueue,负责处理IO事件和内部任务。IO事件和内部任务执行时间百分比通过ioRatio来调节,ioRatio表示执行IO时间所占百分比。任务包括普通任务和已经到时的延迟任务,延迟任务存放到一个优先级队列PriorityQueue中,执行任务前从PriorityQueue读取所有到时的task,然后添加到taskQueue中,最后统一执行task。

EventLoop如何实现多种Reactor模型

  • 单线程模式

    EventLoopGroup group=new NioEventLoopGroup(1);
    ServerBootstrap b=new ServerBootstrap();
    b.group(group);
    
  • 多线程模式

    EventLoopGroup group =new NioEventLoopGroup(); //默认会设置cpu核心数的2倍
    ServerBootstrap b=new ServerBootstrap();
    b.group(group);
    
  • 多线程主从模式

    EventLoopGroup boss=new NioEventLoopGroup(1);
    EventLoopGroup work=new NioEventLoopGroup();
    ServerBootstrap b=new ServerBootstrap();
    b.group(boss,work);

 

EventLoop实现原理

  • EventLoopGroup初始化方法,在MultithreadEventExecutorGroup.java中,根据配置的nThreads数量,构建一个EventExecutor数组

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {checkPositive(nThreads, "nThreads");if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {children[i] = newChild(executor, args);}}
    }
    
  • 注册channel到多路复用器的实现,MultithreadEventLoopGroup.register方法()

    SingleThreadEventLoop ->AbstractUnsafe.register ->AbstractChannel.register0->AbstractNioChannel.doRegister()

    可以看到会把channel注册到某一个eventLoop中的unwrappedSelector复路器中。

    protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;}}
    }
    
  • 事件处理过程,通过NioEventLoop中的run方法不断遍历

    protected void run() {int selectCnt = 0;for (;;) {try {int strategy;try {//计算策略,根据阻塞队列中是否含有任务来决定当前的处理方式strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT:long curDeadlineNanos = nextScheduledTaskDeadlineNanos();if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE; // nothing on the calendar}nextWakeupNanos.set(curDeadlineNanos);try {if (!hasTasks()) { //如果队列中数据为空,则调用select查询就绪事件strategy = select(curDeadlineNanos);}} finally {nextWakeupNanos.lazySet(AWAKE);}default:}}selectCnt++;cancelledKeys = 0;needsToSelectAgain = false;/* ioRatio调节连接事件和内部任务执行事件百分比* ioRatio越大,连接事件处理占用百分比越大 */final int ioRatio = this.ioRatio;boolean ranTasks;if (ioRatio == 100) {try {if (strategy > 0) { //处理IO时间processSelectedKeys();}} finally {//确保每次都要执行队列中的任务ranTasks = runAllTasks();}} else if (strategy > 0) {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}} else {ranTasks = runAllTasks(0); // This will run the minimum number of tasks}if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}selectCnt = 0;} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)selectCnt = 0;}}
    }
    

服务编排层Pipeline的协调处理

通过EventLoop可以实现任务的调度,负责监听I/O事件、信号事件等,当收到相关事件后,需要有人来响应这些事件和数据,而这些事件是通过ChannelPipeline中所定义的ChannelHandler完成的,他们是Netty中服务编排层的核心组件。

在下面这段代码中,我们增加了h1和h2两个InboundHandler,用来处理客户端数据的读取操作,代码如下。

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)//配置Server的通道,相当于NIO中的ServerSocketChannel.channel(NioServerSocketChannel.class)//childHandler表示给worker那些线程配置了一个处理器,// 这个就是上面NIO中说的,把处理业务的具体逻辑抽象出来,放到Handler里面.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//                            socketChannel.pipeline().addLast(new NormalMessageHandler());socketChannel.pipeline().addLast("h1",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("handler-01");super.channelRead(ctx, msg);}}).addLast("h2",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("handler-02");super.channelRead(ctx, msg);}});}});

上述代码构建了一个ChannelPipeline, 每个Channel都会绑定一个ChannelPipeline,一个ChannelPipeline包含多个ChannelHandler,这些Handler会被包装成ChannelHandlerContext加入到Pipeline构建的双向链表中。

ChannelHandlerContext用来保存ChannelHandler的上下文,它包含了ChannelHandler生命周期中的所有事件,比如connect/bind/read/write等,这样设计的好处是,各个ChannelHandler进行数据传递时,前置和后置的通用逻辑就可以直接保存到ChannelHandlerContext中进行传递。

 

ChannelHandler事件触发机制

当某个Channel触发了IO事件后,会通过Handler进行处理,而ChannelHandler是围绕I/O事件的生命周期来设计的,比如建立连接、读数据、写数据、连接销毁等。

ChannelHandler有两个重要的子接口实现,分别拦截数据流入和数据流出的I/O事件

  • ChannelInboundHandler

  • ChannelOutboundHandler

Adapter类,提供很多默认操作,比如ChannelHandler中有很多很多方法,我们用户自定义的方法有时候不需要重载全部,只需要重载一两个方法,那么可以使用Adapter类,它里面有很多默认的方法。其它框架中结尾是Adapter的类的作用也大都是如此。所以我们在使用netty的时候,往往很少直接实现ChannelHandler的接口,经常是继承Adapter类。

ChannelInboundHandler事件回调和触发时机如下

 ChannelOutboundHandler时间回调触发时机

 

事件传播机制演示

public class NormalOutBoundHandler extends ChannelOutboundHandlerAdapter {private final String name;public NormalOutBoundHandler(String name) {this.name = name;}@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("OutBoundHandler:"+name);super.write(ctx, msg, promise);}
}
public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {private final String name;private final boolean flush;public NormalInBoundHandler(String name, boolean flush) {this.name = name;this.flush = flush;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("InboundHandler:"+name);if(flush){ctx.channel().writeAndFlush(msg);}else {super.channelRead(ctx, msg);}}
}
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)//配置Server的通道,相当于NIO中的ServerSocketChannel.channel(NioServerSocketChannel.class)//childHandler表示给worker那些线程配置了一个处理器,// 这个就是上面NIO中说的,把处理业务的具体逻辑抽象出来,放到Handler里面.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new NormalInBoundHandler("NormalInBoundA",false)).addLast(new NormalInBoundHandler("NormalInBoundB",false)).addLast(new NormalInBoundHandler("NormalInBoundC",true));socketChannel.pipeline().addLast(new NormalOutBoundHandler("NormalOutBoundA")).addLast(new NormalOutBoundHandler("NormalOutBoundB")).addLast(new NormalOutBoundHandler("NormalOutBoundC"));}});

上述代码运行后会得到如下执行结果

InboundHandler:NormalInBoundA
InboundHandler:NormalInBoundB
InboundHandler:NormalInBoundC
OutBoundHandler:NormalOutBoundC
OutBoundHandler:NormalOutBoundB
OutBoundHandler:NormalOutBoundA

当客户端向服务端发送请求时,会触发服务端的NormalInBound调用链,按照排列顺序逐个调用Handler,当InBound处理完成后调用WriteAndFlush方法向客户端写回数据,此时会触发NormalOutBoundHandler调用链的write事件。

从执行结果来看,Inbound和Outbound的事件传播方向是不同的,Inbound传播方向是head->tail,Outbound传播方向是Tail-Head。

异常传播机制

ChannelPipeline时间传播机制是典型的责任链模式,那么有同学肯定会有疑问,如果这条链路中某个handler出现异常,那会导致什么问题呢?我们对前面的例子修改NormalInBoundHandler

public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {private final String name;private final boolean flush;public NormalInBoundHandler(String name, boolean flush) {this.name = name;this.flush = flush;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("InboundHandler:"+name);if(flush){ctx.channel().writeAndFlush(msg);}else {//增加异常处理throw new RuntimeException("InBoundHandler:"+name);}}
}

这个时候一旦抛出异常,会导致整个请求链被中断,在ChannelHandler中提供了一个异常捕获方法,这个方法可以避免ChannelHandler链中某个Handler异常导致请求链路中断。它会把异常按照Handler链路的顺序从head节点传播到Tail节点。如果用户最终没有对异常进行处理,则最后由Tail节点进行统一处理

修改NormalInboundHandler,重写下面这个方法。

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("InboundHandlerException:"+name);super.exceptionCaught(ctx, cause);
}

在Netty应用开发中,好的异常处理非常重要能够让问题排查变得很轻松,所以我们可以通过一种统一拦截的方式来解决异常处理问题。

添加一个复合处理器实现类

public class ExceptionHandler extends ChannelDuplexHandler {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if(cause instanceof RuntimeException){System.out.println("处理业务异常");}super.exceptionCaught(ctx, cause);}
}

把新增的ExceptionHandler添加到ChannelPipeline中

bootstrap.group(bossGroup, workerGroup)//配置Server的通道,相当于NIO中的ServerSocketChannel.channel(NioServerSocketChannel.class)//childHandler表示给worker那些线程配置了一个处理器,// 这个就是上面NIO中说的,把处理业务的具体逻辑抽象出来,放到Handler里面.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new NormalInBoundHandler("NormalInBoundA",false)).addLast(new NormalInBoundHandler("NormalInBoundB",false)).addLast(new NormalInBoundHandler("NormalInBoundC",true));socketChannel.pipeline().addLast(new NormalOutBoundHandler("NormalOutBoundA")).addLast(new NormalOutBoundHandler("NormalOutBoundB")).addLast(new NormalOutBoundHandler("NormalOutBoundC")).addLast(new ExceptionHandler());}});

最终,我们就能够实现异常的统一处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/765854.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Python + Django】表结构创建

以员工管理系统为例。 事前呢&#xff0c;我们先把项目和app创建出来&#xff0c;详细步骤可以看我同栏目的第一篇、第二篇文章。 我知道你们是不会下来找的&#xff0c;就把链接贴在下面吧&#xff1a; 【Python Django】启动简单的文本页面-CSDN博客 【Python Django】…

Microsoft Windows 10 22H2官方简体中文正式版2023年12月更新版(最新微软原版ISO镜像)

Microsoft Windows 10 22H2官方简体中文正式版2023年12月更新版(最新微软原版ISO镜像) 将标红的地址放大迅雷里面下载就好&#xff01; MVS Microsoft Windows 10 22H2 官方正式版2023年12月版ISO镜像微软订阅中心发布信息 简体中文商业版2023年12月版&#xff08;教育版、…

【C语言进阶篇】动态内存管理

【C语言进阶篇】动态内存管理 &#x1f308;个人主页&#xff1a;开敲 &#x1f525;所属专栏&#xff1a;C语言 &#x1f33c;文章目录&#x1f33c; 1. 为什么要有动态内存分配 2.动态内存开辟和释放函数 2.1 动态内存释放函数 2.1.1 free函数 2.2 动态内存开辟函数 2.2.1 …

【鸿蒙HarmonyOS开发笔记】应用数据持久化之通过用户首选项实现数据持久化

概述 应用数据持久化&#xff0c;是指应用将内存中的数据通过文件或数据库的形式保存到设备上。内存中的数据形态通常是任意的数据结构或数据对象&#xff0c;存储介质上的数据形态可能是文本、数据库、二进制文件等。 HarmonyOS标准系统支持典型的存储数据形态&#xff0c;包…

OceanPen Art AI绘画系统 运营教程(三)2.10绘画全新界面升级

在一个崇高的目标支持下&#xff0c;不停地工作&#xff0c;即使慢&#xff0c;也一定会获得成功。 —— 爱因斯坦 演示站点&#xff1a; ai.oceanpen.art 官方论坛&#xff1a; www.jingyuai.com 一、前端用户界面全新体验 二、 MJ绘画分享 提示词自取&#xff1a;htt…

如何使用 ArcGIS Pro 制作好看的高程渲染图

虽然 ArcGIS Pro 已经提供了很多好看的配色方案&#xff0c;但是如果直接对高程DEM进行渲染效果不是很理想&#xff0c;我们可以结合山体阴影让高程渲染图看起来更加立体&#xff0c;这里为大家介绍一下制作方法&#xff0c;希望能对你有所帮助。 数据来源 教程所使用的数据是…

component-右侧抽屉组件

1.右侧抽屉组件 点击筛选&#xff0c;右侧抽屉滑出&#xff0c;点击取消或者点击空白处抽屉收起。 2.代码 <template><div class"all" click"hidden()"><!-- 抽屉 --><div class"drawer"><div class"drawerBo…

Android StateLayout状态页

文章目录 Android StateLayout状态页概述源码使用源码下载 Android StateLayout状态页 概述 StateLayout&#xff08;状态页&#xff09;包含&#xff1a;加载中页面&#xff0c;错误页面&#xff0c;空页面&#xff0c;内含状态默认页面&#xff0c;支持自定义页面。 源码 …

yolov8目标检测数据集制作——make sense

背景 在前几天我进行了录制视频以准备足够多的数据集&#xff0c;同时使用利用python自定义间隔帧数获取视频转存为图片&#xff0c;所以今天我准备对我要训练的数据集进行标注。我要做的是一个基于yolo的检测项目&#xff0c;在搜索资料后得知大家多是用labelme或者make sens…

使用 VS Code + Github 搭建个人博客

搭建个人博客的方案 现在&#xff0c;搭建个人博客的方式有很多&#xff0c;门槛也很低。 可以选择已有平台&#xff1a; 掘金语雀知乎简书博客园SegmentFault… 也可以选择一些主流的博客框架&#xff0c;自行搭建。 HexoGitBookVuePressdumi… 如何选择&#xff1f; 我…

Chain of Note-CoN增强检索增强型语言模型的鲁棒性

Enhancing Robustness in Retrieval-Augmented Language Models 检索增强型语言模型&#xff08;RALMs&#xff09;在大型语言模型的能力上取得了重大进步&#xff0c;特别是在利用外部知识源减少事实性幻觉方面。然而&#xff0c;检索到的信息的可靠性并不总是有保证的。检索…

[ESP32]:基于HTTP实现百度AI识图

[ESP32]&#xff1a;基于HTTP实现百度AI识图 测试环境&#xff1a; esp32-s3esp idf 5.1 首先&#xff0c;先配置sdk&#xff0c;可以写入到sdkconfig.defaults CONFIG_IDF_TARGET"esp32s3" CONFIG_IDF_TARGET_ESP32S3yCONFIG_PARTITION_TABLE_CUSTOMy CONFIG_PA…

值迭代和策略迭代【强化学习】

强化学习笔记 主要基于b站西湖大学赵世钰老师的【强化学习的数学原理】课程&#xff0c;个人觉得赵老师的课件深入浅出&#xff0c;很适合入门. 第一章 强化学习基本概念 第二章 贝尔曼方程 第三章 贝尔曼最优方程 第四章 值迭代和策略迭代 文章目录 强化学习笔记一、Value It…

江苏开放大学2024年春《中级会计实务(上) 050284》第1次任务第一单元总论、第二单元存货练习参考答案

答案&#xff1a;更多答案&#xff0c;请关注【电大搜题】微信公众号 答案&#xff1a;更多答案&#xff0c;请关注【电大搜题】微信公众号 答案&#xff1a;更多答案&#xff0c;请关注【电大搜题】微信公众号 电大搜题 多的用不完的题库&#xff…

Qt教程 — 3.6 深入了解Qt 控件:Display Widgets部件(2)

目录 1 Display Widgets简介 2 如何使用Display Widgets部件 2.1 QTextBrowser组件-简单的文本浏览器 ​2.2 QGraphicsView组件-简单的图像浏览器 Display Widgets将分为两篇文章介绍 文章1&#xff08;Qt教程 — 3.5 深入了解Qt 控件&#xff1a;Display Widgets部件-CSDN…

Magic Copy:一键AI抠图,在浏览器中获得任何图像素材

Magic Copy&#xff1a;轻松一点&#xff0c;精准抠图&#xff0c;让创意无限放大&#xff01; - 精选真开源&#xff0c;释放新价值。 概览 Magic Copy&#xff08;AI智能抠图插件&#xff09;是一个创新型的浏览器扩展工具&#xff0c;其独特之处在于能够无缝集成于用户的网…

CCDP.02.OS正确部署后的Dashboard摘图说明

前言 在部署成功OpenStack后&#xff0c;应该可以在浏览器打开Dashboard&#xff0c;并对计算资源&#xff08;这里主要是指VM&#xff09;进行管理&#xff0c;也可以在Dashboard上面查看OpenStack是否存在错误&#xff0c;下面&#xff0c;已针对检查的关键点&#xff0c;用红…

Mysql之索引存储原理

在介绍索引实现之前&#xff0c;我们先来了解下几种树的数据结构&#xff1a; 一、二叉搜索树 二叉搜索树有以下性质&#xff1a; 1.每个节点有一个关键字 2.左右孩子至多有一个。 3.关键字大于左孩子&#xff0c;小于右孩子。 正因为二叉搜索树的特性&#xff0c;所以这种数…

基于java+springboot+vue实现的游戏账号估价交易平台(文末源码+Lw+ppt)23-555

摘 要 系统根据现有的管理模块进行开发和扩展&#xff0c;采用面向对象的开发的思想和结构化的开发方法对游戏账号估价交易的现状进行系统调查。采用结构化的分析设计&#xff0c;该方法要求结合一定的图表&#xff0c;在模块化的基础上进行系统的开发工作。在设计中采用“自…

Structured Knowledge Distillation for Accurate and Efficient Object Detection

摘要 许多之前的知识蒸馏方法是为图像分类而设计的&#xff0c;在具有挑战性的任务&#xff08;如目标检测&#xff09;中失败。本文首先提出了知识蒸馏在目标检测中失败的主要原因是&#xff1a;&#xff08;1&#xff09;前景和背景之间不平衡&#xff1a;(2)缺乏对不同像素…