图解java.util.concurrent并发包源码系列——深入理解定时任务线程池ScheduledThreadPoolExecutor

深入理解定时任务线程池ScheduledThreadPoolExecutor

  • ScheduledThreadPoolExecutor作用与用法
  • ScheduledThreadPoolExecutor内部执行流程
  • DelayedWorkQueue
  • ScheduledFutureTask
  • 源码分析
    • 任务提交
      • ScheduledFutureTask的属性和方法
      • delayedExecute(t)
    • 任务执行
      • ScheduledFutureTask.super.run()
      • ScheduledFutureTask.super.runAndReset()
      • setNextRunTime()
      • reExecutePeriodic(outerTask)
    • DelayedWorkQueue
      • 小顶堆
      • DelayedWorkQueue成员变量
      • void add(Runnable e)
        • void grow()
        • void siftUp(int k, RunnableScheduledFuture<?> key)
      • RunnableScheduledFuture<?> take()
        • RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f)
          • void siftDown(int k, RunnableScheduledFuture<?> key)
  • 总结

ScheduledThreadPoolExecutor作用与用法

ScheduledThreadPoolExecutor是一个用于执行定时任务或延时任务的线程池,提交到该线程池中的任务会等待执行时间到了才会被执行。与此相对的是ThreadPoolExecutor,提交到ThreadPoolExecutor中的任务,只要ThreadPoolExecutor中有空闲线程,就会被马上执行,如果ThreadPoolExecutor中没有空闲线程,则会把任务放入队列。而提交到ScheduledThreadPoolExecutor中的任务,不管此时ScheduledThreadPoolExecutor有没有空闲线程,任务都会被放入到队列里去,等待任务执行时间到期时被线程从队列中取出并执行。

除此以外,ScheduledThreadPoolExecutor是继承自ThreadPoolExecutor的,因此ThreadPoolExecutor有的东西ScheduledThreadPoolExecutor也有。比如任务队列 BlockingQueue<Runnable> workQueue,工作者线程集合 HashSet<Worker> workers。

在这里插入图片描述

ScheduledThreadPoolExecutor一般可以运用在一些非实时性或者非交互性的场景。比如微服务的注册中心就可以通过定时任务扫描没有在规定时间之内续约的服务,将其下线。

在这里插入图片描述

提交到ScheduledThreadPoolExecutor的任务有两种类型,一种时定时任务,一种时延时任务。我们可以调ScheduledThreadPoolExecutor不同的方法来提交不同类型的任务。

