2.3 CompletableFuture源码分析
CompletableFuture的源码内容特别多。不需要把所有源码都看了,更多的是要掌握整个CompletableFuture的源码执行流程,以及任务的执行时机。
从CompletableFuture中比较简单的方法作为分析的入口,从而掌握整体执行的流程。
2.3.1 当前任务执行方式
将任务和CompletableFuture封装到一起,再执行封住好的具体对象的run方法即可
// 提交任务到CompletableFuture
public static CompletableFuture<Void> runAsync(Runnable runnable) {
// asyncPool:执行任务的线程池
// runnable:具体任务。
return asyncRunStage(asyncPool, runnable);
}
// 内部执行的方法
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
// 对任务做非空校验
if (f == null) throw new NullPointerException();
// 直接构建了CompletableFuture的对象,作为最后的返回结果
CompletableFuture<Void> d = new CompletableFuture<Void>();
// 将任务和CompletableFuture对象封装为了AsyncRun的对象
// 将封装好的任务交给了线程池去执行
e.execute(new AsyncRun(d, f));
// 返回构建好的CompletableFuture
return d;
}
// 封装任务的AsyncRun类信息
static final class AsyncRun extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
// 声明存储CompletableFuture对象以及任务的成员变量
CompletableFuture<Void> dep;
Runnable fn;
// 将传入的属性赋值给成员变量AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
this.dep = dep;
this.fn = fn;
}
// 当前对象作为任务提交给线程池之后,必然会执行当前方法
public void run() {
// 声明局部变量
CompletableFuture<Void> d; Runnable f;
// 将成员变量赋值给局部变量,并且做非空判断
if ((d = dep) != null && (f = fn) != null) {
// help GC,将成员变量置位null,只要当前任务结束后,成员变量也拿不到引用。
dep = null; fn = null;
// 先确认任务没有执行。
if (d.result == null) {
try {
// 直接执行任务
f.run();
// 当前方法是针对Runnable任务的,不能将结果置位null
// 要给没有返回结果的Runnable做一个返回结果
d.completeNull();
} catch (Throwable ex) {
// 异常结束!
d.completeThrowable(ex);
}
}
d.postComplete();
}}
}
2.3.2 任务编排的存储&执行方式
首先如果要在前继任务处理后,执行后置任务的话。
有两种情况:
● 前继任务如果没有执行完毕,后置任务需要先放在stack栈结构中存储
● 前继任务已经执行完毕了,后置任务就应该直接执行,不需要在往stack中存储了。
如果单独采用thenRun在一个任务后面指定多个后继任务,CompletableFuture无法保证具体的执行顺序,而影响执行顺序的是前继任务的执行时间,以及后置任务编排的时机。
2.3.3 任务编排流程
// 编排任务,前继任务搞定,后继任务再执行
public CompletableFuture<Void> thenRun(Runnable action) {
// 执行了内部的uniRunStage方法,
// null:线程池,现在没给。
// action:具体要执行的任务
return uniRunStage(null, action);
}
// 内部编排任务方法
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {// 后继任务不能为null,健壮性判断
if (f == null) throw new NullPointerException();
// 创建CompletableFuture对象d,与后继任务f绑定
CompletableFuture<Void> d = new CompletableFuture<Void>();
// 如果线程池不为null,代表异步执行,将任务压栈
// 如果线程池是null,先基于uniRun尝试下,看任务能否执行
if (e != null || !d.uniRun(this, f, null)) {
// 如果传了线程池,这边需要走一下具体逻辑
// e:线程池
// d:后继任务的CompletableFuture
// this:前继任务的CompletableFuture
// f:后继任务
UniRun<T> c = new UniRun<T>(e, d, this, f);
// 将封装好的任务,push到stack栈结构
// 只要前继任务没结束,这边就可以正常的将任务推到栈结构中
// 放入栈中可能会失败
push(c);
// 无论压栈成功与否,都要尝试执行以下。
c.tryFire(SYNC);
}
// 无论任务执行完毕与否,都要返回后继任务的CompletableFuture
return d;
}
2.3.4 查看后置任务执行时机
任务在编排到前继任务时,因为前继任务已经结束了,这边后置任务会主动的执行
// 后置任务无论压栈成功与否,都需要执行tryFire方法
static final class UniRun<T> extends UniCompletion<T,Void> {
Runnable fn;
// executor:线程池
// dep:后置任务的CompletableFuture
// src:前继任务的CompletableFuture
// fn:具体的任务
UniRun(Executor executor, CompletableFuture<Void> dep,CompletableFuture<T> src, Runnable fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<Void> tryFire(int mode) {
// 声明局部变量
CompletableFuture<Void> d; CompletableFuture<T> a;
// 赋值局部变量
// (d = dep) == null:赋值加健壮性校验
if ((d = dep) == null ||
// 调用uniRun。
// a:前继任务的CompletableFuture
// fn:后置任务
// 第三个参数:传入的是this,是UniRun对象
!d.uniRun(a = src, fn, mode > 0 ? null : this))
// 进到这,说明前继任务没结束,等!
return null;
dep = null; src = null; fn = null;return d.postFire(a, mode);
}
}
// 是否要主动执行任务
final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {
// 方法要么正常结束,要么异常结束
Object r; Throwable x;
// a == null:健壮性校验
// (r = a.result) == null:判断前继任务结束了么?
// f == null:健壮性校验
if (a == null || (r = a.result) == null || f == null)
// 到这代表任务没结束。
return false;
// 后置任务执行了没? == null,代表没执行
if (result == null) {
// 如果前继任务的结果是异常结束。如果前继异常结束,直接告辞,封装异常结果
if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
completeThrowable(x, r);
else
// 到这,前继任务正常结束,后置任务正常执行
try {
// 如果基于tryFire(SYNC)进来,这里的C不为null,执行c.claim
// 如果是因为没有传递executor,c就是null,不会执行c.claim
if (c != null && !c.claim())
// 如果返回false,任务异步执行了,直接return false
return false;// 如果claim没有基于线程池运行任务,那这里就是同步执行
// 直接f.run了。
f.run();
// 封装Null结果
completeNull();
} catch (Throwable ex) {
// 封装异常结果
completeThrowable(ex);
}
}
return true;
}
// 异步的线程池处理任务
final boolean claim() {
Executor e = executor;
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
// 只要有线程池对象,不为null
if (e == null)
return true;
executor = null; // disable
// 基于线程池的execute去执行任务
e.execute(this);
}
return false;
}
前继任务执行完毕后,基于嵌套的方式执行后置。
// A:嵌套了B+C, B:嵌套了D+E
// 前继任务搞定,遍历stack执行后置任务
// A任务处理完,解决嵌套的B和C
final void postComplete() {
// f:前继任务的CompletableFuture
// h:存储后置任务的栈结构
CompletableFuture<?> f = this; Completion h;
// (h = f.stack) != null:赋值加健壮性判断,要确保栈中有数据
while ((h = f.stack) != null ||
// 循环一次后,对后续节点的赋值以及健壮性判断,要确保栈中有数据
(f != this && (h = (f = this).stack) != null)) {
// t:当前栈中任务的后续任务
CompletableFuture<?> d; Completion t;
// 拿到之前的栈顶h后,将栈顶换数据
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
// 执行tryFire方法,
f = (d = h.tryFire(NESTED)) == null ? this : d;
}}
}
// 回来了 NESTED == -1
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniRun(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
// 内部会执行postComplete,运行B内部嵌套的D和E
return d.postFire(a, mode);
}