文章目录
- CompletableFuture^1.8+^
- CompletionStage 接口
- thenApply 系列
- thenAccept 系列
- thenRun 系列
- thenCombine 系列
- thenAcceptBoth
- runAfterBoth
- applyToEither
- acceptEither
- runAfterEither
- thenCompose
- whenComplete
- handle
- 其他
- CompletionStage 的方法总结
- CompletableFuture 实例化
- CompletableFuture 其他 API 方法
- complete 和 completeExceptionally
- CompletableFuture 示例
CompletableFuture1.8+
CompletionStage 接口
thenApply 系列
/*** 当该阶段正常完成时,该阶段将以该阶段的结果作为所提供函数的参数来执行,并返回新的 CompletionStage 对象*** @param fn Function 函数,函数的入参是当前阶段的结果* @param <U> Function 函数的返回值类型* @return 新的 CompletionStage 对象 */public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);/*** 当该阶段正常完成时,该阶段将以该阶段的结果作为所提供函数的参数来执行,并返回新的 CompletionStage 对象* 与 thenApply 的不同在于,异步执行** @param fn Function 函数,函数的入参是当前阶段的结果* @param <U> Function 函数的返回值类型* @return 新的 CompletionStage 对象*/public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);/*** 当该阶段正常完成时,该阶段将以该阶段的结果作为所提供函数的参数来执行,并返回新的 CompletionStage 对象* 与 thenApply 的不同在于,异步执行** @param fn Function 函数,函数的入参是当前阶段的结果* @param <U> Function 函数的返回值类型* @param executor 执行阶段任务的线程池对象* (不传,默认使用 CompletableFuture 实例化时传入的线程池,CompletableFuture 实例化时,默认为:ForkJoinPool.commonPool())* @return 新的 CompletionStage 对象*/public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
thenAccept 系列
/*** 与 thenApply 一样,只是接受的参数为 Consumer* 返回的 CompletionStage 泛型为 Void *** @param action 返回 CompletionStage 之前要执行的操作* @return the new CompletionStage*/public CompletionStage<Void> thenAccept(Consumer<? super T> action);/*** 异步执行*/public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);/*** 自定义线程池*/public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
thenRun 系列
/*** 与 thenApply 一样,只是接受 Runnable 类型的参数** @param action Runnable 类型,表示在该阶段返回新的 CompletionStage<Void> 对象之前执行的任务*/public CompletionStage<Void> thenRun(Runnable action);/*** 异步执行public CompletionStage<Void> thenRunAsync(Runnable action);/*** 自定义线程池*/public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
thenCombine 系列
/*** 返回一个新的 CompletionStage<V>* 用于结合两个 CompletionStage 对象的返回值** @param other 另一个 CompletionStage * @param fn BiFunction 类型的函数式接口(当前阶段的结果 + other 对象的结果作为函数的参数)* @param <U> other CompletionStage 的返回值类型* @param <V> BiFunction 类型的函数式接口的返回值类型* @return 新的 CompletionStage 对象*/public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
thenAcceptBoth
/*** 执行的函数式接口为:BiConsumer*/public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,Executor executor);
runAfterBoth
/*** 执行的接口为 Runnable*/public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
applyToEither
/*** 等待两个阶段正常完成,则将当前阶段的结果传入 Function 函数执行*/public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
acceptEither
/*** 同 ApplyEither ,执行函数接口为 Consumer*/public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
runAfterEither
/*** 同 ApplyEither ,执行接口为 Runnable*/public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
thenCompose
/**** @param fn 函数接口,该接口返回新的 CompletionStage* @param <U> 返回的 CompletionStage 对象的泛型* @return 新的CompletionStage*/public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor);
whenComplete
/*** 无论当前解决正常还是异常结束,都会执行 BiConsumer 函数接口,* BiConsumer 函数接口第一个参数为当前阶段的结果(如果没有则为 null)* 第二个参数为当前阶段抛出的异常(如果没有则为 null)*/public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);
handle
/*** 与 whenComplete 一样,但执行的函数接口为 BiFunction*/public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
其他
/*** 当当前阶段计算异常完成时,执行该方法,并将异常作为参数传入函数接口* * @param fn Function 类型的函数接口* @return 返回新的 CompletionStage 对象*/public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);/*** CompletionStage 转换为 CompletableFuture 对象*/public CompletableFuture<T> toCompletableFuture();
如果某个阶段执行过程中出现异常,我们又没有通过 exceptionally 方法处理的话,CompletableFuture 将抛出 CompletionException 并停止运行。所以在某些情况下(某些异常不能影响之后的程序运行),我们需要使用 exceptionally 方法来处理阶段异常。
CompletionStage 的方法总结
- 几乎所有的方法都返回了 CompletionStage ,这是链式调用的特性
- 所有的方法参数都使用了函数式接口,一是可以 Lambda 表达式,二是根据不同的函数式接口的特性达到不同的执行要求
- 不带 Async 的方法为同步方法,带 Async 的方法为异步方法
- 这些方法分为 3 大类
- 阶段成功才执行函数式接口(函数式接口的参数为当前阶段的结果)
- 阶段失败才执行函数式接口(函数式接口的参数为当前阶段出现的异常)
- 无论阶段是否执行成功都会执行函数式接口(此类方法的函数式接口参数第一个参数为当前阶段返回值,第二个参数为当前阶段出现的异常)
- 各个类型的方法都可以传入一个自定义的线程池。这可以避免所有的异步执行都使用同一个线程池,造成线程的争用。
CompletableFuture 实例化
CompletableFuture 有一个无参构造函数,但使用 CompletableFuture 时,不要通过该无参构造函数实例化 CompletableFuture 对象。我们正常应该使用如下几个静态方法获得 CompletableFuture 实例化对象
// 当此值为真,则默认使用 ForkJoinPool.commonPool(),否则是 ThreadPerTaskExecutorprivate static final boolean useCommonPool =(ForkJoinPool.getCommonPoolParallelism() > 1);/*** 默认线程池 -- ForkJoinPool.commonPool() * ThreadPerTaskExecutor 是每次提交 new Thread 的方式* 正常情况下,默认线程池就是 ForkJoinPool.commonPool() */private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {// asyncPool 线程池,默认就是 ForkJoinPool.commonPool()return asyncSupplyStage(asyncPool, supplier);}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);}public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);}public static <U> CompletableFuture<U> completedFuture(U value) {return new CompletableFuture<U>((value == null) ? NIL : value);}
CompletableFuture 其他 API 方法
// 等待计算完成获得结果
public T get() throws InterruptedException, ExecutionException// 等待计算完成获得结果,有最大等待时间
public T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException// 等待计算完成获得结果 抛出 RuntimeException
public T join()// 直接获得结果,如果调用时计算没有完成,则使用参数值直接返回
public T getNow(T valueIfAbsent)// 如果调用时,计算阶段没有完成,则将get的返回值设置为 value
// 调用时计算已经完成,返回 true 否则,返回 false
public boolean complete(T value)// 如果调用时,计算阶段没有完成,则抛出给定的异常
// 调用时计算已经完成,返回 true 否则,返回 false
public boolean completeExceptionally(Throwable ex)// complete 和 completeExceptionally 如果同时调用,那么只有先调用的方法有效
complete 和 completeExceptionally
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(()->{try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return 1;});boolean success = cf.complete(2);boolean exSuccess = cf.completeExceptionally(new RuntimeException("测试异常"));log.debug("{}",success);log.debug("{}",exSuccess);log.debug("{}",cf.join());
执行结果:
11:42:07.657 [main] DEBUG com.yyoo.thread.future.Test1CompletableFuture - true
11:42:07.659 [main] DEBUG com.yyoo.thread.future.Test1CompletableFuture - false
11:42:07.660 [main] DEBUG com.yyoo.thread.future.Test1CompletableFuture - 2
CompletableFuture 示例
我们再用前面 FutureTask 实现的示例,造一台跑车需要5个任务,1.造骨架(1s);2. 造发动机(5s);3. 造轮胎(1s); 4. 组装(2s);用 CompletableFuture 实现如下:
import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;@Slf4j
public class TestCompletableFuture {public static void main(String[] args) {// 造骨架CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {log.debug("制造骨架");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return "无梁式骨架";});// 造发动机CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {log.debug("制造发动机");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return "2.0T 引擎";});// 造轮胎CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {log.debug("制造轮胎");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return "米其林轮胎";});System.out.println("前 3 个任务都已经启动,等待他们完成");// 以上三个任务都是异步执行的,且没有先后顺序// 此时我们需要使用 CompletableFuture 的静态方法 allOf,来确定以上3个任务已经执行完毕CompletableFuture<Void> d = CompletableFuture.allOf(task1,task2,task3);// 等待前3个任务执行完成CompletableFuture<Car> rsTask = d.thenApplyAsync((v) ->{log.debug("v 的值应该是 null:{}",v);log.debug("开始组装汽车");Car car = Car.assemble(task1.join(),task2.join(),task3.join());return car;});log.debug("造车完毕:" + rsTask.join());}@Datastatic class Car{/*** 骨架*/private String skeleton;/*** 发动机*/private String engine;/*** 轮胎*/private String tire;private Car(){}/*** 组装方法*/public final static Car assemble(String skeleton, String engine, String tire){Car car = new Car();car.setSkeleton(skeleton);car.setEngine(engine);car.setTire(tire);try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return car;}}}
CompletableFuture 的优势在于,使用起来是真的简单,而且可以很简单的组织各个任务之间的关系。比如我们新增一个 喷漆(1s) 的任务,它在 组装之前运行,但需要在骨架造好之后才能执行。此时我们只需要在 task1 的基础上添加一个 喷漆的任务就行
import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;@Slf4j
public class TestCompletableFuture {public static void main(String[] args) {// 造骨架CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {log.debug("制造骨架");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return "无梁式骨架";}).thenApply((skeleton) -> {log.debug("新增的喷漆操作");return "红色的" + skeleton;});// 造发动机CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {log.debug("制造发动机");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return "2.0T 引擎";});// 造轮胎CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {log.debug("制造轮胎");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return "米其林轮胎";});System.out.println("前 3 个任务都已经启动,等待他们完成");// 以上三个任务都是异步执行的,且没有先后顺序// 此时我们需要使用 CompletableFuture 的静态方法 allOf,来确定以上3个任务已经执行完毕CompletableFuture<Void> d = CompletableFuture.allOf(task1,task2,task3);// 等待前3个任务执行完成CompletableFuture<Car> rsTask = d.thenApplyAsync((v) ->{log.debug("v 的值应该是 null:{}",v);log.debug("开始组装汽车");Car car = Car.assemble(task1.join(),task2.join(),task3.join());return car;});log.debug("造车完毕:" + rsTask.join());}@Datastatic class Car{/*** 骨架*/private String skeleton;/*** 发动机*/private String engine;/*** 轮胎*/private String tire;private Car(){}/*** 组装方法*/public final static Car assemble(String skeleton, String engine, String tire){Car car = new Car();car.setSkeleton(skeleton);car.setEngine(engine);car.setTire(tire);try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return car;}}}
类似这样的工序组合,FutureTask 实现起来就比较困难了,但是我们使用 CompletableFuture 相对比较轻松,这也是 JDK 1.8 为什么又添加 CompletableFuture 的原因。