前言
Java 提供的java.util.Timer
类可以用来执行延时任务,任务可以只执行一次,也可以周期性的按照固定的速率或延时来执行。
实现一个延时任务调度器,核心有两点:
- 如何存储延时任务
- 如何调度执行延时任务
源码分析
TimerTask
延时任务的核心属性有两个:
- 任务的逻辑(任务要干什么)
- 任务计划执行时间
Java 把延时任务封装成java.util.TimerTask
类,实现自 Runnable,可以被线程执行。
public abstract class TimerTask implements Runnable {int state = VIRGIN;// 默认值,表示任务还没被安排调度执行static final int VIRGIN = 0;// 任务入队,等待调度static final int SCHEDULED = 1;// 任务执行中static final int EXECUTED = 2;// 任务取消static final int CANCELLED = 3;long nextExecutionTime;long period = 0;
}
属性说明:
- state 任务的状态,例如 执行中、已取消等
- nextExecutionTime 任务下次执行的时间
- period 任务重复执行的时间周期,正值表示固定速率执行,负值表示固定延时执行,0表示非重复任务
Timer
Java 把延时任务调度器封装成java.util.Timer
类
public class Timer {private final TaskQueue queue = new TaskQueue();private final TimerThread thread = new TimerThread(queue);
}
属性说明:
- queue 任务优先级队列,基于最小堆实现,队头永远是最早要执行的任务
- thread 任务调度执行线程,单线程执行
提交延时任务,最终会调用sched()
,给 task 赋值任务的执行时间,状态等信息,然后加入到队列,如果队头就是自己,那么就要唤醒线程轮询队列调度执行。
private void sched(TimerTask task, long time, long period) {if (time < 0)throw new IllegalArgumentException("Illegal execution time.");if (Math.abs(period) > (Long.MAX_VALUE >> 1))period >>= 1;synchronized(queue) {if (!thread.newTasksMayBeScheduled)throw new IllegalStateException("Timer already cancelled.");// 抢锁synchronized(task.lock) {if (task.state != TimerTask.VIRGIN)throw new IllegalStateException("Task already scheduled or cancelled");// 设置任务的执行时间,重复执行的时间周期,状态改为已调度task.nextExecutionTime = time;task.period = period;task.state = TimerTask.SCHEDULED;}// 入队queue.add(task);// 如果队头是自己,唤醒线程调度执行if (queue.getMin() == task)queue.notify();}
}
TimerThread
执行延时任务的线程被封装成java.util.TimerThread
类,继承自 Thread,内部持有任务队列的引用。
class TimerThread extends Thread {private TaskQueue queue;
}
线程执行会调用mainLoop()
,不断轮询任务队列,如果队列是空的,线程就 wait,等待外部提交延时任务将自己唤醒。如果队列非空,就判断队头节点的执行时间是否已到,时间到了就立即执行任务,否则就 wait 指定时间。如果任务是一次性的,就把它从堆中移除;如果任务是重复执行的,就再重新添加到堆中。
private void mainLoop() {while (true) {try {TimerTask task;boolean taskFired;synchronized(queue) {// 队列空就waitwhile (queue.isEmpty() && newTasksMayBeScheduled)queue.wait();if (queue.isEmpty())break; // Queue is empty and will forever remain; die// 获取队头节点long currentTime, executionTime;task = queue.getMin();synchronized(task.lock) {if (task.state == TimerTask.CANCELLED) {queue.removeMin();continue; // No action required, poll queue again}currentTime = System.currentTimeMillis();executionTime = task.nextExecutionTime;if (taskFired = (executionTime<=currentTime)) {if (task.period == 0) {// 一次性任务,删除掉queue.removeMin();task.state = TimerTask.EXECUTED;} else {// 重复执行的任务,再提交一次queue.rescheduleMin(task.period<0 ? currentTime - task.period: executionTime + task.period);}}}if (!taskFired)// 任务执行时间还没到,继续waitqueue.wait(executionTime - currentTime);}if (taskFired)// 任务执行时间到了,就执行task.run();} catch(InterruptedException e) {}}
}
注意,period>0 代表任务以 **固定速率 **执行,period<0 代表任务以 **固定延时 **执行。
什么意思呢?固定速率模式下,任务的下次执行时间会以上次任务的计划执行时间开始计算,上次任务执行的耗时尽量不影响下次任务的执行时间。固定延时模式下,任务的下次执行时间会以上次任务的实际执行时间开始计算,也就是说上次任务的执行耗时会影响下次任务的执行时间。
TaskQueue
存储任务的队列被封装成java.util.TaskQueue
类,它是一个基于最小堆实现的优先队列,堆中的元素会按照任务的计划执行时间升序排列,队头永远是最早要执行的任务,这样获取要执行的任务时间复杂度是O(1),但是任务的插入删除,时间复杂度是O(logn)。
因为堆是一棵完全二叉树,数据规模为n的情况下,二叉树的高度是 logn。堆节点的插入和删除,需要不断和父节点或子节点比较并交换,交换的次数最多是树的高度,所以时间复杂度最坏是O(logn)。
class TaskQueue {private TimerTask[] queue = new TimerTask[128];private int size = 0;
}
属性说明:
- queue 任务队列,最小堆结构,用数组存储
- size 任务数
提交任务会调用add()
,一个基本的往堆中添加节点的操作。先判断数组是否要扩容,然后添加到队尾,最后节点上滤,调整堆结构。
void add(TimerTask task) {// 扩容if (size + 1 == queue.length)queue = Arrays.copyOf(queue, 2*queue.length);// 先入到队尾queue[++size] = task;// 上滤操作,调整堆结构fixUp(size);
}
堆节点上滤过程很简单,不断与父节点比较任务的执行时间,如果父节点任务晚于自己执行,就要和父节点交换位置。
/**
* 元素上滤,调整堆结构
* 不断与父节点比较,如果父节点比自己大,就要和父节点交换
*/
private void fixUp(int k) {// 是否有父节点while (k > 1) {// j = 父节点下标int j = k >> 1;// 如果父节点的执行时间大于自己,就要交换,直到自己排到正确的位置if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)break;TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;k = j;}
}
如果是一次性任务,执行前要把它从堆中移除,调用removeMin()
。先把队尾节点赋值给队头节点,然后将队头节点下滤操作。
void removeMin() {queue[1] = queue[size];queue[size--] = null;fixDown(1);
}
堆节点下滤也很简单,就是不断和自己的左右子节点比较,如果子节点比自己小,就要交换,直到自己被交换到正确的位置。
private void fixDown(int k) {int j;// 是否有子节点 j=左子节点,j+1=右子节点while ((j = k << 1) <= size && j > 0) {if (j < size &&queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)j++; // j indexes smallest kidif (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)break;TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;k = j;}
}
尾巴
延时任务调度执行器的核心有两点,分别是如何存储任务以及如何调度执行任务。Java Timer 基于最小堆的数据结构来存储延时任务,根节点永远是最早执行的任务,获取任务的时间复杂度是O(1),但是任务的插入和删除需要O(logn)复杂度,这在需要维护大量延时任务时,会有性能问题,可以考虑采用时间轮算法。
另外,每个 Java Timer 对象都会开启一个线程来调度执行提交的所有任务,因为是单线程执行,所以一旦有耗时的任务,队列中的其它任务都会受到影响,这点尤其要注意。