Concurrent Task Orchestration with CompletableFuture

Contents

  • Concurrent task orchestration
    • Tasks without return values / parameter passing
      • Serial execution
      • Parallel execution
      • Parallel execution with a custom thread pool
      • Blocking wait: run a task after multiple parallel tasks finish
      • Run the next task as soon as any one parallel task finishes
      • Mixed serial/parallel dependencies
    • Tasks with return values / parameter passing
      • Returning a value
      • Serial execution
    • Serial execution across multiple threads
      • Parallel execution with combined return values
    • Closing notes

Concurrent task orchestration

Java 8 provides CompletableFuture for composing asynchronous tasks. The sections below walk through it across different orchestration scenarios.

Tasks without return values / parameter passing

Mock task code:

    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    class TaskA implements Runnable {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(2000);
            System.out.println(String.format("threadName: [%s] taskName:[%s] time:[%s]",
                    Thread.currentThread().getName(), "任务A", LocalDateTime.now().format(formatter)));
        }
    }

    class TaskB implements Runnable {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000);
            System.out.println(String.format("threadName: [%s] taskName:[%s] time:[%s]",
                    Thread.currentThread().getName(), "任务B", LocalDateTime.now().format(formatter)));
        }
    }

    class TaskC implements Runnable {
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(50);
            System.out.println(String.format("threadName: [%s] taskName:[%s] time:[%s]",
                    Thread.currentThread().getName(), "任务C", LocalDateTime.now().format(formatter)));
        }
    }

Serial execution

Tasks A, B and C run one after another.

CompletableFuture.runAsync(): runs a task asynchronously
thenRun(): runs the next task once the previous one finishes (without passing along the previous result)
get(): blocks until the task chain completes

Implementation:

    @Test
    void thenRunTest() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.runAsync(new TaskA())
                .thenRun(new TaskB())
                .thenRun(new TaskC());
        future.get();
    }

Output:

threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务A] time:[2021-06-01 22:56:51]
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务B] time:[2021-06-01 22:56:52]
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务C] time:[2021-06-01 22:56:53]

The log shows what serial execution means here: a single thread runs the tasks one after another.
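
One nuance worth noting (my addition, not from the original): thenRun() may execute the dependent task on whatever thread completed the previous stage. If each stage should be handed back to a pool explicitly, the *Async variants accept an executor. A minimal sketch, assuming a customThreadPool configured like the ones later in this article:

    // Same serial chain, but every stage is resubmitted to the given pool.
    CompletableFuture<Void> future = CompletableFuture.runAsync(new TaskA(), customThreadPool)
            .thenRunAsync(new TaskB(), customThreadPool)
            .thenRunAsync(new TaskC(), customThreadPool);
    future.get();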

Parallel execution

Tasks A, B and C run in parallel.

CompletableFuture.allOf(): waits for all of the given CompletableFutures to complete; it carries no return value.

Implementation:

    /**
     * Run tasks A, B and C concurrently.
     */
    @SneakyThrows
    @Test
    void SeqTest() {
        String start = LocalDateTime.now().format(formatter);
        System.out.println(String.format("start task [%s]", start));
        CompletableFuture[] futures = new CompletableFuture[3];
        futures[0] = CompletableFuture.runAsync(new TaskA());
        futures[1] = CompletableFuture.runAsync(new TaskB());
        futures[2] = CompletableFuture.runAsync(new TaskC());
        CompletableFuture.allOf(futures).get();
        String end = LocalDateTime.now().format(formatter);
        System.out.println(String.format("end task [%s]", end));
    }

Output:

start task [2021-06-01 23:03:49]
threadName: [ForkJoinPool.commonPool-worker-3] taskName:[任务C] time:[2021-06-01 23:03:49]
threadName: [ForkJoinPool.commonPool-worker-2] taskName:[任务B] time:[2021-06-01 23:03:50]
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务A] time:[2021-06-01 23:03:51]
end task [2021-06-01 23:03:51]

From this run you can see that CompletableFuture defaults to the ForkJoinPool.commonPool thread pool. Given that it uses the default pool, how is the thread count configured? Digging into the JDK source turns up the commonPool configuration logic, paraphrased in the sketch below.
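A minimal sketch of JDK 8's ForkJoinPool.makeCommonPool() parallelism logic, reduced to a standalone method (the real method also wires up thread factories and exception handlers):

    // How ForkJoinPool.commonPool sizes itself in JDK 8, parallelism logic only.
    static int commonPoolParallelism() {
        int parallelism = -1;
        String pp = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");
        if (pp != null)
            parallelism = Integer.parseInt(pp);  // 1. the JVM property wins if present
        if (parallelism < 0 &&                   // 2. otherwise cores - 1 ...
                (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;                     //    ... but never less than 1
        return parallelism;
    }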

  1. First check whether the JVM system property specifies a thread count (by default it is not set)
  2. If it is not set, the thread count falls back to the machine's core count minus one (my understanding is that the minus one leaves a core for the main thread)
  3. This default suits CPU-bound tasks; for IO-bound work we need to configure a thread pool ourselves

Parallel execution with a custom thread pool

Not every task is CPU-bound. For IO-heavy workloads in particular, we need to size the thread pool for the actual workload so the CPU stays fully utilized.
For how to choose a sensible thread count, see my earlier article; a common rule of thumb is sketched below.
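
As a rough guide (the sizing formula from Java Concurrency in Practice, not from the original article): threads ≈ cores × target utilization × (1 + wait time / compute time). A hypothetical sketch:

    // Rule-of-thumb pool sizing: N_threads = N_cpu * U_cpu * (1 + W/C).
    static int ioPoolSize(double waitTime, double computeTime, double targetUtilization) {
        int cores = Runtime.getRuntime().availableProcessors();
        return (int) (cores * targetUtilization * (1 + waitTime / computeTime));
    }

    // Hypothetical numbers: 8 cores, 90% target utilization, tasks that wait
    // 100ms of IO per 10ms of CPU work -> ioPoolSize(100, 10, 0.9) == 79 threads.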

    @SneakyThrows
    @Test
    void ParTestWithThreadPool() {
        String start = LocalDateTime.now().format(formatter);
        System.out.println(String.format("start task [%s]", start));
        ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(24, 32, 3, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000));
        CompletableFuture[] futures = new CompletableFuture[3];
        futures[0] = CompletableFuture.runAsync(new TaskA(), customThreadPool);
        futures[1] = CompletableFuture.runAsync(new TaskB(), customThreadPool);
        futures[2] = CompletableFuture.runAsync(new TaskC(), customThreadPool);
        CompletableFuture.allOf(futures).get();
        String end = LocalDateTime.now().format(formatter);
        System.out.println(String.format("end task [%s]", end));
    }

Output:

start task [2021-06-02 00:00:05]
threadName: [pool-1-thread-3] taskName:[任务C] time:[2021-06-02 00:00:05]
threadName: [pool-1-thread-2] taskName:[任务B] time:[2021-06-02 00:00:06]
threadName: [pool-1-thread-1] taskName:[任务A] time:[2021-06-02 00:00:07]
end task [2021-06-02 00:00:07]

Blocking wait: run a task after multiple parallel tasks finish

Task C runs only after tasks A and B have both finished executing in parallel.

    @AfterTest
    void after() {
        String end = LocalDateTime.now().format(formatter);
        System.out.println(String.format("end task [%s]", end));
    }

    @Test
    void SeqAndParTest() throws ExecutionException, InterruptedException {
        ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000));
        CompletableFuture[] futures = new CompletableFuture[2];
        futures[0] = CompletableFuture.runAsync(new TaskA(), customThreadPool);
        futures[1] = CompletableFuture.runAsync(new TaskB(), customThreadPool);
        CompletableFuture.allOf(futures).get();
        CompletableFuture.runAsync(new TaskC(), customThreadPool).get();
    }

Output:

start task [2021-06-02 16:56:42]
threadName: [pool-1-thread-2] taskName:[任务B] time:[2021-06-02 16:56:43]
threadName: [pool-1-thread-1] taskName:[任务A] time:[2021-06-02 16:56:44]
threadName: [pool-1-thread-3] taskName:[任务C] time:[2021-06-02 16:56:44]
end task [2021-06-02 16:56:44]

The output shows that task C runs only after tasks B and A have both completed.
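
As a variation (my sketch, not from the original), the intermediate get() can be dropped by chaining C directly onto allOf(), so the test blocks only once at the end:

    // Same A+B -> C dependency, composed without blocking between stages.
    CompletableFuture<Void> ab = CompletableFuture.allOf(
            CompletableFuture.runAsync(new TaskA(), customThreadPool),
            CompletableFuture.runAsync(new TaskB(), customThreadPool));
    ab.thenRunAsync(new TaskC(), customThreadPool).get();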

Run the next task as soon as any one parallel task finishes

A and B run in parallel; task C starts as soon as either of them finishes.

anyOf(): proceeds as soon as any single CompletableFuture completes, rather than waiting for all of them the way allOf() does.

    @Test
    void anyOf() throws ExecutionException, InterruptedException {
        ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000));
        CompletableFuture[] futures = new CompletableFuture[2];
        futures[0] = CompletableFuture.runAsync(new TaskA(), customThreadPool);
        futures[1] = CompletableFuture.runAsync(new TaskB(), customThreadPool);
        CompletableFuture.anyOf(futures).get();
        CompletableFuture.runAsync(new TaskC(), customThreadPool).get();
    }

Output:

start task [2021-06-02 17:43:42]
threadName: [pool-1-thread-2] taskName:[任务B] time:[2021-06-02 17:43:43]
threadName: [pool-1-thread-3] taskName:[任务C] time:[2021-06-02 17:43:43]
-----------
end task [2021-06-02 17:43:43]
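
Note that anyOf() yields a CompletableFuture<Object> carrying the result of whichever future finished first. When exactly two stages race, runAfterEither() expresses the same dependency directly; a sketch under the same task definitions:

    // C runs after whichever of A or B completes first.
    CompletableFuture<Void> fa = CompletableFuture.runAsync(new TaskA(), customThreadPool);
    CompletableFuture<Void> fb = CompletableFuture.runAsync(new TaskB(), customThreadPool);
    fa.runAfterEitherAsync(fb, new TaskC(), customThreadPool).get();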

Mixed serial/parallel dependencies

(figure: task A runs first; the chains B→C and D→E then run in parallel; task F runs after both chains complete)

    @Test
    void multiSeqAndParTest() throws ExecutionException, InterruptedException {
        // TaskD/TaskE/TaskF are Runnables analogous to TaskA-C; their definitions
        // are not shown in the original article.
        ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000));
        CompletableFuture.runAsync(new TaskA(), customThreadPool).get();
        CompletableFuture[] futures = new CompletableFuture[2];
        futures[0] = CompletableFuture.runAsync(new TaskB(), customThreadPool).thenRun(new TaskC());
        futures[1] = CompletableFuture.runAsync(new TaskD(), customThreadPool).thenRun(new TaskE());
        CompletableFuture.allOf(futures).get();
        CompletableFuture.runAsync(new TaskF(), customThreadPool).get();
    }

Output:

start task [2021-06-02 17:33:35]
threadName: [pool-1-thread-1] taskName:[任务A] time:[2021-06-02 17:33:37]
-----------
threadName: [pool-1-thread-3] taskName:[任务D] time:[2021-06-02 17:33:37]
threadName: [pool-1-thread-3] taskName:[任务E] time:[2021-06-02 17:33:37]
-----------
threadName: [pool-1-thread-2] taskName:[任务B] time:[2021-06-02 17:33:38]
threadName: [pool-1-thread-2] taskName:[任务C] time:[2021-06-02 17:33:38]
-----------
threadName: [pool-1-thread-4] taskName:[任务F] time:[2021-06-02 17:33:38]
end task [2021-06-02 17:33:38]

Tasks with return values / parameter passing

Mock tasks:

    String taskA() {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String v = String.format("threadName: [%s] taskName:[%s] time:[%s]",
                Thread.currentThread().getName(), "任务A", LocalDateTime.now().format(formatter));
        return v;
    }

    String taskB() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String v = String.format("threadName: [%s] taskName:[%s] time:[%s]",
                Thread.currentThread().getName(), "任务B", LocalDateTime.now().format(formatter));
        return v;
    }

    String taskC() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String v = String.format("threadName: [%s] taskName:[%s] time:[%s]",
                Thread.currentThread().getName(), "任务C", LocalDateTime.now().format(formatter));
        return v;
    }

Returning a value

supplyAsync(): runs a task asynchronously and returns a value.

    @Test
    void supplyAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> taskA());
        String result = stringCompletableFuture.get();
        System.out.println(result);
    }

    String taskA() {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello";
    }
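
A small caution (my addition, not from the original): the no-argument get() blocks indefinitely. The timed overload fails fast instead, which is usually safer in tests:

    // Bound the wait; throws TimeoutException if the task takes longer than 5s.
    String result = CompletableFuture.supplyAsync(() -> taskA())
            .get(5, TimeUnit.SECONDS);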

Serial execution


thenApply(): takes a Function, i.e. a method with a parameter and a return value, and itself returns a CompletableFuture.
thenAccept(): runs the next task after the previous one finishes, with the previous task's result passed in as its argument (no return value).

    String taskA() {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String v = String.format("threadName: [%s] taskName:[%s] time:[%s]",
                Thread.currentThread().getName(), "任务A", LocalDateTime.now().format(formatter));
        return v;
    }

    void taskC(String param) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String v = String.format("threadName: [%s] taskName:[%s] time:[%s]",
                Thread.currentThread().getName(), "任务C", LocalDateTime.now().format(formatter));
        System.out.println(param + "\n ->" + v);
    }

    @Test
    void seqTest1() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> taskA())
                .thenApply(param -> {
                    String v = String.format("threadName: [%s] taskName:[%s] time:[%s]",
                            Thread.currentThread().getName(), "任务B", LocalDateTime.now().format(formatter));
                    return param + "\n ->" + v;
                })
                .thenAccept(param -> taskC(param));
        completableFuture.get();
    }

Output:

start task [2021-06-03 11:14:27]
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务A] time:[2021-06-03 11:14:30]
 ->threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务B] time:[2021-06-03 11:14:30]
 ->threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务C] time:[2021-06-03 11:14:31]
end task [2021-06-03 11:14:31]

Serial execution across multiple threads

Tasks A, B and C execute across multiple threads while still passing results along the chain A -> B -> C. This combination probably comes up fairly rarely in practice.

thenCompose(): takes a Function that receives the previous stage's result and returns a new CompletionStage. Where thenApply() would leave you with a nested CompletableFuture<CompletableFuture<...>>, thenCompose() flattens the chain, much like flatMap, so each dependent async step can run on its own thread while results still flow A -> B -> C.
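
A quick type comparison (my sketch, using the taskA/taskB signatures below) of why thenCompose() is the right fit here:

    // thenApply nests: the outer future completes with another future.
    CompletableFuture<CompletableFuture<String>> nested =
            CompletableFuture.supplyAsync(() -> taskA())
                    .thenApply(r -> CompletableFuture.supplyAsync(() -> taskB(r)));

    // thenCompose flattens: the chain completes with the final String.
    CompletableFuture<String> flat =
            CompletableFuture.supplyAsync(() -> taskA())
                    .thenCompose(r -> CompletableFuture.supplyAsync(() -> taskB(r)));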

Mock tasks:

    String taskA() {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String v = String.format("threadName: [%s] taskName:[%s] time:[%s]",
                Thread.currentThread().getName(), "任务A", LocalDateTime.now().format(formatter));
        return v;
    }

    String taskB(String param) {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String v = String.format("threadName: [%s] taskName:[%s] time:[%s]",
                Thread.currentThread().getName(), "任务B", LocalDateTime.now().format(formatter));
        return param + "\n ->" + v;
    }

    String taskC2(String param) {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String v = String.format("threadName: [%s] taskName:[%s] time:[%s]",
                Thread.currentThread().getName(), "任务C", LocalDateTime.now().format(formatter));
        return param + "\n ->" + v;
    }

Implementation:

    @Test
    void multiCompletableFutureSeqTest() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> taskA())
                .thenCompose(firstTaskReturn -> CompletableFuture.supplyAsync(() -> taskB(firstTaskReturn)))
                .thenCompose(secondTaskReturn -> CompletableFuture.supplyAsync(() -> taskC2(secondTaskReturn)));
        System.out.println(future.get());
    }

Output:

start task [2021-06-03 15:04:45]
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任务A] time:[2021-06-03 15:04:48]
 ->threadName: [ForkJoinPool.commonPool-worker-2] taskName:[任务B] time:[2021-06-03 15:04:51]
 ->threadName: [ForkJoinPool.commonPool-worker-2] taskName:[任务C] time:[2021-06-03 15:04:54]
end task [2021-06-03 15:04:54]

Parallel execution with combined return values

Where thenCompose() chains dependent stages, thenCombine() runs two independent CompletableFutures in parallel and, once both complete, merges their two results with a BiFunction into a single, non-nested CompletableFuture.

    @SneakyThrows
    @Test
    void multiCombineTest() {
        // taskB2()/taskC2() are assumed to be no-argument variants of the earlier
        // tasks; their definitions are not shown in the original article.
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> taskA())
                .thenCombine(CompletableFuture.supplyAsync(() -> taskB2()),
                        (s1, s2) -> s1 + "\n" + s2 + "\n" + "combine: " + Thread.currentThread().getName())
                .thenCombine(CompletableFuture.supplyAsync(() -> taskC2()),
                        (s1, s2) -> s1 + "\n" + s2 + "\n" + "combine: " + Thread.currentThread().getName());
        System.out.println(future.get());
    }
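
For more than two parallel results, a common companion pattern (my sketch, not from the original) is allOf() plus join() to gather all values into a list; join() does not block here because allOf() guarantees every future has already completed:

    // Collect N parallel results once all futures are done.
    List<CompletableFuture<String>> futures = Arrays.asList(
            CompletableFuture.supplyAsync(() -> taskA()),
            CompletableFuture.supplyAsync(() -> taskB()),
            CompletableFuture.supplyAsync(() -> taskC()));
    List<String> results = CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList()))
            .get();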

Closing notes

To close, I recommend a concurrency orchestration framework written by a developer I admire; this article's outline follows its README.
