Netty的基本架构与组件

Netty实战精髓

前言

Netty的组成部分
1、Channel
2、Callback
3、Future
ChannelFuture 提供多个附件方法来允许一个或者多个 ChannelFutureListener 实例,这个回调方法 operationComplete() 会在操作完成时调用。
4、Event和Handler

5、EventLOOP
Netty 通过触发事件从应用程序中抽象出 Selector,从而避免手写调度代码。EventLoop 分配给每个 Channel 来处理所有的事件,包括
注册感兴趣的事件
调度事件到 ChannelHandler
安排进一步行动

Netty的整体架构

在这里插入图片描述

网络通信层

网络通信层的核心组件包含BootStrap、ServerBootStrap、Channel三个组件。
Bootstrap 是“引导”的意思,它主要负责整个 Netty 程序的启动、初始化、服务器连接等过程,它相当于一条主线,串联了 Netty 的其他核心组件。
一个为用于客户端引导的 Bootstrap,另一个为用于服务端引导的 ServerBootStrap,它们都继承自抽象类 AbstractBootstrap。
Bootstrap 和 ServerBootStrap 十分相似,两者非常重要的区别在于 Bootstrap 可用于连接远端服务器,只绑定一个 EventLoopGroup。而 ServerBootStrap 则用于服务端启动绑定本地端口,会绑定两个 EventLoopGroup,这两个 EventLoopGroup 通常称为 Boss 和 Worker。

Channel

Channel 的字面意思是“通道”,它是网络通信的载体。抽象了JDK的NIO,屏蔽了底层Socket的复杂性。

Channel的实现类有:
NioServerSocketChannel 异步 TCP 服务端。
NioSocketChannel 异步 TCP 客户端。
OioServerSocketChannel 同步 TCP 服务端。
OioSocketChannel 同步 TCP 客户端。
NioDatagramChannel 异步 UDP 连接。
OioDatagramChannel 同步 UDP 连接。

事件调度层

事件调度层的职责是通过 Reactor 线程模型对各类事件进行聚合处理,通过 Selector 主循环线程集成多种事件( I/O 事件、信号事件、定时事件等),实际的业务处理逻辑是交由服务编排层中相关的 Handler 完成。
事件调度层的核心组件包括 EventLoopGroup、EventLoop。
EventLoopGroup、EventLoop、Channel 的几点关系。
1、一个 EventLoopGroup 往往包含一个或者多个 EventLoop。EventLoop 用于处理 Channel 生命周期内的所有 I/O 事件,如 accept、connect、read、write 等 I/O 事件。
2、EventLoop 同一时间会与一个线程绑定,每个 EventLoop 负责处理多个 Channel。
3、每新建一个 Channel,EventLoopGroup 会选择一个 EventLoop 与其绑定。该 Channel 在生命周期内都可以对 EventLoop 进行多次绑定和解绑。
EventLoop 的实现有NioEventLoopGroup

服务编排层

服务编排层的职责是负责组装各类服务,它是 Netty 的核心处理链,用以实现网络事件的动态编排和有序传播。

服务编排层的核心组件包括 ChannelPipeline、ChannelHandler、ChannelHandlerContext。

ChannelPipeline

ChannelPipeline 是 Netty 的核心编排组件,负责组装各种 ChannelHandler,实际数据的编解码以及加工处理操作都是由 ChannelHandler 完成的。ChannelPipeline 可以理解为ChannelHandler 的实例列表——内部通过双向链表将不同的 ChannelHandler 链接在一起。当 I/O 读写事件触发时,ChannelPipeline 会依次调用 ChannelHandler 列表对 Channel 的数据进行拦截和处理。

ChannelPipeline 是线程安全的,因为每一个新的 Channel 都会对应绑定一个新的 ChannelPipeline。一个 ChannelPipeline 关联一个 EventLoop,一个 EventLoop 仅会绑定一个线程。

ChannelHandler & ChannelHandlerContext

在介绍 ChannelPipeline 的过程中,想必你已经对 ChannelHandler 有了基本的概念,数据的编解码工作以及其他转换工作实际都是通过 ChannelHandler 处理的。站在开发者的角度,最需要关注的就是 ChannelHandler,我们很少会直接操作 Channel,都是通过 ChannelHandler 间接完成。

总结

在这里插入图片描述
这些组件是怎么协作的呢?

  • 服务端启动初始化 Boss EventLoopGroup 和 Worker EventLoopGroup 两个组件,其中Boss负责监听网络连接事件。当有新的网络事件到达时,则将Channel注册到Worker EventLoopGroup。
  • Worker EventLoopGroup 会被分配到一个EventLoop负责处理Chanel的读写事件。每个 EventLoop 都是单线程 的,通过 Selector 进行事件循环。
  • 当客户端发起I/O读写事件时,服务端 EventLoop 会进行数据的读取,通过 Pipeline 触发各种监听器进行数据的加工处理。
  • 客户端数据会被传递到 ChannelPipeline 的第一个 ChannelBoundHandler 中,数据处理完成后,将加工完成的数据传递到下一个 ChannelboundHandler.
  • 当数据写回客户端时,会将处理结果在ChannelPipeline的 ChannelOutboundHandler 中传播,最后到达客户端。

