前言
在日常开发过程中,我们经常会遇到周期性执行某段代码的场景。比如定期同步订单,定期更新商品信息,定期发送消息等。这些重复执行的代码可以抽象为一个任务(Task)。 一个Task的特点如下:
- 包含需要执行的业务逻辑。
- 能够在指定的时间重复(一次或者多次)执行。
围绕Task的特点,开发者们开发了不同的调度框架或者中间件,满足日常开发中的使用。
以下表格列出了部分实现。
技术 | 来源 | 使用场景 | 依赖第三方 |
---|---|---|---|
Timer | JDK自带 | 目前比较少使用 | |
ScheduledExecutorService | JDK自带 | 基于线程池技术,常用于中间件中 | |
Spring Task | Spring-context | Spring 项目,常用于单体应用开发 | |
XXL-JOB | 国产开源中间件 | 可用于分布式项目调度 | 依赖mysql |
Quartz | OpenSymphony 开源组织 | 一些中间件常常基于Quartz开发 | 分布式需要依赖数据库 |
Elastic-Job | 当当⽹开源 | 可用于分布式项目调度 | 需要依赖ZooKeeper + Mesos |
Apache DolphinScheduler | 易观开源 | 大数据任务调度 |
本文对部分技术的实现进行了介绍,从简单到复杂,从具体到抽象,希望可以让大家在面对一个任务调度框架时可以快速抓住要点,不再陌生。
1. Timer
java.util.Timer位于JDK的rt.jar包下,始于jdk1.3,是JDK自带的任务调度器,虽然目前基本不再使用Timer来进行任务调度,但是Timer设计简单,理解起来比较容易。而且后续ScheduledExecutorService的基本原理和Timer基本类似,因此需要对Timer进行一个详细的了解。
Timer的核心类比较少,只需要以下4个类即可。
类 | 功能 | 说明 |
---|---|---|
Timer | 入口类,整个调度器的组织者,相当于其他框架的. | 定义了多个提交task的方法 |
TimerThread | 任务调度器 | 后台线程执行一个轮询方法,核心方法为mainLoop()。 |
TimerTask | 抽象类,其实现类包装业务逻辑 | 核心属性:nextExecutionTime下一个执行时间点。 |
TaskQueue | 任务队列 | 优先队列,头部节点为最早执行的Task |
以上类都处于java.util包下。
1.1 存储任务的数据结构-平衡二叉堆
一个任务框架,需要可以容纳在不同时间执行的任务,因此必须要有一个容器来缓存或者持久化提交的任务。 那么在多任务的场景下,我们如何挑选出需要执行的任务呢?以下对一些场景进行分析:
方案1. 对所有的任务进行遍历,对于下次执行时间小于当前时间的任务,执行业务逻辑。
时间复杂度为O(n),一些时间没到的任务也被遍历到了。性能不好。
方案2. 先对所有的任务,按照下次执行时间的大小进行排序,每次只取头部任务。
对2进行分析可以发现,只要保证队列头部为最早执行的元素即可,对于其他任务,因为还不需要执行,是否有序并不重要。
方案3. 采用优先队列,头部为权值最小,每次取权值即可。
从以上分析可以看出,一个任务框架,可以采用优先队列来容纳提交的任务。Timer正是如此,它的基本数据结构为平衡二叉堆(balanced binary heap)。想要理解Timer,需要对平衡二叉堆进行了解。
详细可以参考 【Java基础】JAVA中优先队列详解 。 摘抄如下:
1.1.1 基本结构
Java平衡二叉堆的定义为:
任意一个非叶子节点的权值,都不大于其左右子节点的权值
可以使用下面的在线数据模拟器进行
平衡二叉堆数据结构模拟器
结构示例如下:
从图中可以看出,可以通过数组来实现平衡二叉堆。每一个节点的编号,可以使用数组的下标来表示。
- 数组的第一个元素为二叉树的根节点,在所有节点中权值最小。
- 父子节点之间的关系可以用以下算法表示。这个算法很重要。在新增元素或者删除元素的时候,都需要使用到该算法。
leftNo = parentNo * 2+1
rightNo = parentNo * 2+2
parentNo = (nodeNo-1)/2
在优先队列中,一般只使用到新增元素和删除根节点元素,因此只对这两个算法进行介绍。
1.1.2 新增元素
步骤如下:
1.先在队尾新增一个元素。如果数组长度不够就先扩容。
2.如果有父节点,则与父节点进行对比。如果权值比父节点小,则与父节点交换位置。
3.重复步骤2,直到没有父节点或者比父节点小则完成新增。2~3步一般称作siftUp。
//siftUp()
private void siftUp(int k, E x) {while (k > 0) {int parent = (k - 1) >>> 1;//parentNo = (nodeNo-1)/2Object e = queue[parent];if (comparator.compare(x, (E) e) >= 0)//调用比较器的比较方法break;queue[k] = e;k = parent;}queue[k] = x;
}
通过以上步骤。可以保证所有的父节点权值都小于子节点的权值。
1.1.3 删除队首元素
步骤如下:
1.删除数组的第一个元素。
2.将队尾的元素放置到头部位置,记为一个父节点。
3.通过比较获取子节点中较小的一个,并与父节点比较,如果父节点大于子节点,则交换位子。
4.重复步骤3,直到父节点小于等于子节点或者已经没有子节点,则结束比较。这个过程一般称作siftDown。
//siftDown()
private void siftDown(int k, E x) {int half = size >>> 1;while (k < half) {//首先找到左右孩子中较小的那个,记录到c里,并用child记录其下标int child = (k << 1) + 1;//leftNo = parentNo*2+1Object c = queue[child];int right = child + 1;if (right < size &&comparator.compare((E) c, (E) queue[right]) > 0)c = queue[child = right];if (comparator.compare(x, (E) c) <= 0)break;queue[k] = c;//然后用c取代原来的值k = child;}queue[k] = x;
}
1.2 Timer 核心执行逻辑
查看Timer类的结构,可以看到提交任务的方法有6个
代表了6种不同的场景
delay: 延迟毫秒数period: 时间间隔
1.延迟delay毫秒后,执行任务一次。
2.延迟delay毫秒后,周期性执行任务,两次任务之间间隔period毫秒。
3.延迟delay毫秒后,以固定频率执行任务,两次任务之间间隔period毫秒。4.指定的时间Date开始,执行任务一次。
5.指定的时间Date开始,周期性执行任务,两次任务之间间隔period毫秒。
6.指定的时间Date开始,以固定频率执行任务,两次任务之间间隔period毫秒。
其中需要特别说明的是2和3,5和6之间的区别,也就是schedule和scheduleAtFixedRate的区别。具体看下表:
方法名 | 下一个执行时间nextExecutionTime | 说明 |
---|---|---|
schedule | currentTime + delay | 当前序堵塞时,会影响到后续任务的下次计划时间, 下次任务会推迟执行,对于丢失的时间不会补上任务 |
scheduleAtFixedRate | nextExecutionTime + delay | 当堵塞时,影响到后续任务的计划时间, 任务的次数不会丢失,快速补上调度次数 |
注: currentTime:当前时间,nextExecutionTime:下次执行时间,delay:时间间隔
1.2.1 Timer简单的例子:
Demo先行,先看一个简单的例子,有个初步的印象。
public class Application {public static void main(String[] args) {//初始化一个timer对象Timer timer = new Timer();//创建抽象类TimerTask的实例。TimerTask myTask = new TimerTask() {@Overridepublic void run() {System.out.println("执行run方法,time="+System.currentTimeMillis()/1000%60+"秒");}};//提交任务,延迟1秒执行,每两秒执行一次timer.schedule(myTask, 1000, 1000*2);}
}
结果:
执行run方法,time=25秒
执行run方法,time=27秒
执行run方法,time=29秒
执行run方法,time=31秒
执行run方法,time=33秒
执行run方法,time=35秒
执行run方法,time=37秒
原始代码分析如下,只挑选了核心代码展示。
1.2.2 Timer 类源码分析
Timer 是整个任务架构的组织者,也是入口,因此首先看Timer的代码。
public class Timer {// TaskQueue 实现了一个优先队列private final TaskQueue queue = new TaskQueue();// TimerThread继承了Thread。同时组合了TaskQueue。当Timer实例化时。会启动TimerThread实例的的start()方法。启动线程处理定时任务。private final TimerThread thread = new TimerThread(queue);//构造函数。做了一件事情,及时启动了TimerThread线程,处理队列数据。public Timer(String name, boolean isDaemon) {thread.setName(name);thread.setDaemon(isDaemon);thread.start();}//延迟delay毫秒后执行一次任务public void schedule(TimerTask task, long delay) {sched(task, System.currentTimeMillis()+delay, 0);}//延迟delay毫秒,以固定频率执行定时任务。下次的执行时间为当前系统时间(System.currentTimeMillis())+|period|//当发生阻塞时,有可能丢失调度次数public void schedule(TimerTask task, long delay, long period) {sched(task, System.currentTimeMillis()+delay, -period);}//延迟delay毫秒,以固定频率执行定时任务。与schedule不同。下次执行时间为当前本应执行时间(nextExecutionTime)+period//当发生阻塞时,不会丢失调度次数。public void scheduleAtFixedRate(TimerTask task, long delay, long period) {sched(task, System.currentTimeMillis()+delay, period);}/*** 核心新增定时任务的方法。* @param task 为实现了业务的任务类。* @param time 为下次执行任务的时间。 * @param period 为0时表示不会重复执行。当period !=0时表示会周期性执行*/private void sched(TimerTask task, long time, long period) {//以下代码省去了部分校验代码synchronized(queue) {synchronized(task.lock) {task.nextExecutionTime = time;task.period = period;task.state = TimerTask.SCHEDULED;}queue.add(task);if (queue.getMin() == task)//说明加入task之前。队列为空,处于wait状态,需要唤醒。或者还有一种情况,就是加入的task处于头部,需要立即处理。有可能此时线程处于等待状态,需要唤醒。queue.notify();}}//省略。。。}
1.2.3 TaskQueue 类源码分析
TaskQueue 本质是一个平衡二叉堆,1.1已经有所介绍。
class TaskQueue {/*** Priority queue represented as a balanced binary heap: the two children* of queue[n] are queue[2*n] and queue[2*n+1]. The priority queue is* ordered on the nextExecutionTime field: The TimerTask with the lowest* nextExecutionTime is in queue[1] (assuming the queue is nonempty). For* each node n in the heap, and each descendant of n, d,* n.nextExecutionTime <= d.nextExecutionTime.* 注释已经讲明,就是一个平衡二叉堆。*/private TimerTask[] queue = new TimerTask[128];void add(TimerTask task) {// Grow backing store if necessaryif (size + 1 == queue.length)queue = Arrays.copyOf(queue, 2*queue.length);//加到队尾queue[++size] = task;//向上排序fixUp(size);}//新增元素时向上排序private void fixUp(int k) {while (k > 1) {int j = k >> 1;if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)break;TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;k = j;}}//刷新void rescheduleMin(long newTime) {queue[1].nextExecutionTime = newTime;fixDown(1);}//删除元素时向下排序private void fixDown(int k) {int j;while ((j = k << 1) <= size && j > 0) {if (j < size &&queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)j++; // j indexes smallest kidif (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)break;TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;k = j;}}//省略其他
}
1.2.3 TimerThread 类源码分析
从Timer代码可以看到,当实例化Timer时,将会启动一个TimerThread线程,具体作用是不断轮询队列的头部元素,然后执行业务代码。核心代码如下(省略部分代码):
class TimerThread extends Thread {//标识线程已经启用。当为false时,跳出循环。boolean newTasksMayBeScheduled = true;private TaskQueue queue;public void run() {try {mainLoop();} finally {//忽略。。。}}//执行轮询private void mainLoop() {while (true) {try {TimerTask task;boolean taskFired;synchronized(queue) {// Wait for queue to become non-emptywhile (queue.isEmpty() && newTasksMayBeScheduled) {//while循环 让出锁,等待新任务加入queue.wait();}if (queue.isEmpty()){//此时newTasksMayBeScheduled队列已死。退出循环break; }long currentTime, executionTime;task = queue.getMin();synchronized(task.lock) {//判断状态if (task.state == TimerTask.CANCELLED) {//检查已经取消的任务,移除。queue.removeMin();continue; }currentTime = System.currentTimeMillis();//当前task计划的执行时间。executionTime = task.nextExecutionTime;if (taskFired = (executionTime<=currentTime)) {//当前任务的计划执行时间<=当前时间,则允许执行。if (task.period == 0) { // Non-repeating, remove//只执行一次的任务,移除。queue.removeMin();task.state = TimerTask.EXECUTED;} else { //修改队列中最小的一个task的时间为下一个执行时间,并且重新排序。queue.rescheduleMin(task.period<0 ? currentTime - task.period: executionTime + task.period);}}}if (!taskFired) // Task hasn't yet fired; wait//最近时间执行的任务还未到时间,需要等待。让出锁。queue.wait(executionTime - currentTime);}if (taskFired) //执行业务逻辑。这里可以看到是同步执行的。如果业务逻辑耗时较长,会影响后续任务的执行。task.run();} catch(InterruptedException e) {}}}
}
TimerThread 中的 mainLoop 方法+TaskQueue队列,看起来非常熟悉,在queue为空的时候,会调用queue.wait()方法。直到Timer在新增元素时,调用了queue.notify()。这些代码和BlockQueue原理非常像。
1.2.4 TimerTask 类源码分析
public abstract class TimerTask implements Runnable {//执行状态int state = VIRGIN;//下次执行时间,如果是重复的任务,在任务执行前会被更新成下次的执行时间。long nextExecutionTime;// 毫秒数,用于重复执行的时间间隔。证书标识以固定频率调度。负数标识以固定的时间延迟调度。0代表不会重复执行。long period = 0;//抽象犯法,用于实现业务public abstract void run();//省略部分代码。
}
1.3 Timer 调度示意图
通过对Timer的四个核心类,我们可以得出以下调度示意图。
1.4 Timer 总结
可以看到,Timer 是JDK自带的任务调度器。实现的逻辑如下
- 实现一个优先队列。队列的头部为最先需要执行的任务。
- 启动一个后台线程,不断从优先队列中获取待执行的任务。
- 执行任务。
通过使用Timer,我们可以方便地在一个线程中执行多个计划任务。但是也有一定的局限性,主要是多个任务之间相互影响:
- 所有的任务都在一个线程中执行,如果前面的任务耗时比较长,则会影响后续任务的执行。
- 假设前序任务抛出了非InterruptedException的异常,则整个队列将会被清空,任务调度终止。
基于以上局限性,在实际应用中,使用Timer使用得并不多。常用的为 ScheduledExecutorService。ScheduledExecutorService与Timer 的最大区别是将任务提交给线程池处理。
2. ScheduledExecutorService
在前一章节可以了解到,在 Timer 类中所有的任务都是同步执行,如果前序任务发生了阻塞或者耗时比较长,那么后续任务就容易被阻塞到。
JDK在1.5之后J引入了 ThreadPoolExecutor 线程池技术。 线程池技术的逻辑机构图如下:
(参考聊聊Java进阶之并发基础技术—线程池剖析)
从上图可以看到,线程池也是将不同的任务加入到一个队列中(BlockingQueue),等待着多个线程的调用。与Timer的调度很相似,只是最大区别是线程池队列是被多个线程调用的。
因此JDK在1.5引入了ThreadPoolExecutor的同时,也重新编写了一套新任务调度器-ScheduledExecutorService,具体实现类为ScheduledThreadPoolExecutor,用于任务的调度。
其继承关系如下:
从继承关系中可以看出,ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,拥有线程池的所有功能。
ScheduledThreadPoolExecutor 在实现上与Timer是相似的,都是通过实现一个优先队列来管理任务,同时这个优先队列又是一个阻塞队列,在获取第一个任务后,只有到了执行时间才会返回任务。一个比较大的改进在于,获取任务后不是直接执行代码,而是交给线程池来调度。
2.1核心类
ScheduledExecutorService 的一些核心类如下:
类 | 功能 | 说明 |
---|---|---|
ScheduledExecutorService | 抽象类 | |
Executors.DelegatedScheduledExecutorService | 包装类 | 用于包装 ScheduledThreadPoolExecutor, 只暴露关键方法 |
ScheduledThreadPoolExecutor | 核心执行器 | 实现类,真正执行调度逻辑的地方 |
ScheduledThreadPoolExecutor.DelayedWorkQueue | 延迟阻塞队列 | 任务周期执行的核心方法在这个类中实现 |
ScheduledThreadPoolExecutor.ScheduledFutureTask | 队列中的对象 | 是ScheduledThreadPoolExecutor的成员内部类 |
以上类处于java.util.concurrent包下
2.2 ScheduledExecutorService 的简单用法和介绍
public class Application {public static void main(String[] args) {ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();Runnable runnable = ()->{System.out.println("执行run方法,time="+System.currentTimeMillis()/1000%60+"秒");};scheduledExecutorService.scheduleAtFixedRate(runnable, 1, 2, TimeUnit.SECONDS);}
}
查看 ScheduledExecutorService 的结构,
提交任务的方法共有4个,与Timer不同的是,可以提交Callable类型的任务。
public interface ScheduledExecutorService extends ExecutorService {//delay时间后,执行一次任务public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);//delay时间后,执行一次任务有返回值的任务public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);//以固定频率执行任务public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);//以固定延迟时间执行任务public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
2.3 ScheduledThreadPoolExecutor 类
ScheduledExecutorService只定义了相应的规范,还需要具体类进行实现。
通过查看 Executors.newSingleThreadScheduledExecutor(),具体实现如下
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));}
DelegatedScheduledExecutorService 只是一个包装类,核心逻辑在 ScheduledThreadPoolExecutor。
其构造函数调用了父类的构造函数的时候,传入了 DelayedWorkQueue 延时阻塞队列。
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService {// 构造函数1public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());}// 构造函数2public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);}// 构造函数3public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);}// 构造函数4public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);}//ScheduledThreadPoolExecutor 提供了4个构造函数,每个构造函数都调用了父类ThreadPoolExecutor的构造函数,这四个经典参数中,DelayedWorkQueue是不变的,说明它是实现任务队列的关键。//向队列提交任务。// 查看源码,所有提交任务的方法,经过包装后最终会调用delayedExecute,像队列中新增任务。并调用父类的ensurePrestart()方法确认线程池已经准备就绪。private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {super.getQueue().add(task);if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);elseensurePrestart();}}
}
到这时ScheduledExecutorService的秘密浮出水面,核心在于 DelayedWorkQueue。
2.4 DelayedWorkQueue 类实现
DelayedWorkQueue 类的定义如下。可以看到,DelayedWorkQueue的实现了BlockingQueue接口,可以传入JDK的线程池进行消费。
static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {
}
但是具体何如实现队列中的任务,在指定的时间被调度呢?
首先来看一下他的队列实现方式。
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {/** A DelayedWorkQueue is based on a heap-based data structure* like those in DelayQueue and PriorityQueue, except that* every ScheduledFutureTask also records its index into the* heap array. * ...* * 从以上注释中可以知道 DelayedWorkQueue 本质上也是一个基于堆的数据结构。*///初始化了一个数组private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];//用于新增元素时向上移动元素private void siftUp(int k, RunnableScheduledFuture<?> key) {//省略代码}//去除元素时向下移动元素private void siftDown(int k, RunnableScheduledFuture<?> key) {//省略代码}private final ReentrantLock lock = new ReentrantLock();private final Condition available = lock.newCondition();//查看阻塞队列的take方法public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {//1、从队首获取待执行的任务。此任务在队列中最早执行。RunnableScheduledFuture<?> first = queue[0];if (first == null)// 2 队列中还没有元素,等待。available.await();else {//3 delay=下次执行时间-当前时间。当<=0说明需要被执行。long delay = first.getDelay(NANOSECONDS);if (delay <= 0) {// 4 删除队列的第一个个元素并重新向下排序。返回任务return finishPoll(first);}first = null; // don't retain ref while waiting 等待时去除引用if (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {//5 当前线程等待,直到被唤醒或者等待时长结束available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}}/*** 查看finishPoll方法。* * 使用最后一个元素替换掉当前元素,并且重新向下排序。* 注意,这时第一个元素已经从队列中去除,这一点与Timer的实现方式不同。* Timer是修改时间了之后,从上往下重新排序。只需要排序一次。* ScheduledExecutorService执行一个定时任务,需要进行两次排序。第一次是获取了task,第二次是真正执行task的时候。* * @param f* @return*/private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {int s = --size;RunnableScheduledFuture<?> x = queue[s];queue[s] = null;if (s != 0)//把队列头部换成x,向下排序siftDown(0, x);setIndex(f, -1);return f;}
}
具体流程如下:
2.5 ScheduledFutureTask 类
从上一节DelayedWorkQueue类中的take方法和finishPoll方法可知,在线程池获取task后,已经从队列中移走,那么对于重复执行的队列怎么办呢?那就是在线程池执行run方法前,重新将task加到队列中。
// 注意 ScheduledFutureTask 属于 ScheduledThreadPoolExecutor 的成员内部类,因此可以使用ScheduledThreadPoolExecutor方法和成员变量。
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService {private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {/** 下次执行时间 */private long time;/** 周期性执行任务的时间间隔 分为0、正、负值。*/private final long period;ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}/**Overrides FutureTask version so as to reset/requeue if periodic. * 重写了 FutureTask 的run方法,主要是增加了重置下个调度时间以及重新排序*/public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))//判断当前状态能不能运行cancel(false);else if (!periodic)//非周期性的任务,只执行一次ScheduledFutureTask.super.run();else if (ScheduledFutureTask.super.runAndReset()) {//设置下次执行时间setNextRunTime();//直接调用外部类的方法。重新将任务加入到队列中。reExecutePeriodic(outerTask);}}}//重新将任务加入到队列中void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(true)) {//往队列加任务super.getQueue().add(task);if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false);elseensurePrestart();}}
}
2.6 总结
通过分析源码可以看出,ScheduledExecutorService 是通过实现一个优先队列来存储和调度任务的。从原理上来说是和Timer是类似的。可以认为是Timer 的升级版,新增了线程池执行任务的功能。
ScheduledExecutorService 和 Timer 比较
框架 | 相同 | 不同 |
---|---|---|
Timer | 都使用堆作为数据结构 | 1、同步执行任务 2、从队列头部获取任务之后,直接修改下次执行时间,直接排序 |
ScheduledExecutorService | 1、线程池异步执行任务 2、从头部获取任务后,移除当前任务 ,排序一次。在执行任务前修改时间后,再提交到队列。相当于排序两次 |
但是ScheduledExecutorService也有一定的局限性,那就是任务只能执行一次或者以固定的时间差周期性执行。不够灵活。
3 Spring Task
Spring Task处于spring-context项目的org.springframework.scheduling包下。可以通过注解的方式,将Spring bean中的某个方法变成一个task,非常方便。而且引入了cron表达式,使用更加灵活。
3.1 Spring Task 简单用法
新建一个maven项目,引入spring-context包
<dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.3.15</version>
</dependency>
新建一个启动类
@Configuration
@EnableScheduling
public class Application {public static void main(String[] args) {AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(Application.class);}@Scheduled(fixedRate = 1*1000)public void schedled(){System.out.println("执行定时任务,time="+System.currentTimeMillis()/1000%60+"秒");}
}
启动main方法后,执行结果如下:
执行定时任务,time=18秒
执行定时任务,time=19秒
执行定时任务,time=20秒
执行定时任务,time=21秒
从以上demo中可以看出,在spring中,只需要两个注解@EnableScheduling和@Scheduled,就可以启动一个定时任务了,非常方便。
3.2 Spring Task核心类
初始化基本流程
需要注意的是,Spring Task本质上只定义了提交task的抽象类,并没有提供具体的调度的实现。
为了实现开箱可用,Spring task将ScheduledExecutorService适配成了默认的实现,但是可以根据具体的需要,替换成其他实现。
3.3 Trigger接口及实现类
Trigger排在最前面介绍,是因为Timer和ScheduledExcecutorService是没有Trigger的概念的,都是再在各自的Task类中拥有一个下次执行时间的属性。Timer的TimerTask是private long nextExecutionTime。ScheduledFutureTask的是private long time。为了让任务的触发时间更加灵活,引入了Trigger的概念。
Trigger的本意为触发器,枪的扳机。因此相对的,当一个任务被触发时,常常用fire这个词。
Trigger的本质是封装了获取下一次执行时间的逻辑。
public interface Trigger {// 获取下一次执行时间@NullableDate nextExecutionTime(TriggerContext triggerContext);
}//上下文,用于存储一些时间变量
public interface TriggerContext {default Clock getClock() {return Clock.systemDefaultZone();}//上一次计划执行时间@NullableDate lastScheduledExecutionTime();// 上一次具体执行时间@NullableDate lastActualExecutionTime();//完成时间@NullableDate lastCompletionTime();}
//TriggerContext实现类很简单
public class SimpleTriggerContext implements TriggerContext {private final Clock clock;@Nullableprivate volatile Date lastScheduledExecutionTime;@Nullableprivate volatile Date lastActualExecutionTime;@Nullableprivate volatile Date lastCompletionTime;
}
spring task中提供了两个Trigger实现
周期性触发器-目前没有看到使用的地方
public class PeriodicTrigger implements Trigger {private final long period;private final TimeUnit timeUnit;private volatile long initialDelay;private volatile boolean fixedRate;@Overridepublic Date nextExecutionTime(TriggerContext triggerContext) {Date lastExecution = triggerContext.lastScheduledExecutionTime();Date lastCompletion = triggerContext.lastCompletionTime();if (lastExecution == null || lastCompletion == null) {return new Date(triggerContext.getClock().millis() + this.initialDelay);}if (this.fixedRate) {return new Date(lastExecution.getTime() + this.period);}return new Date(lastCompletion.getTime() + this.period);}}
CronTrigger-支持cron表达式
public class CronTrigger implements Trigger {private final CronExpression expression;private final ZoneId zoneId;//提供了多个构造函数 完成 expression 和 zoneId的复制public CronTrigger(String expression) {this(expression, ZoneId.systemDefault());}public CronTrigger(String expression, TimeZone timeZone) {this(expression, timeZone.toZoneId());}public CronTrigger(String expression, ZoneId zoneId) {Assert.hasLength(expression, "Expression must not be empty");Assert.notNull(zoneId, "ZoneId must not be null");this.expression = CronExpression.parse(expression);this.zoneId = zoneId;}@Overridepublic Date nextExecutionTime(TriggerContext triggerContext) {Date date = triggerContext.lastCompletionTime();if (date != null) {Date scheduled = triggerContext.lastScheduledExecutionTime();if (scheduled != null && date.before(scheduled)) {// Previous task apparently executed too early...// Let's simply use the last calculated execution time then,// in order to prevent accidental re-fires in the same second.date = scheduled;}}else {date = new Date(triggerContext.getClock().millis());}ZonedDateTime dateTime = ZonedDateTime.ofInstant(date.toInstant(), this.zoneId);ZonedDateTime next = this.expression.next(dateTime);return (next != null ? Date.from(next.toInstant()) : null);}
}
目前,在Spring task中,只有cron类型的任务才用到Trigger,其他的固定固定周期执行和固定频率执行,还是用传统的模式。
3.4 Task父类以及子类
Task是所有Spring task的所有父类,比较简单,就是定义了runnable变量,用来存储业务逻辑,而其他周期性执行,延迟执行等参数留给子类实现
public class Task {private final Runnable runnable;
}
其子类如下:
从图上可知,Task的子类分为两种:
- Trigger触发的Task。代表为CronTask
- 固定间歇执行的Task。代表为FixedDelayTask和FixedRateTask。其中FixedDelayTask和FixedRateTask本身没有任何实现,只是为了区分不同的调度方式(还记得Timer中的正负period吗),具体功能都是由父类IntervalTask承担。
public class IntervalTask extends Task {private final long interval;private final long initialDelay;public IntervalTask(Runnable runnable, long interval, long initialDelay) {super(runnable);this.interval = interval;this.initialDelay = initialDelay;}
}
public class FixedRateTask extends IntervalTask {public FixedRateTask(Runnable runnable, long interval, long initialDelay) {super(runnable, interval, initialDelay);}
}
public class FixedDelayTask extends IntervalTask {public FixedDelayTask(Runnable runnable, long interval, long initialDelay) {super(runnable, interval, initialDelay);}
}
public class TriggerTask extends Task {private final Trigger trigger;public TriggerTask(Runnable runnable, Trigger trigger) {super(runnable);this.trigger = trigger;}
}public class CronTask extends TriggerTask {private final String expression;public CronTask(Runnable runnable, String expression) {this(runnable, new CronTrigger(expression));}
}
3.3 TaskScheduler接口逻辑
TaskScheduler 是Spring task中的任务调度接口,定义了一系列提交任务的方法,与 ScheduledExecutorService 角色相当。
方法概览如下:
public interface TaskScheduler {default Clock getClock() {return Clock.systemDefaultZone();}@NullableScheduledFuture<?> schedule(Runnable task, Trigger trigger);ScheduledFuture<?> schedule(Runnable task, Date startTime);ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay);ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay);//另外提供了一些default方法default ScheduledFuture<?> schedule(Runnable task, Instant startTime) {return schedule(task, Date.from(startTime));}default ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {return scheduleAtFixedRate(task, Date.from(startTime), period.toMillis());}default ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Duration period) {return scheduleAtFixedRate(task, period.toMillis());}default ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) {return scheduleWithFixedDelay(task, Date.from(startTime), delay.toMillis());}default ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay) {return scheduleWithFixedDelay(task, delay.toMillis());}}
与ScheduledExecutorService的接口很类似
public interface ScheduledExecutorService extends ExecutorService {//delay时间后,执行一次任务public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);//delay时间后,执行一次任务有返回值的任务public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);//以固定频率执行任务public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);//以固定延迟时间执行任务public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
但需要注意的是新增了一个方法。
ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
这个方法比较特殊,也是实现cron表达式的关键,依靠 Trigger。Trigger在上一节已经有介绍。
看下TaskScheduler,spring提供了三个实现类。
实现类 ConcurrentTaskScheduler 注解上讲的很明白,就是一个将java.util.concurrent.ScheduledExecutorService 适配成 TaskScheduler 的适配器。
其构造器如下
ThreadPoolTaskScheduler 则是封装了ScheduledThreadPoolExecutor。
因此很明显,默认情况下,Spring-task的底层就是由ScheduledExecutorService来提供实际调度的。当然也可以自己实现一个TaskScheduler的实现类,但目前看来并没有理由再造一个这样的轮子。置于为什么没有直接使用ScheduledExecutorService,一是提供了一个新的方法提交Trigger。二是方便拓展,可以自己实现一个任务调度器。
public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements TaskScheduler {@Nullableprivate static Class<?> managedScheduledExecutorServiceClass;static {try {// 需要单独引入 javax.enterprise.concurrent-api 包。默认是没有的。ManagedScheduledExecutorServicemanagedScheduledExecutorServiceClass = ClassUtils.forName("javax.enterprise.concurrent.ManagedScheduledExecutorService",ConcurrentTaskScheduler.class.getClassLoader());}catch (ClassNotFoundException ex) {// JSR-236 API not available...managedScheduledExecutorServiceClass = null;}}private ScheduledExecutorService scheduledExecutor;private boolean enterpriseConcurrentScheduler = false;@Nullableprivate ErrorHandler errorHandler;private Clock clock = Clock.systemDefaultZone();public ConcurrentTaskScheduler() {super();this.scheduledExecutor = initScheduledExecutor(null);}public ConcurrentTaskScheduler(ScheduledExecutorService scheduledExecutor) {super(scheduledExecutor);this.scheduledExecutor = initScheduledExecutor(scheduledExecutor);}public ConcurrentTaskScheduler(Executor concurrentExecutor, ScheduledExecutorService scheduledExecutor) {super(concurrentExecutor);this.scheduledExecutor = initScheduledExecutor(scheduledExecutor);}private ScheduledExecutorService initScheduledExecutor(@Nullable ScheduledExecutorService scheduledExecutor) {if (scheduledExecutor != null) {this.scheduledExecutor = scheduledExecutor;// 当前实现类为 ManagedScheduledExecutorService的子类this.enterpriseConcurrentScheduler = (managedScheduledExecutorServiceClass != null &&managedScheduledExecutorServiceClass.isInstance(scheduledExecutor));} else {this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();this.enterpriseConcurrentScheduler = false;}return this.scheduledExecutor;}@Override@Nullablepublic ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {try {if (this.enterpriseConcurrentScheduler) {return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);} else {//默认为走到这里ErrorHandler errorHandler = (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));return new ReschedulingRunnable(task, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule();}} catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}@Overridepublic ScheduledFuture<?> schedule(Runnable task, Date startTime) {long initialDelay = startTime.getTime() - this.clock.millis();try {return this.scheduledExecutor.schedule(decorateTask(task, false), initialDelay, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}@Overridepublic ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) {long initialDelay = startTime.getTime() - this.clock.millis();try {return this.scheduledExecutor.scheduleAtFixedRate(decorateTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}@Overridepublic ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {try {return this.scheduledExecutor.scheduleAtFixedRate(decorateTask(task, true), 0, period, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}@Overridepublic ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {long initialDelay = startTime.getTime() - this.clock.millis();try {return this.scheduledExecutor.scheduleWithFixedDelay(decorateTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}@Overridepublic ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) {try {return this.scheduledExecutor.scheduleWithFixedDelay(decorateTask(task, true), 0, delay, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}private Runnable decorateTask(Runnable task, boolean isRepeatingTask) {Runnable result = TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);if (this.enterpriseConcurrentScheduler) {result = ManagedTaskBuilder.buildManagedTask(result, task.toString());}return result;}/*** Delegate that adapts a Spring Trigger to a JSR-236 Trigger.* Separated into an inner class in order to avoid a hard dependency on the JSR-236 API.*/private class EnterpriseConcurrentTriggerScheduler {public ScheduledFuture<?> schedule(Runnable task, final Trigger trigger) {ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor;return executor.schedule(task, new javax.enterprise.concurrent.Trigger() {@Override@Nullablepublic Date getNextRunTime(@Nullable LastExecution le, Date taskScheduledTime) {return (trigger.nextExecutionTime(le != null ?new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) :new SimpleTriggerContext()));}@Overridepublic boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) {return false;}});}}}
3.4 Scheduled 注解
首先看一下Scheduled的代码,看提供了哪些功能呢。
public @interface Scheduled {//核心参数如下//1、cron表达式String cron() default "";//2、固定延迟long fixedDelay() default -1;//3、固定频率long fixedRate() default -1;//4、固定的延迟long initialDelay() default -1;//时间单位,默认是毫秒TimeUnit timeUnit() default TimeUnit.MILLISECONDS;//一下是String类型的配,方便接收配置化的数据,如${fixedDelay:10}String fixedDelayString() default "";String fixedRateString() default "";String initialDelayString() default "";}
Scheduled 中包含了任务调度的相关配置参数。 相比较ScheduledExecutorService,多了cron表达式。在任务的控制上更加灵活,不再局限于固定重复周期。
3.5 ScheduledAnnotationBeanPostProcessor 类
spring-task需要@EnableScheduling开启注解,查看其定义:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
最重要的是@Import(SchedulingConfiguration.class),将SchedulingConfiguration注入到spring容器中
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {return new ScheduledAnnotationBeanPostProcessor();}}
SchedulingConfiguration只做了一件事情,那就是配置了ScheduledAnnotationBeanPostProcessor。
ScheduledAnnotationBeanPostProcessor实现了Spring的后置处理器,因此在Spring启动后,可以根据相应的配置或者注解,可以筛选出对应的方法,封装成为ScheduledTask,等待被调用。在Spring初始化完成后将会触发任务的调度。
首先来看下 ScheduledMethodRunnable。ScheduledAnnotationBeanPostProcessor一个重要的目标就是将注解的方法封装成为ScheduledMethodRunnable。
public class ScheduledMethodRunnable implements Runnable {private final Object target;private final Method method;public ScheduledMethodRunnable(Object target, Method method) {this.target = target;this.method = method;}@Overridepublic void run() {try {ReflectionUtils.makeAccessible(this.method);this.method.invoke(this.target);} catch (InvocationTargetException ex) {}}
}
//只保留核心代码
public class ScheduledAnnotationBeanPostProcessorimplements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {//用来缓存task和TaskScheduler调度器private final ScheduledTaskRegistrar registrar;public ScheduledAnnotationBeanPostProcessor() {this.registrar = new ScheduledTaskRegistrar();}@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) {// 忽略...// 查找Scheduled注解的MethodMap<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);});annotatedMethods.forEach((method, scheduledAnnotations) ->scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));//忽略。。}//处理Spring beanprotected void processScheduled(Scheduled scheduled, Method method, Object bean) {try {//将Spring bean 封装成为一个Runnable 。在执行Runnable方法时,使用反射技术 method.invoke(this.target),执行原本逻辑即可。Runnable runnable = createRunnable(bean, method);// 处理延迟时间long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());String initialDelayString = scheduled.initialDelayString();if (StringUtils.hasText(initialDelayString)) {Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");if (this.embeddedValueResolver != null) {initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);}if (StringUtils.hasLength(initialDelayString)) {try {initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit());}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");}}}// 1、处理cron表达式String cron = scheduled.cron();if (StringUtils.hasText(cron)) {//处理一下时区的问题String zone = scheduled.zone();if (this.embeddedValueResolver != null) {cron = this.embeddedValueResolver.resolveStringValue(cron);zone = this.embeddedValueResolver.resolveStringValue(zone);}if (StringUtils.hasLength(cron)) {Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");processedSchedule = true;if (!Scheduled.CRON_DISABLED.equals(cron)) {TimeZone timeZone;if (StringUtils.hasText(zone)) {timeZone = StringUtils.parseTimeZoneString(zone);}else {timeZone = TimeZone.getDefault();}tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));}}}// At this point we don't need to differentiate between initial delay set or not anymoreif (initialDelay < 0) {initialDelay = 0;}// 2、处理// Check fixed delaylong fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());if (fixedDelay >= 0) {processedSchedule = true;tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));}// 3、处理字符串形式的Scheduled ,比如配置化${time:1000}String fixedDelayString = scheduled.fixedDelayString();//延迟if (StringUtils.hasText(fixedDelayString)) {if (this.embeddedValueResolver != null) {fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);}if (StringUtils.hasLength(fixedDelayString)) {processedSchedule = true;try {fixedDelay = convertToMillis(fixedDelayString, scheduled.timeUnit());}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");}tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));}}// 3 固定频率的任务long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());if (fixedRate >= 0) {processedSchedule = true;tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));}String fixedRateString = scheduled.fixedRateString();if (StringUtils.hasText(fixedRateString)) {if (this.embeddedValueResolver != null) {fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);}if (StringUtils.hasLength(fixedRateString)) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;try {fixedRate = convertToMillis(fixedRateString, scheduled.timeUnit());}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");}tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));}}//忽略其他情况}catch (Exception e){}}protected Runnable createRunnable(Object target, Method method) {Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());return new ScheduledMethodRunnable(target, invocableMethod);}//监听容器刷新事件public void onApplicationEvent(ContextRefreshedEvent event) {if (event.getApplicationContext() == this.applicationContext) {finishRegistration();}}//这部分主要是从Spring中获取配置的 scheduler 只保留核心代码private void finishRegistration() {if (this.scheduler != null) {this.registrar.setScheduler(this.scheduler);}//检查是否做了配置SchedulingConfigurerif (this.beanFactory instanceof ListableBeanFactory) {Map<String, SchedulingConfigurer> beans =((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());AnnotationAwareOrderComparator.sort(configurers);for (SchedulingConfigurer configurer : configurers) {configurer.configureTasks(this.registrar);}}/*** 获取调度器的核心代码*/if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {try {//1、根据类型获取TaskScheduler实现类this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));}catch (NoUniqueBeanDefinitionException ex) {try {//2、实现类不唯一,尝试使用默认名称taskScheduler查找this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));}catch (NoSuchBeanDefinitionException ex2) {}}catch (NoSuchBeanDefinitionException ex) {try {//3、根据类型获取 ScheduledExecutorService 实现类this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));}catch (NoUniqueBeanDefinitionException ex2) {try {//4、实现类不唯一,尝试使用默认名称taskScheduler查找this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));}catch (NoSuchBeanDefinitionException ex3) {}}catch (NoSuchBeanDefinitionException ex2) {// Giving up -> falling back to default scheduler within the registrar...}}}this.registrar.afterPropertiesSet();}
}
从 ScheduledAnnotationBeanPostProcessor 源码可以看出,经过处理后,所有的任务和执行器都存放于ScheduledTaskRegistrar中。通过调用afterPropertiesSet()来启动任务。总结来说,做了以下3件事情
1、将被注解的方法封装成为 Task。
2、从容器中查找合适的 TaskScheduler。
3、将1和2都存到ScheduledTaskRegistrar。
其中查找 TaskScheduler 的来源分为4个。
1、调用ScheduledAnnotationBeanPostProcessor实例的set方法。
2、配置SchedulingConfigurer实现类到spring容器中。
3、配置 TaskScheduler 实现类到spring容器中。
4、配置ScheduledExecutorService实现类到Spring 容器中。
5、在1-4都没有做的情况下ScheduledTaskRegistrar会直接调用 Executors.newSingleThreadScheduledExecutor()获取一个ScheduledExecutorService。
其中1-4在 ScheduledAnnotationBeanPostProcessor 中实现, 5在ScheduledTaskRegistrar中实现。
3.6 ScheduledTaskRegistrar 类
ScheduledTaskRegistrar是一个核心类,也是一个容器类,保存了所有的task的定义(这地方和xxl-job的类似)。同时也是真正将Task提交给调度器的地方。具体看以下代码。
public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean {//处理器,是Spring的类private TaskScheduler taskScheduler;//默认处理器 JDKprivate ScheduledExecutorService localExecutor;//根据不同的配置,缓存了不同的类型的task。名字与Scheduled中配置基本一样。@Nullableprivate List<TriggerTask> triggerTasks;@Nullableprivate List<CronTask> cronTasks;@Nullableprivate List<IntervalTask> fixedRateTasks;@Nullableprivate List<IntervalTask> fixedDelayTasks;private final Map<Task, ScheduledTask> unresolvedTasks = new HashMap<>(16);//已经加入执行器中的任务private final Set<ScheduledTask> scheduledTasks = new LinkedHashSet<>(16);@Overridepublic void afterPropertiesSet() {scheduleTasks();}//具体将任务提交到执行器中的方法protected void scheduleTasks() {if (this.taskScheduler == null) {//当容器中没有找到执行器的时候,将会使用 ScheduledExecutorService。此时只有一个线程处理。this.localExecutor = Executors.newSingleThreadScheduledExecutor();//需要将ScheduledExecutorService 封装成为 TaskScheduler 才能够使用this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);}if (this.triggerTasks != null) {for (TriggerTask task : this.triggerTasks) {addScheduledTask(scheduleTriggerTask(task));}}if (this.cronTasks != null) {for (CronTask task : this.cronTasks) {addScheduledTask(scheduleCronTask(task));}}if (this.fixedRateTasks != null) {for (IntervalTask task : this.fixedRateTasks) {addScheduledTask(scheduleFixedRateTask(task));}}if (this.fixedDelayTasks != null) {for (IntervalTask task : this.fixedDelayTasks) {addScheduledTask(scheduleFixedDelayTask(task));}}}public ScheduledTask scheduleTriggerTask(TriggerTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());}else {addTriggerTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);}@Nullablepublic ScheduledTask scheduleCronTask(CronTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());}else {addCronTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);}@Deprecated@Nullablepublic ScheduledTask scheduleFixedRateTask(IntervalTask task) {FixedRateTask taskToUse = (task instanceof FixedRateTask ? (FixedRateTask) task :new FixedRateTask(task.getRunnable(), task.getInterval(), task.getInitialDelay()));return scheduleFixedRateTask(taskToUse);}@Nullablepublic ScheduledTask scheduleFixedRateTask(FixedRateTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {if (task.getInitialDelay() > 0) {Date startTime = new Date(this.taskScheduler.getClock().millis() + task.getInitialDelay());scheduledTask.future =this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), startTime, task.getInterval());}else {scheduledTask.future =this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());}}else {addFixedRateTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);}@Deprecated@Nullablepublic ScheduledTask scheduleFixedDelayTask(IntervalTask task) {FixedDelayTask taskToUse = (task instanceof FixedDelayTask ? (FixedDelayTask) task :new FixedDelayTask(task.getRunnable(), task.getInterval(), task.getInitialDelay()));return scheduleFixedDelayTask(taskToUse);}@Nullablepublic ScheduledTask scheduleFixedDelayTask(FixedDelayTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {if (task.getInitialDelay() > 0) {Date startTime = new Date(this.taskScheduler.getClock().millis() + task.getInitialDelay());scheduledTask.future = this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), startTime, task.getInterval());}else {scheduledTask.future = this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), task.getInterval());}}else {addFixedDelayTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);}}
3.7 如何执行cron表达式的任务
从前面的代码中我们知道, Spring-task默认使用 ScheduledExecutorService 作为底层逻辑,但是ScheduledExecutorService并不支持cron表达式。不过可以通过将cron表达式的任务封装成ScheduledExecutorService支持的参数即可。基本思想是将任务当成一次延时任务即可,等执行完上一次任务之后,如果还有下次,则重新提交到调度器。也就是:
1、将task在封装一层成为 ReschedulingRunnable,
2、计算cron的下次执行时间与当前的时间差delay。
3、调用提交任务。让任务延迟delay执行一次,注意只执行一次。
4、执行业务 run 方法。
5、重复执行2-4即可。
这个思路与ScheduledExecutorService获取task后再提交到队列中的思路是一样的。具体代码参考 ReschedulingRunnable 类和 CronTask 类。
class ReschedulingRunnable extends DelegatingErrorHandlingRunnable implements ScheduledFuture<Object> {// trigger 封装了cron和获取下次触发时间的类private final Trigger trigger;// 上下文private final SimpleTriggerContext triggerContext;// 执行器private final ScheduledExecutorService executor;@Nullableprivate ScheduledFuture<?> currentFuture;@Nullableprivate Date scheduledExecutionTime;private final Object triggerContextMonitor = new Object();//重新将task封装成为 ReschedulingRunnable public ReschedulingRunnable(Runnable delegate, Trigger trigger, Clock clock,ScheduledExecutorService executor, ErrorHandler errorHandler) {super(delegate, errorHandler);this.trigger = trigger;this.triggerContext = new SimpleTriggerContext(clock);this.executor = executor;}@Overridepublic void run() {//实际执行时间Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());//执行业务方法super.run();//完成时间Date completionTime = new Date(this.triggerContext.getClock().millis());synchronized (this.triggerContextMonitor) {//更新相关参数this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);//判断是否可以继续执行if (!obtainCurrentFuture().isCancelled()) {//继续提交任务schedule();}}}@Nullablepublic ScheduledFuture<?> schedule() {synchronized (this.triggerContextMonitor) {//获取下次执行时间this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);if (this.scheduledExecutionTime == null) {//没有下次时间,终止执行return null;}long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();//提交到线程池 ScheduledExecutorService。只执行一次。this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);return this;}}//省略其他代码。。。
}
3.8 Spring task使用注意事项
通过ScheduledAnnotationBeanPostProcessor查找TaskScheduler的过程,以及ScheduledTaskRegistrar调度过程,我们知道,如果我们没有配置TaskScheduler实例,默认情况下使用 Executors.newSingleThreadScheduledExecutor()新建了一个实例,这个实例只有一个线程处理任务,在任务耗时比较高的情况下会有可能发生阻塞。最好是配置一个ScheduledExecutorService实例交给Spring管理
如:
@Configuration
@EnableScheduling
public class Application2 {public static void main(String[] args) {AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(Application2.class);}@Scheduled(fixedRate = 100)public void schedled(){System.out.println("执行定时任务,time="+System.currentTimeMillis()/1000%60+"秒。threadName="+Thread.currentThread().getName());}@Beanpublic ScheduledExecutorService executorService(){ThreadFactory factory = new ThreadFactory() {private int seq = 0;@Overridepublic Thread newThread(Runnable r) {return new Thread(r,"定时任务线程池 seq="+seq++);}};ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3, factory);return scheduledExecutorService;}
}
3.9 总结
spring task中任务处理器为TaskScheduler实现类,任务为Task的子类。
基本的思想还是与 ScheduledExecutorService 想类似的。在默认情况下也是使用ScheduledExecutorService作为任务的处理器。
4 XXL-JOB
XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。由于是国产开发的,中文文档,而且很全面,具体使用方法可以直接看官方文档 《分布式任务调度平台XXL-JOB》-官方文档 。
fe最初XXL-JOB的底层是Quartz,后来发现Quartz比较复杂,不利于扩展和维护,因此自研了一个调度器,简单但是很实用,很值得学习。
4.1 为什么我们需要一个分布式的调度平台
前面介绍的Timer, ScheduledExcutorService 和 Spring Task,都是针对单个实例内的任务调度,在分布式部署的时候,可能会有一定的问题。比如假设有一个任务A,是给用户发送消息的,设置每一秒执行一次,如果部署了3个实例,那么就会变成每秒执行3次,调度频率随着实例的增多而增多,如果没有加全局锁,会出现重复发送的问题。此外在实际的业务中,我们还有可能需要随时JOB的调度周期,随时停止和启动一个任务等,这些操作都需要发版才能实现。因此在分布式系统中,有一个分布式调度器尤为重要。
4.1.1 分布式调度平台的设计思路
通过前面的几个任务调度工具的介绍,我们可以总结出来,一个任务调度系统,包含以下几个元素:
其中,Runnable(业务逻辑)可以是任意的可执行的业务逻辑,包括一个HTTP请求。如果是通过HTTP请求调度其他服务的任务,那么就实现了一个最基本的任务调度服务。
但是,如果我们还希望任务的调度更加灵活,比如可以随时修改调度的频率,随时停止和启动,那么就必须将触发器持久化,比如存储到Mysql等。
除此以外,我们还希望这个调度服务是高可用的,因此多实例部署,加一个锁就可以了
4.2 XXL-JOB 的模块
从github 上下载源码,可以看到XXL-JOB的核心模块分为2个,xxl-job-admin 和 xxl-job-core。另外的xxl-job-executor-samples是一个例子模块
核心模块的作用如下:
模块 | 说明 | 功能 |
---|---|---|
xxl-job-admin | 服务端(调度中心) | 管理界面+任务调度 |
xxl-job-core | 客户端(执行器) | 在项目中引用,执行业务逻辑 |
从模块划分可以看出,xxl-job的任务调度和任务的执行是分开的,客户端只管执行任务,不用管任务单的调度。任务调度是由服务端执行,这样各司其职,进行了解耦,提高系统整体稳定性和扩展性
官方架构图
如果大家心中没有任务调度的概念,直接看官方架构图是有些吃力的,因此我做了简化,保留了核心部分,如下图:
从上图可以看出,XXL-JOB框架分为三个结构。
1、Mysql:存储相关信息
2、客户端:将自己注册到服务端,等待任务下发。
3、服务端:维护JOB的信息,将需要执行的JOB,通过一定的策略,找到对应的客户端地址,发送HTTP请求,客户端执行即可。
4.3 xxl-job-admin 解析
服务端核心类如下:
类 | 功能 |
---|---|
JobScheduleHelper | 调度器,将需要执行的JOB提交到线程池 |
XxlJobInfo | 实体类,记录了一个任务的配置,持久化到mysql中 |
JobTriggerPoolHelper | 线程池,多个线程发送job到客户端 |
XxlJobTrigger | 触发器, 真正处理 XxlJobInfo并发送到制定的客户端 |
JobRegistryHelper | 注册中心 ,接收客户端的注册和心跳 |
以上类处于com.xxl.job.admin.core包
4.3.1 服务端初始化入口
admin的初始化入口在XxlJobAdminConfig,是InitializingBean的实现类,因此在spring配置文件初始化完成后就触发了XXL-JOB的初始化
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {private XxlJobScheduler xxlJobScheduler;@Overridepublic void afterPropertiesSet() throws Exception {adminConfig = this;xxlJobScheduler = new XxlJobScheduler();xxlJobScheduler.init();}//省略
}
从上边可以看到,真正执行初始化的类是XxlJobScheduler,初始化逻辑在init()方法中。
public class XxlJobScheduler {//启动了一些列的线程或者线程池来来处理相关逻辑public void init() throws Exception {// init i18ninitI18n();// admin trigger pool start 触发器,触发http请求。接收JobScheduleHelper发送过来的triggerJobTriggerPoolHelper.toStart();// admin registry monitor run 处理注册上来的服务IP地址,并监控心跳等信息JobRegistryHelper.getInstance().start();// admin fail-monitor run 生成一些任务失败的告警等JobFailMonitorHelper.getInstance().start();// admin lose-monitor run ( depend on JobTriggerPoolHelper ) // 处理执行完成的任务,并监控丢失执行器的任务记录为失败。JobCompleteHelper.getInstance().start();// admin log report start 调度统计JobLogReportHelper.getInstance().start();// start-schedule ( depend on JobTriggerPoolHelper )// 调度器JobScheduleHelper.getInstance().start();}//省略。。。
}
服务端初始化流程图如下:
XXL-JOB服务端在初始化过程中启动了多个后台线程或者线程池,用于异步处理多项任务。
4.3.2 JobScheduleHelper 类
从前面对任务调度的介绍可以看出,一个任务调度器,离不开
1.带有执行时间的任务列表
2.轮询执行任务的调度器
XXL-JOB 也不例外。其中JobScheduleHelper类就属于轮询执行任务的调度器,包含了任务调用的基本逻辑,属于必看的类
具体代码如下
public class JobScheduleHelper {//预读5秒public static final long PRE_READ_MS = 5000; // pre read//时间轮刻度-任务ID映射表private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();public void start(){scheduleThread = new Thread(new Runnable() {@Overridepublic void run() {// 按照默认配置 (快200+慢100)* 20 = 6000int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {// Scan Joblong start = System.currentTimeMillis();//数据库连接Connection conn = null;PreparedStatement preparedStatement = null;boolean preReadSuc = true;try {conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();conn.setAutoCommit(false);//加锁preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();// 1、预读 查询数据库,获取下次执行时间 <= 当前时间+5秒 的所有JOBlong nowTime = System.currentTimeMillis();//SELECT * FROM xxl_job_info AS t WHERE t.trigger_status = 1 and t.trigger_next_time <= #{maxNextTime} ORDER BY id ASC LIMIT #{pagesize}List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (scheduleList!=null && scheduleList.size()>0) {// 2、遍历处理JOB,看是直接提交给线程池还是先提交到 time-ring后再提交给线程池。for (XxlJobInfo jobInfo: scheduleList) {// time-ring jumpif (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {// 2.1、超时>5秒以上,当做错失触发时机// 1、超时触发策略MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {// FIRE_ONCE_NOW 》 triggerJobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);}// 2、更新时间refreshNextValidTime(jobInfo, new Date());} else if (nowTime > jobInfo.getTriggerNextTime()) {// 2.2、超时时间在5秒以内// 1、提交到线程池JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);// 2、刷新一次时间refreshNextValidTime(jobInfo, new Date());if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {//下次执行在5秒内,说明下次循环还有它,可以再预读一次,直接提交到时间轮,提高想能// 1、计算时间轮刻度int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、提交到时间轮pushTimeRing(ringSecond, jobInfo.getId());// 3、再更新一次时间refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {// 2.3、未到执行时间// 1、计算时间轮刻度(60秒时间轮)int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、提交到时间轮线程pushTimeRing(ringSecond, jobInfo.getId());// 3、刷新refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}// 3、将修改了下次执行时间的任务存到数据库中for (XxlJobInfo jobInfo: scheduleList) {XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);}} else {preReadSuc = false;}} finally {//省略处理数据库连接}long cost = System.currentTimeMillis()-start;//省略。。。}}});scheduleThread.setDaemon(true);scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");scheduleThread.start();// ring threadringThread = new Thread(new Runnable() {@Overridepublic void run() {while (!ringThreadToStop) {//时间轮代码,忽略}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");}});ringThread.setDaemon(true);ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");ringThread.start();}//忽略其他代码
}
源码逻辑解析
从以上逻辑看,XXL-JOB的核心逻辑与JDK的 ScheduledExecutorService 是基本类似的。都是先从一个队列(xxl-job是使用mysql排序)中取出JOB,然后提交给线程池处理。
但有一点区别:XXL-JOB是从数据库读取数据,因此为了提高性能,做了一个预读5秒的变化。未到时间执行的job提交给时间轮,再由时间轮提交给线程池处理。
4.3.3 XxlJobTrigger 类
经过JobScheduleHelper调度,job的参数会被提交的线程池,线程池由JobTriggerPoolHelper实现,比较简单,不再描述,然后最终会使用 XxlJobTrigger 是触发执行job的地方。
public class XxlJobTrigger {public static void trigger(int jobId,TriggerTypeEnum triggerType,int failRetryCount,String executorShardingParam,String executorParam,String addressList) {//1、 从数据库获取job配置XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);if (jobInfo == null) {return;}if (executorParam != null) {//如果传参就覆盖jobInfo.setExecutorParam(executorParam);}int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();// 2 从数据库获取分组信息(本质就是获取接收job的地址)XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());// cover addressListif (addressList!=null && addressList.trim().length()>0) {group.setAddressType(1);//传参的话就覆盖group.setAddressList(addressList.trim());}// sharding paramint[] shardingParam = null;if (executorShardingParam!=null){String[] shardingArr = executorShardingParam.split("/");if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {shardingParam = new int[2];shardingParam[0] = Integer.valueOf(shardingArr[0]);shardingParam[1] = Integer.valueOf(shardingArr[1]);}}if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()&& shardingParam==null) {//分片for (int i = 0; i < group.getRegistryList().size(); i++) {processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());}} else {if (shardingParam == null) {//分片参数,这里意思是只发给一个地址。shardingParam = new int[]{0, 1};}processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);}}/*** @param group job group, registry list may be empty* @param jobInfo* @param finalFailRetryCount 纯粹是为了打日志* @param triggerType 为了打日志* @param index sharding index* @param total sharding index*/private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){//阻塞策略ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy//路由策略 就是如何发给客户端,如第一个,最后一个,一致性哈希ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy//分片参数 日志记录用String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;// 1、save log-idXxlJobLog jobLog = new XxlJobLog();jobLog.setJobGroup(jobInfo.getJobGroup());jobLog.setJobId(jobInfo.getId());jobLog.setTriggerTime(new Date());//记录日志XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);// 2、组装参数TriggerParam triggerParam = new TriggerParam();triggerParam.setJobId(jobInfo.getId());triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());triggerParam.setExecutorParams(jobInfo.getExecutorParam());triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());triggerParam.setLogId(jobLog.getId());triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());triggerParam.setGlueType(jobInfo.getGlueType());triggerParam.setGlueSource(jobInfo.getGlueSource());triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());triggerParam.setBroadcastIndex(index);triggerParam.setBroadcastTotal(total);// 3、init addressString address = null;ReturnT<String> routeAddressResult = null;if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {//广播模式,获取指定下标的 url 。if (index < group.getRegistryList().size()) {address = group.getRegistryList().get(index);} else {address = group.getRegistryList().get(0);}} else {//根据路由策略获取 url.routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}}} else {routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));}// 4、trigger remote executorReturnT<String> triggerResult = null;if (address != null) {triggerResult = runExecutor(triggerParam, address);} else {triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);}// 5、collection trigger info//忽略一长串组装日志代码//保存日志XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);}//根据地址url,将参数发送到指定的客户端。public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){ReturnT<String> runResult = null;try {ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);runResult = executorBiz.run(triggerParam);} catch (Exception e) {logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));}StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");runResultSB.append("<br>address:").append(address);runResultSB.append("<br>code:").append(runResult.getCode());runResultSB.append("<br>msg:").append(runResult.getMsg());runResult.setMsg(runResultSB.toString());return runResult;}}
总结以上XxlJobTrigger类的代码,做了几件事
1、根据jobId从数据库获取job参数
2、根据job参数获取groupId后,再获取分组信息,里面包含了分组中的客户端 ip:port地址。
3、根据路由策略,获取指定的address,将job参数通过http发送往客户端。
4、记录日志。
4.3.4 JobRegistryHelper 类
客户端会定时上报自身的ip+port,JobRegistryHelper就是专门处理这些信息的。
实现类比较简单,就补贴源码了,只讲一下逻辑。
1、定义了一个线程池,专门保存或者修改用来处理客户端上报的address。
2、定义了一个后台线程,周期性(30s)处理以下逻辑:清除过期的客户端注册信息(30*3s不上报),将最新的address更新到各自的任务组中。
4.3.4 路由策略
路由策略抽象类为 ExecutorRouter,在配置job的时候指定的路由策略,就有对应的ExecutorRouter子类去实现。
public abstract class ExecutorRouter {/*** route address* @param addressList* @return ReturnT.content=address*/public abstract ReturnT<String> route(TriggerParam triggerParam, List<String> addressList);}
1、第一个:直接取addressList第一个地址
2、最后一个:直接取 addressList最后地址
3、轮询: 对调度次数进行计数n,n%addressList.size获取地址下标。
4、随机: 随机取一个。
5、一致性hash: 使用java.util.TreeMap.tailMap()方法来实现。负载均衡之一致性哈希环算法
6、最不经常使用:LFU(Least Frequently Used):最不经常使用,频率/次数
7、最近最久未使用:LRU(Least Recently Used):最近最久未使用,时间
8、故障转移:对addressList进行循环http请求,第一个正常返回的地址作为调度地州。
9、忙碌转移:通过http请求客户端检查JobThread,第一个空闲的客户端作为调度客户端。
10、分片广播:发送给所有的客户端。
4.4 客户端逻辑
客户端核心类如下:
类 | 功能 | 说明 |
---|---|---|
XxlJob | Task注解 | 被标注的方法将会被处理成为 IJobHandler, 与@Scheduled注解功能相似 (19年底新增注解)。 每个IJobHandler有唯标识 |
EmbedServer | 客户端server | 启动一个netty,用于接收服务端的调度 |
ExecutorBizImpl | 处理服务端的请求 | EmbedServer接收请求后,实际交给ExecutorBizImpl进行处理,里面有处理阻塞策略 |
TriggerParam | 触发参数 | 记录服务端发送过来的任务 |
JobThread | Job线程 | 用 LinkedBlockingQueue 缓存服务端传递过来的 TriggerParam。轮询 LinkedBlockingQueue,顺序处理同一个job的任务 |
IJobHandler | Task抽象类 | 被@XxlJob的注释的方法,或者通过服务端传递过来的代码,将会封装成为一个 IJobHandler 实现类 |
XxlJobContext | 上下文 | 内置InheritableThreadLocal,在线程中存储变量,供给IJobHandler |
从以上表格基本可以看出客户端的执行逻辑,其中比较重要的是 ExecutorBizImpl 和 JobThread,以及XxlJob注解的原理。 将会对这三个类进行介绍。
4.4.1 @XxlJob 注解原理
在客户端引入XXL-JOB的时候,一般需要进行如下配置
@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}
查看XxlJobSpringExecutor 具体代码如下
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {// start@Overridepublic void afterSingletonsInstantiated() {// init JobHandler Repository (for method)initJobHandlerMethodRepository(applicationContext);// refresh GlueFactoryGlueFactory.refreshInstance(1);// super starttry {super.start();} catch (Exception e) {throw new RuntimeException(e);}}//通过Spring的ApplicationContext,获取到使用了@XxlJob注解的类,缓存起来。private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext == null) {return;}// init job handler from methodString[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);for (String beanDefinitionName : beanDefinitionNames) {Object bean = applicationContext.getBean(beanDefinitionName);Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBeantry {annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookup<XxlJob>() {@Overridepublic XxlJob inspect(Method method) {return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);}if (annotatedMethods==null || annotatedMethods.isEmpty()) {continue;}for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {Method executeMethod = methodXxlJobEntry.getKey();XxlJob xxlJob = methodXxlJobEntry.getValue();// 父类核心方法registJobHandler(xxlJob, bean, executeMethod);}}}}
继续看父类XxlJobExecutor,可以看到使用一个ConcurrentMap缓存了包装过的业务方法。其中key为每个job的唯一标识,与服务端的key一一对应。
public class XxlJobExecutor {//使用一个ConcurrentMap缓存了包装过的业务方法。其中key为每个job的唯一标识,与服务端的key一一对应。private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){if (xxlJob == null) {return;}String name = xxlJob.value();//make and simplify the variables since they'll be called several times laterClass<?> clazz = bean.getClass();String methodName = executeMethod.getName();if (name.trim().length() == 0) {throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");}if (loadJobHandler(name) != null) {throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");}executeMethod.setAccessible(true);// init and destroyMethod initMethod = null;Method destroyMethod = null;//初始化代码if (xxlJob.init().trim().length() > 0) {try {initMethod = clazz.getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");}}//销毁代码if (xxlJob.destroy().trim().length() > 0) {try {destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");}}// registry jobhandlerregistJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));}public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return jobHandlerRepository.put(name, jobHandler);}
}
最终包装成为了MethodJobHandler
public class MethodJobHandler extends IJobHandler {private final Object target;private final Method method;private Method initMethod;private Method destroyMethod;
}
以上流程总结如下
在spring启动,所有单例类都创建完成后,触发从ApplicationContext获取所有标注了 @XxlJob的bean和对应方法。最终封装成为MethodJobHandler,存储到了了一个ConcurrentMap中。key为JOB的唯一标识,与服务端一对一对应。等待服务端的调用。
4.4.2 JobThread
从@XxlJob的原理,可以看到,一个job最终会被封装成为MethodJobHandler,那么客户端如何处理服务端下发的调度任务呢。
J
obThread是真正客户端真正执行任务的地方。每一个JAVA类型的JOB都会对应一个JobThread。
public class JobThread extends Thread{private static Logger logger = LoggerFactory.getLogger(JobThread.class);private int jobId;//标注了@XxlJob的方法或者从前端传过来的代码脚本。private IJobHandler handler;// 存储服务端穿过来的请求,如果前一个任务没有执行文,后续的会继续存在这里。private LinkedBlockingQueue<TriggerParam> triggerQueue;// 服务端每发送一次到客户端,会生成一个唯一的JOBid,可以用来做幂等,防止HTTP请求重试等造成重复调用。private Set<Long> triggerLogIdSet;private volatile boolean toStop = false;private String stopReason;private boolean running = false; // if running jobprivate int idleTimes = 0; // idel timespublic JobThread(int jobId, IJobHandler handler) {this.jobId = jobId;this.handler = handler;this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());// assign job thread namethis.setName("xxl-job, JobThread-"+jobId+"-"+System.currentTimeMillis());}public IJobHandler getHandler() {return handler;}//存储服务端调度请求public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {// avoid repeatif (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());}triggerLogIdSet.add(triggerParam.getLogId());triggerQueue.add(triggerParam);return ReturnT.SUCCESS;}//杀死调度任务public void toStop(String stopReason) {/*** Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),* 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;* 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;*/this.toStop = true;this.stopReason = stopReason;}//启动线程@Overridepublic void run() {// inittry {handler.init();} catch (Throwable e) {logger.error(e.getMessage(), e);}// 死循环知道停止while(!toStop){running = false;//统计空闲次数 超过30次就终止线程idleTimes++;TriggerParam triggerParam = null;try {// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);if (triggerParam!=null) {running = true;idleTimes = 0;triggerLogIdSet.remove(triggerParam.getLogId());// 创建日志文件,用于存储日志,日志异步上报。String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());XxlJobContext xxlJobContext = new XxlJobContext(triggerParam.getJobId(),triggerParam.getExecutorParams(),logFileName,triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal());// 初始化上下文,使用InheritableThreadLocal保存XxlJobContext.setXxlJobContext(xxlJobContext);// executeXxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());if (triggerParam.getExecutorTimeout() > 0) {// 设定调度过期时间Thread futureThread = null;try {FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);handler.execute();return true;}});futureThread = new Thread(futureTask);futureThread.start();Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobHelper.log("<br>----------- xxl-job job execute timeout");XxlJobHelper.log(e);// handle resultXxlJobHelper.handleTimeout("job execute timeout ");} finally {futureThread.interrupt();}} else {//没有过期时间,直接执行handler.execute();}// 处理执行结果if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {XxlJobHelper.handleFail("job handle result lost.");} else {String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)?tempHandleMsg.substring(0, 50000).concat("..."):tempHandleMsg;XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);}XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="+ XxlJobContext.getXxlJobContext().getHandleCode()+ ", handleMsg = "+ XxlJobContext.getXxlJobContext().getHandleMsg());} else {if (idleTimes > 30) {if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lostXxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");}}}} catch (Throwable e) {if (toStop) {XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);}// handle resultStringWriter stringWriter = new StringWriter();e.printStackTrace(new PrintWriter(stringWriter));String errorMsg = stringWriter.toString();XxlJobHelper.handleFail(errorMsg);XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");} finally {if(triggerParam != null) {// callback handler infoif (!toStop) {// 提交处理结果到队列中,等待上报TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {// 提交处理结果到队列中,等待上报TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job running, killed]" ));}}}}// callback trigger request in queuewhile(triggerQueue !=null && triggerQueue.size()>0){TriggerParam triggerParam = triggerQueue.poll();if (triggerParam!=null) {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job not executed, in the job queue, killed.]"));}}// destroytry {handler.destroy();} catch (Throwable e) {logger.error(e.getMessage(), e);}logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());}
}
外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
4.4.3 ExecutorBizImpl
客户端在启动后,会在EmbedServer实例中启动一个netty专门接收从服务端发送来的请求。其中包括检查服务端心跳,job线程心跳,终止调度,读取日志以及JOB调度等,都是交给ExecutorBizImpl进行处理。这里主要介绍下任务调度的过程:
public class ExecutorBizImpl implements ExecutorBiz {@Overridepublic ReturnT<String> run(TriggerParam triggerParam) {// 根据id获取 JOB的执行线程JobThreadJobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());// 获取jobThread内部的 jobHandlerIJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;String removeOldReason = null;// valid:jobHandler + jobThreadGlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());if (GlueTypeEnum.BEAN == glueTypeEnum) {// new jobhandlerIJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());// valid old jobThreadif (jobThread!=null && jobHandler != newJobHandler) {// change handler, need kill old threadremoveOldReason = "change jobhandler or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {jobHandler = newJobHandler;if (jobHandler == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");}}} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {// valid old jobThreadif (jobThread != null &&!(jobThread.getHandler() instanceof GlueJobHandler&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {// change handler or gluesource updated, need kill old threadremoveOldReason = "change job source or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {try {IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());}}} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {// valid old jobThreadif (jobThread != null &&!(jobThread.getHandler() instanceof ScriptJobHandler&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {// change script or gluesource updated, need kill old threadremoveOldReason = "change job source or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));}} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");}// executor block strategyif (jobThread != null) {ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {// discard when runningif (jobThread.isRunningOrHasQueue()) {return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());}} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {// kill running jobThreadif (jobThread.isRunningOrHasQueue()) {removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();jobThread = null;}} else {// just queue trigger}}// replace thread (new or exists invalid)if (jobThread == null) {jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}// push data to queueReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);return pushResult;}
}
4.5 几个表格作用
xxl_job_group:任务分组。一个执行器算作一个组。每组下面会记录对应的实例地址。
xxl_job_info:具体的任务信息。
xxl_job_lock:分布式锁
xxl_job_log:日志
xxl_job_log_report:调度统计
xxl_job_logglue: 可以记录GLUE模式代码历史版本
xxl_job_registry:注册信息表,每一台机器注册上来,都会记录一条记录。
xxl_job_user:用户表
其中xxl_job_group,xxl_job_info,xxl_job_lock是调度器的关键,其他的是起到支撑辅助作用。
5 Quartz
5.1 什么是 Quartz
从 Quartz官网 简介可以知道,
Quartz 是一个开源的任务调度框架,可以用于单体应用,也可以用于大型的电子商务平台,支持成千上万的任务。
5.1 Quartz 简单demo
新建一个maven项目,引入依赖
<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.3.1</version>
</dependency>
public class Application {public static void main(String[] args) {try {Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();scheduler.start();//定义一个工作对象 设置工作名称与组名JobDetail job = JobBuilder.newJob(HelloJob.class).withIdentity("job41","group1").build();//定义一个触发器 简单Trigger 设置工作名称与组名 5秒触发一次Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1","group1").startNow().withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(5)).build();//设置工作 与触发器scheduler.scheduleJob(job, trigger);
// scheduler.shutdown();} catch (SchedulerException se) {se.printStackTrace();}}
}
Scheduler 是一个接口定义了一系列提交job的方法。
其实现类如下:
查看 StdScheduler 本质上是一个代理类,代理了 QuartzScheduler 类的所有方法。
挑选一个方法来看:
public class QuartzScheduler implements RemotableQuartzScheduler {//存储jobprivate QuartzSchedulerResources resources;public Date scheduleJob(JobDetail jobDetail,Trigger trigger) throws SchedulerException {//省略校验OperableTrigger trig = (OperableTrigger) trigger;if (trigger.getJobKey() == null) {trig.setJobKey(jobDetail.getKey());} else if (!trigger.getJobKey().equals(jobDetail.getKey())) {throw new SchedulerException("Trigger does not reference given job!");}trig.validate();Calendar cal = null;if (trigger.getCalendarName() != null) {cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());}Date ft = trig.computeFirstFireTime(cal);if (ft == null) {throw new SchedulerException("Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");}resources.getJobStore().storeJobAndTrigger(jobDetail, trig);notifySchedulerListenersJobAdded(jobDetail);notifySchedulerThread(trigger.getNextFireTime().getTime());notifySchedulerListenersSchduled(trigger);return ft;}//start方法public void start() throws SchedulerException {if (shuttingDown|| closed) {throw new SchedulerException("The Scheduler cannot be restarted after shutdown() has been called.");}// QTZ-212 : calling new schedulerStarting() method on the listeners// right after entering start()notifySchedulerListenersStarting();if (initialStart == null) {initialStart = new Date();//启动了线程this.resources.getJobStore().schedulerStarted();startPlugins();} else {resources.getJobStore().schedulerResumed();}schedThread.togglePause(false);getLog().info("Scheduler " + resources.getUniqueIdentifier() + " started.");notifySchedulerListenersStarted();}
}
可以看到存储和触发的代码
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
其中resources.getJobStore() 为 JobStore 实例。用于存储job和triger提供给 QuartzScheduler 使用。
JOB Store
RAMJobStore 内存行存储,单机情况下默认。
JDBC JobStore 数据库。
查看 RAMJobStore 。
public class RAMJobStore implements JobStore {/** ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~* * Data members.* * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/protected HashMap<JobKey, JobWrapper> jobsByKey = new HashMap<JobKey, JobWrapper>(1000);protected HashMap<TriggerKey, TriggerWrapper> triggersByKey = new HashMap<TriggerKey, TriggerWrapper>(1000);protected HashMap<String, HashMap<JobKey, JobWrapper>> jobsByGroup = new HashMap<String, HashMap<JobKey, JobWrapper>>(25);protected HashMap<String, HashMap<TriggerKey, TriggerWrapper>> triggersByGroup = new HashMap<String, HashMap<TriggerKey, TriggerWrapper>>(25);protected TreeSet<TriggerWrapper> timeTriggers = new TreeSet<TriggerWrapper>(new TriggerWrapperComparator());protected HashMap<String, Calendar> calendarsByName = new HashMap<String, Calendar>(25);protected Map<JobKey, List<TriggerWrapper>> triggersByJob = new HashMap<JobKey, List<TriggerWrapper>>(1000);protected final Object lock = new Object();protected HashSet<String> pausedTriggerGroups = new HashSet<String>();protected HashSet<String> pausedJobGroups = new HashSet<String>();protected HashSet<JobKey> blockedJobs = new HashSet<JobKey>();protected long misfireThreshold = 5000l;protected SchedulerSignaler signaler;//获取下一个任务public List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) {synchronized (lock) {List<OperableTrigger> result = new ArrayList<OperableTrigger>();Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();Set<TriggerWrapper> excludedTriggers = new HashSet<TriggerWrapper>();long batchEnd = noLaterThan;// return empty list if store has no triggers.if (timeTriggers.size() == 0)return result;while (true) {TriggerWrapper tw;try {tw = timeTriggers.first();if (tw == null)break;timeTriggers.remove(tw);} catch (java.util.NoSuchElementException nsee) {break;}if (tw.trigger.getNextFireTime() == null) {continue;}if (applyMisfire(tw)) {if (tw.trigger.getNextFireTime() != null) {timeTriggers.add(tw);}continue;}if (tw.getTrigger().getNextFireTime().getTime() > batchEnd) {timeTriggers.add(tw);break;}// If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then// put it back into the timeTriggers set and continue to search for next trigger.JobKey jobKey = tw.trigger.getJobKey();JobDetail job = jobsByKey.get(tw.trigger.getJobKey()).jobDetail;if (job.isConcurrentExectionDisallowed()) {if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {excludedTriggers.add(tw);continue; // go to next trigger in store.} else {acquiredJobKeysForNoConcurrentExec.add(jobKey);}}tw.state = TriggerWrapper.STATE_ACQUIRED;tw.trigger.setFireInstanceId(getFiredTriggerRecordId());OperableTrigger trig = (OperableTrigger) tw.trigger.clone();if (result.isEmpty()) {batchEnd = Math.max(tw.trigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;}result.add(trig);if (result.size() == maxCount)break;}// If we did excluded triggers to prevent ACQUIRE state due to DisallowConcurrentExecution, we need to add them back to store.if (excludedTriggers.size() > 0)timeTriggers.addAll(excludedTriggers);return result;}}
}
从以上源码可知 RAMJobStore 是存储job以及相关参数的地方。
其中特别注意的是timeTriggers属性,其使用TreeSet来保存,这个是实现类似Timer优先队列的作用。
QuartzSchedulerThread 的run方法,就是去获取任务的地方。
public class QuartzSchedulerThread extends Thread {/*** <p>* The main processing loop of the <code>QuartzSchedulerThread</code>.* </p>*/@Overridepublic void run() {int acquiresFailed = 0;while (!halted.get()) {try {// check if we're supposed to pause...synchronized (sigLock) {while (paused && !halted.get()) {try {// wait until togglePause(false) is called...sigLock.wait(1000L);} catch (InterruptedException ignore) {}// reset failure counter when paused, so that we don't// wait again after unpausingacquiresFailed = 0;}if (halted.get()) {break;}}// wait a bit, if reading from job store is consistently// failing (e.g. DB is down or restarting)..if (acquiresFailed > 1) {try {long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);Thread.sleep(delay);} catch (Exception ignore) {}}int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...List<OperableTrigger> triggers;long now = System.currentTimeMillis();clearSignaledSchedulingChange();try {triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());acquiresFailed = 0;if (log.isDebugEnabled())log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");} catch (JobPersistenceException jpe) {if (acquiresFailed == 0) {qs.notifySchedulerListenersError("An error occurred while scanning for the next triggers to fire.",jpe);}if (acquiresFailed < Integer.MAX_VALUE)acquiresFailed++;continue;} catch (RuntimeException e) {if (acquiresFailed == 0) {getLog().error("quartzSchedulerThreadLoop: RuntimeException "+e.getMessage(), e);}if (acquiresFailed < Integer.MAX_VALUE)acquiresFailed++;continue;}if (triggers != null && !triggers.isEmpty()) {now = System.currentTimeMillis();long triggerTime = triggers.get(0).getNextFireTime().getTime();long timeUntilTrigger = triggerTime - now;while(timeUntilTrigger > 2) {synchronized (sigLock) {if (halted.get()) {break;}if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {try {// we could have blocked a long while// on 'synchronize', so we must recomputenow = System.currentTimeMillis();timeUntilTrigger = triggerTime - now;if(timeUntilTrigger >= 1)sigLock.wait(timeUntilTrigger);} catch (InterruptedException ignore) {}}}if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {break;}now = System.currentTimeMillis();timeUntilTrigger = triggerTime - now;}// this happens if releaseIfScheduleChangedSignificantly decided to release triggersif(triggers.isEmpty())continue;// set triggers to 'executing'List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();boolean goAhead = true;synchronized(sigLock) {goAhead = !halted.get();}if(goAhead) {try {List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);if(res != null)bndles = res;} catch (SchedulerException se) {qs.notifySchedulerListenersError("An error occurred while firing triggers '"+ triggers + "'", se);//QTZ-179 : a problem occurred interacting with the triggers from the db//we release them and loop againfor (int i = 0; i < triggers.size(); i++) {qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));}continue;}}for (int i = 0; i < bndles.size(); i++) {TriggerFiredResult result = bndles.get(i);TriggerFiredBundle bndle = result.getTriggerFiredBundle();Exception exception = result.getException();if (exception instanceof RuntimeException) {getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));continue;}// it's possible to get 'null' if the triggers was paused,// blocked, or other similar occurrences that prevent it being// fired at this time... or if the scheduler was shutdown (halted)if (bndle == null) {qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));continue;}JobRunShell shell = null;try {shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);shell.initialize(qs);} catch (SchedulerException se) {qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);continue;}if (qsRsrcs.getThreadPool().runInThread(shell) == false) {// this case should never happen, as it is indicative of the// scheduler being shutdown or a bug in the thread pool or// a thread pool being used concurrently - which the docs// say not to do...getLog().error("ThreadPool.runInThread() return false!");qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);}}continue; // while (!halted)}} else { // if(availThreadCount > 0)// should never happen, if threadPool.blockForAvailableThreads() follows contractcontinue; // while (!halted)}long now = System.currentTimeMillis();long waitTime = now + getRandomizedIdleWaitTime();long timeUntilContinue = waitTime - now;synchronized(sigLock) {try {if(!halted.get()) {// QTZ-336 A job might have been completed in the mean time and we might have// missed the scheduled changed signal by not waiting for the notify() yet// Check that before waiting for too long in case this very job needs to be// scheduled very soonif (!isScheduleChanged()) {sigLock.wait(timeUntilContinue);}}} catch (InterruptedException ignore) {}}} catch(RuntimeException re) {getLog().error("Runtime error occurred in main trigger firing loop.", re);}} // while (!halted)// drop references to scheduler stuff to aid garbage collection...qs = null;qsRsrcs = null;}}
5.2 几个概念
Trigger
6 Elastic-Job
待续
7 Apache DolphinScheduler
待续
总结
参考
Java中常见的几种任务调度框架对比
引用
- Spring Job?Quartz?XXL-Job?年轻人才做选择,艿艿全莽~
- Timer与TimerTask的真正原理&使用介绍
- 深入 DelayQueue 内部实现
- PriorityQueue详解
- Java优先级队列DelayedWorkQueue原理分析
- 【Java基础】JAVA中优先队列详解
- quartz (从原理到应用)详解篇
- 《分布式任务调度平台XXL-JOB》-官方文档
- 平衡二叉堆
- 聊聊Java进阶之并发基础技术—线程池剖析
- Java中常见的几种任务调度框架对比
- Quartz 源码解析(一) —— 基本介绍
(该文成于2022年公司内部主题分享)