基于CompletableFuture并发任务编排实现

文章目录

  • 并发任务编排实现
    • 不带返回值/参数传递任务
      • 串行执行
      • 并行执行
      • 并行执行-自定义线程池
      • 阻塞等待:多并行任务执行完再执行
      • 任意一个任务并发执行完就执行下个任务
      • 串并行任务依赖场景
    • 带返回值/参数传递任务
      • 带返回值实现
      • 串行执行
    • 多线程任务串行执行
      • 对任务并行执行,返回值combine
    • 写在最后

并发任务编排实现

其实Java8中提供了并发编程框架CompletableFuture,以下结合不同场景进行使用。

不带返回值/参数传递任务

模拟任务代码:

    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");class TaskA implements Runnable{@SneakyThrows@Overridepublic void run() {Thread.sleep(2000);System.out.println(String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务A", LocalDateTime.now().format(formatter)));}}class TaskB implements Runnable{@SneakyThrows@Overridepublic void run() {Thread.sleep(1000);System.out.println(String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务B", LocalDateTime.now().format(formatter)));}}class TaskC implements Runnable{@SneakyThrows@Overridepublic void run() {Thread.sleep(50);System.out.println(String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务C", LocalDateTime.now().format(formatter)));}}

串行执行

A、B、C任务串行执行
在这里插入图片描述

CompletableFuture,runAsync():异步执行
thenRun():上个任务结束再执行(不带上一个返回值结果)下一个任务
get():阻塞等待任务执行完成

实现方式:

    @Testvoid thenRunTest() throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.runAsync(new TaskA()).thenRun(new TaskB()).thenRun(new TaskC());future.get();}

输出:

threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务A] time:[2021-06-01 22:56:51]
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务B] time:[2021-06-01 22:56:52]
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务C] time:[2021-06-01 22:56:53]

从日志就能看出串行执行就是通过单线程执行多个任务。

并行执行

A、B、C任务并行执行
在这里插入图片描述

CompletableFuture.allOf():等待所有的CompletableFuture执行完成,无返回值

代码实现:

    /*** 并发执行ABC任务*/@SneakyThrows@Testvoid SeqTest(){String start = LocalDateTime.now().format(formatter);System.out.println(String.format("start task [%s]", start));CompletableFuture[] futures = new CompletableFuture[3];futures[0] = CompletableFuture.runAsync(new TaskA());futures[1] = CompletableFuture.runAsync(new TaskB());futures[2] = CompletableFuture.runAsync(new TaskC());CompletableFuture.allOf(futures).get();String end = LocalDateTime.now().format(formatter);System.out.println(String.format("end task [%s]", end));}

输出:

start task [2021-06-01 23:03:49]
threadName: [ForkJoinPool.commonPool-worker-3] taskName:[任务C] time:[2021-06-01 23:03:49]
threadName: [ForkJoinPool.commonPool-worker-2] taskName:[任务B] time:[2021-06-01 23:03:50]
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务A] time:[2021-06-01 23:03:51]
end task [2021-06-01 23:03:51]

上述这种方式执行可以看出CompletableFuture默认使用的是ForkJoinPool.commonPool线程池,居然用的默认线程池那线程数是如何配置的呢?后来找到源码发现commonPool线程池配置代码如下
在这里插入图片描述

  1. 先去看看java环境变量有没有制定线程数(如果没有特殊制定默认没有)
  2. 如果没有配置则通过操作系统的核心数减一来设置线程数(我理解的减一应该是为了给main thread执行)
  3. 这种默认配置方式适合用于CPU密集型任务,如果IO型需要我们自己去配置线程池

并行执行-自定义线程池

