一网打尽异步神器CompletableFuture

Future接口以及它的局限性

我们都知道,Java中创建线程的方式主要有两种方式,继承Thread或者实现Runnable接口。但是这两种都是有一个共同的缺点,那就是都无法获取到线程执行的结果,也就是没有返回值。于是在JDK1.5 以后为了解决这种没有返回值的问题,提供了Callable和Future接口以及Future对应的实现类FutureTask,通过FutureTask的就可以获取到异步执行的结果。

于是乎,我们想要开启异步线程,执行任务,获取结果,就可以这么实现。

 FutureTask<String> futureTask = new FutureTask<>(() -> "三友");new Thread(futureTask).start();System.out.println(futureTask.get());

或者使用线程池的方式

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
System.out.println(future.get());
executorService.shutdown();

线程池底层也是将提交的Callable的实现先封装成FutureTask,然后通过execute方法来提交任务,执行异步逻辑。

Future接口的局限性

虽然通过Future接口的get方法可以获取任务异步执行的结果,但是get方法会阻塞主线程,也就是异步任务没有完成,主线程会一直阻塞,直到任务结束。

Future也提供了isDone方法来查看异步线程任务执行是否完成,如果完成,就可以获取任务的执行结果,代码如下。

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
while (!future.isDone()) {//任务有没有完成,没有就继续循环判断
}
System.out.println(future.get());
executorService.shutdown();

但是这种轮询查看异步线程任务执行状态,也是非常消耗cpu资源。

同时对于一些复杂的异步操作任务的处理,可能需要各种同步组件来一起完成。

所以,通过上面的介绍可以看出,Future在使用的过程中还是有很强的局限性,所以为了解决这种局限性,在JDK1.8的时候,Doug Lea 大神为我们提供了一种更为强大的类CompletableFuture。

什么是CompletableFuture?

CompletableFuture在JDK1.8提供了一种更加强大的异步编程的api。它实现了Future接口,也就是Future的功能特性CompletableFuture也有;除此之外,它也实现了CompletionStage接口,CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。

CompletableFuture相比于Future最大的改进就是提供了类似观察者模式的回调监听的功能,也就是当上一阶段任务执行结束之后,可以回调你指定的下一阶段任务,而不需要阻塞获取结果之后来处理结果。

CompletableFuture常见api详解

CompletableFuture的方法api多,但主要可以分为以下几类。

1、实例化CompletableFuture

构造方法创建
CompletableFuture<String> completableFuture = new CompletableFuture<>();
System.out.println(completableFuture.get());

此时如果有其它线程执行如下代码,就能执行打印出 三友

completableFuture.complete("三友")
静态方法创建

除了使用构造方法构造,CompletableFuture还提供了静态方法来创建

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

supply 和 run 的主要区别就是 supply 可以有返回值,run 没有返回值。至于另一个参数Executor 就是用来执行异步任务的线程池,如果不传Executor 的话,默认是ForkJoinPool这个线程池的实现。

一旦通过静态方法来构造,会立马开启异步线程执行Supplier或者Runnable提交的任务。

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "三友");
System.out.println(completableFuture.get());

一旦任务执行完成,就可以打印返回值,这里的使用方法跟Future是一样的。

所以对比两个两种实例化的方法,使用静态方法的和使用构造方法主要区别就是,使用构造方法需要其它线程主动调用complete来表示任务执行完成,因为很简单,因为在构造的时候没有执行异步的任务,所以需要其它线程主动调用complete来表示任务执行完成。

2、获取任务执行结果

public T get();
public T get(long timeout, TimeUnit unit);
public T getNow(T valueIfAbsent);
public T join();

get()和get(long timeout, TimeUnit unit)是实现了Future接口的功能,两者主要区别就是get()会一直阻塞直到获取到结果,get(long timeout, TimeUnit unit)值可以指定超时时间,当到了指定的时间还未获取到任务,就会抛出TimeoutException异常。

getNow(T valueIfAbsent):就是获取任务的执行结果,但不会产生阻塞。如果任务还没执行完成,那么就会返回你传入的 valueIfAbsent 参数值,如果执行完成了,就会返回任务执行的结果。

join():跟get()的主要区别就是,get()会抛出检查时异常,join()不会。

3、主动触发任务完成

public boolean complete(T value);
public boolean completeExceptionally(Throwable ex);

complete:主动触发当前异步任务的完成。调用此方法时如果你的任务已经完成,那么方法就会返回false;如果任务没完成,就会返回true,并且其它线程获取到的任务的结果就是complete的参数值。

completeExceptionally:跟complete的作用差不多,complete是正常结束任务,返回结果,而completeExceptionally就是触发任务执行的异常。

4、对任务执行结果进行下一步处理

