7.8 CompletableFuture

Future 接口理论知识复习

Future 接口(FutureTask 实现类)定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙其他事情或者先执行完,过了一会才去获取子任务的执行结果,或变更的任务状态。
总结:Future 接口可以为主线程开一个分支任务,专门为主线程处理耗时和复杂业务。

Future 接口常用实现类 FutureTask 异步任务

Future 接口能干什么

Future 是 Java5 新加的一个接口,它提供了一种异步并行计算的功能。
如果主线程需要执行一个很耗时的计算任务,我们就可以通过 Future 把这个任务放到异步线程中执行。主线程继续处理其他任务或者先行结束,再通过 Future 获取计算结果。

本源的 Future 接口相关架构

image.png
绿色虚线:表示实现的关系,实现一个接口
绿色实线:表示接口之间的继承
蓝色实线:表示类之间的继承

Future 编码实战和优缺点分析

编码实战

public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask<>(new MyThread());Thread t1 = new Thread(futureTask);t1.start();System.out.println(futureTask.get());}
}class MyThread implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("------come in call()");return "hello Callable";}
}

优缺点

优点
  • Future + 线程池异步多线程任务配合,能显著提高程序的执行效率。
public class FuturePoolDemo {public static void main(String[] args) {method1();System.out.println("------------------");method2();}private static void method2() {ExecutorService executorService = Executors.newFixedThreadPool(3);long startTime = System.currentTimeMillis();FutureTask<String> futureTask1 = new FutureTask<String>(() -> {try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}return "task1 over";});executorService.submit(futureTask1);FutureTask<String> futureTask2 = new FutureTask<String>(() -> {try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}return "task2 over";});executorService.submit(futureTask2);FutureTask<String> futureTask3 = new FutureTask<String>(() -> {try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}return "task3 over";});executorService.submit(futureTask3);long endTime = System.currentTimeMillis();System.out.println("----costTime: " + (endTime - startTime) + "毫秒");System.out.println(Thread.currentThread().getName() + "\t ----end");executorService.shutdown();}private static void method1() {// 3 个任务,目前只有一个线程 main 来处理long startTime = System.currentTimeMillis();// 暂停毫秒try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}long endTime = System.currentTimeMillis();System.out.println("----costTime: " + (endTime - startTime) + "毫秒");System.out.println(Thread.currentThread().getName() + "\t ----end");}
}

输出:

----costTime: 1114毫秒
main	 ----end
------------------
----costTime: 46毫秒
main	 ----end
缺点
  1. get() 阻塞
@Slf4j(topic = "c.FutureAPIDemo")
public class FutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug(Thread.currentThread().getName() + "\t -----come in");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {throw new RuntimeException(e);}return "task over";});Thread t1 = new Thread(futureTask, "t1");t1.start();log.debug(futureTask.get());log.debug(Thread.currentThread().getName() + "\t ----忙其他任务了");}
}

输出

20:59:50.067 [t1] - t1	 -----come in
20:59:55.075 [main] - task over  
20:59:55.075 [main] - main	 ----忙其他任务了

主线程需等待 get 方法获取到 t1 线程的执行结果才开始执行。
建议:

  • get 方法容易导致阻塞,一般建议放在程序后面,一旦调用 get 方法求结果,如果计算没有完成容易导致程序阻塞。
  • 如果不希望等待过长时间,希望在等待指定时间后自动结束。可以使用 get(long timeout, TimeUnit unit)方法

get(long timeout, TimeUnit unit)方法演示

@Slf4j(topic = "c.FutureAPIDemo")
public class FutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug(Thread.currentThread().getName() + "\t -----come in");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {throw new RuntimeException(e);}return "task over";});Thread t1 = new Thread(futureTask, "t1");t1.start();log.debug(Thread.currentThread().getName() + "\t ----忙其他任务了");log.debug(futureTask.get(2, TimeUnit.SECONDS));}
}

输出
image.png

  1. isDone() 轮询容易浪费系统资源
@Slf4j(topic = "c.FutureAPIDemo")
public class FutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug(Thread.currentThread().getName() + "\t -----come in");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {throw new RuntimeException(e);}return "task over";});Thread t1 = new Thread(futureTask, "t1");t1.start();log.debug(Thread.currentThread().getName() + "\t ----忙其他任务了");while(true) {if(futureTask.isDone()) {log.debug(futureTask.get());break;} else {try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("正在处理中...");}}}
}

