Java8 CompletableFuture异步编程-进阶篇

🏷️个人主页:牵着猫散步的鼠鼠 

🏷️系列专栏:Java全栈-专栏

🏷️个人学习笔记,若有缺误,欢迎评论区指正 

前言

我们在前面文章讲解了CompletableFuture这个异步编程类的基本用法,这节我们继续学习CompletableFuture相关的进阶知识,上文入口:Java8 CompletableFuture异步编程-入门篇-CSDN博客

1、异步任务的交互

异步任务交互指 将异步任务获取结果的速度相比较,按一定的规则( 先到先用 )进行下一步处理。

1.1 applyToEither

applyToEither() 把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步的操作。

CompletableFuture<R> applyToEither(CompletableFuture<T> other, Function<T,R> func)

演示案例:使用最先完成的异步任务的结果

public class ApplyToEitherDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 开启异步任务1CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {int x = new Random().nextInt(3);CommonUtils.sleepSecond(x);CommonUtils.printThreadLog("任务1耗时:" + x + "秒");return x;});
​// 开启异步任务2CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {int y = new Random().nextInt(3);CommonUtils.sleepSecond(y);CommonUtils.printThreadLog("任务2耗时:" + y + "秒");return y;});
​// 哪些异步任务的结果先到达,就使用哪个异步任务的结果CompletableFuture<Integer> future = future1.applyToEither(future2, (result -> {CommonUtils.printThreadLog("最先到达的结果:" + result);return result;}));
​// 主线程休眠4秒,等待所有异步任务完成CommonUtils.sleepSecond(4);Integer ret = future.get();CommonUtils.printThreadLog("ret = " + ret);}
}
​

速记心法:任务1、任务2就像两辆公交,哪路公交先到,就乘坐(使用)哪路公交。

以下是applyToEither 和其对应的异步回调版本

CompletableFuture<R> applyToEither(CompletableFuture<T> other, Function<T,R> func)
CompletableFuture<R> applyToEitherAsync(CompletableFuture<T> other, Function<T,R> func)
CompletableFuture<R> applyToEitherAsync(CompletableFuture<T> other, Function<T,R> func,Executor executor)

1.2 acceptEither

acceptEither() 把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步操作 ( 消费使用 )。

CompletableFuture<Void> acceptEither(CompletableFuture<T> other, Consumer<T> action)
CompletableFuture<Void> acceptEitherAsync(CompletableFuture<T> other, Consumer<T> action)  
CompletableFuture<Void> acceptEitherAsync(CompletableFuture<T> other, Consumer<T> action,Executor executor)

演示案例:使用最先完成的异步任务的结果

public class AcceptEitherDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 异步任务交互CommonUtils.printThreadLog("main start");// 开启异步任务1CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {int x = new Random().nextInt(3);CommonUtils.sleepSecond(x);CommonUtils.printThreadLog("任务1耗时:" + x + "秒");return x;});
​// 开启异步任务2CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {int y = new Random().nextInt(3);CommonUtils.sleepSecond(y);CommonUtils.printThreadLog("任务2耗时:" + y + "秒");return y;});
​// 哪些异步任务的结果先到达,就使用哪个异步任务的结果future1.acceptEither(future2,result -> {CommonUtils.printThreadLog("最先到达的结果:" + result);});
​// 主线程休眠4秒,等待所有异步任务完成CommonUtils.sleepSecond(4);CommonUtils.printThreadLog("main end");}
}

1.3 runAfterEither

如果不关心最先到达的结果,只想在有一个异步任务先完成时得到完成的通知,可以使用 runAfterEither() ,以下是它的相关方法:

CompletableFuture<Void> runAfterEither(CompletableFuture<T> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<T> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<T> other, Runnable action, Executor executor)

提示

异步任务交互的三个方法和之前学习的异步的回调方法 thenApply、thenAccept、thenRun 有异曲同工之妙。

