Java并发编程第7讲——CompletableFuture、Future和ForkJoinPool(万字详解)

在Java中进行异步计算是比较难以理解的。一般来说,我们希望将任何计算都视为一系列步骤,但是在异步的情况下,这些步骤通常以回调函数的形式存在,要么散布在代码中,要么互相嵌套的很深。而我们需要处理可能发生在某个步骤中的错误时,情况就变得更加复杂,而CompletableFuture就是来解决这些“困扰”的。

一、什么是CompletableFuture

CompletableFuture是Java 8中引入的一个类,用于异步编程和并发操作,它大约提供了50个不同的方法,用于组合、合并和执行异步计算步骤以及处理错误。

下面是它的继承关系:

从图中可以看出,CompletableFuture类实现了FutureCompletionStage接口,这两个接口分别代表了异步任务的结果和完成状态,以及异步任务之间的依赖关系。

  • Future:Java 5新加的一个接口,它提供一种异步并行计算的功能,如果主线程需要执行一个很耗时的计算任务,那么我们可以通过Future把这个任务放进异步线程中执行,并且可以通过Future获取计算结果。

  • CompletionStage

    • 提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法。

    • 他可能代表一个明确完成的Future,也可能代表一个完成阶段(CompletionStage),它支持在计算完成以后出发一些函数或执行某些动作。

二、CompletableFuture的底层实现

底层实现主要涉及到了几个重要的技术手段,如Completion链式异步处理、事件驱动、ForkJoinPool线程池、通过CompletionException捕获异常、CAS操作等。

  • 链式结构:CompletableFuture内部采用了一种链式结构来处理异步计算的结果,每个CompletableFuture都有一个与之关联的Completion链,它可以包含多个Completion阶段,每个阶段都代表一个异步操作,并且可以指定它所依赖的前一个计算结果。【在CompletableFuture类中,定义了一个内部类Completion,它表示Completion链的一个阶段,其中包含了前一个阶段的计算结果、下一个阶段的计算操作以及执行计算操作的线程池等信息。

  • 事件驱动:CompletableFuture还使用了一种事件驱动的机制来处理异步计算的完成事件。在一个ComletableFuture对象上注册的Completion阶段完成后,它会出发一个完成事件,然后CompletableFuture对象会执行与之关联的下一个Completion阶段。

  • ForkJoinPool:CompletableFuture的异步计算是通过线程池来实现的。它在内部使用了一个ForkJoinPool线程池来执行异步任务。ForkJoinPool是一种特殊的线程池,使用了工作窃取算法来提高多线程的执行效率、可以根据需要动态地创建和回收线程、线程之间还可以共享任务,从而更好地利用计算资源。

  • 捕获异常:CompletableFuture会通过内部的CompletionException来捕获计算过程中出现的异常。

  • CAS操作:CompletableFuture在状态转换和任务完成通知的过程中,使用CAS操作来保证线程安全。

三、CompletbaleFuture的使用

3.0 六个函数式接口

CompletableFuture方法的入参大部分是函数式接口,在学习之前先了解一下还是必要的。

3.1 四个静态方法

CompletableFuture提供了四个静态方法来创建一个异步操作:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  • runAsync()方法用Runnable函数时接口类型作为参数,无返回值,supplyAsync()方法以Supplier函数式接口作为参数,有返回值【用get()方法以阻塞的形式获取返回计算结果】。

  • Executor参数可传可不传,若不传,就用默认的ForkJoinPool线程池,反之用传入的线程池。

示例:

public class FourStaticDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//1.无返回值且默认线程池CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {System.out.println("无返回值");});
​//2.有返回值且自定义线程池CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("有返回值");return "hello world";}, Executors.newFixedThreadPool(1));//获取返回值(阻塞)String s = future1.get();System.out.println("s = " + s);}
}

有没有不阻塞获取结果的方法?当然有,可以用:

public static <U> CompletableFuture<U> completedFuture(U value)