输出

22:56:26.267 [t1] - t1	 -----come in
22:56:26.267 [main] - main	 ----忙其他任务了
22:56:26.775 [main] - 正在处理中...
22:56:27.281 [main] - 正在处理中...
22:56:27.784 [main] - 正在处理中...
22:56:28.285 [main] - 正在处理中...
22:56:28.787 [main] - 正在处理中...
22:56:29.292 [main] - 正在处理中...
22:56:29.795 [main] - 正在处理中...
22:56:30.299 [main] - 正在处理中...
22:56:30.801 [main] - 正在处理中...
22:56:31.306 [main] - 正在处理中...
22:56:31.306 [main] - task over

轮询的方式会耗费无谓的 CPU 资源,而且也不见得能及时地得到计算结果。如果想要异步获取结果,通常都会以轮询的方式去获取结果,尽量不要阻塞。

总结

Future 对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。

想完成一些复杂的任务

对于简单的业务场景使用 Future 是完全 OK 的。但是对于回调通知,通过轮询的方式去判断任务是否完成,这样非常占用 CPU,并且代码也不优雅。
一些复杂任务:

  • 将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值。
  • 将这两个或多个异步计算合成一个一步计算,这几个一步计算相互独立,同时后面这个又依赖前一个处理的结果。
  • 对计算速度选最快:当Future集合中某个任务最快结束时,返回结果,返回第一名处理

对于简单的业务场景使用Future完全OK,但想完成上述一些复杂的任务,使用Future之前提供的那点API就囊中羞涩,处理起来不够优雅,这时候还是让CompletableFuture以声明式的方式优雅的处理这些需求。Future能干的,CompletableFuture都能干。

CompletableFuture 对 Future 的改进

CompletableFuture 为什么出现

get() 方法在 Future 计算完成之前会一直处在阻塞状态下,isDone() 方法容易耗费 CPU 资源。对于真正的异步处理,我们希望是可以通过传入回调函数,在 Future 结束时自动调用该回调函数,这样,我们就不用等待结果。
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的 CPU 资源。因此,JDK8 设计出 CompletableFuture。CompletableFuture 提供了一种观察者模式类似的机制,可以让人物执行完成后通知监听的一方。

CompletableFuture 和 CompletionStage 源码分别介绍

类架构说明

image.png
CompletableFuture 实现了 Future 接口和 CompletionStage 接口。

接口 CompletionStage

  • CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
  • 一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable。比如 stage.thenApply(x -> square(x)).thenAccept(x -> Systemm.out.print(x)).thenRun(() -> System.out.println())
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。

类 CompletableFuture

  • 在 Java8 中,CompletableFuture 提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
  • 它可能代表一个明确完成的 Future,也有可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。
  • 它实现了 Future 和 CompletionStage 接口

核心的四个静态方法

方法介绍

runAsync 无返回值:

public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);
}public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);
}

supplyAsync 有返回值:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);
}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);
}

上述Executor executor参数说明:

  • 没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码。
  • 如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码。

代码演示

runAsync 未设置线程池参数

@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {log.debug(Thread.currentThread().getName() + "\t ------come in");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("--------task is over");});System.out.println(future.get());}
}

输出:
image.png
runAsync 设置线程池参数

@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(3);CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {log.debug(Thread.currentThread().getName() + "\t ------come in");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("--------task is over");}, executorService);System.out.println(future.get());executorService.shutdown();}
}

输出
image.png
supplyAsync 未设置线程池参数

@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {log.debug(Thread.currentThread().getName() + "\t ------come in");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("--------task is over");return "hello supplyAsync";});log.debug(future.get());}
}

输出
image.png
supplyAsync 设置线程池参数

@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(3);CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {log.debug(Thread.currentThread().getName() + "\t ------come in");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("--------task is over");return "hello supplyAsync";}, executorService);log.debug(future.get());executorService.shutdown();}
}

输出
image.png

CompletableFuture 之通用异步编程

从 Java8 开始引入了 CompletableFuture,它是 Future 的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "----come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("----1秒钟后出结果:" + result);return result;}).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算完成,更新系统UpdateValue" + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");}
}

