Netty的解码器和编码器

链路图

一个完整的RPC请求中,netty对请求数据和响应数据的处理流程如下图所示

a6c09e60ef19479a97f2e519d10006ad.png

网络线路中传输的都是二进制数据,之后netty将二进制数据解码乘POJO对象,让客户端或者服务端程序处理。

解码的工具称为解码器,是一个入站处理器InBound。

编码的工具称为编码器,是一个处长处理器OutBound。

解码器

原理

解码器作为一个入站处理器,它需要将上一个入站处理器传过来的输入数据进行数据的编码或者格式转换,然后输出到下一站的入站处理器。

通常使用的ByteToMessageDecoder解码器将输入类型为ByteBuf缓冲区的数据进行解码,输出一个一个的POJO对象。

ByteToMessageDecoder是一个抽象类,继承关系如图

4ae04200c5d24576803704660b8443a8.png

protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

ByteToMessageDecoder使用了模板模式,只定义了解码的流程,具体的解码逻辑由子类完成。也就是开放了decode解码方法,由具体的解码器实现。

重申一下Netty对于handler的管理是通过通道pipeline完成的,所以解码器后面的处理器可以是业务处理器。

业务处理器接收解码结果,进行业务处理。

解码器中有一个比较重要的实现是ReplayingDecoder(也是一个抽象类),它在读取ByteBuf缓冲区的数据之前,需要检查缓冲区是否有足够的字节,如果缓冲区中字节足够,则会正常读取,反之,则会停止解码。等待下一次IO时间到来时再读取。

ReplayingDecoder在内部定义了一个新的二进制缓冲区类,对ByteBuf缓冲区进行了修饰,也就是ReplayingDecoderBuffer。

也就是说,继承ReplayingDecoder的子类解码器收到的二进制数据是经过ReplayingDecoderBuffer修饰过,判断过的。不是直接读取的ByteBuf中的数据。

ReplayingDecoder除了对ByteBuf数组的修饰以外,另一个作用,也更重要的作用是做分包传输。

我们知道底层通信协议是分包传输的。也就是我们预期的包大小和顺序可能和实际的并不一样,这时候就可以通过ReplayingDecoder来处理,ReplayingDecoder通过state属性来控制状态变化。比如如下sock鉴权解码器

