一、什么是CompletableFuture?
CompletableFuture 是Java 8引入的一个强大的并发工具类,它是Future接口的扩展实现。它提供了更丰富的异步编程模型和功能,允许开发者以非阻塞的方式处理异步计算的结果,并且可以将多个异步任务链式组合起来形成复杂的流程。
二、主要特点
1. 异步执行:
通过supplyAsync()、runAsync()等方法可以提交一个Runnable或Callable到线程池中异步执行。
2. 结果获取:
与传统的Future类似,可以通过get()方法来阻塞地获取异步任务的结果,或者使用thenApply(), thenAccept(), thenRun()等方法在前一个异步操作完成后进行后续处理,这些方法都不会阻塞当前线程。
3. 异常处理:
提供.exceptionally()方法来捕获并处理Future执行过程中可能抛出的异常。
4. 组合异步操作:
支持多种组合方式,如:
- .thenCompose()用于基于前一个Future的结果启动另一个异步任务。
- .allOf()用于等待一组CompletableFuture都完成。
- .anyOf()用于等待一组CompletableFuture中的任何一个完成。
5. 流式API:
其设计鼓励函数式的编程风格,使得代码更加简洁易读。
6. 取消操作:
支持通过cancel(boolean mayInterruptIfRunning)方法取消正在进行的异步任务。
三、实例
1、以同时获取两个ETA,并将结果相加为例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class ETAExample {// 假设这是异步获取ETA的方法public static int getETA() {try {Thread.sleep(1000); // 模拟耗时操作} catch (InterruptedException e) {Thread.currentThread().interrupt();}return new Random().nextInt(100); // 返回一个模拟的ETA值}public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> getETA());CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> getETA());CompletableFuture<Integer> combinedFuture = CompletableFuture.allOf(future1, future2).thenApply(v -> future1.join() + future2.join());int finalETA = combinedFuture.get(); // 获取最终的ETA之和System.out.println("Final ETA: " + finalETA);}
}
.supplyAsync(),用于创建并执行两个异步任务来获取ETA;
.allof(),当两个获取ETA的CompletableFuture完成后,返回一个新的、已完成状态的 CompletableFuture。
.thenApply(),等.allof()执行完后,执行.thenApply()方法。本文中是将两个ETA加和;
.join(),CompletableFuture完成时返回结果值。执行过程中如果发生异常,会抛出一个unchecked异常。
2、异常处理
2.1 .exceptionally(Function<Throwable, T>)
.exceptionally()可以为Future在执行过程中可能抛出的任何异常提供一个恢复策略。参数是一个函数,它接受一个Throwable对象(即异常),并返回一个替代的结果值。当Future由于异常而未能正常完成时,该函数会被调用,并且返回值将作为Future的新结果。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {if (/* 某些条件导致异常 */) {throw new RuntimeException("获取ETA失败");}return getETA(); // 假设这个方法会返回一个整数类型的ETA
});future.exceptionally(throwable -> {System.out.println("发生异常: " + throwable.getMessage());return -1; // 返回一个默认的ETA值或错误码
});int result = future.get(); // 此时无论是否发生异常,都会得到一个非null的结果
2.2 .handle(BiFunction<? super T, Throwable, U>)
这个方法更通用,它同时处理成功和失败的情况。
参数是一个BiFunction,其第一个参数是正常的计算结果(如果有的话),第二个参数是可能出现的异常。不管是正常结束还是异常终止,handle方法都会被执行,它可以统一处理两种情况下的结果。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> getETA());// 使用 handle 方法统一处理成功和失败的结果
future.handle((result, throwable) -> {if (throwable != null) { // 处理异常情况System.out.println("发生异常: " + throwable.getMessage());return -1;} else { // 处理正常情况return result * 2; // 假设这里对正常结果进行某种操作后再返回}
});Integer finalResult = future.get(); // 获取最终处理后的结果
2.3 try ... catch
此方法比较常见,本文就不再赘述了,大家可移步:【Java基础】[异常处理]try,catch,finally_catch块完全有可能得不到执行-CSDN博客
【JAVA基础】[异常处理]项目中悄无声息的RuntimeException_runtimeexception生成随机序列化的id-CSDN博客
以上异常处理的方法,都可以帮我们保证系统的健壮性。
3、线程池:ForkJoinPool.commonPool()
前面我们说过,.supplyAsync(),用于创建并执行两个异步任务来获取ETA,那这两个任务由哪个线程池管理并执行呢?答: ForkJoinPool.commonPool();
ForkJoinPool.commonPool()不同于ThreadPoolExecutor,ForkJoinPool.commonPool()利用工作窃取机制,如果一个线程在其本地队列上没有找到任务时,它会尝试从其他线程的队列中“窃取”任务来执行。后面我计划单独出一篇文章来介绍。
ForkJoinPool.commonPool()是一个全局共享的线程池,它为整个JVM进程提供服务。当在同一个工程中,多个线程或任务使用CompletableFuture.supplyAsync(...)方法时,如果不指定自
定义的ExecutorService,那么默认情况下这些异步计算任务会提交到这个公共的ForkJoinPool中执行。这意味着所有依赖于commonPool()的任务将会共享同一组工作线程来并发执行。如果这些任务是CPU密集型且数量很大,可能会导致线程池资源紧张,从而影响性能。
对于I/O密集型任务或者短生命周期的任务,公共池通常可以很好地复用线程资源。然而,在特定场景下,比如需要对线程数进行精细化控制、确保优先级或者隔离不同类型的计算任务时,建议创建并指定自定义的ForkJoinPool或其它类型的ExecutorService来处理异步任务。
从官方文档中,我们可以看到,给出了两个异步执行方法:
1)static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
返回一个新的CompletableFuture,它是由ForkJoinPool.commonPool()中运行的任务异步完成的,其值通过调用给定的Supplier获得。
2)static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
返回一个新的CompletableFuture,它由给定执行器中运行的任务异步完成,其值通过调用给定的Supplier获得。
使用2),我们可以传入自定义的Executor。
四、使用过程中的注意事项
1. 异常处理:
由于CompletableFuture的异步操作可能抛出异常,因此需要正确处理get()方法调用或者通过.exceptionally()、.handle()等方法捕获和处理可能出现的异常。前面我们结合实例也介绍过了。
2. Future的完成状态不可变:
一旦CompletableFuture完成了(成功或失败),其结果是不可变的。所以我们不能更改已完成Future的结果。
3. 线程池配置:
默认情况下,supplyAsync()会使用ForkJoinPool.commonPool()执行任务,所以如果我们的任务数量非常大或者执行时间较长,可能会对公共线程池造成压力,不同的业务之前可能引起资源竞争。在这种情况下,应该自定义ExecutorService来管理并发任务。
4. 取消任务:
如果需要支持取消异步操作,可以调用CompletableFuture.cancel(true),但要确保任务是可以被中断的,并且在内部逻辑中检查Thread.currentThread().isInterrupted()以响应中断请求。
5. 链式操作顺序:
使用.thenApply(), .thenAccept(), .thenCompose()等方法构建异步计算流程时,注意这些操作的执行顺序与它们在链中的位置保持一致。
6. 资源管理:
在异步操作中打开的任何资源(如数据库连接、文件句柄)应在操作完成后正确关闭或释放。
7. allOf() vs anyOf():
确保使用正确的组合器。CompletableFuture.allOf()等待所有Future都完成,而anyOf()只要任何一个完成就返回。这些就是字面意思,我们就不再赘述了。
8. 避免阻塞主线程:
尽量避免在主线程或其他关键路径上直接调用get()阻塞等待结果。如果必须同步等待,考虑使用join()方法,它不会抛出InterruptedException。
9. 超时处理:
可以结合CompletableFuture.get(long, TimeUnit)设置超时时间,防止因某个异步任务长时间未完成而导致整个程序挂起。
五、总结
以上我们简要的了解了CompletableFuture的主要功能和在并发计算中的应用,我们可以看到CompletableFuture极大地简化了Java中的异步编程模型,提高了代码可读性和程序的响应能力,是构建高效、灵活并发应用的重要组件之一。后面将在应用中不断使用和加深理解。
参考:
Java Platform SE 8 ,GPT