Java中CompletableFuture 异步编排的基本使用

一、前言

        在复杂业务场景中,有些数据需要远程调用,导致查询时间缓慢,影响以下代码逻辑运行,并且这些浪费时间的逻辑与以后的请求并没有关系,这样会大大增加服务的时间。

        假如商品详情页的每个查询,需要如下标注的时间才能完成 。那么,用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。 如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成响应。

 

        在 Java 8 , 新增加了一个包含 50 个方法左右的类 : CompletableFuture ,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以 通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。 CompletableFuture 类实现了 Future 接口,所以你还是可以像以前一样通过 `get` 方法阻塞或 者轮询的方式获得结果,但是这种方式不推荐使用。 CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果。

1、创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

 

public static Completab1eFuture runAsync(Runnable runnable)
public static completableFuturecVoid> runAsync(Runnable runnable,Executor executor)
public static CompletableFuture supplyAsync(Suppliersupplier)
public static CompletableFuturecU> supplyAsync(Supplier supplier,Executor executor)
1 runXxxx 都是没有返回结果的, supplyXxx 都是可以获取返回结果的
2 、可以传入自定义的线程池,否则就用默认的线程池;
3、Async代表异步方法

 

1.1 runAsync 不带返回值

public class ThreadTest {//        ExecutorService executorService = Executors.newFixedThreadPool(10);public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>(  100000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) {CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {System.out.println("当前线程:"+Thread.currentThread().getName());int i = 10 / 2;System.out.println("运行结果...."+i);}, executor);}
}

1.2 supplyAsync 带返回值 

public class ThreadTest {//        ExecutorService executorService = Executors.newFixedThreadPool(10);public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>(  100000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("运行结果...." + i);return i;}, executor);Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}
}

 2、计算完成时回调方法

public completableFuture whencomplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuturewhenCompleteAsync(BiConsumer <? super T,? super Throwable> action);
public completableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action,Executor executor);
public completableFutureexceptionally(Function<Throwable,? extends T> fn);
whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。
whenComplete 和 whenCompleteAsync 的区别:
whenComplete: 是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync: 是执行把 whenCompleteAsync 这个任务继续提交给线程池
来进行执行。
方法不以 Async 结尾, 意味着 Action 使用相同的线程执行, 而 Async 可能会使用其他线程执行(如果是使用相同的线程池, 也可能会被同一个线程选中执行)

2.1 whenCompleteAsync 完成回调 (没有异常情况情况)

public class ThreadTest {//        ExecutorService executorService = Executors.newFixedThreadPool(10);public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>(  100000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("运行结果...." + i);return i;}, executor).whenCompleteAsync((res, exception) -> {System.out.println("异步任务完成....感知到返回值为:"+res+"异常:"+exception);},executor);Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}
}

 有异常情况

public class ThreadTest {//        ExecutorService executorService = Executors.newFixedThreadPool(10);public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>(  100000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 0;System.out.println("运行结果...." + i);return i;}, executor).whenCompleteAsync((res, exception) -> {System.out.println("异步任务完成....感知到返回值为:"+res+"异常:"+exception);},executor);Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}
}

此处虽然得到了异常信息但是没有办法修改返回数据,使用exceptionally自定义异常时的返回值 

 2.2 exceptionally 异常感知及处理

异常情况

public class ThreadTest {//        ExecutorService executorService = Executors.newFixedThreadPool(10);public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>(  100000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 0;System.out.println("运行结果...." + i);return i;}, executor).whenCompleteAsync((res, exception) -> {System.out.println("异步任务完成....感知到返回值为:"+res+"异常:"+exception);},executor).exceptionally(throwable -> {return 0;});Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}
}

 无异常,情况正常返回不会进exceptionally

public class ThreadTest {//        ExecutorService executorService = Executors.newFixedThreadPool(10);public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>(  100000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("运行结果...." + i);return i;}, executor).whenCompleteAsync((res, exception) -> {System.out.println("异步任务完成....感知到返回值为:"+res+"异常:"+exception);},executor).exceptionally(throwable -> {return 0;});Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}
}

2.3 最终处理 handle 方法

和 complete 一样, 可对结果做最后的处理(可处理异常),可改变返回值。