不是所有任务都是CPU密集型,为了解决上述问题,尤其是IO场景,我们需要根据业务场景配置合理线程数充分使其利用cpu资源。
如何合理配置线程数可以参考我之前文章

    @SneakyThrows@Testvoid ParTestWithThreadPool(){String start = LocalDateTime.now().format(formatter);System.out.println(String.format("start task [%s]", start));ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(24, 32, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));CompletableFuture[] futures = new CompletableFuture[3];futures[0] = CompletableFuture.runAsync(new TaskA(), customThreadPool);futures[1] = CompletableFuture.runAsync(new TaskB(), customThreadPool);futures[2] = CompletableFuture.runAsync(new TaskC(), customThreadPool);CompletableFuture.allOf(futures).get();String end = LocalDateTime.now().format(formatter);System.out.println(String.format("end task [%s]", end));}

输出:

start task [2021-06-02 00:00:05]
threadName: [pool-1-thread-3] taskName:[任务C] time:[2021-06-02 00:00:05]
threadName: [pool-1-thread-2] taskName:[任务B] time:[2021-06-02 00:00:06]
threadName: [pool-1-thread-1] taskName:[任务A] time:[2021-06-02 00:00:07]
end task [2021-06-02 00:00:07]

阻塞等待:多并行任务执行完再执行

A、B并行都执行完后再执行C任务
在这里插入图片描述

    @AfterTestvoid after(){String end = LocalDateTime.now().format(formatter);System.out.println(String.format("end task [%s]", end));}@Testvoid SeqAndParTest() throws ExecutionException, InterruptedException {ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));CompletableFuture[] futures = new CompletableFuture[2];futures[0] = CompletableFuture.runAsync(new TaskA(), customThreadPool);futures[1] = CompletableFuture.runAsync(new TaskB(), customThreadPool);CompletableFuture.allOf(futures).get();CompletableFuture.runAsync(new TaskC(), customThreadPool).get();}

输出:

start task [2021-06-02 16:56:42]
threadName: [pool-1-thread-2] taskName:[任务B] time:[2021-06-02 16:56:43]
threadName: [pool-1-thread-1] taskName:[任务A] time:[2021-06-02 16:56:44]
threadName: [pool-1-thread-3] taskName:[任务C] time:[2021-06-02 16:56:44]
end task [2021-06-02 16:56:44]

从输出中能看出B、A任务并发执行完成以后再执行C任务

任意一个任务并发执行完就执行下个任务

A、B并发执行,只要有一个执行完就执行C任务
在这里插入图片描述

anyOf:只要有任意一个CompletableFuture结束,就可以做接下来的事情,而无须像AllOf那样,等待所有的CompletableFuture结束

    @Testvoid anyOf() throws ExecutionException, InterruptedException {ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));CompletableFuture[] futures = new CompletableFuture[2];futures[0] = CompletableFuture.runAsync(new TaskA(), customThreadPool);futures[1] = CompletableFuture.runAsync(new TaskB(), customThreadPool);CompletableFuture.anyOf(futures).get();CompletableFuture.runAsync(new TaskC(), customThreadPool).get();}

输出:

start task [2021-06-02 17:43:42]
threadName: [pool-1-thread-2] taskName:[任务B] time:[2021-06-02 17:43:43]
threadName: [pool-1-thread-3] taskName:[任务C] time:[2021-06-02 17:43:43]
-----------
end task [2021-06-02 17:43:43]

串并行任务依赖场景

在这里插入图片描述

    @Testvoid multiSeqAndParTest() throws ExecutionException, InterruptedException {ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));CompletableFuture.runAsync(new TaskA(), customThreadPool).get();CompletableFuture[] futures = new CompletableFuture[2];futures[0] = CompletableFuture.runAsync(new TaskB(), customThreadPool).thenRun(new TaskC());futures[1] = CompletableFuture.runAsync(new TaskD(), customThreadPool).thenRun(new TaskE());CompletableFuture.allOf(futures).get();CompletableFuture.runAsync(new TaskF(), customThreadPool).get();}

输出:

