Java8 CompletableFuture异步编程-入门篇

🏷️个人主页:牵着猫散步的鼠鼠 

🏷️系列专栏:Java全栈-专栏

🏷️个人学习笔记,若有缺误,欢迎评论区指正 

目录

前言

1、Future vs CompletableFuture

1.1 准备工作

1.2 Future 的局限性

PS: 后续的演示文件,就不一一展示,我们可以自己定义一些内容,用于处理即可

1.3 CompletableFuture 的优势

2、创建异步任务

2.1 runAsync

2.2 supplyAsync

2.3 异步任务中的线程池

2.4 异步编程思想

3、异步任务回调

3.1 thenApply

3.2 thenAccept

3.3 thenRun

3.4 更进一步提升并行化

4、异步任务编排

4.1 编排2个依赖关系的异步任务 thenCompose()

4.2 编排2个非依赖关系的异步任务 thenCombine()

4.3 合并多个异步任务 allOf / anyOf

5、异步任务的异常处理

5.1 exceptionally()

5.2 handle()

总结


前言

JDK 5引入了Future模式。Future接口是Java多线程Future模式的实现,在java.util.concurrent包中,可以来进行异步计算。

Future模式是多线程设计常用的一种设计模式。Future模式可以理解成:我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。

Java 8新增的CompletableFuture类正是吸收了所有Google Guava中ListenableFuture和SettableFuture的特征,还提供了其它强大的功能,让Java拥有了完整的非阻塞编程模型:Future、Promise 和 Callback(在Java8之前,只有无Callback 的Future)。

CompletableFuture能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。

CompletableFuture弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAccept、thenApply、thenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。

1、Future vs CompletableFuture

1.1 准备工作

为了便于后续更好地调试,我们需要定义一个工具类辅助我们对知识的理解。

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
​
public class CommonUtils {
​public static String readFile(String pathToFile) {try {return Files.readString(Paths.get(pathToFile));} catch(Exception e) {e.printStackTrace();return "";}}
​//当前线程休眠-毫秒public static void sleepMillis(long millis) {try {TimeUnit.MILLISECONDS.sleep(millis);} catch (InterruptedException e) {e.printStackTrace();}}
​//当前线程休眠-秒public static void sleepSecond(int seconds) {try {TimeUnit.SECONDS.sleep(seconds);} catch (InterruptedException e) {e.printStackTrace();}}
​//打印日志public static void printThreadLog(String message) {//时间戳|线程id|线程名|日志信息String result = new StringJoiner(" | ").add(String.valueOf(System.currentTimeMillis())).add(String.format("%2d",Thread.currentThread().getId())).add(String.valueOf(Thread.currentThread().getName())).add(message).toString();System.out.println(result);}
}
​

1.2 Future 的局限性

需求:替换新闻稿 ( news.txt ) 中敏感词汇 ,把敏感词汇替换成*,敏感词存储在 filter_words.txt 中 news.txt

oh my god!completablefuture真tmd好用呀
PS: 后续的演示文件,就不一一展示,我们可以自己定义一些内容,用于处理即可
public class FutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {
​ExecutorService executor = Executors.newFixedThreadPool(5);// step 1: 读取敏感词汇Future<String[]> filterWordFuture = executor.submit(() -> {String str = CommonUtils.readFile("filter_words.txt");String[] filterWords = str.split(",");return filterWords;});
​// step 2: 读取新闻稿文件内容Future<String> newsFuture = executor.submit(() -> {return CommonUtils.readFile("news.txt");});
​// step 3: 替换操作(当敏感词汇很多,文件很多,替换也会是个耗时的任务)Future<String> replaceFuture = executor.submit(() -> {String[] words = filterWordFuture.get();String news = newsFuture.get();
​// 替换敏感词汇for (String word : words) {if (news.indexOf(word) >= 0) {news = news.replace(word, "**");}}return news;});
​String filteredNews = replaceFuture.get();System.out.println("过滤后的新闻稿:" + filteredNews);
​executor.shutdown();}
}
​

通过上面的代码,我们会发现,Future相比于所有任务都直接在主线程处理,有很多优势,但同时也存在不足,至少表现如下:

  • 在没有阻塞的情况下,无法对Future的结果执行进一步的操作。Future不会告知你它什么时候完成,你如果想要得到结果,必须通过一个get()方法,该方法会阻塞直到结果可用为止。 它不具备将回调函数附加到Future后并在Future的结果可用时自动调用回调的能力。
  • 无法解决任务相互依赖的问题。filterWordFuture和newsFuture的结果不能自动发送给replaceFuture,需要在replaceFuture中手动获取,所以使用Future不能轻而易举地创建异步工作流。
  • 不能将多个Future合并在一起。假设你有多种不同的Future,你想在它们全部并行完成后然后再运行某个函数,Future很难独立完成这一需要。
  • 没有异常处理。Future提供的方法中没有专门的API应对异常处理,还是需要开发者自己手动异常处理。