总结:使用R apply(T t, U u); 可以感知异常,和修改返回值的功能。

public completionStage handle(BiFunction<? super T,Throwable,? extends U> fn);
public completionStagehandleAsync(BiFunction<? super T,Throwable,? extends U> fn);
public > CompletionStage handleAsync(BiFunction<? super T,Throwable,? extends U> fn,Executor executor ) ;

 有异常情况

	public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 0;System.out.println("运行结果...." + i);return i;}, executor).handleAsync((res, throwable) -> {if (res!=null){return res*2;}if (throwable!=null){System.out.println("出现异常"+throwable.getMessage());return -1;}return 0;},executor);Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}

无异常情况 

	public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 6;System.out.println("运行结果...." + i);return i;}, executor).handleAsync((res, throwable) -> {if (res!=null){return res*2;}if (throwable!=null){System.out.println("出现异常"+throwable.getMessage());return -1;}return 0;},executor);Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}
2.3.1总结 

总结:一般用handle,因为whencomplete如果异常不能给定默认返回结果,需要再调用exceptionally,而handle可以

该方法作用:获得前一任务的返回值【自己也可以是异步执行的】,也可以处理上一任务的异常,调用exceptionally修改前一任务的返回值【例如异常情况时给一个默认返回值】而handle方法可以简化操作


以下用法大致相同,只列举具体方法 

 2.4 线程串行化方法

public CompletableFuture thenApply(Function<? super T,? extends U> fn)
public Completab1eFuture thenApplyAsync(Function<? super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn,Executor executor)public completionstage thenAccept(Consumer<? super T> action);
public completionStage thenAcceptAsync(Consumer<? super T> action);
public CompletionStagecVoid> thenAcceptAsync(Consumer<? super T> action,Executor executor);public Completionstage thenRun(Runnable action);
public Completionstage thenRunAsync(Runnable action);
public completionStage thenRunAsync(Runnable action,Executor executor);

 thenApply:继续执行,感知上一任务的返回结果,并且自己的返回结果也被下一个任务所感知
thenAccept:继续执行,接受上一个任务的返回结果,自己执行完没有返回结果
thenRun:继续执行,不接受上一个任务的返回结果,自己执行完也没有返回结果
以上都要前置任务成功完成。
Function<? super T,? extends U>
T: 上一个任务返回结果的类型
U: 当前任务的返回值类型

 2.5 两任务组合 - 都要完成

public <U,V> CompletableFuture thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);public CompletableFuture thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);public CompletableFuture thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);public CompletableFuture thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);public CompletableFuture runAfterBoth(CompletionStage<?> other, Runnable action);public CompletableFuture runAfterBothAsync(CompletionStage<?> other, Runnable action);public CompletableFuture runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);