执行结果
image.png
没有发生异常,为什么没有执行 whenComplete 中的逻辑?
Fork/Join 线程池中的线程类似于守护线程,由于主线程执行速度过快,先执行结束,导致 Fork/Join 线程池被关闭。
解决方法

  • 可以让主线程不要立刻结束,否则 CompletableFuture 默认使用的线程池会立刻关闭:暂停 3 秒钟主线程。
  • 使用自定义线程池
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "----come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("----1秒钟后出结果:" + result);return result;}, pool).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算完成,更新系统UpdateValue: " + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");pool.shutdown();}
}

输出:
image.png
whenComplete 方法

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(null, action);
}

传入的参数是一个 BiConsumer 函数接口,BiConsumer 接口可以接收两个参数,其中第一个参数为上一步中产生的结果,第二个参数为上一步代码执行过程中产生的异常。
exceptionally 方法

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {return uniExceptionallyStage(fn);
}

如果 CompletableFuture 在执行时发生异常,则执行 exceptionally 中的逻辑,并返回新的 CompletableFuture
代码实例如下,当产生的随机数大于 2,触发异常,执行 exceptionally 中的逻辑

@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "----come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("----1秒钟后出结果:" + result);if(result > 2) {int i = 1 / 0;}return result;}, pool).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算完成,更新系统UpdateValue: " + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");pool.shutdown();}
}

输出
image.png

CompletableFuture 优点

  • 异步任务结束时,会自动回调某个对象的方法
  • 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
  • 异步任务出错时,会自动回调某个对象的方法

案例精讲 - 从电商网站的比价需求说开去

函数式接口

函数式接口名称方法名称参数返回值
Runnablerun无参数无返回值
Functionapply1 个参数有返回值
Consumeraccept1 个参数无返回值
Supplierget无参数有返回值
BiConsumeraccept2 个参数无返回值

业务需求

业务需求说明

电商网站比价需求分析:

  1. 需求说明:
    1. 同一款产品,同时搜索出同款产品在各大电商平台的售价
    2. 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
  2. 输出返回:
    1. 出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List

例如:《Mysql》 in jd price is 88.05 《Mysql》 in taobao price is 90.43

  1. 解决方案,对比同一个产品在各个平台上的价格,要求获得一个清单列表
    1. step by step,按部就班,查完淘宝查京东,查完京东查天猫…
    2. all in,万箭齐发,一口气多线程异步任务同时查询

代码实现

public class CompletableFutureMallDemo {static List<NetMall> list = Arrays.asList(new NetMall("jd"),new NetMall("dangdang"),new NetMall("taobao"));/*** 一家一家的查* @param list* @param productName* @return*/public static List<String> getPrice(List<NetMall> list, String productName) {return list.stream().map(netMall ->String.format(productName + " in %s price is %.2f",netMall.getNetMallName(),netMall.calPrice(productName))).collect(Collectors.toList());}public static List<String> getPriceByCompletableFuture(List<NetMall> list, String productName) {return list.stream().map(netMall ->CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",netMall.getNetMallName(),netMall.calPrice(productName)))).collect(Collectors.toList()).stream().map(s -> s.join()).collect(Collectors.toList());}public static void main(String[] args) {long startTime = System.currentTimeMillis();
//        List<String> list1 = getPrice(list, "mysql");List<String> list1 = getPriceByCompletableFuture(list, "mysql");for (String element : list1) {System.out.println(element);}long endTime = System.currentTimeMillis();System.out.println("----costTime: " + (endTime - startTime) + "毫秒");}
}@AllArgsConstructor
class NetMall {@Getterprivate String netMallName;public double calPrice(String productName) {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);}
}

普通方式查询,即调用 getPrice() 方法
image.png
采用 CompletableFuture 方式查询,即调用 getPriceByCompletableFuture 方法
image.png

CompletableFuture 常用方法

获得结果和触发计算

获取结果

public T get() throws InterruptedException, ExecutionException {// ...
}public T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {// ...
}// 和get一样的作用,只是不需要抛出异常
public T join() {// ...
}// 计算完成就返回正常值,否则返回备胎值(传入的参数),立即获取结果不阻塞
public T getNow(T valueIfAbsent) {// ...
}

主动触发计算

// 是否打断 get 方法立即返回括号值
public boolean complete(T value)
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return "abc";});try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(future.getNow("xxx"));System.out.println(future.complete("completeValue") + "\t" + future.join());}
}

输出
image.png
如果主线程中睡够 2 秒再去获取,输出结果:
image.png

对计算结果进行处理