start task [2021-06-02 17:33:35]
threadName: [pool-1-thread-1] taskName:[任务A] time:[2021-06-02 17:33:37]
-----------
threadName: [pool-1-thread-3] taskName:[任务D] time:[2021-06-02 17:33:37]
threadName: [pool-1-thread-3] taskName:[任务E] time:[2021-06-02 17:33:37]
-----------
threadName: [pool-1-thread-2] taskName:[任务B] time:[2021-06-02 17:33:38]
threadName: [pool-1-thread-2] taskName:[任务C] time:[2021-06-02 17:33:38]
-----------
threadName: [pool-1-thread-4] taskName:[任务F] time:[2021-06-02 17:33:38]
end task [2021-06-02 17:33:38]

带返回值/参数传递任务

模拟任务

String taskA(){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务A", LocalDateTime.now().format(formatter));return v;}String taskB(){try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务B", LocalDateTime.now().format(formatter));return v;}String taskC(){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务C", LocalDateTime.now().format(formatter));return v;}

带返回值实现

supplyAsync():异步执行并带返回值

    @Testvoid supplyAsync() throws ExecutionException, InterruptedException {CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> taskA());String result = stringCompletableFuture.get();System.out.println(result);}String taskA(){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return "hello";}

串行执行

在这里插入图片描述

thenApply(): 后面跟的是一个有参数、有返回值的方法,称为Function。返回值是CompletableFuture类型。
thenAccept():上个任务结束再执行(前面任务的结果作为下一个任务的入参)下一个任务

    String taskA(){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务A", LocalDateTime.now().format(formatter));return v;}void taskC(String param){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务C", LocalDateTime.now().format(formatter));System.out.println(param + "\n ->" + v);}@Testvoid seqTest1() throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> taskA()).thenApply(param -> {String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务B", LocalDateTime.now().format(formatter));return param + "\n ->" + v;}).thenAccept(param -> taskC(param));completableFuture.get();}

输出:

start task [2021-06-03 11:14:27]
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务A] time:[2021-06-03 11:14:30]->threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务B] time:[2021-06-03 11:14:30]->threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务C] time:[2021-06-03 11:14:31]
end task [2021-06-03 11:14:31]

多线程任务串行执行

A、B、C任务在多个线程环境下执行,但是执行需要带要带参数传递A->B->C,感觉这种使用场景比较少
在这里插入图片描述

thenCompose():第1个参数是一个CompletableFuture类型,第2个参数是一个方法,并且是一个BiFunction,也就是该方法有2个输入参数,1个返回值。从该接口的定义可以大致推测,它是要在2个 CompletableFuture 完成之后,把2个CompletableFuture的返回值传进去,再额外做一些事情。

模拟任务:

    String taskA(){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务A", LocalDateTime.now().format(formatter));return v;}String taskB(String param){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务B", LocalDateTime.now().format(formatter));return param + "\n ->" + v;}String taskC2(String param){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任务C", LocalDateTime.now().format(formatter));return param + "\n ->" + v;}

实现一:

        @Testvoid multiCompletableFutureSeqTest() throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> taskA()).thenCompose(firstTaskReturn -> CompletableFuture.supplyAsync(() -> taskB(firstTaskReturn))).thenCompose(secondTaskReturn -> CompletableFuture.supplyAsync(() -> taskC2(secondTaskReturn)));System.out.println(future.get());}

输出:

start task [2021-06-03 15:04:45]
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务A] time:[2021-06-03 15:04:48]->threadName: [ForkJoinPool.commonPool-worker-2] taskName:[任务B] time:[2021-06-03 15:04:51]->threadName: [ForkJoinPool.commonPool-worker-2] taskName:[任务C] time:[2021-06-03 15:04:54]
end task [2021-06-03 15:04:54]

对任务并行执行,返回值combine

如果希望返回值是一个非嵌套的CompletableFuture,可以使用thenCompose

    @SneakyThrows@Testvoid multiCombineTest(){CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> taskA()).thenCombine(CompletableFuture.supplyAsync(() -> taskB2()), (s1, s2) -> s1 + "\n" +  s2 + "\n" + "combine: " + Thread.currentThread().getName()).thenCombine(CompletableFuture.supplyAsync(() -> taskC2()), (s1, s2) -> s1 + "\n" +  s2 + "\n" + "combine: " + Thread.currentThread().getName());System.out.println(future.get());}