thenCombine:组合两个future,获取前两个future的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个future,获取前两个future任务的返回结果,然后处理任务,没有返回值。
runAfterBoth:组合两个future,不需要获取之前任务future的结果,只需两个future处理完任务后,处理该任务。

 2.5.1 runAfterBothAsync
 public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {System.out.println("任务一线程开始:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("任务一运行结束...." + i);return i;}, executor);CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {System.out.println("任务二线程开始:" + Thread.currentThread().getName());System.out.println("任务二运行结束....");return "hello";}, executor);future01.runAfterBothAsync(future02,() -> {System.out.println("任务三开始...");});System.out.println("返回数据:");}

 2.6 两个任务 - 一个完成

  1.  applyToEither: 两个任务有一个执行完成, 获取它的返回值, 处理任务并有新的返回值。
  2. acceptEither: 两个任务有一个执行完成, 获取它的返回值, 处理任务, 没有新的返回值。
  3. runAfterEither: 两个任务有一个执行完成, 不需要获取 future 的结果, 处理任务, 也没有返回值。

2.7 多任务组合 

//allOf: 等待所有任务完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {return andTree(cfs, 0, cfs.length - 1);
}//anyOf: 只要有一个任务完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {return orTree(cfs, 0, cfs.length - 1);
}
 2.7.1 allOf
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {System.out.println("任务一线程开始:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("任务一运行结束...." + i);return i;}, executor);CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {System.out.println("任务二线程开始:" + Thread.currentThread().getName());try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务二运行结束....");return "hello";}, executor);CompletableFuture<Object> future03 = CompletableFuture.supplyAsync(() -> {System.out.println("任务三线程开始:" + Thread.currentThread().getName());try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务三运行结束....");return "hello2";}, executor);CompletableFuture<Void> allOf = CompletableFuture.allOf(future01, future02, future03);allOf.get();//等待所有任务完成System.out.println("返回数据:");}
 2.7.2 anyOf
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {System.out.println("任务一线程开始:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("任务一运行结束...." + i);return i;}, executor);CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {System.out.println("任务二线程开始:" + Thread.currentThread().getName());try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务二运行结束....");return "hello";}, executor);CompletableFuture<Object> future03 = CompletableFuture.supplyAsync(() -> {System.out.println("任务三线程开始:" + Thread.currentThread().getName());try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务三运行结束....");return "hello2";}, executor);CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future01, future02, future03);anyOf.get();//等待其中之一任务完成System.out.println("返回数据:");}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/611679.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

书生·浦语第三次作业

我最近在参加书生浦语大模型实战营&#xff0c;这是第三次作业打卡&#xff01; 如果你也想两周玩转大模型微调&#xff0c;部署与测评全链路。报名链接&#xff1a;invite 书生浦语大模型实战营报名 邀请码可以填026014 一、基础作业&#xff1a;复现课程知识库助手搭建过程…

WEB之HTML练习

第一题&#xff1a;用户注册界面 HTML代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><titl…

【Python】使用tkinter设计开发Windows桌面程序记事本(2)

上一篇&#xff1a;【Python】使用tkinter设计开发Windows桌面程序记事本&#xff08;1&#xff09;-CSDN博客 下一篇&#xff1a; 作者发炎 此代码模块是继承上一篇文章的代码模块的基础上开始设计开发的。 如果不知道怎么新建"记事本项目"文件夹&#xff0c;请参…

C++ 多态以及多态的原理

文章目录 多态的概念多态的构成条件虚函数的重写虚函数重写的两个例外 重载、重写(覆盖)、重定义(隐藏)对比C11 final 和 override关键字抽象类接口继承和普通继承多态的原理虚函数表多态的原理 单继承和多继承关系的虚函数表单继承中的虚函数表多继承中的虚函数表 多态的概念 …

Linux安装nginx并设置为开机自启动

1.更新gcc安装包 yum install -y gcc pcre-devel zlib-devel2.下载文件并解压 命令如下 wget https://nginx.org/download/nginx-1.18.0.tar.gz //解压nginx tar -zxvf nginx-1.18.0.tar.gz //进入后进行指定安装位置 ./configure --prefix/usr/local/nginx //安装 make &…

面试宝典之spring框架常见面试题

F1、类的反射机制有啥用&#xff1f; &#xff08;1&#xff09;增加程序的灵活性&#xff0c;可扩展性&#xff0c;动态创建对象。 &#xff08;2&#xff09;框架必备&#xff0c;任何框架的封装都要用反射。&#xff08;框架的灵魂&#xff09; F2、获取Class对象的三种方…

小程序分销商城,打造高效线上购物体验

小程序商城系统&#xff0c;为您带来前所未有的在线购物体验。它不仅提供线上商城购物、在线下单、支付及配送等功能&#xff0c;还凭借其便捷性成为众多商家的首选。 想象一下&#xff0c;商家可以展示琳琅满目的商品&#xff0c;包括图片、文字描述、价格及库存等详尽信息。而…

内网渗透实战攻略

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 &#x1f4ab;个人格言:"没有罗马,那就自己创造罗马~" 目录 介绍 什么是内网&#xff1f; 什么是内网渗透&#xff1f; 内网渗透的目的&#xff1a; 内网…

2024--Django平台开发-Django知识点(六)

day06 Django知识点 今日概要&#xff1a; Form和ModelForm组件【使用】【源码】缓存【使用】ORM【使用】其他&#xff1a;ContentTypes、Admin、权限、分页、信号等 1.Form和ModelForm组件 背景&#xff1a;某个公司后台管理项目。 垃圾 def register(request):"&quo…

k8s之pod基础

k8s之pod基础 pod&#xff1a; pod是k8s中最小的资源管理组件 pod也是最小化运行容器化的应用的资源管理对象 pod是一个抽象的概念&#xff0c;可以理解为一个或者多个容器化应用的集合 在一个pod当中运行一个容器是最常用的方式 在pod当中同时运行多个容器&#xff0c;…

Qt QRadioButton单选按钮控件

文章目录 1 属性和方法1.1 文本1.2 选中状态1.3 自动排他1.4 信号和槽 2 实例2.1 布局2.2 代码实现 Qt中的单选按钮类是QRadioButton它是一个可以切换选中&#xff08;checked&#xff09;或未选中&#xff08;unchecked&#xff09;状态的单选按钮单选按钮常用在“多选一”的场…

分布式全局id

分布式全局id snowflake 算法是 twitter 开源的分布式 id 生成算法&#xff0c;采用 Scala 语言实现&#xff0c;是把一个 64 位的 long 型的 id&#xff0c;1 个 bit 是不用的&#xff0c;用其中的 41 bits 作为毫秒数&#xff0c;用 10 bits 作为工作机器 id&#xff0c;12 …

基于sprinmgboot实习管理系统源码和论文

随着信息化时代的到来&#xff0c;管理系统都趋向于智能化、系统化&#xff0c;实习管理也不例外&#xff0c;但目前国内仍都使用人工管理&#xff0c;市场规模越来越大&#xff0c;同时信息量也越来越庞大&#xff0c;人工管理显然已无法应对时代的变化&#xff0c;而实习管理…

人工智能复习

机器学习中线性回归和逻辑回归&#xff1a; 机器学习的分类&#xff1a; 监督学习和无监督学习&#xff0c;半监督学习 监督学习&#xff08;Supervised Learning&#xff09;&#xff1a; 监督学习是一种利用带有标签&#xff08;标记&#xff09;的数据进行训练的机器学习…

使用Windbg动态调试目标进程的一般步骤详解

目录 1、概述 2、将Windbg附加到已经启动起来的目标进程上&#xff0c;或者用Windbg启动目标程序 2.1、将Windbg附加到已经启动起来的目标进程上 2.2、用Windbg启动目标程序 2.3、Windbg关联到目标进程上会中断下来&#xff0c;输入g命令将该中断跳过去 3、分析实例说明 …

鸿蒙HarmonyOS学习手册_入门篇

鸿蒙HarmonyOS学习手册_入门篇 文章目录 鸿蒙HarmonyOS学习手册_入门篇入门快速入门开发准备基本概念UI框架应用模型工具准备 构建第一个ArkTS应用&#xff08;Stage模型&#xff09;-快速入门-入门创建ArkTS工程ArkTS工程目录结构&#xff08;Stage模型&#xff09;构建第一个…

Vue-10、Vue键盘事件

1、vue中常见的按键别名 回车 ---------enter <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>键盘事件</title><!--引入vue--><script type"text/javascript" src"h…

使用Vivado Design Suite平台板、将IP目录与平台板流一起使用

使用Vivado Design Suite平台板流 Vivado设计套件允许您使用AMD目标设计平台板&#xff08;TDP&#xff09;创建项目&#xff0c;或者已经添加到板库的用户指定板。当您选择特定板&#xff0c;Vivado设计工具显示有关板的信息&#xff0c;并启用其他设计器作为IP定制的一部分以…

以数据资产入表为抓手,推动数据资产化

在数字化时代&#xff0c;数据已经成为企业的重要资产。数据资产化是将数据视为一种有价值的资产&#xff0c;对其进行有效管理和利用的过程。而数据资产入表则是将数据资产纳入财务报表&#xff0c;以反映其价值和对企业财务状况的影响。本文亿信华辰 将深入探讨数据资产化与数…

每天学习一点点之 Spring Boot 1.x 升级 2.x 之 allowBeanDefinitionOverriding

最近组内大佬正在进行 Spring Boot 版本的升级&#xff0c;从 1.x 版本升级到 2.x 版本。在查看代码变更时&#xff0c;我注意到我之前编写的一个名为 ShardingRuleStrategy 的类被添加了 Primary 注解。这个类在原来的代码中被标记为 Component&#xff0c;同时也在 API 中被定…