1.3 CompletableFuture 的优势

CompletableFuture 实现了FutureCompletionStage接口

CompletableFuture 相对于 Future 具有以下优势:

  • 为快速创建、链接依赖和组合多个Future提供了大量的便利方法。
  • 提供了适用于各种开发场景的回调函数,它还提供了非常全面的异常处理支持。
  • 无缝衔接和亲和 lambda 表达式 和 Stream - API 。
  • 我见过的真正意义上的异步编程,把异步编程和函数式编程、响应式编程多种高阶编程思维集于一身,设计上更优雅。

2、创建异步任务

2.1 runAsync

如果你要异步运行某些耗时的后台任务,并且不想从任务中返回任何内容,则可以使用CompletableFuture.runAsync()方法。它接受一个Runnable接口的实现类对象,方法返回CompletableFuture<Void> 对象

static CompletableFuture<Void> runAsync(Runnable runnable);

演示案例:开启一个不从任务中返回任何内容的CompletableFuture异步任务

public class RunAsyncDemo {public static void main(String[] args) {// runAsync 创建异步任务CommonUtils.printThreadLog("main start");// 使用Runnable匿名内部类CompletableFuture.runAsync(new Runnable() {@Overridepublic void run() {CommonUtils.printThreadLog("读取文件开始");// 使用睡眠来模拟一个长时间的工作任务(例如读取文件,网络请求等)CommonUtils.sleepSecond(3);CommonUtils.printThreadLog("读取文件结束");}});
​CommonUtils.printThreadLog("here are not blocked,main continue");CommonUtils.sleepSecond(4); //  此处休眠为的是等待CompletableFuture背后的线程池执行完成。CommonUtils.printThreadLog("main end");}
}

我们也可以以Lambda表达式的形式传递Runnable接口实现类对象

public class RunAsyncDemo2 {public static void main(String[] args) {// runAsync 创建异步任务CommonUtils.printThreadLog("main start");// 使用Lambda表达式CompletableFuture.runAsync(() -> {CommonUtils.printThreadLog("读取文件开始");CommonUtils.sleepSecond(3);CommonUtils.printThreadLog("读取文件结束");});
​CommonUtils.printThreadLog("here are not blocked,main continue");CommonUtils.sleepSecond(4);CommonUtils.printThreadLog("main end");}
}
​

需求:使用CompletableFuture开启异步任务读取 news.txt 文件中的新闻稿,并打印输出。

public class RunAsyncDemo3 {public static void main(String[] args) {// 需求:使用多线程异步读取 words.txt 中的敏感词汇,并打印输出。CommonUtils.printThreadLog("main start");
​CompletableFuture.runAsync(()->{String news = CommonUtils.readFile("news.txt");CommonUtils.printThreadLog(news);});
​CommonUtils.printThreadLog("here are not blocked,main continue");CommonUtils.sleepSecond(4);CommonUtils.printThreadLog("main end");}
}

在后续的章节中,我们会经常使用Lambda表达式。

2.2 supplyAsync

CompletableFuture.runAsync() 开启不带返回结果异步任务。但是,如果您想从后台的异步任务中返回一个结果怎么办?此时,CompletableFuture.supplyAsync()是你最好的选择了。

static CompletableFuture<U> supplyAsync(Supplier<U> supplier)

它入参一个 Supplier 供给者,用于供给带返回值的异步任务 并返回CompletableFuture<U>,其中U是供给者给程序供给值的类型。

需求:开启异步任务读取 news.txt 文件中的新闻稿,返回文件中内容并在主线程打印输出

public class SupplyAsyncDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CommonUtils.printThreadLog("main start");
​CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {String news = CommonUtils.readFile("news.txt");return news;}});
​CommonUtils.printThreadLog("here are not blocked,main continue");// 阻塞并等待newsFuture完成String news = newsFuture.get();CommonUtils.printThreadLog("news = " + news);CommonUtils.printThreadLog("main end");}
}

如果想要获取newsFuture结果,可以调用completableFuture.get()方法,get()方法将阻塞,直到newsFuture完成。

我们依然可以使用Java 8的Lambda表达式使上面的代码更简洁。

CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() -> {String news = CommonUtils.readFile("news.txt");return news;
});

2.3 异步任务中的线程池

大家已经知道,runAsync()和supplyAsync()方法都是开启单独的线程中执行异步任务。但是,我们从未创建线程对吗? 不是吗!

CompletableFuture 会从全局的ForkJoinPool.commonPool() 线程池获取线程来执行这些任务

