文章目录
- 1. 概要
- 2. 参数
- 3. 构造器
- 4. 回收
- 5. 启动时间轮 - start
- 6. 停止时间轮 - stop
- 7. 添加任务
- 8. 工作线程 - Worker
- 8.1 线程参数
- 8.2 核心逻辑-run
- 8.3 指针跳动到下一个tick
- 8.4 处理要取消的任务
- 8.5 把新增的任务加入时间轮
- 8.6 执行过期任务
- 9. HashedWheelTimeout
- 9.1 属性
- 9.2 任务取消
- 9.3 任务移除
- 9.4 执行过期任务
- 9.5 TimerTask
- 10. HashedWheelBucket
- 10.1 添加任务节点
- 10.2 删除任务节点
- 10.3 清空整个链表
- 11. 小结
1. 概要
时间轮的文章:
- 定时/延时任务-Netty时间轮的使用
- 定时/延时任务-时间轮
- 定时/延时任务-实现一个简单时间轮
- 定时/延时任务-实现一个分层时间轮
上一篇文章中介绍了分层时间轮的实现,到此手写时间轮的文章就写完了,下面就要开始分析框架里面的时间轮源码了,这篇文章就从 Netty 时间轮开始进行分析
2. 参数
首先还是先看下参数,由于 Netty 实现的是简单时间轮,所以说参数也是简单时间轮相关的,可以在上面的文章中去看时间轮的例子介绍。
/*** 时间轮实例个数*/
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();/*** 在服务过程中,时间轮实例个数不能超过64个*/
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
private static final int INSTANCE_COUNT_LIMIT = 64;
时间轮个数,这是一个全局的静态参数,意思就是如果 Netty 的时间轮创建的个数到了 64 个,那么就会输出错误日志,其实也是为了提高效率,时间轮个数肯定不能太多的,否则每一个时间轮都有一个任务线程去执行的话,执行时间轮的线程数就太多了,资源会倾斜到时间轮这边。要知道 Netty 的核心还是接收请求处理请求那块。
/*** 刻度持续时最小值,不能小于这个最小值*/
private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
下一个是刻度最小值,时间轮的时间间隔不能小于 1ms
/*** 内存泄漏检测*/
private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance().newResourceLeakDetector(HashedWheelTimer.class, 1);
/*** 内存泄漏检测虚引用*/
private final ResourceLeakTracker<HashedWheelTimer> leak;
内存泄露检测,Netty 中会使用自定义的内存泄露检测逻辑去检测对象有没有被回收,如果时间轮对象没用被使用者回收干净,经过 Netty 的内存泄露检测,就会输出内存泄露的日志,Netty 的内存泄露的知识点还是挺多的。
/*** 原子性更新时间轮工作状态,防止多线程重复操作,时间轮当前所处状态,可选值有 init、started、shutdown*/private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
下面是一个原子更新类,去更新时间轮的状态的,其实就是这个类里面的 workerState
属性
/*** 工作线程包装*/private final Worker worker = new Worker();
/*** 时间轮工作线程*/private final Thread workerThread;
工作线程包装,专门去轮询时间轮来执行过期任务的,下面是里面的工作线程
/*** 时间轮的3种工作状态分别为初始化,已经启动正在运行,停止*/
public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;private volatile int workerState; // 0 - init, 1 - started, 2 - shut down
然后就是时间轮的三种状态,初始化,启动和停止
/*** 每刻度的持续时间*/
private final long tickDuration;
每刻度的持续时间,也就是指针多长时间跳动一次
/*** 时间轮格子数组*/
private final HashedWheelBucket[] wheel;/*** 时间轮总格子数 -1*/
private final int mask;
上面是时间轮格子和时间轮格子总数 - 1,为什么要 - 1 呢?因为对于 Netty 这种把性能最求到极致的框架,&
运算一定是要比 %
要快,所以 wheel
数组的长度一定是 2^n
次方,mask = 2^n - 1,就意味着 mask 的二进制是 00...00 111...111
,所以这时候可以直接用 &
求出下标,性能更高。
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
startTimeInitialized 添加任务的时候用来阻塞当前线程,当 startTime 设置之后才可以继续添加任务。
/*** task任务队列,放任务的时候先将任务放入到这个队列中,再由Worker线程从队列中取出并放入wheel[]时间轮的链表中*/
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
timeouts 任务队列,添加任务的时候会先添加到任务队列中,然后再由工作线程从任务队列中取出任务加入时间轮。
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
当任务被取消的时候会加入这个已取消队列,工作线程在遍历的时候就会去处理这个队列,把这里面已取消的任务移除掉。
/*** 需要等待执行的任务数*/
private final AtomicLong pendingTimeouts = new AtomicLong(0);/*** 时间轮最多容纳多少定时检测任务,默认为-1,无限制*/
private final long maxPendingTimeouts;
private final Executor taskExecutor;
/*** 时间轮启动时间*/
private volatile long startTime;
等待执行的任务数,添加任务的时候数量 + 1,maxPendingTimeouts 表示时间轮最多能容纳多少任务,taskExecutor 是执行任务的线程池,不可能用工作线程来执行任务的,否则任务执行时间长一点就会导致工作线程阻塞。
3. 构造器
时间轮中提供了多个构造器,不过这里我们只看最后一个构造器,也是最终的逻辑。
/**
* 创建一个时间轮* @param threadFactory 用来创建worker线程* @param tickDuration tick的时长,也就是指针多久转一格* @param unit tickDuration的时间单位* @param ticksPerWheel 一圈有几格* @param leakDetection 是否开启内存泄露检测* @param maxPendingTimeouts 时间轮可接受最大定时检测任务数* @param taskExecutor Executor*/
public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,long maxPendingTimeouts, Executor taskExecutor) {// 检测不为空checkNotNull(threadFactory, "threadFactory");checkNotNull(unit, "unit");checkPositive(tickDuration, "tickDuration");checkPositive(ticksPerWheel, "ticksPerWheel");this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");// 创建时间轮基本的数据结构,一个数组。长度为不小于ticksPerWheel的最小2的n次方,和HashMap处理方式一样wheel = createWheel(ticksPerWheel);// 一个标示符,用来快速计算任务应该呆的格子。// 我们知道,给定一个deadline的定时任务,其应该呆的格子=deadline%wheel.length.但是%操作是个相对耗时的操作,所以使用一种变通的位运算代替:// 因为一圈的长度为2的n次方,mask = 2^n-1后低位将全部是1,然后deadline&mast == deadline%wheel.length// java中的HashMap也是使用这种处理方法mask = wheel.length - 1;// 转换成纳秒处理long duration = unit.toNanos(tickDuration);// 校验是否存在溢出。即指针转动的时间间隔不能超过 Long.MAX_VALUE / wheel.length// 为什么是除法?这里除法也是为了防止乘法溢出if (duration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d",tickDuration, Long.MAX_VALUE / wheel.length));}// 当然时间也不能太短了,起码不能小于1msif (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",tickDuration, MILLISECOND_NANOS);this.tickDuration = MILLISECOND_NANOS;} else {this.tickDuration = duration;}// 创建worker线程workerThread = threadFactory.newThread(worker);// 这里默认是启动内存泄露检测:当HashedWheelTimer实例超过当前cpu可用核数*4的时候,将发出警告leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;// 最大定时检测任务个数this.maxPendingTimeouts = maxPendingTimeouts;// INSTANCE_COUNT_LIMIT 默认为64 , 时间轮实例个数检测,超过64个会告警if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}
}
首先检查参数
checkNotNull(threadFactory, "threadFactory");
checkNotNull(unit, "unit");
checkPositive(tickDuration, "tickDuration");
checkPositive(ticksPerWheel, "ticksPerWheel");
this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
注意如果没用设置 taskExecutor,也就是没用设置线程池,那么 Netty 会默认使用 ImmediateExecutor.INSTANCE
,执行任务的时候会直接通过当前线程调用任务的 run 方法,对于其他参数的默认值:
- tickDuration:100
- unit:ms
- ticksPerWheel:512
- leakDetection:true(开启内存泄露检测)
然后创建时间轮和设置标记
// 创建时间轮基本的数据结构,一个数组。长度为不小于ticksPerWheel的最小2的n次方,和HashMap处理方式一样
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
我们可以来看下如何计算 2^n 次方的
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {// 初始化ticksPerWheel的值为不小于ticksPerWheel的最小2的n次方ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);// 初始化wheel数组HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];for (int i = 0; i < wheel.length; i ++) {wheel[i] = new HashedWheelBucket();}return wheel;
}public static int findNextPositivePowerOfTwo(final int value) {assert value > Integer.MIN_VALUE && value < 0x40000000;return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
}
这里就是用 JDK 的方法来计算不小于 ticksPerWheel 的 2^n 次方,其实这里如果感兴趣可以去看下 Netty 的 issue,这个方法一开始是直接遍历循环,然后用 << 来计算的,比如
int num = 1, i = 1;
while(num < ticksPerWheel){num |= (1 << i);i++;
}
但是这种方式求,如果 ticksPerWheel 很大,有可能要遍历20~30 次,所以就改用了这个方法,效率会更高。
继续看构造器源码,下面转换成纳秒,Netty 是支持不同时间单位的。
long duration = unit.toNanos(tickDuration);
接着继续判断时间间隔是否合法,也就是说 时间间隔 * 时间轮大小 <= Integer.MAX_VALUE?
如果不符合条件,其实就是不合法,意思就是时间轮的整体时间间隔不能超过 Integer.MAX_VALUE。
if (duration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d",tickDuration, Long.MAX_VALUE / wheel.length));}
上面的除法是为了相乘的时候溢出,但是时间间隔也不能小于 1ms
// 当然时间也不能太短了,起码不能小于1ms
if (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",tickDuration, MILLISECOND_NANOS);this.tickDuration = MILLISECOND_NANOS;
} else {this.tickDuration = duration;
}
最后就是创建工作线程和内存泄露等相关的参数
// 创建worker线程
workerThread = threadFactory.newThread(worker);
// 这里默认是启动内存泄露检测
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
// 最大定时检测任务个数this.maxPendingTimeouts = maxPendingTimeouts;
最后如果一个 Netty 里面的时间轮个数 > 64,会输出错误日志
// INSTANCE_COUNT_LIMIT 默认为64 , 时间轮实例个数检测,超过64个会告警
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();
}
private static void reportTooManyInstances() {if (logger.isErrorEnabled()) {String resourceType = simpleClassName(HashedWheelTimer.class);logger.error("You are creating too many " + resourceType + " instances. " +resourceType + " is a shared resource that must be reused across the JVM, " +"so that only a few instances are created.");}
}
这里就是全部流程了
4. 回收
@Override
protected void finalize() throws Throwable {try {super.finalize();} finally {// This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If// we have not yet shutdown then we want to make sure we decrement the active instance count.if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {INSTANCE_COUNTER.decrementAndGet();}}
}
当时间轮被回收之后就会调用这个方法设置时间轮的实例个数 - 1
5. 启动时间轮 - start
/**
* 启动时间轮,添加任务的时候会调用启动,不需要显式调用
*/
public void start() {// 判断状态switch (WORKER_STATE_UPDATER.get(this)) {// 当时间轮处于初始化状态时,启动case WORKER_STATE_INIT:// CAS 设置时间轮状态if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {// 启动工作线程workerThread.start();}break;case WORKER_STATE_STARTED:// 已经启动了break;case WORKER_STATE_SHUTDOWN:// 时间轮停止throw new IllegalStateException("cannot be started once stopped");default:// 非法throw new Error("Invalid WorkerState");}// 等待worker线程初始化时间轮的启动时间while (startTime == 0) {try {startTimeInitialized.await();} catch (InterruptedException ignore) {// Ignore - it will be ready very soon.}}
}
时间轮启动的时候会先设置下状态,然后直接阻塞。这个方法会在添加任务的时候被调用,也就是说如果没有任务添加到时间轮,时间轮工作线程就不会启动。
状态设置完之后会阻塞等待,因为此时启动时间还没有设置,如果往里面添加任务,会导致比如一个 1s 的任务,添加的时候 currentTime - startTime = currentTime,这样结果就不对了。
那为什么 Netty 要把 startTime 丢到工作线程里面去初始化呢?
- 大胆猜测,可能这就是 Netty 的策略吧,工作线程启动的时候设置启动时间,代表时间轮和工作线程一起启动,有知道原因的朋友可以说下
6. 停止时间轮 - stop
下面是 stop 方法的流程,我们知道 stop 方法里面暂停时间轮之后工作线程肯定也要暂停运行,那么如何调度线程之间的执行顺序,同时让工作线程暂停之后返回未处理的任务呢?下面就来进行 stop 的源码分析。
@Override
public Set<Timeout> stop() {// worker线程不能停止时间轮,也就是防止在定时任务里面把时间轮停了造成其他方法执行不了if (Thread.currentThread() == workerThread) {throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() +".stop() cannot be called from " +TimerTask.class.getSimpleName());}// CAS 设置时间轮状态, 从 1 -> 2if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {// workerState can be 0 or 2 at this moment - let it always be 2.// 这个地方有可能是 0 或者 2, 0 是还没启动就停止,2 是并发环境下设置失败if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {// 再次设置为 WORKER_STATE_SHUTDOWN// 设置成功就让时间轮个数 - 1INSTANCE_COUNTER.decrementAndGet();// 关闭内存泄露检测if (leak != null) {boolean closed = leak.close(this);assert closed;}}// 返回一个空集合// 1.如果是还没启动,当然没用任务了// 2.如果是并发 CAS 失败,那就说明任务已经被其他线程返回了return Collections.emptySet();}try {// 是否打断boolean interrupted = false;// 如果工作线程还活着,因为工作线程是会阻塞到下一个 tick 的,所以这里就是在判断线程是不是还在阻塞中// 当工作线程被打断从死循环中退出,就会退出 while 循环while (workerThread.isAlive()) {// 打断工作线程,只要工作线程后面执行 waitForNextTick,就会抛出异常,然后退出 while 循环,从而结束运行workerThread.interrupt();try {// 当前线程等待工作线程执行 100 msworkerThread.join(100);} catch (InterruptedException ignored) {// 如果当前线程被中断,设置中断标记interrupted = true;}}// 如果当前线程在等待工作线程完成时被中断,interrupted 会被设置为 true,为了避免中断状态丢失,需要在最后恢复中断状态if (interrupted) {Thread.currentThread().interrupt();}} finally {// 减少时间轮实例个数INSTANCE_COUNTER.decrementAndGet();if (leak != null) {// 关闭内存泄露检测boolean closed = leak.close(this);assert closed;}}// 返回未处理的任务return worker.unprocessedTimeouts();
}
来看下整个流程,首先判断下暂停时间轮的线程是不是当前线程,如果是当前线程,那么就不允许暂停时间轮,避免定时任务把时间轮给搞停止了。但是其实你如果用线程池去执行任务的话,这个好像也防不了。
// worker线程不能停止时间轮,也就是防止在定时任务里面把时间轮停了造成其他方法执行不了
if (Thread.currentThread() == workerThread) {throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() +".stop() cannot be called from " +TimerTask.class.getSimpleName());
}
然后就是设置时间轮状态从 WORKER_STATE_STARTED
变为 WORKER_STATE_SHUTDOWN
,如果失败,注意这里失败有两种情况:
- 时间轮状态是 WORKER_STATE_INIT
- 线程并发竞争激烈,CAS 修改失败
失败的情况下,会再次尝试设置状态为 WORKER_STATE_SHUTDOWN
,如果设置成功就让时间轮实例个数 - 1,表示当前时间轮被关闭了,同时关闭内存泄露。接着返回一个空集合,这个集合就是没有执行的任务。
- 如果是第一种情况,时间轮还没启动,当然就没用任务了。
- 如果是第二种情况,这里线程 CAS 竞争失败,那么其他线程会返回失败的任务去执行,当前线程就不用操心那些任务了。
// CAS 设置时间轮状态, 从 1 -> 2
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {// workerState can be 0 or 2 at this moment - let it always be 2.// 这个地方有可能是 0 或者 2, 0 是还没启动就停止,2 是并发环境下设置失败if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {// 再次设置为 WORKER_STATE_SHUTDOWN// 设置成功就让时间轮个数 - 1INSTANCE_COUNTER.decrementAndGet();// 关闭内存泄露检测if (leak != null) {boolean closed = leak.close(this);assert closed;}}// 返回一个空集合// 1.如果是还没启动,当然没用任务了// 2.如果是并发 CAS 失败,那就说明任务已经被其他线程返回了return Collections.emptySet();
}
最后既然要暂停执行了,我们知道工作线程会不断调用 waitForNextTick
阻塞到下一个时间点然后开始执行定时任务,所以这里就判断下如果工作线程还或者,也就是还没有退出循环,就打断,打断之后,只要工作线程执行 waitForNextTick
阻塞,就会立刻抛出异常接着进而从 while
循环中退出,然后去把未完成的任务放到 unprocessedTimeouts
队列中。
这时候调用 stop
的线程会去等待工作线程执行 100ms
,如果当前工作线程被中断,就会设置一个中断标记。然后根据这个中断标记来防止中断状态丢失,最后返回未处理的任务。
try {// 是否打断boolean interrupted = false;// 如果工作线程还活着,因为工作线程是会阻塞到下一个 tick 的,所以这里就是在判断线程是不是还在阻塞中// 当工作线程被打断从死循环中退出,就会退出 while 循环while (workerThread.isAlive()) {// 打断工作线程,只要工作线程后面执行 waitForNextTick,就会抛出异常,然后退出 while 循环,从而结束运行workerThread.interrupt();try {// 当前线程等待工作线程执行 100 msworkerThread.join(100);} catch (InterruptedException ignored) {// 如果当前线程被中断,设置中断标记interrupted = true;}}// 如果当前线程在等待工作线程完成时被中断,interrupted 会被设置为 true,为了避免中断状态丢失,需要在最后恢复中断状态if (interrupted) {Thread.currentThread().interrupt();}
} finally {// 减少时间轮实例个数INSTANCE_COUNTER.decrementAndGet();if (leak != null) {// 关闭内存泄露检测boolean closed = leak.close(this);assert closed;}
}
// 返回未处理的任务
return worker.unprocessedTimeouts();
7. 添加任务
好了,现在就到添加任务的流程了,添加任务的流程很简单,其实就是把任务添加到任务队列中,等待工作线程从任务队列里面把任务拿出来添加到时间轮中。
/**
* 添加一个任务* @param task 任务* @param delay 延时时间* @param unit 时间单位* @return*/
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {// 任务和时间单位不能为空checkNotNull(task, "task");checkNotNull(unit, "unit");// 需等待执行任务数+ 1, 同时判断是否超过最大限制long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts ("+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "+ "timeouts (" + maxPendingTimeouts + ")");}// 若时间轮Worker线程未启动,则需要启动start();// Add the timeout to the timeout queue which will be processed on the next tick.// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.// 根据定时任务延时执行时间与时间轮启动时间,获取相对的时间轮开始后的任务执行延时时间long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;// Guard against overflow.if (delay > 0 && deadline < 0) {// 小于0其实就是Long溢出了deadline = Long.MAX_VALUE;}// 这里定时任务不是直接加到对应的格子中,而是先加入到一个队列timeouts里,然后Worker线程等到下一个tick的时候,// 会从队列里取出最多100000个任务加入到指定的wheel数组下标格子中HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);// 添加到队列里面timeouts.add(timeout);return timeout;
}
下面来看下流程,首先先去检查下参数,然后设置需要等待添加的任务数,接下来就是判断如果需要等待添加的任务个数超过了 maxPendingTimeouts
,超过就抛异常不能往里面添加任务了,其实你要是不设置这个参数,默认就是 -1,表示没有上限。
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts ("+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "+ "timeouts (" + maxPendingTimeouts + ")");}
下面如果时间轮还没有启动,那么就启动时间轮,所以可以看到不往里面添加任务的话时间轮是不会启动的,就是懒启动。里面会启动工作线程设置 startTime
。
// 若时间轮Worker线程未启动,则需要启动
start();/*** 启动时间轮,添加任务的时候会调用启动,不需要显式调用*/
public void start() {// 判断状态switch (WORKER_STATE_UPDATER.get(this)) {// 当时间轮处于初始化状态时,启动case WORKER_STATE_INIT:// CAS 设置时间轮状态if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {// 启动工作线程workerThread.start();}break;case WORKER_STATE_STARTED:// 已经启动了break;case WORKER_STATE_SHUTDOWN:// 时间轮停止throw new IllegalStateException("cannot be started once stopped");default:// 非法throw new Error("Invalid WorkerState");}// 等待worker线程初始化时间轮的启动时间while (startTime == 0) {try {startTimeInitialized.await();} catch (InterruptedException ignore) {// Ignore - it will be ready very soon.}}
}
逻辑就不多细说了,其实启动的代码就是先设置下时间轮的状态,然后就阻塞等待工作线程启动,工作线程启动的时候会把这个 startTime
设置成当前时间,然后再调用 startTimeInitialized.countDown
,就是为了时间轮启动之后再往里面去添加任务。还是那句话,如果先添加任务再启动时间轮,那么这个 startTime = 0,最后任务的圈数就不是 (currentTime - startTime) / interval 了
,这样会导致任务迟迟都执行不了。
继续看下面逻辑,启动时间轮之后需要计算出当前任务的执行时间和时间轮启动时间的相对时间,就是任务延时多久执行,因为添加任务的时候时间轮已经启动很久了,所以肯定不是简单用 delay
作为截止时间来计算的。
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
最后就是把任务添加到任务队列里面
// 这里定时任务不是直接加到对应的格子中,而是先加入到一个队列timeouts里,然后Worker线程等到下一个tick的时候,// 会从队列里取出最多100000个任务加入到指定的wheel数组下标格子中HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);// 添加到队列里面timeouts.add(timeout);return timeout;
8. 工作线程 - Worker
8.1 线程参数
首先看下工作线程的两个参数:
// 调用了stop()方法之后,会从这里面返回未执行完的任务
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
// 时钟指针的跳动次数
private long tick;
看注解就行了,很好理解,主要还是下面的工作线程的核心逻辑。
8.2 核心逻辑-run
@Override
public void run() {// 时间轮启动的时间startTime = System.nanoTime();if (startTime == 0) {// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.startTime = 1;}// Notify the other threads waiting for the initialization at start().// Worker线程初始化了,通知调用时间轮启动的线程startTimeInitialized.countDown();do {// 获取下一刻度时间轮总体的执行时间,记录这个时间和时间轮启动时间大于当前时间时,线程会睡眠到这个时间点final long deadline = waitForNextTick();if (deadline > 0) {// 大于0就是到了下一个tick了,求刻度的编号,就是时间轮数组下标int idx = (int) (tick & mask);// 先处理要取消的任务processCancelledTasks();// 获取刻度所在的缓存链表HashedWheelBucket bucket = wheel[idx];// 把新增的定时任务加入wheel数组的缓存链表中transferTimeoutsToBuckets();// 循环执行刻度所在的缓存刻度bucket.expireTimeouts(deadline);// 指针跳动+1tick++;}// 时间轮状态是已启动,才能在while里面循环} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);// Fill the unprocessedTimeouts so we can return them from stop() method.for (HashedWheelBucket bucket: wheel) {// 运行到这里说明时间轮停止了,也就是调用了stop()方法,需要把未处理的任务返回bucket.clearTimeouts(unprocessedTimeouts);}for (;;) {// 这里面是刚刚加入还没来得及加入时间轮的任务,也需要拿出来放到unprocessedTimeouts里面一起返回HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}// 处理需要取消的任务processCancelledTasks();
}
首先就是初始化时间轮的线程了,初始化之后会让添加任务的线程继续往下走,添加任务。
// 时间轮启动的时间
startTime = System.nanoTime();
if (startTime == 0) {startTime = 1;
}
// Worker线程初始化了,通知调用时间轮启动的线程
startTimeInitialized.countDown();
下面就是核心逻辑了,我们知道时间轮通过指针跳动来读取对应下标的延时任务,但是指针如何模拟跳动的行为呢?就是通过 sleep 来阻塞了,比如当前时间是 0
,那么就 sleep(20) 到下一个 tick,再执行对应的时间格子上面的过期任务。
- 如果线程醒来了,就开始执行对应时间格上面的任务
- 首先处理要取消的任务
- 然后把新增的定时任务加入时间轮中
- 接着执行对应时间格子上面过期的任务
- 最后让指针 + 1
do {
// 获取下一刻度时间轮总体的执行时间,记录这个时间和时间轮启动时间大于当前时间时,线程会睡眠到这个时间点final long deadline = waitForNextTick();if (deadline > 0) {// 大于0就是到了下一个tick了,求刻度的编号,就是时间轮数组下标int idx = (int) (tick & mask);// 先处理要取消的任务processCancelledTasks();// 获取刻度所在的缓存链表HashedWheelBucket bucket = wheel[idx];// 把新增的定时任务加入wheel数组的缓存链表中transferTimeoutsToBuckets();// 循环执行刻度所在的缓存刻度bucket.expireTimeouts(deadline);// 指针跳动+1tick++;}// 时间轮状态是已启动,才能在while里面循环
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
当时间轮暂停后,会把所有还没有来得及处理的任务加入 unprocessedTimeouts
中,这里面没有来得及处理的任务包括
- 时间轮上面的任务
- 刚加入时间轮任务队列里面的任务
最后处理已经取消了的任务,所位的处理就是从时间轮中移除掉
// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket: wheel) {// 运行到这里说明时间轮停止了,也就是调用了stop()方法,需要把未处理的任务返回bucket.clearTimeouts(unprocessedTimeouts);
}
for (;;) {// 这里面是刚刚加入还没来得及加入时间轮的任务,也需要拿出来放到unprocessedTimeouts里面一起返回HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}
}
// 处理需要取消的任务
processCancelledTasks();
8.3 指针跳动到下一个tick
那么我们来看下指针式如何跳动到下一个 tick 的,先看下全部的代码:
private long waitForNextTick() {// 获取下一刻度时间轮总体的执行时间long deadline = tickDuration * (tick + 1);for (;;) {// 当前时间 - 启动时间final long currentTime = System.nanoTime() - startTime;// 计算需要睡眠的毫秒时间,这里加上 999999,意思就是说睡眠时间最少都得是 1mslong sleepTimeMs = (deadline - currentTime + 999999) / 1000000;// 当睡眠时间小于0,且等于Long.MiN_VALUE时,直跳过此刻度,否则不睡眠,直接执行任务if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {// 返回时间轮执行了多长时间return currentTime;}}// Check if we run on windows, as if thats the case we will need// to round the sleepTime as workaround for a bug that only affect// the JVM if it runs on windows.//// See https://github.com/netty/netty/issues/356// Window 操作系统特殊处理, 其Sleep函数是以10ms 为单位进行延时的// 也就是说,所有小于10且大于0的情况都是10ms, 所有大于 10且小于20的情况都是20ms , 因此这里做了特殊的处理, 对于小于10ms 的,直接不睡眠。 对于 大于 10ms的,去掉层尾数if (PlatformDependent.isWindows()) {sleepTimeMs = sleepTimeMs / 10 * 10;if (sleepTimeMs == 0) {sleepTimeMs = 1;}}try {Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {// 工作线程被打断的时候,如果时间轮已经关闭了,直接返回 Long.MIN_VALUE,外层就可以退出 while 循环了if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}
}
首先通过 tickDuration * (tick + 1)
求出下一个时间格指针要执行到哪个时间点,然后求出执行时间距离当前时间还有多久:
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
这里因为是以纳秒为单位,但是实际上如果延时时间是 1 纳秒,最后要求出来的睡眠时间最少也是 1ms。
下面继续判断,如果不需要睡眠,就是说当前时间已经到下一刻指针的时间了,返回当前时间。如果 currentTime == Long.MIN_VALUE
,其实这种是比较极端的情况了,一般不会走到这里。
if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {// 返回时间轮执行了多长时间return currentTime;}
}
否则就是没到点,需要阻塞等待,下面就会判断下如果是 windows 系统
,那么对睡眠时间做了处理,让睡眠时间变成 10 的倍数,其他情况如果睡眠时间 < 10ms,就统统睡眠 1ms。
if (PlatformDependent.isWindows()) {sleepTimeMs = sleepTimeMs / 10 * 10;if (sleepTimeMs == 0) {sleepTimeMs = 1;}
}
为什么 Netty 要这么处理呢?其实和一个 issue 有关,https://github.com/netty/netty/issues/356
,在 windows xp 系统情况下,如果长时间睡眠类似 16ms、17ms 这种不是 10 的倍数的时间,就会导致 window 的时钟不准确,这个 issue 里面其实有提到,如果循环 sleep 一两个小时,windows 的时钟就会延时 1 个多小时,这也算是 windows 的一个 bug 了,所以这里需要转换成 10 的倍数。
最后再睡眠,如果这时候被打断了,其实就是时间轮关闭的时候会打断睡眠的线程,这时候返回 Long.MIN_VALUE
,在外层去检测,然后退出 while 循环,接着处理取消逻辑。
try {Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {// 工作线程被打断的时候,如果时间轮已经关闭了,直接返回 Long.MIN_VALUE,外层就可以退出 while 循环了if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}
}
8.4 处理要取消的任务
/**
* 处理要取消的任务,就是从时间轮移除掉
*/
private void processCancelledTasks() {for (;;) {// 从cancel任务队列里面获取取消的任务HashedWheelTimeout timeout = cancelledTimeouts.poll();if (timeout == null) {// 所有的任务都处理完了break;}try {timeout.remove();} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown while process a cancellation task", t);}}}
}
这里的取消任务其实就是从取消的任务队列里面拿出任务,把这些任务从时间轮中移除掉。
8.5 把新增的任务加入时间轮
新增任务的时候首先会加入任务队列中,由工作线程从任务队列中取出来加入时间轮。
private void transferTimeoutsToBuckets() {// adds new timeouts in a loop.// 每次tick只处理10w个任务,避免 worker 线程阻塞for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}// timeout队列已经被取消了if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {// Was cancelled in the meantime.continue;}// calculated = tick 次数,这里求出来到这个任务的过期时间要经过多少个ticklong calculated = timeout.deadline / tickDuration;// 还有多少圈才可以执行timeout.remainingRounds = (calculated - tick) / wheel.length;// 如果 tick > calculated,意思就是当前时间超过了任务执行的时间,这时候会放到当前 tick 对应的时间格子下面final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.// 算出这个 ticks 对应的时间格子,也就是哪个下标int stopIndex = (int) (ticks & mask);HashedWheelBucket bucket = wheel[stopIndex];// 将timeout加入到bucket链表中bucket.addTimeout(timeout);}
}
上面就是全部的流程,下面来具体看下。首先每次添加任务到时间轮最多只能添加 100000 个。如果这个任务已经取消了,这时候就不用管了,直接判断下一个任务。然后开始计算要添加的任务的下标。
首先求出这个任务的过期时间是多少个 tick,也就是指针走多少个 tick 才能执行这个过期任务。
// calculated = tick 次数,这里求出来到这个任务的过期时间要经过多少个tick
long calculated = timeout.deadline / tickDuration;
接着判断下还有多少圈才能执行,我们知道每一圈就是 wheel.length 个 tick,当前已经走过 tick 了,剩下的就是 (calculated - tick) / wheel.length
,比如求出的 remainingRounds = 5,意思就是时间指针还需要走 6 圈才能执行,因为当 timeout.remainingRounds = 0
的时候并且这个任务的过期时间到了,这时候才能执行任务。
但是我们知道,有可能添加任务的时候当前时间已经超过了任务的截止时间,这是胡其实求出来的 timeout.remainingRounds
就有可能是负数,那么这时候 calculated < tick
,所以下面需要判断下,如果真的是这种情况,那么就以当前指针的 tick 来计算下标,意味者马上就会执行这个过期任务。
// 还有多少圈才可以执行timeout.remainingRounds = (calculated - tick) / wheel.length;// 如果 tick > calculated,意思就是当前时间超过了任务执行的时间,这时候会放到当前 tick 对应的时间格子下面final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
最后求出时间任务需要加入哪个下标格子,然后就添加到链表中。
// 算出这个 ticks 对应的时间格子,也就是哪个下标
int stopIndex = (int) (ticks & mask);HashedWheelBucket bucket = wheel[stopIndex];
// 将timeout加入到bucket链表中
bucket.addTimeout(timeout);
8.6 执行过期任务
执行过期任务其实是 HashedWheelBucket 中的方法,但是因为跟工作线程相关,所以在这里也就讲了。
public void expireTimeouts(long deadline) {HashedWheelTimeout timeout = head;// process all timeouts// 遍历格子中的所有定时任务while (timeout != null) {HashedWheelTimeout next = timeout.next;// remainingRounds <= 0 才可以执行if (timeout.remainingRounds <= 0) {// 移除到时间点要执行的任务next = remove(timeout);// 如果截至时间小于当前的时间,那么就执行任务if (timeout.deadline <= deadline) {timeout.expire();} else {// 这种情况是不会发生的// The timeout was placed into a wrong slot. This should never happen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}} else if (timeout.isCancelled()) {next = remove(timeout);} else {// 否则指针每走一圈,就让 remainingRounds - 1timeout.remainingRounds --;}// 把指针放置到下一个timeouttimeout = next;}
}
这里面其实就是遍历这个下标对应的链表,然后判断下如果 timeout.remainingRounds <= 0
,这时候就把这个任务从链表中移除掉,接着执行。否则就让 timeout.remainingRounds - 1
,直到 timeout.remainingRounds = 0
的时候才执行过期任务。
9. HashedWheelTimeout
这个就是任务节点了,那么现在来看下这个类里面的一些属性和方法。
9.1 属性
// 定义定时任务的3个状态:初始化、取消、过期
private static final int ST_INIT = 0;
private static final int ST_CANCELLED = 1;
private static final int ST_EXPIRED = 2;// CAS更新定时任务状态
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");// 初始化状态
private volatile int state = ST_INIT;
首先就是任务状态,可以看到时间轮的任务里面定义了三种状态:ST_INIT、ST_CANCELLED 和 ST_EXPIRED,分别代表初始化、取消和过期。默认是初始化状态。
- 初始化就是任务创建出来的状态
- 过期其实就是当任务过期被执行的时候会从初始化设置为已过期
- 已取消就是当手动调用取消方法的时候会设置成这个任务状态
下面就是一些基本属性了,比如时间轮引用,具体任务,超时时间,还有离任务执行的圈数…
// 时间轮引用
private final HashedWheelTimer timer;
// 具体到期需要执行的任务
private final TimerTask task;
private final long deadline;
// 剩下的圈数
long remainingRounds;
要知道时间轮是双向链表结构,所以当然也会有前后节点指针以及时间格子的引用。
// 定时任务所在的格子
HashedWheelBucket bucket;// 双向链表结构,由于只有worker线程会访问,这里不需要synchronization / volatile
HashedWheelTimeout next;
HashedWheelTimeout prev;
9.2 任务取消
/*** 取消任务* @return*/
@Override
public boolean cancel() {// only update the state it will be removed from HashedWheelBucket on next tick.// 修改下状态,如果修改失败就是并发失败,这里就返回 false,让其他线程去操作了if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {return false;}// If a task should be canceled we put this to another queue which will be processed on each tick.// So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way// we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.// 加入到时间轮的待取消队列,工作线程每一次到下一个 tick 的时候都会处// 理一下这些已取消的任务,简单来说就是把这些任务从时间轮链表中删掉timer.cancelledTimeouts.add(this);return true;
}
任务取消的逻辑很简单,只需要设置下任务状态 ST_INIT -> ST_CANCELLED
,但是如果 CAS 修改失败了,说明这时候并发比较激烈,那么就让修改成功的线程去处理就行了。
修改成功的线程会把任务加入到已取消队列中,工作线程每一次到下一个 tick 的时候都会处理一下这些已取消的任务,简单来说就是把这些任务从时间轮链表中删掉。
9.3 任务移除
void remove() {HashedWheelBucket bucket = this.bucket;if (bucket != null) {bucket.remove(this);} else {timer.pendingTimeouts.decrementAndGet();}}
这里逻辑就很简单了,就是把任务从 bucket 中移除,其实就是双向链表的移除节点的逻辑,等会到介绍 HashedWheelBucket
的时候会具体看详细的逻辑。
9.4 执行过期任务
最后再看下执行过期任务的逻辑。
// 过期并执行任务
public void expire() {// CAS 修改任务状态if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {return;}try {// 调用 Executor 去执行,其实如果没有设置就是由工作线程去执行// 可以自己设置一个线程池,这样就不会阻塞工作线程了timer.taskExecutor.execute(this);} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()+ " for execution.", t);}}
}@Override
public void run() {try {task.run(this);} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);}}
}
首先也是先通过 CAS 修改一下状态,修改成功就调用时间轮的 Executor 去执行任务,这个参数其实在 3.构造器
里面也有介绍了,如果没有设置,其实默认就是由工作线程去执行,如果设置了线程,就会把任务丢到线程池里面去执行。
9.5 TimerTask
那说了那么多,TimerTask 的结构到底是什么呢?其实就是一个接口,里面定义了 run 方法。
public interface TimerTask {void run(Timeout timeout) throws Exception;
}
10. HashedWheelBucket
最后我们在来看下时间格的一些属性和方法,因为时间格是管理任务链表的,所以肯定需要链表头和尾了。
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
10.1 添加任务节点
新增任务的时候会把这个任务添加到时间格子的尾部,就是双向链表的添加的方法,需要注意的是,head 和 tail 不会设置默认的头尾指针。
/**
* Add {@link HashedWheelTimeout} to this bucket.
* 新增 HashedWheelTimeout 到双向链表的尾部
*/
public void addTimeout(HashedWheelTimeout timeout) {assert timeout.bucket == null;timeout.bucket = this;if (head == null) {head = tail = timeout;} else {tail.next = timeout;timeout.prev = tail;tail = timeout;}
}
10.2 删除任务节点
然后就是移除 timeout 节点的方法。
/**
* 从双向链表中移除指定的 HashedWheelTimeout 节点
* @param timeout
* @return
*/
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {HashedWheelTimeout next = timeout.next;// remove timeout that was either processed or cancelled by updating the linked-listif (timeout.prev != null) {timeout.prev.next = next;}if (timeout.next != null) {timeout.next.prev = timeout.prev;}// 头结点if (timeout == head) {// if timeout is also the tail we need to adjust the entry tooif (timeout == tail) {// 没有节点了tail = null;head = null;} else {// 赋值给下一个节点head = next;}} else if (timeout == tail) {// if the timeout is the tail modify the tail to be the prev node.// 更新tail节点到前一个节点tail = timeout.prev;}// null out prev, next and bucket to allow for GC.// GCtimeout.prev = null;timeout.next = null;timeout.bucket = null;// 等待的任务数 - 1timeout.timer.pendingTimeouts.decrementAndGet();return next;
}
具体逻辑我就不多说了,就是双向链表的移除节点方法。主要看下最后当移除了这个节点之后,需要把 prev、next、bucket 通通设置为 null,删除引用,避免内存泄露。
10.3 清空整个链表
清空整个链表,把还没有处理的任务返回,这个方法是时间轮停止的时候会调用来返回没有处理的任务。
public void clearTimeouts(Set<Timeout> set) {for (;;) {// 获取头节点HashedWheelTimeout timeout = pollTimeout();if (timeout == null) {// 没有数据了,返回return;}// 过期了或者是已经取消的人if (timeout.isExpired() || timeout.isCancelled()) {continue;}// 剩下那些就是没有处理的任务set.add(timeout);}
}// 断开头结点返回
private HashedWheelTimeout pollTimeout() {// 头结点HashedWheelTimeout head = this.head;if (head == null) {return null;}// 把下一个节点设置为头结点HashedWheelTimeout next = head.next;if (next == null) {tail = this.head = null;} else {this.head = next;next.prev = null;}// 属性全部设置为 null,方便 GC// null out prev and next to allow for GC.head.next = null;head.prev = null;head.bucket = null;return head;}
逻辑和上面是删除任务节点的流程差不多,就不多说了。
11. 小结
到这里 Netty 时间轮的逻辑就已经讲完了,其实里面的逻辑还是比较多的,关于时间轮也不是一个版本就能写成这样,肯定是经过了很多个版本的迭代,包括为什么要设置时间轮任务上限这些,其实在 github 的 issue 搜 HashedWheelTimer 都能看到一些相关的优化的提交,所以这方面有兴趣的朋友可以去看看。
如有错误,欢迎指出!!!