文章目录
- 1、获得结果和触发计算
- 2、对计算结果进行处理
- 3、对结算结果进行消费
- 4、CompletableFuture的thenXX和thenXXAsync
- 5、对计算速度进行选用
- 6、对计算结果进行合并
1、获得结果和触发计算
- 获取任务结果
public T get()
public T get(long timeout, TimeUnit unit)
public T join()
- 获取当前计算结果,传入缺省值。计算完成时正常返回,未计算完成时,返回传入的缺省值
public T getNow(T valueIfAbsent)
- 传入value,是否打断了get方法,且再次get时,返回刚才传入的值。有点像getNow
boolean complete(T value)
//举例:
public class CompletDemo1 {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "9527";});System.out.println(completableFuture.complete("completValue") + "\t" + completableFuture.join());}
}
2、对计算结果进行处理
结果的续传和计算,上一步的计算结果传给下一步,类比买鱼、下锅、调料,相关方法:
- thenApply方法
- handle方法
二者都是计算结果存在依赖关系时用的,即两个任务所在的线程串行化。区别是当某一步发生异常时:
- thenApply叫停,不再走一下步骤
- handle会继续走下一步
public class CompletDemo1 {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(4);try {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("11");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "11";}, pool).thenApply(f -> {System.out.println("22");return f + "22";}).thenApply(f -> {System.out.println("33");return f + "33";}).whenComplete((v, e) -> {if (null == e) {System.out.println("計算完成:" + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println(e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "先去处理其他任务了...");} catch (Exception e) {e.printStackTrace();} finally {pool.shutdown();}}
}
模拟异常:
换handle:
3、对结算结果进行消费
接收任务的执行结果,并进行消费处理,无返回结果,相关方法:thenAccept
CompletableFuture.supplyAsync(() -> 1).thenApply(f -> f + 2).thenAccept(t -> {System.out.println(t);});
//.thenAccept(System.out::println);//輸出3
对比:
//null(因为thenRun无返回值,最后join就是null)
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
//resultA(中途任务输出的)
//null(最后join的)
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(f -> System.out.println(f)).join());
//resultA resultB
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(f -> f + " resultB").join());
4、CompletableFuture的thenXX和thenXXAsync
可以发现以上整理的方法中,每个都有xxAsync方法:
看下源码:
跟进defaultExecutor,发现其来自ASYNC_POOL:
三目运算符,除非不支持并行,一般就是FrokJoinPool。写个试验的代码:
public class CompletableDemo4 {public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(6);try {CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("1号任务在" + Thread.currentThread().getName());return "1024";},pool).thenRun(() -> {System.out.println("2号任务在" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(30);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("3号任务在" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("4号任务在" + Thread.currentThread().getName());});System.out.println(completableFuture.join());} catch (Exception e) {e.printStackTrace();} finally {pool.shutdown();}}
}
结论:
- 不管thenRun还是thenRunAsync,没有传入自定义的线程池时,二者都使用默认的ForkJoinPool线程池
- 传入一个自定义线程池在任务x,则调用thenRun方法执行下一个任务时,这两个任务共用同一个线程池
- 调用thenRunAsync方法执行执行下一个任务时,第一个用自己传入的线程池,第二个用ForkJoin线程池
- 总之thenXX和thenXXSync就是影响到了任务是否另起炉灶(线程池)
- 此外,有可能系统处理太快,系统优化切换规则,会出现直接使用main线程处理的现象
5、对计算速度进行选用
A和B两个任务谁先执行完,相关方法:
applyToEither(xx,xx)
示例:
public class CompletableDemo5 {public static void main(String[] args) {CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {System.out.println("A come in");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "playerA";});CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {System.out.println("B come in");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return "playerB";});CompletableFuture<String> result = completableFuture1.applyToEither(completableFuture2, f -> f + " is winer!");System.out.println(Thread.currentThread().getName() + "----" + "\t" + result.join());}
}
6、对计算结果进行合并
两个任务都完成后,最终把两个任务的结果一起交给thenCombine来处理,先完成的任务先等着,等待其他分支完成任务。
thenCombine
可链式编程,因为其返回类型还是CompletableFuture,传入:
- 参数一:要合并的另一个CompletableFuture
- 参数二:BiFunction,定义对两个任务的合并逻辑,比如你选择相加
public class CompletableDemo6 {public static void main(String[] args) {CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t --- 开始计算");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return 20;});CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName()+ "\t --- 开始计算");try {TimeUnit.SECONDS.sleep(4);} catch (InterruptedException e) {e.printStackTrace();}return 10;});CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> {System.out.println("---计算结果开始合并");return x + y;});System.out.println(result.join());}
}
一个2s,一个4s,先完成的先等等: