归档
- GitHub: JDK-CompletableFuture
使用示例
- https://github.com/zengxf/small-frame-demo/blob/master/jdk-demo/simple-demo/src/main/java/test/new_features/jdk1_8/juc/TestCompletableFuture.java
- 基础方法使用测试:
testThenApply2()
JDK 版本
openjdk version "17" 2021-09-14
OpenJDK Runtime Environment (build 17+35-2724)
OpenJDK 64-Bit Server VM (build 17+35-2724, mixed mode, sharing)
原理
类结构
java.util.concurrent.CompletableFuture
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {volatile Object result; // 结果或封装的异常volatile Completion stack; // 依赖操作的栈顶 (组装单向链表)
}
java.util.concurrent.CompletableFuture.Completion
static abstract class Completion extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask{volatile Completion next; // 组装单向链表// ------ 方法定义 ------/** 触发:执行完成操作,返回可能需要传播的依赖项(如果存在)。 */abstract CompletableFuture<?> tryFire(int mode);abstract boolean isLive(); // 判断是否可触发public final void run() { tryFire(ASYNC); } // Runnablepublic final Void getRawResult() { return null; } // ForkJoinTaskpublic final void setRawResult(Void v) {} // ForkJoinTaskpublic final boolean exec() { tryFire(ASYNC); return false; } }
java.util.concurrent.CompletableFuture.AsynchronousCompletionTask
// 异步任务标识接口(无其他定义)public static interface AsynchronousCompletionTask {}
java.util.concurrent.CompletableFuture.AsyncSupply
// sign_c_030 异步生成数据static final class AsyncSupply<T> extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {CompletableFuture<T> dep; Supplier<? extends T> fn; // 数据提供者AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) {this.dep = dep; this.fn = fn;}public final Void getRawResult() { return null; } // ForkJoinTaskpublic final void setRawResult(Void v) {} // ForkJoinTaskpublic final boolean exec() { run(); return false; }// ForkJoinTask}
java.util.concurrent.CompletableFuture.UniApply
// sign_c_040static final class UniApply<T, V> extends UniCompletion<T, V> {Function<? super T, ? extends V> fn;UniApply( // sign_cm_050Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src,Function<? super T, ? extends V> fn) {super(executor, dep, src); this.fn = fn;}}
java.util.concurrent.CompletableFuture.UniCompletion
abstract static class UniCompletion<T,V> extends Completion {Executor executor; // 要使用的执行器(如果没有则为 null)CompletableFuture<V> dep; // 要完成的依赖项CompletableFuture<T> src; // 行动来源UniCompletion(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src) {this.executor = executor; this.dep = dep; this.src = src;}final boolean isLive() { return dep != null; }}
java.util.concurrent.CompletableFuture.UniAccept
// sign_c_060static final class UniAccept<T> extends UniCompletion<T, Void> {Consumer<? super T> fn;... // 构造器类似:UniApply, ref: sign_cm_050}
java.util.concurrent.CompletableFuture.UniRun
// sign_c_070static final class UniRun<T> extends UniCompletion<T, Void> {Runnable fn;... // 构造器类似:UniApply, ref: sign_cm_050}
初始链
supplyAsync()
java.util.concurrent.CompletableFuture
// 调用入口,ref: sign_demo_010public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);}static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f) {...CompletableFuture<U> d = new CompletableFuture<U>();e.execute(new AsyncSupply<U>(d, f)); // 进行执行数据生成,ref: sign_c_030 | sign_m_110return d;}
java.util.concurrent.CompletableFuture.AsyncSupply
// sign_m_110 数据生成public void run() {CompletableFuture<T> d; Supplier<? extends T> f;if ((d = dep) != null && (f = fn) != null) {dep = null; fn = null;if (d.result == null) {try {d.completeValue(f.get()); // 获取数据并填充结果} catch (Throwable ex) {d.completeThrowable(ex); // 出错时,封装异常填充结果}}d.postComplete(); // 传递给后面依赖项,ref: sign_m_310}}
thenApply()
java.util.concurrent.CompletableFuture
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);}// sign_m_210private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T,? extends V> f) {...Object r;if ((r = result) != null)return uniApplyNow(r, e, f);CompletableFuture<V> d = newIncompleteFuture();unipush(new UniApply<T,V>(e, d, this, f)); // ref: sign_m_230 | sign_c_040return d;}public <U> CompletableFuture<U> newIncompleteFuture() {return new CompletableFuture<U>();}// sign_m_230final void unipush(Completion c) {if (c != null) {while (!tryPushStack(c)) {if (result != null) {NEXT.set(c, null); // 相当于:c.next = null;break;}}if (result != null)c.tryFire(SYNC); // 有结果就直接触发下级执行}}final boolean tryPushStack(Completion c) {Completion h = stack;NEXT.set(c, h); // 相当于:c.next = stack;return STACK.compareAndSet(this, h, c); // 相当于:stack = c;}
thenApplyAsync()
java.util.concurrent.CompletableFuture
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {return uniApplyStage(screenExecutor(executor), fn); // ref: sign_m_210}
thenAccept()
java.util.concurrent.CompletableFuture
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {return uniAcceptStage(null, action);}private CompletableFuture<Void> uniAcceptStage(Executor e,Consumer<? super T> f) {...Object r;if ((r = result) != null)return uniAcceptNow(r, e, f);CompletableFuture<Void> d = newIncompleteFuture();unipush(new UniAccept<T>(e, d, this, f)); // ref: sign_m_230 | sign_c_060return d;}
thenRun()
java.util.concurrent.CompletableFuture
public CompletableFuture<Void> thenRun(Runnable action) {return uniRunStage(null, action);}private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {...Object r;if ((r = result) != null)return uniRunNow(r, e, f);CompletableFuture<Void> d = newIncompleteFuture();unipush(new UniRun<T>(e, d, this, f)); // ref: sign_m_230 | sign_c_070return d;}
链结构
// dep (new CF)// CompletableFuture (dep.stack)
AsyncSupply-1 -> UniApply-2 -> UniApply-3 -> UniAccept -> UniRun// UniCompletion(next & src)
UniRun -> UniAccept -> UniApply-3 -> UniApply-2 -> AsyncSupply-1
调用链
postComplete()
java.util.concurrent.CompletableFuture
// sign_m_310 弹出并尝试触发所有可到达的依赖项final void postComplete() {CompletableFuture<?> f = this; Completion h;while ((h = f.stack) != null ||(f != this && (h = (f = this).stack) != null)) {CompletableFuture<?> d; Completion t;if (STACK.compareAndSet(f, h, t = h.next)) {...f = (d = h.tryFire(NESTED)) == null ? this : d; // 触发具体操作逻辑}}}
UniApply
java.util.concurrent.CompletableFuture.UniApply
final CompletableFuture<V> tryFire(int mode) {CompletableFuture<V> d; CompletableFuture<T> a;Object r; Throwable x; Function<? super T,? extends V> f;if ((a = src) == null || (r = a.result) == null|| (d = dep) == null || (f = fn) == null)return null;tryComplete: if (d.result == null) {... // 源异常处理try {if (mode <= 0 && !claim()) // ref: sign_m_325return null; // 如果判断为异步执行,则进入此逻辑else {T t = (T) r; // 源的结果d.completeValue(f.apply(t)); // 调用 Function 转换并设置结果}} ... // catch}src = null; dep = null; fn = null;return d.postFire(a, mode); // 传给下一项}
java.util.concurrent.CompletableFuture.UniCompletion
// sign_m_325// 如果操作可以运行,则返回 true (相当于没设置线程池,不用异步执行)final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag(0, 1)) { // 一般 CAS 成功,进入此逻辑if (e == null)return true; // 没有设置线程池,表示同步执行executor = null; // 置空,防止死循环e.execute(this); // 异步执行}return false;}
UniAccept
java.util.concurrent.CompletableFuture.UniAccept
final CompletableFuture<Void> tryFire(int mode) {CompletableFuture<Void> d; CompletableFuture<T> a;Object r; Throwable x; Consumer<? super T> f;if ((a = src) == null || (r = a.result) == null|| (d = dep) == null || (f = fn) == null)return null;tryComplete: if (d.result == null) {... // 源异常处理try {if (mode <= 0 && !claim())return null;else {T t = (T) r;f.accept(t); // 调用 Consumer 消费上游结果d.completeNull();}} ... // catch}src = null; dep = null; fn = null;return d.postFire(a, mode); // 传给下一项}
UniRun
java.util.concurrent.CompletableFuture.UniRun
final CompletableFuture<Void> tryFire(int mode) {... // 类似 UniAccept 处理f.run(); // 调用 Runnable 运行d.completeNull();...}
两者组合
acceptEither()
-
两个只要有一个完成,就传递给下游
-
java.util.concurrent.CompletableFuture.OrAccept
static final class OrAccept<T, U extends T> extends BiCompletion<T, U, Void> {final CompletableFuture<Void> tryFire(int mode) {CompletableFuture<Void> d; CompletableFuture<? extends T> a, b;Object r; Throwable x; Consumer<? super T> f;if ((a = src) == null || (b = snd) == null|| ((r = a.result) == null && (r = b.result) == null) // 只要有一个不为 null,就算完成|| (d = dep) == null || (f = fn) == null)return null;... // 类似 UniAccept 处理f.accept(t); // 调用 Consumer 消费上游结果d.completeNull();...}}
thenAcceptBoth()
-
两个必须都完成,才传递给下游
-
java.util.concurrent.CompletableFuture.BiAccept
static final class BiAccept<T, U> extends BiCompletion<T, U, Void> {final CompletableFuture<Void> tryFire(int mode) {CompletableFuture<Void> d; CompletableFuture<T> a; CompletableFuture<U> b;Object r, s; BiConsumer<? super T,? super U> f;if ( (a = src) == null || (r = a.result) == null|| (b = snd) == null || (s = b.result) == null|| (d = dep) == null || (f = fn) == null|| !d.biAccept(r, s, f, mode > 0 ? null : this)) // r, s 都不为空,才进入此,ref: sign_m_510return null;...}}
java.util.concurrent.CompletableFuture
// sign_m_510final <R, S> boolean biAccept(Object r, Object s,BiConsumer<? super R, ? super S> f,BiAccept<R, S> c) {...if (result == null) {... // 源异常处理try {if (c != null && !c.claim())return false;R rr = (R) r;S ss = (S) s;f.accept(rr, ss); // 调用 BiConsumer 消费上游结果completeNull();} ... // catch}return true;}