Netty系列-2 NioServerSocketChannel和NioSocketChannel介绍

背景

本文介绍Netty的通道组件NioServerSocketChannel和NioSocketChannel,从源码的角度介绍其实现原理。

1.NioServerSocketChannel

Netty本质是对NIO的封装和增强,因此Netty框架中必然包含了对于ServerSocketChannel的构建、配置以及向选择器注册,如下所示:

// 创建ServerSocketChannel对象
ServerSocketChannel serverSocketChannel = SelectorProvider.provider().openServerSocketChannel();// ServerSocketChannel通道设置为非阻塞
serverSocketChannel.configureBlocking(false);// 将ServerSocketChannel通道注册至选择器
serverSocketChannel.register(Selector, opts, attachment);// 接收客户端连接得到SocketChannel通道
SocketChannel socketChannel = serverSocketChannel.accept();

其中的构建和配置过程发生在NioServerSocketChannel的实例化过程。

1.1 NioServerSocketChannel构造函数

NioServerSocketChannel实例化过程包含了对serverSocketChannel的创建以及配置

Netty启动时,通过反射调用NioServerSocketChannel的无参构造函数创建NioServerSocketChannel对象.

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT);config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

DEFAULT_SELECTOR_PROVIDER是Provider对象,用于创建通道和选择器,newSocket方法返回一个ServerSocketChannel对象,如下所示:

private static ServerSocketChannel newSocket(SelectorProvider provider) {try {return provider.openServerSocketChannel();} catch (IOException e) {throw new ChannelException("Failed to open a server socket.", e);}
}

NioServerSocketChannel中还维护了一个config对象用于储存该通道相关的配置,后续通过通道对象的config()方法获取该config对象。
继续调用父类的构造方法:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp;try {ch.configureBlocking(false);} catch (IOException e) {try {ch.close();} catch (IOException e2) {logger.warn("Failed to close a partially initialized socket.", e2);}throw new ChannelException("Failed to enter non-blocking mode.", e);}
}// super(parent)内容如下:
protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();
}

因此NioServerSocketChannel中包含如下属性:
[1] SelectableChannel ch:实际为ServerSocketChannel类型,即NIO中的服务端通道类型,并将其配置为非阻塞类型,以便后续向选择器注册;
[2] int readInterestOp: 值固定为SelectionKey.OP_ACCEPT,表示仅处理连接事件;
[3] pipeline: Netty的Pipeline组件,每个channel都有一个属于自己的Pipeline对象;
[4] unsafe: 对底层IO进行了封装,实际的读写操作在该类中进行处理;
[5] 其他: id唯一ID标识,parent固定为空。

1.2 NioServerSocketChannel注册

NioServerSocketChannel包含了ServerSocketChannel对象,向选择器注册NioServerSocketChannel本质是将ServerSocketChannel注册到选择器

在Netty启动流程流程中,依次构造ServerSocketChannel, 并注册到选择器上,具体逻辑为:

// NioServerSocketChannel的父类AbstractNioChannel中
// 删除try-catch异常逻辑
protected void doRegister() throws Exception {boolean selected = false;for (;;) {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;}
}

其中: javaChannel()获取NioServerSocketChannel对象的ServerSocketChannel属性;eventLoop().unwrappedSelector()为NioEventLoop这个线程绑定的选择器;此处的this表明将ServerSocketChannel注册到选择器上时,将当前的NioServerSocketChannel对象作为attachment保存到SelectionKey中,并使用volatile SelectionKey selectionKey;属性保存了注册结果。

说明:后续选择器会执行select而阻塞,当该选择器被IO事件唤醒时,可通过SelectionKey的attachment获取NioServerSocketChannel对象,从而可以获取包括ServerSocketChannel、Pipeline、Config等其他所有相关信息。

1.3 NioServerSocketChannel处理连接

章节1.1中提到了NioServerSocketChannel的unsafe属性,unsafe用于封装底层具体的IO行为,具体的实现类为NioMessageUnsafe.

