Main
demo测试代码
public class Main {static int inCount = 0;static int runCount = 0;public static void main(String[] args) {CountDownLatch countDownLatch = new CountDownLatch(1000);Timer timer = new Timer();for(int i=1;i<=1000;i++){TimerTask timerTask = new TimerTask(i,()->{countDownLatch.countDown();int index = addRun();System.out.println(index+"----------执行");});timer.addTask(timerTask);System.out.println(i+"++++++++++加入");inCount++;}TimerTask timerTask = new TimerTask(5000,()->{countDownLatch.countDown();int index = addRun();System.out.println(index+"----------执行");});timer.addTask(timerTask);try {countDownLatch.await();System.out.println("inCount" + inCount);System.out.println("runCount" + runCount);} catch (Exception e){e.printStackTrace();}}public synchronized static int addRun(){runCount++;return runCount;}}
Timer
时间轮定时器
*** 定时器*/
public class Timer {/*** 底层时间轮*/private TimeWheel timeWheel;/*** 一个Timer只有一个delayQueue*/private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();/*** 过期任务执行线程*/private ExecutorService workerThreadPool;/*** 轮询delayQueue获取过期任务线程*/private ExecutorService bossThreadPool;/*** 构造函数*/public Timer() {timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue);workerThreadPool = Executors.newFixedThreadPool(100);bossThreadPool = Executors.newFixedThreadPool(1);//20ms获取一次过期任务bossThreadPool.submit(() -> {while (true) {this.advanceClock(20);}});}/*** 添加任务*/public void addTask(TimerTask timerTask) {//添加失败任务直接执行if (!timeWheel.addTask(timerTask)) {workerThreadPool.submit(timerTask.getTask());}}/*** 获取过期任务*/private void advanceClock(long timeout) {try {TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);if (timerTaskList != null) {//推进时间timeWheel.advanceClock(timerTaskList.getExpiration());//执行过期任务(包含降级操作)timerTaskList.flush(this::addTask);}} catch (Exception e) {e.printStackTrace();}}
}
TimeWheel
时间轮
/*** 时间轮*/
public class TimeWheel {/*** 一个时间槽的范围*/private long tickMs;/*** 时间轮大小*/private int wheelSize;/*** 时间跨度*/private long interval;/*** 时间槽*/private TimerTaskList[] timerTaskLists;/*** 当前时间*/private long currentTime;/*** 上层时间轮*/private volatile TimeWheel overflowWheel;/*** 一个Timer只有一个delayQueue*/private DelayQueue<TimerTaskList> delayQueue;public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) {this.currentTime = currentTime;this.tickMs = tickMs;this.wheelSize = wheelSize;this.interval = tickMs * wheelSize;this.timerTaskLists = new TimerTaskList[wheelSize];//currentTime为tickMs的整数倍 这里做取整操作this.currentTime = currentTime - (currentTime % tickMs);this.delayQueue = delayQueue;for (int i = 0; i < wheelSize; i++) {timerTaskLists[i] = new TimerTaskList();}}/*** 创建或者获取上层时间轮*/private TimeWheel getOverflowWheel() {if (overflowWheel == null) {synchronized (this) {if (overflowWheel == null) {overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue);}}}return overflowWheel;}/*** 添加任务到时间轮*/public boolean addTask(TimerTask timerTask) {long expiration = timerTask.getDelayMs();//过期任务直接执行if (expiration < currentTime + tickMs) {return false;} else if (expiration < currentTime + interval) {//当前时间轮可以容纳该任务 加入时间槽Long virtualId = expiration / tickMs;int index = (int) (virtualId % wheelSize);System.out.println("tickMs:" + tickMs + "------index:" + index + "------expiration:" + expiration);TimerTaskList timerTaskList = timerTaskLists[index];timerTaskList.addTask(timerTask);if (timerTaskList.setExpiration(virtualId * tickMs)) {//添加到delayQueue中delayQueue.offer(timerTaskList);}} else {//放到上一层的时间轮TimeWheel timeWheel = getOverflowWheel();timeWheel.addTask(timerTask);}return true;}/*** 推进时间*/public void advanceClock(long timestamp) {if (timestamp >= currentTime + tickMs) {currentTime = timestamp - (timestamp % tickMs);if (overflowWheel != null) {//推进上层时间轮时间this.getOverflowWheel().advanceClock(timestamp);}}}
}
TimerTask
任务对象
/*** 任务*/
public class TimerTask {/*** 延迟时间*/private long delayMs;/*** 任务*/private Runnable task;/*** 时间槽*/protected TimerTaskList timerTaskList;/*** 下一个节点*/protected TimerTask next;/*** 上一个节点*/protected TimerTask pre;/*** 描述*/public String desc;public TimerTask(long delayMs, Runnable task) {this.delayMs = System.currentTimeMillis() + delayMs;this.task = task;this.timerTaskList = null;this.next = null;this.pre = null;}public Runnable getTask() {return task;}public long getDelayMs() {return delayMs;}@Overridepublic String toString() {return desc;}
}
TimerTaskList
任务集合
/*** 时间槽*/
public class TimerTaskList implements Delayed {/*** 过期时间*/private AtomicLong expiration = new AtomicLong(-1L);/*** 根节点*/private TimerTask root = new TimerTask(-1L, null);{root.pre = root;root.next = root;}/*** 设置过期时间*/public boolean setExpiration(long expire) {return expiration.getAndSet(expire) != expire;}/*** 获取过期时间*/public long getExpiration() {return expiration.get();}/*** 新增任务*/public void addTask(TimerTask timerTask) {synchronized (this) {if (timerTask.timerTaskList == null) {timerTask.timerTaskList = this;TimerTask tail = root.pre;timerTask.next = root;timerTask.pre = tail;tail.next = timerTask;root.pre = timerTask;}}}/*** 移除任务*/public void removeTask(TimerTask timerTask) {synchronized (this) {if (timerTask.timerTaskList.equals(this)) {timerTask.next.pre = timerTask.pre;timerTask.pre.next = timerTask.next;timerTask.timerTaskList = null;timerTask.next = null;timerTask.pre = null;}}}/*** 重新分配*/public synchronized void flush(Consumer<TimerTask> flush) {TimerTask timerTask = root.next;while (!timerTask.equals(root)) {this.removeTask(timerTask);flush.accept(timerTask);timerTask = root.next;}expiration.set(-1L);}@Overridepublic long getDelay(TimeUnit unit) {return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));}@Overridepublic int compareTo(Delayed o) {if (o instanceof TimerTaskList) {return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());}return 0;}
}