thenApply

  • 计算结果存在依赖关系,两个线程串行化
  • 由于存在依赖关系(当前步骤出错,不再执行下一步),当前步骤有异常的话就停止运行。
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("111");return 1;}, pool).thenApply(f -> {System.out.println("222");return f + 2;}).thenApply(f -> {System.out.println("333");return f + 3;}).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算结果:" + v);}}).exceptionally(e -> {e.printStackTrace();;System.out.println(e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");}
}

输出
image.png
如下,若是在第二步执行过程中出错,程序将退出运行。

@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("111");return 1;}, pool).thenApply(f -> {int i = 1 / 0;System.out.println("222");return f + 2;}).thenApply(f -> {System.out.println("333");return f + 3;}).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算结果:" + v);}}).exceptionally(e -> {e.printStackTrace();;System.out.println(e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");pool.shutdown();}
}

输出结果
image.png

handle

  • 计算结果存在依赖关系,两个线程串行化
  • 有异常也可以继续向下运行,根据带的异常参数可以进一步处理

如下代码中,步骤 2 中出错,步骤 3 使用 handle 进行处理

@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("111");return 1;}, pool).thenApply(f-> {int i = 1 / 0;System.out.println("222");return f + 2;}).handle((f, e) -> {System.out.println("333");return f + 3;}).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算结果:" + v);}}).exceptionally(e -> {e.printStackTrace();;System.out.println(e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");pool.shutdown();}
}

输出结果
image.png
步骤 3 可以正常执行,但是由于步骤 2 中出错,最终会执行 exceptionally 中的逻辑

对计算结果进行消费

thenAccept

接收任务的处理结果,并消费处理,无返回结果

@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.supplyAsync(() -> {return 1;}).thenApply(f -> {return f + 2;}).thenApply(f -> {return f + 3;}).thenAccept(r -> {System.out.println(r);});}
}

输出
image.png

对比补充

  • theRun(Runnable runnable)任务 A 执行完执行任务 B,并且 B 不需要 A 的结果
  • theAccept(Consumer action)任务 A 执行完执行任务 B,B 需要 A 的结果,但是任务 B 无返回值
  • thenApply(Function fn)任务 A 执行完执行任务 B,B 需要 A 的结果,同时任务 B 有返回值
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println(CompletableFuture.supplyAsync(() -> "resultA")// 任务 A 执行完执行任务 B,并且 B 不需要 A 的结果.thenRun(() -> {}).join());System.out.println("--------------------------------");System.out.println(CompletableFuture.supplyAsync(() -> "resultA")// 任务 A 执行完执行任务 B,B 需要 A 的结果,但是任务 B 无返回值.thenAccept(r -> {System.out.println(r);}).join());System.out.println("--------------------------------");System.out.println(CompletableFuture.supplyAsync(() -> "resultA")// 任务 A 执行完执行任务 B,B 需要 A 的结果,同时任务 B 有返回值.thenApply(r -> r + "resultB").join());}
}

输出结果
image.png

CompletableFuture 和线程池说明

theRun 和 theRunAsync 有什么区别?
  • 没有传入自定义线程池,都使用默认线程池 ForkJoinPool
  • 传入了一个自定义线程池
  • 如果执行第一个任务的时候,传入了一个自定义线程池
    • 调用 theRun 方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池
    • 调用 theRunAsync 执行第二个任务时,则第一个人任务使用的是你自己传入的线程池,第二个任务使用的是 ForkJoinPool 线程池
  • 有可能处理太快,系统优化切换原则,直接使用 main 线程处理

其他如:thenAccept 和 thenAcceptAsync,thenApply 和 thenApplyAsync 等,它们之间的区别也是同理。

使用 theRun 方法且未使用自定义线程池
  @Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("1号任务\t" + Thread.currentThread().getName());return "abcd";}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("2号任务\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("3号任务\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("4号任务\t" + Thread.currentThread().getName());});System.out.println(future.get(2L, TimeUnit.SECONDS));}
}

输出结果
image.png
使用 theRunAsync 方法且未使用自定义线程池与使用 theRun 方法且未使用自定义线程池情况一样

使用 theRun 方法且使用自定义线程池
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("1号任务\t" + Thread.currentThread().getName());return "abcd";}, pool).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("2号任务\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("3号任务\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("4号任务\t" + Thread.currentThread().getName());});System.out.println(future.get(2L, TimeUnit.SECONDS));pool.shutdown();}
}

