Java常用任务调度

JAVA 任务调度技术

前言

在日常开发过程中,我们经常会遇到周期性执行某段代码的场景。比如定期同步订单,定期更新商品信息,定期发送消息等。这些重复执行的代码可以抽象为一个任务(Task)。 一个Task的特点如下:

  1. 包含需要执行的业务逻辑。
  2. 能够在指定的时间重复(一次或者多次)执行。

围绕Task的特点,开发者们开发了不同的调度框架或者中间件,满足日常开发中的使用。

以下表格列出了部分实现。

技术来源使用场景依赖第三方
TimerJDK自带目前比较少使用
ScheduledExecutorServiceJDK自带基于线程池技术,常用于中间件中
Spring TaskSpring-contextSpring 项目,常用于单体应用开发
XXL-JOB国产开源中间件可用于分布式项目调度依赖mysql
QuartzOpenSymphony 开源组织一些中间件常常基于Quartz开发分布式需要依赖数据库
Elastic-Job当当⽹开源可用于分布式项目调度需要依赖ZooKeeper + Mesos
Apache DolphinScheduler易观开源大数据任务调度

本文对部分技术的实现进行了介绍,从简单到复杂,从具体到抽象,希望可以让大家在面对一个任务调度框架时可以快速抓住要点,不再陌生。

1. Timer

java.util.Timer位于JDK的rt.jar包下,始于jdk1.3,是JDK自带的任务调度器,虽然目前基本不再使用Timer来进行任务调度,但是Timer设计简单,理解起来比较容易。而且后续ScheduledExecutorService的基本原理和Timer基本类似,因此需要对Timer进行一个详细的了解。

Timer的核心类比较少,只需要以下4个类即可。

功能说明
Timer入口类,整个调度器的组织者,相当于其他框架的.定义了多个提交task的方法
TimerThread任务调度器后台线程执行一个轮询方法,核心方法为mainLoop()。
TimerTask抽象类,其实现类包装业务逻辑核心属性:nextExecutionTime下一个执行时间点。
TaskQueue任务队列优先队列,头部节点为最早执行的Task

以上类都处于java.util包下。

1.1 存储任务的数据结构-平衡二叉堆

一个任务框架,需要可以容纳在不同时间执行的任务,因此必须要有一个容器来缓存或者持久化提交的任务。 那么在多任务的场景下,我们如何挑选出需要执行的任务呢?以下对一些场景进行分析:

方案1. 对所有的任务进行遍历,对于下次执行时间小于当前时间的任务,执行业务逻辑。

时间复杂度为O(n),一些时间没到的任务也被遍历到了。性能不好。

方案2. 先对所有的任务,按照下次执行时间的大小进行排序,每次只取头部任务。

对2进行分析可以发现,只要保证队列头部为最早执行的元素即可,对于其他任务,因为还不需要执行,是否有序并不重要。

方案3. 采用优先队列,头部为权值最小,每次取权值即可。

从以上分析可以看出,一个任务框架,可以采用优先队列来容纳提交的任务。Timer正是如此,它的基本数据结构为平衡二叉堆(balanced binary heap)。想要理解Timer,需要对平衡二叉堆进行了解。
详细可以参考 【Java基础】JAVA中优先队列详解 。 摘抄如下:

1.1.1 基本结构

Java平衡二叉堆的定义为:

任意一个非叶子节点的权值,都不大于其左右子节点的权值

可以使用下面的在线数据模拟器进行

平衡二叉堆数据结构模拟器

结构示例如下:

在这里插入图片描述

从图中可以看出,可以通过数组来实现平衡二叉堆。每一个节点的编号,可以使用数组的下标来表示。

  1. 数组的第一个元素为二叉树的根节点,在所有节点中权值最小。
  2. 父子节点之间的关系可以用以下算法表示。这个算法很重要。在新增元素或者删除元素的时候,都需要使用到该算法。

leftNo = parentNo * 2+1
rightNo = parentNo * 2+2
parentNo = (nodeNo-1)/2

在优先队列中,一般只使用到新增元素和删除根节点元素,因此只对这两个算法进行介绍。

1.1.2 新增元素

在这里插入图片描述

步骤如下:

1.先在队尾新增一个元素。如果数组长度不够就先扩容。
2.如果有父节点,则与父节点进行对比。如果权值比父节点小,则与父节点交换位置。
3.重复步骤2,直到没有父节点或者比父节点小则完成新增。2~3步一般称作siftUp。