实例代码

服务端:

public class HttpServer {public void start(int port) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("codec", new HttpServerCodec()).addLast("compressor", new HttpContentCompressor()).addLast("aggregator", new HttpObjectAggregator(65536)).addLast("handler", new HttpServerHandler());}}).childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture sync = b.bind().sync();System.out.println("port:"+port);sync.channel().closeFuture().sync();}public static void main(String[] args) throws InterruptedException {new HttpServer().start(8080);}
}

服务端处理Handler

package Netty2;import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {String content = String.format("Receive http request, uri: %s, method: %s, content: %s%n", msg.uri(), msg.method(), msg.content().toString(CharsetUtil.UTF_8));FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(content.getBytes()));ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}
}

客户端:

package Netty;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;import java.net.InetSocketAddress;public class EchoClient {private final String host;private final int port;public EchoClient(String host, int port) {this.host = host;this.port = port;}public void start() throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port)).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new EchoClientHandler());}});ChannelFuture f = b.connect().sync();f.channel().closeFuture().sync();} finally {group.shutdownGracefully().sync();}}public static void main(String[] args) throws Exception {final String host = "localhost";final int port = 8080;new EchoClient(host, port).start();}
}

客户端处理Handler

package Netty;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;@ChannelHandler.Sharable                                //1
public class EchoClientHandler extendsSimpleChannelInboundHandler<ByteBuf> {@Overridepublic void channelActive(ChannelHandlerContext ctx) {ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!",CharsetUtil.UTF_8));}@Overridepublic void channelRead0(ChannelHandlerContext ctx,ByteBuf in) {System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) {                    //4cause.printStackTrace();ctx.close();}
}

Netty实现EventLoop

EventLoop是Netty的调度中心,负责监听多种事件类型
EventLoop的处理结构
在这里插入图片描述

1、BossEventLoopGroup 和 WorkerEventLoopGroup 包含一个或者多个 NioEventLoop。BossEventLoopGroup 负责监听客户端的 Accept 事件,当事件触发时,将事件注册至 WorkerEventLoopGroup 中的一个 NioEventLoop 上。每新建一个 Channel, 只选择一个 NioEventLoop 与其绑定。所以说 Channel 生命周期的所有事件处理都是线程独立的,不同的 NioEventLoop 线程之间不会发生任何交集。
2、NioEventLoop 完成数据读取后,会调用绑定的 ChannelPipeline 进行事件传播,ChannelPipeline 也是线程安全的,数据会被传递到 ChannelPipeline 的第一个 ChannelHandler 中。数据处理完成后,将加工完成的数据再传递给下一个 ChannelHandler,整个过程是串行化执行,不会发生线程上下文切换的问题。

核心源码

@Overrideprotected void run() {int selectCnt = 0;for (;;) {try {int strategy;try {strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT://检查线程池是否有可利用的资源long curDeadlineNanos = nextScheduledTaskDeadlineNanos();if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE; // nothing on the calendar}nextWakeupNanos.set(curDeadlineNanos);try {if (!hasTasks()) {//事件轮询strategy = select(curDeadlineNanos);}} finally {// This update is just to help block unnecessary selector wakeups// so use of lazySet is ok (no race condition)nextWakeupNanos.lazySet(AWAKE);}// fall throughdefault:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();selectCnt = 0;handleLoopException(e);continue;}selectCnt++;cancelledKeys = 0;needsToSelectAgain = false;//Netty 提供了一个参数 ioRatio,可以调整 I/O 事件处理和任务处理的时间比例final int ioRatio = this.ioRatio;boolean ranTasks;if (ioRatio == 100) {try {if (strategy > 0) {//事件处理processSelectedKeys();}} finally {// Ensure we always run tasks.ranTasks = runAllTasks();}} else if (strategy > 0) {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}} else {ranTasks = runAllTasks(0); // This will run the minimum number of tasks}if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}selectCnt = 0;} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)selectCnt = 0;}} catch (CancelledKeyException e) {// Harmless exception - log anywayif (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}} catch (Error e) {throw e;} catch (Throwable t) {handleLoopException(t);} finally {// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Error e) {throw e;} catch (Throwable t) {handleLoopException(t);}}}}

NioEventLoop 不仅负责处理 I/O 事件,还要兼顾执行任务队列中的任务。任务队列遵循 FIFO 规则,可以保证任务执行的公平性。NioEventLoop 处理的任务类型基本可以分为三类。

1、普通任务:通过 NioEventLoop 的 execute() 方法向任务队列 taskQueue 中添加任务。例如 Netty 在写数据时会封装 WriteAndFlushTask 提交给 taskQueue。taskQueue 的实现类是多生产者单消费者队列 MpscChunkedArrayQueue,在多线程并发添加任务时,可以保证线程安全。

2、 定时任务:通过调用 NioEventLoop 的 schedule() 方法向定时任务队列 scheduledTaskQueue 添加一个定时任务,用于周期性执行该任务。例如,心跳消息发送等。定时任务队列 scheduledTaskQueue 采用优先队列 PriorityQueue 实现。

3、尾部队列:tailTasks 相比于普通任务队列优先级较低,在每次执行完 taskQueue 中任务后会去获取尾部队列中任务执行。尾部任务并不常用,主要用于做一些收尾工作,例如统计事件循环的执行时间、监控信息上报等。

Pipeline

整体结构

EventLoop是调度中心,但实际的业务逻辑会交给ChannelPipeline所定义的ChannelHandler完成。

首先ChannelPipeline与ChannelHandler是什么关系呢?
实际上,ChannelPipline可以看作是ChannelHandler 的容器载体,内部通过双向链表将不同的ChannelHandler链接在一起。
当有IO读写事件触发时,ChannelPipeline会依次调用ChannelHandler列表对Channel的数据进行拦截处理。
在这里插入图片描述

由图可知,每个Chanel会绑定一个ChannelPipeline,每个ChannelPipeline都包含多个ChannelHandlerContext,所有ChannelHandlerContext构成一个双向链表。
ChannelHandlerContext内部封装着ChannelHandler,同时保存着ChannleHandler 的上下文。如 connect、bind、read、flush、write、close 等

根据网络数据的流向,ChannelPipeline 分为入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器。
数据先由一系列 InboundHandler 处理后入站,然后再由相反方向的 OutboundHandler 处理完成后出站

ChannelPipeline 的双向链表分别维护了 HeadContext 和 TailContext 的头尾节点。我们自定义的 ChannelHandler 会插入到 Head 和 Tail 之间,这两个节点在 Netty 中已经默认实现了。

HeadContext 既是 Inbound 处理器,也是 Outbound 处理器。它分别实现了 ChannelInboundHandler 和 ChannelOutboundHandler。网络数据写入操作的入口就是由 HeadContext 节点完成的。HeadContext 作为 Pipeline 的头结点负责读取数据并开始传递 InBound 事件,当数据处理完成后,数据会反方向经过 Outbound 处理器,最终传递到 HeadContext,所以 HeadContext 又是处理 Outbound 事件的最后一站。此外 HeadContext 在传递事件之前,还会执行一些前置操作。

TailContext 只实现了 ChannelInboundHandler 接口。它会在 ChannelInboundHandler 调用链路的最后一步执行,主要用于终止 Inbound 事件传播,例如释放 Message 数据资源等。TailContext 节点作为 OutBound 事件传播的第一站,仅仅是将 OutBound 事件传递给上一个节点。

事件传播机制

ChannelPipeline 可分为入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器,与此对应传输的事件类型可以分为Inbound 事件和Outbound 事件。

我们通过一个代码示例,一起体验下 ChannelPipeline 的事件传播机制。

serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new SampleInBoundHandler("SampleInBoundHandlerA", false)).addLast(new SampleInBoundHandler("SampleInBoundHandlerB", false)).addLast(new SampleInBoundHandler("SampleInBoundHandlerC", true));ch.pipeline().addLast(new SampleOutBoundHandler("SampleOutBoundHandlerA")).addLast(new SampleOutBoundHandler("SampleOutBoundHandlerB")).addLast(new SampleOutBoundHandler("SampleOutBoundHandlerC"));}}public class SampleInBoundHandler extends ChannelInboundHandlerAdapter {private final String name;private final boolean flush;public SampleInBoundHandler(String name, boolean flush) {this.name = name;this.flush = flush;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("InBoundHandler: " + name);if (flush) {ctx.channel().writeAndFlush(msg);} else {super.channelRead(ctx, msg);}}}
public class SampleOutBoundHandler extends ChannelOutboundHandlerAdapter {private final String name;public SampleOutBoundHandler(String name) {this.name = name;}@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("OutBoundHandler: " + name);super.write(ctx, msg, promise);}}

通过 Pipeline 的 addLast 方法分别添加了三个 InboundHandler 和 OutboundHandler,添加顺序都是 A -> B -> C,下图可以表示初始化后 ChannelPipeline 的内部结构。
在这里插入图片描述

当客户端向服务端发送请求时,会触发 SampleInBoundHandler 调用链的 channelRead 事件。经过 SampleInBoundHandler 调用链处理完成后,在 SampleInBoundHandlerC 中会调用 writeAndFlush 方法向客户端写回数据,此时会触发 SampleOutBoundHandler 调用链的 write 事件。

由此可见,Inbound 事件和 Outbound 事件的传播方向是不一样的。Inbound 事件的传播方向为 Head -> Tail,而 Outbound 事件传播方向是 Tail -> Head,两者恰恰相反。在 Netty 应用编程中一定要理清楚事件传播的顺序。推荐你在系统设计时模拟客户端和服务端的场景画出 ChannelPipeline 的内部结构图,以避免搞混调用关系。

异常传播机制

ChannelPipeline 事件传播的实现采用了经典的责任链模式,调用链路环环相扣。那么如果有一个节点处理逻辑异常会出现什么现象呢?

异常按顺序从 Head 节点传播到 Tail 节点。如果用户没有对异常进行拦截处理,最后将由 Tail 节点统一处理,在 TailContext 源码中可以找到具体实现:

protected void onUnhandledInboundException(Throwable cause) {try {logger.warn("An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +"It usually means the last handler in the pipeline did not handle the exception.",cause);} finally {ReferenceCountUtil.release(cause);}}

虽然 Netty 中 TailContext 提供了兜底的异常处理逻辑,但是在很多场景下,并不能满足我们的需求。假如你需要拦截指定的异常类型,并做出相应的异常处理,应该如何实现呢?我们接着往下看。

异常处理的最佳实践

过异常传播机制的学习,我们应该可以想到最好的方法是在 ChannelPipeline 自定义处理器的末端添加统一的异常处理器,此时 ChannelPipeline 的内部结构如下图所示。

在这里插入图片描述

用户自定义的异常处理器代码示例如下:

public class ExceptionHandler extends ChannelDuplexHandler {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {if (cause instanceof RuntimeException) {System.out.println("Handle Business Exception Success.");}}}

加入统一的异常处理器后,可以看到异常已经被优雅地拦截并处理掉了。这也是 Netty 推荐的最佳异常处理实践。

总结

ChannelPipeline 是双向链表结构,包含 ChannelInboundHandler 和 ChannelOutboundHandler 两种处理器。
ChannelHandlerContext 是对 ChannelHandler 的封装,每个 ChannelHandler 都对应一个 ChannelHandlerContext,实际上 ChannelPipeline 维护的是与 ChannelHandlerContext 的关系。
Inbound 事件和 Outbound 事件的传播方向相反,Inbound 事件的传播方向为 Head -> Tail,而 Outbound 事件传播方向是 Tail -> Head。
异常事件的处理顺序与 ChannelHandler 的添加顺序相同,会依次向后传播,与 Inbound 事件和 Outbound 事件无关。

粘包拆包问题:如何获取一个完整的网络包?

粘包/拆包的物种现象

在这里插入图片描述

  • 服务端恰巧读到了两个完整的数据包 A 和 B,没有出现拆包/粘包问题;
  • 服务端接收到 A 和 B 粘在一起的数据包,服务端需要解析出 A 和 B;
  • 服务端收到完整的 A 和 B 的一部分数据包 B-1,服务端需要解析出完整的 A,并等待读取完整的 B 数据包;
  • 服务端接收到 A 的一部分数据包 A-1,此时需要等待接收到完整的 A 数据包;
  • 数据包 A 较大,服务端需要多次才可以接收完数据包 A。

由于拆包/粘包问题的存在,数据接收方很难界定数据包的边界在哪里,很难识别出一个完整的数据包。所以需要提供一种机制来识别数据包的界限,这也是解决拆包/粘包的唯一方法:定义应用层的通信协议。下面我们一起看下主流协议的解决方案。

主流解决方案:
1、消息长度固定
2、特定分隔符 --》Redis采用换行符
3、消息长度 + 消息内容–》主流方案,dubbo,rockerMQ

Netty实现自定义协议

首先我们看下 Netty 中编解码器是如何分类的。

Netty 常用编码器类型:
MessageToByteEncoder 对象编码成字节流;
MessageToMessageEncoder 一种消息类型编码成另外一种消息类型。

Netty 常用解码器类型:

ByteToMessageDecoder/ReplayingDecoder 将字节流解码为消息对象;
MessageToMessageDecoder 将一种消息类型解码为另外一种消息类型。

编解码器可以分为一次解码器和二次解码器,一次解码器用于解决 TCP 拆包/粘包问题,按协议解析后得到的字节数据。如果你需要对解析后的字节数据做对象模型的转换,这时候便需要用到二次解码器,同理编码器的过程是反过来的。

一次编解码器:MessageToByteEncoder/ByteToMessageDecoder。
二次编解码器:MessageToMessageEncoder/MessageToMessageDecoder。

需要编码类实际上再ChannelOutBoundHandler的抽象类实现,具体操作时OutBound出站数据。

同样,解码类是 ChanneInboundHandler 的抽象类实现,操作的是 Inbound 入站数据。解码器实现的难度要远大于编码器,因为解码器需要考虑拆包/粘包问题。由于接收方有可能没有接收到完整的消息,所以解码框架需要对入站的数据做缓冲操作,直至获取到完整的消息。

Netty 提供了很多开箱即用的解码器,这些解码器基本覆盖了 TCP 拆包/粘包的通用解决方案。

固定长度解码器 FixedLengthFrameDecoder

固定长度解码器 FixedLengthFrameDecoder 非常简单,直接通过构造函数设置固定长度的大小 frameLength,无论接收方一次获取多大的数据,都会严格按照 frameLength 进行解码。如果累积读取到长度大小为 frameLength 的消息,那么解码器认为已经获取到了一个完整的消息。如果消息长度小于 frameLength,FixedLengthFrameDecoder 解码器会一直等后续数据包的到达,直至获得完整的消息。

特殊分隔符解码器 DelimiterBasedFrameDecoder

使用特殊分隔符解码器 DelimiterBasedFrameDecoder 之前我们需要了解以下几个属性的作用。
delimiters
delimiters 指定特殊分隔符,通过写入 ByteBuf 作为参数传入。delimiters 的类型是 ByteBuf 数组,所以我们可以同时指定多个分隔符,但是最终会选择长度最短的分隔符进行消息拆分。
maxLength
maxLength 是报文最大长度的限制。如果超过 maxLength 还没有检测到指定分隔符,将会抛出 TooLongFrameException。可以说 maxLength 是对程序在极端情况下的一种保护措施。

failFast
failFast 与 maxLength 需要搭配使用,通过设置 failFast 可以控制抛出 TooLongFrameException 的时机,可以说 Netty 在细节上考虑得面面俱到。如果 failFast=true,那么在超出 maxLength 会立即抛出 TooLongFrameException,不再继续进行解码。如果 failFast=false,那么会等到解码出一个完整的消息后才会抛出 TooLongFrameException。
stripDelimiter
stripDelimiter 的作用是判断解码后得到的消息是否去除分隔符。如果 stripDelimiter=false,特定分隔符为 \n,那么上述数据包解码出的结果为:

长度域解码器 LengthFieldBasedFrameDecoder

长度域解码器 LengthFieldBasedFrameDecoder 是解决 TCP 拆包/粘包问题最常用的解码器。它基本上可以覆盖大部分基于长度拆包场景,开源消息中间件 RocketMQ 就是使用 LengthFieldBasedFrameDecoder 进行解码的。

首先我们同样先了解 LengthFieldBasedFrameDecoder 中的几个重要属性,这里我主要把它们分为两个部分:长度域解码器特有属性以及与其他解码器(如特定分隔符解码器)的相似的属性。

长度域解码器特有属性。

// 长度字段的偏移量,也就是存放长度数据的起始位置
private final int lengthFieldOffset; 
// 长度字段所占用的字节数
private final int lengthFieldLength; 
/** 消息长度的修正值* 在很多较为复杂一些的协议设计中,长度域不仅仅包含消息的长度,而且包含其他的数据,如版本号、数据类型、数据状态等,那么这时候我们需要使用 lengthAdjustment 进行修正* lengthAdjustment = 包体的长度值 - 长度域的值*/
private final int lengthAdjustment; 
// 解码后需要跳过的初始字节数,也就是消息内容字段的起始位置
private final int initialBytesToStrip;
// 长度字段结束的偏移量,lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength
private final int lengthFieldEndOffset;

与固定长度解码器和特定分隔符解码器相似的属性。

private final int maxFrameLength; // 报文最大限制长度
private final boolean failFast; // 是否立即抛出 TooLongFrameException,与 maxFrameLength 搭配使用
private boolean discardingTooLongFrame; // 是否处于丢弃模式
private long tooLongFrameLength; // 需要丢弃的字节数
private long bytesToDiscard; // 累计丢弃的字节数

在 Netty LengthFieldBasedFrameDecoder 源码的注释中已经描述得非常详细,一共给出了 7 个场景示例,理解了这些示例基本上可以真正掌握 LengthFieldBasedFrameDecoder 的参数用法。

数据传输:writeAndFlush 处理流程剖析

源码查看链路:

ctx.channel().writeAndFlush(response);
点进去
DefaultChannelPipeline.java
@Override
public final ChannelFuture writeAndFlush(Object msg) {return tail.writeAndFlush(msg);
}@Overridepublic ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {write(msg, true, promise);return promise;}next.invokeWriteAndFlush(m, promise);void invokeWriteAndFlush(Object msg, ChannelPromise promise) {if (invokeHandler()) {invokeWrite0(msg, promise);invokeFlush0();} else {writeAndFlush(msg, promise);}}

可以总结以下三点:
writeAndFlush 属于出站操作,它是从 Pipeline 的 Tail 节点开始进行事件传播,一直向前传播到 Head 节点。不管在 write 还是 flush 过程,Head 节点都中扮演着重要的角色。
write 方法并没有将数据写入 Socket 缓冲区,只是将数据写入到 ChannelOutboundBuffer 缓存中,ChannelOutboundBuffer 缓存内部是由单向链表实现的。
flush 方法才最终将数据写入到 Socket 缓冲区。

零拷贝技术

发展历程

传统拷贝技术
在这里插入图片描述

DMA(Direct Memory Access,直接内存存取)是现代大部分硬盘都支持的特性,DMA 接管了数据读写的工作,不需要 CPU 再参与 I/O 中断的处理,从而减轻了 CPU 的负担。

DMA 引擎从文件读取数据后放入到内核缓冲区,然后可以直接从内核缓冲区传输到 Socket 缓冲区,从而减少内存拷贝的次数。

Java 中也使用了零拷贝技术,它就是 NIO FileChannel 类中的 transferTo() 方法,transferTo() 底层就依赖了操作系统零拷贝的机制,它可以将数据从 FileChannel 直接传输到另外一个 Channel。transferTo() 方法的定义如下:

public abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException;

在使用了 FileChannel#transferTo() 传输数据之后,我们看下数据拷贝流程发生了哪些变化,如下图所示:

在这里插入图片描述

比较大的一个变化是,DMA 引擎从文件中读取数据拷贝到内核态缓冲区之后,由操作系统直接拷贝到 Socket 缓冲区,不再拷贝到用户态缓冲区,所以数据拷贝的次数从之前的 4 次减少到 3 次。

在 Linux 2.4 版本之后,开发者对 Socket Buffer 追加一些 Descriptor 信息来进一步减少内核数据的复制。如下图所示,DMA 引擎读取文件内容并拷贝到内核缓冲区,然后并没有再拷贝到 Socket 缓冲区,只是将数据的长度以及位置信息被追加到 Socket 缓冲区,然后 DMA 引擎根据这些描述信息,直接从内核缓冲区读取数据并传输到协议引擎中,从而消除最后一次 CPU 拷贝。

在这里插入图片描述

从 Linux 操作系统的角度来说,零拷贝就是为了避免用户态和内存态之间的数据拷贝。无论是传统的数据拷贝还是使用零拷贝技术,其中有 2 次 DMA 的数据拷贝必不可少,只是这 2 次 DMA 拷贝都是依赖硬件来完成,不需要 CPU 参与。所以,在这里我们讨论的零拷贝是个广义的概念,只要能够减少不必要的 CPU 拷贝,都可以被称为零拷贝。

netty的零拷贝技术

Netty 除了支持操作系统级别的零拷贝,更多提供了面向用户态的零拷贝特性,主要体现在 5 个方面:堆外内存、CompositeByteBuf、Unpooled.wrappedBuffer、ByteBuf.slice 以及 FileRegion

堆外内存

堆内内存是受到Java虚拟机管理的内存,JVM有自己的垃圾回收算法,对于使用者来说不必关心对象的内存如何回收。
堆外内存与堆内存相对应,对于整个机器内存而言,除了堆内内存之外的部分即为堆外内存。

申请堆外内存

// 分配 10M 堆外内存long address = unsafe.allocateMemory(10 * 1024 * 1024);

与 DirectByteBuffer 不同的是,Unsafe#allocateMemory 所分配的内存必须自己手动释放,否则会造成内存泄漏,这也是 Unsafe 不安全的体现。Unsafe 同样提供了内存释放的操作:

unsafe.freeMemory(address);

堆外内存的引用关系如下:
在这里插入图片描述

first 是 Cleaner 类中的静态变量,Cleaner 对象在初始化时会加入 Cleaner 链表中。DirectByteBuffer 对象包含堆外内存的地址、大小以及 Cleaner 对象的引用,ReferenceQueue 用于保存需要回收的 Cleaner 对象。
当发生 GC 时,DirectByteBuffer 对象被回收
此时 Cleaner 对象不再有任何引用关系,在下一次 GC 时,该 Cleaner 对象将被添加到 ReferenceQueue 中,并执行 clean() 方法。clean() 方法主要做两件事情:

1、将 Cleaner 对象从 Cleaner 链表中移除;
2、调用 unsafe.freeMemory 方法清理堆外内存。

ByteBuf

ByteBuf 分类
可以划分为三个不同的维度:Heap/Direct、Pooled/Unpooled和Unsafe/非 Unsafe
**Heap/Direct 就是堆内和堆外内存。**Heap 指的是在 JVM 堆内分配,底层依赖的是字节数据;Direct 则是堆外内存,不受 JVM 限制,分配方式依赖 JDK 底层的 ByteBuffer。

**Pooled/Unpooled 表示池化还是非池化内存。**Pooled 是从预先分配好的内存中取出,使用完可以放回 ByteBuf 内存池,等待下一次分配。而 Unpooled 是直接调用系统 API 去申请内存,确保能够被 JVM GC 管理回收。

Unsafe/非 Unsafe 的区别在于操作方式是否安全。 Unsafe 表示每次调用 JDK 的 Unsafe 对象操作物理内存,依赖 offset + index 的方式操作数据。非 Unsafe 则不需要依赖 JDK 的 Unsafe 对象,直接通过数组下标的方式操作数据。
常用API

写入操作:
writeByte(byte b): 向缓冲区中写入一个字节。
writeShort(short s): 向缓冲区中写入一个短整型数据(2 字节)。
writeInt(int i): 向缓冲区中写入一个整型数据(4 字节)。
writeLong(long l): 向缓冲区中写入一个长整型数据(8 字节)。
writeBytes(byte[] src): 将字节数组中的数据写入到缓冲区。
writeBytes(ByteBuf src): 将另一个 ByteBuf 中的数据写入到缓冲区。
读取操作:
readByte(): 从缓冲区中读取一个字节。
readShort(): 从缓冲区中读取一个短整型数据(2 字节)。
readInt(): 从缓冲区中读取一个整型数据(4 字节)。
readLong(): 从缓冲区中读取一个长整型数据(8 字节)。
readBytes(byte[] dst): 从缓冲区中读取数据到字节数组。
readBytes(ByteBuf dst, int length): 从缓冲区中读取指定长度的数据到另一个 ByteBuf 中。
获取和设置操作:
getByte(int index): 获取缓冲区中指定位置的字节值。
setByte(int index, byte b): 设置缓冲区中指定位置的字节值。
getShort(int index): 获取缓冲区中指定位置的短整型数据。
setShort(int index, short s): 设置缓冲区中指定位置的短整型数据。
getInt(int index): 获取缓冲区中指定位置的整型数据。
setInt(int index, int i): 设置缓冲区中指定位置的整型数据。
getLong(int index): 获取缓冲区中指定位置的长整型数据。
setLong(int index, long l): 设置缓冲区中指定位置的长整型数据。
其他操作:
capacity(): 获取缓冲区的容量。
readerIndex() / writerIndex(): 获取读写指针的位置。
readableBytes() / writableBytes(): 获取可读/可写字节数。
clear() / resetReaderIndex() / resetWriterIndex(): 清空缓冲区或重置读写指针。
slice() / duplicate(): 创建缓冲区的切片或复制。
release(): 释放缓冲区。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/806354.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何在极狐GitLab 启用依赖代理功能

本文作者&#xff1a;徐晓伟 GitLab 是一个全球知名的一体化 DevOps 平台&#xff0c;很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab 是 GitLab 在中国的发行版&#xff0c;专门为中国程序员服务。可以一键式部署极狐GitLab。 本文主要讲述了如何在[极狐GitLab…

SSH穿透ECS访问内网RDS数据库

处于安全考虑&#xff0c;RDS一般只会允许指定的IP进行访问&#xff0c;而我们开发环境的IP往往是动态的&#xff0c;每次IP变动都需要去修改RDS的白名单&#xff0c;为我们的工作带来很大的不便。 那么如何去解决这个问题&#xff1f; 假如我们有一台ESC服务器&#xff0c;E…

STC89C52学习笔记(八)

STC89C52学习笔记&#xff08;八&#xff09; 综述&#xff1a;本文讲述了LED点阵屏以及如何进行数据串行输入&#xff0c;并行输出。 一、LED点阵屏 1.介绍 LED点阵屏由多个LED组成&#xff0c;以矩阵形式排列&#xff08;类似于矩阵键盘&#xff09;&#xff0c;像素一般…

CSS滚动条样式修改

前言 目前我们可以通过 CSS伪类 来实现滚动条的样式修改&#xff0c;以下为修改滚动条样式用到的CSS伪类&#xff1a; ::-webkit-scrollbar — 整个滚动条 ::-webkit-scrollbar-button — 滚动条上的按钮 (上下箭头) ::-webkit-scrollbar-thumb — 滚动条上的滚动滑块 ::-web…

了解大语言模型的参数高效微调(Parameter-Effcient Fine-Tuning)

&#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ 大语言模型在众多应用领域实现了突破性的进步&#xff0c;显著提升了各种任务的完成度。然而&#xff0c;其庞大的规模也带来了高昂的计算成本。这些模型往往包含数十亿甚至上千亿参数&#xff0c;需要…

Golang使用PGO优化程序性能

文章目录 参考文章PGO是什么使用PGO的好处PGO做了什么热函数内联什么是内联内联的好处Go默认的内联策略查看内联预算PGO的热函数内联 去虚拟化调用指令高速缓存 PGO有什么缺点可执行程序变大构建时间变长 PGO怎么使用典型的工作流程收集CPU配置文件生产环境启动PGO代码改动重新…

Electron+React 搭建桌面应用

创建应用程序 创建 Electron 应用 使用 Webpack 创建新的 Electron 应用程序&#xff1a; npm init electron-applatest my-new-app -- --templatewebpack 启动应用 npm start 设置 Webpack 配置 添加依赖包&#xff0c;确保可以正确使用 JSX 和其他 React 功能&#xff…

3.1 基本形式 机器学习

从本章本节开始就开始正式介绍机器学习的算法了&#xff01;我们首先登场的是---------线性模型。 w可以理解为权重&#xff0c;我们的x就是我们的样本点的各个特征数值&#xff0c;最后输出模型f&#xff08;x&#xff09;。其代表我们把样本点带入&#xff0c;以二分类为例&a…

厂房起火3D消防灭火安全救援模拟演练

深圳VR公司华锐视点依托前沿的VR虚拟现实制作、三维仿真和图形图像渲染技术&#xff0c;将参训者带入栩栩如生的火灾现场。佩戴VR头盔&#xff0c;参训者将真切体验火势蔓延的紧张与危机&#xff0c;身临其境地感受火灾的恐怖。 并且消防安全VR虚拟现实演练系统精心模拟了住宅、…

修复 Windows 上的 PyTorch 1.1 github 模型加载权限错误

问题: 在 Windows 计算机上执行示例 github 模型加载时,生成了 master.zip 文件的权限错误(请参阅下面的错误堆栈跟踪)。 错误堆栈跟踪: 在[4]中:en2de = torch.hub.load(pytorch/fairseq, transformer.wmt16.en-de, tokenizer=moses, bpe=subword_nmt) 下载:“https://…

uniapp 卡片勾选

前言 公司的app项目使用的uniapp&#xff0c;项目里有一个可勾选的卡片功能&#xff0c;效果图如下&#xff1a; 找了一圈没找到什么太好的组件&#xff0c;于是就自己简单写了一个&#xff0c;记录一下。避免以后还会用到 代码 <template><view class"card-…

ELK 企业级日志分析系统 简单介绍

目录 一 ELK 简介 1&#xff0c; elk 是什么 2&#xff0c;elk 架构图 3&#xff0c;elk 日志处理步骤 二 Elasticsearch 简介 1&#xff0c; Elasticsearch 是什么 2&#xff0c; Elasticsearch 的核心概念 3&#xff0c; Elasticsearch 的原理 三 Logstas…

labview技术交流-如何判断一个数是否为质数

问题起源 如何判断一个数是否为质数&#xff0c;其实并不难&#xff0c;只要你知道质数的定义&#xff0c;按照它的定义去编写代码就可以了。但是没有思路的人可能就会一直找不到方向&#xff0c;所以我就简单介绍一下。 还有我想吐槽的点&#xff0c;labview本来就是很小众的语…

Harmony鸿蒙南向驱动开发-SPI

SPI即串行外设接口&#xff08;Serial Peripheral Interface&#xff09;&#xff0c;是一种高速的&#xff0c;全双工&#xff0c;同步的通信总线。SPI是由Motorola公司开发&#xff0c;用于在主设备和从设备之间进行通信。 运作机制 在HDF框架中&#xff0c;SPI的接口适配模…

【MATLAB源码-第6期】基于matlab的QPSK的误码率BER和误符号率SER仿真。

1、算法描述 QPSK&#xff0c;有时也称作四位元PSK、四相位PSK、4-PSK&#xff0c;在坐标图上看是圆上四个对称的点。通过四个相位&#xff0c;QPSK可以编码2位元符号。图中采用格雷码来达到最小位元错误率&#xff08;BER&#xff09; — 是BPSK的两倍. 这意味著可以在BPSK系统…

瑞山集团携手数环通iPaaS,实现ERP与CRM无缝对接

01 客户背景 瑞山集团位于粤港澳大湾区核心商圈----佛山市&#xff0c;是一家致力于各种新型材料添加剂应用推广&#xff0c;集科研、销售和服务于一身的新材料企业&#xff0c;产品覆盖了塑料黏胶剂、水性涂料、3D打印材料、电子材料、纳米材料等各种行业。 公司旗下拥有几间不…

计算机视觉 | 基于 ORB 特征检测器和描述符的全景图像拼接算法

Hi&#xff0c;大家好&#xff0c;我是半亩花海。本项目实现了基于 ORB 特征检测器和描述符的全景图像拼接算法&#xff0c;能够将两张部分重叠的图像拼接成一张无缝连接的全景图像。 文章目录 一、随机抽样一致算法二、功能实现三、代码解析四、效果展示五、完整代码 一、随机…

【面试题】微博、百度等大厂的排行榜如何实现?

背景 现如今每个互联网平台都会提供一个排行版的功能&#xff0c;供人们预览最新最有热度的一些消息&#xff0c;比如百度&#xff1a; 再比如微博&#xff1a; 我们要知道&#xff0c;这些互联网平台每天产生的数据是非常大&#xff0c;如果我们使用MySQL的话&#xff0c;db实…

第十一届蓝桥杯大赛软件赛省赛C/C++ 大学 B 组

第十一届蓝桥杯大赛软件赛省赛C/C 大学 B 组 文章目录 第十一届蓝桥杯大赛软件赛省赛C/C 大学 B 组1、字串排序2、门牌制作3、既约分数4、蛇形填数5、跑步锻炼6、七段码7、成绩统计8、回文日期9、子串分值和10、平面切分 1、字串排序 // 转载博客链接 https://blog.csdn.net/we…

【QT+QGIS跨平台编译】175:【QGIS_App跨平台编译】—【错误处理:未定义的class APP_EXPORT】

点击查看专栏目录 文章目录 一、未定义的class APP_EXPORT二、错误处理 一、未定义的class APP_EXPORT 报错信息&#xff1a; 二、错误处理 第18行增加&#xff1a; #include "qgis_app.h"