如果要提交的任务是延时任务,我们可以调用ScheduledThreadPoolExecutor的schedule方法:

    public ScheduledFuture<?> schedule(Runnable command, // 要执行的任务,不带返回值long delay, // 延迟时间TimeUnit unit) // 时间单位
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,  // 要执行的任务,带返回值long delay, // 延迟时间TimeUnit unit) // 时间单位

在这里插入图片描述

如果要提交定时任务,可以调用ScheduledThreadPoolExecutor的scheduleAtFixedRate方法或者scheduleWithFixedDelay方法:

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, // 要执行的任务,不带返回值long initialDelay, // 延迟时间long period, // 执行周期TimeUnit unit) // 时间单位

在这里插入图片描述

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, // 要执行的任务,不带返回值long initialDelay, // 初始延迟时间long delay, // 后面每次执行完上一次任务,延迟多久执行下一次任务TimeUnit unit) // 时间单位

在这里插入图片描述

scheduleAtFixedRate方法以固定的时间周期period运行任务,前一次任务开始执行后,下一次的任务会在固定的周期period之后再执行。而scheduleWithFixedDelay方法是每次执行完上一次的任务,会延迟一定的时间delay之后再执行下一次的任务。

ScheduledThreadPoolExecutor内部执行流程

  1. 提交到ScheduledThreadPoolExecutor中的任务Runnable,会被封装为一个Task,该Task会被入队列
  2. 然后检查ScheduledThreadPoolExecutor中是否有工作者线程,没有的话要新建一个工作者线程,添加到ScheduledThreadPoolExecutor中的工作者线程集合中
  3. ScheduledThreadPoolExecutor后台会有工作者线程从任务队列中拉取任务,但是ScheduledThreadPoolExecutor中的队列是一个延时队列,所以队列中的任务只有到了执行时间之后才会被执行。
  4. ScheduledThreadPoolExecutor中的延时队列内部是一个小顶堆结构,任务会按照到期时间从小到大进行排序,堆顶是最早到期的任务。
  5. 如果一个任务它是定时任务,那么它被执行完以后,会更新下一次的到期时间,然后重新放回到队列

在这里插入图片描述

DelayedWorkQueue

ScheduledThreadPoolExecutor有一个内部类DelayedWorkQueue,他是一个延时阻塞队列。DelayedWorkQueue用于存放提交到ScheduledThreadPoolExecutor中的任务,DelayedWorkQueue内部使用一个小顶堆存储提交到ScheduledThreadPoolExecutor中的任务,放入DelayedWorkQueue中的任务,会按照到期时间从小到大进行排序,堆顶元素是最早到期的元素。

小顶堆是一个特殊的二叉树。这颗二叉树中的每个父节点,都比它的两个字节点要小。其次这颗二叉树不是真的用二叉树的结构来实现的,而是用一个数组实现的,也就是用一个数组去模拟一个符合小顶堆结构的二叉树。比如父节点是数组下标n,那么左子节点的数组下标是2n+1,右子节点的下标就是2n+2。

每次往DelayedWorkQueue中放入任务时,都会从堆底往堆顶做向上调整。每次从DelayedWorkQueue中获取任务后,堆底任务会被提到堆顶,然后从堆顶到堆底做一次堆的向下调整。

在这里插入图片描述

DelayedWorkQueue中有一个Thread类型的leader变量存放等待堆顶任务到期的线程。因为ScheduledThreadPoolExecutor中可能有许多线程,线程从ScheduledThreadPoolExecutor中获取任务是从DelayedWorkQueue里面的堆顶获取,但是堆顶任务只能由一个线程执行,那么该由哪个线程执行呢?那就是leader线程是谁就由谁执行。

那么其他非leader的线程呢?这些线程就要在DelayedWorkQueue内部的一个Condition类型的条件队列available中进行等待。当一个线程成功从DelayedWorkQueue中取走一个任务时,会唤醒available中的一个线程,此时这个线程就可以去竞争当上leader线程了。

在这里插入图片描述

ScheduledFutureTask

我们提交到ScheduledThreadPoolExecutor中的任务都是Runnable类型的。但是ScheduledThreadPoolExecutor需要标记这些Runnable对象什么时候到期被执行,并且这些Runnable之间要互相比较到期时间,好让它们在DelayedWorkQueue的堆中被从小到大排序。因此ScheduledThreadPoolExecutor使用了ScheduledFutureTask类型去封装被提交进来的Runnable对象。

ScheduledFutureTask对象使用一个time属性记录下一次执行的时间,使用一个sequenceNumber记录自己的序号。ScheduledFutureTask重写了Comparable接口的compareTo方法。compareTo方法与其他任务比较时,首先会比较time属性谁更小,谁的到期时间就更早,那么在堆中的排序就越靠近堆顶,如果time属性相等,则比较谁的sequenceNumber更小。

ScheduledFutureTask还有一个period属性记录执行任务的时间间隔,这个属性可以用于计算下一次的执行时间time。

在这里插入图片描述

源码分析

下面我们进入ScheduledThreadPoolExecutor的内部,阅读ScheduledThreadPoolExecutor的源码,了解它的运行逻辑和核心原理。

任务提交

我们以ScheduledThreadPoolExecutor的scheduleAtFixedRate方法为例,看看任务被提交到ScheduledThreadPoolExecutor之后是怎么处理。

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();// step1ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;// step2delayedExecute(t);return t;}

总体上分两步。

第一步(step1)是把我们传进来的Runnable封装为一个ScheduledFutureTask对象。封装为ScheduledFutureTask对象是为了更方便的计算任务的执行时间,以及在堆中的排序。

ScheduledFutureTask利用一个time属性记录下一次任务的执行时间,调用ScheduledFutureTask的setNextRunTime()方法会自动计算下一次的执行时间并更新time属性。ScheduledFutureTask实现了Comparable接口,compareTo方法会比较两个任务的执行时间谁更快到期,谁的排序就更优先。

然后第二步(step2)就是调用delayedExecute(t),把ScheduledFutureTask对象放入到ScheduledThreadPoolExecutor的队列中,等待执行时间到期被ScheduledThreadPoolExecutor中的Worker线程取走并执行。

ScheduledFutureTask的属性和方法

既然第一步把我们传进来的Runnable对象封装为了一个ScheduledFutureTask,我们看看ScheduledFutureTask内部到底有什么。

    private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {// 序列号,每个ScheduledFutureTask都有一个唯一的序列号,顺序递增private final long sequenceNumber;// 下一次的执行时间private long time;// 执行周期private final long period;// 当前任务执行完后,重新放回队列的任务,一般就是当前任务本身RunnableScheduledFuture<V> outerTask = this;// 当前任务在DelayedWorkQueue的堆数组中的下标int heapIndex;}

以上是ScheduledFutureTask的属性。

sequenceNumber是ScheduledThreadPoolExecutor分配给当前ScheduledFutureTask的序列号,作用就是在比较两个任务的排序优先级时,如果time属性相同,会进一步拿sequenceNumber进行比较

time记录的是当前ScheduledFutureTask下一次执行的时间。

period是任务的执行周期,用于计算下一次的执行时间,计算结果会赋值给time。

outerTask就是当前的ScheduledFutureTask对象自己,用于重回队列时作为参数传递。如果我们设置outerTask为其他的ScheduledFutureTask对象,那么下一次执行的就是不同的任务。如果不做修改的话,就是当前ScheduledFutureTask对象自己。

heapIndex是记录当前ScheduledFutureTask对象在DelayedWorkQueue的堆数组中的下标。有了heapIndex属性之后,就可以很快速的从堆数组中找到对应的ScheduledFutureTask对象,比如我们判断一个ScheduledFutureTask对象是否在堆中,就可以拿到ScheduledFutureTask的heapIndex属性,从堆数组中取出heapIndex下标对应的ScheduledFutureTask对象,判断两个对象是否相等,相等表示当前ScheduledFutureTask对象在堆中,而DelayedWorkQueue的contains方法正是这样的逻辑。

delayedExecute(t)

Runnable对象封装为ScheduledFutureTask对象后,下一步就是要把它放入到队列中,并检查ScheduledThreadPoolExecutor中是否有线程,如果没有要创建一个,保证当前任务有线程执行它。

    private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {// step2.1super.getQueue().add(task);if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);else// step2.2ensurePrestart();}}

非核心逻辑不看,就看重点的两行代码。

step2.1:super.getQueue().add(task); 就是把当前ScheduledFutureTask对象放入队列中。super.getQueue()是调用父类ThreadPoolExecutor的getQueue()获取线程池中的阻塞队列,ScheduledThreadPoolExecutor的构造方法创建的队列是DelayedWorkQueue,所以这里获取到的时DelayedWorkQueue。然后调用DelayedWorkQueue的add方法把ScheduledFutureTask对象放入DelayedWorkQueue中。

step2.2:ensurePrestart()在任务放入队列之后被调用,用于检测ScheduledThreadPoolExecutor是否需要创建线程,如果需要的话会创建一个线程。

在这里插入图片描述

由于DelayedWorkQueue的add方法比较复杂,我们放到后面再看,先看完大体流程。这里我们暂时先理解为放入队列中的任务会按到期执行时间从小到大排好序。

下面看看ensurePrestart()方法如何判断是否需要创建线程。

    void ensurePrestart() {int wc = workerCountOf(ctl.get());if (wc < corePoolSize)addWorker(null, true);else if (wc == 0)addWorker(null, false);}

如果ScheduledThreadPoolExecutor中的线程数小于核心线程数,那不管ScheduledThreadPoolExecutor中有没有已经创建的线程,都会再创建一个线程,目的是为了让ScheduledThreadPoolExecutor尽快达到核心线程数。如果已经达到了核心线程数,那么就不会再创建线程。

下面的 else if 分支是当我们设置ScheduledThreadPoolExecutor的核心线程数为0时,会进入的一个逻辑,那么就看ScheduledThreadPoolExecutor中有没有线程,没有就创建,有就不创建。

在这里插入图片描述

创建线程的方法是addWorker方法,这个是ThreadPoolExecutor的方法,会创建一个Thread线程对象,并包装成Worker对象放入到一个 HashSet<Worker> workers 集合中,最后会调用Thread对象的start()方法启动线程。

线程启动后会在一个while循环中不停地从队列中拉取任务并执行,拉取任务就是调用BlockingQueue的take()方法,这里就是DelayedWorkQueue的take()方法。

在这里插入图片描述

由于addWorker方法是ThreadPoolExecutor中的方法,不是本篇文章的重点,而且ThreadPoolExecutor的源码是非常简单的,应该是新手都能看得懂,这里就不展开分析了。

至于DelayedWorkQueue的take()方法稍微有点复杂,我们也是放到后面进行分析。这里我们暂时先理解线程会等待队列中最早到期的任务到期后取走。

任务执行

当任务到期被leader线程从队列中取出后,任务ScheduledFutureTask的run()方法就会被执行。

        public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);// 延时任务else if (!periodic)ScheduledFutureTask.super.run();// 定时任务else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}}

如果是延时任务,那么之后执行一次,不会重回队列,直接调用父类的run()方法运行任务,然后会进入到FutureTask的run()方法中。如果是定时任务,会周期性的运行,调用父类的runAndReset()方法进入到FutureTask的runAndReset()方法,然后调用setNextRunTime()设置下一次的运行时间,然后调用reExecutePeriodic(outerTask)方法把任务重回队列。

在这里插入图片描述

ScheduledFutureTask.super.run()

ScheduledFutureTask的父类是FutureTask,ScheduledFutureTask.super.run() 会进入 FutureTask的run()方法。

    public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {// 执行Callable的call()方法,正在执行任务result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {runner = null;int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}

FutureTask的run()方法调用Callable的call()方法真正执行任务,这里执行的不是Runnable的run()方法,是因为我们的Runnable对象被转成了一个Callable的实现类,最后会调到我们的Runnable的run()方法。

在这里插入图片描述

ScheduledFutureTask.super.runAndReset()

执行 ScheduledFutureTask.super.runAndReset() 会进入到 FutureTask的runAndReset()方法。

    protected boolean runAndReset() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;boolean ran = false;int s = state;try {Callable<V> c = callable;if (c != null && s == NEW) {try {// 也是调用Callable的call()方法c.call();ran = true;} catch (Throwable ex) {setException(ex);}}} finally {runner = null;s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran && s == NEW;}

FutureTask的runAndReset()方法也是调用Callable的call()方法,只是不会接收返回值并设置到FutureTask的结果中。

在这里插入图片描述

setNextRunTime()

任务执行完之后,就会调用setNextRunTime()方法更新下一次的执行时间。

        private void setNextRunTime() {long p = period;// 调用scheduleAtFixedRate方法提交进来的任务会进这个分支if (p > 0)time += p;// 调用scheduleWithFixedDelay方法提交进来的任务会进这个分支elsetime = triggerTime(-p);}

setNextRunTime()方法会更新ScheduledFutureTask的time属性为下一次的执行时间,我们调用的scheduleAtFixedRate方法进来的任务,会简单的把time加一个period周期。

在这里插入图片描述

reExecutePeriodic(outerTask)

设置好了任务下一次的运行时间后,就会调用reExecutePeriodic(outerTask)方法把任务重新放回队列中。

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(true)) {// 调用DelayedWorkQueue的add方法把任务重新放回队列super.getQueue().add(task);if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false);elseensurePrestart();}}

可以看到就是调用DelayedWorkQueue的add方法把任务重新放回队列。

在这里插入图片描述

以上就是ScheduledThreadPoolExecutor的核心流程,包含了任务被提交到ScheduledThreadPoolExecutor之后的处理,从队列中被取出执行,执行完成后计算下次执行时间然后重回队列的整个过程。

在这里插入图片描述

但是在我们上面的流程分析中,我们跳过了DelayedWorkQueue的相关方法,也就是任务入队列时的DelayedWorkQueue#add方法,和任务出队列时的DelayedWorkQueue#take方法,下面我们就着重分析DelayedWorkQueue的相关源码。

DelayedWorkQueue

小顶堆

要理解DelayedWorkQueue的原理,首先要熟悉小顶堆。

小顶堆是一个特殊的二叉树。它不是全局有序,但是保证每个父节点都比两个子节点要小。而且小顶堆通常不会真的使用树结构来实现,而是用一个数组来模拟了树结构,每个树节点在数组中都有对应的下标,获取子节点也是通过下标换算来取得。

当需要获取父节点的子节点时,假如父节点的下标是n,那么左子节点的下标就是2n+1,右子节点的下标就是2n+2。当需要通过子节点寻找父节点时,假设子节点的下标是n,那么父节点就是 (n - 1) / 2。

当向小顶堆中加入新节点时,先添加在堆底,也就是添加在数组的最末尾。然后通过下标换算找到父节点的位置,与父节点进行比较,发现比父节点小,则跟父节点交换位置,然后继续通过下标换算找到新的父节点,再次比较,直到某一次比较发现比父节点大,则不再交换位置,此时新节点就放在这个位置上,整个过程是不断的把新节点从堆底往上提,所以叫做堆的向上调整。

下面是一个往小顶堆添加新节点的例子:

假设现在有一个小顶堆:[1, 4, 3, 8, 5, 6, 7],我们要往堆中加入值为2的节点。那么首先是把它放到数组尾部(堆底),[1, 4, 3, 8, 5, 6, 7, 2];现在新节点的下标是7,通过下标运算 (7 - 1) / 2 = 3,下标为3的节点就是父节点,与父节点进行比较,发现 2 < 8,因此和父节点交换位置,新节点来到下标为3的位置 [1, 4, 3, 2, 5, 6, 7, 8]

在这里插入图片描述
进入下一轮,再次通过下标换算 (3 - 1) / 2 = 1,找到下标为1的父节点,与父节点进行比较,发现 2 < 4,那么继续跟父节点交换位置,新节点来到下标1的位置 [1, 2, 3, 4, 5, 6, 7, 8]。

在这里插入图片描述
进入下一轮,再次通过下标换算 (1 - 1) / 2 = 0,找到下标为0的父节点,与父节点进行比较,发现 2 > 1,不再跟父节点交换位置,新节点停留在下标为1的位置,调整完后堆就是 [1, 2, 3, 4, 5, 6, 7, 8]。

在这里插入图片描述
以上就是新节点加入小顶堆的过程。我们再来看一个从堆中弹出节点的例子。

还是上面的堆结构,假如我们要从堆中弹出堆顶节点,也就是值为1的节点,那么在把堆顶节点1作为结果返回之前,会进行堆调整使得堆再次符合小顶堆的规则。

首先把堆底节点节点8提到堆顶,然后数组长度减1。

在这里插入图片描述
此时节点8来到下标0的位置,也就是堆顶,开始做向下调整。根据下标换算找到子节点2和3,它要和最小的子节点PK,也就是跟2PK,发现比2大那么8和2交换位置,把2提上来。

在这里插入图片描述
进入下一轮,此时8来到了下标1的位置,通过下标换算找到两个子节点4和5,与最小的子节点4进行PK,发现比4大,继续交换位置。

在这里插入图片描述

此时节点8已经来到堆底了,那么堆的向下调整结束。

在这里插入图片描述

可以发现,通过堆调整,小顶堆总是可以保证堆顶元素最小,并且数值越小的元素,就越靠近堆顶,会越先被取走。如果堆中的节点是定时任务,那么图中节点的数值就是到期时间,小顶堆总是可以保证最快到期的任务总是在堆顶,我们取任务时直接取堆顶任务,判断是否到期即可。

了解了小顶堆的原理,我们就可以来看DelayedWorkQueue的源码了。

DelayedWorkQueue成员变量

    static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {private static final int INITIAL_CAPACITY = 16;private RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];private final ReentrantLock lock = new ReentrantLock();private int size = 0;private Thread leader = null;private final Condition available = lock.newCondition();}        

queue 是用于存放 ScheduledFutureTask 的堆数组,初始化容量是16,RunnableScheduledFuture是ScheduledFutureTask 的父类。

lock是ReentrantLock可重入锁对象,在向DelayedWorkQueue添加任务元素和获取任务时需要先加锁。

size是当前堆大小,也就是堆中有多少个节点。这个跟数组中的元素个数不一样,数组中的元素个数有可能比size多,但是数组中有可能有一部分是无效元素。

leader线程是等待堆顶任务到期取走的线程。

available是一个Condition条件队列,用于存放排队等待获取任务的线程,非leader线程会在available中一直等待直到被唤醒,leader线程会在available中等待堆顶任务的到期时间(带时长的等待,到期自动唤醒)。

在这里插入图片描述

void add(Runnable e)

接下来看一下add方法,add方法是把一个Runnable对象放入到队列中(这里的Runnable就是ScheduledFutureTask,ScheduledFutureTask 间接继承了Runnable接口)。

        public boolean add(Runnable e) {return offer(e);}

add方法调用了offer方法。

        public boolean offer(Runnable x) {if (x == null)throw new NullPointerException();RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;// 1.先上锁final ReentrantLock lock = this.lock;lock.lock();try {int i = size;// 2.判断是否需要进行数组扩容,如果需要则调用grow方法进行扩容if (i >= queue.length)grow();size = i + 1;// 3.把任务放入堆中if (i == 0) {// 堆中没有任务,则直接放在堆顶,不用做调整queue[0] = e;setIndex(e, 0);} else {// 把任务放入堆中,然后从底向上做调整siftUp(i, e);}// 4.如果堆顶任务是刚刚入队列的任务,那么重置leader,唤醒available中的线程去竞争leaderif (queue[0] == e) {leader = null;available.signal();}} finally {// 5.释放锁lock.unlock();}return true;}

一共分5步:

  1. 先获取ReentrantLock锁,获取到ReentrantLock锁的才能往下进行,获取不到锁的线程要在AQS的同步队列中阻塞等待。
  2. 判断是否需要进行数组扩容,如果需要则调用grow方法进行扩容。
  3. 把任务放入堆中。这里如果判断堆中没有任务,那么直接放到堆顶就可以,没有必要做堆调整;否则就要把任务放到堆底,然后从底往上做堆调整。
  4. 如果发现堆顶任务变成了刚刚添加的任务,那么就要重置leader,唤醒available中的一个线程去竞争leader。因为原来的leader等待的任务已被拉下去了,不再是堆顶任务,自然等待这个任务到期的线程也不能继续当leader。
  5. 最后就是释放锁。

在这里插入图片描述

void grow()

grow()是堆数组进行扩容的方法,当数组中元素的个数已满时,再往堆中添加任务,会调用该方法进行扩容。

        private void grow() {int oldCapacity = queue.length;int newCapacity = oldCapacity + (oldCapacity >> 1);if (newCapacity < 0) // overflownewCapacity = Integer.MAX_VALUE;queue = Arrays.copyOf(queue, newCapacity);}

oldCapacity是老数组的长度,oldCapacity + (oldCapacity >> 1) 就是 oldCapacity的1.5倍,得到新数组长度,然后把老数组的元素复制到新数组。

在这里插入图片描述

void siftUp(int k, RunnableScheduledFuture<?> key)

然后再看一下堆向上调整的方法。

        private void siftUp(int k, RunnableScheduledFuture<?> key) {while (k > 0) {int parent = (k - 1) >>> 1;RunnableScheduledFuture<?> e = queue[parent];if (key.compareTo(e) >= 0)break;queue[k] = e;setIndex(e, k);k = parent;}queue[k] = key;setIndex(key, k);}

k是新任务当前所处的位置,key就是新加入的任务。

while (k > 0)表示一直循环向上调整,直到来到堆顶就停止,或者中途break掉。

int parent = (k - 1) >>> 1; 就是找到当前位置k的父节点的位置,(k - 1) >>> 1 就是 (k - 1) / 2,只不过这里用了位移运算加速。

RunnableScheduledFuture<?> e = queue[parent]; 就是根据父任务下标parent,获取到父任务e。

if (key.compareTo(e) >= 0) break; 这一行的意思就是如果当前任务的过期时间比父任务大,那么调整结束,新任务就放在位置k。因为 key.compareTo(e) 返回大于等于0的数值,表示新任务key的过期时间比父任务e的过期时间大,而当前又是小顶堆,过期时间越小的越靠近堆顶,所以新任务key只能放在父任务e的下面。

queue[k] = e; 就是上面的if分支没有进去,代表新任务key的过期时间比父任务e的过期时间要小,那么新任务key是应该要处于比父任务e更靠上的位置的,因此这里先把父任务e拉下来,也就是是移到新任务的当前位置k,然后新任务key继续向上跟更靠上的父任务做比较。

setIndex(e, k); 就是更新父任务在堆中的位置,

k = parent; 就是更新新任务的在数组中的位置,方便做下一轮的比较。

当跳出while循环后,k的值就是新任务在数组中的位置,queue[k] = key; 就是把新任务放到它该放的位置。

最后 setIndex(key, k); 更新新任务在堆中的位置。

在这里插入图片描述

RunnableScheduledFuture<?> take()

看完add方法,我们再来看take方法。take方法是从队列中获取任务的方法,该方法会阻塞当前线程直到获取到任务为止。

        public RunnableScheduledFuture<?> take() throws InterruptedException {// 1.先获取锁final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture<?> first = queue[0];// 2.如果堆顶任务为空,在available中等待if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);// 3.如果堆顶任务已经到期了,取走,取走前要调做堆调整if (delay <= 0)return finishPoll(first);first = null;// 4.如果此时leader线程不为null,要在available中等待if (leader != null)available.await();else {// 5.设置当前线程为leader线程,然后在available等待堆顶任务到期,醒来后置空leaderThread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {// 6.leader为空,堆中还有任务,唤醒available中等待的线程if (leader == null && queue[0] != null)available.signal();// 7.释放锁lock.unlock();}}

总共分7步:

  1. 还是先获取锁,防止并发问题。
  2. 获取锁成功后,就进入一个for循环自旋直到取到任务。如果检查到如果堆顶任务为空,在available中等待。
  3. 堆顶任务不为空,并且堆顶任务已经到期了,那么就取走堆顶任务并执行,但是在取走前要调做堆调整,finishPoll(first)就是进行堆调整的方法。
  4. 堆顶任务没到期,并且此时leader线程不为null,当前线程就要在available中等待。
  5. 堆顶任务未到期,并且此时leader线程为null,那么就设置当前线程为leader线程,然后在available进行带超时时间的等待,等待时间就是堆顶任务的到期时间。到期醒来后,会设置leader为null,然后就是进入下一轮循环时取走堆顶任务。
  6. 最后退出方法前,检查到leader为null,并且堆中还有任务,那么唤醒available中的一个线程。

在这里插入图片描述

RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f)

finishPoll方法做的事情就是在取走堆顶任务之前,对小顶堆做调整,然后才把堆顶任务返回。

        private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {int s = --size;RunnableScheduledFuture<?> x = queue[s];queue[s] = null;if (s != 0)siftDown(0, x);setIndex(f, -1);return f;}

int s = --size; 是堆元素个数减1。

RunnableScheduledFuture<?> x = queue[s]; 获取到的是堆底任务,准备用这个任务来做堆调整。

queue[s] = null; 把堆底置空。

siftDown(0, x); 真正做堆调整的方法,从堆顶(数组下标为0)位置向下做堆调整。

setIndex(f, -1); 设置原先的堆顶任务(也就是即将要作为结果返回的任务)的位置为-1,表示该任务已被取走。

return f; 返回原先的堆顶任务,堆顶任务是作为参数传进来的,原封不动返回出去。

在这里插入图片描述

void siftDown(int k, RunnableScheduledFuture<?> key)

我们再来看一下真正做堆调整的siftDown方法,siftDown方法是拿着原先的堆底任务作为当前任务去做堆的向下调整,它跟我们上面描述的从顶向下做堆调整的流程是一致的。

        private void siftDown(int k, RunnableScheduledFuture<?> key) {int half = size >>> 1;while (k < half) {int child = (k << 1) + 1;RunnableScheduledFuture<?> c = queue[child];int right = child + 1;if (right < size && c.compareTo(queue[right]) > 0)c = queue[child = right];if (key.compareTo(c) <= 0)break;queue[k] = c;setIndex(c, k);k = child;}queue[k] = key;setIndex(key, k);}

int half = size >>> 1; 取到的是堆大小一半的位置half,然后 while (k < half) 循环直到当前位置k过了half 就停止。因为过了位置half就是小顶堆的最底一层的位置,没必要再往下了。

然后再下面 int child = (k << 1) + 1; 就是获取左孩子的位置,也就是 (k * 2)+ 1,然后int right = child + 1;就是右孩子的位置。

if (right < size && c.compareTo(queue[right]) > 0) c = queue[child = right]; c是最终要和当前父节点PK的子任务,这里就是两个子任务PK一下,看谁胜出就拿谁去跟父节点PK。

if (key.compareTo© <= 0) break; 就是如果父节点的任务到期时间比子任务小,那么父节点就放在当前位置k,无需再往下调整。

queue[k] = c; 就是把子节点往上提到当前位置k,因为执行到这里代表当前父节点没有PK过子节点,所以自然要把子节点往上提。

setIndex(c, k); 更新子节点在堆数组的位置记录。

k = child; 当前位置k更新为子节点原先的位置,方便做下一轮调整。

queue[k] = key; 就是在while循环结束后,得到了当前任务该放的位置k,把当前任务放在位置k。

setIndex(key, k); 最后更新当前任务在堆数组的位置记录。

在这里插入图片描述

总结

到这里,整个ScheduledThreadPoolExecutor的原理和源码都分析完毕了。

总体逻辑:

  1. 用了一个延时队列DelayedWorkQueue去存放任务,延时队列又使用一个小顶堆去存放任务,小顶堆中的任务ScheduledFutureTask会按照到期时间time从小到大进行排序。
  2. 当我们提交定时任务到ScheduledThreadPoolExecutor时,任务会被放入小顶堆中,然后会从底往上做堆调整。
  3. 任务放入堆中之后,检查是否有必要创建新线程,如果有必要则创建新线程。
  4. ScheduledThreadPoolExecutor中的线程会不停的循环从DelayedWorkQueue中获取任务并执行。
  5. 线程从DelayedWorkQueue中获取任务时,并不是想取就能取,而是要当上了leader才能取。
  6. leader线程每次取任务都是从堆顶取走一个,而且要等待堆顶任务到期才能取走,堆顶任务还没到期前,leader线程会在available条件队列中等待一定的时间后自动唤醒,其他非leader线程就要在available条件队列中等待leader线程执行完任务唤醒。
  7. 每次取走一个任务后,会拿到堆底任务,从堆顶向下做堆调整。
  8. 定时任务执行完成后,会计算并更新下一次的执行时间,然后重写放回到队列中。

在这里插入图片描述

本篇文章到此全部结束。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/189717.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于机器深度学习的交通标志目标识别

在线工具推荐&#xff1a; 三维数字孪生场景工具 - GLTF/GLB在线编辑器 - Three.js AI自动纹理化开发 - YOLO 虚幻合成数据生成器 - 3D模型在线转换 - 3D模型预览图生成服务 智能交通系统&#xff08;ITS&#xff09;&#xff0c;包括无人驾驶车辆&#xff0c;尽管在道路…

安装两个WIN10/WIN11系统到两个盘中,第二个系统依赖原系统盘引导的问题

前段时间折腾装一个双系统&#xff0c;主要两个方面考虑&#xff1a; 1. 原来的系统又许多软件&#xff0c;想着先保留&#xff1b; 2. 系统想安装到一个固态硬盘中&#xff1b; 在安装的过程中遇到了一些问题&#xff0c;这里记录分享一下。 问题1&#xff0c;运行系统自动安装…

Langchain-Chatchat学习

参考&#xff1a;Langchain-Chatchat 阿里通义千问Qwen 保姆级教程 | 次世代知识管理解决方案 - 知乎 (zhihu.com) 该文档没有安装成功&#xff0c;安装成功的文档 可见&#xff1a;Langchain-Chatchat的安装过程-CSDN博客 中文LLM生态观察 模型 就开源的部分而言&#xf…

Servlet概念视频笔记

学习地址:121-尚硅谷-Servlet-什么是Servlet_哔哩哔哩_bilibili 目录 1.Servlet技术 a.什么是Servlet b.手动实现Servlet程序 c.url地址如何定位到Servlet程序去访问 d.Servlet的生命周期 e.GET 和 POST 请求的分发处理 f.通过继承 HttpServlet 实现 Servlet程序 g.使用…

如何在财税行业查找批量客户?

现在市场上代记账公司也不算少&#xff0c;做过这行的都知道&#xff0c;最初呢行业竞争不强&#xff0c;都是靠地推、老客户转介绍&#xff0c;或者长期以往的蹲守各个地区的工商注册服务中心&#xff0c;找那些才注册企业的老板或者创业者。但是&#xff0c;随着市场经济的发…

Python+Requests模块_设置代理、超时设置、重定向设置

设置代理 代理&#xff08;英语&#xff1a;Proxy&#xff09;&#xff0c;也称网络代理&#xff0c;是一种特殊的网络服务&#xff0c;英文全称是&#xff08;Proxy Server&#xff09;&#xff0c;其功 能就是代理网络用户去取得网络信息。形象的说&#xff1a;它是网络信息…

[每周一更]-(第75期):Go相关粗浅的防破解方案

Go作为编译语言&#xff0c;天然存在跨平台的属性&#xff0c;我们在编译完成后&#xff0c;可以再不暴露源代码的情况下&#xff0c;运行在对应的平台中&#xff0c;但是 还是架不住有逆向工程师的反编译、反汇编的情形&#xff1b;&#xff08;当然我们写的都不希望被别人偷了…

国内高速下载huggingface上的模型

目录 前言 modelscope huggingface安装 Windows设置环境变量 Linux设置环境变量 设置国内镜像 Windows&#xff08;cmd.exe&#xff09; 当前窗口有效 永久生效 Linux 当前窗口有效 永久生效 下载模型 前言 国内优先使用modelscope&#xff0c;hugging face镜像站下载…

MySQL字符函数

在数据库中&#xff0c;字符函数是一组用于处理字符串的函数。这些函数可以帮助我们执行各种操作&#xff0c;如连接、比较、替换等。本文将介绍一些常用的MySQL字符函数&#xff0c;并演示如何在查询中使用它们。 1.concat() 函数 CONCAT() 函数用于连接两个或多个字符串。它…

【C/PTA —— 13.指针2(课内实践)】

C/PTA —— 13.指针2&#xff08;课内实践&#xff09; 一.函数题6-1使用函数实现字符串部分复制6-2 拆分实数的整数部分和小数部分6-3 存在感 二.编程题7-1 单词反转 一.函数题 6-1使用函数实现字符串部分复制 void strmcpy(char* t, int m, char* s) {int len 0;char* ret …

【C/PTA —— 13.指针2(课外实践)】

C/PTA —— 13.指针2&#xff08;课外实践&#xff09; 一.函数题6-1 鸡兔同笼问题6-2 冒泡排序6-3 字符串反正序连接6-4 计算最长的字符串长度6-5 查找星期 二.编程题7-1 C程序设计 实验5-7 数组指针作函数参数7-2 查找奥运五环色的位置 一.函数题 6-1 鸡兔同笼问题 int Chic…

CSS新手入门笔记整理:CSS图片样式

图片大小 语法 width:像素值; height:像素值; 图片边框&#xff1a;border 语法 边框&#xff1a;宽度值 样式值 颜色值&#xff1b; border:1px solid red; 图片对齐 水平对齐&#xff1a;text-align 语法 text-align:取值; 属性值 说明 left 左对齐(默认值) cent…

csp 现值计算 C语言

号&#xff1a; 202212-1 试题名称&#xff1a; 现值计算 时间限制&#xff1a; 1.0s 内存限制&#xff1a; 512.0MB 问题描述&#xff1a; 问题描述 评估一个长期项目的投资收益&#xff0c;资金的时间价值是一个必须要考虑到的因素。简单来说&#xff0c;假设…

Unittest(1):unittest单元测试框架简介setup前置初始化和teardown后置操作

unittest单元测试框架简介 unittest是python内置的单元测试框架&#xff0c;具备编写用例、组 织用例、执行用例、功能&#xff0c;可以结合selenium进行UI自动化测 试&#xff0c;也可以结合appium、requests等模块做其它自动化测试 官方文档&#xff1a;https://docs.pytho…

JS逆向-mytoken之code参数

前言 本文是该专栏的第60篇,后面会持续分享python爬虫干货知识,记得关注。 本文以mytoken为例,通过js逆向获取其code参数的生成规律。具体的“逆向”思路逻辑,笔者将会详细介绍每个步骤,并且将在正文结合“完整代码”进行详细说明。 接下来,跟着笔者直接往下看正文详细…

OpenOffice 4.1.14的安装以及与数据库进行连接

起因&#xff1a;因为MS Office的Access只能和自家的数据库连接&#xff0c;感觉不太舒服&#xff0c;因此尝试使用Openoffice组件中的Base进行替换。这里记录一下从安装到进行数据库连接的过程。 1.下载地址 https://www.openoffice.org/download/index.html 我这里是Debian1…

(C++)三数之和--双指针法

个人主页&#xff1a;Lei宝啊 愿所有美好如期而遇 算法原理 双指针法&#xff0c;不一定是说就要使用指针&#xff0c;只是一种形象的说法&#xff0c;在数组中&#xff0c;我们一般将数组下标当做指针。我们首先对数组进行排序&#xff0c;从左向右标定一个下标i&#xff0…

CentOS7根分区扩容之二

Centos根分区快接近100%&#xff0c;如果根分区是逻辑卷&#xff0c;那么可以增加额外的磁盘&#xff0c;通过逻辑卷扩容的方式增加到根分区空间。 1.检查当前根分区大小 df -Th2.检查额外的磁盘 3.把磁盘格式化为lvm类型的文件分区。 [rootlocalhost ~]# fdisk /dev/sdb We…

数据结构:带头双向循环链表的实现

引言 单链表存在缺陷&#xff1a;需要从头开始找前一个节点 解决方法&#xff1a;双向链表 链表的结构&#xff08;8种&#xff09;&#xff1a; 1. 单向&#xff0c;双向 2. 带头、不带头 带头即为带哨兵位的头节点&#xff0c;第一个节点不存储有效数据。带头节点&#…

leetcode刷题详解—— 环形子数组的最大和

1. 题目链接&#xff1a;918. 环形子数组的最大和 2. 题目描述&#xff1a; 给定一个长度为 n 的环形整数数组 nums &#xff0c;返回 nums 的非空 子数组 的最大可能和 。 环形数组 意味着数组的末端将会与开头相连呈环状。形式上&#xff0c; nums[i] 的下一个元素是 nums[(…