如果我们已经知道计算结果,我们可以用静态的completedFuture()方法,该方法接收一个表示计算结果的参数,因此,它的get()方法不会阻塞,而是立即返回这个结果。

3.2 结果传递

3.2.1 thenApply()

它接收一个Function示例,用它来处理结果。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
  • thenApply():将前面任务的执行结果,交给后面的Function。

  • thenApplyAsync():异步执行。

示例:thenApply入参传入Function函数式接口,重写apply方法。

public class ThenApplyDemo {
​public static void main(String[] args) throws ExecutionException, InterruptedException {//返回一个"hello"CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");CompletableFuture<String> future1 = future.thenApply(new Function<String, String>() {@Overridepublic String apply(String s) {return s+" world";}});String s = future1.get();System.out.println(s);//hello world}
}

ps:上面是完整的写法,后面就用Lambda表达式的简化写法了哈。

3.2.2 thenAccept()

如果我们不需要在Future链中返回一个值,我们可以使用Consumer功能接口的实例。

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
  • thenAccept():将前面任务的执行结果,交给后面的Consumer。

  • thenAcceptAsync():异步执行。

示例:

public class ThenAcceptDemo {public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");CompletableFuture<Void> future1 =future.thenAccept(s -> System.out.println(s + " world"));//hello world}
}

3.2.3 thenRun()

如果我们既不需要计算结果的值,也不想在链的最后返回任何值,那么我们可以使用thenRun()方法。

public CompletableFuture<Void> thenRun(Runnable action) 
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) 
  • thenRun():将前面任务的执行结果,交给后面的Runnable。

  • thenRunAsync():异步执行。

示例:

public class ThenRunDemo {public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");CompletableFuture<Void> future1 = future.thenRun(() -> System.out.println("hello world"));//hello world}
}

3.2.4 thenApply()+thenAccpet()

public class ApplyAcceptDemo {public static void main(String[] args) {ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {return 1;}, threadPool).thenApply(f -> {return f + 2;}).thenApply(f -> {return f + 2;}).thenAccept(r -> {System.out.println(r);//5});}
}

3.3 合并结果

CompletableFuture API最好的部分是能够将CompletableFuture实例组合成计算步骤链条。

3.3.1 thenCompose()

此方法接收一个返回CompletableFuture实例的函数。该函数的参数是上一步计算的结果。这使得我们可以在下一个CompletableFuture的lambda表达式中使用这个值:

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) 
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) 
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) 
  • thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回。

  • thenComposeAsync():异步执行。

示例:

public class ThenCompose {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello").thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " world"));String s = future.get();System.out.println(s);//hello world}
}

3.3.2 thenCombine()

如果我们想执行两个独立的Futures并对它们的结果进行处理,我们可以使用thenCombine方法,该方法接收一个Future和一个带有两个参数的函数来处理两个结果。

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) 
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) 
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor) 
  • thenCombine():合并两个任务。

  • thenCombineAsync():异步执行。

示例:

public class ThenCombineDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello").thenCombine(CompletableFuture.supplyAsync(() -> " world"), (s1, s2) -> s1 + s2);String s = future.get();System.out.println(s);//hello world}
}

3.3.3 thenAcceptBoth()

一个更简单的情况是当我们相对两个Future的结果进行操作,但不需要将任何结果传递给Future链中时,可以使用thenAcceptBoth()方法。

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) 
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) 
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor) 
  • thenAcceptBoth():两个任务执行完成后,将结果交给thenAcceptBoth处理。

  • thenAcceptBothAsync():异步执行。

示例:

public class ThenAccpetBothDemo {public static void main(String[] args) {CompletableFuture.supplyAsync(()->"hello").thenAcceptBoth(CompletableFuture.supplyAsync(()->" world"),(s1,s2)-> System.out.println(s1+s2));//hello world}
}

3.4 异常处理

在一系列异步计算步骤中处理错误,抛出/捕获使我们惯用的方式。

3.4.1 whenComplete()

CompletableFuture类允许我们用whenComplete()方法来处理异常,参数是函数式接口Biconsumer,可以接收两个参数:计算的结果(如果成功完成)和抛出的异常(没有正常完成)

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
  • whenComplete():当任务完成时,可以使用结果和此阶段的异常执行操作。