2、get() 和 join() 区别

get() 和 join() 都是CompletableFuture提供的以阻塞方式获取结果的方法。

那么该如何选用呢?请看如下案例:

public class GetOrJoinDemo {public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {return "hello";});
​String ret = null;// 抛出检查时异常,必须处理try {ret = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println("ret = " + ret);
​// 抛出运行时异常,可以不处理ret = future.join();System.out.println("ret = " + ret);}
}

使用时,我们发现,get() 抛出检查时异常 ,需要程序必须处理;而join() 方法抛出运行时异常,程序可以不处理。所以,join() 更适合用在流式编程中。

3、ParallelStream VS CompletableFuture

CompletableFuture 虽然提高了任务并行处理的能力,如果它和 Stream API 结合使用,能否进一步多个任务的并行处理能力呢?

同时,对于 Stream API 本身就提供了并行流ParallelStream,它们有什么不同呢?

我们将通过一个耗时的任务来体现它们的不同,更重要地是,我们能进一步加强 CompletableFuture 和 Stream API 的结合使用,同时搞清楚CompletableFuture 在流式操作的优势

需求:创建10个MyTask耗时的任务,统计它们执行完的总耗时

定义一个MyTask类,来模拟耗时的长任务

public class MyTask {private int duration;
​public MyTask(int duration) {this.duration = duration;}
​// 模拟耗时的长任务public int doWork() {CommonUtils.printThreadLog("doWork");CommonUtils.sleepSecond(duration);return duration;}
}

同时,我们创建10个任务,每个持续1秒。

IntStream intStream = IntStream.range(0, 10);
List<MyTask> tasks = intStream.mapToObj(item -> {return new MyTask(1);
}).collect(Collectors.toList());

3.1 并行流的局限

我们先使用串行执行,让所有的任务都在主线程 main 中执行。