当然,你也可以创建一个线程池,并将其传递给runAsync()和supplyAsync()方法,以使它们在从您指定的线程池获得的线程中执行任务。

CompletableFuture API中的所有方法都有两种变体,一种是接受传入的Executor参数作为指定的线程池,而另一种则使用默认的线程池 (ForkJoinPool.commonPool()) 。

// runAsync() 的重载方法 
static CompletableFuture<Void>  runAsync(Runnable runnable)
static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)
// supplyAsync() 的重载方法 
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

需求:指定线程池,开启异步任务读取 news.txt 中的新闻稿,返回文件中内容并在主线程打印输出

ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("异步读取文件开始");String news = CommonUtils.readFile("news.txt");CommonUtils.printThreadLog("异步读取文件完成");return news;
},executor);

最佳实践:创建属于自己的业务线程池

如果所有CompletableFuture共享一个线程池,那么一旦有异步任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。

所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

2.4 异步编程思想

综合上述,看到了吧,我们没有显式地创建线程,更没有涉及线程通信的概念,整个过程根本就没涉及线程知识吧,以上专业的说法是:线程的创建和线程负责的任务进行解耦,它给我们带来的好处线程的创建和启动全部交给线程池负责,具体任务的编写就交给程序员,专人专事

异步编程是可以让程序并行( 也可能是并发 )运行的一种手段,其可以让程序中的一个工作单元作为异步任务与主线程分开独立运行,并且在异步任务运行结束后,会通知主线程它的运行结果或者失败原因,毫无疑问,一个异步任务其实就是开启一个线程来完成的,使用异步编程可以提高应用程序的性能和响应能力等。

作为开发者,只需要有一个意识:

开发者只需要把耗时的操作交给CompletableFuture开启一个异步任务,然后继续关注主线程业务,当异步任务运行完成时会通知主线程它的运行结果。我们把具备了这种编程思想的开发称为异步编程思想

3、异步任务回调

CompletableFuture.get()方法是阻塞的。调用时它会阻塞等待 直到这个Future完成,并在完成后返回结果。 但是,很多时候这不是我们想要的。

对于构建异步系统,我们应该能够将回调附加到CompletableFuture上,当这个Future完成时,该回调应自动被调用。 这样,我们就不必等待结果了,然后在Future的回调函数内编写完成Future之后需要执行的逻辑。 您可以使用thenApply(),thenAccept()和thenRun()方法,它们可以把回调函数附加到CompletableFuture

3.1 thenApply

使用 thenApply() 方法可以处理和转换CompletableFuture的结果。 它以Function<T,R>作为参数。 Function<T,R>是一个函数式接口,表示一个转换操作,它接受类型T的参数并产生类型R的结果

CompletableFuture<R> thenApply(Function<T,R> fn)

需求:异步读取 filter_words.txt 文件中的内容,读取完成后,把内容转换成数组( 敏感词数组 ),异步任务返回敏感词数组

public class ThenApplyDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {  CommonUtils.printThreadLog("main start");CompletableFuture<String> readFileFuture = CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取filter_words文件");String filterWordsContent = CommonUtils.readFile("filter_words.txt");return filterWordsContent;});CompletableFuture<String[]> filterWordsFuture = readFileFuture.thenApply((content) -> {CommonUtils.printThreadLog("文件内容转换成敏感词数组");String[] filterWords = content.split(",");return filterWords;});CommonUtils.printThreadLog("main continue");String[] filterWords = filterWordsFuture.get();CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));CommonUtils.printThreadLog("main end");}
}

你还可以通过附加一系列thenApply()回调方法,在CompletableFuture上编写一系列转换序列。一个thenApply()方法的结果可以传递给序列中的下一个,如果你对链式操作很了解,你会发现结果可以在链式操作上传递。

CompletableFuture<String[]> filterWordsFuture = CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取filter_words文件");String filterWordsContent = CommonUtils.readFile("filter_words.txt");return filterWordsContent;
}).thenApply((content) -> {CommonUtils.printThreadLog("转换成敏感词数组");String[] filterWords = content.split(",");return filterWords;
});

3.2 thenAccept

如果你不想从回调函数返回结果,而只想在Future完成后运行一些代码,则可以使用thenAccept()

这些方法是入参一个 Consumer,它可以对异步任务的执行结果进行消费使用,方法返回CompletableFuture。

CompletableFuture<Void>	thenAccept(Consumer<T> action)

通常用作回调链中的最后一个回调。

需求:异步读取 filter_words.txt 文件中的内容,读取完成后,转换成敏感词数组,然后打印敏感词数组