  • whenCompleteAsync():异步执行。

示例:

public class WhenCompleteDemo {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture//除0异常.supplyAsync(() -> { int i = 1 / 0; return i;}).whenComplete((v, e) -> {if (e != null) {//出现异常,处理。。System.out.println("出现异常啦!异常:"+e.getMessage());}//没有异常System.out.println(v);});}
}

运行结果:

3.4.2 exceptionally()

exceptionally()方法是纯处理异常的操作,参数为Function函数式接口,接收的是一个Throwable类型的异常。

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) 
  • exceptionally():此方法前的链中如果出现异常会走该方法,一般跟whenComplete配合使用,捕获范围包含此方法前的所有链中的异常。

ps:出现异常才会走,而whenComplete()出没出现都会走。

示例:

public class ExceptionallyDemo {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int i = 1 / 1; return i;}).exceptionally(e -> {System.out.println("出现异常啦!" + e.getCause());return null;});}
}

3.4.3 handle()

handle()和whenComplete()类似,也是接收两个参数:计算的结果(如果成功完成)和抛出的异常(没有正常完成),不同的是handle()用的是BiFunction函数式接口。

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) 
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) 
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) 
  • handle():相当于whenComplete+exceptionally。

  • handleAsync():异步执行。

示例:

public class HandleDemo {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture//除0异常.supplyAsync(() -> { int i = 1 / 0; return i;}).handle((v, e) -> {if (e != null) {//出现异常,处理。。System.out.println("出现异常啦!异常:"+e.getMessage());}//没有异常return v+1;});}
}

ps:乍一看,whenComplete()和handle()没有什么区别,其实不然。whenComplete()是以BiConsumer函数式接口作为参数,而BiConsumer没有返回值,就不能对上一任务的结果进行“替换”。handle()以BiFunction作为参数,它有返回值,这就意味着可以对上一任务的结果进行“替换”。说正式点就是,whenComplete()不能消费异常,而handle()可以消费异常(可以把"异常的结果"替换成“正常的结果”)。

3.4.4 whenComplete()+exceptionally()+handle()

示例:

public class WhenExceptionHandleDemo {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {return 1;}).handle((f, e) -> {System.out.println("----handle()----");if(e==null){return f + 2;}return null;}).whenComplete((v, e) -> {if (e == null) {//没有异常System.out.println("result:" + v);throw new RuntimeException("抛个异常---");}}).exceptionally(e -> {System.out.println(e.getCause());return null;});}
}

运行结果:

 

3.5 根据计算速度选用

这也比较好理解,就是看两个任务的处理速度,谁快就触发下一步的动作。

3.5.1 applyToEither()

两个任务谁快用谁的计算结果,有返回值。

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) 
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor) 
  • applyToEither():两个任务比较,谁先获得计算结果,就用谁。

  • applyToEitherAsync():异步执行。

示例:

public class ApplyToEitherDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {return "A";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {//睡0.1秒Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}return "B";});CompletableFuture<String> result = future1.applyToEither(future2, v -> {return v + "比较快";});String res = result.get();System.out.println(res);//A比较快}
}

3.5.2 accpetEither

和applyToEither差不多,区别在于没有返回值。

public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) 
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) 
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor) 
  • acceptEither():两个任务比较,谁先获得计算结果,就用谁。

  • acceptEitherAsync():异步执行。

示例:

public class AcceptToEitherDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {return "A";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {//睡0.1秒Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}return "B";});CompletableFuture<Void> result = future1.acceptEither(future2, v -> {System.out.println(v+"比较快");//A比较快});}
}

3.5.3 runAfterEither()

两个任务任一先执行完就进行下一步操作,不用计算结果,也无返回值。

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action) 
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action) 
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)
  • runAfterEither():两个任务有任何一个执行完成,就进入下一步操作,无返回值。

  • runAfterEitherAsync():异步执行。

