尽管CompletableFuture
大约是两年前(!)于2014年3月在Java 8中引入的,但它仍然是一个相对较新的概念。但是,此类不是很广为人知是一件好事,因为它很容易被滥用,尤其是在线程和线程方面。一路涉及的线程池。 本文旨在描述如何将线程与CompletableFuture
一起使用。
运行任务
这是API的基本部分。 有一个便捷的supplyAsync()
方法类似于ExecutorService.submit()
,但是返回CompletableFuture
:
CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {try (InputStream is = new URL("http://www.nurkiewicz.com").openStream()) {log.info("Downloading");return IOUtils.toString(is, StandardCharsets.UTF_8);} catch (IOException e) {throw new RuntimeException(e);}});
问题是, supplyAsync()
默认情况下使用ForkJoinPool.commonPool()
,所有CompletableFuture
,所有并行流以及部署在同一JVM上的所有应用程序之间共享的线程池(如果不幸的是,仍然使用具有许多已部署工件的应用程序服务器) 。 这个硬编码的,不可配置的线程池完全在我们的控制范围之外,难以监视和扩展。 因此,您应该始终指定自己的Executor
,例如此处(并查看我如何创建一个的一些技巧 ):
ExecutorService pool = Executors.newFixedThreadPool(10);final CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {//...}, pool);
但这仅仅是开始……
回调和转换
假设您要转换给定的CompletableFuture
,例如,提取String
的长度:
CompletableFuture<Integer> intFuture =future.thenApply(s -> s.length());
究竟是谁调用s.length()
代码? 坦白地说,我亲爱的开发人员,我们不给该死[1] 。 只要像thenApply
这样的所有运算符中的lambda表达式thenApply
便宜,我们就不在乎谁调用它。 但是,如果此表达式花费一点CPU时间来完成或进行阻塞的网络调用怎么办?
首先,默认情况下会发生什么? 想想看:我们有一个String
类型的后台任务,我们想在该值完成后异步应用一些特定的转换。 最简单的实现方法是包装原始任务(返回String
),并在完成任务时对其进行拦截。 内部任务完成后,我们的回调开始,应用转换并返回修改后的值。 这就像介于我们的代码和原始计算结果之间的一个方面。 话虽这么说,很明显s.length()
转换将在与原始任务相同的线程中执行,是吗? 不完全的!
CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {sleepSeconds(2);return "ABC";}, pool);future.thenApply(s -> {log.info("First transformation");return s.length();
});future.get();
pool.shutdownNow();
pool.awaitTermination(1, TimeUnit.MINUTES);future.thenApply(s -> {log.info("Second transformation");return s.length();
});
当任务仍在运行时,将注册thenApply()
的第一个转换。 因此,它将在任务完成后立即在与任务相同的线程中执行。 但是,在注册第二个转换之前,我们要等到任务实际完成为止。 更糟糕的是,我们完全关闭了线程池,以确保在那里没有其他代码可以执行。 那么哪个线程将运行第二次转换? 我们知道它必须立即发生,因为future
我们在已经完成的回调上进行注册。 事实证明,默认情况下使用客户端线程(!)! 输出如下:
pool-1-thread-1 | First transformation main | Second transformation
在注册了第二个转换后,它意识到CompletableFuture
已经完成,因此它立即执行了转换。 周围没有其他线程,因此在当前main
线程的上下文中调用thenApply()
。 当实际的转换成本很高时,就会出现这种行为容易出错的最大原因。 想象一下thenApply()
lambda表达式进行了一些繁重的计算或阻塞了网络调用。 突然,我们的异步CompletableFuture
阻止了调用线程!
控制回调的线程池
有两种技术可以控制哪个线程执行我们的回调和转换。 请注意,仅当您的转换成本很高时才需要这些解决方案。 否则,差异可以忽略不计。 因此,首先我们可以选择*Async
版本的运算符,例如:
future.thenApplyAsync(s -> {log.info("Second transformation");return s.length();
});
这次,第二个转换自动卸载给我们的朋友ForkJoinPool.commonPool()
:
pool-1-thread-1 | First transformation
ForkJoinPool.commonPool-worker-1 | Second transformation
但是我们不喜欢commonPool
所以我们提供自己的:
future.thenApplyAsync(s -> {log.info("Second transformation");return s.length();
}, pool2);
请注意,使用了不同的线程池( pool-1
与pool-2
):
pool-1-thread-1 | First transformation
pool-2-thread-1 | Second transformation
将回调视为另一个计算步骤
但是我相信,如果您在长时间运行的回调和转换方面遇到麻烦(请记住,本文适用于CompletableFuture
上的几乎所有其他方法),则应该简单地使用另一个显式的CompletableFuture
,例如:
//Imagine this is slow and costly
CompletableFuture<Integer> strLen(String s) {return CompletableFuture.supplyAsync(() -> s.length(),pool2);
}//...CompletableFuture<Integer> intFuture = future.thenCompose(s -> strLen(s));
这种方法更加明确。 知道我们的转换成本很高,因此我们不冒险在任意或不受控制的线程上运行它。 相反,我们将其显式建模为从String
到CompletableFuture<Integer>
异步操作。 但是,我们必须将thenApply()
替换为thenCompose()
,否则最终将获得CompletableFuture<CompletableFuture<Integer>>
。
但是,如果我们的转换没有一个与嵌套CompletableFuture
applyToEither()
的版本,例如, applyToEither()
等待第一个Future
完成并应用转换,该怎么办?
CompletableFuture<CompletableFuture<Integer>> poor = future1.applyToEither(future2, s -> strLen(s));
有一个方便的技巧可以“解包”这种晦涩的数据结构,称为flatten
,可以使用flatMap(identity)
(或flatMap(x -> x)
)轻松实现。 在我们的例子中, flatMap()
称为thenCompose
( duh! ):
CompletableFuture<Integer> good = poor.thenCompose(x -> x);
我由您自己决定如何运作以及为什么运作。 我希望本文CompletableFuture
您更清楚地了解如何在CompletableFuture
中涉及线程。
翻译自: https://www.javacodegeeks.com/2015/12/thread-executes-completablefutures-tasks-callbacks.html