public class ThenAcceptDemo {public static void main(String[] args) {CommonUtils.printThreadLog("main start");CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取filter_words文件");String filterWordsContent = CommonUtils.readFile("filter_words.txt");return filterWordsContent;}).thenApply((content) -> {CommonUtils.printThreadLog("转换成敏感词数组");String[] filterWords = content.split(",");return filterWords;}).thenAccept((filterWords) -> {CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));});CommonUtils.printThreadLog("main continue");CommonUtils.sleepSecond(4);CommonUtils.printThreadLog("main end");}
}

3.3 thenRun

前面我们已经知道,通过thenApply( Function<T,R> ) 对链式操作中的上一个异步任务的结果进行转换,返回一个新的结果;

通过thenAccept( Consumer ) 对链式操作中上一个异步任务的结果进行消费使用,不返回新结果;

如果我们只是想从CompletableFuture的链式操作得到一个完成的通知,甚至都不使用上一步链式操作的结果,那么 CompletableFuture.thenRun() 会是你最佳的选择,它需要一个Runnable并返回CompletableFuture<Void>。

CompletableFuture<Void> thenRun(Runnable action);

演示案例:我们仅仅想知道 filter_words.txt 的文件是否读取完成

public class ThenRunDemo {public static void main(String[] args) {CommonUtils.printThreadLog("main start");CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取filter_words文件");String filterWordsContent = CommonUtils.readFile("filter_words.txt");return filterWordsContent;}).thenRun(() -> {CommonUtils.printThreadLog("读取filter_words文件读取完成");});CommonUtils.printThreadLog("main continue");CommonUtils.sleepSecond(4);CommonUtils.printThreadLog("main end");}
}

3.4 更进一步提升并行化

CompletableFuture 提供的所有回调方法都有两个异步变体

CompletableFuture<U> thenApply(Function<T,U> fn)
// 回调方法的异步变体(异步回调)
CompletableFuture<U> thenApplyAsync(Function<T,U> fn)
CompletableFuture<U> thenApplyAsync(Function<T,U> fn, Executor executor)

注意:这些带了Async的异步回调 通过在单独的线程中执行回调任务 来帮助您进一步促进并行化计算。

回顾需求:异步读取 filter_words.txt 文件中的内容,读取完成后,转换成敏感词数组,主线程获取结果打印输出这个数组

public class ThenApplyAsyncDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CommonUtils.printThreadLog("main start");CompletableFuture<String[]> filterWordFuture = CompletableFuture.supplyAsync(() -> {/*CommonUtils.printThreadLog("读取filter_words文件");String filterWordsContent = CommonUtils.readFile("filter_words.txt");return filterWordsContent;*/// 此时,立即返回结果return "尼玛, NB, tmd";}).thenApply((content) -> {/*** 一般而言,thenApply任务的执行和supplyAsync()任务执行可以使用同一线程执行* 如果supplyAsync()任务立即返回结果,则thenApply的任务在主线程中执行*/CommonUtils.printThreadLog("把内容转换成敏感词数组");String[] filterWords = content.split(",");return filterWords;});CommonUtils.printThreadLog("main continue");String[] filterWords = filterWordFuture.get();CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));CommonUtils.printThreadLog("main end");}
}

要更好地控制执行回调任务的线程,可以使用异步回调。如果使用thenApplyAsync()回调,那么它将在从ForkJoinPool.commonPool() 获得的另一个线程中执行

public class ThenApplyAsyncDemo2 {public static void main(String[] args) throws ExecutionException, InterruptedException {CommonUtils.printThreadLog("main start");CompletableFuture<String[]> filterWordFuture = CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取filter_words文件");String filterWordsContent = CommonUtils.readFile("filter_words.txt");return filterWordsContent;}).thenApplyAsync((content) -> {CommonUtils.printThreadLog("把内容转换成敏感词数组");String[] filterWords = content.split(",");return filterWords;});CommonUtils.printThreadLog("main continue");String[] filterWords = filterWordFuture.get();CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));CommonUtils.printThreadLog("main end");}
}

以上程序一种可能的运行结果(需要多运行几次):

1672885914481 |  1 | main | main start
1672885914511 | 16 | ForkJoinPool.commonPool-worker-1 | 读取filter_words.txt文件
1672885914511 |  1 | main | main continue
1672885914521 | 17 | ForkJoinPool.commonPool-worker-2 | 把内容转换成敏感词数组
1672885914521 |  1 | main | filterWords = [尼玛, NB, tmd]
1672885914521 |  1 | main | main end

此外,如果将Executor传递给thenApplyAsync()回调,则该回调的异步任务将在从Executor的线程池中获取的线程中执行;

ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String[]> filterWordFuture = CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取filter_words文件");String filterWordsContent = CommonUtils.readFile("filter_words.txt");return filterWordsContent;
}).thenApplyAsync((content) -> {CommonUtils.printThreadLog("把内容转换成敏感词数组");String[] filterWords = content.split(",");return filterWords;
},executor);
executor.shutdown();

其他两个回调的变体版本如下:

// thenAccept和其异步回调
CompletableFuture<Void>	thenAccept(Consumer<T> action)
CompletableFuture<Void>	thenAcceptAsync(Consumer<T> action)
CompletableFuture<Void>	thenAcceptAsync(Consumer<T> action, Executor executor)// thenRun和其异步回调
CompletableFuture<Void>	thenRun(Runnable action)
CompletableFuture<Void>	thenRunAsync(Runnable action)
CompletableFuture<Void>	thenRunAsync(Runnable action, Executor executor)

4、异步任务编排

4.1 编排2个依赖关系的异步任务 thenCompose()

回顾需求:异步读取 filter_words.txt 文件中的内容,读取完成后,转换成敏感词数组让主线程待用。

关于读取和解析内容,假设使用以下的 readFileFuture(String) 和 splitFuture(String) 方法完成。

public static CompletableFuture<String> readFileFuture(String fileName) {return CompletableFuture.supplyAsync(() -> {String filterWordsContent = CommonUtils.readFile(fileName);return filterWordsContent;});
}public static CompletableFuture<String[]> splitFuture(String context) {return CompletableFuture.supplyAsync(() -> {String[] filterWords = context.split(",");return filterWords;});
}

现在,让我们先了解如果使用thenApply() 结果会发生什么

CompletableFuture<CompletableFuture<String[]>> future = readFileFuture("filter_words.txt").thenApply((context) -> {return splitFuture(context);});

回顾在之前的案例中,thenApply(Function<T,R>) 中Function回调会对上一步任务结果转换后得到一个简单值 ,但现在这种情况下,最终结果是嵌套的CompletableFuture,所以这是不符合预期的,那怎么办呢?

我们想要的是:把上一步异步任务的结果,转成一个CompletableFuture对象,这个CompletableFuture对象中包含本次异步任务处理后的结果。也就是说,我们想组合上一步异步任务的结果到下一个新的异步任务中, 结果由这个新的异步任务返回

此时,你需要使用thenCompose()方法代替,我们可以把它理解为 异步任务的组合

CompletableFuture<R> thenCompose(Function<T,CompletableFuture<R>> func)

所以,thenCompose()用来连接两个有依赖关系的异步任务,结果由第二个任务返回

CompletableFuture<String[]> future = readFileFuture("filter_words.txt").thenCompose((context) -> { return splitFuture(context);});

因此,这里积累了一个经验:

如果我们想连接( 编排 ) 两个依赖关系的异步任务( CompletableFuture 对象 ) ,请使用 thenCompose() 方法

当然,thenCompose 也存在异步回调变体版本:

CompletableFuture<R> thenCompose(Function<T,CompletableFuture<R>> fn)CompletableFuture<R> thenComposeAsync(Function<T,CompletableFuture<R>> fn)
CompletableFuture<R> thenComposeAsync(Function<T,CompletableFuture<R>> fn, Executor executor)

4.2 编排2个非依赖关系的异步任务 thenCombine()

我们已经知道,当其中一个Future依赖于另一个Future,使用thenCompose()用于组合两个Future。如果两个Future之间没有依赖关系,你希望两个Future独立运行并在两者都完成之后执行回调操作时,则使用thenCombine();

// T是第一个任务的结果 U是第二个任务的结果 V是经BiFunction应用转换后的结果
CompletableFuture<V> thenCombine(CompletableFuture<U> other, BiFunction<T,U,V> func)

需求:替换新闻稿 ( news.txt ) 中敏感词汇 ,把敏感词汇替换成*,敏感词存储在 filter_words.txt 中

public class ThenCombineDemo {public static void main(String[] args) throws Exception {// 读取敏感词汇的文件并解析到数组中CompletableFuture<String[]> future1 = CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取敏感词汇并解析");String context = CommonUtils.readFile("filter_words.txt");String[] words = context.split(",");return words;});// 读取news文件内容CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取news文件内容");String context = CommonUtils.readFile("news.txt");return context;});CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (words, context) -> {// 替换操作CommonUtils.printThreadLog("替换操作");for (String word : words) {if(context.indexOf(word) > -1) {context = context.replace(word, "**");}}return context;});String filteredContext = combinedFuture.get();System.out.println("filteredContext = " + filteredContext);}
}

注意:当两个Future都完成时,才将两个异步任务的结果传递给thenCombine()的回调函数做进一步处理。

和以往一样,thenCombine 也存在异步回调变体版本

CompletableFuture<V> thenCombine(CompletableFuture<U> other, BiFunction<T,U,V> func)
CompletableFuture<V> thenCombineAsync(CompletableFuture<U> other, BiFunction<T,U,V> func)
CompletableFuture<V> thenCombineAsync(CompletableFuture<U> other, BiFunction<T,U,V> func,Executor executor)

4.3 合并多个异步任务 allOf / anyOf

我们使用thenCompose()和thenCombine()将两个CompletableFuture组合和合并在一起。

如果要编排任意数量的CompletableFuture怎么办?可以使用以下方法来组合任意数量的CompletableFuture

public static CompletableFuture<Void>	allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

CompletableFuture.allOf()用于以下情形中:有多个需要独立并行运行的Future,并在所有这些Future 都完成后执行一些操作。

需求:统计news1.txt、new2.txt、new3.txt 文件中包含CompletableFuture关键字的文件的个数

public class AllOfDemo {public static CompletableFuture<String> readFileFuture(String fileName) {return CompletableFuture.supplyAsync(() -> {String content = CommonUtils.readFile(fileName);return content;});}public static void main(String[] args) {// step 1: 创建List集合存储文件名List<String> fileList = Arrays.asList("news1.txt", "news2.txt", "news3.txt");// step 2: 根据文件名调用readFileFuture创建多个CompletableFuture,并存入List集合中List<CompletableFuture<String>> readFileFutureList = fileList.stream().map(fileName -> {return readFileFuture(fileName);}).collect(Collectors.toList());// step 3: 把List集合转换成数组待用,以便传入allOf方法中int len = readFileFutureList.size();CompletableFuture[] readFileFutureArr = readFileFutureList.toArray(new CompletableFuture[len]);// step 4: 使用allOf方法合并多个异步任务CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(readFileFutureArr);// step 5: 当多个异步任务都完成后,使用回调操作文件结果,统计符合条件的文件个数CompletableFuture<Long> countFuture = allOfFuture.thenApply(v -> {return readFileFutureList.stream().map(future -> future.join()).filter(content -> content.contains("CompletableFuture")).count();});// step 6: 主线程打印输出文件个数Long count = countFuture.join();System.out.println("count = " + count);}
}

顾名思义,当给定的多个异步任务中的有任意Future一个完成时,需要执行一些操作,可以使用 anyOf 方法

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

anyOf()返回一个新的CompletableFuture,新的CompletableFuture的结果和 cfs中已完成的那个异步任务结果相同。

演示案例:anyOf 执行过程

public class AnyOfDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {Tools.sleepMillis(2);return "Future1的结果";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {Tools.sleepMillis(1);return "Future2的结果";});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {Tools.sleepMillis(3);return "Future3的结果";});CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);// 输出Future2的结果System.out.println(anyOfFuture.get());}
}

在上面的示例中,当三个CompletableFuture中的任意一个完成时,anyOfFuture就完成了。 由于future2的睡眠时间最少,因此它将首先完成,最终结果将是"Future2的结果"。

注意:

  • anyOf() 方法返回类型必须是 CompletableFuture <Object>。
  • anyOf()的问题在于,如果您拥有返回不同类型结果的CompletableFuture,那么您将不知道最终CompletableFuture的类型。

5、异步任务的异常处理

在前面的章节中,我们并没有更多地关心异常处理的问题,其实,CompletableFuture 提供了优化处理异常的方式。

首先,让我们了解异常如何在回调链中传播

public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {int r = 1 / 0;return "result1";}).thenApply(result -> {return result + " result2";}).thenApply(result -> {return result + " result3";}).thenAccept((result)->{System.out.println(result);});}

如果在 supplyAsync 任务中出现异常,后续的 thenApply 和 thenAccept 回调都不会执行,CompletableFuture 将转入异常处理

如果在第一个 thenApply 任务中出现异常,第二个 thenApply 和 最后的 thenAccept 回调不会被执行,CompletableFuture 将转入异常处理,依次类推。

5.1 exceptionally()

exceptionally 用于处理回调链上的异常,回调链上出现的任何异常,回调链不继续向下执行,都在exceptionally中处理异常。

// Throwable表示具体的异常对象e
CompletableFuture<R> exceptionally(Function<Throwable, R> func)
public class ExceptionallyDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {int r = 1 / 0;return "result1";}).thenApply(result -> {String str = null;int len = str.length();return result + " result2";}).thenApply(result -> {return result + " result3";}).exceptionally(ex -> {System.out.println("出现异常:" + ex.getMessage());return "Unknown";});String ret = future.get();Tools.printThreadLog("最终结果:" + ret);}
}

 因为exceptionally只处理一次异常,所以常常用在回调链的末端。

5.2 handle()

CompletableFuture API 还提供了一种更通用的方法 handle() 表示从异常中恢复

handle() 常常被用来恢复回调链中的一次特定的异常,回调链恢复后可以进一步向下传递。

CompletableFuture<R> handle(BiFunction<T, Throwable, R> fn)
public class HandleDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {int r = 1 / 0;return "result";}).handle((ret, ex) -> {if(ex != null) {System.out.println("我们得到异常:" + ex.getMessage());return "Unknown!";}return ret;});String ret = future.get();CommonUtils.printThreadLog(ret);}
}

 如果发生异常,则res参数将为null,否则ex参数将为null。

需求:对回调链中的一次异常进行恢复处理

public class HandleExceptionDemo2 {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {int r = 1 / 0;return "result1";}).handle((ret, ex) -> {if (ex != null) {System.out.println("我们得到异常:" + ex.getMessage());return "Unknown1";}return ret;}).thenApply(result -> {String str = null;int len = str.length();return result + " result2";}).handle((ret, ex) -> {if (ex != null) {System.out.println("我们得到异常:" + ex.getMessage());return "Unknown2";}return ret;}).thenApply(result -> {return result + " result3";});String ret = future.get();Tools.printThreadLog("最终结果:" + ret);}
}

和以往一样,为了提升并行化,异常处理可以方法单独的线程执行,以下是它们的异步回调版本

