【Java异步编程】CompletableFuture基础(1):创建不同线程的子任务、子任务链式调用与异常处理

文章目录

    • 1. 三种实现接口
    • 2. 链式调用:保证链的顺序性与异步性
    • 3. CompletableFuture创建CompletionStage子任务
    • 4. 处理异常
      • a. 创建回调钩子
      • b. 调用handle()方法统一处理异常和结果
    • 5. 如何选择线程池:不同的业务选择不同的线程池

CompletableFuture是JDK 1.8引入的实现类,该类实现了Future和CompletionStage两个接口。该类的实例作为一个异步任务,可以在自己异步执行完成之后触发一些其他的异步任务,从而达到异步回调的效果。

CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会进入另一个阶段。一个阶段可以理解为一个子任务,每一个子任务会包装一个Java函数式接口实例,表示该子任务所要执行的操作。

 

1. 三种实现接口

每个CompletionStage子任务所包装的可以是一个Function、Consumer或者Runnable函数式接口实例。

这三个常用的函数式接口的特点如下:

被包装接口功能描述
FunctionFunction接口的特点是:有输入、有输出。包装了Function实例的CompletionStage子任务需要一个输入参数,并会产生一个输出结果到下一步
RunnableRunnable接口的特点是:无输入、无输出。包装了Runnable实例的CompletionStage子任务既不需要任何输入参数,又不会产生任何输出。
ConsumerConsumer接口的特点是:有输入、无输出。包装了Consumer实例的CompletionStage子任务需要一个输入参数,但不会产生任何输出。

 

2. 链式调用:保证链的顺序性与异步性

多个CompletionStage构成了一条任务流水线,一个环节执行完成了可以将结果移交给下一个环节(子任务)​。多个CompletionStage子任务之间可以使用链式调用。

下面是一个顺序调用的例子:

使用 CompletionStage 及其方法构建了一个异步任务链,thenApply 用于对前一个阶段的结果进行计算并传递结果,thenAccept 用于消费前一个阶段的结果并执行操作,thenRun 用于执行无输入输出的操作。

oneStage//被thenApply包装CompletionStage子任务,由输入输出.thenApply(x -> square(x))  //消耗上游输出,但是没有输出.thenAccept(y -> System.out.println(y)) //不消耗上一个子任务的输出又不产生结果.thenRun(() -> System.out.println()) 

这种链式操作可以方便地将多个异步操作连接起来,同时保证了操作的顺序性和异步性,提高了代码的可维护性和并发性能。

 

接下来是一个异步调用的例子:

在这个例子中,task2 和 task3 都依赖于 task1 完成后执行,但它们可能并行执行,也就是说,task2 和 task3 的执行顺序是不确定的,它们不一定会按照 thenRunAsync 的顺序执行。

CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {System.out.println("Task 1");
});CompletableFuture<Void> task2 = task1.thenRunAsync(() -> {System.out.println("Task 2");
});CompletableFuture<Void> task3 = task1.thenRunAsync(() -> {System.out.println("Task 3");
});

 

3. CompletableFuture创建CompletionStage子任务

CompletableFuture定义了一组方法用于创建CompletionStage子任务(或者阶段性任务)​,基础的方法如下:

//子任务包装一个Runnable实例,并调用ForkJoinPool.commonPool()线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable)//子任务包装一个Runnable实例,并调用指定的executor线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)//子任务包装一个Supplier实例,并调用ForkJoinPool.commonPool()线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)//子任务包装一个Supplier实例,并使用指定的executor线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

其中主要注意的信息是:

  1. Supplier 表示一个无参数但有返回值的函数,Runnable表示无惨无返回值的函数
  2. 在使用CompletableFuture创建CompletionStage子任务时,如果没有指定Executor线程池,在默认情况下CompletionStage会使用公共的ForkJoinPool线程池。
  3. 它们都会交给线程池执行,get方法会堵塞主线程等待执行结果。

给一个例子:

