【Netty】ByteToMessageDecoder源码解析

目录

1.协议说明

2.类的实现

3.Decoder工作流程

4.源码解析

4.1 ByteToMessageDecoder#channelRead

4.2 累加器Cumulator

 4.3 解码过程

4.4 Decoder实现举例

5. 如何开发自己的Decoder


1.协议说明

Netty框架是基于Java NIO框架,性能彪悍,支持的协议丰富,广受Java爱好者亲莱,支持如下协议

  • TCP/UDP:Netty提供了基于NIO的TCP和UDP编程框架,可以用来构建高性能、高可用性的网络应用。
  • HTTP/HTTPS:Netty提供了HTTP/HTTPS编程框架,可以用来开发Web服务器和客户端。
  • WebSocket:Netty提供了WebSocket编程框架,可以用来实现双向通信应用程序,如聊天室等。
  • SPDY/HTTP2:Netty提供了SPDY和HTTP2编程框架,可以用来实现高效的Web应用程序。
  • MQTT/CoAP:Netty提供了MQTT和CoAP编程框架,可以用来构建IoT应用程序。
     

我们在基于Netty框架开发过程中往往需要自定义私有协议,如端到端的通信协议,端到平台数据通信协议,我们需要根据业务的特点自定义数据报文格式,举例如下:

数据报文格式定义(TCP)
帧头版本命令标识符序列号设备编码帧长正文校验码
1byte1byte1byte2byte4byte       4byteN个byte2byte

假如我们定义了上述私有协议的TCP报文,通过netty框架发送和解析

发送端:某类通信设备(client)

接收端:Java应用服务(Server)

本节我主要分析一下server端解析报文的一个过程,client当然也很重要,尤其在建立TCP连接和关闭连接需要严格控制,否则服务端会发现大量的CLOSE_WAIT(被动关闭连接),甚至大量TIME_WAIT(主动关闭连接),关于这个处理之前的文章有讲解。

本节Server端是基于Netty版本:netty-all-4.1.30.Final

本节源码分析需求就是要解析一个自定义TCP协议的数据报文进行解码,关于编码解码熟悉网络编程的同学都明白,不清楚的可以稍微查阅一下资料有助于学习为什么要解码以及如何解码。本节不会对具体报文的解析做具体讲解,只对Netty提供的解码器基类ByteToMessageDecoder做一下源码分解,以及如何使用ByteToMessageDecoder开发属于自己的Decoder,接下来我们看看ByteToMessageDecoder的定义。

#继承ChannelInboundHandlerAdapter
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
}

2.类的实现

解码器的ByteToMessageDecoder ,该类继承了ChannelInboundHandlerAdapter ,ChannelInboundHandlerAdapter继承ChannelHandlerAdapter,

ChannelInboundHandlerAdapter实现ChannelInboundHandler接口,也就是说ChannelInboundHandler定义了解码器需要处理的工作(方法)
ChannelInboundHandlerAdapter是一个适配器模式,负责Decoder的扩展。它的实现有很多,简单列举一下:
  • HeartBeatHandler
  • MessageToMessageDecoder
  • SimpleChannelInboundHandler(抽象了方法channelRead0)
  •  ByteToMessageDecoder
  •  。。。。。。

以上都是比较常用的Decoder或Handler,基于这些基类还定义了很多handler,有兴趣的同学可以跟代码查阅。

3.Decoder工作流程

每当数据到达Server端时,SocketServer通过Reactor模型分配具体的worker线程进行处理数据,处理数据就需要我们的事先定义好的Decoder以及handler,假如我们定义了以下两个对象:

  • MyDecoder extends ByteToMessageDecoder{} 作为解码器
  • MyHandler extends SimpleChannelInboundHandler{} 作为解码后的业务处理器

worker线程——〉MyDecoder#channelRead实际就是调用ByteToMessageDecoder#channelRead——〉Cumulator累加器处理——〉

解码器decode处理(MyDecoder需要实现decode方法)——〉Myhandler#channelRead0处理具体的数据(msg)

4.源码解析

4.1 ByteToMessageDecoder#channelRead

    @Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//如果是设置在ServerBootstrap的childHandler那么msg的对象类型就是ByteBuf,否则就执行elseif (msg instanceof ByteBuf) {//CodecOutputList对象可以查阅文档https://www.freesion.com/article/4800509769///这个out对象随着callDecode方法进行传递,解码后的数据保存在out中CodecOutputList out = CodecOutputList.newInstance();try {ByteBuf data = (ByteBuf) msg;//1.cumulation是累加器,处理tcp半包与粘包问题first = cumulation == null;if (first) {//2.第一次收到数据累加器为nullcumulation = data;} else {//3.第二次收到数据累加器需要评估ByteBuf的capacity,够用则追加到cumulation,capacity不够则进行扩容cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);}//4.调用callDecode进行解码//5.CodecOutputList out对象保存解码后的数据,它的实现是基于AbstractList,//重新定义了add(),set(),remove()等方法,其中add()方法实现对Array数组中//进行insert,没有直接拷贝而是通过对象引用,将对象指向数据索引的index,是性能的一个提升。callDecode(ctx, cumulation, out);} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);} finally {//6.如果累加器cumulation中的数据被解码器读完了,则可以完全释放累加器cumulationif (cumulation != null && !cumulation.isReadable()) {numReads = 0;cumulation.release();cumulation = null;} else if (++ numReads >= discardAfterReads) {// We did enough reads already try to discard some bytes so we not risk to see a OOME.// See https://github.com/netty/netty/issues/4275//7.释放累加器cumulation里面的已读数据,防止cumulation无限制增长numReads = 0;discardSomeReadBytes();}int size = out.size();decodeWasNull = !out.insertSinceRecycled();//8.解码完成后需要触发事先定义好的handler的channelRead()方法处理解码后的out数据fireChannelRead(ctx, out, size);//9.最终需要回收out对象out.recycle();}} else {//10.非ByteBuf直接向后触发传递ctx.fireChannelRead(msg);}}

4.2 累加器Cumulator

累加器的作用是解决tcp数据包中出现半包和粘包问题。

半包:接收到的byte字节不足一个完整的数据包,

半包处理办法:不足一个完整的数据包先放入累加器不做解码,等待续传的数据包;

粘包:接收到的byte字节数据包中包括其他数据包的数据(靠数据包协议中定义的帧头帧尾标识来识别,多于1个以上的帧头或帧尾数据包为粘包数据),

粘包处理办法:按照数据包帧结构定义去解析,需要结合累加器,解析完一个数据包交给handler去处理,剩下的不足一个数据包长度的字节保存在累加器等待续传的数据包收到之后继续解码。

ByteToMessageDecoder内部定义了Cumulator接口

    /*** Cumulate {@link ByteBuf}s.*/public interface Cumulator {/*** Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.* The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so* call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.*/ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);}

