1、Jdk自带ByteBuffer
1.1、ByteBuffer简介
事实上,jdk自1.4版本,就已经提供了nio的ByteBuffer,用于在Java程序中操作原始数据。ByteBuffer可以用来读取和写入二进制数据,例如网络传输的数据、文件的内容等。
ByterBuffer的部分源代码如下
public abstract class ByteBufferextends Bufferimplements Comparable<ByteBuffer>
{// These fields are declared here rather than in Heap-X-Buffer in order to// reduce the number of virtual method invocations needed to access these// values, which is especially costly when coding small buffers.//final byte[] hb; // Non-null only for heap buffersfinal int offset;boolean isReadOnly; // Valid only for heap buffers// Creates a new buffer with the given mark, position, limit, capacity,// backing array, and array offset//ByteBuffer(int mark, int pos, int lim, int cap, // package-privatebyte[] hb, int offset){super(mark, pos, lim, cap);this.hb = hb;this.offset = offset;}// Creates a new buffer with the given mark, position, limit, and capacity//ByteBuffer(int mark, int pos, int lim, int cap) { // package-privatethis(mark, pos, lim, cap, null, 0);}
}
从定义上看,它属于抽象类,不可直接实例化。它有两个非抽象子类,分别是:
- HeapByteBuffer:在JVM的堆内存中创建的ByteBuffer对象。特点是可以快速分配和释放内存,但是读写性能相对较低,因为需要进行数据的复制。
- DirectByteBuffer:操作系统的本地内存中创建的ByteBuffer对象。本地内存是直接在操作系统中分配的内存空间,对于大量的数据操作和网络传输等场景,DirectByteBuffer的读写性能相对较高。需要显式地调用System.gc()进行内存回收,否则可能会导致内存泄露。
1.2、ByteBuffer缺陷
ByteBuffer自身存在以下局限性:
1)长度一经初始化即固定,无法动态扩展。分配少了无法容纳大数据,分配多了浪费内存。
public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(10);buffer.putInt(1);buffer.putInt(2);buffer.putInt(3);System.out.println(buffer);//Exception in thread "main" java.nio.BufferOverflowException//at java.nio.Buffer.nextPutIndex(Buffer.java:532)//at java.nio.HeapByteBuffer.putInt(HeapByteBuffer.java:375)//at jforgame.demo.ByteBufferTest.main(ByteBufferTest.java:11)}
2)ByteBuffer只有一个标识位置的指针,读写不方便。
public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put("jforgame".getBytes());// 需要flip()下,把limit设为position,position设为0buffer.flip();byte[] data = new byte[buffer.remaining()];buffer.get(data);String decoded = new String(data);System.out.println(decoded);}
2、Netty的ByteBuf
2.1、ByteBuf简介
类似的,ByteBuf也是一个抽象类,无法直接实例化。
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>, ByteBufConvertible {public ByteBuf() {}public abstract int capacity();public abstract ByteBuf capacity(int var1);public abstract int maxCapacity();public abstract ByteBufAllocator alloc();// 篇幅原因,省略其他方法
}
ByteBuf的子类非常复杂,这里作一个简要版
ByteBuf
|
+-- AbstractByteBuf|+-- PooledByteBuf| || +-- PooledHeapByteBuf| +-- PooledDirectByteBuf+-- UnpooledHeapByteBuf+-- UnpooledDirectByteBuf
ByteBuf不同子类实现原理不太一样,这里介绍ByteBuf基本功能
-
内存管理:ByteBuf在内部维护了一个字节数组来存储数据,可以通过其他方式(如直接内存)创建ByteBuf来管理内存。
-
读写操作:可以使用ByteBuf的read和write方法来读取和写入字节数据。这些方法支持基本的数据类型,如int、long、float、double等,以及字节数组、字符串等。
-
索引操作:ByteBuf提供了readIndex和writeIndex两个指针,分别指示当前读取和写入的位置。可以使用相关方法来控制这些指针的位置,例如setIndex、readBytes、writeBytes等。
-
顺序访问:ByteBuf提供了一系列顺序访问的方法,例如get、set、readBytes、writeBytes等。这些方法可以按照指定的顺序读取或写入字节数据。
-
随机访问:ByteBuf还支持随机访问,可以通过set、get方法直接访问指定位置的字节数据。
-
引用计数:ByteBuf使用引用计数来管理内存的释放。可以使用retain和release方法来增加和减少引用计数,当引用计数为0时,ByteBuf将会释放内存。
-
缓冲区类型:ByteBuf有两种类型,HeapByteBuf和DirectByteBuf。HeapByteBuf使用JVM堆内存来存储数据,而DirectByteBuf使用直接内存存储数据。可以根据需求选择不同的类型。
2.2、ByteBuf自动扩容
ByteBuf的初始大小,作用类似与ArrayList的构造函数,只是为了减少扩容的次数。
public static void main(String[] args) {ByteBuf buffer = Unpooled.buffer(10);buffer.writeInt(1);buffer.writeInt(2);buffer.writeInt(3);System.out.println(buffer);}
每次写入的时候,会检测写入位置是否越界, 越界则扩展,扩展大小为下一个不小于最小容量的2的N次幂数值。
2.3、ByteBuf的读写位置分离
Netty提供了两个指针变量,其中,readerIndex用于标示读取索引,writerIndex用于标示写入索引。这两个指针将ByteBuf分为三个区域,如下:
discardableBytes | readableBytes | writableBytes |
0 readerIndex writerIndex capacity
当应用程序调用read操作时,从readerIndex开始读取。readerIndex到writerIndex之间的区域是可读区域,writerIndex到capacity的区域是可写入区域(动态扩展时会修改capacity)。0到readerIndex之间的区域是已经读取过的缓存区,可以调用disacardBytes操作来压缩空间,也可结合markPosition等API进行数据重读。
2.4、ByteBuf的Clear操作
Clear操作不清除缓存区数据,只是重置readerIndex和writerIndex的值。
public static void main(String[] args) {ByteBuf buffer = Unpooled.buffer(10);buffer.writeInt(1);buffer.writeInt(2);// 重置读写索引buffer.clear();buffer.writeInt(3);buffer.writeInt(4);System.out.println(buffer.readInt());System.out.println(buffer.readInt());// 打印 3 4}
2.5、ByteBuf的Mark和Reset操作
Mark操作用于备份数据,Reste操作用于回滚数据。由于ByteBuf有读写索引,相应的,Mark和Reset操作有4个方法,如下:
- markReaderIndex: ==> this.markedReaderIndex = this.readerIndex;
- resetReaderIndex: ==> this.readerIndex = this.markedReaderIndex;
- markWriterIndex: ==> this.markedWriterIndex = this.writerIndex;
- resetWriterIndex: ==> this.writerIndex = this.markedWriterIndex;
3、通信IO粘包拆包问题
Tcp通信面向的是字节流,如同水管里的水,没有边界。因此,传输过程中会出现粘包(多个包融合在一起),拆包(一个包分成多个小包)问题。
很难解决吗,其实非常简单,也就几行代码的事!!
3.1使用ByteBuf相关API解决问题
假设我们的私有协议设计如下:
// ----------------protocol pattern-------------------------// packetLength | cmd | body// int int byte[]
- 解码器先读4个字节的长度,表示消息的字节数长度;若不足4个字节,则等待下一次字节流
- 读取到数据长度之后,假设为100,代表后面的消息(cmd+body)为100字节数
- 假设readerIndex到writerIndex直接的可读缓存区大于等于100,则读取到一个完整的消息
- 否则,回滚readerIndex,等待下一波字节流的到来
@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() < 4) {return;}in.markReaderIndex();// ----------------protocol pattern-------------------------// packetLength | cmd | body// int int byte[]int length = in.readInt();if (length > maxProtocolBytes) {logger.error("message data frame [{}] too large, close session now", length);ctx.close();return;}if (in.readableBytes() < length) {in.resetReaderIndex();return;}int cmd = in.readInt();byte[] body = new byte[length - 4];in.readBytes(body);Class<?> msgClazz = messageFactory.getMessage(cmd);out.add(messageCodec.decode(msgClazz, body));}
总体来说,利用Netty的API,还是非常方便的处理半包读写问题。