Netty-时间轮
归档
参考
说明
- 其实 Netty 框架并没有使用,其可做学习算法原理的参考
单元测试
public class HashedWheelTimerTest2 {public static void main(String[] args) {System.out.println("---------> " + LocalTime.now());Timer timer = new HashedWheelTimer(); Timeout timeout1 = timer.newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) {System.out.println("timeout1: " + LocalTime.now());}}, 10, TimeUnit.SECONDS);if (!timeout1.isExpired()) {timeout1.cancel(); }timer.newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws InterruptedException {System.out.println("timeout2: " + LocalTime.now());Thread.sleep(5000); }}, 1, TimeUnit.SECONDS);timer.newTimeout(new TimerTask() { @Overridepublic void run(Timeout timeout) {System.out.println("timeout3: " + LocalTime.now());System.out.println(timeout.timer()); System.out.println(timeout.task().getClass()); }}, 3, TimeUnit.SECONDS);System.out.println("timer -> " + timer);System.out.println("---------> " + LocalTime.now());}
}
原理
类结构
io.netty.util.HashedWheelTimer
public class HashedWheelTimer implements Timer {private final Worker worker = new Worker(); private final Thread workerThread; private final long tickDuration; private final HashedWheelBucket[] wheel; private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue(); private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue(); private final AtomicLong pendingTimeouts = new AtomicLong(0); private static final class HashedWheelBucket {private HashedWheelTimeout head; private HashedWheelTimeout tail; }private static final class HashedWheelTimeout implements Timeout, Runnable {private final TimerTask task; private final long deadline; long remainingRounds; HashedWheelTimeout next; HashedWheelTimeout prev;}
}
调用链
io.netty.util.HashedWheelTimer
public class HashedWheelTimer implements Timer {@Overridepublic Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {... start(); long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; ... HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout); return timeout;}
}
io.netty.util.HashedWheelTimer.Worker
private final class Worker implements Runnable {private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); private long tick; @Overridepublic void run() {... do {final long deadline = waitForNextTick(); if (deadline > 0) {int idx = (int) (tick & mask); processCancelledTasks(); HashedWheelBucket bucket = wheel[idx];transferTimeoutsToBuckets(); bucket.expireTimeouts(deadline);tick++; }} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); ... }private long waitForNextTick() {long deadline = tickDuration * (tick + 1);for (;;) {final long currentTime = System.nanoTime() - startTime;long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE; } else {return currentTime; }}... try {Thread.sleep(sleepTimeMs); } ... }}private void transferTimeoutsToBuckets() {for (int i = 0; i < 10_0000; i++) {HashedWheelTimeout timeout = timeouts.poll();... long calculated = timeout.deadline / tickDuration;timeout.remainingRounds = (calculated - tick) / wheel.length; final long ticks = Math.max(calculated, tick);int stopIndex = (int) (ticks & mask); HashedWheelBucket bucket = wheel[stopIndex];bucket.addTimeout(timeout);}}}
io.netty.util.HashedWheelTimer.HashedWheelBucket
private static final class HashedWheelBucket {public void expireTimeouts(long deadline) {HashedWheelTimeout timeout = head;while (timeout != null) {HashedWheelTimeout next = timeout.next;if (timeout.remainingRounds <= 0) { next = remove(timeout);if (timeout.deadline <= deadline) { timeout.expire(); } ... } else if (timeout.isCancelled()) {next = remove(timeout);} else {timeout.remainingRounds--; }timeout = next;}}}
io.netty.util.HashedWheelTimer.HashedWheelTimeout
private static final class HashedWheelTimeout implements Timeout, Runnable {public void expire() {if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {return; }try {timer.taskExecutor.execute(this); } catch (Throwable t) {... }}}