public class SequenceDemo {public static void main(String[] args) {// 方案一:在主线程中使用串行执行// step 1: 创建10个MyTask对象,每个任务持续1s,存入list集合便于启动Stream操作IntStream intStream = IntStream.range(0, 10);List<MyTask> tasks = intStream.mapToObj(item -> {return new MyTask(1);}).collect(Collectors.toList());// step 2: 执行tasks集合中的每个任务,统计总耗时long start = System.currentTimeMillis();List<Integer> result = tasks.stream().map(myTask -> {return myTask.doWork();}).collect(Collectors.toList());long end = System.currentTimeMillis();double costTime = (end - start) / 1000.0;System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);}
}

它花费了10秒, 因为每个任务在主线程一个接一个的执行。

因为涉及 Stream API,而且存在耗时的长任务,所以,我们可以使用 parallelStream()

public class ParallelDemo {public static void main(String[] args) {// 方案二:使用并行流// step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合IntStream intStream = IntStream.range(0, 10);List<MyTask> tasks = intStream.mapToObj(item -> {return new MyTask(1);}).collect(Collectors.toList());// step 2: 执行10个MyTask,统计总耗时long start = System.currentTimeMillis();List<Integer> results = tasks.parallelStream().map(myTask -> {return myTask.doWork();}).collect(Collectors.toList());long end = System.currentTimeMillis();double costTime = (end - start) / 1000.0;System.out.printf("processed %d tasks %.2f second",tasks.size(),costTime);}
}

它花费了2秒多,因为此次并行执行使用了8个线程 (7个是ForkJoinPool线程池中的, 一个是 main 线程),需要注意是:运行结果由自己电脑CPU的核数决定。

3.2 CompletableFuture 在流式操作的优势

让我们看看使用CompletableFuture是否执行的更有效率

public class CompletableFutureDemo {public static void main(String[] args) {// 需求:创建10MyTask耗时的任务,统计它们执行完的总耗时// 方案三:使用CompletableFuture// step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合IntStream intStream = IntStream.range(0, 10);List<MyTask> tasks = intStream.mapToObj(item -> {return new MyTask(1);}).collect(Collectors.toList());// step 2: 根据MyTask对象构建10个耗时的异步任务long start = System.currentTimeMillis();List<CompletableFuture<Integer>> futures = tasks.stream().map(myTask -> {return CompletableFuture.supplyAsync(() -> {return myTask.doWork();});}).collect(Collectors.toList());// step 3: 当所有任务完成时,获取每个异步任务的执行结果,存入List集合中List<Integer> results = futures.stream().map(future -> {return future.join();}).collect(Collectors.toList());long end = System.currentTimeMillis();double costTime = (end - start) / 1000.0;System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);}
}

运行发现,两者使用的时间大致一样。能否进一步优化呢?

CompletableFutures 比 ParallelStream 优点之一是你可以指定Executor去处理任务。你能选择更合适数量的线程。我们可以选择大于Runtime.getRuntime().availableProcessors() 数量的线程,如下所示:

public class CompletableFutureDemo2 {public static void main(String[] args) {// 需求:创建10MyTask耗时的任务,统计它们执行完的总耗时// 方案三:使用CompletableFuture// step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合IntStream intStream = IntStream.range(0, 10);List<MyTask> tasks = intStream.mapToObj(item -> {return new MyTask(1);}).collect(Collectors.toList());// 准备线程池final int N_CPU = Runtime.getRuntime().availableProcessors();// 设置线程池的数量最少是10个,最大是16个ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), N_CPU * 2));// step 2: 根据MyTask对象构建10个耗时的异步任务long start = System.currentTimeMillis();List<CompletableFuture<Integer>> futures = tasks.stream().map(myTask -> {return CompletableFuture.supplyAsync(() -> {return myTask.doWork();},executor);}).collect(Collectors.toList());// step 3: 当所有任务完成时,获取每个异步任务的执行结果,存入List集合中List<Integer> results = futures.stream().map(future -> {return future.join();}).collect(Collectors.toList());long end = System.currentTimeMillis();double costTime = (end - start) / 1000.0;System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);// 关闭线程池executor.shutdown();}
}

测试代码时,电脑配置是4核8线程,而我们创建的线程池中线程数最少也是10个,所以,每个线程负责一个任务( 耗时1s ),总体来说,处理10个任务总共需要约1秒。

3.3 合理配置线程池中的线程数

正如我们看到的,CompletableFuture 可以更好地控制线程池中线程的数量,而 ParallelStream 不能

问题1:如何选用 CompletableFuture 和 ParallelStream ?

如果你的任务是IO密集型的,你应该使用CompletableFuture;

如果你的任务是CPU密集型的,使用比处理器更多的线程是没有意义的,所以选择ParallelStream ,因为它不需要创建线程池,更容易使用。

问题2:IO密集型任务和CPU密集型任务的区别?

CPU密集型也叫计算密集型,此时,系统运行时大部分的状况是CPU占用率近乎100%,I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU 使用率很高。比如说要计算1+2+3+…+ 10万亿、天文计算、圆周率后几十位等, 都是属于CPU密集型程序。

CPU密集型任务的特点:大量计算,CPU占用率一般都很高,I/O时间很短

IO密集型指大部分的状况是CPU在等I/O (硬盘/内存) 的读写操作,但CPU的使用率不高。

简单的说,就是需要大量的输入输出,例如读写文件、传输文件、网络请求。

IO密集型任务的特点:大量网络请求,文件操作,CPU运算少,很多时候CPU在等待资源才能进一步操作。

问题3:既然要控制线程池中线程的数量,多少合适呢?

如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 Ncpu+1

如果是IO密集型任务,参考值可以设置为 2 * Ncpu,其中Ncpu 表示 核心数。

总结

通过这两篇文章的讲解,我们基本学习了CompletableFuture这个异步编程类的基础用法和相关进阶玩法,不过总体上还是偏理论,我后续可以可能会开一篇新的专栏,专门讲解和分享Java高并发相关的代码片段,都是比较实用,请多多支持吧~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/733167.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Web Worker:JavaScript的后台任务解决方案

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

SpringBoot项目没有启动按键

问题一&#xff1a; pom文件正常&#xff0c;但是springboot包报红&#xff0c;同时Plugin ‘org.springframework.boot:spring-boot-maven-plugin:‘ not found报红 解决办法&#xff1a; 无法识别使用哪个版本的 spring-boot-maven-plugin 包 <build><plugins>&…

javase day01笔记

第一天课堂笔记 Java第三代高级语言中的面向对象的语言 b/s 浏览器/服务器c/s 客户端/服务端 1991年詹姆斯高斯林在sun公司开发的Java 常用的dos命令 磁盘操作系统&#xff1a;dos win &#xff0b; r -》 cmd dos命令 切换盘符&#xff1a;直接输入对应盘符目录操作&#x…

【C++进阶】哈希的应用 --- 布隆过滤器

&#x1f466;个人主页&#xff1a;Weraphael ✍&#x1f3fb;作者简介&#xff1a;目前学习C和算法 ✈️专栏&#xff1a;C航路 &#x1f40b; 希望大家多多支持&#xff0c;咱一起进步&#xff01;&#x1f601; 如果文章对你有帮助的话 欢迎 评论&#x1f4ac; 点赞&#x1…

面试准备不充分,被Java守护线程干懵了,面试官主打一个东西没用但你得会

写在开头 面试官&#xff1a;小伙子请聊一聊Java中的精灵线程&#xff1f; 我&#xff1a;什么&#xff1f;精灵线程&#xff1f;啥时候精灵线程&#xff1f; 面试官&#xff1a;精灵线程没听过&#xff1f;那守护线程呢&#xff1f; 我&#xff1a;守护线程知道&#xff0c;就…

计算机软件文档编制规范GB_T 8567-2006

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 计算机软件文档编制规范概述 计算机软件文档编制规范&#xff08;Specification for computer software documentation&#xff09; 由TC28&#xff08;全国信息技术标准化技…

《HTTPS协议》

文章目录 一、什么是HTTPS协议二、理解关键字三、为什么要加密四、常见的加密方式1.对称加密2.非对称加密 五、如何进行加密&#xff1f;1.只使用对称加密2.只使用非对称加密3.双方都使用非对称加密4.使用对称加密非对称加密5.对称加密非对称加密CA证书认证5.1数据摘要&#xf…

GPT-4-turbo还是大家心中第一!Claude 3竞技场人类投票成绩出炉:仅居第三

Claude 3的竞技场排名终于揭晓了&#xff1a; 在仅仅3天的时间里&#xff0c;20000张投票使得排名的流量达到了前所未有的高度。 最后&#xff0c;Claude 3的"大杯"模型Opus以1233的分数赢得了胜利&#xff0c;成为了第一个能和GPT-4-Turbo匹敌的选手。 "中杯…

QGridLayout网格布局和QVBoxLayout垂直布局有着非常大的差别

QGridLayout网格布局&#xff1a;1.把这块控件划分成一个个的 单元格 2.把你的控件填充进入 单元格 3.这些有关限制大小的函数接口统统失效 setMaximumWidth&#xff08;&#xff09; setMinimumWidth() setPolicySize()图示&#xff1a;我是用的网格布局&#xff0c;左边放QT…

C# WinForm AndtUI第三方库 Table控件使用记录

环境搭建 1.在NuGet中搜索AndtUI并下载至C# .NetFramework WinForm项目。 2.添加Table控件至窗体。 使用方法集合 1.单元格点击事件 获取被点击记录特定列内容 private void dgv_CellClick(object sender, MouseEventArgs args, object record, int rowIndex, int columnIn…

【Python】装饰器函数

专栏文章索引&#xff1a;Python 原文章&#xff1a;装饰器函数基础_装饰函数-CSDN博客 目录 1. 学习装饰器的基础 2.最简单的装饰器 3.闭包函数装饰器 4.装饰器将传入的函数中的值大写 5. 装饰器的好处 6. 多个装饰器的执行顺序 7. 装饰器传递参数 8. 结语 1. 学习装饰…

利用IDEA创建Java项目使用Servlet工具

【文件】-【项目结构】 【模块】-【依赖】-【】-【JAR】 找到Tomcat的安装路径打开【lib】找到【servlet.jar】点击【确定】 勾选上jar,然后【应用】-【确定】 此时新建文件可以发现多了一个Servlet&#xff0c;我们点击会自动创建一个继承好的Servlet类

STL容器之哈希的补充——其他哈希问题

1.其他哈希问题 ​ 减少了空间的消耗&#xff1b; 1.1位图 ​ 位图判断在不在的时间复杂度是O(1)&#xff0c;速度特别快; ​ 使用哈希函数直接定址法&#xff0c;1对1映射&#xff1b; ​ 对于海量的数据判断在不在的问题&#xff0c;使用之前的一些结构已经无法满足&…

基于PSO粒子群算法的三角形采集堆轨道优化matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 5.完整程序 1.程序功能描述 假设一个收集轨道&#xff0c;上面有5个采集堆&#xff0c;这5个采集堆分别被看作一个4*20的矩阵&#xff08;下面只有4*10&#xff09;&#xff0c;每个模块&…

opencv编程

opencv编程 引言&#xff1a; ​ 本实验旨在介绍使用OpenCV进行图像处理和视频处理的基本操作。OpenCV&#xff08;Open Source Computer Vision Library&#xff09;是一个开源计算机视觉库&#xff0c;提供了丰富的图像和视频处理功能&#xff0c;既可以进行图像的读取、显示…

187基于matlab的弹道目标跟踪滤波方法

基于matlab的弹道目标跟踪滤波方法&#xff0c;扩展卡尔曼滤波&#xff08;extended Kalman filter, EKF&#xff09;、转换测量卡尔曼滤波&#xff08;conversion measurement Kalman filter, CMKF&#xff09;跟踪滤波&#xff0c;得到距离、方位角、俯仰角误差结果。程序已调…

人工智能|机器学习——DBSCAN聚类算法(密度聚类)

1.算法简介 DBSCAN(Density-Based Spatial Clustering of Applications with Noise)是一种基于密度的聚类算法&#xff0c;簇集的划定完全由样本的聚集程度决定。聚集程度不足以构成簇落的那些样本视为噪声点&#xff0c;因此DBSCAN聚类的方式也可以用于异常点的检测。 2.算法原…

《汇编语言》- 读书笔记 - 第17章-实验17 编写包含多个功能子程序的中断例程

《汇编语言》- 读书笔记 - 第17章-实验17 编写包含多个功能子程序的中断例程 逻辑扇区根据逻辑扇区号算出物理编号中断例程&#xff1a;通过逻辑扇区号对软盘进行读写 代码安装 int 7ch 测试程序效果 实现通过逻辑扇区号对软盘进行读写 逻辑扇区 计算公式: 逻辑扇区号 (面号*8…

CSS基础知识

font-family: "Trebuchet MS", Verdana, sans-serif; 字体栈&#xff0c;浏览器会一个一个试过去看下哪个可以用 font-size16px; font-size1em; font-size100%;//相对于16px 字体大小&#xff0c;需要进行单位换算16px1em font-weightnormal;//400font-weight属性…

YoloV5改进策略:Block改进|自研Block,涨点超猛|代码详解|附结构图

涨点效果 参考模型 参考的Block,如下图: 我对Block做了修改,修改后的结构图如下: 代码详解 from timm.models.layers import DropPathfrom torch import Tensor def channel_shuffle(x: Tensor, groups: