归档
JDK 版本
openjdk version "17.0.12" 2024-07-16
OpenJDK Runtime Environment Temurin-17.0.12+7 (build 17.0.12+7)
OpenJDK 64-Bit Server VM Temurin-17.0.12+7 (build 17.0.12+7, mixed mode, sharing)
测试
@Slf4j
public class MixTest {

    /**
     * Submits a {@link CountTask} summing the integers 1..10 to a dedicated
     * {@link ForkJoinPool} and prints the result.
     *
     * @throws Exception if waiting for the result fails or the task itself fails
     */
    @Test
    public void testFJ() throws Exception {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            CountTask task = new CountTask(1, 10);
            Future<Integer> result = forkJoinPool.submit(task);
            System.out.println("结果:" + result.get());
        } finally {
            // Release the pool's worker threads once the demo is done.
            forkJoinPool.shutdown();
        }
    }

    /**
     * Recursively sums the integers in the inclusive range {@code [start, end]}.
     * Ranges wider than {@link #THRESHOLD} are split in half; narrower ranges are
     * summed directly.
     */
    public static class CountTask extends RecursiveTask<Integer> {
        /** Largest {@code end - start} distance that is summed without splitting. */
        private static final int THRESHOLD = 2;
        private final int start;
        private final int end;

        /**
         * @param start first integer of the range (inclusive)
         * @param end   last integer of the range (inclusive)
         */
        public CountTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        /**
         * Sleeps briefly and prints the executing thread's name (so the demo shows
         * which worker runs which subtask), then computes the range sum.
         *
         * @return the sum of all integers in {@code [start, end]}
         */
        @Override
        public Integer compute() {
            try {
                Thread.sleep(200L);
                System.out.printf("cur-thread: [%s].%n", Thread.currentThread().getName());
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag and carry on computing.
                Thread.currentThread().interrupt();
            }

            // Widen to long so extreme ranges cannot overflow the comparison.
            boolean canCompute = ((long) end - start) <= THRESHOLD;
            if (canCompute) {
                int sum = 0;
                for (int i = start; i <= end; i++) {
                    sum += i;
                }
                return sum;
            }

            // Same midpoint as (start + end) / 2, computed without int overflow.
            int middle = (int) (((long) start + end) / 2);
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);

            // Fork one half, compute the other in the current worker, then join the
            // forked half. This avoids joining tasks in outermost-first order.
            leftTask.fork();
            int rightResult = rightTask.compute();
            int leftResult = leftTask.join();
            return leftResult + rightResult;
        }
    }
}
原理
类结构
java.util.concurrent.ForkJoinPool
// Excerpt of ForkJoinPool showing only the fields this walkthrough relies on:
//   queues           - array of WorkQueue instances held by the pool
//   factory          - the ForkJoinWorkerThreadFactory kept by the pool
//   ueh              - the UncaughtExceptionHandler kept by the pool
//   workerNamePrefix - String field; presumably the prefix used for worker thread names
public class ForkJoinPool extends AbstractExecutorService { WorkQueue [ ] queues; final ForkJoinWorkerThreadFactory factory; final UncaughtExceptionHandler ueh; final String workerNamePrefix;
}
java.util.concurrent.ForkJoinPool.WorkQueue
// Excerpt of WorkQueue: `array` holds ForkJoinTask references and `owner` is the
// ForkJoinWorkerThread associated with this queue.
static final class WorkQueue { ForkJoinTask < ? > [ ] array; final ForkJoinWorkerThread owner; }
java.util.concurrent.ForkJoinWorkerThread
// Excerpt of ForkJoinWorkerThread: a Thread subclass that keeps a reference to its
// ForkJoinPool (`pool`) and to a ForkJoinPool.WorkQueue (`workQueue`).
public class ForkJoinWorkerThread extends Thread { final ForkJoinPool pool; final ForkJoinPool. WorkQueue workQueue;
}
初始化
java.util.concurrent.ForkJoinPool
// Initialization excerpt (parts elided with ". . ."):
// - ForkJoinPool(): delegates to the full constructor with parallelism set to
//   min(MAX_CAP, availableProcessors()), the default worker-thread factory, a null
//   handler, and DEFAULT_KEEPALIVE in milliseconds.
// - Full constructor: stores the factory and handler; sets keepAlive to the larger of
//   the keep-alive time in millis and TIMEOUT_SLOP; sizes the `queues` array as
//   1 << (33 - numberOfLeadingZeros(p - 1)), which is always a power of two; creates
//   the registration lock; builds workerNamePrefix as "ForkJoinPool-<id>-worker-",
//   where <id> comes from getAndAddPoolIds(1) + 1.
// - Static initializer: creates the default worker-thread factory and assigns `common`
//   from a pool built via `new ForkJoinPool((byte) 0)` (surrounding logic is elided).
public class ForkJoinPool extends AbstractExecutorService { public ForkJoinPool ( ) { this ( Math . min ( MAX_CAP , Runtime . getRuntime ( ) . availableProcessors ( ) ) , defaultForkJoinWorkerThreadFactory, null , false , 0 , MAX_CAP , 1 , null , DEFAULT_KEEPALIVE , TimeUnit . MILLISECONDS ) ; } public ForkJoinPool ( int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, . . . , long keepAliveTime, TimeUnit unit) { . . . int p = parallelism; this . factory = factory; this . ueh = handler; this . keepAlive = Math . max ( unit. toMillis ( keepAliveTime) , TIMEOUT_SLOP ) ; int size = 1 << ( 33 - Integer . numberOfLeadingZeros ( p - 1 ) ) ; . . . this . registrationLock = new ReentrantLock ( ) ; this . queues = new WorkQueue [ size] ; String pid = Integer . toString ( getAndAddPoolIds ( 1 ) + 1 ) ; this . workerNamePrefix = "ForkJoinPool-" + pid + "-worker-" ; } static { . . . defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory ( ) ; . . . ForkJoinPool tmp = . . . new ForkJoinPool ( ( byte ) 0 ) ; common = tmp; }
}
java.util.concurrent.ForkJoinPool.DefaultForkJoinWorkerThreadFactory
// Default factory excerpt: newThread(pool) returns a new ForkJoinWorkerThread built
// with a null thread group, the given pool, useSystemClassLoader = true and
// isInnocuous = false (code around the constructor call is elided).
static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread ( ForkJoinPool pool) { return . . . new ForkJoinWorkerThread ( null , pool, true , false ) ; } }
java.util.concurrent.ForkJoinWorkerThread
// Constructor excerpt:
// - Calls the Thread constructor with the given group, a null Runnable target, a name
//   obtained from pool.nextWorkerThreadName(), and a stack size of 0L.
// - Stores the pool and reads its uncaught-exception handler (`ueh`).
// - Creates this thread's own WorkQueue, passing itself and the isInnocuous flag.
// - Marks the thread as a daemon.
// - Installs the handler when it is non-null.
// - Sets the context class loader to the system class loader when
//   useSystemClassLoader is true.
public class ForkJoinWorkerThread extends Thread { ForkJoinWorkerThread ( ThreadGroup group, ForkJoinPool pool, boolean useSystemClassLoader, boolean isInnocuous) { super ( group, null , pool. nextWorkerThreadName ( ) , 0L ) ; UncaughtExceptionHandler handler = ( this . pool = pool) . ueh; this . workQueue = new ForkJoinPool. WorkQueue ( this , isInnocuous) ; super . setDaemon ( true ) ; if ( handler != null ) super . setUncaughtExceptionHandler ( handler) ; if ( useSystemClassLoader) super . setContextClassLoader ( ClassLoader . getSystemClassLoader ( ) ) ; }
}
启动线程
java.util.concurrent.ForkJoinPool
// Submission / worker start-up excerpt (parts elided with ". . ."):
// - submit(task): forwards to externalSubmit(task).
// - externalSubmit(task): if the calling thread is a ForkJoinWorkerThread with a
//   non-null workQueue that belongs to this pool, pushes the task onto that queue;
//   otherwise calls externalPush(task). Returns the task.
// - externalPush(task): obtains a queue from submissionQueue(); throws when it is null
//   (exception type elided); otherwise calls lockedPush(task) and, when that returns
//   true, signalWork().
// - signalWork(): loops while ctl is negative. When ((int) c & ~UNSIGNALLED) is 0 it
//   calls createWorker() and stops; in the other branch it unparks the owner thread
//   of a WorkQueue `v` and stops (how `v` is chosen is elided).
// - createWorker(): asks the factory for a new thread; when both the factory and the
//   thread are non-null, starts the thread and returns true; otherwise returns false
//   (exception handling is elided).
public class ForkJoinPool extends AbstractExecutorService { public < T > ForkJoinTask < T > submit ( ForkJoinTask < T > task) { return externalSubmit ( task) ; } private < T > ForkJoinTask < T > externalSubmit ( ForkJoinTask < T > task) { Thread t; ForkJoinWorkerThread wt; WorkQueue q; . . . if ( ( ( t = Thread . currentThread ( ) ) instanceof ForkJoinWorkerThread ) && ( q = ( wt = ( ForkJoinWorkerThread ) t) . workQueue) != null && wt. pool == this ) q. push ( task, this ) ; else externalPush ( task) ; return task; } final void externalPush ( ForkJoinTask < ? > task) { WorkQueue q; if ( ( q = submissionQueue ( ) ) == null ) throw . . . ; else if ( q. lockedPush ( task) ) signalWork ( ) ; } final void signalWork ( ) { for ( long c = ctl; c < 0L ; ) { int sp, i; WorkQueue [ ] qs; WorkQueue v; if ( ( sp = ( int ) c & ~ UNSIGNALLED ) == 0 ) { . . . createWorker ( ) ; break ; } . . . else { . . . Thread vt = v. owner; . . . LockSupport . unpark ( vt) ; break ; } } } private boolean createWorker ( ) { ForkJoinWorkerThreadFactory fac = factory; ForkJoinWorkerThread wt = null ; try { if ( fac != null && ( wt = fac. newThread ( this ) ) != null ) { wt. start ( ) ; return true ; } } . . . return false ; } }
执行
java.util.concurrent.ForkJoinWorkerThread
// Worker entry point excerpt: when both the pool and the work queue are non-null,
// run() registers the queue with the pool (registerWorker), calls onStart(), and then
// enters the pool's runWorker(w). The catch/finally part is elided.
public class ForkJoinWorkerThread extends Thread { public void run ( ) { ForkJoinPool p = pool; ForkJoinPool. WorkQueue w = workQueue; if ( p != null && w != null ) { try { p. registerWorker ( w) ; onStart ( ) ; p. runWorker ( w) ; } . . . } }
}
java.util.concurrent.ForkJoinPool
// Worker main loop excerpt (parts elided with ". . ."):
// - runWorker(w): runs only when mode >= 0 and w is non-null. Sets the SRC bit in
//   w.config and seeds `r` from w.stackPred. Each iteration scrambles `r` with three
//   shift-and-xor steps (13, 17, 5). The loop continues while scan(...) returns a
//   value >= 0 or awaitWork(w) returns 0.
// - scan(w, prevSrc, r): probes queue slots at index r & (n - 1), advancing r by the
//   odd stride (r >>> 16) | 1, for at most n probes. For a non-null queue with a
//   non-empty array it reads slot k; when the task there is non-null and the CAS of
//   that slot to null succeeds, it runs w.topLevelExec(t, q) and returns src.
//   NOTE(review): `n` is used but never declared in this excerpt; presumably it is
//   the length of `qs`. Confirm against the JDK source.
// - fork(): when called on a ForkJoinWorkerThread, pushes this task onto that thread's
//   own workQueue; otherwise submits it to ForkJoinPool.common via externalPush.
//   Returns this.
//   NOTE(review): the ForkJoinTask<V> return type and the use of `this` as a task
//   suggest fork() is a ForkJoinTask method shown under the wrong class header.
//   Verify against the JDK source.
public class ForkJoinPool extends AbstractExecutorService { final void runWorker ( WorkQueue w) { if ( mode >= 0 && w != null ) { w. config |= SRC ; int r = w. stackPred, src = 0 ; do { r ^= r << 13 ; r ^= r >>> 17 ; r ^= r << 5 ; } while ( ( src = scan ( w, src, r) ) >= 0 || ( src = awaitWork ( w) ) == 0 ) ; } } private int scan ( WorkQueue w, int prevSrc, int r) { WorkQueue [ ] qs = queues; for ( int step = ( r >>> 16 ) | 1 , i = n; i > 0 ; -- i, r += step) { int j, cap, b; WorkQueue q; ForkJoinTask < ? > [ ] a; if ( ( q = qs[ j = r & ( n - 1 ) ] ) != null && ( a = q. array) != null && ( cap = a. length) > 0 ) { . . . ForkJoinTask < ? > t = WorkQueue . getSlot ( a, k) ; . . . else if ( t != null && WorkQueue . casSlotToNull ( a, k, t) ) { . . . w. topLevelExec ( t, q) ; return src; } . . . } } . . . } public final ForkJoinTask < V > fork ( ) { Thread t; ForkJoinWorkerThread w; if ( ( t = Thread . currentThread ( ) ) instanceof ForkJoinWorkerThread ) ( w = ( ForkJoinWorkerThread ) t) . workQueue. push ( this , w. pool) ; else ForkJoinPool . common. externalPush ( this ) ; return this ; }
}
java.util.concurrent.ForkJoinPool.WorkQueue
// topLevelExec(task, q): runs the given task via doExec(), then keeps going while a
// next task is available. It first asks nextLocalTask(cfg); only when that returns
// null and q is non-null does it poll q (the queue passed in by the caller) with
// tryPoll(), counting each task obtained that way in nstolen. Afterwards nstolen is
// added to nsteals (remaining code elided).
static final class WorkQueue { final void topLevelExec ( ForkJoinTask < ? > task, WorkQueue q) { int cfg = config, nstolen = 1 ; while ( task != null ) { task. doExec ( ) ; if ( ( task = nextLocalTask ( cfg) ) == null && q != null && ( task = q. tryPoll ( ) ) != null ) ++ nstolen; } nsteals += nstolen; . . . } }
java.util.concurrent.ForkJoinTask
// doExec(): reads the volatile `status` into s and calls exec() only when s >= 0.
// How `completed` and any thrown exception are handled is elided in this excerpt.
// Returns s.
public abstract class ForkJoinTask < V > implements Future < V > , Serializable { volatile int status; final int doExec ( ) { int s; boolean completed; if ( ( s = status) >= 0 ) { try { completed = exec ( ) ; } . . . . . . } return s; }
}
java.util.concurrent.RecursiveTask
// exec(): invokes the subclass's compute(), stores its return value in `result`,
// and always returns true.
public abstract class RecursiveTask < V > extends ForkJoinTask < V > { protected final boolean exec ( ) { result = compute ( ) ; return true ; }
}
总结
窃取时,每次只从一个队列中窃取任务(参考「执行」一节,标记 sign_m_430)。算法很精妙,了解大概流程即可。