输出结果
image.png

使用 theRunAsync 方法且未使用自定义线程池
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("1号任务\t" + Thread.currentThread().getName());return "abcd";}, pool).thenRunAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("2号任务\t" + Thread.currentThread().getName());}).thenRunAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("3号任务\t" + Thread.currentThread().getName());}).thenRunAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("4号任务\t" + Thread.currentThread().getName());});System.out.println(future.get(2L, TimeUnit.SECONDS));pool.shutdown();}
}

输出结果
image.png

对计算速度进行选用

applyToEither

谁快用谁

@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {CompletableFuture<String> planA = CompletableFuture.supplyAsync(() -> {System.out.println("A come in");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return "planA";});CompletableFuture<String> planB = CompletableFuture.supplyAsync(() -> {System.out.println("B come in");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}return "planB";});CompletableFuture<String> result = planA.applyToEither(planB, f -> {return f + " is winer";});System.out.println(Thread.currentThread().getName() + "\t----" + result.join());}
}

输出结果
image.png

对计算结果进行合并

thenCombine

两个 CompletionStage 任务都完成后,最终把两个任务的结果一起交给 thenCombine 来处理。先完成的先等着,等待其他分支任务结束。

@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {CompletableFuture<Integer>  future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t----启动");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}return 10;});CompletableFuture<Integer>  future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t----启动");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return 20;});CompletableFuture<Integer> combine = future1.thenCombine(future2, (x, y) -> {System.out.println("-----开始两个结果的合并");return x + y;});System.out.println(combine.join());}
}

输出结果
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/45930.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

day05-matplotlit设置图形各种参数

matplotlib网格 1. 显示网格:plt.grid() plt.grid(True, linestyle "--",color "gray", linewidth "0.5",axis x)显示网格linestyle&#xff1a;线型&#xff0c;“–”:表示网格是虚线&#xff0c;默认为实线color&#xff1a;网格颜色li…

数列分块<1>

本期是数列分块入门<1>。该系列的所有题目来自hzwer在LOJ上提供的数列分块入门系列。 Blog:http://hzwer.com/8053.html sto hzwer orz %%% [转载] -----------------------------------------------------------------…

JAVA设计模式>>结构型>>适配器模式

本文介绍23种设计模式中结构型模式的适配器模式 目录 1. 适配器模式 1.1 基本介绍 1.2 工作原理 1.3 适配器模式的注意事项和细节 1.4 类适配器模式 1.4.1 类适配器模式介绍 1.4.2 应用实例 1.4.3 注意事项和细节 1.5 对象适配器模式 1.5.1 基本介绍 1.5.2 …

VUE+ELEMENTUI表格的表尾合计