其中在类最开始的时候构建了两个对象,分别是MERGE_CUMULATOR,COMPOSITE_CUMULATOR,代码如下

    /*** Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.*/public static final Cumulator MERGE_CUMULATOR = new Cumulator() {@Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {try {final ByteBuf buffer;//1.如果累加器ByteBuf 剩余可写的capacity不满足当前需要写入的ByteBuf(in)长度,则进行扩容累加器ByteBuf容量,执行expandCumulation方法if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {buffer = expandCumulation(alloc, cumulation, in.readableBytes());} else {buffer = cumulation;}//2.写入累加器并返回更新后的cumulationbuffer.writeBytes(in);return buffer;} finally {// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw// for whatever release (for example because of OutOfMemoryError)//3.由于是对in的拷贝,所以需要releasein.release();}}};//通过对CompositeByteBuf的累加器的实现,CompositeByteBuf内部使用ComponentList//实现对ByteBuf进行追加//ComponentList是ArrayList的实现,所以每次Add操作都是一次内存拷贝。public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {@Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {ByteBuf buffer;try {if (cumulation.refCnt() > 1) {buffer = expandCumulation(alloc, cumulation, in.readableBytes());buffer.writeBytes(in);} else {CompositeByteBuf composite;if (cumulation instanceof CompositeByteBuf) {composite = (CompositeByteBuf) cumulation;} else {composite = alloc.compositeBuffer(Integer.MAX_VALUE);composite.addComponent(true, cumulation);}composite.addComponent(true, in);in = null;buffer = composite;}return buffer;} finally {if (in != null) {//因为是对ByteBuf in的拷贝,所以需要释放in.release();}}}};

 4.3 解码过程

ByteToMessageDecoder#channelRead看到了将累加器交给callDecoder方法

//这里ByteBuf in 就是累加器对象cumulaction
// List<Object> out解码后的对象
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {try {//1. 循环读取累加器对象的bytewhile (in.isReadable()) {int outSize = out.size();//2.如果解码后out对象中产生数据则触发后边的handler(MyHandler)处理数据if (outSize > 0) {fireChannelRead(ctx, out, outSize);out.clear();if (ctx.isRemoved()) {break;}outSize = 0;}//3.继续解析累加器传递过来的byteint oldInputLength = in.readableBytes();//4.注意out对象是从channelRead()方法传递过来,继续传递下去decodeRemovalReentryProtection(ctx, in, out);if (ctx.isRemoved()) {break;}//4.如果这次解码没有获得任何消息if (outSize == out.size()) {//5.如果解码器decode没有消费累加器 in 任何字节,结束循环                    if (oldInputLength == in.readableBytes()) {break;//6.否则继续循环调用解码器decode} else {continue;}}//7.如果累加器ByteBuf in中可读字节数依然没有变化,说明实现的解码器decode()方法有问题,需要检查自身代码问题if (oldInputLength == in.readableBytes()) {throw new DecoderException(StringUtil.simpleClassName(getClass()) +".decode() did not read anything but decoded a message.");}//8.是否设定每次调用解码器一次,如果是,则结束本次解码if (isSingleDecode()) {break;}                }} catch (DecoderException e) {} catch (Exception cause) {}}

继续查看ByteToMessageDecoder#decodeRemovalReentryProtection方法

    //1.此方法不允许重写final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)throws Exception {decodeState = STATE_CALLING_CHILD_DECODE;try {//2.核心方法decode,这是一个抽象方法,没有实现,需要在自定义的Decoder(Mydecoder)进行实现//3.自定义Decoder需要将解码后的数据放入到out对象中decode(ctx, in, out);} finally {boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;decodeState = STATE_INIT;if (removePending) {handlerRemoved(ctx);}}}//解码decode方法需要子类(自定义的实现类)去实现该方法,最终将解码后的数据放入List<Object> outprotected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

4.4 Decoder实现举例

基于ByteToMessageDecoder的实现很多,简单列举一下

  • JsonObjectDecoder
  • RedisDecoder
  • XmlDecoder
  • MqttDecoder
  • ReplayingDecoder
  • SslDecoder
  • DelimiterBasedFrameDecoder
  • FixedLengthFrameDecoder
  • LengthFieldBasedFrameDecoder
  • ....

我们拿JsonObjectDecoder举例如下:

    @Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {// 省略代码。。。。。。int idx = this.idx;int wrtIdx = in.writerIndex();//省略代码。。。。。。。for (/* use current idx */; idx < wrtIdx; idx++) {byte c = in.getByte(idx);if (state == ST_DECODING_NORMAL) {decodeByte(c, in, idx);if (openBraces == 0) {ByteBuf json = extractObject(ctx, in, in.readerIndex(), idx + 1 - in.readerIndex());//1.解析后的对象加入out中if (json != null) {out.add(json);}in.readerIndex(idx + 1);reset();}} else if (state == ST_DECODING_ARRAY_STREAM) {//2.自身实现解析json格式的方法decodeByte(c, in, idx);if (!insideString && (openBraces == 1 && c == ',' || openBraces == 0 && c == ']')) {for (int i = in.readerIndex(); Character.isWhitespace(in.getByte(i)); i++) {in.skipBytes(1);}// skip trailing spaces.int idxNoSpaces = idx - 1;while (idxNoSpaces >= in.readerIndex() && Character.isWhitespace(in.getByte(idxNoSpaces))) {idxNoSpaces--;}ByteBuf json = extractObject(ctx, in, in.readerIndex(), idxNoSpaces + 1 - in.readerIndex());//3.解析后的对象加入out中if (json != null) {out.add(json);}in.readerIndex(idx + 1);if (c == ']') {reset();}}} //省略代码。。。。。。}if (in.readableBytes() == 0) {this.idx = 0;} else {this.idx = idx;}this.lastReaderIndex = in.readerIndex();}

5. 如何开发自己的Decoder

读了ByteToMessageDecoder的部分源码,以及它的实现JsonObjectDecoder,那么如果我们自己实现一个Decoder该如何实现,这里提供三个思路给大家,有时间再补充代码。

  • 基于ByteToMessageDecoder实现,MyDecoder extends ByteToMessageDecoder{实现decode()方法},可参考RedisDecoder、XmlDecoder等实现。
  • 基于ChannelInboundHandlerAdapter实现,这个时候需要自己负责解决TCP报文半包和粘包问题,重写其中的channelRead()方法。
  • 直接使用已经实现ByteToMessageDecoder的解码器,如FixedLengthFrameDecoder、DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder。

注意事项:

 * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
 * annotated with {@link @Sharable}.

ByteToMessageDecoder的子类不能使用@Sharable注解修饰,因为解码器只能单独为一个Channel进行解码,也就是说每个worker线程需要独立的Decoder。


 * <p>
 * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
 * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
 * to avoid leaking memory.

如果基于ChannelInboundHandlerAdapter自己实现Decoder#channelRead()方法时注意内存泄露问题,ByteBuf#readBytes(int)方法会产生一个新的ByteBuf,需要手动释放。

或者

基于ByteToMessageDecoder实现decode()方法时将解析后的对象放入out对象中(上面源码分析中有提示)

或者

使用派生的ByteBuf,如调用ByteBuf#readSlice(int)方法,返回的ByteBuf与原有ByteBuf共享内存,不会产生新的Reference count,可以避免内存泄露。

Netty Project官网也有说明:

Reference counted objects
ByteBuf.duplicate(), ByteBuf.slice() and ByteBuf.order(ByteOrder) create a derived buffer which shares the memory region of the parent buffer. A derived buffer does not have its own reference count, but shares the reference count of the parent buffer.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/91029.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL系统与内建函数

在游戏开发、特别是像《三国志》这样的大型策略游戏中,数据分析是不可或缺的。从玩家行为到游戏内的战役结果,都需要通过高效的数据分析来优化游戏体验。MySQL的系统和内建函数为这样的分析提供了强大的工具。 本文将详细介绍MySQL中常用的系统与内建函数,并通过《三国志》…

【数据结构】归并排序、基数排序算法的学习知识点总结

目录 1、归并排序 1.1 算法思想 1.2 代码实现 1.3 例题分析 2、基数排序 2.1 算法思想 2.2 代码实现 2.3 例题分析 1、归并排序 1.1 算法思想 归并排序是一种采用分治思想的经典排序算法&#xff0c;通过将待排序数组分成若干个子序列&#xff0c;将每个子序列排序&#xff…

C++中实现一些特殊的类|设计模式

1.设计一个类 不能被拷贝 拷贝只会发生在两个场景中&#xff1a;拷贝构造以及赋值运算符重载。想要让一个类禁止拷贝&#xff0c;只需要该类不能调用拷贝构造和赋值运算符重载 c98中 将拷贝构造与赋值运算符重载只声明不定义&#xff0c;不定义是因为该函数根本不会调用&#x…

【Java 进阶篇】MySQL多表查询之子查询详解

在数据库查询中&#xff0c;多表查询是一项非常常见且重要的任务。它允许我们从多个相关联的表中检索和组合数据&#xff0c;以满足各种复杂的查询需求。在多表查询中&#xff0c;子查询是一种强大的工具&#xff0c;用于在查询中嵌套另一个查询。本文将深入探讨MySQL中的子查询…

什么是好的UI设计?优漫动游

UI&#xff08;UserInterface&#xff09;&#xff0c;即界面设计&#xff0c;它是网站、App给用户在感觉&#xff08;视觉、触觉、听觉等&#xff09;和情感上带来的第一体验&#xff0c;包括人机交互、界面逻辑、界面美观设计三个方面。简单来讲&#xff0c;UI不仅是一种表现…

【Java】建筑工地智慧管理系统源码

智慧工地系统运用物联网信息技术&#xff0c;致力于推动建筑工程行业的建设发展&#xff0c;做到全自动、信息化&#xff0c;智能化的全方位智慧工地&#xff0c;实现工程施工可视化智能管理以提高工程管理信息化水平。 智慧工地平台拥有一整套完善的智慧工地解决方案&#xff…

linkedlist和arraylist的区别

LinkedList和ArrayList都是常见的数据结构&#xff0c;用于存储和操作集合元素&#xff0c;如果需要频繁进行插入和删除操作&#xff0c;LinkedList可能更适合。如果需要快速随机访问和较小的内存占用&#xff0c;ArrayList可能更合适。 以下是它们之间存在一些关键的区别&…

源码编译安装zstd

目录 1 下载源码https://github.com/facebook/zstd 2 解压 3 在解压后的目录里输入make 4 sudo make install 安装完毕 5 输入whereis zstd 检查安装结果 1 下载源码https://github.com/facebook/zstd 2 解压 3 在解压后的目录里输入make 4 sudo make install 安装完毕…

图扑软件受邀亮相 IOTE 2023 国际物联网展

IOTE 2023 国际物联网展&#xff0c;作为全球物联网领域的盛会&#xff0c;于 9 月 20 日 - 22 日在中国深圳拉开帷幕。本届展会以“IoT构建数字经济底座”为主题&#xff0c;由深圳市物联网产业协会主办&#xff0c;打造当前物联网最新科技大秀。促进物联网与各行业深度融合&a…

安卓玩机-----给app加注册码 app加弹窗 云注入弹窗

在对接很多工作室业务中有些客户需要在他们自带的有些app中加注册码或者验证码的需求。其实操作起来也很简单。很多反编译软件有自带的注入功能。例如注入弹窗。这个是需要对应的注册码来启动应用。而且是随机id。重新安装app后需要重新注册才可以继续使用&#xff0c;原则上可…

mysql面试题5:索引、主键、唯一索引、联合索引的区别?什么情况下设置了索引但无法使用?并且举例说明

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:说一说索引、主键、唯一索引、联合索引的区别? 索引、主键、唯一索引和联合索引是数据库中常用的索引类型,它们有以下区别: 索引:索引是一种数…

ShowDoc部署与应用:文档管理的最佳实践

在项目开发和协作中&#xff0c;文档管理扮演着至关重要的角色。ShowDoc作为一款卓越的开源文档管理工具&#xff0c;不仅提供强大的文档管理功能&#xff0c;还具备简单易用的协作和部署特性。我们的项目团队最初选择了ShowDoc作为文档管理工具&#xff0c;用以促进前后端协作…

UE4/5数字人MetaHuman通过已有动画进行修改

目录 通过已有动画修改动画 开始制作 创建一个关卡序列 将动画序列烘焙到控制绑定 打开我们自己创建的动画序列 之后便是烘焙出来 通过已有动画修改动画 首先架设我们已经有相关的MetaHuman的动画&#xff0c;但是这个动画因为是外部导入进来的&#xff0c;所以可能会出…

MySQL报错:this is incompatible with sql_mode=only_full_group_by 解决方法

文章目录 项目场景&#xff1a;原因分析及解决方案&#xff1a;总结&#xff1a; 项目场景&#xff1a; 提示&#xff1a;这里简述项目相关背景&#xff1a; which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_modeonly_f…

Vue中自定义实现类似el-table的表格效果实现行颜色根据数据去变化展示

主要使用div布局实现表格效果&#xff0c;并使用渐变实现行背景渐变的效果 页面布局 <div class"table-wrap"><div class"table-title"><divv-for"(item, index) in tableColumn":key"index":prop"item.prop&qu…

怎样选择第三方检测机构获取功能测试报告?

数字化时代&#xff0c;软件给人们的生活带来了越来越多的便利&#xff0c;产品功能测试也成为了软件开发方在研发时的重要环节&#xff0c;这关系到用户使用产品的体验感。所以做好软件功能测试对把控产品质量有着很大影响&#xff0c;通过有效的功能测试能够发现系统潜在的问…

【Linux学习】05-1Linux上安装部署各类软件

Linux&#xff08;B站黑马&#xff09;学习笔记 01Linux初识与安装 02Linux基础命令 03Linux用户和权限 04Linux实用操作 05-1Linux上安装部署各类软件 文章目录 Linux&#xff08;B站黑马&#xff09;学习笔记前言05-1Linux上安装部署各类软件JDK安装部署Tomcat安装部署maven…

Springcloud实战之自研分布式id生成器

一&#xff0c;背景 日常开发中&#xff0c;我们需要对系统中的各种数据使用 ID 唯一表示&#xff0c;比如用户 ID 对应且仅对应一个人&#xff0c;商品 ID 对应且仅对应一件商品&#xff0c;订单 ID 对应且仅对应 一个订单。我们现实生活中也有各种 ID &#xff0c;比如身…

Unity中的两种ScriptingBackend

一&#xff1a;前言 二&#xff1a;两种模式的介绍 ios&#xff1a;unity只有il2cpp模式的编译才支持64位系统&#xff0c;mono是不支持的&#xff0c;在快速开发阶段仍然支持Mono&#xff0c;但是不能再向Apple提交Mono(32位)的应用 苹果在2016年1月就要求所有新上架游戏必须支…

【独家工具】JMeterPerfReporter3.0正式版本,让你的JMeter更好用

Lemon-JMeterPerfReporter工具&#xff0c;是我们性能测试课程教研组根据JMeter性能测试报告的不足&#xff0c;定制开发的一个性能报告生成工具。有需要的同学&#xff0c;可以通过小编官方gitee账户下载&#xff0c;或咨询我免费获取哦&#xff01; 做过性能测试的人员都知道…