Java 8即将到来,因此该学习新功能了。 尽管Java 7和Java 6只是次要的发行版,但版本8将向前迈出一大步。 也许太大了? 今天,我将为您详细介绍JDK 8中的新抽象– CompletableFuture<T>
。 众所周知,Java 8有望在不到一年的时间内发布,因此本文基于具有lambda支持的JDK 8 build 88 。 CompletableFuture<T>
通过提供功能性的单子(!)操作并促进异步的,事件驱动的编程模型(而不是在较早的Java中进行阻塞)来扩展Future<T>
。 如果您打开CompletableFuture<T>
JavaDoc,您肯定会不知所措。 大约有五十种方法 (!),其中一些方法非常隐秘和奇特,例如:
public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor)
别担心,请继续阅读。 CompletableFuture
使用SettableFuture
收集了番石榴中ListenableFuture
所有功能。 此外,内置的lambda支持使它更接近Scala / Akka期货 。 听起来好得令人难以置信,但请继续阅读。 CompletableFuture
有两个优于Future<T>
主要方面-异步回调/转换支持以及可以在任何时间从任何线程设置CompletableFuture
值的功能。
提取/修改包装值
通常,期货代表由其他线程运行的一段代码。 但这并非总是如此。 有时您想创建一个Future
表示某个已知事件,例如JMS消息到达 。 因此,您具有Future<Message>
但此未来没有任何异步作业。 您只想在JMS消息到达时完成(解决)将来,而这是由事件驱动的。 在这种情况下,您可以简单地创建CompletableFuture
,将其返回给客户端,并且只要您认为结果可用,就可以complete()
future并解锁等待该将来的所有客户端。
对于初学者,您可以简单地凭空创建新的CompletableFuture
并将其提供给您的客户:
public CompletableFuture<String> ask() {final CompletableFuture<String> future = new CompletableFuture<>();//...return future;
}
请注意,此未来与任何Callable<String>
,任何线程池,任何异步作业都没有关联。 如果现在客户端代码调用ask().get()
,它将永远阻塞。 如果它注册了一些完成回调,它们将永远不会触发。 那有什么意义呢? 现在您可以说:
future.complete("42")
…这时,所有在Future.get()
上阻止的客户端都将获得结果字符串。 完成回调也会立即触发。 当您要表示将来的任务时,这非常方便,但不一定要在某个执行线程上运行计算任务。 CompletableFuture.complete()
只能被调用一次,后续调用将被忽略。 但是有一个称为CompletableFuture.obtrudeValue(...)
的后门,它将用新值覆盖Future
先前值。 请谨慎使用。
有时您想发出失败的信号。 如您所知, Future
对象可以处理包装的结果或异常。 如果您想进一步传递一些异常,则可以使用CompletableFuture.completeExceptionally(ex)
(和obtrudeException(ex)
替代以前的异常的邪恶兄弟)。 completeExceptionally()
还会解锁所有正在等待的客户端,但是这次从get()
抛出异常。 说到get()
,还有CompletableFuture.join()
方法,在错误处理方面有一些细微的变化。 但总的来说,它们是相同的。 最后还有一个CompletableFuture.getNow(valueIfAbsent)
方法不会阻塞,但是如果Future
还没有完成,则返回默认值。 在构建我们不想等待太多的强大系统时很有用。
最后一个static
实用程序方法为completedFuture(value)
,该方法返回已完成的Future
对象。 对于测试或编写某些适配器层可能很有用。
创建并获取
好的,那么手动创建CompletableFuture
是我们唯一的选择吗? 不完全的。 与正常的Future
一样,我们可以使用以下工厂方法系列将现有任务包装到CompletableFuture
:
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
不将Executor
作为参数而是以...Async
结尾的方法将使用ForkJoinPool.commonPool()
(JDK 8中引入的全局通用池)。 这适用于CompletableFuture
类中的大多数方法。 runAsync()
很容易理解,请注意,它采用Runnable
,因此它返回CompletableFuture<Void>
因为Runnable
不返回任何内容。 如果需要异步处理某些东西并返回结果,请使用Supplier<U>
:
final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {//...long running...return "42";}
}, executor);
但是,嘿,我们在Java 8中有lambda!
final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {//...long running...return "42";
}, executor);
甚至:
final CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> longRunningTask(params), executor);
本文与Lambda项目无关,但是我将广泛使用Lambda。
转换并作用于一个
所以我说CompletableFuture
优于Future
但是您还没有看到为什么? 简而言之,这是因为CompletableFuture
是一个monad和一个函子。 我帮不上忙吗? 当将来完成时, Scala和JavaScript都允许注册异步回调。 我们无需等待就可以准备就绪。 我们可以简单地说: 在结果到达时运行此函数 。 此外,我们可以堆叠这些函数,将多个Future组合在一起,等等。例如,如果我们有一个从String
到Integer
的函数,则可以将CompletableFuture<String>
为CompletableFuture<Integer
而不用拆开它。 这是通过thenApply()
系列方法实现的:
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
如前所述...Async
为CompletableFuture
上的大多数操作提供了...Async
版本,因此在后续部分中将跳过它们。 只需记住,第一种方法将在将来完成的同一线程中应用函数,而其余两种将在不同的线程池中异步应用它。
让我们看看thenApply()
工作方式:
CompletableFuture<String> f1 = //...
CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt);
CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);
或在一个陈述中:
CompletableFuture<Double> f3 =f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);
您会在此处看到一系列转换。 从String
到Integer
,再到Double
。 但是最重要的是,这些转换既不会立即执行也不会阻塞。 只需记住它们,当原始f1
完成时便会为您执行。 如果某些转换很耗时,则可以提供自己的Executor
来异步运行它们。 请注意,此操作等效于Scala中的单子map
。
完成时运行代码(
CompletableFuture<Void> thenAccept(Consumer<? super T> block);
CompletableFuture<Void> thenRun(Runnable action);
这两种方法是未来管道中典型的“最终”阶段。 它们使您可以在准备就绪时消费未来的价值。 在thenAccept()
提供最终值的同时, thenRun
执行了甚至无法访问计算值的Runnable
。 例:
future.thenAcceptAsync(dbl -> log.debug("Result: {}", dbl), executor);
log.debug("Continuing");
...Async
变体也可用于两种方法,具有隐式和显式执行器。 我对此不够强调:
thenAccept()
/ thenRun()
方法不会阻塞 (即使没有显式executor
)。 像对待事件监听器/处理程序那样对待它们,将它们附加到将来,并将在将来执行。 即使future
甚至还没有完成, "Continuing"
消息也会立即出现。
单个
到目前为止,我们仅讨论了计算结果。 但是异常呢? 我们也可以异步处理它们吗? 当然!
CompletableFuture<String> safe =future.exceptionally(ex -> "We have a problem: " + ex.getMessage());
exceptionally()
接受一个函数,当原始的future抛出异常时将调用该函数。 然后,我们就有机会通过将此异常转换为与Future
的类型兼容的值来进行恢复。 safe
进一步转换将不再产生异常,而是从提供的函数返回的String
。
一个更灵活的方法是handle()
,它接受一个接收正确结果或异常的函数:
CompletableFuture<Integer> safe = future.handle((ok, ex) -> {if (ok != null) {return Integer.parseInt(ok);} else {log.warn("Problem", ex);return -1;}
});
总是调用handle()
,结果或异常参数都不为null
。 这是一站式的万能策略。
将两个
一个CompletableFuture
异步处理很不错,但是当多个此类期货以各种方式组合在一起时,它的确显示了其强大功能。
结合(链接)两个期货(
有时,您想根据未来的价值运行某些功能(准备就绪时)。 但是此函数也将返回将来。 CompletableFuture
应该足够聪明,以至于与CompletableFuture<CompletableFuture<T>>
相对,我们的函数结果现在应该用作顶级将来。 因此, thenCompose()
方法等效于Scala中的flatMap
:
<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);
...Async
变体也可用。 在下面的示例中,当应用返回CompletableFuture<Double>
的calculateRelevance()
函数时,请仔细查看thenApply()
( map
)和thenCompose()
( flatMap
)之间的类型和区别:
CompletableFuture<Document> docFuture = //...CompletableFuture<CompletableFuture<Double>> f =docFuture.thenApply(this::calculateRelevance);CompletableFuture<Double> relevanceFuture =docFuture.thenCompose(this::calculateRelevance);//...private CompletableFuture<Double> calculateRelevance(Document doc) //...
thenCompose()
是一种必不可少的方法,它允许构建健壮的异步管道,而无需阻塞或等待中间步骤。
转换两个期货的价值(
尽管thenCompose()
用于链接一个依赖于另一个的期货,然后当两个都完成时, thenCombine
了两个独立的期货:
<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
...Async
变体也可用。 假设您有两个CompletableFuture
,一个加载Customer
,另一个加载最近的Shop
。 它们彼此完全独立,但是当它们都完成时,您想使用它们的值来计算Route
。 这是一个剥离的示例:
CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);
CompletableFuture<Shop> shopFuture = closestShop();
CompletableFuture<Route> routeFuture =customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));//...private Route findRoute(Customer customer, Shop shop) //...
请注意,在Java 8中,您可以使用以下简单的this::findRoute
方法参考来替换(cust, shop) -> findRoute(cust, shop)
:
customerFuture.thenCombine(shopFuture, this::findRoute);
这样您就知道了。 我们有customerFuture
和shopFuture
。 然后routeFuture
将它们包装起来并“等待”完成。 当它们都准备就绪时,它将运行我们提供的findRoute()
结果的函数( findRoute()
)。 因此,当两个基础的期货被解析并完成 findRoute()
时, routeFuture
将完成。
等待
如果不是只希望在完成结果时就通知两个结果,而不会产生新的CompletableFuture
,而是可以使用thenAcceptBoth()
/ runAfterBoth()
系列方法( ...Async
变体也可用)。 它们的工作方式与thenAccept()
和thenRun()
类似,但是要等待两个thenRun()
,而不是一个thenRun()
:
<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block)
CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)
想象一下,在上面的示例中,您只是想发送一些事件或立即刷新GUI,而不是生成新的CompletableFuture<Route>
。 这可以通过thenAcceptBoth()
轻松实现:
customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {final Route route = findRoute(cust, shop);//refresh GUI with route
});
我希望我错了,但也许你们中的一些人在问自己一个问题: 为什么我不能简单地阻止这两个期货交易? 像这儿:
Future<Customer> customerFuture = loadCustomerDetails(123);
Future<Shop> shopFuture = closestShop();
findRoute(customerFuture.get(), shopFuture.get());
好吧,当然可以。 但是CompletableFuture
的全部目的是允许异步的,事件驱动的编程模型,而不是阻塞并急于等待结果。 因此,从功能上讲,上面的两个代码段是等效的,但后者不必要地占用了一个执行线程。
等待第一个
CompletableFuture
API的另一个有趣的部分是能够等待第一个 (而不是全部 )完成的将来。 当您有两个任务产生相同类型的结果,而您只关心响应时间,而不关注哪个任务首先产生时,这会很方便。 API方法( ...Async
变体也可用):
CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> block)
CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action)
例如,您要集成两个系统。 一个具有较小的平均响应时间但具有较高的标准偏差。 另一个通常较慢,但更可预测。 为了同时兼顾两个方面(性能和可预测性),您需要同时调用两个系统,并等待第一个系统完成。 通常它是第一个,但是如果变慢,第二个会在可接受的时间内结束:
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
fast.acceptEither(predictable, s -> {System.out.println("Result: " + s);
});
s
表示来自fetchFast()
或fetchPredictably()
String
回复。 我们既不知道也不在乎。
改造先完成
applyToEither()
是一个大哥哥acceptEither()
当两个期货中的较快完成时,后者只是简单地调用一些代码,而applyToEither()
将返回一个新的期货。 当两个基础期货中的第一个完成时,该未来将完成。 API有点类似( ...Async
版本也可用):
<U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other, Function<? super T,U> fn)
在完成的第一个Future的结果上调用额外的fn
函数。 我真的不确定这种专门方法的目的是什么,毕竟可以简单地使用: fast.applyToEither(predictable).thenApply(fn)
。 由于我们一直使用此API,但实际上并不需要额外的功能应用程序,因此我将仅使用Function.identity()
占位符:
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
CompletableFuture<String> firstDone =fast.applyToEither(predictable, Function.<String>identity());
然后可以传递firstDone
未来。 注意,从客户的角度来看,两个期货实际上落后于firstDone
是隐藏的。 客户端只是等待将来完成, applyToEither()
会在两者中的任何一个先完成时通知客户端。
将多个
因此,我们现在知道如何等待两个期货完成(使用thenCombine()
)和第一个期货完成( applyToEither()
)。 但是它可以扩展到任意数量的期货吗? 当然,使用static
助手方法:
static CompletableFuture<Void< allOf(CompletableFuture<?<... cfs)
static CompletableFuture<Object< anyOf(CompletableFuture<?<... cfs)
allOf()
接收一组期货,并返回一个期货,该期货在所有基础期货都完成时(屏障等待所有)完成。 另一方面, anyOf()
将仅等待最快的基础期货。 请查看退货期货的通用类型。 不太符合您的期望吗? 我们将在下一篇文章中解决这个问题。
摘要
我们研究了几乎整个CompletableFuture
API 。 我敢肯定,这是不堪重负的,因此在下一篇文章中,我们将很快利用CompletableFuture
功能和Java 8 lambda来开发简单的Web爬网程序的另一种实现。 我们还将研究CompletableFuture
缺点和不足。
翻译自: https://www.javacodegeeks.com/2013/05/java-8-definitive-guide-to-completablefuture.html