示例:

public class RunAfterEitherDemo {public static void main(String[] args) {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {return "A";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {//睡0.1秒Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}return "B";});CompletableFuture<Void> result = future1.runAfterEither(future2, () -> {System.out.println("有任务完成啦!!!");});//有任务完成啦!!!}
}

3.6 等待一组任务完成

当需要同时执行多个异步操作,并等待所有或者任何一个操作完成后,再进行下一步操作时,就可以考虑使用以下两个方法,一般配合join()使用。

3.6.1 allOf()

合并多个任务为一个,等待全部完成,无返回值。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
  • allOf():多个任务全部执行完返回。

示例:

public class AllOfDemo {public static void main(String[] args) {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("future1 执行");return 1;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("future2 执行");return 2;});CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("future3 执行");return 3;});//阻塞三个任务CompletableFuture.allOf(future1, future2, future3).join();System.out.println("future1、future2、future3全部执行完毕!");}
}

输出结果:

3.6.2 anyOf()

合并多个任务为一个,任意一个执行完成就进行下一步操作,有返回值。

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
  • anyOf():多个任务任一执行完就返回。

示例:

public class AllOfDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("future1 执行");return "A";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(200);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("future2 执行");return "B";});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(300);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("future3 执行");return "C";});//阻塞三个任务CompletableFuture<Object> obj = CompletableFuture.anyOf(future1, future2, future3);Object o = obj.get();System.out.println(o+"执行完了!");}
}

输出结果:

3.7 其它api和Java9新增的api

3.7.1 Java 8

public T join()
public T getNow(T valueIfAbsent)
public boolean isCompletedExceptionally()
public int getNumberOfDependents()

解释:

  • join():阻塞获取结果,不需要抛出异常。

  • getNow():计算完成就返回正常值,否则返回备胎值(传入的参数),立即获取不阻塞。

  • isCompletedExceptionally():判断任务是否异常结束。

  • getNumberOfDependents():返回依赖当前任务的任务数量。

3.7.2 Java 9

CompletableFuture<U> newIncompleteFuture()
CompletableFuture<T> copy()
CompletionStage<T> minimalCompletionStage()
CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)
CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)

解释:

  • newIncompleteFuture():也被称为“虚拟构造函数”,用于获取相同类型的新的Completable实例。

  • copy():返回一个新的CompletableFuture。

  • minimalCompletionStage():返回一个新的CompletionStage。

  • orTimeout():若在指定的超时时间之前未完成,则会以异常(TimeoutException)结束。

  • completeOnTimeout():若在指定的超时时间之前未完成,则以给定的value为任务结果。

四、Future接口

简单介绍一下Future接口,Future是Java 5新加的一个接口,他在处理异步调用和并发处理时非常有用。

4.1 什么是Future

简单来说,Future类代表了一个异步计算的未来结果。这个结果将在处理完成后最终出现在Future中。

如果主线程需要执行一个很耗时的计算任务,那么Future将是一个不错的选择,我们可以通过Future把这个任务放入异步线程中去执行,主线程就可以去处理其它任务或提前结束。

4.2 Future的相关Api

boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get()
V get(long timeout, TimeUnit unit)

解释:

  • cancel():取消任务,若一个任务未完成,则以CancellationException异常结束。其相关未完成的子任务也会以此异常结束。

  • isCancelled():判断任务是否已取消,若任务在正常执行前取消,则返回ture,反之返回false。

  • isDone():判断任务是否完成。三种完成情况:正常执行完成、异常完成和已取消。

  • get():以阻塞的方式获取计算结果,需要处理异常,捕获或者直接抛。

  • get(long timeout,TimeUnit unit):若超过设置的时间还未获取到结果,就直接抛异常。

4.3 FutureTask

FutureTask也是Future的一个实现类,下面我们来看一个例子:

public class FutureTaskDemo {public static void main(String[] args) throws Exception {FutureTask<String> task = new FutureTask<>(() -> {return "hello world";});new Thread(task).start();//阻塞获取结果System.out.println(task.get());//hello world}
}

4.4 与CompletableFuture对比

