Future 接口理论知识复习
Future 接口(FutureTask 实现类)定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙其他事情或者先执行完,过了一会才去获取子任务的执行结果,或变更的任务状态。
总结:Future 接口可以为主线程开一个分支任务,专门为主线程处理耗时和复杂业务。
Future 接口常用实现类 FutureTask 异步任务
Future 接口能干什么
Future 是 Java5 新加的一个接口,它提供了一种异步并行计算的功能。
如果主线程需要执行一个很耗时的计算任务,我们就可以通过 Future 把这个任务放到异步线程中执行。主线程继续处理其他任务或者先行结束,再通过 Future 获取计算结果。
本源的 Future 接口相关架构
绿色虚线:表示实现的关系,实现一个接口
绿色实线:表示接口之间的继承
蓝色实线:表示类之间的继承
Future 编码实战和优缺点分析
编码实战
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask<>(new MyThread());Thread t1 = new Thread(futureTask);t1.start();System.out.println(futureTask.get());}
}class MyThread implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("------come in call()");return "hello Callable";}
}
优缺点
优点
- Future + 线程池异步多线程任务配合,能显著提高程序的执行效率。
public class FuturePoolDemo {public static void main(String[] args) {method1();System.out.println("------------------");method2();}private static void method2() {ExecutorService executorService = Executors.newFixedThreadPool(3);long startTime = System.currentTimeMillis();FutureTask<String> futureTask1 = new FutureTask<String>(() -> {try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}return "task1 over";});executorService.submit(futureTask1);FutureTask<String> futureTask2 = new FutureTask<String>(() -> {try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}return "task2 over";});executorService.submit(futureTask2);FutureTask<String> futureTask3 = new FutureTask<String>(() -> {try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}return "task3 over";});executorService.submit(futureTask3);long endTime = System.currentTimeMillis();System.out.println("----costTime: " + (endTime - startTime) + "毫秒");System.out.println(Thread.currentThread().getName() + "\t ----end");executorService.shutdown();}private static void method1() {// 3 个任务,目前只有一个线程 main 来处理long startTime = System.currentTimeMillis();// 暂停毫秒try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}long endTime = System.currentTimeMillis();System.out.println("----costTime: " + (endTime - startTime) + "毫秒");System.out.println(Thread.currentThread().getName() + "\t ----end");}
}
输出:
----costTime: 1114毫秒
main ----end
------------------
----costTime: 46毫秒
main ----end
缺点
- get() 阻塞
@Slf4j(topic = "c.FutureAPIDemo")
public class FutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug(Thread.currentThread().getName() + "\t -----come in");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {throw new RuntimeException(e);}return "task over";});Thread t1 = new Thread(futureTask, "t1");t1.start();log.debug(futureTask.get());log.debug(Thread.currentThread().getName() + "\t ----忙其他任务了");}
}
输出
20:59:50.067 [t1] - t1 -----come in
20:59:55.075 [main] - task over
20:59:55.075 [main] - main ----忙其他任务了
主线程需等待 get 方法获取到 t1 线程的执行结果才开始执行。
建议:
- get 方法容易导致阻塞,一般建议放在程序后面,一旦调用 get 方法求结果,如果计算没有完成容易导致程序阻塞。
- 如果不希望等待过长时间,希望在等待指定时间后自动结束。可以使用
get(long timeout, TimeUnit unit)
方法
get(long timeout, TimeUnit unit)
方法演示
@Slf4j(topic = "c.FutureAPIDemo")
public class FutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug(Thread.currentThread().getName() + "\t -----come in");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {throw new RuntimeException(e);}return "task over";});Thread t1 = new Thread(futureTask, "t1");t1.start();log.debug(Thread.currentThread().getName() + "\t ----忙其他任务了");log.debug(futureTask.get(2, TimeUnit.SECONDS));}
}
输出
- isDone() 轮询容易浪费系统资源
@Slf4j(topic = "c.FutureAPIDemo")
public class FutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug(Thread.currentThread().getName() + "\t -----come in");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {throw new RuntimeException(e);}return "task over";});Thread t1 = new Thread(futureTask, "t1");t1.start();log.debug(Thread.currentThread().getName() + "\t ----忙其他任务了");while(true) {if(futureTask.isDone()) {log.debug(futureTask.get());break;} else {try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("正在处理中...");}}}
}
输出
22:56:26.267 [t1] - t1 -----come in
22:56:26.267 [main] - main ----忙其他任务了
22:56:26.775 [main] - 正在处理中...
22:56:27.281 [main] - 正在处理中...
22:56:27.784 [main] - 正在处理中...
22:56:28.285 [main] - 正在处理中...
22:56:28.787 [main] - 正在处理中...
22:56:29.292 [main] - 正在处理中...
22:56:29.795 [main] - 正在处理中...
22:56:30.299 [main] - 正在处理中...
22:56:30.801 [main] - 正在处理中...
22:56:31.306 [main] - 正在处理中...
22:56:31.306 [main] - task over
轮询的方式会耗费无谓的 CPU 资源,而且也不见得能及时地得到计算结果。如果想要异步获取结果,通常都会以轮询的方式去获取结果,尽量不要阻塞。
总结
Future 对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
想完成一些复杂的任务
对于简单的业务场景使用 Future 是完全 OK 的。但是对于回调通知,通过轮询的方式去判断任务是否完成,这样非常占用 CPU,并且代码也不优雅。
一些复杂任务:
- 将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值。
- 将这两个或多个异步计算合成一个一步计算,这几个一步计算相互独立,同时后面这个又依赖前一个处理的结果。
- 对计算速度选最快:当Future集合中某个任务最快结束时,返回结果,返回第一名处理
对于简单的业务场景使用Future完全OK,但想完成上述一些复杂的任务,使用Future之前提供的那点API就囊中羞涩,处理起来不够优雅,这时候还是让CompletableFuture以声明式的方式优雅的处理这些需求。Future能干的,CompletableFuture都能干。
CompletableFuture 对 Future 的改进
CompletableFuture 为什么出现
get() 方法在 Future 计算完成之前会一直处在阻塞状态下,isDone() 方法容易耗费 CPU 资源。对于真正的异步处理,我们希望是可以通过传入回调函数,在 Future 结束时自动调用该回调函数,这样,我们就不用等待结果。
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的 CPU 资源。因此,JDK8 设计出 CompletableFuture。CompletableFuture 提供了一种观察者模式类似的机制,可以让人物执行完成后通知监听的一方。
CompletableFuture 和 CompletionStage 源码分别介绍
类架构说明
CompletableFuture 实现了 Future 接口和 CompletionStage 接口。
接口 CompletionStage
- CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
- 一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable。比如
stage.thenApply(x -> square(x)).thenAccept(x -> Systemm.out.print(x)).thenRun(() -> System.out.println())
- 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。
类 CompletableFuture
- 在 Java8 中,CompletableFuture 提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
- 它可能代表一个明确完成的 Future,也有可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。
- 它实现了 Future 和 CompletionStage 接口
核心的四个静态方法
方法介绍
runAsync 无返回值:
public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);
}public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);
}
supplyAsync 有返回值:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);
}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);
}
上述Executor executor
参数说明:
- 没有指定
Executor
的方法,直接使用默认的ForkJoinPool.commonPool()
作为它的线程池执行异步代码。 - 如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码。
代码演示
runAsync 未设置线程池参数
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {log.debug(Thread.currentThread().getName() + "\t ------come in");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("--------task is over");});System.out.println(future.get());}
}
输出:
runAsync 设置线程池参数
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(3);CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {log.debug(Thread.currentThread().getName() + "\t ------come in");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("--------task is over");}, executorService);System.out.println(future.get());executorService.shutdown();}
}
输出
supplyAsync 未设置线程池参数
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {log.debug(Thread.currentThread().getName() + "\t ------come in");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("--------task is over");return "hello supplyAsync";});log.debug(future.get());}
}
输出
supplyAsync 设置线程池参数
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(3);CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {log.debug(Thread.currentThread().getName() + "\t ------come in");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("--------task is over");return "hello supplyAsync";}, executorService);log.debug(future.get());executorService.shutdown();}
}
输出
CompletableFuture 之通用异步编程
从 Java8 开始引入了 CompletableFuture,它是 Future 的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "----come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("----1秒钟后出结果:" + result);return result;}).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算完成,更新系统UpdateValue" + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");}
}
执行结果
没有发生异常,为什么没有执行 whenComplete 中的逻辑?
Fork/Join 线程池中的线程类似于守护线程,由于主线程执行速度过快,先执行结束,导致 Fork/Join 线程池被关闭。
解决方法
- 可以让主线程不要立刻结束,否则 CompletableFuture 默认使用的线程池会立刻关闭:暂停 3 秒钟主线程。
- 使用自定义线程池
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "----come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("----1秒钟后出结果:" + result);return result;}, pool).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算完成,更新系统UpdateValue: " + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");pool.shutdown();}
}
输出:
whenComplete 方法
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(null, action);
}
传入的参数是一个 BiConsumer 函数接口,BiConsumer 接口可以接收两个参数,其中第一个参数为上一步中产生的结果,第二个参数为上一步代码执行过程中产生的异常。
exceptionally 方法
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {return uniExceptionallyStage(fn);
}
如果 CompletableFuture 在执行时发生异常,则执行 exceptionally 中的逻辑,并返回新的 CompletableFuture
代码实例如下,当产生的随机数大于 2,触发异常,执行 exceptionally 中的逻辑
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "----come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("----1秒钟后出结果:" + result);if(result > 2) {int i = 1 / 0;}return result;}, pool).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算完成,更新系统UpdateValue: " + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");pool.shutdown();}
}
输出
CompletableFuture 优点
- 异步任务结束时,会自动回调某个对象的方法
- 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
- 异步任务出错时,会自动回调某个对象的方法
案例精讲 - 从电商网站的比价需求说开去
函数式接口
函数式接口名称 | 方法名称 | 参数 | 返回值 |
---|---|---|---|
Runnable | run | 无参数 | 无返回值 |
Function | apply | 1 个参数 | 有返回值 |
Consumer | accept | 1 个参数 | 无返回值 |
Supplier | get | 无参数 | 有返回值 |
BiConsumer | accept | 2 个参数 | 无返回值 |
业务需求
业务需求说明
电商网站比价需求分析:
- 需求说明:
- 同一款产品,同时搜索出同款产品在各大电商平台的售价
- 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
- 输出返回:
- 出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List
例如:《Mysql》 in jd price is 88.05 《Mysql》 in taobao price is 90.43
- 解决方案,对比同一个产品在各个平台上的价格,要求获得一个清单列表
- step by step,按部就班,查完淘宝查京东,查完京东查天猫…
- all in,万箭齐发,一口气多线程异步任务同时查询
代码实现
public class CompletableFutureMallDemo {static List<NetMall> list = Arrays.asList(new NetMall("jd"),new NetMall("dangdang"),new NetMall("taobao"));/*** 一家一家的查* @param list* @param productName* @return*/public static List<String> getPrice(List<NetMall> list, String productName) {return list.stream().map(netMall ->String.format(productName + " in %s price is %.2f",netMall.getNetMallName(),netMall.calPrice(productName))).collect(Collectors.toList());}public static List<String> getPriceByCompletableFuture(List<NetMall> list, String productName) {return list.stream().map(netMall ->CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",netMall.getNetMallName(),netMall.calPrice(productName)))).collect(Collectors.toList()).stream().map(s -> s.join()).collect(Collectors.toList());}public static void main(String[] args) {long startTime = System.currentTimeMillis();
// List<String> list1 = getPrice(list, "mysql");List<String> list1 = getPriceByCompletableFuture(list, "mysql");for (String element : list1) {System.out.println(element);}long endTime = System.currentTimeMillis();System.out.println("----costTime: " + (endTime - startTime) + "毫秒");}
}@AllArgsConstructor
class NetMall {@Getterprivate String netMallName;public double calPrice(String productName) {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);}
}
普通方式查询,即调用 getPrice() 方法
采用 CompletableFuture 方式查询,即调用 getPriceByCompletableFuture 方法
CompletableFuture 常用方法
获得结果和触发计算
获取结果
public T get() throws InterruptedException, ExecutionException {// ...
}public T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {// ...
}// 和get一样的作用,只是不需要抛出异常
public T join() {// ...
}// 计算完成就返回正常值,否则返回备胎值(传入的参数),立即获取结果不阻塞
public T getNow(T valueIfAbsent) {// ...
}
主动触发计算
// 是否打断 get 方法立即返回括号值
public boolean complete(T value)
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return "abc";});try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(future.getNow("xxx"));System.out.println(future.complete("completeValue") + "\t" + future.join());}
}
输出
如果主线程中睡够 2 秒再去获取,输出结果:
对计算结果进行处理
thenApply
- 计算结果存在依赖关系,两个线程串行化
- 由于存在依赖关系(当前步骤出错,不再执行下一步),当前步骤有异常的话就停止运行。
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("111");return 1;}, pool).thenApply(f -> {System.out.println("222");return f + 2;}).thenApply(f -> {System.out.println("333");return f + 3;}).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算结果:" + v);}}).exceptionally(e -> {e.printStackTrace();;System.out.println(e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");}
}
输出
如下,若是在第二步执行过程中出错,程序将退出运行。
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("111");return 1;}, pool).thenApply(f -> {int i = 1 / 0;System.out.println("222");return f + 2;}).thenApply(f -> {System.out.println("333");return f + 3;}).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算结果:" + v);}}).exceptionally(e -> {e.printStackTrace();;System.out.println(e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");pool.shutdown();}
}
输出结果
handle
- 计算结果存在依赖关系,两个线程串行化
- 有异常也可以继续向下运行,根据带的异常参数可以进一步处理
如下代码中,步骤 2 中出错,步骤 3 使用 handle 进行处理
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("111");return 1;}, pool).thenApply(f-> {int i = 1 / 0;System.out.println("222");return f + 2;}).handle((f, e) -> {System.out.println("333");return f + 3;}).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算结果:" + v);}}).exceptionally(e -> {e.printStackTrace();;System.out.println(e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");pool.shutdown();}
}
输出结果
步骤 3 可以正常执行,但是由于步骤 2 中出错,最终会执行 exceptionally 中的逻辑
对计算结果进行消费
thenAccept
接收任务的处理结果,并消费处理,无返回结果
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.supplyAsync(() -> {return 1;}).thenApply(f -> {return f + 2;}).thenApply(f -> {return f + 3;}).thenAccept(r -> {System.out.println(r);});}
}
输出
对比补充
theRun(Runnable runnable)
任务 A 执行完执行任务 B,并且 B 不需要 A 的结果theAccept(Consumer action)
任务 A 执行完执行任务 B,B 需要 A 的结果,但是任务 B 无返回值thenApply(Function fn)
任务 A 执行完执行任务 B,B 需要 A 的结果,同时任务 B 有返回值
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println(CompletableFuture.supplyAsync(() -> "resultA")// 任务 A 执行完执行任务 B,并且 B 不需要 A 的结果.thenRun(() -> {}).join());System.out.println("--------------------------------");System.out.println(CompletableFuture.supplyAsync(() -> "resultA")// 任务 A 执行完执行任务 B,B 需要 A 的结果,但是任务 B 无返回值.thenAccept(r -> {System.out.println(r);}).join());System.out.println("--------------------------------");System.out.println(CompletableFuture.supplyAsync(() -> "resultA")// 任务 A 执行完执行任务 B,B 需要 A 的结果,同时任务 B 有返回值.thenApply(r -> r + "resultB").join());}
}
输出结果
CompletableFuture 和线程池说明
theRun 和 theRunAsync 有什么区别?
- 没有传入自定义线程池,都使用默认线程池 ForkJoinPool
- 传入了一个自定义线程池
- 如果执行第一个任务的时候,传入了一个自定义线程池
- 调用 theRun 方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池
- 调用 theRunAsync 执行第二个任务时,则第一个人任务使用的是你自己传入的线程池,第二个任务使用的是 ForkJoinPool 线程池
- 有可能处理太快,系统优化切换原则,直接使用 main 线程处理
其他如:thenAccept 和 thenAcceptAsync,thenApply 和 thenApplyAsync 等,它们之间的区别也是同理。
使用 theRun 方法且未使用自定义线程池
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("1号任务\t" + Thread.currentThread().getName());return "abcd";}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("2号任务\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("3号任务\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("4号任务\t" + Thread.currentThread().getName());});System.out.println(future.get(2L, TimeUnit.SECONDS));}
}
输出结果
使用 theRunAsync 方法且未使用自定义线程池与使用 theRun 方法且未使用自定义线程池情况一样
使用 theRun 方法且使用自定义线程池
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("1号任务\t" + Thread.currentThread().getName());return "abcd";}, pool).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("2号任务\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("3号任务\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("4号任务\t" + Thread.currentThread().getName());});System.out.println(future.get(2L, TimeUnit.SECONDS));pool.shutdown();}
}
输出结果
使用 theRunAsync 方法且未使用自定义线程池
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("1号任务\t" + Thread.currentThread().getName());return "abcd";}, pool).thenRunAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("2号任务\t" + Thread.currentThread().getName());}).thenRunAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("3号任务\t" + Thread.currentThread().getName());}).thenRunAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("4号任务\t" + Thread.currentThread().getName());});System.out.println(future.get(2L, TimeUnit.SECONDS));pool.shutdown();}
}
输出结果
对计算速度进行选用
applyToEither
谁快用谁
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {CompletableFuture<String> planA = CompletableFuture.supplyAsync(() -> {System.out.println("A come in");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return "planA";});CompletableFuture<String> planB = CompletableFuture.supplyAsync(() -> {System.out.println("B come in");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}return "planB";});CompletableFuture<String> result = planA.applyToEither(planB, f -> {return f + " is winer";});System.out.println(Thread.currentThread().getName() + "\t----" + result.join());}
}
输出结果
对计算结果进行合并
thenCombine
两个 CompletionStage 任务都完成后,最终把两个任务的结果一起交给 thenCombine 来处理。先完成的先等着,等待其他分支任务结束。
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t----启动");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t----启动");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return 20;});CompletableFuture<Integer> combine = future1.thenCombine(future2, (x, y) -> {System.out.println("-----开始两个结果的合并");return x + y;});System.out.println(combine.join());}
}
输出结果