CompletableFuture温故
- 一、前言
- 二、Future
- 三、CompletableFuture
- 3.1 CompletableFuture定义
- 3.2 CompletableFuture使用场景
- 3.3 CompletableFuture 常见操作
- 3.3.1 创建CompletableFuture
- 3.3.2 使用CompletableFuture
- 3.3.3 异常处理
- 3.3.4 注意事项
- 四、CompletableFuture处理工具类
- 五、异步化收益
一、前言
最近项目上线完成,但随着用户增加和金融产品买卖量持续上升,算法取数接口的特征工程计算作为推荐系统的核心环节,对系统吞吐量的要求越来越高,对内调度各个数据源取数服务获取数据进行聚合,具有鲜明的I/O密集型(I/O Bound)特点。从而将同步加载改为并行加载的可行性方案。
下面就从一个简单案例讲起。
项目开发中有很常见的逻辑:一个接口可能需要调用多个其他服务的接口。例如:用户请求获取订单信息,可能需要调取用户信息、商品详情、物流信息、商品推荐等接口,最后汇总处理数据统一返回:
public class OrderServiceImpl im OrderService{@Autowiredprivate UserService userService; // 用于获取用户信息@Autowiredprivate ProductService productService; // 用于获取商品详情@Autowiredprivate LogisticsService logisticsService; // 用于获取物流信息@Autowiredprivate relService relService; // 商品推荐信息public OrderDTO getOrderInfo(Long orderId) {// 假设这里从数据库获取订单信息Order order = getOrderFromDatabase(orderId);OrderDTO orderDTO = new OrderDTO();BeanUtils.copyProperties(order, orderDTO);// 获取并设置用户信息orderDTO.setUser(userService.getUserById(order.getUserId()));// 耗时任务:获取并设置商品详情orderDTO.setProducts(productService.getProductsByIds(order.getProductIds()));// 获取并设置物流信息orderDTO.setLogistics(logisticsService.getLogisticsInfo(order.getLogisticsNumber()));// 耗时任务:推荐商品,召回客户orderDTO.setRel(relService.recall(orderId));return orderDTO;}
}
栗子中是串行(按顺序依次执行)执行任务,接口的响应速度非常慢。
这些接口之间的数据请求大多是无前后顺序关联 的,可并行执行 ,例如调用获取商品详情的时候,可同时调用获取物流信息。并行执行多个任务,接口的响应速度就可提升。
对于有顺序执行的任务,可进行任务编排,例如:
- 获取用户信息之后,才能调用商品详情和物流信息接口。
- 成功获取商品详情和物流信息之后,才能调用商品推荐接口。
故需要使用异步编程工具帮助实现多个任务的编排。
二、Future
在Java8之前一般通过Future实现异步,主要用于需要执行耗时任务的场景,避免程序一直等待耗时任务执行完成,导致执行效率太低问题。
但只能通过阻塞或者轮询的方式获取结果,获取计算结果的 get() 方法为阻塞调用,而且不支持设置回调方法,不支持异步任务的编排组合,Java8之前若要设置回调,通常使用guava的ListenableFuture,又会导致回调地狱,加深阅读和实现的难度。
例如使用ListenableFuture实现(回调地狱):三个操作step1、step2、step3存在依赖关系,其中step3的执行依赖step1和step2的结果
ExecutorService executor = Executors.newFixedThreadPool(5);ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(executor);ListenableFuture<String> future1 = guavaExecutor.submit(() -> {//step 1System.out.println("执行step 1");return "step1 result";
});ListenableFuture<String> future2 = guavaExecutor.submit(() -> {//step 2System.out.println("执行step 2");return "step2 result";
});ListenableFuture<List<String>> future1And2 = Futures.allAsList(future1, future2);Futures.addCallback(future1And2, new FutureCallback<List<String>>() {@Overridepublic void onSuccess(List<String> result) {System.out.println(result);ListenableFuture<String> future3 = guavaExecutor.submit(() -> {System.out.println("执行step 3");return "step3 result";});Futures.addCallback(future3, new FutureCallback<String>() {@Overridepublic void onSuccess(String result) {System.out.println(result);} @Overridepublic void onFailure(Throwable t) {}}, guavaExecutor);}@Overridepublic void onFailure(Throwable t) {}}, guavaExecutor);
而使用CompletableFuture实现:
ExecutorService executor = Executors.newFixedThreadPool(5);CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println("执行step 1");return "step1 result";
}, executor);CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {System.out.println("执行step 2");return "step2 result";
});cf1.thenCombine(cf2, (result1, result2) -> {System.out.println(result1 + " , " + result2);System.out.println("执行step 3");return "step3 result";
}).thenAccept(System.out::println);
显然,CompletableFuture的实现更为简洁,可读性更好,无需引入外部依赖。
三、CompletableFuture
ompletableFuture对Future进行扩展,可通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排,一定程度解决回调地狱问题。
3.1 CompletableFuture定义
CompletableFuture 类的定义:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}
CompletableFuture实现两个接口(如图):Future、CompletionStage。Future表示异步计算的结果,CompletionStage用于表示异步执行过程中的一个步骤(Stage),这个步骤可能是由另一个CompletionStage触发的,若当前步骤的完成,也可能会触发其他一系列CompletionStage的执行。从而可以根据实际业务对这些步骤进行多样化的编排组合。
Future 接口有 5 个方法:
boolean cancel(boolean mayInterruptIfRunning):尝试取消执行任务。
boolean isCancelled():判断任务是否被取消。
boolean isDone():判断任务是否已经被执行完成。
get():等待任务执行完成并获取运算结果。
get(long timeout, TimeUnit unit):多一个超时时间。
CompletionStage 接口中的方法比较多,CompletableFuture 的函数式能力就是这个接口赋予的。
3.2 CompletableFuture使用场景
- IO密集型操作:如数据库操作、文件读写、网络请求等,可使用 CompletableFuture 异步执行,避免阻塞主线程。
- 计算密集型操作:如果计算可以在不同的线程中并行执行,使用 CompletableFuture 可以提高计算效率。
- 异步编程模式:在需要处理多个异步操作的结果,并且这些操作之间存在依赖关系时,CompletableFuture 可提供解决方案。
3.3 CompletableFuture 常见操作
3.3.1 创建CompletableFuture
创建 CompletableFuture 对象的方法:
- 通过 new 关键字。
CompletableFuture<Result<T>> resultFuture = new CompletableFuture<>();
// 在某些条件下手动完成Future
if (checkCondition()) {// complete() 方法只能调用一次,后续调用将被忽略。resultFuture.complete(rpcResponse);rpcResponse = completableFuture.get();
}
- 基于 CompletableFuture 自带的静态工厂方法:runAsync()、supplyAsync() 。
// supplyAsync()方法需要一个Supplier函数接口,通常用于执行异步计算。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// 模拟耗时的计算simulateTask("数据加载中");return "结果";
});//runAsync()方法接受一个Runnable函数接口,不返回任何结果。不关心异步任务的结果,只想执行一个异步操作
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {simulateTask("正在执行一些处理");
});// 也可用自定义线程池执行任务
ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 3,TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),new ThreadPoolExecutor.DiscardOldestPolicy());
CompletableFuture.runAsync(() -> System.out.println("Hello World!"), pool);
3.3.2 使用CompletableFuture
1、组合调用:CompletableFuture的组合能力。假设有两个独立的异步任务,组合使用:
ompletableFuture<User> future1 = CompletableFuture.supplyAsync(() -> {// 加载用户数据return userService.getUserInfo(userId);
});CompletableFuture<Product> future2 = CompletableFuture.supplyAsync(() -> {// 获取用户购买的产品return productService.getUserBuyProd(userId);
});// 组合两个future,等待它们都完成
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (user, product) -> {return "组合处理结果: " + user::getId + "购买产品:" + product::getProdName;
});combinedFuture.thenAccept(System.out::println);// 若一个异步操作依赖于另一个异步操作的结果
CompletableFuture<String> masterFuture = CompletableFuture.supplyAsync(() -> {simulateTask("获取主数据");return "主数据结果";
});CompletableFuture<String> dependentFuture = masterFuture.thenCompose(result -> {return CompletableFuture.supplyAsync(() -> {simulateTask("处理依赖于" + result + "的数据");return "处理后的数据";});
});// 需要等所有操作都完成后再进行下一步
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);allFutures.thenRun(() -> {System.out.println("所有任务完成");
});
2、链式调用:把系列操作依次执行,前一个操作的结果作为下一个操作的输入。
CompletableFuture支持多种链式调用方法,比如thenApply, thenAccept和thenRun。
thenApply()用于处理和转换CompletableFuture的结果。
thenAccept()用于消费CompletableFuture的结果,不返回新的CompletableFuture。
thenRun()则不关心前一个任务的结果,只是在前一个任务执行完后,执行一些后续操作。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {simulateTask("查询数据库");return "查询结果";
});future.thenApply(result -> {// 对结果进行处理return "处理后的结果:" + result;
}).thenAccept(processedResult -> {// 消费处理后的结果System.out.println("最终结果:" + processedResult);
}).thenRun(() -> {// 执行一些不需要前一个结果的操作System.out.println("所有操作完成");
});
用supplyAsync启动一个异步任务来查询数据库。然后用thenApply处理查询结果,用thenAccept消费处理后的结果,最后用thenRun标记所有操作完成。
3.3.3 异常处理
1、基本异常处理:抛出异常,exceptionally方法就会被调用,返回一个包含错误信息的回退结果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (new Random().nextBoolean()) {throw new RuntimeException("出错啦!");}return "正常结果";
}).exceptionally(ex -> {return "错误的回退结果:" + ex.getMessage();
});future.thenAccept(System.out::println);
2、细粒度异常处理:无论异步操作是成功还是失败,handle方法都会被调用。如果有异常,它会处理异常;如果没有,就处理正常结果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (new Random().nextBoolean()) {throw new RuntimeException("出错啦!");}return "正常结果";
}).handle((result, ex) -> {if (ex != null) {return "处理异常:" + ex.getMessage();}return "处理结果:" + result;
});future.thenAccept(System.out::println);
3、管道式异常处理:把多个异步操作链接起来,并在链的任意位置处理异常。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// 第一个异步操作return "第一步结果";
}).thenApply(result -> {// 第二个异步操作,可能会出错throw new RuntimeException("第二步出错啦!");
}).exceptionally(ex -> {// 处理异常return "在第二步捕获异常:" + ex.getMessage();
}).thenApply(result -> {// 第三个异步操作return "第三步使用结果:" + result;
});future.thenAccept(System.out::println);
4、组合异步操作时的错误处理
当组合多个CompletableFuture时,需对每一个Future都进行错误处理。避免一个未捕获的异常破坏整个操作链。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "任务1").exceptionally(ex -> "默认值1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "任务2").exceptionally(ex -> "默认值2");CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " 和 " + result2);
3.3.4 注意事项
1、避免过多的链式调用
过度使用可能会导致代码难以理解和维护。建议把复杂的逻辑分解成多个方法或类:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "原始数据").thenApply(this::step1).thenApply(this::step2).thenApply(this::step3);// 将每个步骤的逻辑封装在不同的方法中
private String step1(String data) {return "处理1:" + data;
}private String step2(String data) {return "处理2:" + data;
}private String step3(String data) {return "处理3:" + data;
}
2、谨慎处理阻塞操作
若CompletableFuture链中包含阻塞调用,如数据库操作或文件I/O,最好将这些操作放在独立的线程池中,避免阻塞ForkJoinPool中的线程。
ExecutorService dbExecutor = Executors.newCachedThreadPool();CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {// 这里是阻塞的数据库操作simulateTask("数据库操作");
}, dbExecutor);// 或者
private ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());CompletableFuture.runAsync(() -> {//...
}, executor);
3、尽量避免使用 get()
CompletableFuture的get()方法是阻塞的,尽量避免使用。如果必须要使用的话,需要添加超时时间,否则可能会导致主线程一直等待,无法执行其他任务。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(10_000);} catch (InterruptedException e) {e.printStackTrace();}return "Hello, world!";});// 获取异步任务的返回值,设置超时时间为 5 秒try {String result = future.get(5, TimeUnit.SECONDS);System.out.println(result);} catch (InterruptedException | ExecutionException | TimeoutException e) {// 处理异常e.printStackTrace();}
}
四、CompletableFuture处理工具类
/*** CompletableFuture封装工具类*/
@Slf4j
public class FutureUtils {
/*** 该方法为美团内部rpc注册监听的封装,可以作为其他实现的参照* OctoThriftCallback 为thrift回调方法* ThriftAsyncCall 为自定义函数,用来表示一次thrift调用(定义如上)*/
public static <T> CompletableFuture<T> toCompletableFuture(final OctoThriftCallback<?,T> callback , ThriftAsyncCall thriftCall) {CompletableFuture<T> thriftResultFuture = new CompletableFuture<>();callback.addObserver(new OctoObserver<T>() {@Overridepublic void onSuccess(T t) {thriftResultFuture.complete(t);}@Overridepublic void onFailure(Throwable throwable) {thriftResultFuture.completeExceptionally(throwable);}});if (thriftCall != null) {try {thriftCall.invoke();} catch (TException e) {thriftResultFuture.completeExceptionally(e);}}return thriftResultFuture;
}/*** 设置CF状态为失败*/public static <T> CompletableFuture<T> failed(Throwable ex) {CompletableFuture<T> completableFuture = new CompletableFuture<>();completableFuture.completeExceptionally(ex);return completableFuture;}/*** 设置CF状态为成功*/public static <T> CompletableFuture<T> success(T result) {CompletableFuture<T> completableFuture = new CompletableFuture<>();completableFuture.complete(result);return completableFuture;}/*** 将List<CompletableFuture<T>> 转为 CompletableFuture<List<T>>*/public static <T> CompletableFuture<List<T>> sequence(Collection<CompletableFuture<T>> completableFutures) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0])).thenApply(v -> completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()));}/*** 将List<CompletableFuture<List<T>>> 转为 CompletableFuture<List<T>>* 多用于分页查询的场景*/public static <T> CompletableFuture<List<T>> sequenceList(Collection<CompletableFuture<List<T>>> completableFutures) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0])).thenApply(v -> completableFutures.stream().flatMap( listFuture -> listFuture.join().stream()).collect(Collectors.toList()));}/** 将List<CompletableFuture<Map<K, V>>> 转为 CompletableFuture<Map<K, V>>* @Param mergeFunction 自定义key冲突时的merge策略*/public static <K, V> CompletableFuture<Map<K, V>> sequenceMap(Collection<CompletableFuture<Map<K, V>>> completableFutures, BinaryOperator<V> mergeFunction) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0])).thenApply(v -> completableFutures.stream().map(CompletableFuture::join).flatMap(map -> map.entrySet().stream()).collect(Collectors.toMap(Entry::getKey, Entry::getValue, mergeFunction)));}/*** 将List<CompletableFuture<T>> 转为 CompletableFuture<List<T>>,并过滤调null值*/public static <T> CompletableFuture<List<T>> sequenceNonNull(Collection<CompletableFuture<T>> completableFutures) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0])).thenApply(v -> completableFutures.stream().map(CompletableFuture::join).filter(e -> e != null).collect(Collectors.toList()));}/*** 将List<CompletableFuture<List<T>>> 转为 CompletableFuture<List<T>>,并过滤调null值* 多用于分页查询的场景*/public static <T> CompletableFuture<List<T>> sequenceListNonNull(Collection<CompletableFuture<List<T>>> completableFutures) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0])).thenApply(v -> completableFutures.stream().flatMap( listFuture -> listFuture.join().stream().filter(e -> e != null)).collect(Collectors.toList()));}/*** 将List<CompletableFuture<Map<K, V>>> 转为 CompletableFuture<Map<K, V>>* @Param filterFunction 自定义过滤策略*/public static <T> CompletableFuture<List<T>> sequence(Collection<CompletableFuture<T>> completableFutures,Predicate<? super T> filterFunction) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0])).thenApply(v -> completableFutures.stream().map(CompletableFuture::join).filter(filterFunction).collect(Collectors.toList()));}/*** 将List<CompletableFuture<List<T>>> 转为 CompletableFuture<List<T>>* @Param filterFunction 自定义过滤策略*/public static <T> CompletableFuture<List<T>> sequenceList(Collection<CompletableFuture<List<T>>> completableFutures,Predicate<? super T> filterFunction) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0])).thenApply(v -> completableFutures.stream().flatMap( listFuture -> listFuture.join().stream().filter(filterFunction)).collect(Collectors.toList()));}
/*** 将CompletableFuture<Map<K,V>>的list转为 CompletableFuture<Map<K,V>>。 多个map合并为一个map。 如果key冲突,采用新的value覆盖。*/public static <K, V> CompletableFuture<Map<K, V>> sequenceMap(Collection<CompletableFuture<Map<K, V>>> completableFutures) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0])).thenApply(v -> completableFutures.stream().map(CompletableFuture::join).flatMap(map -> map.entrySet().stream()).collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> b)));}}
五、异步化收益
通过异步化改造,推荐系统的性能得到明显提升,与改造前对比的收益如下:
- 核心接口吞吐量大幅提升,其中产品买卖轮询接口的TP99从865ms降为408ms。
- 取数服务器数量减少1/3。