  • 阻塞等待:

    • Future接口的get()方法在获取计算结果时会阻塞等待,可能导致线程资源浪费和性能问题。

    • CompletableFuture提供了一系列非阻塞的方法,如thenApply()、thenCompose()等。

  • 无法手动完成或取消:

    • Future接口的实现类无法手动标记任务的完成或取消状态,只能依赖于任务本身的执行状态。

    • CompletableFuture提供了如complete()、completeExceptionally()、cancel()等,以便手动设置异步任务的结果、异常或取消状态。

  • 缺乏异常梳理机制:

    • Future接口的get()方法在异步任务会抛出异常,但在任务完成前无法捕获和处理异常。

    • CompletableFuture提供了exceptionally()和handle()方法,可以方便地对异步任务的异常情况进行处理和转换。

  • 缺少组合能力:

    • Future接口无法直接组合多个异步任务,不能方便的实现串行、并行等操作。

    • CompletableFuture提供了一系列方法,如thenApply()、thenCompose()和thenCombine()等,可以方便地实现异步任务的组合。

  • 缺少回调机制:

    • Future接口无法在异步任务完成时自动触发回调函数执行。

    • CompletableFuture提供了如thenApply()、thenAccpept()、thenRun()等,可以在任务完成时自动执行回调函数。

总之一句话,Future能干的CompletableFuture都能干,Future不能干的CompletableFuture也能干!

五、ForkJoinPool

5.1 Fork/Join框架

Java 7引入了Fork/Join框架,它提供了一些工具,通过尽可能地提高cpu的利用率来加速并行处理,它通过分而治之的方法来实现这一点的。

实际上,这意味着该框架首先进行“fork”操作,递归地将任务分解为较小的独立子任务,知道它们足够简单可以异步运行。

之后,开始“join”部分。所有的子任务的结果被递归地合并为一个单一的结果。对于返回void的任务,程序会简单地等待每个子任务运行完毕。

为了提供有效的并行执行,Fork/Join框架使用了一个线程池——ForkJoinPool。

5.2 ForkJoinPool

ForkJoinPool是该框架的核心。它是ExecutorService的一个实现,负责管理工作线程,并提供工具以获取有关线程池的状态和性能信息。

工作线程一次只能执行一个任务,但ForkJoinPool并不为每个子任务创建一个单独的线程。相反,池中的每个线程都有自己的双端队列(deque),用于存储任务。

5.2.1 工作窃取算法

简单来说,空闲的线程会尝试从繁忙线程的deque中“窃取”任务。

默认情况下,工作线程从自己的deque头部获取任务。当deque为空时,线程会从另一个繁忙线程的deque尾部获取任务,或者从全局入口队列获取任务,因为那里可能存放着最大的工作块。

这种方式最大程度地减少了线程竞争任务的可能性。它还减少了线程寻找任务的次数,因为首先处理最大的可用工作块。

5.2.2 CompletableFuture使用ForkJoinPool的原因

主要原因时因为它的执行模型和任务分割方式与ForlJoinPool更加匹配。

  • 在CompletableFuture中,一个任务可以分割成多个子任务,并且这些子任务之间可以存在依赖关系。而ForkJoinPool本身就是一种支持任务分割和合并的线程池实现,能够自动地处理任务的拆分和合并。而且,ForkJoinPool还有一种工作窃取算法,能够自动地调整线程的负载,提高线程的利用率和并行度。

  • ForkJoinPool还有一个特点,就是它的线程池大小是动态调整的。当任务比较小时,线程池的大小会自动缩小,从而减少线程的数量和占用的系统资源。当任务比较多时,线程池的大小会自动增加,从而保证任务能够及时地得到执行。

  • 如果使用ExcutorService来执行这些任务,需要手动地创建线程池、任务队列和任务执行策略,并且需要手动地处理任务的拆分和合并,实现起来比较复杂。