当有连接请求到达NioServerSocketChannel后,进入NioMessageUnsafe的read()方法中(详细的调用流程和线程处理关系在后续Netty的消息处理流程中介绍, 这里仅对read方法实现逻辑进行说明),read方法省去内存分配优化策略以及异常处理逻辑后的主线逻辑如下:

private final class NioMessageUnsafe extends AbstractNioUnsafe {private final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {// ...final ChannelPipeline pipeline = pipeline();do {// ...doReadMessages(readBuf);} while (allocHandle.continueReading());int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();pipeline.fireChannelReadComplete();}
}

readBuf是一个列表类型,用于存放解析后的消息对象,解析完成后,依次遍历readBuf,并调用pipeline.fireChannelRead将消息对象发送至Netty的Pipeline组件(后面单独介绍)。

解析逻辑在doReadMessages方法中:

protected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;
}// SocketUtils.accept(javaChannel())代码逻辑:
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {// 删除try-catch异常逻辑return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {@Overridepublic SocketChannel run() throws IOException {return serverSocketChannel.accept();}});
}

javaChannel()得到ServerSocketChannel对象,serverSocketChannel.accept()得到客户端通道对象SocketChannel。将当前服务端通道NioServerSocketChannel对象和得到的客户端通道对象SocketChannel作为参数构造NioSocketChannel对象。

2.NioSocketChannel

与NioServerSocketChannel相似,NioSocketChannel也是Netty对NIO中ServerSocketChannel的封装和增强。本章节内容将包含SocketChannel的构建、配置、向选择器注册以及读取数据,如下所示:

// 得到SocketChannel对象
SocketChannel socketChannel = serverSocketChannel.accept();// SocketChannel通道设置为非阻塞
socketChannel.configureBlocking(false);// 将SocketChannel通道注册至选择器
socketChannel.register(Selector, opts, attachment);// 从SocketChannel通道读取数据值缓冲区
socketChannel.read(ByteBuffer)

2.1 NioSocketChannel构造函数

每个客户端连接对应一个通道,即一个NioSocketChannel对象。

Netty收到客户端连接时,会调用NioSocketChannel构造函数创建通道对象,如下所示:

public NioSocketChannel(Channel parent, SocketChannel socket) {super(parent, socket);config = new NioSocketChannelConfig(this, socket.socket());
}

parent为NioServerSocketChannel对象,socket为NIO中SocketChannel对象。NioSocketChannel与NioServerSocketChannel相似,维持了一个config配置类用于存放和读取通道的配置信息。
继续沿着super调用父类的构造方法:

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {super(parent, ch, SelectionKey.OP_READ);
}protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp;try {ch.configureBlocking(false);} catch (IOException e) {try {ch.close();} catch (IOException e2) {logger.warn("Failed to close a partially initialized socket.", e2);}throw new ChannelException("Failed to enter non-blocking mode.", e);}
}protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();
}

上述构造过程逻辑较为简单,为NioSocketChannel创建一个Unsafe对象和Pipeline对象;以及将ch属性即SocketChannel设置为非阻塞。

2.2 注册选择器

NioServerSocketChannel接收客户端连接构造出NioSocketChannel对象,并通过Pipeline.fireChannelRead触发Inbound读事件后,通过Pipiline进入ServerBootstrapAcceptor处理器的channelRead方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;// ...childGroup.register(child).addListener(new ChannelFutureListener() {//...});
}

由章节1可知msg消息为NioSocketChannel,childGroup为线程池NioEventLoopGroup对象(workgroup)。
childGroup.register(child)表示将NioSocketChannel注册到workgroup的一个线程中,经过Unsafe对象最终会进入NioSocketChannel的doRegister方法:

@Override
protected void doRegister() throws Exception {// ...selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);// ...
}

javaChannel()为NioSocketChannel的ch属性,即SocketChannel通道对象;eventLoop().unwrappedSelector()为选择器;this为NioSocketChannel对象本身;返回的SelectionKey也作为属性保存在NioSocketChannel类中。
说明:后续选择器会执行select而阻塞,当有可读消息到达时被唤醒。可通过SelectionKey得到NioSocketChannel对象,从而得到相关的SocketChannel、Pipeline、Config等其他所有相关信息。