CompletableFuture<R> exceptionally(Function<Throwable, R> fn)
CompletableFuture<R> exceptionallyAsync(Function<Throwable, R> fn)  // jdk17+
CompletableFuture<R> exceptionallyAsync(Function<Throwable, R> fn,Executor executor) // jdk17+CompletableFuture<R> handle(BiFunction<T,Throwable,R> fn)
CompletableFuture<R> handleAsync(BiFunction<T,Throwable,R> fn)
CompletableFuture<R> handleAsync(BiFunction<T,Throwable,R> fn, Executor executor)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/726530.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从0开始学习NEON(2)

1、前言 继上一个例子&#xff0c;本次继续来学习NEON&#xff0c;本次学习NEON中向量拼接的操作&#xff0c;主要应用在图像的padding中。 https://blog.csdn.net/weixin_42108183/article/details/136440707 2、案例 2.1 案例1 在某些情况下&#xff0c;需要取在每个向量…

小程序环形进度条爬坑

在做微信小程序的时候&#xff0c;发现用canvas做的环形进度条&#xff0c;在带滚动条的view里面显示有闪动、显示不全的问题&#xff0c;后面改成echart-weixin的pie图实现了&#xff0c;option配置如下 // 表示进度的百分比 var progressValue 70;option {series: [{type: …

指数幂+力扣

题目 题目链接 . - 力扣&#xff08;LeetCode&#xff09; 题目描述 代码实现 class Solution { public:double myPow(double x, int n) {long t n;return t > 0 ? _myPow(x, t) : 1 / _myPow(x, -t);}double _myPow(double x, int n){if(n 0) return 1;double y _…

docker安装与配置-网络方式ftp方式

说明&#xff1a; 本文环境&#xff1a;CentOS 7 1、#ip地址配置(在xnode2的基础上) [rootxnode2 ~]# cat /etc/sysconfig/network-scripts/ifcfg-eno16777736TYPEEthernetBOOTPROTOstaticDEFROUTEyesPEERDNSyesPEERROUTESyesIPV4_FAILURE_FATALnoIPV6INITyesIPV6_AUTOCONF…

Python 文件基础科普与文件打开技术详解【第117篇—文件打开技术】

Python 文件基础科普与文件打开技术详解 在Python编程中&#xff0c;文件操作是一项基础而重要的任务。无论是读取数据、写入文件还是进行其他文件处理操作&#xff0c;都需要对文件基础知识有一定的了解。在本文中&#xff0c;我们将首先介绍Python文件的基础概念&#xff0c…

哈希专题 - leetcode 1. 两数之和 - 简单难度

leetcode 1. 两数之和 leetcode 1. 两数之和 简单难度 哈希1. 题目详情1. 原题链接2. 基础框架 2. 解题思路1. 题目分析2. 算法原理3. 时间复杂度 3. 代码实现4. 知识与收获 leetcode 1. 两数之和 简单难度 哈希 1. 题目详情 给定一个整数数组 nums 和一个整数目标值 target…

抽奖小程序怎么在线制作_引爆你的营销活动

抽奖小程序&#xff0c;轻松在线制作&#xff0c;引爆你的营销活动&#xff01; 在如今数字化时代&#xff0c;营销方式层出不穷&#xff0c;如何快速吸引用户眼球&#xff0c;提高品牌知名度&#xff0c;成为每个企业关注的焦点。今天&#xff0c;我要向大家介绍一款神奇的工…

用xshell7连接服务器,读取后台日志

有时候前端需要读取一些后台日志&#xff0c;比如&#xff0c;有时候接一些验证码啥的 或者有时候前后端不分离时&#xff0c;前端上线项目 先讲一下怎么用密码方式连接服务器 密码方式连接服务器 第一步&#xff0c;安装xshell&#xff0c;在新建会话中填写主机&#xff0…