写在最后

推荐一个大佬的并发编程框架,文章思路是照着他的readme去写的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/508789.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

搜索研发工程师需要掌握的一些技能

文章目录基础语言数据结构与算法工程方面搜索相关搜索主要模块电商搜索流程分词相关搜索召回相似度算法相关词推荐排序相关国美搜索搜索算法工程师需要掌握的技能基础 语言 大部分公司用的是Solr、ElasticSearch&#xff0c;都是基于Java实现的&#xff0c;因此熟悉掌握Java语…

Flink入门看完这篇文章就够了

文章目录第一章&#xff1a;概述第一节&#xff1a;什么是Flink&#xff1f;第二节&#xff1a;Flink特点&#xff1f;第三节&#xff1a;Flink应用场景&#xff1f;第四节&#xff1a;Flink核心组成第五节&#xff1a;Flink处理模型&#xff1a;流处理和批处理第六节&#xff…

word小结

域代码/域结果显示设置 word选项---->>高级------>>显示域代码而非域值将样式传给其它文件使用 首先启动Word打开包含这些样式的一个文件&#xff0c;然后选择“工具”---->“模板和加载项”。在弹出的对话框中单击“管理器”按钮。在弹出的“管理器”对话框中&…

线程属性总结

今天面试那哥们问起线程属性&#xff0c;me竟然就说出了一个&#xff0c;囧 学习&#xff1a;http://blog.csdn.net/zsf8701/article/details/7842392 http://blog.csdn.net/jxhnuaa/article/details/3254299 http://blog.sina.com.cn/s/blog_9bd573450101hgdr.html int pthre…

百度2015校园招聘软件开发笔试题及答案

简单题&#xff08;本题共30分&#xff09; 请简述Tcp-ip的3次握手以及4次挥手过程&#xff1f;并解释为何关闭连接需要4次挥手(10分) 详细答案参见TCP/IP协议三次握手与四次握手流程解析 TCP三次握手、四次挥手过程如下: 通常情况下&#xff0c;一个正常的TCP连接&#xf…

linux ps 命令使用

Linux中的ps命令是Process Status的缩写。ps命令用来列出系统中当前运行的那些进程。ps命令列出的是当前那些进程的快照&#xff0c;就是执行ps命令的那个时刻的那些进程&#xff0c;如果想要动态的显示进程信息&#xff0c;就可以使用top命令。 linux上进程有5种状态 ps命令使…

UML序列图总结

序列图主要用于展示对象之间交互的顺序。 序列图将交互关系表示为一个二维图。纵向是时间轴&#xff0c;时间沿竖线向下延伸。横向轴代表了在协作中各独立对象的类元角色。类元角色用生命线表示。当对象存在时&#xff0c;角色用一条虚线表示&#xff0c;当对象的过程处于激活…

UML用例图总结

用例图主要用来描述 用户、需求、系统功能单元 之间的关系。它展示了一个外部用户能够观察到的系统功能模型图。 【用途】&#xff1a;帮助开发团队以一种可视化的方式理解系统的功能需求。 用例图所包含的元素如下&#xff1a; 1. 参与者(Actor) 表示与您的应用程序或…

Linux网络编程常见面试题

概述 TCP和UDP是网络体系结构TCP/IP模型中传输层一层中的两个不同的通信协议。 TCP&#xff1a;传输控制协议&#xff0c;一种面向连接的协议&#xff0c;给用户进程提供可靠的全双工的字节流&#xff0c;TCP套接口是字节流套接口(stream socket)的一种。UDP&#xff1a;用户…

linux动态库查找路径以及依赖关系梳理

