文章目录
- Future 接口理论知识复习
- Future 接口概述
- 场景描述
- 小结
- Future 接口常用实现类 FutureTask 异步任务
- Future 的作用
- Futrue 编码测试
- 优缺点分析
- 优点
- 缺点
- 小结
- 面对一些复杂的任务
- 对于简单的业务场景使用 Future 接口完全 OK
- 回调通知
- 创建异步任务
- 多个任务前后依赖可以组合
- 对计算速度选最快
- CompletableFuture 应运而生
- CompletableFuture 对 Future 的改进
- CompletableFuture 为什么会出现?
- CompletableFuture 与 CompletionStage 源码
- 类继承架构
- 接口 CompletionStage
- 类 CompletionFuture
- 核心的四个静态方法,创建一个异步任务
- runAsync 方法---无返回值
- supplyAsync 方法---有返回值
- 关于参数 Executor 说明
- 减少阻塞和轮询
- CompletableFuture 的优点
- 电商网站比价需求案例
- 函数式编程已经主流
- Lambda 表达式+Stream 流式应用+Chain 链式调用+Java8 函数式编程
- Runnable
- Function
- Consumer
- Supplier
- Summer
- 先说说 join 和 get 的对比
- 大厂业务需求说明
- Java8 函数式编程在 Case 中的应用
- 方案一,step by step
- 方案二,asyncExecutor
- 效果比较
- CompletableFuture 的常用方法
- 获得结果和触发计算
- get()
- get(long time,TimeUnit unit)
- join()
- getNow(String valueIfAbsent)
- complete(T value)
- 对计算结果进行处理
- thenApply
- handle
- Summary
- 对计算结果进行消费
- demo
- 补充
- CompletableFuture 和线程池说明
- 对计算速度进行选用
- 对计算结果进行合并
Future 接口理论知识复习
Future 接口概述
- Future 接口(FutureTask 实现类)定义了异步任务执行的一些方法
- 获取异步任务执行的结果
- 取消异步任务执行
- 判断任务是否被取消,
- 判断任务执行是否完毕等
场景描述
- 主线程让一个子线程去执行任务,子线程可能比较耗时,如果没有实现异步任务执行,主线程只能一直等待
- Future 接口支持了异步任务执行之后,子线程开始执行任务的同时,主线程继续执行自身任务,等到主线程或者子线程任务完成之后,主线程才会获取子线程任务执行结果
- 上课买水案例…
小结
Future 接口可为主线程开启一个分支任务,专门为主线程处理耗时和废力的复杂业务
Future 接口常用实现类 FutureTask 异步任务
- Future 接口是 Java5 新增的一个接口,提供了一种异步并行计算的功能
- 若主线程需要执行一些很耗时的计算任务,可以通过 future 把该任务放到异步线程中去执行
- 主线程继续处理其他任务或者先行结束,再通过 Future 获取计算结果
Future 的作用
- 异步多线程任务执行且返回有结果,三个特点
- 多线程
- 有返回
- 异步任务
- 为什么是 Future?
Futrue 编码测试
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask<>(new MyThread());Thread t1 = new Thread(futureTask, "t1");t1.start();System.out.println(futureTask.get());}
}class MyThread implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("-----come in call() ");return "hello Callable";}
}
优缺点分析
优点
- futrue+线程池异步多线程任务配合,能显著提高代码的执行效率
- 串型执行
//3个任务,目前只有一个线程main来处理,请问耗时多少?long startTime = System.currentTimeMillis();//暂停毫秒try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}long endTime = System.currentTimeMillis();System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");System.out.println(Thread.currentThread().getName() + "\t -----end");
- 使用 futureTask+线程池异步多线程任务
ExecutorService threadPool = Executors.newFixedThreadPool(3);long startTime = System.currentTimeMillis();FutureTask<String> futureTask1 = new FutureTask<String>(() -> {try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}return "task1 over";});threadPool.submit(futureTask1);FutureTask<String> futureTask2 = new FutureTask<String>(() -> {try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}return "task2 over";});threadPool.submit(futureTask2);System.out.println(futureTask1.get());System.out.println(futureTask2.get());try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}long endTime = System.currentTimeMillis();System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");System.out.println(Thread.currentThread().getName() + "\t -----end");threadPool.shutdown();
缺点
- get 的获取容易阻塞
FutureTask<String> futureTask = new FutureTask<String>(() -> {System.out.println(Thread.currentThread().getName() + "\t -----come in");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}return "task over";});Thread t1 = new Thread(futureTask, "t1");t1.start();System.out.println(Thread.currentThread().getName() + "\t ----忙其它任务了");System.out.println(futureTask.get());
- get容易导致阻塞,一般建议放在程序后面,一旦调用不见不散,非要等到结果才会离开,不管你是否计算完成,容易程序堵塞。
System.out.println(futureTask.get());System.out.println(Thread.currentThread().getName() + "\t ----忙其它任务了");
- 假如我不愿意等待很长时间,我希望过时不候,可以自动离开.
System.out.println(Thread.currentThread().getName() + "\t ----忙其它任务了");System.out.println(futureTask.get(3,TimeUnit.SECONDS));
- 超过 3 秒结束线程抛出 TimeOutException
- 轮询耗费 CPU
while (true) {if (futureTask.isDone()) {//futureTask执行完成System.out.println(futureTask.get());break;} else {//暂停毫秒,未完成,持续等待try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("正在处理中,不要再催了,越催越慢 ,再催熄火");}}
小结
- get(),一旦调用 get 方法求结果,如果计算没有完成,容易导致线程阻塞
- isDone()轮询
- 轮询的方式会消耗无谓的 CPU 资源,而且也不见得能及时得到计算结果
- 如果想要异步获取,通常都会以轮询的方式去获取结果,尽量不使用阻塞
- Future 对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到结果
面对一些复杂的任务
对于简单的业务场景使用 Future 接口完全 OK
回调通知
- 应对 Future 的完成时间,完成之后发起回调通知
- 通过轮询方式去判断任务是否完成,非常不优雅,也占用 CPU
创建异步任务
- Future+线程池配合
多个任务前后依赖可以组合
- 若想将多个异步任务的计算结果组合起来,则后一个异步任务的计算结果,需要前一个异步任务的值
- 将两个或多个异步计算合成一个异步计算,这几个异步计算,互相独立,同时后面这个又依赖于前一个处理的结果
对计算速度选最快
- 当 Future 集合中某个任务最快结束时,返回结果,返回第一名处理结果
CompletableFuture 应运而生
- 使用 Future 接口提供的 API,处理不够优雅
- CompletableFuture 以声明式方式优雅的处理这些需求同时规避 Future 自身获取计算结果的弊端
CompletableFuture 对 Future 的改进
CompletableFuture 为什么会出现?
- get()方法在 Future 计算完成之前会一直处于阻塞状态下
- isDone()方法容易耗费 CPU 资源
- 对于真正在异步处理中我们希望可以通过传入回调函数,在 Future 结束时自动回调该函数,这样就不需要等待结果
- 阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的 CPU 资源
- 因此,JDK8 设计出 CompletableFuture
- CompletableFuture 提供了一种与观察者模式类似的机制,可以让任务执行完成后通知监听的一方
- CompletableFuture 提供了一种与观察者模式类似的机制,可以让任务执行完成后通知监听的一方
CompletableFuture 与 CompletionStage 源码
类继承架构
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>{}
接口 CompletionStage
- CompletionStage 代表异步计算过程中的某个阶段,一个阶段完成以后会触发另一个阶段(类似于 Linux 管道分隔符传参数)
- 一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable
//示例如下
stage
.thenApply(x->square(x))
.thenAccept(x->System.out.print(x))
.thenRun(()->System.out.println())
- 一个阶段的执行可能被单个阶段的完成触发,也可能有多个阶段一起触发
类 CompletionFuture
- Java8 中,CompletableFuture 提供了非常强大的 Future 的扩展功能,简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,提供了转化和组合 CompletionFuture 的方法
- 它可能代表了一个明确完成 Future,也可能代表一个完成阶段 CompletionStage,它支持在计算完成之后触发一些函数或执行某些动作
- 实现了 Future 和 CompletionStage 接口
核心的四个静态方法,创建一个异步任务
- 为什么要不用 new CompletionFuture()方式创建异步任务
- API 中说明通过 new CompletionFuture()方式会创建一个不完备的 CompletionFuture,官方也不推荐使用该方式
runAsync 方法—无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
- 使用默认的 ForkJoinPool 线程池
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName());//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(completableFuture.get());
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
- 使用定义的线程池对象
ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName());//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}},threadPool);System.out.println(completableFuture.get());threadPool.shutdown();
supplyAsync 方法—有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName());//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "hello supplyAsync";});System.out.println(completableFuture.get());
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName());//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "hello supplyAsync";},threadPool);System.out.println(completableFuture.get());threadPool.shutdown();
关于参数 Executor 说明
- 没有指定 Executor 的方法,直接默认使用 ForkJoinPool.commonPool()作为线程池,作为它的线程池执行异步代码
- 若指定线程池,则使用自定义或者特别定义的线程池执行异步代码
减少阻塞和轮询
- 从 Java8 开始引入了 CompletableFuture,它是 Future 的功能增强版,减少阻塞和轮询,
- 可以传入回调对象,当异步任务完成或者发生异常时,自动回调对象的回调方法
- 使用 CompletableFuture 实现 Future 的功能
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "----come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("-----1秒钟后出结果:" + result);return result;});System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");System.out.println(completableFuture.get());
- CompletableFuture.supplyAsync()完成异步编程返回结果
try {CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "----come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("-----1秒钟后出结果:" + result);return result;}).whenComplete((v, e) -> {//v为上述计算完成的result,e为异常if (e == null) { //没有异常System.out.println("-----计算完成,更新系统UpdateValue:" + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");} catch (Exception e) {e.printStackTrace();} finally {threadPool.shutdown();}
- 解释下为什么默认线程池关闭,自定义线程池记得关闭?
- 用户线程中,程序执行完成需要 1 秒钟,main 线程执行太快,在 ForkJoinPool 线程池中若发现 main 线程执行完成则会关闭线程池
- 解决方法
- 将 main 线程延迟 1 秒,在用户线程的 try/catch/finally 代码之后添加睡眠代码
//主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}
- 或是使用定义的线程对象,或是自定义线程对象
ExecutorService threadPool = Executors.newFixedThreadPool(3);try {CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "----come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("-----1秒钟后出结果:" + result);return result;},threadPool).whenComplete((v, e) -> {if (e == null) {System.out.println("-----计算完成,更新系统UpdateValue:" + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");} catch (Exception e) {e.printStackTrace();} finally {threadPool.shutdown();}
- 编写异常代码测试
CompletableFuture 的优点
- 异步任务结束时,会自动调用对象的方法
- 主线程设置好回调之后,不在关系异步任务的执行,异步任务之间可以顺序进行
- 异步任务出错时,会自动调用某个对象的方法
try {//调用异步任务,传入线程池对象asyncTask(threadPool);System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");} catch (Exception e) {e.printStackTrace();} finally {threadPool.shutdown();}//...主线程void asyncTask(ExecutorService threadPool) {//...业务的逻辑return result;}, threadPool).whenComplete((v, e) -> {//回调接口callInterface(v, e);}).exceptionally(e -> {//异常接口e.printStackTrace();exceptionHandel(e);return null;});}
电商网站比价需求案例
函数式编程已经主流
- 大厂面试题
Lambda 表达式+Stream 流式应用+Chain 链式调用+Java8 函数式编程
Runnable
- 无参数,无返回值
package java.lang;@FunctionalInterface
public interface Runnable {public abstract void run();
}
Function
- Function<T,R>接受一个参数,并且有返回值
@FunctionalInterface
public interface Function<T, R> {R apply(T t);
}
Consumer
- Consumer 接受一个参数,没有返回值
@FunctionalInterface
public interface Consumer<T> {void accept(T t);
}
- BiConsumer<T,U>接受两个参数,没有返回值
@FunctionalInterface
public interface BiConsumer<T, U> {void accept(T t, U u);
}
- 在回调 CompletableFuture.whenComplete 方法中进行调用
Supplier
- 供给型函数式接口,没有参数,有一个返回值
@FunctionalInterface
public interface Supplier<T> {/*** Gets a result.** @return a result*/T get();
}
Summer
先说说 join 和 get 的对比
- get 在程序编译时会检查异常,join 在程序编译时不会检查异常,此外于 get 基本等价
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {return "join and get";});System.out.println(supplyAsync.join());
- 说说你过去工作中的项目亮点!!!(面试必备)
大厂业务需求说明
- 切记,先完成功能再到性能的逐步迭代
- 电商网站比价需求分析
案例说明:电商比价需求,模拟如下情况:1. 需求:1.1 同一款产品,同时搜索出同款产品在各大电商平台的售价;1.2 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少2. 输出:出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List<String>《mysql》 in jd price is 88.05《mysql》 in dangdang price is 86.11《mysql》 in taobao price is 90.433. 解决方案,对别同一个商品在各个平台上的价格,要求获得一个清单列表3.1. step by step,按部就班查完jd,查taobao,查完taobao查天猫.....3.2. all in ,使用多线程,异步任务执行同时查询多个平台4. 技术要求3.1 函数式编程3.2 链式编程3.3 Stream流式计算
Java8 函数式编程在 Case 中的应用
- 创建资源
- 电商网站类
//电商网站类
class NetMall {/*** 电商网站名 jd,pdd taobao...*/@Getterprivate String netMallName;/*** 构造方法* @param netMallName*/public NetMall(String netMallName) {this.netMallName = netMallName;}/*** 售价* @param productName* @return*/public double calcPrice(String productName) {try {//查询需要1秒钟TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}//模拟价格return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);}
}
- 查询电商网站
static List<NetMall> list = Arrays.asList(new NetMall("jd"),new NetMall("dangdang"),new NetMall("taobao"),new NetMall("pdd"),new NetMall("tmall"));
方案一,step by step
- 使用流式计算,查询返回结果
/*** step by step 一家家搜查* List<NetMall> ----->map------> List<String>** @param list* @param productName* @return*/public static List<String> getPrice(List<NetMall> list, String productName) {//《mysql》 in taobao price is 90.43return list.stream()//流式计算.map(netMall -> //映射为map集合//字符串格式化String.format(productName + " in %s price is %.2f",netMall.getNetMallName(),netMall.calcPrice(productName))).collect(Collectors.toList());}
- 测试计算结果
public static void main(String[] args) {long startTime = System.currentTimeMillis();List<String> list1 = getPrice(list, "mysql");for (String element : list1) {System.out.println(element);}long endTime = System.currentTimeMillis();System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");}
方案二,asyncExecutor
- 基于CompletableFuture.supplyAsync
/*** List<NetMall> ----->List<CompletableFuture<String>>------> List<String>** @param list* @param productName* @return*/public static List<String> getPriceByCompletableFuture(List<NetMall> list, String productName) {return list.stream().map(netMall ->CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",netMall.getNetMallName(),netMall.calcPrice(productName)))).collect(Collectors.toList()).stream().map(s -> s.join()).collect(Collectors.toList());}
- 两次流式映射
public static void main(String[] args) {long startTime2 = System.currentTimeMillis();List<String> list2 = getPriceByCompletableFuture(list, "mysql");for (String element : list2) {System.out.println(element);}long endTime2 = System.currentTimeMillis();System.out.println("----costTime: " + (endTime2 - startTime2) + " 毫秒");
}
效果比较
CompletableFuture 的常用方法
获得结果和触发计算
get()
/*** 获得结果和触发计算** @throws InterruptedException* @throws ExecutionException*/private static void group1() throws InterruptedException, ExecutionException {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "abc";});System.out.println(completableFuture.get());}
get(long time,TimeUnit unit)
System.out.println(completableFuture.get(2L,TimeUnit.SECONDS));
join()
System.out.println(completableFuture.join());
getNow(String valueIfAbsent)
System.out.println(completableFuture.getNow("xxx"));
- 源码解读
- 当调用 getNow 时,计算完成,获取计算结果
- 当调用 getNow 时,计算未完成,返回备选值(valueIfabsent)
- 异步任务执行 1s,主线程 2s,在 ,异步任务在 2s 内执行完成,返回结果给 getNow
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "abc";});//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println(completableFuture.getNow("xxx"));
complete(T value)
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try {//执行2sTimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "abc";});//执行1stry { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println(completableFuture.complete("completeValue") + "\t" + completableFuture.join());
- 源码说明
- 主线程调用异步任务时
- 计算未完成,返回 true,同时将 value 作为 result 给到主线程
- 计算完成,返回 false,同时将异步任务的计算结果给到主线程
- 将异步任务与主线程的睡眠时间互换,得到以下结果
对计算结果进行处理
- 此处 supplyAsync 若不使用指定线程池,主线程执行完会直接结束 jvm
thenApply
- 计算结果存在依赖关系,这两个线程串行化
- demo
private static void thenApply1(ExecutorService threadPool) {CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("111");return 1;}, threadPool).thenApply(f -> {System.out.println("222");return f + 2;}).thenApply(f -> {System.out.println("333");return f;}).whenComplete((v, e) -> {if (e == null) {System.out.println("----计算结果: " + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println(e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "----主线程先去忙其它任务");threadPool.shutdown();}
- f=1+2=3
- 异常处理
- 计算过程中出现异常,thenApply(),会直接终止计算
handle
- 计算结果存在依赖关系,这两个线程串行化
- demo
private static void handle1(ExecutorService threadPool) {CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("111");return 1;}, threadPool).handle((f, e) -> {System.out.println("222");return f + 2;}).handle((f, e) -> {System.out.println("333");return f + 3;}).whenComplete((v, e) -> {if (e == null) {System.out.println("----计算结果: " + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println(e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "----主线程先去忙其它任务");threadPool.shutdown();}
- 异常处理
- 有异常时,跳过异常代码,带着异常参数继续执行后续代码
Summary
对计算结果进行消费
- 接受任务的处理结果,并消费处理,无返回结果
demo
- 源码解读
- 调用了 Consumer 接口,传入参数无返回值
public static void main(String[] args) {CompletableFuture.supplyAsync(() -> {return 1;}).thenApply(f ->{return f + 2;}).thenApply(f ->{return f + 3;}).thenAccept(System.out::println);}
补充
- thenRun 不调用前置计算的结果
- thenAccpet 获取前置计算结果,最终不返回记过,consumer 直接消费
- thenApply,获取前置计算结果,最终返回所有计算结果
CompletableFuture 和线程池说明
- 以 thenRun 和 thenRunAsync 为例
public static void main(String[] args) {ExecutorService threadPool = Executors.newFixedThreadPool(5);try {CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("1号任务" + "\t" + Thread.currentThread().getName());return "abcd";}, threadPool).thenRunAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("2号任务" + "\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("3号任务" + "\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("4号任务" + "\t" + Thread.currentThread().getName());});System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));} catch (Exception e) {e.printStackTrace();} finally {threadPool.shutdown();}}
- 将所有的方法都统一为 thenRun
- 将入口睡眠代码注释
- 结论
- 没有传入自定义线程池,都默认使用 ForkJoinPool
- 传入一个指定线程池之后
- 执行第一个任务时,传入指定线程池
- 调用 thenRun 方法执行第二个任务时,则第一个任务和第二个任务共用同一个线程池
- 调用 thenRunAsync 执行第二个任务时,则第一个任务用指定线程池,第二个任务用 ForkJoinPool
- 执行第一个任务时,传入指定线程池
- 有可能处理太快,系统优化切换原则直接使用 main 线程处理
- 其它 thenAccept 与 thenAccpetAsync,thenApply 和 thenApplyAsync 等,之间的区别亦是同理
- 源码解读
- 调用 thenXxxxAsync 方法默认都会调用一个 ForkJoinPool.commonPool()
对计算速度进行选用
- 谁快用谁
- applyToEither
public static void main(String[] args) {//开启两个异步任务CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {System.out.println("A come in");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return "playA";});CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {System.out.println("B come in");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "playB";});//比较两个异步任务,返回率先完成计算的异步任务的结果CompletableFuture<String> result = playA.applyToEither(playB, f -> {return f + " is winer";});System.out.println(Thread.currentThread().getName() + "\t" + "-----: " + result.join());}
对计算结果进行合并
- 两个 CompletionStage 任务都完成后,最终能把两个任务的结果一起交给 thenCombine 进行处理
- 先完成的先等待,所有分支完成后执行 thenCombine
- 拆分方式
public static void main(String[] args) {CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t ---启动");//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return 10;});CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t ---启动");//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return 20;});CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> {System.out.println("-----开始两个结果合并");return x + y;});System.out.println(result.join());}
- 函数式接口方式
private static void interfaceChain() {ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t"+"come in 1");return 10;}).thenCombine(CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName() + "\t"+"come in 2");return 20;}),(x,y)->{System.out.println(Thread.currentThread().getName() + "\t"+"x + y = a =" +(x+y));return x + y ;}).thenCombine(CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName() + "\t"+"come in 3");return 30;}),(a,b)->{System.out.println(Thread.currentThread().getName() + "\t"+"a + b = " +(a+b));return a+b;});System.out.println("---主线程结束,END");System.out.println(thenCombineResult.join());}