在Java中进行异步计算是比较难以理解的。一般来说,我们希望将任何计算都视为一系列步骤,但是在异步的情况下,这些步骤通常以回调函数的形式存在,要么散布在代码中,要么互相嵌套的很深。而我们需要处理可能发生在某个步骤中的错误时,情况就变得更加复杂,而CompletableFuture就是来解决这些“困扰”的。
一、什么是CompletableFuture
CompletableFuture是Java 8中引入的一个类,用于异步编程和并发操作,它大约提供了50个不同的方法,用于组合、合并和执行异步计算步骤以及处理错误。
下面是它的继承关系:
从图中可以看出,CompletableFuture类实现了Future和CompletionStage接口,这两个接口分别代表了异步任务的结果和完成状态,以及异步任务之间的依赖关系。
Future:Java 5新加的一个接口,它提供一种异步并行计算的功能,如果主线程需要执行一个很耗时的计算任务,那么我们可以通过Future把这个任务放进异步线程中执行,并且可以通过Future获取计算结果。
CompletionStage
提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法。
他可能代表一个明确完成的Future,也可能代表一个完成阶段(CompletionStage),它支持在计算完成以后出发一些函数或执行某些动作。
二、CompletableFuture的底层实现
底层实现主要涉及到了几个重要的技术手段,如Completion链式异步处理、事件驱动、ForkJoinPool线程池、通过CompletionException捕获异常、CAS操作等。
链式结构:CompletableFuture内部采用了一种链式结构来处理异步计算的结果,每个CompletableFuture都有一个与之关联的Completion链,它可以包含多个Completion阶段,每个阶段都代表一个异步操作,并且可以指定它所依赖的前一个计算结果。【在CompletableFuture类中,定义了一个内部类Completion,它表示Completion链的一个阶段,其中包含了前一个阶段的计算结果、下一个阶段的计算操作以及执行计算操作的线程池等信息。】
事件驱动:CompletableFuture还使用了一种事件驱动的机制来处理异步计算的完成事件。在一个ComletableFuture对象上注册的Completion阶段完成后,它会出发一个完成事件,然后CompletableFuture对象会执行与之关联的下一个Completion阶段。
ForkJoinPool:CompletableFuture的异步计算是通过线程池来实现的。它在内部使用了一个ForkJoinPool线程池来执行异步任务。ForkJoinPool是一种特殊的线程池,使用了工作窃取算法来提高多线程的执行效率、可以根据需要动态地创建和回收线程、线程之间还可以共享任务,从而更好地利用计算资源。
捕获异常:CompletableFuture会通过内部的CompletionException来捕获计算过程中出现的异常。
CAS操作:CompletableFuture在状态转换和任务完成通知的过程中,使用CAS操作来保证线程安全。
三、CompletbaleFuture的使用
3.0 六个函数式接口
CompletableFuture方法的入参大部分是函数式接口,在学习之前先了解一下还是必要的。
3.1 四个静态方法
CompletableFuture提供了四个静态方法来创建一个异步操作:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync()方法用Runnable函数时接口类型作为参数,无返回值,supplyAsync()方法以Supplier函数式接口作为参数,有返回值【用get()方法以阻塞的形式获取返回计算结果】。
Executor参数可传可不传,若不传,就用默认的ForkJoinPool线程池,反之用传入的线程池。
示例:
public class FourStaticDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//1.无返回值且默认线程池CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {System.out.println("无返回值");});
//2.有返回值且自定义线程池CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("有返回值");return "hello world";}, Executors.newFixedThreadPool(1));//获取返回值(阻塞)String s = future1.get();System.out.println("s = " + s);}
}
有没有不阻塞获取结果的方法?当然有,可以用:
public static <U> CompletableFuture<U> completedFuture(U value)
如果我们已经知道计算结果,我们可以用静态的completedFuture()方法,该方法接收一个表示计算结果的参数,因此,它的get()方法不会阻塞,而是立即返回这个结果。
3.2 结果传递
3.2.1 thenApply()
它接收一个Function示例,用它来处理结果。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
thenApply():将前面任务的执行结果,交给后面的Function。
thenApplyAsync():异步执行。
示例:thenApply入参传入Function函数式接口,重写apply方法。
public class ThenApplyDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {//返回一个"hello"CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");CompletableFuture<String> future1 = future.thenApply(new Function<String, String>() {@Overridepublic String apply(String s) {return s+" world";}});String s = future1.get();System.out.println(s);//hello world}
}
ps:上面是完整的写法,后面就用Lambda表达式的简化写法了哈。
3.2.2 thenAccept()
如果我们不需要在Future链中返回一个值,我们可以使用Consumer功能接口的实例。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
thenAccept():将前面任务的执行结果,交给后面的Consumer。
thenAcceptAsync():异步执行。
示例:
public class ThenAcceptDemo {public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");CompletableFuture<Void> future1 =future.thenAccept(s -> System.out.println(s + " world"));//hello world}
}
3.2.3 thenRun()
如果我们既不需要计算结果的值,也不想在链的最后返回任何值,那么我们可以使用thenRun()方法。
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
thenRun():将前面任务的执行结果,交给后面的Runnable。
thenRunAsync():异步执行。
示例:
public class ThenRunDemo {public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");CompletableFuture<Void> future1 = future.thenRun(() -> System.out.println("hello world"));//hello world}
}
3.2.4 thenApply()+thenAccpet()
public class ApplyAcceptDemo {public static void main(String[] args) {ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {return 1;}, threadPool).thenApply(f -> {return f + 2;}).thenApply(f -> {return f + 2;}).thenAccept(r -> {System.out.println(r);//5});}
}
3.3 合并结果
CompletableFuture API最好的部分是能够将CompletableFuture实例组合成计算步骤链条。
3.3.1 thenCompose()
此方法接收一个返回CompletableFuture实例的函数。该函数的参数是上一步计算的结果。这使得我们可以在下一个CompletableFuture的lambda表达式中使用这个值:
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor)
thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回。
thenComposeAsync():异步执行。
示例:
public class ThenCompose {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello").thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " world"));String s = future.get();System.out.println(s);//hello world}
}
3.3.2 thenCombine()
如果我们想执行两个独立的Futures并对它们的结果进行处理,我们可以使用thenCombine方法,该方法接收一个Future和一个带有两个参数的函数来处理两个结果。
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)
thenCombine():合并两个任务。
thenCombineAsync():异步执行。
示例:
public class ThenCombineDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello").thenCombine(CompletableFuture.supplyAsync(() -> " world"), (s1, s2) -> s1 + s2);String s = future.get();System.out.println(s);//hello world}
}
3.3.3 thenAcceptBoth()
一个更简单的情况是当我们相对两个Future的结果进行操作,但不需要将任何结果传递给Future链中时,可以使用thenAcceptBoth()方法。
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)
thenAcceptBoth():两个任务执行完成后,将结果交给thenAcceptBoth处理。
thenAcceptBothAsync():异步执行。
示例:
public class ThenAccpetBothDemo {public static void main(String[] args) {CompletableFuture.supplyAsync(()->"hello").thenAcceptBoth(CompletableFuture.supplyAsync(()->" world"),(s1,s2)-> System.out.println(s1+s2));//hello world}
}
3.4 异常处理
在一系列异步计算步骤中处理错误,抛出/捕获使我们惯用的方式。
3.4.1 whenComplete()
CompletableFuture类允许我们用whenComplete()方法来处理异常,参数是函数式接口Biconsumer,可以接收两个参数:计算的结果(如果成功完成)和抛出的异常(没有正常完成)
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
whenComplete():当任务完成时,可以使用结果和此阶段的异常执行操作。
whenCompleteAsync():异步执行。
示例:
public class WhenCompleteDemo {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture//除0异常.supplyAsync(() -> { int i = 1 / 0; return i;}).whenComplete((v, e) -> {if (e != null) {//出现异常,处理。。System.out.println("出现异常啦!异常:"+e.getMessage());}//没有异常System.out.println(v);});}
}
运行结果:
3.4.2 exceptionally()
exceptionally()方法是纯处理异常的操作,参数为Function函数式接口,接收的是一个Throwable类型的异常。
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
exceptionally():此方法前的链中如果出现异常会走该方法,一般跟whenComplete配合使用,捕获范围包含此方法前的所有链中的异常。
ps:出现异常才会走,而whenComplete()出没出现都会走。
示例:
public class ExceptionallyDemo {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int i = 1 / 1; return i;}).exceptionally(e -> {System.out.println("出现异常啦!" + e.getCause());return null;});}
}
3.4.3 handle()
handle()和whenComplete()类似,也是接收两个参数:计算的结果(如果成功完成)和抛出的异常(没有正常完成),不同的是handle()用的是BiFunction函数式接口。
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
handle():相当于whenComplete+exceptionally。
handleAsync():异步执行。
示例:
public class HandleDemo {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture//除0异常.supplyAsync(() -> { int i = 1 / 0; return i;}).handle((v, e) -> {if (e != null) {//出现异常,处理。。System.out.println("出现异常啦!异常:"+e.getMessage());}//没有异常return v+1;});}
}
ps:乍一看,whenComplete()和handle()没有什么区别,其实不然。whenComplete()是以BiConsumer函数式接口作为参数,而BiConsumer没有返回值,就不能对上一任务的结果进行“替换”。handle()以BiFunction作为参数,它有返回值,这就意味着可以对上一任务的结果进行“替换”。说正式点就是,whenComplete()不能消费异常,而handle()可以消费异常(可以把"异常的结果"替换成“正常的结果”)。
3.4.4 whenComplete()+exceptionally()+handle()
示例:
public class WhenExceptionHandleDemo {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {return 1;}).handle((f, e) -> {System.out.println("----handle()----");if(e==null){return f + 2;}return null;}).whenComplete((v, e) -> {if (e == null) {//没有异常System.out.println("result:" + v);throw new RuntimeException("抛个异常---");}}).exceptionally(e -> {System.out.println(e.getCause());return null;});}
}
运行结果:
3.5 根据计算速度选用
这也比较好理解,就是看两个任务的处理速度,谁快就触发下一步的动作。
3.5.1 applyToEither()
两个任务谁快用谁的计算结果,有返回值。
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor)
applyToEither():两个任务比较,谁先获得计算结果,就用谁。
applyToEitherAsync():异步执行。
示例:
public class ApplyToEitherDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {return "A";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {//睡0.1秒Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}return "B";});CompletableFuture<String> result = future1.applyToEither(future2, v -> {return v + "比较快";});String res = result.get();System.out.println(res);//A比较快}
}
3.5.2 accpetEither
和applyToEither差不多,区别在于没有返回值。
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)
acceptEither():两个任务比较,谁先获得计算结果,就用谁。
acceptEitherAsync():异步执行。
示例:
public class AcceptToEitherDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {return "A";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {//睡0.1秒Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}return "B";});CompletableFuture<Void> result = future1.acceptEither(future2, v -> {System.out.println(v+"比较快");//A比较快});}
}
3.5.3 runAfterEither()
两个任务任一先执行完就进行下一步操作,不用计算结果,也无返回值。
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)
runAfterEither():两个任务有任何一个执行完成,就进入下一步操作,无返回值。
runAfterEitherAsync():异步执行。
示例:
public class RunAfterEitherDemo {public static void main(String[] args) {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {return "A";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {//睡0.1秒Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}return "B";});CompletableFuture<Void> result = future1.runAfterEither(future2, () -> {System.out.println("有任务完成啦!!!");});//有任务完成啦!!!}
}
3.6 等待一组任务完成
当需要同时执行多个异步操作,并等待所有或者任何一个操作完成后,再进行下一步操作时,就可以考虑使用以下两个方法,一般配合join()使用。
3.6.1 allOf()
合并多个任务为一个,等待全部完成,无返回值。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
allOf():多个任务全部执行完返回。
示例:
public class AllOfDemo {public static void main(String[] args) {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("future1 执行");return 1;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("future2 执行");return 2;});CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("future3 执行");return 3;});//阻塞三个任务CompletableFuture.allOf(future1, future2, future3).join();System.out.println("future1、future2、future3全部执行完毕!");}
}
输出结果:
3.6.2 anyOf()
合并多个任务为一个,任意一个执行完成就进行下一步操作,有返回值。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
anyOf():多个任务任一执行完就返回。
示例:
public class AllOfDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("future1 执行");return "A";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(200);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("future2 执行");return "B";});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(300);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("future3 执行");return "C";});//阻塞三个任务CompletableFuture<Object> obj = CompletableFuture.anyOf(future1, future2, future3);Object o = obj.get();System.out.println(o+"执行完了!");}
}
输出结果:
3.7 其它api和Java9新增的api
3.7.1 Java 8
public T join()
public T getNow(T valueIfAbsent)
public boolean isCompletedExceptionally()
public int getNumberOfDependents()
解释:
join():阻塞获取结果,不需要抛出异常。
getNow():计算完成就返回正常值,否则返回备胎值(传入的参数),立即获取不阻塞。
isCompletedExceptionally():判断任务是否异常结束。
getNumberOfDependents():返回依赖当前任务的任务数量。
3.7.2 Java 9
CompletableFuture<U> newIncompleteFuture()
CompletableFuture<T> copy()
CompletionStage<T> minimalCompletionStage()
CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)
CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)
解释:
newIncompleteFuture():也被称为“虚拟构造函数”,用于获取相同类型的新的Completable实例。
copy():返回一个新的CompletableFuture。
minimalCompletionStage():返回一个新的CompletionStage。
orTimeout():若在指定的超时时间之前未完成,则会以异常(TimeoutException)结束。
completeOnTimeout():若在指定的超时时间之前未完成,则以给定的value为任务结果。
四、Future接口
简单介绍一下Future接口,Future是Java 5新加的一个接口,他在处理异步调用和并发处理时非常有用。
4.1 什么是Future
简单来说,Future类代表了一个异步计算的未来结果。这个结果将在处理完成后最终出现在Future中。
如果主线程需要执行一个很耗时的计算任务,那么Future将是一个不错的选择,我们可以通过Future把这个任务放入异步线程中去执行,主线程就可以去处理其它任务或提前结束。
4.2 Future的相关Api
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get()
V get(long timeout, TimeUnit unit)
解释:
cancel():取消任务,若一个任务未完成,则以CancellationException异常结束。其相关未完成的子任务也会以此异常结束。
isCancelled():判断任务是否已取消,若任务在正常执行前取消,则返回ture,反之返回false。
isDone():判断任务是否完成。三种完成情况:正常执行完成、异常完成和已取消。
get():以阻塞的方式获取计算结果,需要处理异常,捕获或者直接抛。
get(long timeout,TimeUnit unit):若超过设置的时间还未获取到结果,就直接抛异常。
4.3 FutureTask
FutureTask也是Future的一个实现类,下面我们来看一个例子:
public class FutureTaskDemo {public static void main(String[] args) throws Exception {FutureTask<String> task = new FutureTask<>(() -> {return "hello world";});new Thread(task).start();//阻塞获取结果System.out.println(task.get());//hello world}
}
4.4 与CompletableFuture对比
阻塞等待:
Future接口的get()方法在获取计算结果时会阻塞等待,可能导致线程资源浪费和性能问题。
CompletableFuture提供了一系列非阻塞的方法,如thenApply()、thenCompose()等。
无法手动完成或取消:
Future接口的实现类无法手动标记任务的完成或取消状态,只能依赖于任务本身的执行状态。
CompletableFuture提供了如complete()、completeExceptionally()、cancel()等,以便手动设置异步任务的结果、异常或取消状态。
缺乏异常梳理机制:
Future接口的get()方法在异步任务会抛出异常,但在任务完成前无法捕获和处理异常。
CompletableFuture提供了exceptionally()和handle()方法,可以方便地对异步任务的异常情况进行处理和转换。
缺少组合能力:
Future接口无法直接组合多个异步任务,不能方便的实现串行、并行等操作。
CompletableFuture提供了一系列方法,如thenApply()、thenCompose()和thenCombine()等,可以方便地实现异步任务的组合。
缺少回调机制:
Future接口无法在异步任务完成时自动触发回调函数执行。
CompletableFuture提供了如thenApply()、thenAccpept()、thenRun()等,可以在任务完成时自动执行回调函数。
总之一句话,Future能干的CompletableFuture都能干,Future不能干的CompletableFuture也能干!
五、ForkJoinPool
5.1 Fork/Join框架
Java 7引入了Fork/Join框架,它提供了一些工具,通过尽可能地提高cpu的利用率来加速并行处理,它通过分而治之的方法来实现这一点的。
实际上,这意味着该框架首先进行“fork”操作,递归地将任务分解为较小的独立子任务,知道它们足够简单可以异步运行。
之后,开始“join”部分。所有的子任务的结果被递归地合并为一个单一的结果。对于返回void的任务,程序会简单地等待每个子任务运行完毕。
为了提供有效的并行执行,Fork/Join框架使用了一个线程池——ForkJoinPool。
5.2 ForkJoinPool
ForkJoinPool是该框架的核心。它是ExecutorService的一个实现,负责管理工作线程,并提供工具以获取有关线程池的状态和性能信息。
工作线程一次只能执行一个任务,但ForkJoinPool并不为每个子任务创建一个单独的线程。相反,池中的每个线程都有自己的双端队列(deque),用于存储任务。
5.2.1 工作窃取算法
简单来说,空闲的线程会尝试从繁忙线程的deque中“窃取”任务。
默认情况下,工作线程从自己的deque头部获取任务。当deque为空时,线程会从另一个繁忙线程的deque尾部获取任务,或者从全局入口队列获取任务,因为那里可能存放着最大的工作块。
这种方式最大程度地减少了线程竞争任务的可能性。它还减少了线程寻找任务的次数,因为首先处理最大的可用工作块。
5.2.2 CompletableFuture使用ForkJoinPool的原因
主要原因时因为它的执行模型和任务分割方式与ForlJoinPool更加匹配。
-
在CompletableFuture中,一个任务可以分割成多个子任务,并且这些子任务之间可以存在依赖关系。而ForkJoinPool本身就是一种支持任务分割和合并的线程池实现,能够自动地处理任务的拆分和合并。而且,ForkJoinPool还有一种工作窃取算法,能够自动地调整线程的负载,提高线程的利用率和并行度。
-
ForkJoinPool还有一个特点,就是它的线程池大小是动态调整的。当任务比较小时,线程池的大小会自动缩小,从而减少线程的数量和占用的系统资源。当任务比较多时,线程池的大小会自动增加,从而保证任务能够及时地得到执行。
-
如果使用ExcutorService来执行这些任务,需要手动地创建线程池、任务队列和任务执行策略,并且需要手动地处理任务的拆分和合并,实现起来比较复杂。
End:希望对大家有所帮助,如果有纰漏或者更好的想法,请您一定不要吝啬你的赐教🙋。