归档
使用示例
https://github.com/zengxf/small-frame-demo/blob/master/jdk-demo/simple-demo/src/main/java/test/jdkapi/juc/thread_pool/TestSchedule.java
JDK 版本
openjdk version "17" 2021 - 09 - 14
OpenJDK Runtime Environment ( build 17 + 35 - 2724 )
OpenJDK 64 - Bit Server VM ( build 17 + 35 - 2724 , mixed mode, sharing)
原理
类结构
java.util.concurrent.ScheduledThreadPoolExecutor
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService
{ public ScheduledThreadPoolExecutor ( int corePoolSize) { super ( corePoolSize, Integer . MAX_VALUE , DEFAULT_KEEPALIVE_MILLIS , MILLISECONDS , new DelayedWorkQueue ( ) ) ; } }
java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue
static class DelayedWorkQueue extends AbstractQueue < Runnable > implements BlockingQueue < Runnable > { private static final int INITIAL_CAPACITY = 16 ; private RunnableScheduledFuture < ? > [ ] queue = new RunnableScheduledFuture < ? > [ INITIAL_CAPACITY ] ; private final ReentrantLock lock = new ReentrantLock ( ) ; private final Condition available = lock. newCondition ( ) ; }
java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask
private class ScheduledFutureTask < V > extends FutureTask < V > implements RunnableScheduledFuture < V > { private final long sequenceNumber; private volatile long time; private final long period; ScheduledFutureTask ( Runnable r, V result, long triggerTime, long period, long sequenceNumber) { super ( r, result) ; this . time = triggerTime; this . period = period; this . sequenceNumber = sequenceNumber; } }
java.util.concurrent.FutureTask
public class FutureTask < V > implements RunnableFuture < V > { private volatile int state; private Callable < V > callable; public FutureTask ( Runnable runnable, V result) { this . callable = Executors . callable ( runnable, result) ; this . state = NEW ; }
}
public interface RunnableScheduledFuture < V > extends RunnableFuture < V > , ScheduledFuture < V > { boolean isPeriodic ( ) ;
}
public interface ScheduledFuture < V > extends Delayed , Future < V > {
}
public interface Delayed extends Comparable < Delayed > { long getDelay ( TimeUnit unit) ;
}
java.util.concurrent.ThreadPoolExecutor.Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; volatile long completedTasks; Worker ( Runnable firstTask) { setState ( - 1 ) ; this . firstTask = firstTask; this . thread = getThreadFactory ( ) . newThread ( this ) ; } }
调用链
java.util.concurrent.ScheduledThreadPoolExecutor
public ScheduledFuture < ? > scheduleAtFixedRate ( Runnable command, long initialDelay, long period, TimeUnit unit) { . . . ScheduledFutureTask < Void > sft = new ScheduledFutureTask < Void > ( command, null , triggerTime ( initialDelay, unit) , unit. toNanos ( period) , sequencer. getAndIncrement ( ) ) ; RunnableScheduledFuture < Void > t = decorateTask ( command, sft) ; sft. outerTask = t; delayedExecute ( t) ; return t; } private void delayedExecute ( RunnableScheduledFuture < ? > task) { if ( isShutdown ( ) ) reject ( task) ; else { super . getQueue ( ) . add ( task) ; if ( ! canRunInCurrentRunState ( task) && remove ( task) ) task. cancel ( false ) ; else ensurePrestart ( ) ; } }
java.util.concurrent.ThreadPoolExecutor
void ensurePrestart ( ) { int wc = workerCountOf ( ctl. get ( ) ) ; if ( wc < corePoolSize) addWorker ( null , true ) ; else if ( wc == 0 ) addWorker ( null , false ) ; } private boolean addWorker ( Runnable firstTask, boolean core) { . . . boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker ( firstTask) ; final Thread t = w. thread; if ( t != null ) { . . . . . . workers. add ( w) ; . . . t. start ( ) ; . . . } } . . . return workerStarted; } final void runWorker ( Worker w) { Thread wt = Thread . currentThread ( ) ; Runnable task = w. firstTask; . . . try { while ( task != null || ( task = getTask ( ) ) != null ) { w. lock ( ) ; . . . try { beforeExecute ( wt, task) ; try { task. run ( ) ; afterExecute ( task, null ) ; } . . . } finally { task = null ; w. completedTasks++ ; w. unlock ( ) ; } } . . . } . . . } private Runnable getTask ( ) { . . . for ( ; ; ) { . . . try { Runnable r = timed ? workQueue. poll ( keepAliveTime, TimeUnit . NANOSECONDS ) : workQueue. take ( ) ; if ( r != null ) return r; . . . } . . . } }
java.util.concurrent.ThreadPoolExecutor.Worker
public void run ( ) { runWorker ( this ) ; }
java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask
public void run ( ) { if ( ! canRunInCurrentRunState ( this ) ) cancel ( false ) ; else if ( ! isPeriodic ( ) ) super . run ( ) ; else if ( super . runAndReset ( ) ) { setNextRunTime ( ) ; reExecutePeriodic ( outerTask) ; } }
java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue
public RunnableScheduledFuture < ? > take ( ) throws InterruptedException { final ReentrantLock lock = this . lock; lock. lockInterruptibly ( ) ; try { for ( ; ; ) { RunnableScheduledFuture < ? > first = queue[ 0 ] ; if ( first == null ) available. await ( ) ; else { long delay = first. getDelay ( NANOSECONDS ) ; if ( delay <= 0L ) return finishPoll ( first) ; first = null ; if ( leader != null ) available. await ( ) ; else { Thread thisThread = Thread . currentThread ( ) ; leader = thisThread; try { available. awaitNanos ( delay) ; } finally { if ( leader == thisThread) leader = null ; } } } } } finally { if ( leader == null && queue[ 0 ] != null ) available. signal ( ) ; lock. unlock ( ) ; } }
总结
线程等待是在队列 take()
方法中处理 等待时延是由 ScheduledFutureTask #getDelay()
进行判断 而不是通过 DelayQueue
实现,但底层原理一样 重复延时执行的任务,每次执行完,会重新添加到队列中