Recycler 回收器接口设计
本节接着 ObjectPool 的设计脉络,具体看看其具体实现 RecyclerObjectPool 中引用的 Recycler 究竟是怎么实现的
这一张图基本已经说明白了,我再做个总结,对细节感兴趣的可以看看我下面带源码的注释。
对于 Recycler 回收器:它实现了很多方法和类,但实际上除了持有一些静态参数配置外,它仅持有了一个 FastThreadLocal<LocalPool<T>>
那实际上就是每个线程有一个自己的 LocalPool ,真正分配也是通过调用 LocalPool 的方法。
那从设计的角度来说,在思考要设计一个对象池的时候就要考虑到多线程分配的问题,使用线程本地变量可能就是要优先考虑的问题。
然后实际上我们的对象池就是 LocalPool 对象中实例化的一个队列 Queue,这个 Queue 也比较有意思后续章节会详细讨论一下,这里只需要记住它是一个可以支持多线程入队,单线程消费的队列。
这个队列存储了我们分配出去然后空闲释放的句柄 handle,在之后的分配中会首先尝试在对象池中获取可以分配的句柄,如果没有则尝试创建池化句柄,如果没有达到可创建的间隔会直接创建非池化对象
根据解析篇 1 ObjectPool 和 本篇 Recycler ,已经大致对池的设计有了一个大概的了解
package io.netty.util;import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.ObjectPool;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;import static java.lang.Math.max;
import static java.lang.Math.min;/*** Light-weight object pool based on a thread-local stack.* 这里源码注释说这就是一个 轻量级的对象池,基于本地线程栈实现。以下代码我省去了其中废弃的方法* @param <T> the type of the pooled object*/
public abstract class Recycler<T> {// 这是一个日志工具private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);/*** 定义一个空的 HANDLE 实现,实现 Handle 接口,实现方法不做任何事*/private static final Handle<?> NOOP_HANDLE = new Handle<Object>() {@Overridepublic void recycle(Object object) {// NOOP}@Overridepublic String toString() {return "NOOP_HANDLE";}};// 这里定义了每个线程的初始 最大 容量是 4kprivate static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.// 默认每个线程的最大容量 默认等于上面这个东西 ↑ 4kprivate static final int DEFAULT_MAX_CAPACITY_PER_THREAD;// 一个比率private static final int RATIO;// 默认的每个线程的队列块的大小 默认 32private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;static {// In the future, we might have different maxCapacity for different object types.// e.g. io.netty.recycler.maxCapacity.writeTask// io.netty.recycler.maxCapacity.outboundBuffer// 在未来的版本,我们可能会为不同的对象设置不同的 maxCapacity// 这里先取 maxCapacity 再尝试取 maxCapacityPerThread 优先读 maxCapacityPerThread 的设置int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));// 如果你的配置小于 0 ,会使用默认的 4kif (maxCapacityPerThread < 0) {maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;}// 这里初始化了 默认的每个线程额最大容量,如果不配置的话就是 4k 大小DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;// 获取每个线程的线程块大小,默认 32DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD = SystemPropertyUtil.getInt("io.netty.recycler.chunkSize", 32);// By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.// This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation// bursts.// 这里写的很抽象,我看了半天没搞懂,大概的感觉是 RATIO 是默认为 8 ,也就是说每 8 次的分配中有 1 次是尝试重用的,// 剩余 7 次都是进行扩容,也就是说,在一定时间的预热后这个池化资源池可能会逼近最大分配值。// 这种缓慢的增长有助于在热点场景不会那么猛烈的申请空间 (大概……缓慢吧)// 仅字面理解,后续代码核实后会订正 (问了 GPT3.5 和 通义千问)// ok 我回来了,这里应该是说这个方法,就是指每 8 次申请才生成一个新的// DefaultHandle<T> newHandle() {// // 看这里创建的逻辑就明白了,这里是每 ratioInterval 次才实际执行一次新建,其他的时候都直接返回 null// // 这里默认每 8 次才初始化一个新的句柄,因为 ratioCounter 初始化为 ratioInterval ,所以第一次会执行新建// if (++ratioCounter >= ratioInterval) {// ratioCounter = 0;// return new DefaultHandle<T>(this);// }// return null;// }RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));// 如果可以打 debug 日志,就打一下配置日志if (logger.isDebugEnabled()) {if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");logger.debug("-Dio.netty.recycler.ratio: disabled");logger.debug("-Dio.netty.recycler.chunkSize: disabled");} else {logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);}}}// 每个线程最大的容量private final int maxCapacityPerThread;// 间隔private final int interval;// 块大小private final int chunkSize;// 这里声明初始化了一个 Netty 自己实现的 FastThreadLocal 是为了在特定情况下提供更高性能的线程局部变量实现。// 它通过使用数组索引而非哈希表来加速变量的访问。但要注意,要充分发挥其优势,需要确保线程是 FastThreadLocalThread 或其子类。// 这里声明了一个 泛型是 LoaclPool 的 FastThreadLoal,保证每个线程都只有一个自己的 LoaclPoolprivate final FastThreadLocal<LocalPool<T>> threadLocal = new FastThreadLocal<LocalPool<T>>() {// 每个线程在首次访问 threadLocal 的时候会调用此方法进行初始化@Overrideprotected LocalPool<T> initialValue() {return new LocalPool<T>(maxCapacityPerThread, interval, chunkSize);}// 当线程结束或者 FastThreadLocal 被移除时会调用此方法@Overrideprotected void onRemoval(LocalPool<T> value) throws Exception {super.onRemoval(value);value.pooledHandles.clear();}};// 无参构造函数,是一个构造函数的重载protected Recycler() {// 4kthis(DEFAULT_MAX_CAPACITY_PER_THREAD);}// 有参构造,传入 maxCapacityPerThreadprotected Recycler(int maxCapacityPerThread) {// 默认的 4k 8 32this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);}// 实际实现 3 参数,这里初始化了上述几个变量protected Recycler(int maxCapacityPerThread, int ratio, int chunkSize) {// 间隔初始化为 ratio 默认 8interval = max(0, ratio);if (maxCapacityPerThread <= 0) {this.maxCapacityPerThread = 0;this.chunkSize = 0;} else {// 默认 4kthis.maxCapacityPerThread = max(4, maxCapacityPerThread);// 默认 32this.chunkSize = max(2, min(chunkSize, this.maxCapacityPerThread >> 1));}}/*** 这个方法就是真正的分配方法了**/@SuppressWarnings("unchecked")public final T get() {// 如果你设置的每个线程的最大容量是 0 ,则每次都会新创建对象,并且不需要 HANDLE 回收处理if (maxCapacityPerThread == 0) {// 所以这里大概猜想,为了保持代码的一致性,就声明了一个 NOOP_HANDLE 来做此类不需要入池回收的对象创建return newObject((Handle<T>) NOOP_HANDLE);}// 本地池,从 threadLocal 中获取当前线程的 LocalPoolLocalPool<T> localPool = threadLocal.get();// 从本地池获取一个 handle 这个方法有大逻辑,先仅表面瞅一眼DefaultHandle<T> handle = localPool.claim();T obj;if (handle == null) {// 如果 handle 是空则会通过 localPool 新建一个 handle, handle 会持有此 localPool 的引用handle = localPool.newHandle();if (handle != null) {// 这里如果新建 handle 成功则通过 newObject 方法传入新建的 handle 新建池化对象,// 也就是说这个新建的对象是可以通过 handle 回收池化利用的obj = newObject(handle);handle.set(obj);} else {// 新建不了一点的情况,跟没容量一样,直接新建一个不需要池化的对象obj = newObject((Handle<T>) NOOP_HANDLE);}} else {// 如果直接从池子里拿到可重用的 handle 对象了,则直接 get 获取,// 这里 get 就直接返回了 handle 持有的 value 也就是之前新建分配出去过的对象obj = handle.get();}return obj;}// 获取 threadLocal 的大小final int threadLocalSize() {// 这里很有意思,获取的但是 LocalPool 的 pooledHandles 的大小// 池化 handles 从这里的调用看也可以了解到,LocalPool 是持有一个 handle 池的// 那他实际上是个队列 Queue<DefaultHandle<T>>return threadLocal.get().pooledHandles.size();}// 抽象方法 newObject ,这里联系一下 ObjectPool ,实际的实现是通过调用 ObjectCreator 的 newObject 方法,// 所以这里看起来做了一个抽象的方法 newObject 做各类实现,// 但实际上只用这个做区别化实现就需要有很多类继承 Recycler , Recycler 又是一个逻辑比较多的类,而且有时候继承不是很方便的使用方式// 而组合更加灵活,所以他搞了一个 ObjectCreator 的接口抽象,在实例化 RecyclerObjectPool 的时候传入 ObjectCreator 来实现 newObject// 这里实例化 ObjectPool 的时候隐式实现一下这个接口就可以,就会比较方便 (个人理解)protected abstract T newObject(Handle<T> handle);// 这里又定义了一个 Handle 继承自 ObjectPool 的 Handle ,还有个注释 不能改变这个定义 。 估计是有啥历史原因,先不妄加揣测@SuppressWarnings("ClassNameSameAsAncestorName") // Can't change this due to compatibility.public interface Handle<T> extends ObjectPool.Handle<T> { }// 默认的 Handle 实现,基本上他池子里实例化的都是这个private static final class DefaultHandle<T> implements Handle<T> {// 状态常量,已被分配的private static final int STATE_CLAIMED = 0;// 状态常量,可用的private static final int STATE_AVAILABLE = 1;// 原子 Integer 字段更新器,泛型给到了 DefaultHandle 用于状态更新 CAS 保证多线程环境下的原子更新private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> STATE_UPDATER;static {// 静态代码块初始化了 STATE_UPDATER,指定对 DefaultHandle 类的 state 字段可进行原子更新AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(DefaultHandle.class, "state");//noinspection uncheckedSTATE_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;}// Handle 的状态字段,是通过 STATE_UPDATER 来进行更新的,用于判断是否可以被分配@SuppressWarnings({"FieldMayBeFinal", "unused"}) // Updated by STATE_UPDATER.private volatile int state; // State is initialised to STATE_CLAIMED (aka. 0) so they can be released.// 这里定义了一个本地线程池,从之前的逻辑看,在一个 Handle 实例创建的时候会引用当前线程的 LocalPool,这里应该是方便后续释放private final LocalPool<T> localPool;// value 实际被 new 出来的池化对象,也就是我们池化复用的实际对象,我们所聊的就是 ByteBuf 的实例化对象引用private T value;// 构造方法DefaultHandle(LocalPool<T> localPool) {this.localPool = localPool;}// 回收方法@Overridepublic void recycle(Object object) {// 校验你要回收的对象是不是你持有的if (object != value) {throw new IllegalArgumentException("object does not belong to handle");}// 直接调用了 LocalPool 的 release 方法,并把当前的 Handle 对象传了进去localPool.release(this);}// get 方法返回实例化对象T get() {return value;}void set(T value) {this.value = value;}// 判断是否可以分配,如果可以分配会更新状态 为已分配boolean availableToClaim() {if (state != STATE_AVAILABLE) {return false;}return STATE_UPDATER.compareAndSet(this, STATE_AVAILABLE, STATE_CLAIMED);}// 更新状态为可用,如果本身已经可用会抛出异常void toAvailable() {int prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);if (prev == STATE_AVAILABLE) {throw new IllegalStateException("Object has been recycled already.");}}}// 关键的 本地的池子 LocalPool ,每个线程持有一个,再次提醒 T 泛型是你池化资源的类型,如我们这里就是 ByteBufprivate static final class LocalPool<T> {// 比率间隔,比率间隔?? 又是啥东西,奇怪的命名 同 ratio 和 intervalprivate final int ratioInterval;// Handles 队列池,从这个接口的设计上看,这个 Queue 应该就是池化的本质了,就是池本池,// 所以你如果问我池化实现的本质是什么,我应该会回答,本质就是一个 Handle 队列private final Queue<DefaultHandle<T>> pooledHandles;// 比率计数器?? 先打个问号,后面懂了来补充 应该是控制生产速率的private int ratioCounter;// 构造器,向调用方追溯在 FastThreadLocal 的初始化方法中是这样传入的 maxCapacityPerThread 4k, interval 8, chunkSize 32LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {this.ratioInterval = ratioInterval;// 你看这里,队列是通过 PlatformDependent.newMpscQueue ,这个东西也没见过,有意思了// 我查了查,这个东西叫 MPSC 队列,是一个多生产者单消费者队列的一种实现(这看起来又是一种设计思想),// 那在这种场景很明显,Netty 的意图是想构建一个适用于一个或多个线程生产数据,而只有一个线程消费数据的情况。// 在对象池的场景中,生产者线程负责将句柄放入队列,而消费者线程负责从队列中取出句柄进行对象的分配和回收。// 这样的设计能够有效地在多线程环境中管理对象的生命周期。pooledHandles = PlatformDependent.newMpscQueue(chunkSize, maxCapacity);ratioCounter = ratioInterval; // Start at interval so the first one will be recycled.}// claim 这个词这里应该是用作索取的意思,索取一个 DefaultHandleDefaultHandle<T> claim() {// 这里给临时变量赋值为 pooledHandlesQueue<DefaultHandle<T>> pooledHandles = this.pooledHandles;DefaultHandle<T> handle;do {handle = pooledHandles.poll();// 这里会先分配,如果为 null 则直接跳出循环返回 null// 如果不为空,且当前的 handle 不是可分配的状态则进行循环继续取下一个} while (handle != null && !handle.availableToClaim());return handle;}// 释放方法,这里实际是对 handle 进行操作void release(DefaultHandle<T> handle) {// 首先修改传入的 handle 为可用状态handle.toAvailable();// 将 handle 入队pooledHandles.offer(handle);}// 新建一个 handleDefaultHandle<T> newHandle() {// 看这里创建的逻辑就明白了,这里是每 ratioInterval 次才实际执行一次新建,其他的时候都直接返回 null// 这里默认每 8 次才初始化一个新的句柄,因为 ratioCounter 初始化为 ratioInterval ,所以第一次会执行新建if (++ratioCounter >= ratioInterval) {ratioCounter = 0;return new DefaultHandle<T>(this);}return null;}}
}
到了 Recycler 的设计上看起来就没有那么抽象了,主要的就是一个线程本地变量的控制,以及 LocalPool 对象的池化资源管理。
其实到此为止,并没有实现任何的创建对象的逻辑,所有创建对象逻辑均由 ObjectCreator 接口的实现来控制,这个是灵活控制的,它的体现会在梳理 PooledByteBufAllocator 的整体脉络时得到体现。