本章我们会对定时算法做个简单介绍,包括常用的定时算法(最小堆、时间轮)的概述、实现方式、典型场景做个说明。
概述
系统或者项目中难免会遇到各种需要自动去执行的任务,实现这些任务的手段也多种多样,如操作系统的crontab,spring框架的quartz,java的Timer和ScheduledThreadPool都是定时任务中的典型手段。
最小堆
概念
Timer是java中最典型的基于优先级队列+最小堆实现的定时器,内部维护一个存放定时任务的优先级队列,该优先级队列使用了最小堆排序。当我们调用schedule方法的时候,一个新的任务被加入queue,堆重排,始终保持堆顶是执行时间最小(即最近马上要执行)的。同时,内部相当于起了一个线程不断扫描队列,从队列中依次获取堆顶元素执行,任务得到调度。
下面以Timer为例,介绍优先级队列+最小堆算法的实现原理:
案例
package com.ls.cloud.sys.alg.Timer;import java.util.Timer;
import java.util.TimerTask;class Task extends TimerTask {@Overridepublic void run() {System.out.println("running...");}
}
public class TimerDemo {public static void main(String[] args) {Timer t=new Timer();//在1秒后执行,以后每2秒跑一次t.schedule(new Task(), 1000,2000);}
}
源码分析
新加任务时,t.schedule方法会add到队列
void add(TimerTask task) {// Grow backing store if necessaryif (size + 1 == queue.length)queue = Arrays.copyOf(queue, 2*queue.length);queue[++size] = task;fixUp(size);}
add实现了容量维护,不足时扩容,同时将新任务追加到队列队尾,触发堆排序,始终保持堆顶元素最小
private void fixUp(int k) {while (k > 1) {//k指针指向当前新加入的节点,也就是队列的末尾节点,j为其父节点int j = k >> 1;//如果新加入的执行时间比父节点晚,那不需要动if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)break;//如果大于其父节点,父子交换TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;//交换后,当前指针继续指向新加入的节点,继续循环,知道堆重排合格k = j;}}
线程调度中的run,主要调用内部mainLoop()方法,使用while循环
private void mainLoop() {while (true) {try {TimerTask task;boolean taskFired;synchronized(queue) {// Wait for queue to become non-emptywhile (queue.isEmpty() && newTasksMayBeScheduled)queue.wait();if (queue.isEmpty())break; // Queue is empty and will forever remain; die// Queue nonempty; look at first evt and do the right thinglong currentTime, executionTime;task = queue.getMin();synchronized(task.lock) {if (task.state == TimerTask.CANCELLED) {queue.removeMin();continue; // No action required, poll queue again}currentTime = System.currentTimeMillis();executionTime = task.nextExecutionTime;//判断是否到了执行时间if (taskFired = (executionTime<=currentTime)) {//判断下一次执行时间,单次的执行完移除 //循环的修改下次执行时间if (task.period == 0) { // Non-repeating, removequeue.removeMin();task.state = TimerTask.EXECUTED;} else { // Repeating task, reschedule//下次时间的计算有两种策略 //1.period是负数,那下一次的执行时间就是当前时间‐period //2.period是正数,那下一次就是该任务本次的执行时间+period //注意!这两种策略大不相同。因为Timer是单线程的 //如果是1,那么currentTime是当前时间,就受任务执行长短影响 //如果是2,那么executionTime是绝对时间戳,与任务长短无关queue.rescheduleMin(task.period<0 ? currentTime - task.period: executionTime + task.period);}}}//不到执行时间,等待if (!taskFired) // Task hasn't yet fired; waitqueue.wait(executionTime - currentTime);}//到达执行时间,run!if (taskFired) // Task fired; run it, holding no lockstask.run();} catch(InterruptedException e) {}}}
}
应用
- Timer是单线程,一旦一个失败或出现异常,将打断全部任务队列,线程池不会
- Timer在jdk1.3+,而线程池需要jdk1.5+
时间轮
概念
时间轮是一种更为常见的定时调度算法,各种操作系统的定时任务调度,linux crontab,基于java的通信框架Netty等。其灵感来源于我们生活中的时钟。 轮盘实际上是一个头尾相接的环状数组,数组的个数即是插槽数,每个插槽中可以放置任务。 以1天为例,将任务的执行时间%12,根据得到的数值,放置在时间轮上,小时指针沿着轮盘扫描,扫到的点取出任务执行:
实现
package com.ls.cloud.sys.alg.Timer;public class RoundTask {//延迟多少秒后执行int delay;//加入的序列号,只是标记一下加入的顺序int index;public RoundTask(int index, int delay) {this.index = index;this.delay = delay;}void run() {System.out.println("task " + index + " start , delay = "+delay);}@Overridepublic String toString() {return String.valueOf(delay);}
}
package com.ls.cloud.sys.alg.Timer;import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;public class RoundDemo {//小轮槽数int size1=10;//大轮槽数int size2=5;//小轮,数组,每个元素是一个链表LinkedList<RoundTask>[] t1 = new LinkedList[size1];//大轮LinkedList<RoundTask>[] t2 = new LinkedList[size2];//小轮计数器,指针跳动的格数,每秒加1final AtomicInteger flag1=new AtomicInteger(0);//大轮计数器,指针跳动个格数,即每10s加1final AtomicInteger flag2=new AtomicInteger(0);//调度器,拖动指针跳动ScheduledExecutorService service = Executors.newScheduledThreadPool(2);public RoundDemo(){//初始化时间轮for (int i = 0; i < size1; i++) {t1[i]=new LinkedList<>();}for (int i = 0; i < size2; i++) {t2[i]=new LinkedList<>();}}//打印时间轮的结构,数组+链表void print(){System.out.println("t1:");for (int i = 0; i < t1.length; i++) {System.out.println(t1[i]);}System.out.println("t2:");for (int i = 0; i < t2.length; i++) {System.out.println(t2[i]);}}//添加任务到时间轮void add(RoundTask task){int delay = task.delay;if (delay < size1){//10以内的,在小轮,槽取余数t1[delay%size1].addLast(task);}else {//超过小轮的放入大轮,槽除以小轮的长度t2[delay/size1].addLast(task);}}void startT1(){//每秒执行一次,推动时间轮旋转,取到任务立马执行service.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {int point = flag1.getAndIncrement()%size1;System.out.println("t1 -----> slot "+point);LinkedList<RoundTask> list = t1[point];if (!list.isEmpty()){//如果当前槽内有任务,取出来,依次执行,执行完移除while (list.size() != 0){list.getFirst().run();list.removeFirst();}}}},0,1, TimeUnit.SECONDS);}void startT2(){//每10秒执行一次,推动时间轮旋转,取到任务下方到t1service.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {int point = flag2.getAndIncrement()%size2;System.out.println("t2 =====> slot "+point);LinkedList<RoundTask> list = t2[point];if (!list.isEmpty()){//如果当前槽内有任务,取出,放到定义的小轮while (list.size() != 0){RoundTask task = list.getFirst();//放入小轮哪个槽呢?小轮的槽按10取余数t1[task.delay % size1].addLast(task);//从大轮中移除list.removeFirst();}}}},0,10, TimeUnit.SECONDS);}public static void main(String[] args) {RoundDemo roundDemo = new RoundDemo();//生成100个任务,每个任务的延迟时间随机for (int i = 0; i < 100; i++) {roundDemo.add(new RoundTask(i,new Random().nextInt(50)));}//打印,查看时间轮任务布局roundDemo.print();//启动大轮roundDemo.startT2();//小轮启动roundDemo.startT1();}}
结果
t1 -----> slot 1
task 8 start , delay = 11
task 45 start , delay = 11
task 87 start , delay = 11
t1 -----> slot 2
task 40 start , delay = 12
task 89 start , delay = 12
t1 -----> slot 3
task 25 start , delay = 13
t1 -----> slot 4
task 69 start , delay = 14
t1 -----> slot 5
- 输出结果严格按delay顺序执行,而不管index是何时被提交的
- t1为小轮,10个槽,每个1s,10s一轮回
- t2为大轮,5个槽,每个10s,50s一轮回
- t1循环到每个槽时,打印槽内的任务数据,如 t1–>slot9 , 打印了3个9s执行的数据
- t2循环到每个槽时,将槽内的任务delay时间取余10后,放入对应的t1槽中,如 t2==>slot1
- 那么t1旋转对应的圈数后,可以取到t2下放过来的任务并执行,如10,11…