2.3 读取消息

当有可读时间到达时,NioEvetLoop会从阻塞中被唤醒,从而执行processSelectedKeys处理IO事件:

private void processSelectedKeys() {// ...processSelectedKeysOptimized();// ...
}private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];selectedKeys.keys[i] = null;final Object a = k.attachment();processSelectedKey(k, (AbstractNioChannel) a);}
}

遍历已就绪的IO事件,调用processSelectedKey方法处理,此时k为NIO的SelectionKey对象,而attachment为NioSocketChannel对象。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();int readyOps = k.readyOps();//...if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}// ...
}

根据SelectionKey和NioSocketChannel对象的readyOps确定此时IO事件为可读消息,进入unsafe.read():

@Override
public final void read() {final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final ByteBufAllocator allocator = config.getAllocator();ByteBuf byteBuf = null;boolean close = false;// ...do {// ...// 1.分配ButeBuf缓冲对象byteBuf = allocHandle.allocate(allocator);// 2.将数据读取到ButeBuf缓冲对象allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {byteBuf.release();byteBuf = null;break;}readPending = false;// 3.向Pipeline传递可读消息pipeline.fireChannelRead(byteBuf);byteBuf = null;// 直到读取完所有消息内容} while (allocHandle.continueReading());// ...// 触发消息读取完成事件pipeline.fireChannelReadComplete();// ...
}

代码较为清晰,重点包含3个步骤:创建ByteBuf缓冲对象(Netty自定义的,而非NIO的ByteBuffer); 将消息读取到ButeBuf对象,向Pipeline触发可读事件(在Pipeline的Handler中传递并处理消息);其中,核心逻辑在于doReadBytes(byteBuf):

@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {// ...return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

javaChannel()是NIO的SocketChannel对象,继续跟进ByteBuf的writeBytes方法进入:

@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {//...int writtenBytes = setBytes(writerIndex, in, length);//...return writtenBytes;
}@Override
public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {try {return in.read(internalNioBuffer(index, length));} catch (ClosedChannelException ignored) {return -1;}
}

可以看到底层逻辑在于in.read(internalNioBuffer(index, length)), 返回一个ByteBuffer对象,in此时为SocketChannel, 即本质是调用NIO通道的API将数据读取至缓冲区: SocketChannel.read(ByteBuffer).

2.3 响应消息

Netty中Pipeline的任何一个Handler中都可以发送响应消息,响应消息也会沿着Pipeline的流水线传递,并经过网卡传递出去:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.writeAndFlush("hello");
}

注意:需要在此Handler前添加StringEncoder编码器,将String类型转为ByteBuf类型,否则会抛出异常。因为NioSocketChannel的Unsafe对象也维持在了Pipeline的HeadContext对象中,所有的消息最终会经过Unsafe的write方法,而Unsafe只会处理ByteBuf类型消息,其他类型会抛出异常。

追踪ctx.writeAndFlush("hello")进入invokeWriteAndFlush方法:

void invokeWriteAndFlush(Object msg, ChannelPromise promise) {// ...invokeWrite0(msg, promise);invokeFlush0();// ...
}

依次调用invokeWrite0和invokeFlush0实现写操作和刷盘操作, 分别进入Unsafe对象的write和flush方法:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {unsafe.write(msg, promise);
}public void flush(ChannelHandlerContext ctx) {unsafe.flush();
}

unsafe最终调用doWrite方法实现IO功能:

protected void doWrite(ChannelOutboundBuffer in) throws Exception {SocketChannel ch = javaChannel();int writeSpinCount = config().getWriteSpinCount();do {// ...			ByteBuffer buffer = nioBuffers[0];int attemptedBytes = buffer.remaining();final int localWrittenBytes = ch.write(buffer);--writeSpinCount;// ...					} while (writeSpinCount > 0);incompleteWrite(writeSpinCount < 0);
}

核心逻辑在与ch.write(buffer),其中ch和buffer分别是NIO的SocketChannel和ByteBuffer,
即Netty向客户端发送消息底层仍是借助NIO的API.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/878710.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

谈一谈JVM的GC(垃圾回收)

JVM&#xff08;Java Virtual Machine&#xff09;的GC&#xff08;Garbage Collection&#xff0c;垃圾回收&#xff09;是Java语言的一个重要特性&#xff0c;它负责自动管理内存&#xff0c;释放那些不再被使用的对象所占用的内存空间。以下是对JVM GC的详细介绍&#xff1a…

Python 全栈系列266 Kafka服务的Docker搭建

说明 在大量数据处理任务下的缓存与分发 这个算是来自顾同学的助攻1&#xff0c;我有点java绝缘体的体质&#xff0c;碰到和java相关的安装部署总会碰到点奇怪的问题&#xff0c;不过现在已经搞定了。测试也接近了kafka官方标称的性能。考虑到网络、消息的大小等因素&#xff0…

风机设计基础

目录 1、风机分类按气体出口压力&#xff08;或升压&#xff09;来进行风机分类按风机全压来进行分类 2、风机定律及效率、功率、压力计算风机轴功率与扭矩关系&#xff1a;风机全压、静压效率计算公式&#xff1a;全压、动压、静压计算公式&#xff1a; 3、风机噪声1、离散噪声…

修改jupyter notebook 默认浏览器(不动配置文件,改系统默认浏览器)

最开始把联想浏览器切到EDGE就是用的修改系统的默认浏览器。不知怎么的现在搜到的方法都是在说修改配置文件&#x1f613;。 不想动配置文件&#xff0c;平时对默认浏览器没有特殊要求的&#xff0c;可以用这个方法。 这里是把默认浏览器改成联想浏览器&#xff0c;电脑也是联…

macos 系统 降级, 重装, 升级图文教程

最近一不小心mac被升级到了最新版本, 在使用vscode的时候经常卡顿,于是有了降级macos的想法! 于是就有了这macos的系统降级 , 重装, 升级图文教程. 本文重点介绍macos降级, 重装过程, 至于升级, 这个一不小心就会被升级(通过应用商店)基本上都是自动升级的,所以不做更多说明. …

低代码平台赋能:烟花鞭炮企业数字化转型新篇章

随着数字化转型的浪潮席卷全球&#xff0c;传统制造业正面临着前所未有的变革机遇。烟花鞭炮行业&#xff0c;作为承载深厚文化底蕴与独特工艺的传统产业&#xff0c;也不例外。近年来&#xff0c;我国政府高度重视中小企业数字化转型&#xff0c;出台了一系列扶持政策&#xf…

基于大数据的电商平台电脑销售数据分析系统

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长 QQ 名片 :) 1. 项目简介 随着电子商务的蓬勃发展&#xff0c;各大电商平台积累了海量的商品数据。如何从这些数据中提取有价值的信息&#xff0c;对于商家来说至关重要。本项目利用网络爬虫技术从京东电商平台采集各类品牌…

春秋云镜(OpenSSH)·CVE-2023-51385

靶标介绍&#xff1a; OpenSSH 是使用 SSH 协议进行远程登录的连接工具。在OpenSSH 9.6版本之前的ssh中&#xff0c;如果用户名或主机名中含有shell元字符&#xff08;如 | "等&#xff09;&#xff0c;并且ssh_config中ProxyCommand、LocalCommand指令或"match exe…

Mybatis 基础知识

目录 一、简介 1、JDK&#xff1a; 2、JRE&#xff1a; 3、JVM&#xff1a; 4、Java SE&#xff1a; 5、Java EE&#xff1a; 6、持久层&#xff1a; 7、轻量级&#xff1a; 8、半自动化&#xff1a; 9、ORM&#xff1a; 10、框架&#xff1a; 二、三层架构 1、业…

数据结构-----双向链表

1.什么是内存泄漏&#xff1f; 主动申请到的内存空间没有进行内存释放&#xff0c;导致无内存空间可用。 2.双向链表&#xff1f; 双向链表&#xff08;Doubly Linked List&#xff09;是链表的一种&#xff0c;它允许我们在链表中的任何一个节点向前或 向后遍历。与单向链表…