 End:希望对大家有所帮助,如果有纰漏或者更好的想法,请您一定不要吝啬你的赐教🙋。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/70403.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式-01简单工厂模式详解 详细代码对比

目录 ChatGpt问答原生代码简单工厂模式代码 简单工厂模式&#xff08;Simple Factory Pattern&#xff09;新增boat 对比两种方法原生代码为什么使用强制转换&#xff1f;简单工厂模式 简单工厂方法总结与原生代码的区别&#xff1a;优点:缺点&#xff1a; 参考 本文将介绍什么…

二维数组笔试题及其解析

Lei宝啊 &#xff1a;个人主页 愿所有美好不期而遇 前言&#xff1a; 数组名在寻常情况下表示首元素地址&#xff0c;但有两种情况例外&#xff1a; 1.sizeof(数组名)&#xff0c;这里的数组名表示整个数组&#xff0c;计算的是整个数组的大小 2.&数组名&#xff0c;这里的…

Hystrix和Sentinel熔断降级设计理念

目录 1 基本介绍2 Hystrix信号量和线程池区别2.1 信号量模式2.2 线程池模式2.3 注意 3 Sentinel介绍 1 基本介绍 Sentinel 和 Hystrix 的原则是一致的: 当检测到调用链路中某个资源出现不稳定的表现&#xff0c;例如请求响应时间长或异常比例升高的时候&#xff0c;则对这个资源…

目前无法建立VS2013与Qt的连接???

因为下载组件的时候&#xff0c;没有哪个选项&#xff0c;还是没有MSVC2013

微信小程序集成腾讯im,会话列表数据过多(长列表),卡顿问题的解决

说明 我这边用小程序集成im&#xff0c;然后结合公司的需求&#xff0c;做了一个聊天的小程序&#xff0c;在测试上线的时候没有问题&#xff0c;结果到客户那边&#xff0c;因为他们聊天的人多&#xff0c;会话列表达到了300多条&#xff0c;然后点击会话列表&#xff0c;进入…

C#,《小白学程序》第十六课:随机数(Random)第三,正态分布的随机数的计算方法与代码

1 文本格式 // 定义一个全局性&#xff08;公共&#xff09;的随机数发生器&#xff0c;便于大家&#xff08;各函数&#xff09;后面共同使用。 Random global_rnd new Random(); /// <summary> /// 生成服从标准正态分布的随机数 /// https://zhuanlan.zhihu.com/p/6…

Postern配置HTTP和HTTPS的步骤

Postern是一款强大的Android代理工具&#xff0c;它允许您在设备上配置全局代理来实现安全、隐私保护和自由上网。本文将详细介绍如何使用Postern在Android设备上配置HTTP和HTTPS代理&#xff0c;为您提供更便捷的上网体验。 步骤1&#xff1a;下载和安装Postern应用 首先&am…

pdf怎么合并在一起?几种方法快速合并

pdf怎么合并在一起&#xff1f;在处理PDF文件时&#xff0c;有时需要将多个PDF文件合并成一个文件。这种操作在日常学习、工作和生活中很常见。但是&#xff0c;如果没有专业的PDF工具&#xff0c;这项任务可能会变得非常繁琐、耗时和费力。因此&#xff0c;我们需要一款功能强…

Faster RCNN

【简介】 Faster RCNN[7]是第一个端到端&#xff0c;最接近于实时性能的深度学习检测算法&#xff0c;该网络的主要创新点就是提出了区域选择网络用于申城候选框&#xff0c;能几大提升检测框的生成速度。该网络首先输入图像到卷积网络中&#xff0c;生成该图像的特征映射。在特…

ActiveReportsJs 账票印刷

参考资料 官方文档 一. HTML部分 在页面上添加了Loading效果&#xff0c;账票印刷开始时显示Loading效果&#xff0c;印刷结束后隐藏Loading效果。ar-js-core.js是核心文件ar-js-pdf.js用来印刷PDFar-js-xlsx.js用来印刷EXCELar-js-locales.js用来设置语言 <!DOCTYPE htm…

优化SOCKS5的方法

在今天的互联网世界中&#xff0c;保护个人隐私和提升网络速度至关重要。作为一种常用的代理协议&#xff0c;SOCKS5代理服务器不仅可以保护您的隐私&#xff0c;还可以实现更快速的网络访问。本文将为您介绍一些优化SOCKS5代理服务器的方法&#xff0c;以提高网络速度和安全性…

阿里云2核4G服务器5M带宽五年租用价格表

阿里云2核4G服务器5M带宽可以选择轻量应用服务器或云服务器ECS&#xff0c;轻量2核4G4M带宽服务器297元一年&#xff0c;2核4G云服务器ECS可以选择计算型c7、c6或通用算力型u1实例等&#xff0c;买5年可以享受3折优惠&#xff0c;阿腾云分享阿里云服务器2核4G5M带宽五年费用表&…

Xcode,swift:Error Domain=kCLErrorDomain Code=1 (null)问题解决

问题描述: iOS开发时,当使用用户的位置权限时,获取用户经纬度报错:Error DomainkCLErrorDomain Code1 "(null)",错误域kCLError域代码1“(null)” 解决方法: 打开模拟机的设置-通用-语言与地区 将地区设置为中国(如果你的开发位置在中国的话) 点击左上方Features,选择…

发光太阳聚光器的蒙特卡洛光线追踪研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

【大数据】Flink 详解(七):源码篇 Ⅱ

本系列包含&#xff1a; 【大数据】Flink 详解&#xff08;一&#xff09;&#xff1a;基础篇【大数据】Flink 详解&#xff08;二&#xff09;&#xff1a;核心篇 Ⅰ【大数据】Flink 详解&#xff08;三&#xff09;&#xff1a;核心篇 Ⅱ【大数据】Flink 详解&#xff08;四…

完成Centos上使用SSH公钥进行免密上传文件到gitee的步骤后,测试免密推送到gitee的时候还是需要输入邮箱和密码

如果你已经按照正确的步骤设置了SSH公钥并进行了免密测试&#xff0c;但仍然需要输入邮箱地址和密码才能推送到gitee&#xff0c;那么可能有以下几种原因&#xff1a; 您可能没有使用SSH URL来推送代码。请确保您使用的是SSH URL而不是HTTPS URL来推送代码。您可以使用命令 gi…

Opencv快速入门教程,Python计算机视觉基础

快速入门 OpenCV 是 Intel 开源计算机视觉库。它由一系列 C 函数和少量 C 类构成&#xff0c; 实现了图像处理和计算机视觉方面的很多通用算法。 OpenCV 拥有包括 300 多个 C 函数的跨平台的中、高层 API。它不依赖于其它的外部库——尽管也 可以使用某些外部库。 OpenCV 对非…

弹窗、抽屉、页面跳转区别 | web交互入门

当用户点击或触发浏览页面的某个操作&#xff0c;有很多web交互方式&#xff0c;可以大致分为弹窗、抽屉、跳转新页面三种web交互方式。虽然这三种web交互方式看起来没什么不同&#xff0c;但实际上弹窗、抽屉、跳转新页面对交互体验有蛮大的影响。 这需要UI\UX设计师针对不同…

Liquid Studio 2023.2 Crack

Liquid Studio 提供了用于XML和JSON开发 的高级工具包以及Web 服务测试、数据映射和数据转换工具。 开发环境包含一整套用于设计 XML 和 JSON 数据结构和模式的工具。这些工具提供编辑、验证和高级转换功能。对于新手或专家来说&#xff0c;直观的界面和全面的功能将帮助您节省…

Nginx全家桶配置详解

源码包安装NGINX A&#xff0c;搭建Web Server&#xff0c;任意HTML页面&#xff0c;其8080端口提供Web访问服务&#xff0c;截图成功访问http(s)&#xff1a;//[Server1]:8080并且回显Web页面。保留Server1&#xff0c;但是不允许直接访问Server 1&#xff0c;再部署1套NGINX …