简介
ScheduledThreadPoolExecutor 是 Java 中的一个类,它属于 java.util.concurrent 包。这个类是一个线程池,用于在给定的延迟后运行命令,或者定期地执行命令。它是 ThreadPoolExecutor 的一个子类,专门用于处理需要定时或周期性执行的任务。
ScheduledThreadPoolExecutor 的主要特点如下:
- 线程池大小固定:线程池的大小在创建时就已经设定,并且之后不会改变。
- 任务调度:它可以用来调度一次性任务,也可以用来调度重复执行的任务。
- 延迟执行:任务可以在给定的延迟之后开始执行。
- 周期性执行:任务可以周期性地执行,例如每隔固定时间执行一次。
ScheduledThreadPoolExecutor 提供的主要方法包括:
- schedule(Runnable command, long delay, TimeUnit unit): 在给定的延迟后执行一次任务。
- scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit): 以固定的速率周期性地执行任务。
- scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit): 在给定的初始延迟后首次执行任务,然后每次任务执行完毕之后等待指定的延迟再次执行。
源码
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService {public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {//任务或单位为空 则抛出异常if (command == null || unit == null)throw new NullPointerException();//把要执行的任务包装成 RunnableScheduledFutureRunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));//延时执行定时任务delayedExecute(t);return t;}//逻辑同上public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit) {if (callable == null || unit == null)throw new NullPointerException();RunnableScheduledFuture<V> t = decorateTask(callable,new ScheduledFutureTask<V>(callable,triggerTime(delay, unit)));delayedExecute(t);return t;}public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {//任务和单位为空则抛出异常if (command == null || unit == null)throw new NullPointerException();if (period <= 0)//执行周期时间小于等于0 则抛出异常throw new IllegalArgumentException();//将任务包装成ScheduledFutureTaskScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));//然后包装成RunnableScheduledFutureRunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;//延时执行任务delayedExecute(t);return t;}//同上public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}//所谓的包装就是直接返回RunnableScheduledFuture对象protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {return task;}//从scheduleWithFixedDelay方法的源代码,我们可以看出在将Runnable对象封装成ScheduledFutureTask时,设置了执行周期,//但是此时设置的执行周期与scheduleAtFixedRate方法设置的执行周期不同。//此时设置的执行周期规则为:下一次任务执行的时间是上一次任务完成的时间加上delay时长,时长单位由TimeUnit决定。//也就是说,具体的执行时间不是固定的,但是执行的周期是固定的,整体采用的是相对固定的延迟来执行定时任务public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {//传入的Runnable对象和TimeUnit为空,则抛出空指针异常if (command == null || unit == null)throw new NullPointerException();//任务延时时长小于或者等于0,则抛出非法参数异常if (delay <= 0)throw new IllegalArgumentException();//将Runnable对象封装成ScheduledFutureTask任务//并设置固定的执行周期来执行任务ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));//调用decorateTask方法,本质上直接返回ScheduledFutureTask任务RunnableScheduledFuture<Void> t = decorateTask(command, sft);//设置执行的任务sft.outerTask = t;//执行延时任务delayedExecute(t);return t;}private void setNextRunTime() {//距离下次执行任务的时长long p = period;//固定频率执行,//上次执行任务的时间//加上任务的执行周期if (p > 0)time += p;//相对固定的延迟//使用的是系统当前时间//加上任务的执行周期elsetime = triggerTime(-p);}//这两个triggerTime方法的代码比较简单,就是获取下一次执行任务的具体时间。//有一点需要注意的是:delay <(Long.MAX_VALUE >> 1判断delay的值是否小于Long.MAX_VALUE的一半,//如果小于Long.MAX_VALUE值的一半,则直接返回delay,否则需要处理溢出的情况。private long triggerTime(long delay, TimeUnit unit) {return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));}long triggerTime(long delay) {return now() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));}private long overflowFree(long delay) {//获取队列中的节点Delayed head = (Delayed) super.getQueue().peek();//获取的节点不为空,则进行后续处理if (head != null) {//从队列节点中获取延迟时间long headDelay = head.getDelay(NANOSECONDS);//如果从队列中获取的延迟时间小于0,并且传递的delay//值减去从队列节点中获取延迟时间小于0if (headDelay < 0 && (delay - headDelay < 0))//将delay的值设置为Long.MAX_VALUE + headDelaydelay = Long.MAX_VALUE + headDelay;}//返回延迟时间return delay;}private void delayedExecute(RunnableScheduledFuture<?> task) {//如果当前线程池已经关闭//则执行线程池的拒绝策略if (isShutdown())reject(task);//线程池没有关闭else {//将任务添加到阻塞队列中super.getQueue().add(task);//如果当前线程池是SHUTDOWN状态//并且当前线程池状态下不能执行任务//并且成功从阻塞队列中移除任务if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))//取消任务的执行,但不会中断执行中的任务task.cancel(false);else//调用ThreadPoolExecutor类中的ensurePrestart()方法ensurePrestart();}}void reExecutePeriodic(RunnableScheduledFuture<?> task) {//线程池当前状态下能够执行任务if (canRunInCurrentRunState(true)) {//将任务放入队列super.getQueue().add(task);//线程池当前状态下不能执行任务,并且成功移除任务if (!canRunInCurrentRunState(true) && remove(task))//取消任务task.cancel(false);else//调用ThreadPoolExecutor类的ensurePrestart()方法ensurePrestart();}}//ScheduledThreadPoolExecutor类中的onShutdown方法的主要逻辑就是先判断线程池调用shutdown方法后,是否继续执行现有的延迟任务和定时任务,//如果不再执行,则取消任务并清空队列;如果继续执行,将队列中的任务强转为RunnableScheduledFuture对象之后,从队列中删除并取消任务。//大家需要好好理解这两种处理方式。最后调用ThreadPoolExecutor类的tryTerminate方法。@Overridevoid onShutdown() {//获取队列BlockingQueue<Runnable> q = super.getQueue();//在线程池已经调用shutdown方法后,是否继续执行现有延迟任务boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();//在线程池已经调用shutdown方法后,是否继续执行现有定时任务boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();//在线程池已经调用shutdown方法后,不继续执行现有延迟任务和定时任务if (!keepDelayed && !keepPeriodic) {//遍历队列中的所有任务for (Object e : q.toArray())//取消任务的执行if (e instanceof RunnableScheduledFuture<?>)((RunnableScheduledFuture<?>) e).cancel(false);//清空队列q.clear();}//在线程池已经调用shutdown方法后,继续执行现有延迟任务和定时任务else {//遍历队列中的所有任务for (Object e : q.toArray()) {//当前任务是RunnableScheduledFuture类型if (e instanceof RunnableScheduledFuture) {//将任务强转为RunnableScheduledFuture类型RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>) e;//在线程池调用shutdown方法后不继续的延迟任务或周期任务//则从队列中删除并取消任务if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||t.isCancelled()) {if (q.remove(t))t.cancel(false);}}}}//最终调用tryTerminate()方法tryTerminate();}}
示例
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; public class ScheduledThreadPoolExecutorExample { public static void main(String[] args) { // 创建一个包含单个线程的ScheduledThreadPoolExecutor ScheduledThreadPoolExecutor executor = Executors.newScheduledThreadPool(1); // 创建一个Runnable任务 Runnable task = () -> System.out.println("Task is running: " + System.currentTimeMillis()); // 在10秒后执行该任务 executor.schedule(task, 10, TimeUnit.SECONDS); // 每隔5秒执行一次该任务 executor.scheduleAtFixedRate(task, 0, 5, TimeUnit.SECONDS); // 在初始延迟15秒后执行,之后每次任务执行完等待2秒再次执行 executor.scheduleWithFixedDelay(task, 15, 2, TimeUnit.SECONDS); // 注意:在实际应用中,你应该适当地关闭线程池以避免资源泄露 // executor.shutdown(); // 这将平滑地关闭线程池,等待所有任务完成 }
}
在这个例子中,我们创建了一个 ScheduledThreadPoolExecutor 实例,它包含单个线程。我们定义了三个不同的任务调度:一个在10秒后执行,一个每隔5秒执行一次,还有一个在初始延迟15秒后执行,然后每次任务执行完毕之后等待2秒再次执行。
请注意,在实际应用中,当你不再需要 ScheduledThreadPoolExecutor 时,应该调用 shutdown 或 shutdownNow 方法来关闭线程池,以避免资源泄露。同时,线程池中的任务也应该设计为能够在适当的时候结束执行,避免无限期地占用资源。