display flex 的div 被子元素撑开不显示滚动条的一个解决demo

display flex 的div 被子元素撑开&#xff0c;不显示y轴滚动条的 一个解决demo。 注&#xff1a; 不一定适用所有人的的相同问题 less # less .contact {display: flex;flex-grow: 1;overflow: hidden auto;flex-direction: column;.contact-items {flex: 1 1 0;display: flex…

Python和Java及MATLAB和CUDA显微镜导图

&#x1f3af;要点 交互式设备控制和图像处理图像背景和阴影校正可视化萤光团位置算法和读取光学图像读写转换显微镜图像生物医学细胞图像分析荧光图像算法计算亮度数据和模拟表征新型染料和缓冲液强度估计细菌图像分析扫描透射和高分辨率透射图像模拟多模态成像分割可视化透射…

Hive服务部署及Datagrip工具使用

目录 Hive服务部署 Hiveserver2服务 1&#xff09;用户说明 2&#xff09;Hiveserver2部署 &#xff08;1&#xff09;Hadoop端配置 &#xff08;2&#xff09;Hive端配置 3&#xff09;测试 &#xff08;1&#xff09;启动Hiveserver2 &#xff08;2&#xff09;使用命…

深入学习电路基础:从理论到实践

引言 电路是电子学的核心&#xff0c;也是现代科技的基石。从简单的灯泡开关到复杂的计算机处理器&#xff0c;电路在各类电子设备中都起到了至关重要的作用。深入学习电路知识不仅有助于理解电子设备的工作原理&#xff0c;还能够为实际设计和开发电子产品打下坚实的基础。 …

flutter语法:var、late、const、final区别

var: 用于声明可变变量&#xff0c;支持类型推断并能多次赋值&#xff0c;但只能是同类型的数据赋值。之后其类型更改&#xff0c;会抛出异常。 var number 10;void main() {print(number); // 这将打印10。number 20; // 再次赋值&#xff0c;但必须同类型print(number); /…

某云彩SRM2.0任意文件下载漏洞

文章目录 免责申明搜索语法漏洞描述漏洞复现修复建议 免责申明 本文章仅供学习与交流&#xff0c;请勿用于非法用途&#xff0c;均由使用者本人负责&#xff0c;文章作者不为此承担任何责任 搜索语法 fofa icon_hash"1665918155"漏洞描述 某云采 SRM2.0是一款先…

扁平数组转化分类树

使用下列数组生成一个分类树&#xff08;数组中每项中的pid是父节点的id&#xff0c;pid:0表示顶层&#xff0c;pid:1&#xff0c;表示这个节点属于id为1的节点&#xff0c;children该节点的子节点数组&#xff09; const jsona [{"ID": 1,"CreatedAt": …

【计算机视觉前沿研究 热点 顶会】ECCV 2024中扩散模型有关的论文

神经辐射场修复的驯服潜在扩散模型 神经辐射场(NERF)是一种从多视角图像进行三维重建的表示法。尽管最近的一些工作表明&#xff0c;在编辑具有扩散先验的重建的 NERF 方面取得了初步成功&#xff0c;但他们仍然在努力在完全未覆盖的区域中合成合理的几何图形。一个主要原因是…

使用大型语言模型进行监督微调(SFT)

大型语言模型&#xff08;LLMs&#xff09;通常经过几个阶段的训练&#xff0c;包括预训练和几个微调阶段&#xff1b;请参见下文。尽管预训练很昂贵&#xff08;即需要数十万美元的计算成本&#xff09;&#xff0c;但相比之下&#xff0c;微调LLM&#xff08;或进行上下文学习…

应用商店优化(ASO)的四大误区

应用商店优化 (ASO) 是移动营销中最重要的部分之一&#xff0c;可以帮助开发人员吸引自然流量并在应用推广方面取得预期效果。近年来ASO优化在开发者中越来越受欢迎。虽然它已经证明了其有效性和对应用成功的影响力&#xff0c;但尽管如此仍然存在与ASO相关的误解&#xff0c;导…