一、概述
CompletableFuture 是Java 8 中引入的 Java Future API的扩展,用于 Java 中的异步编程,它可以使我们的任务运行在与主线程分离的其他线程中,并通过回调在主线程中得到异步任务执行状态,包括是否完成,是否异常等信息。
这样,主线程不会阻塞/等待任务的完成,它可以并行执行其他任务。拥有这种并行性极大地提高了程序的性能。
二、为什么要引入CompletableFuture?
在一些业务场景中我们需要使用多线程异步执行任务,所以Java 1.5 推出的Callable和Future接口解决这个问题,但是因为Future有几个局限:
1.Future的get方法会导致主线程阻塞
2.轮询获取结果会消耗cpu资源。
3.多个Future任务不能按照顺序执行。
4.Future Api无异常处理。
举个例子:比如公司需要发货某样商品,发现库存不足,通知采购去采购一些商品的需求。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;/*** 发货Future实现*/
public class SendGoods {/*** 主线程为发货商品** @param args*/public static void main(String[] args) {System.out.println("发货start");ExecutorService executorService = Executors.newFixedThreadPool(1);FutureTask<String> futureTask = new FutureTask<>(() -> {Thread.sleep(1000);return "采购的商品";});//发现库存不够,提交异步任务(采购货物)executorService.submit(futureTask);/*** 方法1* 局限:导致线程堵塞*/try {//获取采购的商品(堵塞线程)String goods = futureTask.get();System.out.println("采购的商品:" + goods);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}/*** 方法2* 通过while轮询方式会消耗cpu*/while (true) {if (futureTask.isDone()) {try {//获取采购的商品(堵塞线程)String goods = futureTask.get();System.out.println("采购的商品:" + goods);break;} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}} else {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}}System.out.println("发货end");}
}
实例代码提供了两种方式,但都是有局限性并且堵塞了主线程。
因此Java8引入了CompletableFuture来解决这些问题。
三、创建 CompletableFuture
1.CompletableFuture提供了以下静态方法:
// 无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 无返回值 可以自定义线程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 有返回值 可以自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
supply开头:这种方法,可以返回异步线程执行之后的结果。
run开头:这种不会返回结果,就只是执行线程任务。
例如:
CompletableFuture.runAsync(() -> System.out.println("执行无返回值的异步任务"));
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("执行有返回值的异步任务");return "finish";
});
接着我们可以通过get()或者join()方法来获取返回的结果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("执行有返回值的异步任务");return "finish";
});
System.out.println(future.get());
2.thenApply()方法
我们可以使用thenApply()方法在 CompletableFuture 到达时对其进行处理和转换,它将Function < T,R >作为参数。Function < T,R >是一个简单的函数式接口,表示一个接受 T 类型参数并产生 R 类型结果的函数。
例如:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {// 模拟业务时长 TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) { throw new IllegalStateException(e); }return "华为手机";
});CompletableFuture<String> resFuture = future.thenApply(name -> "订单: " + name);System.out.println(resFuture.get());
结果为:订单: 华为手机
你也可以通过附加一系列的thenApply()在回调方法 在CompletableFuture写一个连续的转换。这样的话,结果中的一个 thenApply方法就会传递给该系列的另外一个 thenApply方法。
例如:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {// 模拟业务时长 TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) { throw new IllegalStateException(e); }return "华为手机";
});CompletableFuture<String> resFuture = future.thenApply(name -> "订单: " + name).thenApply(result-> result + ",。。。。。。。");System.out.println(resFuture.get());
- thenAccept() 和 thenRun()方法。
如果不想从回调函数中返回任何东西,只想在 Future 完成后运行一些代码,那么你就可以使用thenAccept()和thenRun()方法。
例如:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "华为手机");
future.thenAccept(product -> System.out.println("我拿到了 " + product));
结果为:我拿到了 华为手机
注意:thenRun()无法拿到上一次的结果,thenRun()意为然后执行什么,里面接收一个Runnable参数。
在runAsync方法和supplyAsync方法第二个参数需要指定 线程池Executor,如果不指定,会使用默认的线程池ForkJoinPool。
ForkJoinPool可查大致为:
ForkJoinPool就是用来解决这种问题的:将一个大任务拆分成多个小任务后,
使用fork可以将小任务分发给其他线程同时处理,使用join可以将多个线程处理的结果进行汇总;这实际上就是分治思想的并行版本。
4、组合两个CompletableFuture
我们 使用 thenCompose()组合两个独立的future,假设我们想从一个远程API中获取一个用户的信息,当用户信息可用时,我们想从另外一个服务中获取另外的信息。那么我们就可以使用thenCompose()组合了。
如下:
CompletableFuture<User> getUser(String userId) {return CompletableFuture.supplyAsync(() -> {userService.getUser(userId);});
}CompletableFuture<Double> getOtherInfo(User user) {return CompletableFuture.supplyAsync(() -> {otherService.getOtherInfo(user);});
}
首先先看下thenApply()方法。
CompletableFuture<CompletableFuture<Double>> result = getUser(userId)
.thenApply(user -> getOtherInfo(user));
这种方式只是返回一个嵌套的CompletableFuture,如果想获取最终的结果给最顶层future,使用 thenCompose()方法代替。
CompletableFuture<CompletableFuture<Double>> result = getUser(userId)
.thenCompose(user -> getOtherInfo(user));
因此,你想从CompletableFuture链中获取一个直接合并后的结果,这时候你可以使用thenCompose()。
另外,thenCombine()组合两个独立的 future 虽然thenCompose()被用于当一个future依赖另外一个future的时候用来组合两个future。thenCombine()被用来当两个独立的Future都完成的时候,用来做一些事情。
CompletableFuture<User> getUser(String userId) {return CompletableFuture.supplyAsync(() -> {userService.getUser(userId);});
}CompletableFuture<User> getOther(String userId) {return CompletableFuture.supplyAsync(() -> {otherService.getOther(user);});
}
CompletableFuture<CompletableFuture<Double>> resultFuture = getUser(userId)
.thenCombine(getOther,(a, b)->{List users = new ArrayList<>();users.add(a);users.add(b);
});System.out.println("users is - " + resultFuture .get());
当两个Future都完成的时候,传给thenCombine()
的回调函数将被调用。
5、组合多个CompletableFuture
如果想组合任意数量的CompletableFuture,可以使用CompletableFuture.allOf()。
CompletableFuture.allOf()里面包含了多个CompletableFuture操作。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {return andTree(cfs, 0, cfs.length - 1);
}
接收一个CompletableFuture类型的可变参数。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "用户信息");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "角色信息");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "部门信息");CompletableFuture.allOf(future1, future2, future3);
System.out.println(future1.join());
System.out.println(future2.join());
System.out.println(future3.join());
6.CompletableFuture.anyOf()
这种方式就是多个任务中哪个任务先返回我就返回结果。
例如:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {// 模拟业务时长ThreadUtil.sleep(3000);return "用户信息";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {// 模拟业务时长ThreadUtil.sleep(2000);return "角色信息";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {return "部门信息";
});CompletableFuture<Object> future = CompletableFuture.anyOf(future1, future2, future3);
System.out.println(future.join());
结果:部门信息
7.CompletableFuture 异常处理
如果在执行过程中害怕出错,那么我们可以加上异常处理。
例如:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {int age = -1;if(age < 0) {throw new IllegalArgumentException("年龄不能为负数");}if(age > 18) {return "成年人";} else {return "小孩";}
}).exceptionally(e -> {System.out.println("年龄错误 " + e.getMessage());return "error!";
});
System.out.println(future.join());
此外handle()方法也可以处理异常。
8.complete() 方法。
CompletableFuture 类中的 complete() 方法用于手动完成一个异步任务,并设置其结果。通过调用 complete() 方法,可以将一个特定的结果设置到 CompletableFuture 对象中,然后任何等待该异步任务的操作都会得到这个预先设置的结果。
public class Result {public static void main(String[] args) {CompletableFuture future = CompletableFuture.supplyAsync(() ->{try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println("进入异步方法");return 1;});try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println(future.complete(2) + "," + future.join());}
}
大家可以自己去了解下。