只能接收任务正常执行后的回调
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletableFuture<Void> thenRun(Runnable action);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);

这类回调的特点就是,当任务正常执行完成,没有异常的时候就会回调。

thenApply:可以拿到上一步任务执行的结果进行处理,并且返回处理的结果 thenRun:拿不到上一步任务执行的结果,但会执行Runnable接口的实现 thenAccept:可以拿到上一步任务执行的结果进行处理,但不需要返回处理的结果

thenApply示例:

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> 10).thenApply(v -> ("上一步的执行的结果为:" + v));
System.out.println(completableFuture.join());

执行结果:

上一步的执行的结果为:10

thenRun示例:

CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10).thenRun(() -> System.out.println("上一步执行完成"));

执行结果:

上一步执行完成

thenAccept示例:

CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10).thenAccept(v -> System.out.println("上一步执行完成,结果为:" + v));

执行结果:

上一步执行完成,结果为:10

thenApply有异常示例:

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {//模拟异常int i = 1 / 0;return 10;
}).thenApply(v -> ("上一步的执行的结果为:" + v));
System.out.println(completableFuture.join());

执行结果:

Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zeroat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)

当有异常时是不会回调的

只能接收任务处理异常后的回调
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

当上面的任务执行过程中出现异常的时候,会回调exceptionally方法指定的回调,但是如果没有出现异常,是不会回调的。

exceptionally能够将异常给吞了,并且fn的返回值会返回回去。

其实这个exceptionally方法有点像降级的味道。当出现异常的时候,走到这个回调,可以返回一个默认值回去。

没有异常情况下:

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {return 100;
}).exceptionally(e -> {System.out.println("出现异常了,返回默认值");return 110;
});
System.out.println(completableFuture.join());

执行结果:

100

有异常情况下:

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {int i = 1 / 0;return 100;
}).exceptionally(e -> {System.out.println("出现异常了,返回默认值");return 110;
});
System.out.println(completableFuture.join());

执行结果:

出现异常了,返回默认值
110
能同时接收任务执行正常和异常的回调
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> actin);

不论前面的任务执行成功还是失败都会回调的这类方法指定的回调方法。

handle : 跟exceptionally有点像,但是exceptionally是出现异常才会回调,两者都有返回值,都能吞了异常,但是handle正常情况下也能回调。

whenComplete:能接受正常或者异常的回调,并且不影响上个阶段的返回值,也就是主线程能获取到上个阶段的返回值;当出现异常时,whenComplete并不能吞了这个异常,也就是说主线程在获取执行异常任务的结果时,会抛出异常。

这里演示一下whenComplete处理异常示例情况,handle跟exceptionally对异常的处理差不多。

whenComplete处理异常示例:

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {int i = 1 / 0;return 10;
}).whenComplete((r, e) -> {System.out.println("whenComplete被调用了");
});
System.out.println(completableFuture.join());

执行结果:

whenComplete被调用了
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zeroat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)

5、对任务结果进行合并

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);

这个方法的意思是,当前任务和other任务都执行结束后,拿到这两个任务的执行结果,回调 BiFunction ,然后返回新的结果。

thenCombine的例子请往下继续看。

6、以Async结尾的方法

上面说的一些方法,比如说thenAccept方法,他有两个对应的Async结尾的方法,如下:

public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

thenAcceptAsync跟thenAccept的主要区别就是thenAcceptAsync会重新开一个线程来执行下一阶段的任务,而thenAccept还是用上一阶段任务执行的线程执行。

两个thenAcceptAsync主要区别就是一个使用默认的线程池来执行任务,也就是ForkJoinPool,一个是使用方法参数传入的线程池来执行任务。

当然除了thenAccept方法之外,上述提到的方法还有很多带有Async结尾的对应的方法,他们的主要区别就是执行任务是否开启异步线程来执行的区别。

当然,还有一些其它的api,可以自行查看

CompletableFuture在RocketMQ中的使用

CompletableFuture在RocketMQ中的使用场景比较多,这里我举一个消息存储的场景。

在RocketMQ中,Broker接收到生产者产生的消息的时候,会将消息持久化到磁盘和同步到从节点中。持久化到磁盘和消息同步到从节点是两个独立的任务,互不干扰,可以相互独立执行。当消息持久化到磁盘和同步到从节点中任务完成之后,需要统计整个存储消息消耗的时间,所以统计整个存储消息消耗的时间是依赖前面两个任务的完成。

图片

实现代码如下

消息存储刷盘任务和主从复制任务:

PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// 提交刷盘的请求
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
//提交主从复制的请求
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);//刷盘 和 主从复制 两个异步任务通过thenCombine联合
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {// 当两个刷盘和主从复制任务都完成的时候,就会回调// 如果刷盘没有成功,那么就将消息存储的状态设置为失败if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}// 如果主从复制没有成功,那么就将消息存储的状态设置为失败if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);}// 最终返回消息存储的结果return putMessageResult;
});

对上面两个合并的任务执行结果通过thenAccept方法进行监听,统计消息存储的耗时:

//消息存储的开始时间
long beginTime = this.getSystemClock().now();
// 存储消息,然后返回 CompletableFuture,也就是上面一段代码得返回值‍
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);//监听消息存储的结果
putResultFuture.thenAccept((result) -> {// 消息存储完成之后会回调long elapsedTime = this.getSystemClock().now() - beginTime;if (elapsedTime > 500) {log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);if (null == result || !result.isOk()) {this.storeStatsService.getPutMessageFailedTimes().add(1);}
});

CompletableFuture的优点

1、异步函数式编程,实现优雅,易于维护;

2、它提供了异常管理的机制,让你有机会抛出、管理异步任务执行中发生的异常,监听这些异常的发生;

3、拥有对任务编排的能力。借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/172167.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FloodFill

"绝境之中才窥见&#xff0c;Winner&#xff0c;Winner" FloodFill算法简介: floodfill又翻译成漫水填充。我们可以将下面的矩阵理解为一片具有一定高度的坡地&#xff0c;此时突发洪水&#xff0c;洪水会将高度<0的地方填满。 话句话来说&#xff0c;Fl…

IDEA2023版本创建Sping项目只能勾选17和21,却无法使用Java8?(已解决)

文章目录 前言分析解决方案一&#xff1a;替换创建项目的源方案二&#xff1a;升级JDK版本 参考文献 前言 起因 想创建一个springboot的项目&#xff0c;本地安装的是1.8&#xff0c;但是在使用Spring Initializr创建项目时&#xff0c;发现版本只有17和21。 在JDK为1.8的情况下…

代码随想录算法训练营第四十六天【动态规划part08】 | 139.单词拆分、背包总结

139.单词拆分 题目链接&#xff1a; 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 求解思路&#xff1a; 单词是物品&#xff0c;字符串s是背包&#xff0c;单词能否组成字符串s&#xff0c;就是问物品能不能把背包装满。 动规五部曲 确定dp数…

弹窗concrt140.dll丢失的解决方法,深度解析concrt140.dll丢失的原因

在计算机使用过程中&#xff0c;我们经常会遇到一些错误提示或者系统崩溃的情况。其中&#xff0c;concrt140.dll是一个常见的错误提示&#xff0c;这个错误通常会导致某些应用程序无法正常运行。为了解决这个问题&#xff0c;本文将介绍5种详细的解决方法&#xff0c;帮助您恢…

蓝桥杯官网算法赛(蓝桥小课堂)

问题描述 蓝桥小课堂开课啦&#xff01; 海伦公式&#xff08;Herons formula&#xff09;&#xff0c;也称为海伦-秦九韶公式&#xff0c;是用于计算三角形面积的一种公式&#xff0c;它可以通过三条边的长度来确定三角形的面积&#xff0c;而无需知道三角形的高度。 海伦公…

有理数比较

【问题描述】编写函数CompareRational()&#xff0c;比较两个有理数的大学&#xff0c;该函数的参数为两个有理数&#xff08;结构体类型&#xff0c;包含分子分母两个整数&#xff09;&#xff0c;若第一个有理数小于第二个&#xff0c;返回一个负数&#xff1b;若相等&#x…

Python 进阶(十):数学计算(math 模块)

《Python入门核心技术》专栏总目录・点这里 文章目录 1. 导入math模块2. 常用数学函数3. 常量4. 其他函数和用法5. 总结 大家好&#xff0c;我是水滴~~ Python的math模块提供了许多数学运算函数&#xff0c;为开发者在数值计算和数据处理方面提供了强大的工具。本教程将详细介…

【100个Cocos实例】看完这个,我再也不要当赌狗了...

引言 探索游戏开发中抽奖转盘的奥秘。 抽奖转盘是一种常见的互动元素&#xff0c;通常用于游戏、营销活动等场景。 本文将介绍一下抽奖转盘的原理和实现。 本文源工程可在文末阅读原文获取&#xff0c;小伙伴们自行前往。 1.抽奖转盘的组成 抽奖转盘的实现涉及多个组成部分…

基于springboot校园车辆管理系统

背景 伴随着社会经济的快速发展&#xff0c;机动车保有量不断增加。不断提高的大众生活水平以及人们不断增长的自主出行需求&#xff0c;人们对汽车的 依赖性在不断增强。汽车已经发展成为公众日常出行的一种重要的交通工具。在如此形势下&#xff0c;高校校园内的机动车数量也…

