Netty-ChannelPipeline

EventLoop可以说是 Netty 的调度中心,负责监听多种事件类型:I/O 事件、信号事件、定时事件等,然而实际的业务处理逻辑则是由 ChannelPipeline 中所定义的 ChannelHandler 完成的,ChannelPipeline 和 ChannelHandler应用开发的过程中打交道最多的组件,为用户提供了 I/O 事件的全部控制权。

文章目录

  • 一、ChannelPipeline 是什么?🤔️
  • 二、ChannelPipeline 的内部结构🔍
    • 1、HeadContext
    • 2、TailContext
    • 3、addLiast() 方法🔍
  • 三、ChannelPipeline 事件传播机制
  • 四、ChannelPipeline 异常传播机制
  • 五、统一的异常处理器

一、ChannelPipeline 是什么?🤔️

pipeline 有管道,流水线的意思,最早使用在 Unix 操作系统中,可以让不同功能的程序相互通讯,使软件更加”高内聚,低耦合”,它以一种”链式模型”来串起不同的程序或组件,使它们组成一条直线的工作流。

ChannelPipeline 也是 Netty 中的一个比较重要的组件,从上面的 Channel 实例化过程可以看出,每一个 Channel 实例中都会包含一个对应的 ChannelPipeline 属性。ChannelPipeline维护着处理或拦截channel的进站事件和出站事件的双向链表,事件在ChannelPipeline中流动和传递,可以增加或删除ChannelHandler来实现对不同业务逻辑的处理。通俗的说,ChannelPipeline是工厂里的流水线,ChannelHandler是流水线上的工人。

二、ChannelPipeline 的内部结构🔍

final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise =  new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;
}

从 ChannelPipeline 的构造函数可以看出,ChannelPipeline 维护了一组 ChannelHandlerContext 实例组成双向链表。默认会包含 head 和 tail 头尾节点,用来进行一些默认的逻辑处理。我们自定义的ChannelHandler会插入到 head 和 tail 之间,这两个节点在 Netty 中已经默认实现了,它们在ChannelPipeline 中起到了至关重要的作用。

那么你可能会有疑问,为什么这里会多一层 ChannelHandlerContext 的封装呢?

其实这是一种比较常用的编程思想。ChannelHandlerContext用于保存ChannelHandler。ChannelHandlerContext包含了ChannelHandler生命周期的所有事件,如 connect、bind、read、 flush、write、close 等。

可以试想一下,如果没有ChannelHandlerContext 的这层封装,那么我们在做 ChannelHandler 之间传递的时候。前置后置的通用逻辑就要在每个 ChannelHandler 里都实现一份。

首先我们看下 HeadContext 和 TailContext 的继承关系
在这里插入图片描述

1、HeadContext

通过集成关系我们发现 HeadContext 分别实现了ChannelInboundHandler 和 ChannelOutboundHandler,即 HeadContext 既是 入站处理器,也是出站处理器。

HeadContext是入站第一站出站最后一站。对于1个请求先由HeadContext处理入栈,经过一系列的入栈处理器然后传递到TailContext,由TailContext往下传递经过一系列的出栈处理器,最后再经过HeadContext返回。

2、TailContext

TailContext 只实现了 ChannelInboundHandler 接口。它会在 ChannelInboundHandler 调用链路的最后一步执行,主要用于终止 入站事件传播,例如释放 Message 数据资源等。

TailContext是入站最后一站出站第一站。TailContext节点作为出站事件传播的第一站,仅仅是将出站事件传递给下一个节点。

从整个 ChannelPipeline 调用链路来看,如果由 Channel 直接触发事件传播,那么调用链路将贯穿整个 ChannelPipeline。然而也可以在其中某一个 ChannelHandlerContext 触发同样的方法,这样只会从当前的 ChannelHandler 开始执行事件传播,该过程不会从头贯穿到尾,在一定场景下,可以提高程序性能。

3、addLiast() 方法🔍

addLast() 方法是向 ChannelPipeline 中添加 ChannelHandler 用来进行业务处理,关于ChannelHandler将会在下文中详细讲解!
在这里插入图片描述

三、ChannelPipeline 事件传播机制

入站事件是由I/O线程被动触发,由入站处理器按自下而上的方向处理,在中途可以被拦截丢弃,出站事件由用户handler主动触发,由出站处理器按自上而下的方向处理。
在这里插入图片描述
接下来用一个示例来讲解~

服务端代码,

public class PipelineServer {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup(2);new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(1);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(2);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(3);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(4);super.write(ctx, msg, promise);}});ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(5);super.write(ctx, msg, promise);}});ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(6);super.write(ctx, msg, promise);}});}}).bind(8080);}
}

客户端代码,

public class PipelineClient {public static void main(String[] args) throws InterruptedException {new Bootstrap().group( new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new StringEncoder());}}).connect("127.0.0.1", 8080).sync().channel().writeAndFlush("Hello,server!");}
}

依次启动服务端和客户端,服务端打印如下:

1
2
3

以上我们通过 Pipeline 的 addLast 方法分别添加了三个 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter,添加顺序分别是 1 -> 2 -> 3,4 -> 5 -> 6。

此时为什么没有打印 4、5、6呢,即没有触发出站的操作❓

出站处理器只有向channel中写入数据才会触发,我们在第三个 ChannelInboundHandlerAdapter 实现类中加入以下代码!

在这里插入图片描述
通过依次点入,我们发现最终是调用了 tail节点 的writeAndFlush 方法,即TailContext节点作为出站事件传播的第一站!
在这里插入图片描述

最终服务端打印如下:

1
2
3
6
5
4

可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表

在这里插入图片描述

  • 入站处理器中,ctx.fireChannelRead(msg) 是 调用下一个入站处理器
    • 如果注释掉 1 处代码,则仅会打印 1
    • 如果注释掉 2 处代码,则仅会打印 1 2
  • 3 处的 ctx.channel().write(msg) 会 从尾部开始触发 后续出站处理器的执行
    • 如果注释掉 3 处代码,则仅会打印 1 2 3
  • 类似的,出站处理器中,ctx.write(msg, promise) 的调用也会 触发上一个出站处理器
    • 如果注释掉 6 处代码,则仅会打印 1 2 3 6
  • ctx.channel().write(msg) vs ctx.write(msg)
    • 都是触发出站处理器的执行
    • ctx.channel().write(msg) 从尾部开始查找出站处理器
    • ctx.write(msg) 是从当前节点找上一个出站处理器
    • 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
    • 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己

如图,服务端 pipeline 触发的原始流程,图中数字代表了处理步骤的先后次序

在这里插入图片描述

四、ChannelPipeline 异常传播机制

ChannelPipeline 事件传播的实现采用了经典的责任链模式,调用链路环环相扣。那么如果有一个节点处理逻辑异常会出现什么现象呢?我们通过修改 第二个 ChannelInboundHandlerAdapter 实现类 的实现来模拟业务逻辑异常:
在这里插入图片描述
由输出结果可以看出 ctx.fireExceptionCaugh 会将异常按顺序从 Head 节点传播到 Tail 节点
在这里插入图片描述
如果用户没有对异常进行拦截处理,最后将由 Tail 节点统一处理,在 TailContext 源码中可以找到具体实现:
在这里插入图片描述

五、统一的异常处理器

在 Netty 应用开发的过程中,良好的异常处理机制会让开发在排查问题的时候事半功倍。虽然 Netty 中 TailContext 提供了兜底的异常处理逻辑,但是在很多场景下,并不能满足我们的需求。假如你需要拦截指定的异常类型,并做出相应的异常处理,应该如何实现呢?

小编个人推荐用户对异常进行统一拦截,然后根据实际业务场景实现更加完善的异常处理机制。