编译时与运行时库的路径 linux下&#xff0c;编译时与运行时库的搜索路径是不同的 运行时动态库的路径搜索顺序 LD_PRELOAD环境变量&#xff0c;一般用于hack 编译目标代码时指定的动态库搜索路径(指的是用 -wl,rpath 或-R选项而不是-L)&#xff0c;readelf -d命令可以查看编…

eclipse--android开发环境搭建教程

引言 在windows安装Android的开发环境不简单也说不上算复杂&#xff0c;但由于国内无法正常访问google给android开发环境搭建带来不小的麻烦。现将本人搭建过程记录如下&#xff0c;希望会对投身android开发的小伙伴有所帮助。 android开发环境部署过程 安装JDK环境 下载安装…

eclipse--python开发环境搭建

pydev插件介绍 PyDev is a Python IDE for Eclipse pydev官方网站&#xff1a;http://www.pydev.org/ 在Eclipse中安装pydev插件 启动Eclipse, 点击Help->Install New Software… 在弹出的对话框中&#xff0c;点Add 按钮。 Name中填:Pydev, Location中填http://pydev.or…

Win7虚拟无线AP以及Android手机抓包

设备要求 Windows7操作系统装有无线网卡的笔记本或台式机无线网卡必须支持“承载网络” 查看无线网卡是否支持“承载” 方法一: 开始菜单→所有程序→附件→命令提示符→右键“以管理员权限运行”; 键入命令“netsh wlan show drivers”,查看“支持承载网络”这一项,如果是…

CMD命令之BAT脚本路径信息

CD命令解疑 cd是chdir的缩写&#xff0c;命令详解参见cd /? 可以看到/d参数的解释如下&#xff1a; 使用 /D命令行开关&#xff0c;除了改变驱动器的当前目录之外&#xff0c;还可改变当前驱动器。 通常我们在xp系统中打开cmd窗口时&#xff0c;会显示 C:\Documents and Se…

【ubuntu 22.04】安装vscode并配置正常访问应用商店

注意&#xff1a;要去vscode官网下载deb安装包&#xff0c;在软件商店下载的版本不支持输入中文 在ubuntu下用火狐浏览器无法访问vscode官网&#xff0c;此时可以手动进行DNS解析&#xff0c;打开DNS在线查询工具&#xff0c;解析以下主机地址&#xff08;复制最后一个IP地址&a…

卷积与傅立叶变换

一、卷积 1、一维的卷积 连续&#xff1a; 在泛函分析中&#xff0c;卷积是通过两个函数f(x)f(x)和g(x)g(x)生成第三个函数的一种算子&#xff0c;它代表的意义是&#xff1a;两个函数中的一个(我取g(x)g(x)&#xff0c;可以任意取)函数&#xff0c;把g(x)g(x)经过翻转平移,…

OpenCV-Python bindings是如何生成的(2)

OpenCV-Python bindings生成流程 通过上篇文章和opencv python模块中的CMakeLists.txt文件&#xff0c;可以了解到opencv-python bindings生成的整个流程: 生成headers.txt文件 将每个模块的头文件添加到list中&#xff0c;通过一些关键词过滤掉一些不需要扩展的头文件&#x…

tcp状态机-三次握手-四次挥手以及常见面试题

TCP状态机介绍 在网络协议栈中&#xff0c;目前只有TCP提供了一种面向连接的可靠性数据传输。而可靠性&#xff0c;无非就是保证&#xff0c;我发给你的&#xff0c;你一定要收到。确保中间的通信过程中&#xff0c;不会丢失数据和乱序。在TCP保证可靠性数据传输的实现来看&am…

CentOS7开发环境搭建(1)

文章目录BIOS开启VT支持U盘安装系统(2019-03-11)CentOS DNS配置CentOS网络配置配置静态IP克隆虚拟机网卡名称变更 CentOS6.5时间配置安装VMWare-tools用户管理 (2019-03-15 7.6.1810)给一般账号 root 权限Samba服务配置安装必备软件获取本机公网ipyum源和第三方库源管理配置本地…