【KubeSphere】基于AWS在 Linux 上以 All-in-One 模式安装 KubeSphere

文章目录 一、实验配置说明二、实验准备工作1.确认系统版本2. 修改网络DNS3. 关闭SELINUX4. 关闭防火墙 三、实验依赖项安装四、下载 KubeKey五、一键化安装部署六、验证安装结果七、登录KubeSphere管理控制台八、参考链接 一、实验配置说明 本实验基于AWS启动一台新实例&…

I/O多路转接之epoll

承接上文 I/O多路转接之poll-CSDN博客 简介 epoll的相关系统调用 epoll底层原理 编写epoll的server 重新回归一下epoll原理&#xff0c;LT&#xff0c;ET epoll改成ET工作模式 -- 初识(有bug) epoll初识 按照man手册的说法: 是为处理大批量句柄而作了改进的poll. 它是在2.5.4…

CANdelaStudio 使用教程4 编辑State

文章目录 简述1、State Groups2、Dependencies3、 Defaults State1、 会话状态2、 新增会话状态3、 编辑 服务对 State 的依赖关系 State Diagram 简述 1、State Groups 2、Dependencies 在这里&#xff0c;可以编辑现有服务在不同会话状态或安全访问状态的支持情况和状态转换…

python每日一题——7接雨水

题目 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图&#xff0c;计算按此排列的柱子&#xff0c;下雨之后能接多少雨水。 示例 1&#xff1a; 输入&#xff1a;height [0,1,0,2,1,0,1,3,2,1,2,1] 输出&#xff1a;6 解释&#xff1a;由数组 [0,1,0,2,1,0,1,3,2,1,2,1] 表…

粒子群算法Particle Swarm Optimization (PSO)的定义,应用优点和缺点的总结!!

文章目录 前言一、粒子群算法的定义二、粒子群算法的应用三、粒子群算法的优点四、粒子群算法的缺点&#xff1a;粒子群算法的总结 前言 粒子群算法是一种基于群体协作的随机搜索算法&#xff0c;通过模拟鸟群觅食行为而发展起来。该算法最初是由Eberhart博士和Kennedy博士于1…

概念解析 | 玻尔兹曼机

注1:本文系“概念解析”系列之一,致力于简洁清晰地解释、辨析复杂而专业的概念。本次辨析的概念是:玻尔兹曼机。 概念解析 | 玻尔兹曼机 引言 随着人工智能技术的飞速发展,玻尔兹曼机作为一种重要的生成模型,受到了广泛的关注。 背景介绍 玻尔兹曼机(Boltzmann Machine)是一…

Mapper 编写有哪几种方式, 使用MyBatis的mapper接口调用时有哪些要求,接口绑定有两种实现方式, MyBatis高级查询

文章目录 Mapper 编写有哪几种方式&#xff1f;接口绑定有两种实现方式使用MyBatis的mapper接口调用时有哪些要求&#xff1f;Mybatis的Xml映射文件中&#xff0c;不同的Xml映射文件&#xff0c;id是否可以重复&#xff1f;简述Mybatis的Xml映射文件和Mybatis内部数据结构之间的…

uniapp+vue基于Android的校园二手跳蚤市场的设计与实现 微信小程序

实现功能&#xff1a; 用户管理&#xff1a;登陆、注册、注销、修改密码、上传头像、修改资料 发布与检索&#xff1a;发布商品、模糊搜索、人气排序、价格排序、时间排序、推送商品&#xff08;协同过滤算法实现个性化推荐&#xff09;&#xff0c;最新发布、分类检索 核心交易…

Java算法小结-Arrays的应用

/* public static String toString(数组) 数组拼接成一个字符串 public static int binarySearch(数组,查找的元素) 二分查找法查找元素 public static int[] copyOf(原数组&#xff0c;新数组的长度) public static int[] copyOfRange(原数组&#xff0c;起始索引&#xff0c;…

1、分布式锁实现原理与最佳实践(一)

在单体的应用开发场景中涉及并发同步时&#xff0c;大家往往采用Synchronized&#xff08;同步&#xff09;或同一个JVM内Lock机制来解决多线程间的同步问题。而在分布式集群工作的开发场景中&#xff0c;就需要一种更加高级的锁机制来处理跨机器的进程之间的数据同步问题&…

安卓系统修图软件(二)

晚上好&#xff0c;自上一次博主分享修图软件之后&#xff0c;今天博主将带来第二期安卓修图软件的推送&#xff0c;个个都是宝藏&#xff0c;建议大家赶紧体验哦。 1.canva可画 如果说有一款手机APP可以与PS媲美&#xff0c;那么一定非canvas莫属。这款强大的修图软件支持海报…