通过异常传播机制的学习,我们应该可以想到最好的方法是在 ChannelPipeline 自定义处理器的末端添加统一的异常处理器!

/*** 自定义异常处理器*/
public static class ExceptionHandler extends ChannelDuplexHandler {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof RuntimeException) {System.out.println();log.error("业务异常处理,异常信息:{}", cause.getMessage());}}
}

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/64674.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言(第三十三天)

3.1.2 画图推演 3.2 举例2&#xff1a;顺序打印一个整数的每一位 输入一个整数m&#xff0c;打印这个按照顺序打印整数的每一位。 比如&#xff1a; 输入&#xff1a;1234 输出&#xff1a;1 2 3 4 输入&#xff1a;520 输出&#xff1a;5 2 0 3.2.1 分析和代码实现 这个题目&a…

数据结构--队列与循环队列

队列 队列是什么&#xff0c;先联想一下队&#xff0c;排队先来的人排前面先出&#xff0c;后来的人排后面后出&#xff1b;队列的性质也一样&#xff0c;先进队列的数据先出&#xff0c;后进队列的后出&#xff1b;就像图一的样子&#xff1a; 图1 如图1&#xff0c;1号元素是…

本地开机启动jar

1&#xff1a;首先有个可运行的jar包 本地以ruiyi代码为例打包 2&#xff1a;编写bat命令---命名为.bat即可 echo off java -jar D:\everyDay\test\RuoYi\target\RuoYi.jar 3&#xff1a;设置为开机自启动启动 快捷键winr----输入shell:startup---打开启动文档夹 把bat文件复…

春秋云镜 CVE-2018-16283

春秋云镜 CVE-2018-16283 WordPress Plugin Wechat Broadcast LFI 靶标介绍 WordPress Plugin Wechat Broadcast LFI 启动场景 漏洞利用 exp # Exploit Title: WordPress Plugin Wechat Broadcast 1.2.0 - Local File Inclusion # Author: Manuel Garcia Cardenas # Date:…

Spring boot 第一个程序

新建工程 选择spring-boot版本 右键创建类TestController&#xff1a; 代码如下&#xff1a; package com.example.demo; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springf…

Empowering Long-tail Item Recommendation through Cross Decoupling Network (CDN)

Empowering Long-tail Item Recommendation through Cross Decoupling Network (CDN) 来源&#xff1a; KDD’2023Google Research 文章目录 Empowering Long-tail Item Recommendation through Cross Decoupling Network (CDN)长尾问题分析CDNItem Memorization and General…

是否在业务中使用大语言模型?

ChatGPT取得了巨大的成功&#xff0c;在短短一个月内就获得了1亿用户&#xff0c;并激发了企业和专业人士对如何在他们的组织中利用这一工具的兴趣和好奇心。 但LLM究竟是什么&#xff0c;它们如何使你的企业受益?它只是一种炒作&#xff0c;还是会长期存在? 在这篇文章中我…

从零开始的Hadoop学习(六)| HDFS读写流程、NN和2NN工作机制、DataNode工作机制

1. HDFS的读写流程&#xff08;面试重点&#xff09; 1.1 HDFS写数据流程 1.1.1 剖析文件写入 &#xff08;1&#xff09;客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件&#xff0c;NameNode检查目标文件是否已存在&#xff0c;父目录是否存在。 &#x…

springsecurity+oauth 分布式认证授权笔记总结12

一 springsecurity实现权限认证的笔记 1.1 springsecurity的作用 springsecurity两大核心功能是认证和授权&#xff0c;通过usernamepasswordAuthenticationFilter进行认证&#xff1b;通过filtersecurityintercepter进行授权。springsecurity其实多个filter过滤链进行过滤。…

Google Services Framework 谷歌服务框架的安装以及遇到的常见问题

安装谷歌三件套&#xff1a; 1、Google 服务框架&#xff08;Google Services Framework&#xff09;下载地址&#xff1a; https://www.apkmirror.com/apk/google-inc/google-services-framework/ 注意一定要选择与自己手机对应的安卓系统版本的服务框架。 2、Google Play Se…