public class SocksAuthRequestDecoder extends ReplayingDecoder<State> {private String username;public SocksAuthRequestDecoder() {super(State.CHECK_PROTOCOL_VERSION);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {switch (state()) {case CHECK_PROTOCOL_VERSION: {if (byteBuf.readByte() != SocksSubnegotiationVersion.AUTH_PASSWORD.byteValue()) {out.add(SocksCommonUtils.UNKNOWN_SOCKS_REQUEST);break;}checkpoint(State.READ_USERNAME);}case READ_USERNAME: {int fieldLength = byteBuf.readByte();username = SocksCommonUtils.readUsAscii(byteBuf, fieldLength);checkpoint(State.READ_PASSWORD);}case READ_PASSWORD: {int fieldLength = byteBuf.readByte();String password = SocksCommonUtils.readUsAscii(byteBuf, fieldLength);out.add(new SocksAuthRequest(username, password));break;}default: {throw new Error();}}ctx.pipeline().remove(this);}@UnstableApipublic enum State {CHECK_PROTOCOL_VERSION,READ_USERNAME,READ_PASSWORD}
}

以上是偏分阶段解码,适用于那些固定长度的数据,比如整型等,但对于字符串来说,可长可短,没有具体的长度限制。如果用ReplayingDecoder来实现

@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {switch (state()) {case PARSE_1: {//基于Header-Content协议传输,Header中带有content长度,用一个int长度标识即可length = in.readInt();inBytes = new byte[];break;}case PARSE_2: {in.readBytes(inBytes,0,length);out.add(new String(inBytes,"UTF-8"));}default: {throw new Error();}}ctx.pipeline().remove(this);}

但其实对于比较复杂的业务场景中,不太建议使用ReplayingDecoder,主要原因是ReplayingDecoer在解析速度上相对较差,试想一下,replayingDecoder长度不够时,会停止解码。也就是说一个请求会被解码多次才可能最终完成。

对于字符串分包传输来说,更适合直接继承ByteToMessageDecoder基类来完成Header-Content协议的解析

@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {if(buf.readableBytes()<4){//可读字节小于4,消息头还没读满,返回。(假设Header是一个int的数据return;        }buf.markReaderIndex();int length = buf.readInt();if(buf.readableBytes()<length){buf.resetReaderIndex();        }byte[] inBytes = new byte[length];buf.readBytes(inBytes,0,length);out.add(new String(inBytes,"UTF-8"));}

除了ByteToMessageDecoder这种将二进制数据转化为POJO对象的解码器以外,还有将一种POJO转为另一种POJO对象的解码器,MessageToMessageDecoder,不同的是,后者需要指明泛型类型。比如Integer转为String,这时候泛型类型为Integer。

Netty内置的开箱即用的Decoder

FixedLengthFrameDecoder-固定长度数据包解码器

他会把入站ByteBuf数据包拆分成一个个长度为n的数据包,然后发往下一个channelHandler入站处理器

LineBasedFrameDecoder-行分割数据包解码器

如果ByteBuf数据包使用换行符/回车符作为数据包的边界分隔符。这时他会把数据包按换行符/回车符拆分成一个个数据包。

有一个行最大长度限制,如果超过这个长度还没有发现分隔符,会抛出异常

DelimiterFrameDecoder-自定义分隔符数据包解码器

他会按照自定义分隔符将ByteBuf数据包进行拆分

LengthFieldBasedFrameDecoder-自定义长度数据包解码器

基于灵活长度的数据包,在ByteBuf数据包中,加了一个长度字段,保存了原始数据包长度,解码的时候,会按照这个长度进行原始数据包的提取。

一般基于Header-Content协议的数据包,都建议使用这个解码器

public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {private final int maxFrameLength;        //发送的数据包最大长度private final int lengthFieldOffset;     //长度字段偏移量private final int lengthFieldLength;     //长度字段自己占用的字节数private final int lengthAdjustment;      //长度字段的偏移量矫正,比如长度后面还有两个字节用于存储别的信息,那么该值为2private final int initialBytesToStrip;   //丢弃的起始字节数...
}

编码器

原理

所谓的编码器就是服务端应用程序处理完之后,一般会有一个响应结果Response。也就是一个Java POJO对象。需要将他编码为最终ByteBuf二进制类型。通过流水线写入到底层的Java通道。

上面说,解码器是一个入站处理器,那么编码器就是一个出站处理器。也就是OutboundHandler。处理逻辑为每个出站处理器会将上一个出站处理器的结果作为输入,经过处理后,传递给下一个出站处理器,直至最后写入Java通道。

由于出站处理器是从后向前执行的,所以第一个处理器一定是需要将结果处理成ByteBuf类型的数据。

MessageToByteEncoder同ByteToMessageDecoder一样都是一个抽象类,用模板模式。其中encode方法由子类实现。

在最后一步之前,可能会需要将一种POJO对象转成另一种POJO对象,就像解码器中的MessageToMessageDecoder一样,编码器也有同样的MessageToMessageEncoder解码器抽象类。

编解码器

所谓的编解码器也就是把解码器和编码器放在同一个类中,这个类就叫做ByteToMessageCodec,需要同时实现encode和decode方法。

不过这样的话,解码和编码的不同的代码就会出现在一个类中。出现逻辑混乱。Netty提供了另一种方式可以让编码代码和解码代码放在两个类,同时把编码工作和解码工作组合起来

编解码组合器

这个编解码组合器称为CombinedChanneldDuplexHandler组合器,比如客户端的编解码组合器就是用的这种方式

public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResponseDecoder, HttpRequestEncoder>implements HttpClientUpgradeHandler.SourceCodec {...
}public class HttpResponseDecoder extends HttpObjectDecoder {...
}public abstract class HttpObjectDecoder extends ByteToMessageDecoder {private enum State {SKIP_CONTROL_CHARS,READ_INITIAL,READ_HEADER,READ_VARIABLE_LENGTH_CONTENT,READ_FIXED_LENGTH_CONTENT,READ_CHUNK_SIZE,READ_CHUNKED_CONTENT,READ_CHUNK_DELIMITER,READ_CHUNK_FOOTER,BAD_MESSAGE,UPGRADED}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {if (resetRequested) {resetNow();}switch (currentState) {case SKIP_CONTROL_CHARS:// Fall-throughcase READ_INITIAL: try {AppendableCharSequence line = lineParser.parse(buffer);if (line == null) {return;}String[] initialLine = splitInitialLine(line);if (initialLine.length < 3) {// Invalid initial line - ignore.currentState = State.SKIP_CONTROL_CHARS;return;}message = createMessage(initialLine);currentState = State.READ_HEADER;// fall-through} catch (Exception e) {out.add(invalidMessage(buffer, e));return;}case READ_HEADER: try {State nextState = readHeaders(buffer);if (nextState == null) {return;}currentState = nextState;switch (nextState) {case SKIP_CONTROL_CHARS:// fast-path// No content is expected.out.add(message);out.add(LastHttpContent.EMPTY_LAST_CONTENT);resetNow();return;case READ_CHUNK_SIZE:if (!chunkedSupported) {throw new IllegalArgumentException("Chunked messages not supported");}// Chunked encoding - generate HttpMessage first.  HttpChunks will follow.out.add(message);return;default:/*** <a href="https://tools.ietf.org/html/rfc7230#section-3.3.3">RFC 7230, 3.3.3</a> states that if a* request does not have either a transfer-encoding or a content-length header then the message body* length is 0. However for a response the body length is the number of octets received prior to the* server closing the connection. So we treat this as variable length chunked encoding.*/long contentLength = contentLength();if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {out.add(message);out.add(LastHttpContent.EMPTY_LAST_CONTENT);resetNow();return;}assert nextState == State.READ_FIXED_LENGTH_CONTENT ||nextState == State.READ_VARIABLE_LENGTH_CONTENT;out.add(message);if (nextState == State.READ_FIXED_LENGTH_CONTENT) {// chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT state reads data chunk by chunk.chunkSize = contentLength;}// We return here, this forces decode to be called again where we will decode the contentreturn;}} catch (Exception e) {out.add(invalidMessage(buffer, e));return;}case READ_VARIABLE_LENGTH_CONTENT: {// Keep reading data as a chunk until the end of connection is reached.int toRead = Math.min(buffer.readableBytes(), maxChunkSize);if (toRead > 0) {ByteBuf content = buffer.readRetainedSlice(toRead);out.add(new DefaultHttpContent(content));}return;}case READ_FIXED_LENGTH_CONTENT: {int readLimit = buffer.readableBytes();// Check if the buffer is readable first as we use the readable byte count// to create the HttpChunk. This is needed as otherwise we may end up with// create an HttpChunk instance that contains an empty buffer and so is// handled like it is the last HttpChunk.//// See https://github.com/netty/netty/issues/433if (readLimit == 0) {return;}int toRead = Math.min(readLimit, maxChunkSize);if (toRead > chunkSize) {toRead = (int) chunkSize;}ByteBuf content = buffer.readRetainedSlice(toRead);chunkSize -= toRead;if (chunkSize == 0) {// Read all content.out.add(new DefaultLastHttpContent(content, validateHeaders));resetNow();} else {out.add(new DefaultHttpContent(content));}return;}/*** everything else after this point takes care of reading chunked content. basically, read chunk size,* read chunk, read and ignore the CRLF and repeat until 0*/case READ_CHUNK_SIZE: try {AppendableCharSequence line = lineParser.parse(buffer);if (line == null) {return;}int chunkSize = getChunkSize(line.toString());this.chunkSize = chunkSize;if (chunkSize == 0) {currentState = State.READ_CHUNK_FOOTER;return;}currentState = State.READ_CHUNKED_CONTENT;// fall-through} catch (Exception e) {out.add(invalidChunk(buffer, e));return;}case READ_CHUNKED_CONTENT: {assert chunkSize <= Integer.MAX_VALUE;int toRead = Math.min((int) chunkSize, maxChunkSize);if (!allowPartialChunks && buffer.readableBytes() < toRead) {return;}toRead = Math.min(toRead, buffer.readableBytes());if (toRead == 0) {return;}HttpContent chunk = new DefaultHttpContent(buffer.readRetainedSlice(toRead));chunkSize -= toRead;out.add(chunk);if (chunkSize != 0) {return;}currentState = State.READ_CHUNK_DELIMITER;// fall-through}case READ_CHUNK_DELIMITER: {final int wIdx = buffer.writerIndex();int rIdx = buffer.readerIndex();while (wIdx > rIdx) {byte next = buffer.getByte(rIdx++);if (next == HttpConstants.LF) {currentState = State.READ_CHUNK_SIZE;break;}}buffer.readerIndex(rIdx);return;}case READ_CHUNK_FOOTER: try {LastHttpContent trailer = readTrailingHeaders(buffer);if (trailer == null) {return;}out.add(trailer);resetNow();return;} catch (Exception e) {out.add(invalidChunk(buffer, e));return;}case BAD_MESSAGE: {// Keep discarding until disconnection.buffer.skipBytes(buffer.readableBytes());break;}case UPGRADED: {int readableBytes = buffer.readableBytes();if (readableBytes > 0) {// Keep on consuming as otherwise we may trigger an DecoderException,// other handler will replace this codec with the upgraded protocol codec to// take the traffic over at some point then.// See https://github.com/netty/netty/issues/2173out.add(buffer.readBytes(readableBytes));}break;}default:break;}}...
}

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/652131.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

物联网协议Coap之C#基于Mozi的CoapClient调用解析

目录 前言 一、CoapClient相关类介绍 1、CoapClient类图 2、CoapClient的设计与实现 3、SendMessage解析 二、Client调用分析 1、创建CoapClient对象 2、实际发送请求 3、Server端请求响应 4、控制器寻址 总结 前言 在之前的博客内容中&#xff0c;关于在ASP.Net Co…

node.js 分布式锁看这篇就够用了

Redis SETNX 命令背后的原理探究 当然&#xff0c;让我们通过一个简单的例子&#xff0c;使用 Redis CLI&#xff08;命令行界面&#xff09;来模拟获取锁和释放锁的过程。 在此示例中 获取锁: # 首先&#xff0c;设置锁密钥的唯一值和过期时间(秒) 127.0.0.1:6379> SET …

数字三角形(很经典的动态规划问题)

给定一个如下图所示的数字三角形&#xff0c;从顶部出发&#xff0c;在每一结点可以选择移动至其左下方的结点或移动至其右下方的结点&#xff0c;一直走到底层&#xff0c;要求找出一条路径&#xff0c;使路径上的数字的和最大。 73 88 1 02 7 4 4 4 5 2 6 …

第2章-神经网络的数学基础——python深度学习

第2章 神经网络的数学基础 2.1 初识神经网络 我们来看一个具体的神经网络示例&#xff0c;使用 Python 的 Keras 库 来学习手写数字分类。 我们这里要解决的问题是&#xff0c; 将手写数字的灰度图像&#xff08;28 像素28 像素&#xff09;划分到 10 个类别 中&#xff08;0…

基于多种CNN模型在清华新闻语料分类效果上的对比

该实验项目目录如图&#xff1a; 1、 模型 1.1. TextCNN # coding: UTF-8 import torch import torch.nn as nn import torch.nn.functional as F import numpy as npclass Config(object):"""配置参数"""def __init__(self, dataset, embedd…

【C++类与对象(上)】

C类与对象(上&#xff09; 1.面向过程和面向对象初步认识2.类的引入3.类的定义4.类的访问限定符及封装4.1 访问限定符4.2 封装 5.类的作用域6.类的实例化7.类的对象大小的计算7.1如何计算类对象的大小7.2 类对象的存储方式猜测7.3结构体内存对齐规则 8.类成员函数的this指针8.1…

Java多线程基础-18:线程安全的集合类与ConcurrentHashMap

Java标准库提供了很多集合类&#xff0c;但有一些集合类是线程不安全的&#xff0c;也就是说&#xff0c;在多线程环境下可能会出问题的。常用的ArrayList&#xff0c;LinkedList&#xff0c;HashMap&#xff0c;PriorityQueue等都是线程不安全的&#xff08;Vector, Stack, Ha…

Android创建工程

语言选择Java&#xff0c;我用的Java 最小SDK&#xff1a;就是开发的APP支持的最小安卓版本 Gradle 是一款Google 推出的基于 JVM、通用灵活的项目构建工具&#xff0c;支持 Maven&#xff0c;JCenter 多种第三方仓库;支持传递性依赖管理、废弃了繁杂的xml 文件&#xff0c;转而…

关于ArcGIS的Update更新工具的疑问

Update更新工具官方帮助文件解释如下&#xff1a; 但是根据这个插图很让人疑惑&#xff0c;输入要素是蓝色&#xff0c;更新要素是黄色&#xff0c;输出要素为绿色&#xff0c;而且全部是绿色。我一直以为是与更新要素相交&#xff08;被包含切割&#xff09;的哪些输入要素都被…

【常用工具】7-Zip 解/压缩软件——基本使用方法

在实际日常工作或项目中&#xff0c;经常会遇到需要在window操作系统上压缩文件&#xff0c;在Linux操作系统上解压缩的场景&#xff0c;一款实用的压缩软件迫在眉睫&#xff0c;经过实际使用总结&#xff0c;7-Zip可以很好的解决很多压缩和解压缩问题&#xff0c;其基本使用方…

WordPress如何自定义日期和时间格式?附PHP日期和时间格式字符串

WordPress网站在很多地方都需要用到日期和时间&#xff0c;那么我们应该在哪里设置日期和时间呢&#xff1f;又如何自定义日期和时间格式呢&#xff1f;下面boke112百科就跟大家一起来学习一下PHP标准化的日期和时间格式字符串。 特别说明&#xff1a;格式字符是标准化的&#…

canvas绘制旋转的大风车

查看专栏目录 canvas实例应用100专栏&#xff0c;提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重…

LCweekly-game

ExScorecomplete situation1220717/719(解答错误)30523/537(超时,弱智题已AC)40 有用的是Ex2和Ex4 Ex2 my solution class Solution { public://calculate xs l-time 幂乘int jiecheng(int x,int l){int zx;for(int i0;i<l;i){if(z>pow(10,4.5))return 0;zz*z;}return…

C#算法(11)—求三个点构成圆的圆心坐标和半径

前言 我们在上位机开发领域也经常会碰到根据三个点求出圆的圆形、半径等信息的场景,本文就是详细的介绍如何根据三个点使用C#代码求出三点构成的圆的圆心坐标、圆半径、三点构成的圆弧的角度。 1、3点求圆分析 A、B、C三个点都是圆上的坐标点,过向量AB做中垂线,过向量AC做…

What is `@Scheduled` does?

Scheduled 是Spring框架中用于定时任务调度的注解&#xff0c;它允许我们在类的方法上声明一个方法作为定时任务&#xff0c;由Spring容器统一管理和执行。使用此注解后&#xff0c;Spring会根据注解中的属性配置&#xff0c;按照指定的时间规则自动调用该方法。 public class…

文心一言 VS ChatGPT :谁是更好的选择?

前言 目前各种大模型、人工智能相关内容覆盖了朋友圈已经各种媒体平台&#xff0c;对于Ai目前来看只能说各有千秋。GPT的算法迭代是最先进的&#xff0c;但是它毕竟属于国外产品&#xff0c;有着网络限制、注册限制、会员费高昂等弊端&#xff0c;难以让国内用户享受。文心一言…

2023年度AI盘点 AIGC|AGI|ChatGPT|人工智能大模型

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 2023年是人工智能大语言模型大爆发的一年&#xff0c;一些概念和英文缩写也在这一年里集中出现&#xff0c;很容易混淆&#xff0c;甚至把人搞懵。 文章目录 前言01 《ChatGPT 驱动软件开…

使用一个定时器(timer_fd)管理多个定时事件

使用一个定时器(timer_fd)管理多个定时事件 使用 timerfd_xxx 系列函数可以很方便的与 select、poll、epoll 等IO复用函数相结合&#xff0c;实现基于事件的定时器功能。大体上有两种实现思路&#xff1a; 为每个定时事件创建一个 timer_fd&#xff0c;绑定对应的定时回调函数…

QEMU源码全解析41 —— Machine(11)

接前一篇文章&#xff1a;QEMU源码全解析40 —— Machine&#xff08;10&#xff09; 本文内容参考&#xff1a; 《趣谈Linux操作系统》 —— 刘超&#xff0c;极客时间 《QEMU/KVM》源码解析与应用 —— 李强&#xff0c;机械工业出版社 特此致谢&#xff01; 时间过去了几…

go语言(二十一)---- channel的关闭

channel不像文件一样需要经常去关闭&#xff0c;只有当你确实没有任何发送数据了&#xff0c;或者你想显示的结束range循环之类的&#xff0c;才去关闭channel。关闭channel后&#xff0c;无法向channel再发送数据&#xff0c;&#xff08;引发pannic错误后&#xff0c;导致接收…