<el-table :data"XXXX" :summary-method"getSummaries" show-summary "true" > getSummaries(param) { const { columns, data } param; const sums []; columns.forEach((column, index) > { if (index 0) { sums[index] 合计; }…

FFM(Field-aware Factorization Machine -领域感知的因子分解机)解析及举例

FFM&#xff08;Field-aware Factorization Machines&#xff09;模型是一种广泛应用于推荐系统、广告点击率预测等领域的机器学习模型。与传统的因子分解机&#xff08;FM&#xff09;相比&#xff0c;FFM模型考虑了不同特征字段之间的交互关系&#xff0c;从而能够更好地捕捉…

树莓派pico入坑笔记,dht11使用及温湿度表制作

目录 关于树莓派pico和circuitpython的更多玩法&#xff0c;请看树莓派pico专栏 用到的库adafruit_dht&#xff0c;需要导入pico才能使用&#xff0c;在这里下载 样例程序 进阶玩法&#xff0c;显示信息的温湿度计 屏幕使用见树莓派pico专栏的ssd1306oled屏幕使用 代码 效…

Go 初始化一个字典

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

4K60无缝一体矩阵 HDMI2.0功能介绍

关于GF-HDMI0808S 4K60无缝一体矩阵的功能介绍&#xff0c;由于直接针对GF-HDMI0808S型号的具体信息较少&#xff0c;我将结合类似4K60无缝HDMI矩阵的一般功能特性和可能的GF-HDMI0808系列产品的特点来进行说明。请注意&#xff0c;以下信息可能不完全针对GF-HDMI0808S型号&…

springboot+vue系统开发

链接: https://pan.baidu.com/s/1P1YpHAx9QOBPxjFZ9SAbig 提取码: u6f1

Java基础(十九):集合框架

目录 一、Java集合框架体系二、Collection接口及方法1、添加2、判断3、删除4、其它 三、Iterator(迭代器)接口1、Iterator接口2、迭代器的执行原理3、foreach循环 四、Collection子接口1&#xff1a;List1、List接口特点2、List接口方法3、List接口主要实现类&#xff1a;Array…

GuLi商城-商品服务-API-品牌管理-统一异常处理

每个方法都加这段校验太麻烦了 准备做一个统一异常处理@ControllerAdvice 后台代码: package com.nanjing.gulimall.product.exception;import com.nanjing.common.exception.BizCodeEnum; import com.nanjing.common.utils.R; import lombok.extern.slf4j.Slf4j; import org…

【Linux】任务管理

这个任务管理&#xff08;job control&#xff09;是用在bash环境下的&#xff0c;也就是说&#xff1a;【当我们登录系统获取bashshell之后&#xff0c;在单一终端下同时执行多个任务的操作管理】。 举例来说&#xff0c;我们在登录bash后&#xff0c;可以一边复制文件、一边查…

代码随想录算法训练营第五十二天(图论)| 98. 所有可达路径、深度优先搜索、广度优先搜索

邻接矩阵 邻接矩阵是一种使用二维数组来表示图的方法。矩阵中的元素表示节点之间是否存在边。如果存在边&#xff0c;则对应的矩阵元素为1&#xff08;或边的权重&#xff09;&#xff1b;否则为0。 特点&#xff1a; 空间复杂度高&#xff1a;无论图是否稀疏&#xff0c;邻…

前端Canvas入门——一些注意事项

创建渐变的三种方法&#xff1a; createLinearGradient() - 线性渐变 createRadialGradient() - 径向渐变&#xff08;放射性渐变&#xff09; createConicGradient() - 锥形渐变 这三种的核心观点都是&#xff1a; 创建一个gradient对象&#xff0c;然后调用addColorStop()方法…

【java】力扣 合并两个有序链表

文章目录 题目描述题目链接思路代码 题目描述 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 题目链接 21. 合并两个有序链表 思路 先定义一个哨兵节点dummy&#xff0c;为了方便解题 然后定义一个节点pre&#xff0…

paddlepaddle2.6,paddleorc2.8,cuda12,cudnn,nccl,python10环境

1.安装英伟达显卡驱动 首先需要到NAVIDIA官网去查自己的电脑是不是支持GPU运算。 网址是&#xff1a;CUDA GPUs | NVIDIA Developer。打开后的界面大致如下&#xff0c;只要里边有对应的型号就可以用GPU运算&#xff0c;并且每一款设备都列出来相关的计算能力&#xff08;Compu…

为二进制文件添加.gnu_debugdata调试信息

前言 在使用gcc/g编译二进制文件过程中&#xff0c;如果添加了-g参数&#xff0c;编译出来的二进制文件会带有debug信息&#xff0c;供调试使用。但是debug信息往往占用空间很大&#xff0c;导致二进制文件太大&#xff0c;在发布到生产环境时&#xff0c;一般会去掉调试信息&…

(南京观海微电子)——电容应用及选取

什么是电容器&#xff1f; 电容器是一种在内部电场中储存能量的电子器件。它与电阻器、电感器一样&#xff0c;都是基本的无源电子元件。所有电容器都具有相同的基本结构&#xff0c;两块导电极板中间由绝缘体隔开&#xff0c;该绝缘体称为电介质&#xff0c;可在施加电场后发…

时间轮算法理解、Kafka实现

概述 TimingWheel&#xff0c;时间轮&#xff0c;简单理解就是一种用来存储若干个定时任务的环状队列&#xff08;或数组&#xff09;&#xff0c;工作原理和钟表的表盘类似。 关于环形队列&#xff0c;请参考环形队列。 时间轮由两个部分组成&#xff0c;一个环状数组&…

一文了解MySQL的表级锁

文章目录 ☃️概述☃️表级锁❄️❄️介绍❄️❄️表锁❄️❄️元数据锁❄️❄️意向锁⛷️⛷️⛷️ 介绍 ☃️概述 锁是计算机协调多个进程或线程并发访问某一资源的机制。在数据库中&#xff0c;除传统的计算资源&#xff08;CPU、RAM、I/O&#xff09;的争用以外&#xff0…