文章目录
- 1.概述
- 2. ByteBuf 分类
- 3. 代码实例
- 3.1 常用方法
- 3.1.1 创建ByteBuf
- 3.1.2 写入字节
- 3.1.3 扩容
- 3.1.2.1 扩容实例
- 3.1.2.2 扩容计算新容量代码
- 3.1.4 读取字节
- 3.1.5 标记回退
- 3.1.6 slice
- 3.1.7 duplicate
- 3.1.8 CompositeByteBuf
- 3.1.9 retain & release
- 3.1.9.1 retain & release
- 3.1.9.2 Netty TailContext release
- 3.2 完整实例
- 4. 参考文献
1.概述
ByteBuf 对字节进行操作
ByteBuf 四个基本属性:
- readerIndex: 读指针,字节数组,读到哪了
- writerIndex: 写指针,字节数组,写到哪了
- maxCapacity:最大容量,字节数组最大容量
- markedReaderIndex:标记读指针,
resetReaderIndex
方法可以把readerIndex
修改为markedReaderIndex
,回退重新读数据 - markedWriterIndex: 标记写指针,
resetReaderIndex
方法可以把writerIndex
修改为markedWriterIndex
,回退重新写数据
public abstract class AbstractByteBuf extends ByteBuf {int readerIndex;int writerIndex;private int markedReaderIndex;private int markedWriterIndex;private int maxCapacity;
}
2. ByteBuf 分类
ByteBuf 分为
- 直接内存或堆内存(Heap/Direct)
- 池化 和 非池化(Pooled/Unpooled)和 操作方式是否安全 (Unsafe/非 Unsafe)
ByteBuf 创建可以基于直接内存或堆内存
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
ByteBuf 池化 和 非池化
- 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
- 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出的可能
ByteBuf 操作方式是否安全 (Unsafe/非 Unsafe)
- Unsafe:表示每次调用 JDK 的 Unsafe 对象操作物理内存,依赖 offset + index 的方式操作数据
- 非 Unsafe:则不需要依赖 JDK 的 Unsafe 对象,直接通过数组下标的方式操作数据
3. 代码实例
3.1 常用方法
3.1.1 创建ByteBuf
创建ByteBuf , 默认都是池化的
// 堆内存的ByteBufByteBuf bufferHeap = ByteBufAllocator.DEFAULT.heapBuffer();// 直接内存的ByteBufByteBuf bufferDirect = ByteBufAllocator.DEFAULT.directBuffer();System.out.println(bufferHeap);System.out.println(bufferDirect);
3.1.2 写入字节
bufferHeap.writeBytes(new byte[]{1, 2, 3, 4});bufferDirect.writeBytes(new byte[]{1, 2, 3, 4});print("第一次写入", bufferHeap);print("第一次写入", bufferDirect);
3.1.3 扩容
3.1.2.1 扩容实例
- 默认 256
- 扩容加一倍
- 到了4194304,每次+4194304
for (int i = 0; i < 100; i++) {bufferHeap.writeBytes(new byte[]{1, 2, 3, 4});bufferDirect.writeBytes(new byte[]{1, 2, 3, 4});}print("批量写入&扩容", bufferHeap);print("批量写入&扩容", bufferDirect);
3.1.2.2 扩容计算新容量代码
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {ObjectUtil.checkPositiveOrZero(minNewCapacity, "minNewCapacity");if (minNewCapacity > maxCapacity) {throw new IllegalArgumentException(String.format("minNewCapacity: %d (expected: not greater than maxCapacity(%d)", minNewCapacity, maxCapacity));} else {int threshold = 4194304;if (minNewCapacity == 4194304) {return 4194304;} else {int newCapacity;if (minNewCapacity > 4194304) {newCapacity = minNewCapacity / 4194304 * 4194304;if (newCapacity > maxCapacity - 4194304) {newCapacity = maxCapacity;} else {newCapacity += 4194304;}return newCapacity;} else {for(newCapacity = 64; newCapacity < minNewCapacity; newCapacity <<= 1) {}return Math.min(newCapacity, maxCapacity);}}}
}
3.1.4 读取字节
readByte(bufferHeap);readByte(bufferDirect);print("读取一个字节", bufferHeap);print("读取一个字节", bufferDirect);
3.1.5 标记回退
bufferHeap.markReaderIndex();bufferDirect.markReaderIndex();readByte(bufferHeap);readByte(bufferDirect);print("读取一个字节", bufferHeap);print("读取一个字节", bufferDirect);System.out.println("回退");bufferHeap.resetReaderIndex();bufferDirect.resetReaderIndex();readByte(bufferHeap);readByte(bufferDirect);print("读取一个字节", bufferHeap);print("读取一个字节", bufferDirect);
3.1.6 slice
// 无参 slice 是从原始 ByteBuf 的 read index 到 write index 之间的内容进行切片// slice 和 bufferHeap 共享一块内存ByteBuf slice = bufferHeap.slice();slice.setByte(0, 9);print("slice", slice);readByte(bufferHeap);
3.1.7 duplicate
// 内存拷贝不共享内存ByteBuf duplicate = bufferHeap.duplicate();print("duplicate", duplicate);print("bufferHeap", bufferHeap);duplicate.writeBytes(new byte[]{5});print("duplicate", duplicate);print("bufferHeap", bufferHeap);
3.1.8 CompositeByteBuf
// CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,// 每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。// 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制// 缺点,复杂了很多,多次操作会带来性能的损耗ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();// true 表示增加新的 ByteBuf 自动递增 write index, 否则 write index 会始终为 0buf3.addComponents(true, buf1, buf2);print("buf3", buf3);
3.1.9 retain & release
3.1.9.1 retain & release
Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
- 每个 ByteBuf 对象的初始计数为 1
- 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
- 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
- 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
bufferHeap.retain();bufferDirect.retain();bufferHeap.release();bufferDirect.release();print("release bufferHeap", bufferHeap);print("release bufferDirect", bufferDirect);bufferHeap.release();bufferDirect.release();print("release bufferHeap", bufferHeap);print("release bufferDirect", bufferDirect);bufferHeap.release();bufferDirect.release();print("release bufferHeap", bufferHeap);print("release bufferDirect", bufferDirect);
3.1.9.2 Netty TailContext release
io.netty.channel.DefaultChannelPipeline.TailContext
io.netty.channel.DefaultChannelPipeline.TailContext#channelRead
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {onUnhandledInboundMessage(ctx, msg);}
io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(ChannelHandlerContext, Object)
protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {onUnhandledInboundMessage(msg);if (logger.isDebugEnabled()) {logger.debug("Discarded message pipeline : {}. Channel : {}.",ctx.pipeline().names(), ctx.channel());}}
3.2 完整实例
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;public class ByteBufStudy {public static void main(String[] args) {// 堆内存的ByteBufByteBuf bufferHeap = ByteBufAllocator.DEFAULT.heapBuffer();// 直接内存的ByteBufByteBuf bufferDirect = ByteBufAllocator.DEFAULT.directBuffer();System.out.println(bufferHeap);System.out.println(bufferDirect);bufferHeap.writeBytes(new byte[]{1, 2, 3, 4});bufferDirect.writeBytes(new byte[]{1, 2, 3, 4});print("第一次写入", bufferHeap);print("第一次写入", bufferDirect);for (int i = 0; i < 100; i++) {bufferHeap.writeBytes(new byte[]{1, 2, 3, 4});bufferDirect.writeBytes(new byte[]{1, 2, 3, 4});}print("批量写入&扩容", bufferHeap);print("批量写入&扩容", bufferDirect);readByte(bufferHeap);readByte(bufferDirect);print("读取一个字节", bufferHeap);print("读取一个字节", bufferDirect);bufferHeap.markReaderIndex();bufferDirect.markReaderIndex();readByte(bufferHeap);readByte(bufferDirect);print("读取一个字节", bufferHeap);print("读取一个字节", bufferDirect);System.out.println("回退");bufferHeap.resetReaderIndex();bufferDirect.resetReaderIndex();readByte(bufferHeap);readByte(bufferDirect);print("读取一个字节", bufferHeap);print("读取一个字节", bufferDirect);// 无参 slice 是从原始 ByteBuf 的 read index 到 write index 之间的内容进行切片// slice 和 bufferHeap 共享一块内存ByteBuf slice = bufferHeap.slice();slice.setByte(0, 9);print("slice", slice);readByte(bufferHeap);// 内存拷贝不共享内存ByteBuf duplicate = bufferHeap.duplicate();print("duplicate", duplicate);print("bufferHeap", bufferHeap);duplicate.writeBytes(new byte[]{5});print("duplicate", duplicate);print("bufferHeap", bufferHeap);// CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,// 每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。// 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制// 缺点,复杂了很多,多次操作会带来性能的损耗ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();// true 表示增加新的 ByteBuf 自动递增 write index, 否则 write index 会始终为 0buf3.addComponents(true, buf1, buf2);print("buf3", buf3);bufferHeap.retain();bufferDirect.retain();bufferHeap.release();bufferDirect.release();print("release bufferHeap", bufferHeap);print("release bufferDirect", bufferDirect);bufferHeap.release();bufferDirect.release();print("release bufferHeap", bufferHeap);print("release bufferDirect", bufferDirect);bufferHeap.release();bufferDirect.release();print("release bufferHeap", bufferHeap);print("release bufferDirect", bufferDirect);}public static void print(String prefix, ByteBuf buffer) {System.out.printf("%s readerIndex : %s writerIndex : %s maxCapacity : %s capacity : %s %n",prefix, buffer.readerIndex(), buffer.writerIndex(), buffer.maxCapacity(), buffer.capacity());}public static void readByte(ByteBuf buffer) {System.out.printf("读取一个字节: %s %n", buffer.readByte());}
}
4. 参考文献
- 黑马 Netty教程
- 拉钩教育 Netty课程 若地老师