//无返回值异步调用  
@Test  
public void runAsyncDemo() throws Exception {  CompletableFuture<Void> future = CompletableFuture.runAsync(() ->  {  sleepSeconds(1);//模拟执行1秒  Print.tco("run end ...");  });  //等待异步任务执行完成,最多等待2秒  future.get(2, TimeUnit.SECONDS);  
}  //有返回值异步调用  
@Test  
public void supplyAsyncDemo() throws Exception {  CompletableFuture<Long> future = CompletableFuture.supplyAsync(() ->  {  long start = System.currentTimeMillis();  sleepSeconds(1);//模拟执行1秒  Print.tco("run end ...");  return System.currentTimeMillis() - start;  });  //等待异步任务执行完成,现时等待2秒  long time = future.get(2, TimeUnit.SECONDS);  Print.tco("异步执行耗时(秒) = " + time / 1000);  
}

 

4. 处理异常

a. 创建回调钩子

可以为CompletionStage子任务设置特定的回调钩子,当计算结果完成或者抛出异常的时候,执行这些特定的回调钩子。

//设置子任务完成时的回调钩子
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)//设置子任务完成时的回调钩子,可能不在同一线程执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)//设置子任务完成时的回调钩子,提交给线程池executor执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action,Executor executor)//设置异常处理的回调钩子
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
@Test  
public void whenCompleteDemo() throws Exception {  CompletableFuture<Void> future = CompletableFuture.runAsync(() ->  {  sleepSeconds(1);//模拟执行1秒  Print.tco("抛出异常!");  throw new RuntimeException("发生异常");  //Print.tco("run end ...");  });  //设置执行完成后的回调钩子  future.whenComplete(new BiConsumer<Void, Throwable>() {  @Override  public void accept(Void t, Throwable action) {  Print.tco("执行完成!");  }  });  //设置发生异常后的回调钩子  future.exceptionally(new Function<Throwable, Void>() {  @Override  public Void apply(Throwable t) {  Print.tco("执行失败!" + t.getMessage());  return null;  }  });  future.get();  
}[ForkJoinPool.commonPool-worker-9]:抛出异常!
[main]:执行完成!
[ForkJoinPool.commonPool-worker-9]:执行失败!java.lang.RuntimeException: 发生异常
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 发生异常

有如下几个注意点:

  1. 调用cancel()方法取消CompletableFuture时,任务被视为异常完成,completeExceptionally()方法所设置的异常回调钩子也会被执行。
  2. 如果没有设置异常回调钩子,发生内部异常时会有两种情况发生:
    • 在调用get()时,如果遇到内部异常,get()方法就会抛出ExecutionException(执行异常)​。
    • 在调用join()和getNow(T)启动任务时,如果遇到内部异常,join()和getNow(T)方法就会抛出CompletionException。

 

b. 调用handle()方法统一处理异常和结果

//在执行任务的同一个线程中处理异常和结果
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);//可能不在执行任务的**同一个线程**中处理异常和结果
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);//在指定线程池executor中处理异常和结果
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
@Test  
public void handleDemo() throws Exception {  CompletableFuture<Void> future = CompletableFuture.runAsync(() ->  {  sleepSeconds(1);//模拟执行1秒  Print.tco("抛出异常!");  throw new RuntimeException("发生异常");  //Print.tco("run end ...");  });  //设置执行完成后的回调钩子  future.handle(new BiFunction<Void, Throwable, Void>() {  @Override  public Void apply(Void input, Throwable throwable) {  if (throwable == null) {  Print.tcfo("没有发生异常!");  } else {  Print.tcfo("sorry,发生了异常!");  }  return null;  }  });  future.get();  
}//
//[ForkJoinPool.commonPool-worker-1]:抛出异常! 
//[ForkJoinPool.commonPool-worker-1|CompletableFutureDemo$3.apply]: sorry,发生了异常!

 

5. 如何选择线程池:不同的业务选择不同的线程池

默认情况下,通过静态方法runAsync()、supplyAsync()创建的CompletableFuture任务会使用公共的ForkJoinPool线程池,默认的线程数是CPU的核数。当然,它的线程数可以通过以下JVM参数设置

     option:-Djava.util.concurrent.ForkJoinPool.common.parallelism

 

如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的IO操作,就会导致线程池中的所有线程都阻塞在IO操作上,造成线程饥饿,进而影响整个系统的性能。所以,强烈建议大家根据不同的业务类型创建不同的线程池,以避免互相干扰。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/894269.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自制虚拟机(C/C++)(一、分析语法和easyx运用,完整虚拟机实现)

网上对虚拟机的解释很多&#xff0c;其实本质就一句话 虚拟机就是机器语言解释器 我们今天要实现汇编语言解释器&#xff0c;下一次再加上ndisasm反汇编器就是真正虚拟机了 注:这里的虚拟机指的是VMware一类的&#xff0c;而不是JVM&#xff0c;python一样的高级语言解释器 …

36. printf

1. printf 格式化函数说的是 printf、 sprintf 和 scanf 这样的函数&#xff0c;分为格式化输入和格式化输出两类函数。学习 C 语言的时候常常通过 printf 函数在屏幕上显示字符串&#xff0c;通过 scanf 函数从键盘获取输入。这样就有了输入和输出了&#xff0c;实现了最基本…

实验八 JSP访问数据库

实验八 JSP访问数据库 目的&#xff1a; 1、熟悉JDBC的数据库访问模式。 2、掌握使用My SQL数据库的使用 实验要求&#xff1a; 1、通过JDBC访问mysql数据&#xff0c;实现增删改查功能的实现 2、要求提交实验报告&#xff0c;将代码和实验结果页面截图放入报告中 实验过程&a…

python学opencv|读取图像(四十六)使用cv2.bitwise_or()函数实现图像按位或运算

【0】基础定义 按位与运算&#xff1a;全1取1&#xff0c;其余取0。按位或运算&#xff1a;全0取0&#xff0c;其余取1。 【1】引言 前序学习进程中&#xff0c;已经对图像按位与计算进行了详细探究&#xff0c;相关文章链接如下&#xff1a; python学opencv|读取图像&…

使用vhd虚拟磁盘安装两个win10系统

使用vhd虚拟磁盘安装两个win10系统 前言vhd虚拟磁盘技术简介准备工具开始动手实践1.winX选择磁盘管理2.选择“操作”--“创建VHD”3.自定义一个位置&#xff0c;输入虚拟磁盘大小4.右键初始化磁盘5.选择GPT分区表格式6.右键新建简单卷7.给卷起个名字&#xff0c;用于区分8.打开…

基于云计算、大数据与YOLO设计的火灾/火焰目标检测

摘要&#xff1a;本研究针对火灾早期预警检测需求&#xff0c;采用在Kaggle平台获取数据、采用云计算部署的方式&#xff0c;以YOLOv11构建模型&#xff0c;使用云计算服务器训练模型。经训练&#xff0c;box loss从约3.5降至1.0&#xff0c;cls loss从约4.0降至1.0&#xff0c…

计算机毕业设计Python+CNN卷积神经网络考研院校推荐系统 考研分数线预测 考研推荐系统 考研爬虫 考研大数据 Hadoop 大数据毕设 机器学习

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

小程序-基础加强-自定义组件

前言 这次讲自定义组件 1. 准备今天要用到的项目 2. 初步创建并使用自定义组件 这样就成功在home中引入了test组件 在json中引用了这个组件才能用这个组件 现在我们来实现全局引用组件 在app.json这样使用就可以了 3. 自定义组件的样式 发现页面里面的文本和组件里面的文…

docker安装emqx

emqx安装 拉取emqx镜像 docker pull emqx/emqx:v4.1.0 运行docker容器 docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8081:8081 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.1.0 放行端口 1、如果要是自己的虚拟机&#xff0c;并且关闭了防火墙&a…

【4Day创客实践入门教程】Day4 迈向高手之路——进一步学习!

Day4 迈向高手之路——进一步学习&#xff01; 目录 Day4 迈向高手之路——进一步学习&#xff01;更多的开发板外壳制作 Day0 创想启程——课程与项目预览Day1 工具箱构建——开发环境的构建Day2 探秘微控制器——单片机与MicroPython初步Day3 实战演练——桌面迷你番茄钟Day4…

深度学习之“缺失数据处理”

缺失值检测 缺失数据就是我们没有的数据。如果数据集是由向量表示的特征组成&#xff0c;那么缺失值可能表现为某些样本的一个或多个特征因为某些原因而没有测量的值。通常情况下&#xff0c;缺失值由特殊的编码方式。如果正常值都是正数&#xff0c;那么缺失值可能被标记为-1…

日志收集Day007

1.配置ES集群TLS认证: (1)elk101节点生成证书文件 cd /usr/share/elasticsearch ./bin/elasticsearch-certutil cert -out config/elastic-certificates.p12 -pass "" --days 3650 (2)elk101节点为证书文件修改属主和属组 chown elasticsearch:elasticsearch con…

arm-linux-gnueabihf安装

Linaro Releases windows下打开wsl2中的ubuntu&#xff0c;资源管理器中输入&#xff1a; \\wsl$gcc-linaro-4.9.4-2017.01-x86_64_arm-linux-gnueabihf.tar.xz 复制到/home/ark01/tool 在 Ubuntu 中创建目录&#xff1a; /usr/local/arm&#xff0c;命令如下&#xff1a; …

LabVIEW透镜多参数自动检测系统

在现代制造业中&#xff0c;提升产品质量检测的自动化水平是提高生产效率和准确性的关键。本文介绍了一个基于LabVIEW的透镜多参数自动检测系统&#xff0c;该系统能够在单一工位上完成透镜的多项质量参数检测&#xff0c;并实现透镜的自动搬运与分选&#xff0c;极大地提升了检…

【算法】动态规划专题① ——线性DP python

目录 引入简单实现稍加变形举一反三实战演练总结 引入 楼梯有个台阶&#xff0c;每次可以一步上1阶或2阶。一共有多少种不同的上楼方法&#xff1f; 怎么去思考&#xff1f; 假设就只有1个台阶&#xff0c;走法只有&#xff1a;1 只有2台阶&#xff1a; 11&#xff0c;2 只有3台…

C++11(中)

新增默认成员函数 C11之前&#xff0c;默认成员函数有六个&#xff0c;构造函数&#xff0c;析构函数&#xff0c;拷贝构造&#xff0c;拷贝赋值重载&#xff0c;取地址重载&#xff0c;const 取地址重载。 C11增加了 移动构造 和 移动赋值重载 如果类没有实现移动构造&…

强化学习笔记——4策略迭代、值迭代、TD算法

基于策略迭代的贝尔曼方程和基于值迭代的贝尔曼方程&#xff0c;关系还是不太理解 首先梳理一下&#xff1a; 通过贝尔曼方程将强化学习转化为值迭代和策略迭代两种问题 求解上述两种贝尔曼方程有三种方法&#xff1a;DP&#xff08;有模型&#xff09;&#xff0c;MC&#xff…

计算机网络 笔记 网络层 3

IPv6 IPv6 是互联网协议第 6 版&#xff08;Internet Protocol Version 6&#xff09;的缩写&#xff0c;它是下一代互联网协议&#xff0c;旨在解决 IPv4 面临的一些问题&#xff0c;以下是关于 IPv6 的详细介绍&#xff1a; 产生背景&#xff1a; 随着互联网的迅速发展&…

【搜索回溯算法篇】:拓宽算法视野--BFS如何解决拓扑排序问题

✨感谢您阅读本篇文章&#xff0c;文章内容是个人学习笔记的整理&#xff0c;如果哪里有误的话还请您指正噢✨ ✨ 个人主页&#xff1a;余辉zmh–CSDN博客 ✨ 文章所属专栏&#xff1a;搜索回溯算法篇–CSDN博客 文章目录 一.广度优先搜索&#xff08;BFS&#xff09;解决拓扑排…

23.Word:小王-制作公司战略规划文档❗【5】

目录 NO1.2.3.4 NO5.6​ NO7.8.9​ NO10.11​ NO12​ NO13.14 NO1.2.3.4 布局→页面设置对话框→纸张&#xff1a;纸张大小&#xff1a;宽度/高度→页边距&#xff1a;上下左右→版式&#xff1a;页眉页脚→文档网格&#xff1a;勾选只指定行网格✔→ 每页&#xff1a;…