Java设计模式:四、行为型模式-05:备忘录模式

文章目录 一、定义&#xff1a;备忘录模式二、模拟场景&#xff1a;备忘录模式三、改善代码&#xff1a;备忘录模式3.1 工程结构3.2 备忘录模式模型结构图3.3 备忘录模式定义3.3.1 配置信息类3.3.2 备忘录类3.3.3 记录者类3.3.4 管理员类 3.4 单元测试 四、总结&#xff1a;备忘…

超全的数据可视化大屏设计组件库 sketch格式

随着大屏可视化设计需求的发展&#xff0c;可视化sketch矢量素材变得越来越受欢迎&#xff0c;它可以为设计师提供丰富的设计元素&#xff0c;帮助他们更高效更快速的完成设计任务。 大屏可视化sketch数量素材是B端可视化设计师们最佳设计资源&#xff0c;它可以帮助设计师轻松…

React 18 在组件间共享状态

参考文章 在组件间共享状态 有时候&#xff0c;希望两个组件的状态始终同步更改。要实现这一点&#xff0c;可以将相关 state 从这两个组件上移除&#xff0c;并把 state 放到它们的公共父级&#xff0c;再通过 props 将 state 传递给这两个组件。这被称为“状态提升”&#…

Socket交互的基本流程?

TCP socket通信过程图 什么是网络编程&#xff0c;网络编程就是编写程序使两台连联网的计算机相互交换数据。怎么交换数据呢&#xff1f;操作系统提供了“套接字”&#xff08;socket&#xff09;的组件我们基于这个组件进行网络通信开发。tcp套接字工作流程都以“打电话”来生…

python异常

一.什么是异常 异常是一个事件&#xff0c;该事件会在程序执行过程中发生&#xff0c;会影响程序的正常运行。一般情况下&#xff0c;python无法正常处理一个异常&#xff0c;会导致程序中断。在出现异常时&#xff0c;为了不影响程序的正常运行&#xff0c;我们需要捕获异常。…

HTML+JavaScript+CSS DIY 分隔条splitter

一、需求分析 现在电脑的屏幕越来越大&#xff0c;为了利用好宽屏&#xff0c;我们在设计系统UI时喜欢在左侧放个菜单或选项面板&#xff0c;在右边显示与菜单或选项对应的内容&#xff0c;两者之间用分隔条splitter来间隔&#xff0c;并可以通过拖动分隔条splitter来动态调研…

vue3 ref reactive响应式数据 赋值的问题

文章目录 vue3 ref reactive响应式数据 赋值的问题场景1:将响应式数据赋值请求后的数据错误示范&#xff1a;直接赋值正确写法 场景2&#xff1a;响应式数据解构之后失去响应式原因分析解决办法 toRefs/toRef方法创建ref引用对象 vue3 ref reactive响应式数据 赋值的问题 doing…

DBeaver 23.1.5 发布

导读DBeaver 是一个免费开源的通用数据库工具&#xff0c;适用于开发人员和数据库管理员。DBeaver 23.1.5 现已发布&#xff0c;更新内容如下. Data editor 重新设计了词典查看器面板 UI 空间数据类型&#xff1a;曲线几何线性化已修复 数据保存时结果选项卡关闭的问题已解决…

gitee上传本地项目bug

&#x1f92e;这个破bug不知道浪费了多长时间&#xff0c;以前没有记录&#xff0c;每次都忘记&#xff0c;这次记下来 问题描述 gitee创建仓库&#xff0c;然后根据它提示的如下命令&#xff0c;但一直报错 原因分析&#xff1a; 把命令复制出来&#xff0c;粘贴到Sublime …

vscode 清除全部的console.log

在放页面的大文件夹view上面右键点击在文件夹中查找 console.log.*$ 注意&#xff1a;要选择使用正则匹配 替换为 " " (空字符串)