Linux 下安装 Git

Linux 下安装 Git 1 参考2 安装2.1 通过 yum方式安装&#xff08;不推荐&#xff09;2.2 通过源码编译安装&#xff08;推荐&#xff09; 3 配置SSH 1 参考 Linux 下安装 Git 2 安装 2.1 通过 yum方式安装&#xff08;不推荐&#xff09; 在Linux上安装git仅需一行命令即可…

11 |「异步任务与多线程」

前言 实践是最好的学习方式&#xff0c;技术也如此。 文章目录 前言一、同步和异步的概念1、同步和异步任务2、线程 二、Android 多线程与 Handler 机制1、分类2、原则3、Handler 机制1&#xff09;问题&#xff08;背景&#xff09;2&#xff09;Handler 异步通信系统3&#x…

重塑生产格局:新质生产力引领下的新型工业操作系统和工业母机创新动向

新质生产力是创新起主导作用&#xff0c;摆脱传统经济增长方式、生产力发展路径&#xff0c;具有高科技、高效能、高质量特征&#xff0c;符合新发展理念的先进生产力质态。 **风口情报&#xff1a;**近日&#xff0c;中央经济工作会议首次提出“发展新质生产力”&#xff1b;…

作业1-32 P1059 [NOIP2006 普及组] 明明的随机数

题目 思路 根据题意&#xff0c;需要将读入的数据排序&#xff0c;去重。 参考代码 #include<bits/stdc.h> using namespace std; int n,a[5000],k;int main() {while(cin>>n){//读入数据for(int i0;i<n;i)cin>>a[i];sort(a,an);//排序int b[5000];in…

深度学习需要掌握哪些数学基础?

《深度学习的数学》这本书再合适不过了。作者罗纳德.T.纽塞尔&#xff08;Ronald T. Kneusel&#xff09;&#xff0c;拥有超过 20年的机器学习行业经验。 本书适合有一定深度学习基础、了解Python编程语言的读者阅读&#xff0c;也可作为用于拓展深度学习理论的参考书。 为什么…

CKA考试必备:解锁Pod封装多容器的高级技巧!

往期精彩文章 : 提升CKA考试胜算&#xff1a;一文带你全面了解RBAC权限控制&#xff01;揭秘高效运维&#xff1a;如何用kubectl top命令实时监控K8s资源使用情况&#xff1f;CKA认证必备&#xff1a;掌握k8s网络策略的关键要点提高CKA认证成功率&#xff0c;CKA真题中的节点维…

【MySQL】用户管理 -- 详解

如果我们只能使用 root 用户&#xff0c;这样存在安全隐患。这时就需要使用 MySQL 的用户管理。 一、 用户 1、用户信息 MySQL 中的用户都存储在系统数据库 MySQL 的 user 表中。 字段解释&#xff1a; host&#xff1a;表示这个用户可以从哪个主机登陆&#xff0c;如果…

【三】【SQL Server】如何运用SQL Server中查询设计器通关数据库期末查询大题

数据库学生选择1122 数据库展示 course表展示 SC表展示 student表展示 数据库学生选课1122_1 第一题 第二题 第三题 第四题 第五题 数据库学生选课1122_2 第六题 第七题 第八题 第九题 第十题 结尾 最后&#xff0c;感谢您阅读我的文章&#xff0c;希望这些内容能够对您有所启…

实践:qemu 运行 linux riscv with AIA(APLICIMSIC)

RISCV架构 Linux AIA支持 目标&#xff1a;在 Qemu 中运行一个支持 riscv aia 的 linux 翻译参考自&#xff1a;https://lwn.net/Articles/963231/ 文章日期&#xff1a;2024年2月22日&#xff0c;星期四&#xff08;截至2024年3月&#xff0c;最新&#xff09; 这个网站里在不…

Spring Boot工程集成验证码生成与验证功能教程

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

《Vite 报错》ReferenceError: module is not defined in ES module scope

ReferenceError: module is not defined in ES module scope 解决方案 postcss.config.js 要改为 postcss.config.cjs&#xff0c;也就是 .cjs 后缀。 原因解析 下图提示&#xff0c;packages.json 中的属性 type 设置为 module。所有 *.js 文件现在都被解释为 ESM&#xff…

电商店群系统的搭建需要用到的官方接口如何申请?

电商电子商务企业往往都会需要再很多平台上面铺货&#xff0c;上传商品。 高科技的今天&#xff0c;我们已经不需要手动一个个品去上传了。那通过官方接口&#xff0c;如何实现快速铺货呢&#xff1f; 1688官方开放平台的API接口类型众多&#xff0c;并不是所有的企业都能申请…