//siftUp()
private void siftUp(int k, E x) {while (k > 0) {int parent = (k - 1) >>> 1;//parentNo = (nodeNo-1)/2Object e = queue[parent];if (comparator.compare(x, (E) e) >= 0)//调用比较器的比较方法break;queue[k] = e;k = parent;}queue[k] = x;
}

通过以上步骤。可以保证所有的父节点权值都小于子节点的权值。

1.1.3 删除队首元素

在这里插入图片描述

步骤如下:

1.删除数组的第一个元素。
2.将队尾的元素放置到头部位置,记为一个父节点。
3.通过比较获取子节点中较小的一个,并与父节点比较,如果父节点大于子节点,则交换位子。
4.重复步骤3,直到父节点小于等于子节点或者已经没有子节点,则结束比较。这个过程一般称作siftDown。

//siftDown()
private void siftDown(int k, E x) {int half = size >>> 1;while (k < half) {//首先找到左右孩子中较小的那个,记录到c里,并用child记录其下标int child = (k << 1) + 1;//leftNo = parentNo*2+1Object c = queue[child];int right = child + 1;if (right < size &&comparator.compare((E) c, (E) queue[right]) > 0)c = queue[child = right];if (comparator.compare(x, (E) c) <= 0)break;queue[k] = c;//然后用c取代原来的值k = child;}queue[k] = x;
}

1.2 Timer 核心执行逻辑

查看Timer类的结构,可以看到提交任务的方法有6个

在这里插入图片描述

代表了6种不同的场景

delay: 延迟毫秒数period: 时间间隔

1.延迟delay毫秒后,执行任务一次。
2.延迟delay毫秒后,周期性执行任务,两次任务之间间隔period毫秒。
3.延迟delay毫秒后,以固定频率执行任务,两次任务之间间隔period毫秒。

4.指定的时间Date开始,执行任务一次。
5.指定的时间Date开始,周期性执行任务,两次任务之间间隔period毫秒。
6.指定的时间Date开始,以固定频率执行任务,两次任务之间间隔period毫秒。

其中需要特别说明的是2和3,5和6之间的区别,也就是schedule和scheduleAtFixedRate的区别。具体看下表:

方法名下一个执行时间nextExecutionTime说明
schedulecurrentTime + delay当前序堵塞时,会影响到后续任务的下次计划时间,
下次任务会推迟执行,对于丢失的时间不会补上任务
scheduleAtFixedRatenextExecutionTime + delay当堵塞时,影响到后续任务的计划时间,
任务的次数不会丢失,快速补上调度次数

注: currentTime:当前时间,nextExecutionTime:下次执行时间,delay:时间间隔

1.2.1 Timer简单的例子:

Demo先行,先看一个简单的例子,有个初步的印象。

public class Application {public static void main(String[] args) {//初始化一个timer对象Timer timer = new Timer();//创建抽象类TimerTask的实例。TimerTask myTask = new TimerTask() {@Overridepublic void run() {System.out.println("执行run方法,time="+System.currentTimeMillis()/1000%60+"秒");}};//提交任务,延迟1秒执行,每两秒执行一次timer.schedule(myTask, 1000, 1000*2);}
}

结果:

执行run方法,time=25秒
执行run方法,time=27秒
执行run方法,time=29秒
执行run方法,time=31秒
执行run方法,time=33秒
执行run方法,time=35秒
执行run方法,time=37秒

原始代码分析如下,只挑选了核心代码展示。

1.2.2 Timer 类源码分析

Timer 是整个任务架构的组织者,也是入口,因此首先看Timer的代码。

public class Timer {// TaskQueue 实现了一个优先队列private final TaskQueue queue = new TaskQueue();// TimerThread继承了Thread。同时组合了TaskQueue。当Timer实例化时。会启动TimerThread实例的的start()方法。启动线程处理定时任务。private final TimerThread thread = new TimerThread(queue);//构造函数。做了一件事情,及时启动了TimerThread线程,处理队列数据。public Timer(String name, boolean isDaemon) {thread.setName(name);thread.setDaemon(isDaemon);thread.start();}//延迟delay毫秒后执行一次任务public void schedule(TimerTask task, long delay) {sched(task, System.currentTimeMillis()+delay, 0);}//延迟delay毫秒,以固定频率执行定时任务。下次的执行时间为当前系统时间(System.currentTimeMillis())+|period|//当发生阻塞时,有可能丢失调度次数public void schedule(TimerTask task, long delay, long period) {sched(task, System.currentTimeMillis()+delay, -period);}//延迟delay毫秒,以固定频率执行定时任务。与schedule不同。下次执行时间为当前本应执行时间(nextExecutionTime)+period//当发生阻塞时,不会丢失调度次数。public void scheduleAtFixedRate(TimerTask task, long delay, long period) {sched(task, System.currentTimeMillis()+delay, period);}/*** 核心新增定时任务的方法。* @param task 为实现了业务的任务类。* @param time 为下次执行任务的时间。  * @param period  为0时表示不会重复执行。当period !=0时表示会周期性执行*/private void sched(TimerTask task, long time, long period) {//以下代码省去了部分校验代码synchronized(queue) {synchronized(task.lock) {task.nextExecutionTime = time;task.period = period;task.state = TimerTask.SCHEDULED;}queue.add(task);if (queue.getMin() == task)//说明加入task之前。队列为空,处于wait状态,需要唤醒。或者还有一种情况,就是加入的task处于头部,需要立即处理。有可能此时线程处于等待状态,需要唤醒。queue.notify();}}//省略。。。}

1.2.3 TaskQueue 类源码分析

TaskQueue 本质是一个平衡二叉堆,1.1已经有所介绍。

class TaskQueue {/*** Priority queue represented as a balanced binary heap: the two children* of queue[n] are queue[2*n] and queue[2*n+1].  The priority queue is* ordered on the nextExecutionTime field: The TimerTask with the lowest* nextExecutionTime is in queue[1] (assuming the queue is nonempty).  For* each node n in the heap, and each descendant of n, d,* n.nextExecutionTime <= d.nextExecutionTime.* 注释已经讲明,就是一个平衡二叉堆。*/private TimerTask[] queue = new TimerTask[128];void add(TimerTask task) {// Grow backing store if necessaryif (size + 1 == queue.length)queue = Arrays.copyOf(queue, 2*queue.length);//加到队尾queue[++size] = task;//向上排序fixUp(size);}//新增元素时向上排序private void fixUp(int k) {while (k > 1) {int j = k >> 1;if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)break;TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;k = j;}}//刷新void rescheduleMin(long newTime) {queue[1].nextExecutionTime = newTime;fixDown(1);}//删除元素时向下排序private void fixDown(int k) {int j;while ((j = k << 1) <= size && j > 0) {if (j < size &&queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)j++; // j indexes smallest kidif (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)break;TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;k = j;}}//省略其他
}

1.2.3 TimerThread 类源码分析

从Timer代码可以看到,当实例化Timer时,将会启动一个TimerThread线程,具体作用是不断轮询队列的头部元素,然后执行业务代码。核心代码如下(省略部分代码):

class TimerThread extends Thread {//标识线程已经启用。当为false时,跳出循环。boolean newTasksMayBeScheduled = true;private TaskQueue queue;public void run() {try {mainLoop();} finally {//忽略。。。}}//执行轮询private void mainLoop() {while (true) {try {TimerTask task;boolean taskFired;synchronized(queue) {// Wait for queue to become non-emptywhile (queue.isEmpty() && newTasksMayBeScheduled) {//while循环 让出锁,等待新任务加入queue.wait();}if (queue.isEmpty()){//此时newTasksMayBeScheduled队列已死。退出循环break; }long currentTime, executionTime;task = queue.getMin();synchronized(task.lock) {//判断状态if (task.state == TimerTask.CANCELLED) {//检查已经取消的任务,移除。queue.removeMin();continue;  }currentTime = System.currentTimeMillis();//当前task计划的执行时间。executionTime = task.nextExecutionTime;if (taskFired = (executionTime<=currentTime)) {//当前任务的计划执行时间<=当前时间,则允许执行。if (task.period == 0) { // Non-repeating, remove//只执行一次的任务,移除。queue.removeMin();task.state = TimerTask.EXECUTED;} else { //修改队列中最小的一个task的时间为下一个执行时间,并且重新排序。queue.rescheduleMin(task.period<0 ? currentTime   - task.period: executionTime + task.period);}}}if (!taskFired) // Task hasn't yet fired; wait//最近时间执行的任务还未到时间,需要等待。让出锁。queue.wait(executionTime - currentTime);}if (taskFired)  //执行业务逻辑。这里可以看到是同步执行的。如果业务逻辑耗时较长,会影响后续任务的执行。task.run();} catch(InterruptedException e) {}}}
}

TimerThread 中的 mainLoop 方法+TaskQueue队列,看起来非常熟悉,在queue为空的时候,会调用queue.wait()方法。直到Timer在新增元素时,调用了queue.notify()。这些代码和BlockQueue原理非常像。

1.2.4 TimerTask 类源码分析

public abstract class TimerTask implements Runnable {//执行状态int state = VIRGIN;//下次执行时间,如果是重复的任务,在任务执行前会被更新成下次的执行时间。long nextExecutionTime;// 毫秒数,用于重复执行的时间间隔。证书标识以固定频率调度。负数标识以固定的时间延迟调度。0代表不会重复执行。long period = 0;//抽象犯法,用于实现业务public abstract void run();//省略部分代码。
}

1.3 Timer 调度示意图

通过对Timer的四个核心类,我们可以得出以下调度示意图。

在这里插入图片描述

1.4 Timer 总结

可以看到,Timer 是JDK自带的任务调度器。实现的逻辑如下

  • 实现一个优先队列。队列的头部为最先需要执行的任务。
  • 启动一个后台线程,不断从优先队列中获取待执行的任务。
  • 执行任务。

通过使用Timer,我们可以方便地在一个线程中执行多个计划任务。但是也有一定的局限性,主要是多个任务之间相互影响:

  • 所有的任务都在一个线程中执行,如果前面的任务耗时比较长,则会影响后续任务的执行。
  • 假设前序任务抛出了非InterruptedException的异常,则整个队列将会被清空,任务调度终止。

基于以上局限性,在实际应用中,使用Timer使用得并不多。常用的为 ScheduledExecutorService。ScheduledExecutorService与Timer 的最大区别是将任务提交给线程池处理。

2. ScheduledExecutorService

在前一章节可以了解到,在 Timer 类中所有的任务都是同步执行,如果前序任务发生了阻塞或者耗时比较长,那么后续任务就容易被阻塞到。

JDK在1.5之后J引入了 ThreadPoolExecutor 线程池技术。 线程池技术的逻辑机构图如下:

在这里插入图片描述

(参考聊聊Java进阶之并发基础技术—线程池剖析)

从上图可以看到,线程池也是将不同的任务加入到一个队列中(BlockingQueue),等待着多个线程的调用。与Timer的调度很相似,只是最大区别是线程池队列是被多个线程调用的。

因此JDK在1.5引入了ThreadPoolExecutor的同时,也重新编写了一套新任务调度器-ScheduledExecutorService,具体实现类为ScheduledThreadPoolExecutor,用于任务的调度。

其继承关系如下:
在这里插入图片描述

从继承关系中可以看出,ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,拥有线程池的所有功能。

ScheduledThreadPoolExecutor 在实现上与Timer是相似的,都是通过实现一个优先队列来管理任务,同时这个优先队列又是一个阻塞队列,在获取第一个任务后,只有到了执行时间才会返回任务。一个比较大的改进在于,获取任务后不是直接执行代码,而是交给线程池来调度。

2.1核心类

ScheduledExecutorService 的一些核心类如下:

功能说明
ScheduledExecutorService抽象类
Executors.DelegatedScheduledExecutorService包装类用于包装 ScheduledThreadPoolExecutor,
只暴露关键方法
ScheduledThreadPoolExecutor核心执行器实现类,真正执行调度逻辑的地方
ScheduledThreadPoolExecutor.DelayedWorkQueue延迟阻塞队列任务周期执行的核心方法在这个类中实现
ScheduledThreadPoolExecutor.ScheduledFutureTask队列中的对象是ScheduledThreadPoolExecutor的成员内部类

以上类处于java.util.concurrent包下

2.2 ScheduledExecutorService 的简单用法和介绍

public class Application {public static void main(String[] args) {ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();Runnable runnable = ()->{System.out.println("执行run方法,time="+System.currentTimeMillis()/1000%60+"秒");};scheduledExecutorService.scheduleAtFixedRate(runnable, 1, 2, TimeUnit.SECONDS);}
}

查看 ScheduledExecutorService 的结构,

在这里插入图片描述

提交任务的方法共有4个,与Timer不同的是,可以提交Callable类型的任务。

public interface ScheduledExecutorService extends ExecutorService {//delay时间后,执行一次任务public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);//delay时间后,执行一次任务有返回值的任务public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);//以固定频率执行任务public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);//以固定延迟时间执行任务public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

2.3 ScheduledThreadPoolExecutor 类

ScheduledExecutorService只定义了相应的规范,还需要具体类进行实现。

通过查看 Executors.newSingleThreadScheduledExecutor(),具体实现如下

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));}

DelegatedScheduledExecutorService 只是一个包装类,核心逻辑在 ScheduledThreadPoolExecutor。
其构造函数调用了父类的构造函数的时候,传入了 DelayedWorkQueue 延时阻塞队列。

public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService {// 构造函数1public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());}// 构造函数2public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);}// 构造函数3public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);}// 构造函数4public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);}//ScheduledThreadPoolExecutor 提供了4个构造函数,每个构造函数都调用了父类ThreadPoolExecutor的构造函数,这四个经典参数中,DelayedWorkQueue是不变的,说明它是实现任务队列的关键。//向队列提交任务。// 查看源码,所有提交任务的方法,经过包装后最终会调用delayedExecute,像队列中新增任务。并调用父类的ensurePrestart()方法确认线程池已经准备就绪。private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {super.getQueue().add(task);if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);elseensurePrestart();}}
}

到这时ScheduledExecutorService的秘密浮出水面,核心在于 DelayedWorkQueue。

2.4 DelayedWorkQueue 类实现

DelayedWorkQueue 类的定义如下。可以看到,DelayedWorkQueue的实现了BlockingQueue接口,可以传入JDK的线程池进行消费。

static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {
}

但是具体何如实现队列中的任务,在指定的时间被调度呢?

首先来看一下他的队列实现方式。


static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {/** A DelayedWorkQueue is based on a heap-based data structure* like those in DelayQueue and PriorityQueue, except that* every ScheduledFutureTask also records its index into the* heap array. * ...* * 从以上注释中可以知道 DelayedWorkQueue 本质上也是一个基于堆的数据结构。*///初始化了一个数组private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];//用于新增元素时向上移动元素private void siftUp(int k, RunnableScheduledFuture<?> key) {//省略代码}//去除元素时向下移动元素private void siftDown(int k, RunnableScheduledFuture<?> key) {//省略代码}private final ReentrantLock lock = new ReentrantLock();private final Condition available = lock.newCondition();//查看阻塞队列的take方法public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {//1、从队首获取待执行的任务。此任务在队列中最早执行。RunnableScheduledFuture<?> first = queue[0];if (first == null)// 2 队列中还没有元素,等待。available.await();else {//3 delay=下次执行时间-当前时间。当<=0说明需要被执行。long delay = first.getDelay(NANOSECONDS);if (delay <= 0) {// 4 删除队列的第一个个元素并重新向下排序。返回任务return finishPoll(first);}first = null; // don't retain ref while waiting  等待时去除引用if (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {//5 当前线程等待,直到被唤醒或者等待时长结束available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}}/*** 查看finishPoll方法。* * 使用最后一个元素替换掉当前元素,并且重新向下排序。* 注意,这时第一个元素已经从队列中去除,这一点与Timer的实现方式不同。* Timer是修改时间了之后,从上往下重新排序。只需要排序一次。* ScheduledExecutorService执行一个定时任务,需要进行两次排序。第一次是获取了task,第二次是真正执行task的时候。* * @param f* @return*/private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {int s = --size;RunnableScheduledFuture<?> x = queue[s];queue[s] = null;if (s != 0)//把队列头部换成x,向下排序siftDown(0, x);setIndex(f, -1);return f;}
}

具体流程如下:
在这里插入图片描述

2.5 ScheduledFutureTask 类

从上一节DelayedWorkQueue类中的take方法和finishPoll方法可知,在线程池获取task后,已经从队列中移走,那么对于重复执行的队列怎么办呢?那就是在线程池执行run方法前,重新将task加到队列中。

// 注意 ScheduledFutureTask 属于  ScheduledThreadPoolExecutor 的成员内部类,因此可以使用ScheduledThreadPoolExecutor方法和成员变量。
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService {private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {/** 下次执行时间 */private long time;/** 周期性执行任务的时间间隔 分为0、正、负值。*/private final long period;ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}/**Overrides FutureTask version so as to reset/requeue if periodic. * 重写了 FutureTask 的run方法,主要是增加了重置下个调度时间以及重新排序*/public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))//判断当前状态能不能运行cancel(false);else if (!periodic)//非周期性的任务,只执行一次ScheduledFutureTask.super.run();else if (ScheduledFutureTask.super.runAndReset()) {//设置下次执行时间setNextRunTime();//直接调用外部类的方法。重新将任务加入到队列中。reExecutePeriodic(outerTask);}}}//重新将任务加入到队列中void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(true)) {//往队列加任务super.getQueue().add(task);if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false);elseensurePrestart();}}
}

2.6 总结

通过分析源码可以看出,ScheduledExecutorService 是通过实现一个优先队列来存储和调度任务的。从原理上来说是和Timer是类似的。可以认为是Timer 的升级版,新增了线程池执行任务的功能。

在这里插入图片描述

ScheduledExecutorService 和 Timer 比较

框架相同不同
Timer都使用堆作为数据结构1、同步执行任务

2、从队列头部获取任务之后,直接修改下次执行时间,直接排序
ScheduledExecutorService1、线程池异步执行任务
2、从头部获取任务后,移除当前任务 ,排序一次。在执行任务前修改时间后,再提交到队列。相当于排序两次

但是ScheduledExecutorService也有一定的局限性,那就是任务只能执行一次或者以固定的时间差周期性执行。不够灵活。

3 Spring Task

Spring Task处于spring-context项目的org.springframework.scheduling包下。可以通过注解的方式,将Spring bean中的某个方法变成一个task,非常方便。而且引入了cron表达式,使用更加灵活。

3.1 Spring Task 简单用法

新建一个maven项目,引入spring-context包

<dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.3.15</version>
</dependency>

新建一个启动类

@Configuration
@EnableScheduling
public class Application {public static void main(String[] args) {AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(Application.class);}@Scheduled(fixedRate = 1*1000)public void schedled(){System.out.println("执行定时任务,time="+System.currentTimeMillis()/1000%60+"秒");}
}

启动main方法后,执行结果如下:

执行定时任务,time=18秒
执行定时任务,time=19秒
执行定时任务,time=20秒
执行定时任务,time=21秒

从以上demo中可以看出,在spring中,只需要两个注解@EnableScheduling和@Scheduled,就可以启动一个定时任务了,非常方便。

3.2 Spring Task核心类

在这里插入图片描述

初始化基本流程
在这里插入图片描述

需要注意的是,Spring Task本质上只定义了提交task的抽象类,并没有提供具体的调度的实现。
为了实现开箱可用,Spring task将ScheduledExecutorService适配成了默认的实现,但是可以根据具体的需要,替换成其他实现。

3.3 Trigger接口及实现类

Trigger排在最前面介绍,是因为Timer和ScheduledExcecutorService是没有Trigger的概念的,都是再在各自的Task类中拥有一个下次执行时间的属性。Timer的TimerTask是private long nextExecutionTime。ScheduledFutureTask的是private long time。为了让任务的触发时间更加灵活,引入了Trigger的概念。
Trigger的本意为触发器,枪的扳机。因此相对的,当一个任务被触发时,常常用fire这个词。
Trigger的本质是封装了获取下一次执行时间的逻辑。

public interface Trigger {// 获取下一次执行时间@NullableDate nextExecutionTime(TriggerContext triggerContext);
}//上下文,用于存储一些时间变量
public interface TriggerContext {default Clock getClock() {return Clock.systemDefaultZone();}//上一次计划执行时间@NullableDate lastScheduledExecutionTime();// 上一次具体执行时间@NullableDate lastActualExecutionTime();//完成时间@NullableDate lastCompletionTime();}
//TriggerContext实现类很简单
public class SimpleTriggerContext implements TriggerContext {private final Clock clock;@Nullableprivate volatile Date lastScheduledExecutionTime;@Nullableprivate volatile Date lastActualExecutionTime;@Nullableprivate volatile Date lastCompletionTime;
}

spring task中提供了两个Trigger实现
在这里插入图片描述
周期性触发器-目前没有看到使用的地方

public class PeriodicTrigger implements Trigger {private final long period;private final TimeUnit timeUnit;private volatile long initialDelay;private volatile boolean fixedRate;@Overridepublic Date nextExecutionTime(TriggerContext triggerContext) {Date lastExecution = triggerContext.lastScheduledExecutionTime();Date lastCompletion = triggerContext.lastCompletionTime();if (lastExecution == null || lastCompletion == null) {return new Date(triggerContext.getClock().millis() + this.initialDelay);}if (this.fixedRate) {return new Date(lastExecution.getTime() + this.period);}return new Date(lastCompletion.getTime() + this.period);}}

CronTrigger-支持cron表达式

public class CronTrigger implements Trigger {private final CronExpression expression;private final ZoneId zoneId;//提供了多个构造函数 完成 expression 和 zoneId的复制public CronTrigger(String expression) {this(expression, ZoneId.systemDefault());}public CronTrigger(String expression, TimeZone timeZone) {this(expression, timeZone.toZoneId());}public CronTrigger(String expression, ZoneId zoneId) {Assert.hasLength(expression, "Expression must not be empty");Assert.notNull(zoneId, "ZoneId must not be null");this.expression = CronExpression.parse(expression);this.zoneId = zoneId;}@Overridepublic Date nextExecutionTime(TriggerContext triggerContext) {Date date = triggerContext.lastCompletionTime();if (date != null) {Date scheduled = triggerContext.lastScheduledExecutionTime();if (scheduled != null && date.before(scheduled)) {// Previous task apparently executed too early...// Let's simply use the last calculated execution time then,// in order to prevent accidental re-fires in the same second.date = scheduled;}}else {date = new Date(triggerContext.getClock().millis());}ZonedDateTime dateTime = ZonedDateTime.ofInstant(date.toInstant(), this.zoneId);ZonedDateTime next = this.expression.next(dateTime);return (next != null ? Date.from(next.toInstant()) : null);}
}

目前,在Spring task中,只有cron类型的任务才用到Trigger,其他的固定固定周期执行和固定频率执行,还是用传统的模式。

3.4 Task父类以及子类

Task是所有Spring task的所有父类,比较简单,就是定义了runnable变量,用来存储业务逻辑,而其他周期性执行,延迟执行等参数留给子类实现

public class Task {private final Runnable runnable;
}

其子类如下:
在这里插入图片描述

从图上可知,Task的子类分为两种:

  1. Trigger触发的Task。代表为CronTask
  2. 固定间歇执行的Task。代表为FixedDelayTask和FixedRateTask。其中FixedDelayTask和FixedRateTask本身没有任何实现,只是为了区分不同的调度方式(还记得Timer中的正负period吗),具体功能都是由父类IntervalTask承担。
public class IntervalTask extends Task {private final long interval;private final long initialDelay;public IntervalTask(Runnable runnable, long interval, long initialDelay) {super(runnable);this.interval = interval;this.initialDelay = initialDelay;}
}
public class FixedRateTask extends IntervalTask {public FixedRateTask(Runnable runnable, long interval, long initialDelay) {super(runnable, interval, initialDelay);}
}
public class FixedDelayTask extends IntervalTask {public FixedDelayTask(Runnable runnable, long interval, long initialDelay) {super(runnable, interval, initialDelay);}
}
public class TriggerTask extends Task {private final Trigger trigger;public TriggerTask(Runnable runnable, Trigger trigger) {super(runnable);this.trigger = trigger;}
}public class CronTask extends TriggerTask {private final String expression;public CronTask(Runnable runnable, String expression) {this(runnable, new CronTrigger(expression));}
}

3.3 TaskScheduler接口逻辑

TaskScheduler 是Spring task中的任务调度接口,定义了一系列提交任务的方法,与 ScheduledExecutorService 角色相当。
方法概览如下:

public interface TaskScheduler {default Clock getClock() {return Clock.systemDefaultZone();}@NullableScheduledFuture<?> schedule(Runnable task, Trigger trigger);ScheduledFuture<?> schedule(Runnable task, Date startTime);ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay);ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay);//另外提供了一些default方法default ScheduledFuture<?> schedule(Runnable task, Instant startTime) {return schedule(task, Date.from(startTime));}default ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {return scheduleAtFixedRate(task, Date.from(startTime), period.toMillis());}default ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Duration period) {return scheduleAtFixedRate(task, period.toMillis());}default ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) {return scheduleWithFixedDelay(task, Date.from(startTime), delay.toMillis());}default ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay) {return scheduleWithFixedDelay(task, delay.toMillis());}}

与ScheduledExecutorService的接口很类似

public interface ScheduledExecutorService extends ExecutorService {//delay时间后,执行一次任务public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);//delay时间后,执行一次任务有返回值的任务public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);//以固定频率执行任务public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);//以固定延迟时间执行任务public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

但需要注意的是新增了一个方法。

ScheduledFuture<?> schedule(Runnable task, Trigger trigger);

这个方法比较特殊,也是实现cron表达式的关键,依靠 Trigger。Trigger在上一节已经有介绍。
看下TaskScheduler,spring提供了三个实现类。
在这里插入图片描述
实现类 ConcurrentTaskScheduler 注解上讲的很明白,就是一个将java.util.concurrent.ScheduledExecutorService 适配成 TaskScheduler 的适配器。
其构造器如下
在这里插入图片描述

ThreadPoolTaskScheduler 则是封装了ScheduledThreadPoolExecutor。
在这里插入图片描述

因此很明显,默认情况下,Spring-task的底层就是由ScheduledExecutorService来提供实际调度的。当然也可以自己实现一个TaskScheduler的实现类,但目前看来并没有理由再造一个这样的轮子。置于为什么没有直接使用ScheduledExecutorService,一是提供了一个新的方法提交Trigger。二是方便拓展,可以自己实现一个任务调度器。

public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements TaskScheduler {@Nullableprivate static Class<?> managedScheduledExecutorServiceClass;static {try {// 需要单独引入 javax.enterprise.concurrent-api 包。默认是没有的。ManagedScheduledExecutorServicemanagedScheduledExecutorServiceClass = ClassUtils.forName("javax.enterprise.concurrent.ManagedScheduledExecutorService",ConcurrentTaskScheduler.class.getClassLoader());}catch (ClassNotFoundException ex) {// JSR-236 API not available...managedScheduledExecutorServiceClass = null;}}private ScheduledExecutorService scheduledExecutor;private boolean enterpriseConcurrentScheduler = false;@Nullableprivate ErrorHandler errorHandler;private Clock clock = Clock.systemDefaultZone();public ConcurrentTaskScheduler() {super();this.scheduledExecutor = initScheduledExecutor(null);}public ConcurrentTaskScheduler(ScheduledExecutorService scheduledExecutor) {super(scheduledExecutor);this.scheduledExecutor = initScheduledExecutor(scheduledExecutor);}public ConcurrentTaskScheduler(Executor concurrentExecutor, ScheduledExecutorService scheduledExecutor) {super(concurrentExecutor);this.scheduledExecutor = initScheduledExecutor(scheduledExecutor);}private ScheduledExecutorService initScheduledExecutor(@Nullable ScheduledExecutorService scheduledExecutor) {if (scheduledExecutor != null) {this.scheduledExecutor = scheduledExecutor;// 当前实现类为 ManagedScheduledExecutorService的子类this.enterpriseConcurrentScheduler = (managedScheduledExecutorServiceClass != null &&managedScheduledExecutorServiceClass.isInstance(scheduledExecutor));} else {this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();this.enterpriseConcurrentScheduler = false;}return this.scheduledExecutor;}@Override@Nullablepublic ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {try {if (this.enterpriseConcurrentScheduler) {return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);} else {//默认为走到这里ErrorHandler errorHandler = (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));return new ReschedulingRunnable(task, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule();}} catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}@Overridepublic ScheduledFuture<?> schedule(Runnable task, Date startTime) {long initialDelay = startTime.getTime() - this.clock.millis();try {return this.scheduledExecutor.schedule(decorateTask(task, false), initialDelay, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}@Overridepublic ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) {long initialDelay = startTime.getTime() - this.clock.millis();try {return this.scheduledExecutor.scheduleAtFixedRate(decorateTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}@Overridepublic ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {try {return this.scheduledExecutor.scheduleAtFixedRate(decorateTask(task, true), 0, period, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}@Overridepublic ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {long initialDelay = startTime.getTime() - this.clock.millis();try {return this.scheduledExecutor.scheduleWithFixedDelay(decorateTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}@Overridepublic ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) {try {return this.scheduledExecutor.scheduleWithFixedDelay(decorateTask(task, true), 0, delay, TimeUnit.MILLISECONDS);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}}private Runnable decorateTask(Runnable task, boolean isRepeatingTask) {Runnable result = TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);if (this.enterpriseConcurrentScheduler) {result = ManagedTaskBuilder.buildManagedTask(result, task.toString());}return result;}/*** Delegate that adapts a Spring Trigger to a JSR-236 Trigger.* Separated into an inner class in order to avoid a hard dependency on the JSR-236 API.*/private class EnterpriseConcurrentTriggerScheduler {public ScheduledFuture<?> schedule(Runnable task, final Trigger trigger) {ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor;return executor.schedule(task, new javax.enterprise.concurrent.Trigger() {@Override@Nullablepublic Date getNextRunTime(@Nullable LastExecution le, Date taskScheduledTime) {return (trigger.nextExecutionTime(le != null ?new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) :new SimpleTriggerContext()));}@Overridepublic boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) {return false;}});}}}

3.4 Scheduled 注解

首先看一下Scheduled的代码,看提供了哪些功能呢。

public @interface Scheduled {//核心参数如下//1、cron表达式String cron() default "";//2、固定延迟long fixedDelay() default -1;//3、固定频率long fixedRate() default -1;//4、固定的延迟long initialDelay() default -1;//时间单位,默认是毫秒TimeUnit timeUnit() default TimeUnit.MILLISECONDS;//一下是String类型的配,方便接收配置化的数据,如${fixedDelay:10}String fixedDelayString() default "";String fixedRateString() default "";String initialDelayString() default "";}

Scheduled 中包含了任务调度的相关配置参数。 相比较ScheduledExecutorService,多了cron表达式。在任务的控制上更加灵活,不再局限于固定重复周期。

3.5 ScheduledAnnotationBeanPostProcessor 类

spring-task需要@EnableScheduling开启注解,查看其定义:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}

最重要的是@Import(SchedulingConfiguration.class),将SchedulingConfiguration注入到spring容器中

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {return new ScheduledAnnotationBeanPostProcessor();}}

SchedulingConfiguration只做了一件事情,那就是配置了ScheduledAnnotationBeanPostProcessor。
ScheduledAnnotationBeanPostProcessor实现了Spring的后置处理器,因此在Spring启动后,可以根据相应的配置或者注解,可以筛选出对应的方法,封装成为ScheduledTask,等待被调用。在Spring初始化完成后将会触发任务的调度。
首先来看下 ScheduledMethodRunnable。ScheduledAnnotationBeanPostProcessor一个重要的目标就是将注解的方法封装成为ScheduledMethodRunnable。

public class ScheduledMethodRunnable implements Runnable {private final Object target;private final Method method;public ScheduledMethodRunnable(Object target, Method method) {this.target = target;this.method = method;}@Overridepublic void run() {try {ReflectionUtils.makeAccessible(this.method);this.method.invoke(this.target);} catch (InvocationTargetException ex) {}}
}
//只保留核心代码
public class ScheduledAnnotationBeanPostProcessorimplements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {//用来缓存task和TaskScheduler调度器private final ScheduledTaskRegistrar registrar;public ScheduledAnnotationBeanPostProcessor() {this.registrar = new ScheduledTaskRegistrar();}@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) {// 忽略...// 查找Scheduled注解的MethodMap<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);});annotatedMethods.forEach((method, scheduledAnnotations) ->scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));//忽略。。}//处理Spring beanprotected void processScheduled(Scheduled scheduled, Method method, Object bean) {try {//将Spring bean 封装成为一个Runnable 。在执行Runnable方法时,使用反射技术 method.invoke(this.target),执行原本逻辑即可。Runnable runnable = createRunnable(bean, method);// 处理延迟时间long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());String initialDelayString = scheduled.initialDelayString();if (StringUtils.hasText(initialDelayString)) {Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");if (this.embeddedValueResolver != null) {initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);}if (StringUtils.hasLength(initialDelayString)) {try {initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit());}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");}}}// 1、处理cron表达式String cron = scheduled.cron();if (StringUtils.hasText(cron)) {//处理一下时区的问题String zone = scheduled.zone();if (this.embeddedValueResolver != null) {cron = this.embeddedValueResolver.resolveStringValue(cron);zone = this.embeddedValueResolver.resolveStringValue(zone);}if (StringUtils.hasLength(cron)) {Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");processedSchedule = true;if (!Scheduled.CRON_DISABLED.equals(cron)) {TimeZone timeZone;if (StringUtils.hasText(zone)) {timeZone = StringUtils.parseTimeZoneString(zone);}else {timeZone = TimeZone.getDefault();}tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));}}}// At this point we don't need to differentiate between initial delay set or not anymoreif (initialDelay < 0) {initialDelay = 0;}// 2、处理// Check fixed delaylong fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());if (fixedDelay >= 0) {processedSchedule = true;tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));}// 3、处理字符串形式的Scheduled ,比如配置化${time:1000}String fixedDelayString = scheduled.fixedDelayString();//延迟if (StringUtils.hasText(fixedDelayString)) {if (this.embeddedValueResolver != null) {fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);}if (StringUtils.hasLength(fixedDelayString)) {processedSchedule = true;try {fixedDelay = convertToMillis(fixedDelayString, scheduled.timeUnit());}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");}tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));}}// 3 固定频率的任务long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());if (fixedRate >= 0) {processedSchedule = true;tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));}String fixedRateString = scheduled.fixedRateString();if (StringUtils.hasText(fixedRateString)) {if (this.embeddedValueResolver != null) {fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);}if (StringUtils.hasLength(fixedRateString)) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;try {fixedRate = convertToMillis(fixedRateString, scheduled.timeUnit());}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");}tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));}}//忽略其他情况}catch (Exception e){}}protected Runnable createRunnable(Object target, Method method) {Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());return new ScheduledMethodRunnable(target, invocableMethod);}//监听容器刷新事件public void onApplicationEvent(ContextRefreshedEvent event) {if (event.getApplicationContext() == this.applicationContext) {finishRegistration();}}//这部分主要是从Spring中获取配置的 scheduler 只保留核心代码private void finishRegistration() {if (this.scheduler != null) {this.registrar.setScheduler(this.scheduler);}//检查是否做了配置SchedulingConfigurerif (this.beanFactory instanceof ListableBeanFactory) {Map<String, SchedulingConfigurer> beans =((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());AnnotationAwareOrderComparator.sort(configurers);for (SchedulingConfigurer configurer : configurers) {configurer.configureTasks(this.registrar);}}/*** 获取调度器的核心代码*/if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {try {//1、根据类型获取TaskScheduler实现类this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));}catch (NoUniqueBeanDefinitionException ex) {try {//2、实现类不唯一,尝试使用默认名称taskScheduler查找this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));}catch (NoSuchBeanDefinitionException ex2) {}}catch (NoSuchBeanDefinitionException ex) {try {//3、根据类型获取 ScheduledExecutorService 实现类this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));}catch (NoUniqueBeanDefinitionException ex2) {try {//4、实现类不唯一,尝试使用默认名称taskScheduler查找this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));}catch (NoSuchBeanDefinitionException ex3) {}}catch (NoSuchBeanDefinitionException ex2) {// Giving up -> falling back to default scheduler within the registrar...}}}this.registrar.afterPropertiesSet();}
}

从 ScheduledAnnotationBeanPostProcessor 源码可以看出,经过处理后,所有的任务和执行器都存放于ScheduledTaskRegistrar中。通过调用afterPropertiesSet()来启动任务。总结来说,做了以下3件事情

1、将被注解的方法封装成为 Task。
2、从容器中查找合适的 TaskScheduler。
3、将1和2都存到ScheduledTaskRegistrar。

其中查找 TaskScheduler 的来源分为4个。

1、调用ScheduledAnnotationBeanPostProcessor实例的set方法。
2、配置SchedulingConfigurer实现类到spring容器中。
3、配置 TaskScheduler 实现类到spring容器中。
4、配置ScheduledExecutorService实现类到Spring 容器中。
5、在1-4都没有做的情况下ScheduledTaskRegistrar会直接调用 Executors.newSingleThreadScheduledExecutor()获取一个ScheduledExecutorService。

其中1-4在 ScheduledAnnotationBeanPostProcessor 中实现, 5在ScheduledTaskRegistrar中实现。

3.6 ScheduledTaskRegistrar 类

ScheduledTaskRegistrar是一个核心类,也是一个容器类,保存了所有的task的定义(这地方和xxl-job的类似)。同时也是真正将Task提交给调度器的地方。具体看以下代码。

public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean {//处理器,是Spring的类private TaskScheduler taskScheduler;//默认处理器 JDKprivate ScheduledExecutorService localExecutor;//根据不同的配置,缓存了不同的类型的task。名字与Scheduled中配置基本一样。@Nullableprivate List<TriggerTask> triggerTasks;@Nullableprivate List<CronTask> cronTasks;@Nullableprivate List<IntervalTask> fixedRateTasks;@Nullableprivate List<IntervalTask> fixedDelayTasks;private final Map<Task, ScheduledTask> unresolvedTasks = new HashMap<>(16);//已经加入执行器中的任务private final Set<ScheduledTask> scheduledTasks = new LinkedHashSet<>(16);@Overridepublic void afterPropertiesSet() {scheduleTasks();}//具体将任务提交到执行器中的方法protected void scheduleTasks() {if (this.taskScheduler == null) {//当容器中没有找到执行器的时候,将会使用 ScheduledExecutorService。此时只有一个线程处理。this.localExecutor = Executors.newSingleThreadScheduledExecutor();//需要将ScheduledExecutorService 封装成为 TaskScheduler 才能够使用this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);}if (this.triggerTasks != null) {for (TriggerTask task : this.triggerTasks) {addScheduledTask(scheduleTriggerTask(task));}}if (this.cronTasks != null) {for (CronTask task : this.cronTasks) {addScheduledTask(scheduleCronTask(task));}}if (this.fixedRateTasks != null) {for (IntervalTask task : this.fixedRateTasks) {addScheduledTask(scheduleFixedRateTask(task));}}if (this.fixedDelayTasks != null) {for (IntervalTask task : this.fixedDelayTasks) {addScheduledTask(scheduleFixedDelayTask(task));}}}public ScheduledTask scheduleTriggerTask(TriggerTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());}else {addTriggerTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);}@Nullablepublic ScheduledTask scheduleCronTask(CronTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());}else {addCronTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);}@Deprecated@Nullablepublic ScheduledTask scheduleFixedRateTask(IntervalTask task) {FixedRateTask taskToUse = (task instanceof FixedRateTask ? (FixedRateTask) task :new FixedRateTask(task.getRunnable(), task.getInterval(), task.getInitialDelay()));return scheduleFixedRateTask(taskToUse);}@Nullablepublic ScheduledTask scheduleFixedRateTask(FixedRateTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {if (task.getInitialDelay() > 0) {Date startTime = new Date(this.taskScheduler.getClock().millis() + task.getInitialDelay());scheduledTask.future =this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), startTime, task.getInterval());}else {scheduledTask.future =this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());}}else {addFixedRateTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);}@Deprecated@Nullablepublic ScheduledTask scheduleFixedDelayTask(IntervalTask task) {FixedDelayTask taskToUse = (task instanceof FixedDelayTask ? (FixedDelayTask) task :new FixedDelayTask(task.getRunnable(), task.getInterval(), task.getInitialDelay()));return scheduleFixedDelayTask(taskToUse);}@Nullablepublic ScheduledTask scheduleFixedDelayTask(FixedDelayTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {if (task.getInitialDelay() > 0) {Date startTime = new Date(this.taskScheduler.getClock().millis() + task.getInitialDelay());scheduledTask.future = this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), startTime, task.getInterval());}else {scheduledTask.future = this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), task.getInterval());}}else {addFixedDelayTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);}}

3.7 如何执行cron表达式的任务

从前面的代码中我们知道, Spring-task默认使用 ScheduledExecutorService 作为底层逻辑,但是ScheduledExecutorService并不支持cron表达式。不过可以通过将cron表达式的任务封装成ScheduledExecutorService支持的参数即可。基本思想是将任务当成一次延时任务即可,等执行完上一次任务之后,如果还有下次,则重新提交到调度器。也就是:

1、将task在封装一层成为 ReschedulingRunnable,
2、计算cron的下次执行时间与当前的时间差delay。
3、调用提交任务。让任务延迟delay执行一次,注意只执行一次。
4、执行业务 run 方法。
5、重复执行2-4即可。

这个思路与ScheduledExecutorService获取task后再提交到队列中的思路是一样的。具体代码参考 ReschedulingRunnable 类和 CronTask 类。

在这里插入图片描述

class ReschedulingRunnable extends DelegatingErrorHandlingRunnable implements ScheduledFuture<Object> {// trigger 封装了cron和获取下次触发时间的类private final Trigger trigger;// 上下文private final SimpleTriggerContext triggerContext;// 执行器private final ScheduledExecutorService executor;@Nullableprivate ScheduledFuture<?> currentFuture;@Nullableprivate Date scheduledExecutionTime;private final Object triggerContextMonitor = new Object();//重新将task封装成为 ReschedulingRunnable public ReschedulingRunnable(Runnable delegate, Trigger trigger, Clock clock,ScheduledExecutorService executor, ErrorHandler errorHandler) {super(delegate, errorHandler);this.trigger = trigger;this.triggerContext = new SimpleTriggerContext(clock);this.executor = executor;}@Overridepublic void run() {//实际执行时间Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());//执行业务方法super.run();//完成时间Date completionTime = new Date(this.triggerContext.getClock().millis());synchronized (this.triggerContextMonitor) {//更新相关参数this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);//判断是否可以继续执行if (!obtainCurrentFuture().isCancelled()) {//继续提交任务schedule();}}}@Nullablepublic ScheduledFuture<?> schedule() {synchronized (this.triggerContextMonitor) {//获取下次执行时间this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);if (this.scheduledExecutionTime == null) {//没有下次时间,终止执行return null;}long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();//提交到线程池 ScheduledExecutorService。只执行一次。this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);return this;}}//省略其他代码。。。
}

3.8 Spring task使用注意事项

通过ScheduledAnnotationBeanPostProcessor查找TaskScheduler的过程,以及ScheduledTaskRegistrar调度过程,我们知道,如果我们没有配置TaskScheduler实例,默认情况下使用 Executors.newSingleThreadScheduledExecutor()新建了一个实例,这个实例只有一个线程处理任务,在任务耗时比较高的情况下会有可能发生阻塞。最好是配置一个ScheduledExecutorService实例交给Spring管理
如:

@Configuration
@EnableScheduling
public class Application2 {public static void main(String[] args) {AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(Application2.class);}@Scheduled(fixedRate = 100)public void schedled(){System.out.println("执行定时任务,time="+System.currentTimeMillis()/1000%60+"秒。threadName="+Thread.currentThread().getName());}@Beanpublic ScheduledExecutorService executorService(){ThreadFactory factory = new ThreadFactory() {private  int seq = 0;@Overridepublic Thread newThread(Runnable r) {return new Thread(r,"定时任务线程池 seq="+seq++);}};ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3, factory);return scheduledExecutorService;}
}

3.9 总结

spring task中任务处理器为TaskScheduler实现类,任务为Task的子类。
基本的思想还是与 ScheduledExecutorService 想类似的。在默认情况下也是使用ScheduledExecutorService作为任务的处理器。

4 XXL-JOB

XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。由于是国产开发的,中文文档,而且很全面,具体使用方法可以直接看官方文档 《分布式任务调度平台XXL-JOB》-官方文档 。

fe最初XXL-JOB的底层是Quartz,后来发现Quartz比较复杂,不利于扩展和维护,因此自研了一个调度器,简单但是很实用,很值得学习。

4.1 为什么我们需要一个分布式的调度平台

前面介绍的Timer, ScheduledExcutorService 和 Spring Task,都是针对单个实例内的任务调度,在分布式部署的时候,可能会有一定的问题。比如假设有一个任务A,是给用户发送消息的,设置每一秒执行一次,如果部署了3个实例,那么就会变成每秒执行3次,调度频率随着实例的增多而增多,如果没有加全局锁,会出现重复发送的问题。此外在实际的业务中,我们还有可能需要随时JOB的调度周期,随时停止和启动一个任务等,这些操作都需要发版才能实现。因此在分布式系统中,有一个分布式调度器尤为重要。

4.1.1 分布式调度平台的设计思路

通过前面的几个任务调度工具的介绍,我们可以总结出来,一个任务调度系统,包含以下几个元素:

在这里插入图片描述

其中,Runnable(业务逻辑)可以是任意的可执行的业务逻辑,包括一个HTTP请求。如果是通过HTTP请求调度其他服务的任务,那么就实现了一个最基本的任务调度服务。

在这里插入图片描述

但是,如果我们还希望任务的调度更加灵活,比如可以随时修改调度的频率,随时停止和启动,那么就必须将触发器持久化,比如存储到Mysql等。

在这里插入图片描述

除此以外,我们还希望这个调度服务是高可用的,因此多实例部署,加一个锁就可以了

在这里插入图片描述

4.2 XXL-JOB 的模块

从github 上下载源码,可以看到XXL-JOB的核心模块分为2个,xxl-job-admin 和 xxl-job-core。另外的xxl-job-executor-samples是一个例子模块

在这里插入图片描述

核心模块的作用如下:

模块说明功能
xxl-job-admin服务端(调度中心)管理界面+任务调度
xxl-job-core客户端(执行器)在项目中引用,执行业务逻辑

从模块划分可以看出,xxl-job的任务调度和任务的执行是分开的,客户端只管执行任务,不用管任务单的调度。任务调度是由服务端执行,这样各司其职,进行了解耦,提高系统整体稳定性和扩展性

官方架构图
在这里插入图片描述

如果大家心中没有任务调度的概念,直接看官方架构图是有些吃力的,因此我做了简化,保留了核心部分,如下图:

在这里插入图片描述

从上图可以看出,XXL-JOB框架分为三个结构。

1、Mysql:存储相关信息
2、客户端:将自己注册到服务端,等待任务下发。
3、服务端:维护JOB的信息,将需要执行的JOB,通过一定的策略,找到对应的客户端地址,发送HTTP请求,客户端执行即可。

4.3 xxl-job-admin 解析

服务端核心类如下:

功能
JobScheduleHelper调度器,将需要执行的JOB提交到线程池
XxlJobInfo实体类,记录了一个任务的配置,持久化到mysql中
JobTriggerPoolHelper线程池,多个线程发送job到客户端
XxlJobTrigger触发器, 真正处理 XxlJobInfo并发送到制定的客户端
JobRegistryHelper注册中心 ,接收客户端的注册和心跳

以上类处于com.xxl.job.admin.core包

4.3.1 服务端初始化入口

admin的初始化入口在XxlJobAdminConfig,是InitializingBean的实现类,因此在spring配置文件初始化完成后就触发了XXL-JOB的初始化

@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {private XxlJobScheduler xxlJobScheduler;@Overridepublic void afterPropertiesSet() throws Exception {adminConfig = this;xxlJobScheduler = new XxlJobScheduler();xxlJobScheduler.init();}//省略
}

从上边可以看到,真正执行初始化的类是XxlJobScheduler,初始化逻辑在init()方法中。

public class XxlJobScheduler  {//启动了一些列的线程或者线程池来来处理相关逻辑public void init() throws Exception {// init i18ninitI18n();// admin trigger pool start  触发器,触发http请求。接收JobScheduleHelper发送过来的triggerJobTriggerPoolHelper.toStart();// admin registry monitor run  处理注册上来的服务IP地址,并监控心跳等信息JobRegistryHelper.getInstance().start();// admin fail-monitor run  生成一些任务失败的告警等JobFailMonitorHelper.getInstance().start();// admin lose-monitor run ( depend on JobTriggerPoolHelper )  // 处理执行完成的任务,并监控丢失执行器的任务记录为失败。JobCompleteHelper.getInstance().start();// admin log report start  调度统计JobLogReportHelper.getInstance().start();// start-schedule  ( depend on JobTriggerPoolHelper )// 调度器JobScheduleHelper.getInstance().start();}//省略。。。
}

服务端初始化流程图如下:

在这里插入图片描述

XXL-JOB服务端在初始化过程中启动了多个后台线程或者线程池,用于异步处理多项任务。

4.3.2 JobScheduleHelper 类

从前面对任务调度的介绍可以看出,一个任务调度器,离不开

1.带有执行时间的任务列表
2.轮询执行任务的调度器

XXL-JOB 也不例外。其中JobScheduleHelper类就属于轮询执行任务的调度器,包含了任务调用的基本逻辑,属于必看的类

具体代码如下


public class JobScheduleHelper {//预读5秒public static final long PRE_READ_MS = 5000;    // pre read//时间轮刻度-任务ID映射表private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();public void start(){scheduleThread = new Thread(new Runnable() {@Overridepublic void run() {// 按照默认配置 (快200+慢100)* 20 = 6000int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {// Scan Joblong start = System.currentTimeMillis();//数据库连接Connection conn = null;PreparedStatement preparedStatement = null;boolean preReadSuc = true;try {conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();conn.setAutoCommit(false);//加锁preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();// 1、预读 查询数据库,获取下次执行时间 <= 当前时间+5秒 的所有JOBlong nowTime = System.currentTimeMillis();//SELECT * FROM xxl_job_info AS t WHERE t.trigger_status = 1 and t.trigger_next_time <= #{maxNextTime} ORDER BY id ASC LIMIT #{pagesize}List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (scheduleList!=null && scheduleList.size()>0) {// 2、遍历处理JOB,看是直接提交给线程池还是先提交到 time-ring后再提交给线程池。for (XxlJobInfo jobInfo: scheduleList) {// time-ring jumpif (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {// 2.1、超时>5秒以上,当做错失触发时机// 1、超时触发策略MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {// FIRE_ONCE_NOW 》 triggerJobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);}// 2、更新时间refreshNextValidTime(jobInfo, new Date());} else if (nowTime > jobInfo.getTriggerNextTime()) {// 2.2、超时时间在5秒以内// 1、提交到线程池JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);// 2、刷新一次时间refreshNextValidTime(jobInfo, new Date());if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {//下次执行在5秒内,说明下次循环还有它,可以再预读一次,直接提交到时间轮,提高想能// 1、计算时间轮刻度int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、提交到时间轮pushTimeRing(ringSecond, jobInfo.getId());// 3、再更新一次时间refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {// 2.3、未到执行时间// 1、计算时间轮刻度(60秒时间轮)int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、提交到时间轮线程pushTimeRing(ringSecond, jobInfo.getId());// 3、刷新refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}// 3、将修改了下次执行时间的任务存到数据库中for (XxlJobInfo jobInfo: scheduleList) {XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);}} else {preReadSuc = false;}} finally {//省略处理数据库连接}long cost = System.currentTimeMillis()-start;//省略。。。}}});scheduleThread.setDaemon(true);scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");scheduleThread.start();// ring threadringThread = new Thread(new Runnable() {@Overridepublic void run() {while (!ringThreadToStop) {//时间轮代码,忽略}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");}});ringThread.setDaemon(true);ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");ringThread.start();}//忽略其他代码
}

源码逻辑解析
在这里插入图片描述

从以上逻辑看,XXL-JOB的核心逻辑与JDK的 ScheduledExecutorService 是基本类似的。都是先从一个队列(xxl-job是使用mysql排序)中取出JOB,然后提交给线程池处理。

但有一点区别:XXL-JOB是从数据库读取数据,因此为了提高性能,做了一个预读5秒的变化。未到时间执行的job提交给时间轮,再由时间轮提交给线程池处理。

4.3.3 XxlJobTrigger 类

经过JobScheduleHelper调度,job的参数会被提交的线程池,线程池由JobTriggerPoolHelper实现,比较简单,不再描述,然后最终会使用 XxlJobTrigger 是触发执行job的地方。

public class XxlJobTrigger {public static void trigger(int jobId,TriggerTypeEnum triggerType,int failRetryCount,String executorShardingParam,String executorParam,String addressList) {//1、 从数据库获取job配置XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);if (jobInfo == null) {return;}if (executorParam != null) {//如果传参就覆盖jobInfo.setExecutorParam(executorParam);}int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();// 2 从数据库获取分组信息(本质就是获取接收job的地址)XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());// cover addressListif (addressList!=null && addressList.trim().length()>0) {group.setAddressType(1);//传参的话就覆盖group.setAddressList(addressList.trim());}// sharding paramint[] shardingParam = null;if (executorShardingParam!=null){String[] shardingArr = executorShardingParam.split("/");if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {shardingParam = new int[2];shardingParam[0] = Integer.valueOf(shardingArr[0]);shardingParam[1] = Integer.valueOf(shardingArr[1]);}}if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()&& shardingParam==null) {//分片for (int i = 0; i < group.getRegistryList().size(); i++) {processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());}} else {if (shardingParam == null) {//分片参数,这里意思是只发给一个地址。shardingParam = new int[]{0, 1};}processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);}}/*** @param group                     job group, registry list may be empty* @param jobInfo* @param finalFailRetryCount  纯粹是为了打日志* @param triggerType          为了打日志* @param index                     sharding index* @param total                     sharding index*/private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){//阻塞策略ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy//路由策略 就是如何发给客户端,如第一个,最后一个,一致性哈希ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy//分片参数 日志记录用String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;// 1、save log-idXxlJobLog jobLog = new XxlJobLog();jobLog.setJobGroup(jobInfo.getJobGroup());jobLog.setJobId(jobInfo.getId());jobLog.setTriggerTime(new Date());//记录日志XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);// 2、组装参数TriggerParam triggerParam = new TriggerParam();triggerParam.setJobId(jobInfo.getId());triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());triggerParam.setExecutorParams(jobInfo.getExecutorParam());triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());triggerParam.setLogId(jobLog.getId());triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());triggerParam.setGlueType(jobInfo.getGlueType());triggerParam.setGlueSource(jobInfo.getGlueSource());triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());triggerParam.setBroadcastIndex(index);triggerParam.setBroadcastTotal(total);// 3、init addressString address = null;ReturnT<String> routeAddressResult = null;if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {//广播模式,获取指定下标的 url 。if (index < group.getRegistryList().size()) {address = group.getRegistryList().get(index);} else {address = group.getRegistryList().get(0);}} else {//根据路由策略获取 url.routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}}} else {routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));}// 4、trigger remote executorReturnT<String> triggerResult = null;if (address != null) {triggerResult = runExecutor(triggerParam, address);} else {triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);}// 5、collection trigger info//忽略一长串组装日志代码//保存日志XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);}//根据地址url,将参数发送到指定的客户端。public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){ReturnT<String> runResult = null;try {ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);runResult = executorBiz.run(triggerParam);} catch (Exception e) {logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));}StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");runResultSB.append("<br>address:").append(address);runResultSB.append("<br>code:").append(runResult.getCode());runResultSB.append("<br>msg:").append(runResult.getMsg());runResult.setMsg(runResultSB.toString());return runResult;}}

总结以上XxlJobTrigger类的代码,做了几件事

1、根据jobId从数据库获取job参数
2、根据job参数获取groupId后,再获取分组信息,里面包含了分组中的客户端 ip:port地址。
3、根据路由策略,获取指定的address,将job参数通过http发送往客户端。
4、记录日志。

4.3.4 JobRegistryHelper 类

客户端会定时上报自身的ip+port,JobRegistryHelper就是专门处理这些信息的。
实现类比较简单,就补贴源码了,只讲一下逻辑。
1、定义了一个线程池,专门保存或者修改用来处理客户端上报的address。
2、定义了一个后台线程,周期性(30s)处理以下逻辑:清除过期的客户端注册信息(30*3s不上报),将最新的address更新到各自的任务组中。

4.3.4 路由策略

路由策略抽象类为 ExecutorRouter,在配置job的时候指定的路由策略,就有对应的ExecutorRouter子类去实现。

public abstract class ExecutorRouter {/*** route address* @param addressList* @return  ReturnT.content=address*/public abstract ReturnT<String> route(TriggerParam triggerParam, List<String> addressList);}

1、第一个:直接取addressList第一个地址
2、最后一个:直接取 addressList最后地址
3、轮询: 对调度次数进行计数n,n%addressList.size获取地址下标。
4、随机: 随机取一个。
5、一致性hash: 使用java.util.TreeMap.tailMap()方法来实现。负载均衡之一致性哈希环算法
6、最不经常使用:LFU(Least Frequently Used):最不经常使用,频率/次数
7、最近最久未使用:LRU(Least Recently Used):最近最久未使用,时间
8、故障转移:对addressList进行循环http请求,第一个正常返回的地址作为调度地州。
9、忙碌转移:通过http请求客户端检查JobThread,第一个空闲的客户端作为调度客户端。
10、分片广播:发送给所有的客户端。

4.4 客户端逻辑

客户端核心类如下:

在这里插入图片描述

功能说明
XxlJobTask注解被标注的方法将会被处理成为 IJobHandler, 与@Scheduled注解功能相似 (19年底新增注解)。 每个IJobHandler有唯标识
EmbedServer客户端server启动一个netty,用于接收服务端的调度
ExecutorBizImpl处理服务端的请求EmbedServer接收请求后,实际交给ExecutorBizImpl进行处理,里面有处理阻塞策略
TriggerParam触发参数记录服务端发送过来的任务
JobThreadJob线程用 LinkedBlockingQueue 缓存服务端传递过来的 TriggerParam。轮询 LinkedBlockingQueue,顺序处理同一个job的任务
IJobHandlerTask抽象类被@XxlJob的注释的方法,或者通过服务端传递过来的代码,将会封装成为一个 IJobHandler 实现类
XxlJobContext上下文内置InheritableThreadLocal,在线程中存储变量,供给IJobHandler

从以上表格基本可以看出客户端的执行逻辑,其中比较重要的是 ExecutorBizImpl 和 JobThread,以及XxlJob注解的原理。 将会对这三个类进行介绍。

4.4.1 @XxlJob 注解原理

在客户端引入XXL-JOB的时候,一般需要进行如下配置

    @Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}

查看XxlJobSpringExecutor 具体代码如下

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {// start@Overridepublic void afterSingletonsInstantiated() {// init JobHandler Repository (for method)initJobHandlerMethodRepository(applicationContext);// refresh GlueFactoryGlueFactory.refreshInstance(1);// super starttry {super.start();} catch (Exception e) {throw new RuntimeException(e);}}//通过Spring的ApplicationContext,获取到使用了@XxlJob注解的类,缓存起来。private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext == null) {return;}// init job handler from methodString[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);for (String beanDefinitionName : beanDefinitionNames) {Object bean = applicationContext.getBean(beanDefinitionName);Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBeantry {annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookup<XxlJob>() {@Overridepublic XxlJob inspect(Method method) {return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);}if (annotatedMethods==null || annotatedMethods.isEmpty()) {continue;}for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {Method executeMethod = methodXxlJobEntry.getKey();XxlJob xxlJob = methodXxlJobEntry.getValue();// 父类核心方法registJobHandler(xxlJob, bean, executeMethod);}}}}

继续看父类XxlJobExecutor,可以看到使用一个ConcurrentMap缓存了包装过的业务方法。其中key为每个job的唯一标识,与服务端的key一一对应。

public class XxlJobExecutor {//使用一个ConcurrentMap缓存了包装过的业务方法。其中key为每个job的唯一标识,与服务端的key一一对应。private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){if (xxlJob == null) {return;}String name = xxlJob.value();//make and simplify the variables since they'll be called several times laterClass<?> clazz = bean.getClass();String methodName = executeMethod.getName();if (name.trim().length() == 0) {throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");}if (loadJobHandler(name) != null) {throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");}executeMethod.setAccessible(true);// init and destroyMethod initMethod = null;Method destroyMethod = null;//初始化代码if (xxlJob.init().trim().length() > 0) {try {initMethod = clazz.getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");}}//销毁代码if (xxlJob.destroy().trim().length() > 0) {try {destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");}}// registry jobhandlerregistJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));}public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return jobHandlerRepository.put(name, jobHandler);}
}

最终包装成为了MethodJobHandler

public class MethodJobHandler extends IJobHandler {private final Object target;private final Method method;private Method initMethod;private Method destroyMethod;
}

以上流程总结如下

在spring启动,所有单例类都创建完成后,触发从ApplicationContext获取所有标注了 @XxlJob的bean和对应方法。最终封装成为MethodJobHandler,存储到了了一个ConcurrentMap中。key为JOB的唯一标识,与服务端一对一对应。等待服务端的调用。

4.4.2 JobThread

从@XxlJob的原理,可以看到,一个job最终会被封装成为MethodJobHandler,那么客户端如何处理服务端下发的调度任务呢。
J
obThread是真正客户端真正执行任务的地方。每一个JAVA类型的JOB都会对应一个JobThread。

public class JobThread extends Thread{private static Logger logger = LoggerFactory.getLogger(JobThread.class);private int jobId;//标注了@XxlJob的方法或者从前端传过来的代码脚本。private IJobHandler handler;// 存储服务端穿过来的请求,如果前一个任务没有执行文,后续的会继续存在这里。private LinkedBlockingQueue<TriggerParam> triggerQueue;// 服务端每发送一次到客户端,会生成一个唯一的JOBid,可以用来做幂等,防止HTTP请求重试等造成重复调用。private Set<Long> triggerLogIdSet;private volatile boolean toStop = false;private String stopReason;private boolean running = false;    // if running jobprivate int idleTimes = 0;			// idel timespublic JobThread(int jobId, IJobHandler handler) {this.jobId = jobId;this.handler = handler;this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());// assign job thread namethis.setName("xxl-job, JobThread-"+jobId+"-"+System.currentTimeMillis());}public IJobHandler getHandler() {return handler;}//存储服务端调度请求public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {// avoid repeatif (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());}triggerLogIdSet.add(triggerParam.getLogId());triggerQueue.add(triggerParam);return ReturnT.SUCCESS;}//杀死调度任务public void toStop(String stopReason) {/*** Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),* 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;* 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;*/this.toStop = true;this.stopReason = stopReason;}//启动线程@Overridepublic void run() {// inittry {handler.init();} catch (Throwable e) {logger.error(e.getMessage(), e);}// 死循环知道停止while(!toStop){running = false;//统计空闲次数 超过30次就终止线程idleTimes++;TriggerParam triggerParam = null;try {// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);if (triggerParam!=null) {running = true;idleTimes = 0;triggerLogIdSet.remove(triggerParam.getLogId());// 创建日志文件,用于存储日志,日志异步上报。String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());XxlJobContext xxlJobContext = new XxlJobContext(triggerParam.getJobId(),triggerParam.getExecutorParams(),logFileName,triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal());// 初始化上下文,使用InheritableThreadLocal保存XxlJobContext.setXxlJobContext(xxlJobContext);// executeXxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());if (triggerParam.getExecutorTimeout() > 0) {// 设定调度过期时间Thread futureThread = null;try {FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);handler.execute();return true;}});futureThread = new Thread(futureTask);futureThread.start();Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobHelper.log("<br>----------- xxl-job job execute timeout");XxlJobHelper.log(e);// handle resultXxlJobHelper.handleTimeout("job execute timeout ");} finally {futureThread.interrupt();}} else {//没有过期时间,直接执行handler.execute();}// 处理执行结果if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {XxlJobHelper.handleFail("job handle result lost.");} else {String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)?tempHandleMsg.substring(0, 50000).concat("..."):tempHandleMsg;XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);}XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="+ XxlJobContext.getXxlJobContext().getHandleCode()+ ", handleMsg = "+ XxlJobContext.getXxlJobContext().getHandleMsg());} else {if (idleTimes > 30) {if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lostXxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");}}}} catch (Throwable e) {if (toStop) {XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);}// handle resultStringWriter stringWriter = new StringWriter();e.printStackTrace(new PrintWriter(stringWriter));String errorMsg = stringWriter.toString();XxlJobHelper.handleFail(errorMsg);XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");} finally {if(triggerParam != null) {// callback handler infoif (!toStop) {// 提交处理结果到队列中,等待上报TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {// 提交处理结果到队列中,等待上报TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job running, killed]" ));}}}}// callback trigger request in queuewhile(triggerQueue !=null && triggerQueue.size()>0){TriggerParam triggerParam = triggerQueue.poll();if (triggerParam!=null) {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job not executed, in the job queue, killed.]"));}}// destroytry {handler.destroy();} catch (Throwable e) {logger.error(e.getMessage(), e);}logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());}
}

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

4.4.3 ExecutorBizImpl

客户端在启动后,会在EmbedServer实例中启动一个netty专门接收从服务端发送来的请求。其中包括检查服务端心跳,job线程心跳,终止调度,读取日志以及JOB调度等,都是交给ExecutorBizImpl进行处理。这里主要介绍下任务调度的过程:

public class ExecutorBizImpl implements ExecutorBiz {@Overridepublic ReturnT<String> run(TriggerParam triggerParam) {// 根据id获取 JOB的执行线程JobThreadJobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());// 获取jobThread内部的 jobHandlerIJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;String removeOldReason = null;// valid:jobHandler + jobThreadGlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());if (GlueTypeEnum.BEAN == glueTypeEnum) {// new jobhandlerIJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());// valid old jobThreadif (jobThread!=null && jobHandler != newJobHandler) {// change handler, need kill old threadremoveOldReason = "change jobhandler or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {jobHandler = newJobHandler;if (jobHandler == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");}}} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {// valid old jobThreadif (jobThread != null &&!(jobThread.getHandler() instanceof GlueJobHandler&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {// change handler or gluesource updated, need kill old threadremoveOldReason = "change job source or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {try {IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());}}} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {// valid old jobThreadif (jobThread != null &&!(jobThread.getHandler() instanceof ScriptJobHandler&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {// change script or gluesource updated, need kill old threadremoveOldReason = "change job source or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));}} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");}// executor block strategyif (jobThread != null) {ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {// discard when runningif (jobThread.isRunningOrHasQueue()) {return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());}} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {// kill running jobThreadif (jobThread.isRunningOrHasQueue()) {removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();jobThread = null;}} else {// just queue trigger}}// replace thread (new or exists invalid)if (jobThread == null) {jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}// push data to queueReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);return pushResult;}
}

4.5 几个表格作用

xxl_job_group:任务分组。一个执行器算作一个组。每组下面会记录对应的实例地址。
在这里插入图片描述

xxl_job_info:具体的任务信息。
xxl_job_lock:分布式锁
xxl_job_log:日志
xxl_job_log_report:调度统计
xxl_job_logglue: 可以记录GLUE模式代码历史版本
xxl_job_registry:注册信息表,每一台机器注册上来,都会记录一条记录。

在这里插入图片描述

xxl_job_user:用户表

其中xxl_job_group,xxl_job_info,xxl_job_lock是调度器的关键,其他的是起到支撑辅助作用。

5 Quartz

5.1 什么是 Quartz

从 Quartz官网 简介可以知道,
Quartz 是一个开源的任务调度框架,可以用于单体应用,也可以用于大型的电子商务平台,支持成千上万的任务。

5.1 Quartz 简单demo

新建一个maven项目,引入依赖

<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.3.1</version>
</dependency>
public class Application {public static void main(String[] args) {try {Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();scheduler.start();//定义一个工作对象 设置工作名称与组名JobDetail job = JobBuilder.newJob(HelloJob.class).withIdentity("job41","group1").build();//定义一个触发器 简单Trigger 设置工作名称与组名 5秒触发一次Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1","group1").startNow().withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(5)).build();//设置工作 与触发器scheduler.scheduleJob(job, trigger);
//            scheduler.shutdown();} catch (SchedulerException se) {se.printStackTrace();}}
}

Scheduler 是一个接口定义了一系列提交job的方法。
其实现类如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0GBKRmSg-1646875792192)(./assets/JAVA任务调度技术-1645528734979.png)]

查看 StdScheduler 本质上是一个代理类,代理了 QuartzScheduler 类的所有方法。

挑选一个方法来看:

public class QuartzScheduler implements RemotableQuartzScheduler {//存储jobprivate QuartzSchedulerResources resources;public Date scheduleJob(JobDetail jobDetail,Trigger trigger) throws SchedulerException {//省略校验OperableTrigger trig = (OperableTrigger) trigger;if (trigger.getJobKey() == null) {trig.setJobKey(jobDetail.getKey());} else if (!trigger.getJobKey().equals(jobDetail.getKey())) {throw new SchedulerException("Trigger does not reference given job!");}trig.validate();Calendar cal = null;if (trigger.getCalendarName() != null) {cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());}Date ft = trig.computeFirstFireTime(cal);if (ft == null) {throw new SchedulerException("Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");}resources.getJobStore().storeJobAndTrigger(jobDetail, trig);notifySchedulerListenersJobAdded(jobDetail);notifySchedulerThread(trigger.getNextFireTime().getTime());notifySchedulerListenersSchduled(trigger);return ft;}//start方法public void start() throws SchedulerException {if (shuttingDown|| closed) {throw new SchedulerException("The Scheduler cannot be restarted after shutdown() has been called.");}// QTZ-212 : calling new schedulerStarting() method on the listeners// right after entering start()notifySchedulerListenersStarting();if (initialStart == null) {initialStart = new Date();//启动了线程this.resources.getJobStore().schedulerStarted();startPlugins();} else {resources.getJobStore().schedulerResumed();}schedThread.togglePause(false);getLog().info("Scheduler " + resources.getUniqueIdentifier() + " started.");notifySchedulerListenersStarted();}
}

可以看到存储和触发的代码

resources.getJobStore().storeJobAndTrigger(jobDetail, trig);

其中resources.getJobStore() 为 JobStore 实例。用于存储job和triger提供给 QuartzScheduler 使用。

JOB Store

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OIbYPdrE-1646875792193)(./assets/JAVA任务调度技术-1645529673682.png)]

RAMJobStore 内存行存储,单机情况下默认。
JDBC JobStore 数据库。
查看 RAMJobStore 。

public class RAMJobStore implements JobStore {/** ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~* * Data members.* * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/protected HashMap<JobKey, JobWrapper> jobsByKey = new HashMap<JobKey, JobWrapper>(1000);protected HashMap<TriggerKey, TriggerWrapper> triggersByKey = new HashMap<TriggerKey, TriggerWrapper>(1000);protected HashMap<String, HashMap<JobKey, JobWrapper>> jobsByGroup = new HashMap<String, HashMap<JobKey, JobWrapper>>(25);protected HashMap<String, HashMap<TriggerKey, TriggerWrapper>> triggersByGroup = new HashMap<String, HashMap<TriggerKey, TriggerWrapper>>(25);protected TreeSet<TriggerWrapper> timeTriggers = new TreeSet<TriggerWrapper>(new TriggerWrapperComparator());protected HashMap<String, Calendar> calendarsByName = new HashMap<String, Calendar>(25);protected Map<JobKey, List<TriggerWrapper>> triggersByJob = new HashMap<JobKey, List<TriggerWrapper>>(1000);protected final Object lock = new Object();protected HashSet<String> pausedTriggerGroups = new HashSet<String>();protected HashSet<String> pausedJobGroups = new HashSet<String>();protected HashSet<JobKey> blockedJobs = new HashSet<JobKey>();protected long misfireThreshold = 5000l;protected SchedulerSignaler signaler;//获取下一个任务public List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) {synchronized (lock) {List<OperableTrigger> result = new ArrayList<OperableTrigger>();Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();Set<TriggerWrapper> excludedTriggers = new HashSet<TriggerWrapper>();long batchEnd = noLaterThan;// return empty list if store has no triggers.if (timeTriggers.size() == 0)return result;while (true) {TriggerWrapper tw;try {tw = timeTriggers.first();if (tw == null)break;timeTriggers.remove(tw);} catch (java.util.NoSuchElementException nsee) {break;}if (tw.trigger.getNextFireTime() == null) {continue;}if (applyMisfire(tw)) {if (tw.trigger.getNextFireTime() != null) {timeTriggers.add(tw);}continue;}if (tw.getTrigger().getNextFireTime().getTime() > batchEnd) {timeTriggers.add(tw);break;}// If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then// put it back into the timeTriggers set and continue to search for next trigger.JobKey jobKey = tw.trigger.getJobKey();JobDetail job = jobsByKey.get(tw.trigger.getJobKey()).jobDetail;if (job.isConcurrentExectionDisallowed()) {if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {excludedTriggers.add(tw);continue; // go to next trigger in store.} else {acquiredJobKeysForNoConcurrentExec.add(jobKey);}}tw.state = TriggerWrapper.STATE_ACQUIRED;tw.trigger.setFireInstanceId(getFiredTriggerRecordId());OperableTrigger trig = (OperableTrigger) tw.trigger.clone();if (result.isEmpty()) {batchEnd = Math.max(tw.trigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;}result.add(trig);if (result.size() == maxCount)break;}// If we did excluded triggers to prevent ACQUIRE state due to DisallowConcurrentExecution, we need to add them back to store.if (excludedTriggers.size() > 0)timeTriggers.addAll(excludedTriggers);return result;}}
}

从以上源码可知 RAMJobStore 是存储job以及相关参数的地方。
其中特别注意的是timeTriggers属性,其使用TreeSet来保存,这个是实现类似Timer优先队列的作用。

QuartzSchedulerThread 的run方法,就是去获取任务的地方。

public class QuartzSchedulerThread extends Thread {/*** <p>* The main processing loop of the <code>QuartzSchedulerThread</code>.* </p>*/@Overridepublic void run() {int acquiresFailed = 0;while (!halted.get()) {try {// check if we're supposed to pause...synchronized (sigLock) {while (paused && !halted.get()) {try {// wait until togglePause(false) is called...sigLock.wait(1000L);} catch (InterruptedException ignore) {}// reset failure counter when paused, so that we don't// wait again after unpausingacquiresFailed = 0;}if (halted.get()) {break;}}// wait a bit, if reading from job store is consistently// failing (e.g. DB is down or restarting)..if (acquiresFailed > 1) {try {long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);Thread.sleep(delay);} catch (Exception ignore) {}}int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...List<OperableTrigger> triggers;long now = System.currentTimeMillis();clearSignaledSchedulingChange();try {triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());acquiresFailed = 0;if (log.isDebugEnabled())log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");} catch (JobPersistenceException jpe) {if (acquiresFailed == 0) {qs.notifySchedulerListenersError("An error occurred while scanning for the next triggers to fire.",jpe);}if (acquiresFailed < Integer.MAX_VALUE)acquiresFailed++;continue;} catch (RuntimeException e) {if (acquiresFailed == 0) {getLog().error("quartzSchedulerThreadLoop: RuntimeException "+e.getMessage(), e);}if (acquiresFailed < Integer.MAX_VALUE)acquiresFailed++;continue;}if (triggers != null && !triggers.isEmpty()) {now = System.currentTimeMillis();long triggerTime = triggers.get(0).getNextFireTime().getTime();long timeUntilTrigger = triggerTime - now;while(timeUntilTrigger > 2) {synchronized (sigLock) {if (halted.get()) {break;}if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {try {// we could have blocked a long while// on 'synchronize', so we must recomputenow = System.currentTimeMillis();timeUntilTrigger = triggerTime - now;if(timeUntilTrigger >= 1)sigLock.wait(timeUntilTrigger);} catch (InterruptedException ignore) {}}}if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {break;}now = System.currentTimeMillis();timeUntilTrigger = triggerTime - now;}// this happens if releaseIfScheduleChangedSignificantly decided to release triggersif(triggers.isEmpty())continue;// set triggers to 'executing'List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();boolean goAhead = true;synchronized(sigLock) {goAhead = !halted.get();}if(goAhead) {try {List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);if(res != null)bndles = res;} catch (SchedulerException se) {qs.notifySchedulerListenersError("An error occurred while firing triggers '"+ triggers + "'", se);//QTZ-179 : a problem occurred interacting with the triggers from the db//we release them and loop againfor (int i = 0; i < triggers.size(); i++) {qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));}continue;}}for (int i = 0; i < bndles.size(); i++) {TriggerFiredResult result =  bndles.get(i);TriggerFiredBundle bndle =  result.getTriggerFiredBundle();Exception exception = result.getException();if (exception instanceof RuntimeException) {getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));continue;}// it's possible to get 'null' if the triggers was paused,// blocked, or other similar occurrences that prevent it being// fired at this time...  or if the scheduler was shutdown (halted)if (bndle == null) {qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));continue;}JobRunShell shell = null;try {shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);shell.initialize(qs);} catch (SchedulerException se) {qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);continue;}if (qsRsrcs.getThreadPool().runInThread(shell) == false) {// this case should never happen, as it is indicative of the// scheduler being shutdown or a bug in the thread pool or// a thread pool being used concurrently - which the docs// say not to do...getLog().error("ThreadPool.runInThread() return false!");qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);}}continue; // while (!halted)}} else { // if(availThreadCount > 0)// should never happen, if threadPool.blockForAvailableThreads() follows contractcontinue; // while (!halted)}long now = System.currentTimeMillis();long waitTime = now + getRandomizedIdleWaitTime();long timeUntilContinue = waitTime - now;synchronized(sigLock) {try {if(!halted.get()) {// QTZ-336 A job might have been completed in the mean time and we might have// missed the scheduled changed signal by not waiting for the notify() yet// Check that before waiting for too long in case this very job needs to be// scheduled very soonif (!isScheduleChanged()) {sigLock.wait(timeUntilContinue);}}} catch (InterruptedException ignore) {}}} catch(RuntimeException re) {getLog().error("Runtime error occurred in main trigger firing loop.", re);}} // while (!halted)// drop references to scheduler stuff to aid garbage collection...qs = null;qsRsrcs = null;}} 

5.2 几个概念

Trigger
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-seQJRg7e-1646875792194)(./assets/Java任务调度-Quartz-1645780446974.png)]

6 Elastic-Job

待续

7 Apache DolphinScheduler

待续

总结

请添加图片描述

参考
Java中常见的几种任务调度框架对比

引用

  1. Spring Job?Quartz?XXL-Job?年轻人才做选择,艿艿全莽~
  2. Timer与TimerTask的真正原理&使用介绍
  3. 深入 DelayQueue 内部实现
  4. PriorityQueue详解
  5. Java优先级队列DelayedWorkQueue原理分析
  6. 【Java基础】JAVA中优先队列详解
  7. quartz (从原理到应用)详解篇
  8. 《分布式任务调度平台XXL-JOB》-官方文档
  9. 平衡二叉堆
  10. 聊聊Java进阶之并发基础技术—线程池剖析
  11. Java中常见的几种任务调度框架对比
  12. Quartz 源码解析(一) —— 基本介绍

(该文成于2022年公司内部主题分享)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/58331.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

将多个commit合并成一个commit并提交

0 Preface/foreword 1 压缩多个commit方法 1.1 git merge --squash 主分支&#xff1a;main 开发分支&#xff1a;test 当前在test分支提交了8个commits&#xff0c;功能已经开发完成&#xff0c;需要将test分支合并到main分支&#xff0c;但是不想在合并时候&#xff0c;看…

开源一套基于若依的wms仓库管理系统,支持lodop和网页打印入库单、出库单的源码

大家好&#xff0c;我是一颗甜苞谷&#xff0c;今天分享一款基于若依的wms仓库管理系统&#xff0c;支持lodop和网页打印入库单、出库单的源码。 前言 在当今快速发展的商业环境中&#xff0c;库存管理对于企业来说至关重要。然而&#xff0c;许多企业仍然依赖于传统的、手动…

【Rust】环境搭建

▒ 目录 ▒ &#x1f6eb; 导读需求 1️⃣ 安装Chocolatey安装依赖 2️⃣ 安装RustRover安装toolchain&#xff08;rustup、VS&#xff09;重启配置生效设置安装插件 &#x1f4d6; 参考资料 &#x1f6eb; 导读 需求 重装系统&#xff0c;记录下环境搭建遇到的问题。 1️⃣ …

安装Ubuntu系统

打开vmware&#xff0c;新建一个Ubuntu虚拟机&#xff0c;点击自定义&#xff0c;进入下一步 &#xff0c;选择Workstation 17.x后&#xff0c;点击下一步 选择稍后安装系统选项&#xff0c;进入选择客户机操作系统页面&#xff0c;客户机操作系统选择Linux&#xff0c;版本选…

rom定制系列------红米note8_miui14安卓13定制修改固件 带面具root权限 刷写以及界面预览

&#x1f49d;&#x1f49d;&#x1f49d;红米note8机型代码&#xff1a;ginkgo。高通芯片。此固件官方最终版为稳定版12.5.5安卓11的版本。目前很多工作室需要高安卓版本的固件来适应他们的软件。并且需要root权限。根据客户要求。修改固件为完全root。并且修改为可批量刷写的…

电脑仅一个C盘如何重装系统?超简单教程分享!

当我们的电脑仅配备一个C盘时&#xff0c;重装系统的过程可能会显得尤为棘手。因为一旦格式化硬盘&#xff0c;安装系统的分区也可能被一并清除&#xff0c;导致安装过程中断。这时候我们完全可以通过对电脑进行分区来解决这一问题。分区不仅能够帮助我们更好地管理硬盘空间&am…

提升网站速度与性能优化的有效策略与实践

内容概要 在数字化快速发展的今天&#xff0c;网站速度与性能优化显得尤为重要&#xff0c;它直接影响用户的浏览体验。用户在访问网站时&#xff0c;往往希望能够迅速获取信息&#xff0c;若加载时间过长&#xff0c;轻易可能导致他们转向其他更为流畅的网站。因此&#xff0…

流媒体协议.之(RTP,RTCP,RTSP,RTMP,HTTP)(二)

继续上篇介绍&#xff0c;本篇介绍一下封装RTP的数据格式&#xff0c;如何将摄像头采集的码流&#xff0c;音频的码流&#xff0c;封装到rtp里&#xff0c;传输。 有自己私有协议例子&#xff0c;有rtp协议&#xff0c;参考代码。注意不是rtsp协议。 一、私有协议 玩过tcp协议…

构建灵活、高效的HTTP/1.1应用:探索h11库

文章目录 构建灵活、高效的HTTP/1.1应用&#xff1a;探索h11库背景这个库是什么&#xff1f;如何安装这个库&#xff1f;库函数使用方法使用场景常见的Bug及解决方案总结 构建灵活、高效的HTTP/1.1应用&#xff1a;探索h11库 背景 在现代网络应用中&#xff0c;HTTP协议是基础…

利用游戏引擎的优势

大家好&#xff0c;我是小蜗牛。 在当今快速发展的游戏产业中&#xff0c;选择合适的游戏引擎对开发者来说至关重要。Cocos Creator作为一款功能强大且灵活的游戏引擎&#xff0c;为开发者提供了丰富的工具和资源&#xff0c;使他们能够高效地开发出优秀的游戏。本文将探讨如何…

仓颉编程语言官网正式上线 !首个公测版本开放下载 !

今年6月21日&#xff0c;华为开发者大会&#xff08;HDC&#xff09;正式公开介绍了华为自研的通用编程语言&#xff1a;仓颉编程语言&#xff0c;并发布了HarmonyOS NEXT仓颉语言开发者预览版&#xff0c;开发者可以使用仓颉开发鸿蒙原生应用。4个月以来&#xff0c;仓颉编程语…

PHP爬虫的奇幻之旅:如何用代码“偷窥”京东商品的SKU信息

开篇&#xff1a;代码界的007 想象一下&#xff0c;你是一名代码界的007&#xff0c;你的任务是潜入京东的数据库&#xff0c;获取商品的SKU信息。不过别担心&#xff0c;我们不是真的去偷数据&#xff0c;而是用PHP编写一个爬虫&#xff0c;合法地获取公开的API数据。这不仅是…

C++初阶(七)--类和对象(4)

目录 ​编辑 一、再谈构造函数 1.构造函数体赋值 2.初始化列表 二、类型转换 1.隐式类型转换 2.explicit关键字 3.类类型之间的对象隐式转换 三、static成员函数 1.概念 2.特性 3.面试题&#xff1a; 四、友元函数 1.基本介绍 2.回顾&#xff1a; 3.友元类&am…

【问题记录】当机器人存在多个串口需要绑定时udevadm的作用

一、正常绑定 输入sudo udevadm info -a /dev/ttyUSBx | grep KERNELS 命令 会出现KERNELS的编号&#xff0c;记录编号。 修改规则文件/etc/udev/rules.d/99-usb.rules 添加以下命令 KERNEL"ttyUSB*", KERNELS"2-1.2:1.0", MODE:"0666", GROU…

kafka 分布式(不是单机)的情况下,如何保证消息的顺序消费?

大家好&#xff0c;我是锋哥。今天分享关于【kafka 分布式&#xff08;不是单机&#xff09;的情况下&#xff0c;如何保证消息的顺序消费?】面试题&#xff1f;希望对大家有帮助&#xff1b; kafka 分布式&#xff08;不是单机&#xff09;的情况下&#xff0c;如何保证消息的…

微信小程序时间弹窗——年月日时分

需求 1、默认当前时间2、选择时间弹窗限制最大值、最小值3、每次弹起更新最大值为当前时间&#xff0c;默认值为上次选中时间4、 minDate: new Date(2023, 10, 1).getTime(),也可以传入时间字符串new Date(2023-10-1 12:22).getTime() html <view class"flex bb ptb…

【UE5.3 Cesium for Unreal】编译GlobePawn

目录 前言 效果 步骤 一、下载所需文件 二、下载CesiumForUnreal插件 三、处理下载的文件 四、修改代码 “CesiumForUnreal.uplugin”部分 “CesiumEditor.cpp”部分 “CesiumEditor.h”部分 “CesiumPanel.cpp”部分 “IonQuickAddPanel.cpp”部分 “IonQuickAd…

截取一个字符串的一部分赋值给另一个字符串

文章目录 截取一个字符串的一部分赋值给另一个字符串1.string s(s1,pos,len)2.s.substr(pos,n) 返回一个string 截取一个字符串的一部分赋值给另一个字符串 1.string s(s1,pos,len) s是string s1从下标pos开始len个字符的拷贝。如果pos>s1.size()&#xff0c;构造函数未定…

Nginx+Lua脚本+Redis 实现自动封禁访问频率过高IP

1 、安装OpenResty 安装使用 OpenResty&#xff0c;这是一个集成了各种 Lua 模块的 Nginx 服务器&#xff0c;是一个以Nginx为核心同时包含很多第三方模块的Web应用服务器&#xff0c;使用Nginx的同时又能使用lua等模块实现复杂的控制。 &#xff08;1&#xff09;安装编译工具…

[Linux] linux 软硬链接与动静态库

标题&#xff1a;[Linux] linux 软硬链接与动静态库 个人主页水墨不写bug &#xff08;图片来源于网络&#xff09; /** _oo0oo_* o8888888o* 88" . "88* (| -_- |)* …