在Java 8中全面研究了CompletableFuture
API之后,我们准备编写一个简单的Web搜寻器。 我们已经使用ExecutorCompletionService
, Guava ListenableFuture
和Scala / Akka解决了类似的问题。 我选择了相同的问题,以便轻松比较方法和实现技术。
首先,我们将定义一个简单的阻止方法来下载单个URL的内容:
private String downloadSite(final String site) {try {log.debug("Downloading {}", site);final String res = IOUtils.toString(new URL("http://" + site), UTF_8);log.debug("Done {}", site);return res;} catch (IOException e) {throw Throwables.propagate(e);}
}
没有什么花哨。 稍后将为线程池内的其他站点调用此方法。 另一种方法将String
解析为XML Document
(让我省略实现,没有人愿意看一下它):
private Document parse(String xml) //...
最后,我们算法的核心是以Document
为输入的每个网站的功能计算相关性 。 就像上面我们不在乎实现一样,只有签名很重要:
private CompletableFuture<Double> calculateRelevance(Document doc) //...
让我们把所有的东西放在一起。 抓取到一份网站列表后,我们的搜寻器应开始异步并发下载每个网站的内容。 然后,将每个下载HTML字符串解析为XML Document
并随后计算相关性 。 最后,我们采用所有计算出的相关性指标并找到最大的指标。 当您意识到下载内容和计算相关性都是异步的(返回CompletableFuture
)并且我们绝对不想阻塞或忙于等待时,这听起来很简单。 这是第一部分:
ExecutorService executor = Executors.newFixedThreadPool(4);List<String> topSites = Arrays.asList("www.google.com", "www.youtube.com", "www.yahoo.com", "www.msn.com"
);List<CompletableFuture<Double>> relevanceFutures = topSites.stream().map(site -> CompletableFuture.supplyAsync(() -> downloadSite(site), executor)).map(contentFuture -> contentFuture.thenApply(this::parse)).map(docFuture -> docFuture.thenCompose(this::calculateRelevance)).collect(Collectors.<CompletableFuture<Double>>toList());
实际上这里有很多事情。 定义要爬网的线程池和站点是显而易见的。 但是这种链式表达式计算relevanceFutures
。 最后的map()
和collect()
的序列具有很强的描述性。 从网站列表开始,我们通过将异步任务( downloadSite()
)提交到线程池中,将每个网站( String
)转换为CompletableFuture<String>
。
因此,我们有了CompletableFuture<String>
。 我们继续对其进行转换,这一次在每个parse()
上都应用了parse()
方法。 请记住,当基础将来完成时, thenApply()
将调用提供的lambda并立即返回CompletableFuture<Document>
。 第三个也是最后一个转换步骤是使用calculateRelevance()
将输入列表中的每个CompletableFuture<Document>
组成。 请注意, calculateRelevance()
返回CompletableFuture<Double>
而不是Double
,因此我们使用thenCompose()
而不是thenApply()
。 经过这么多阶段,我们终于collect()
了CompletableFuture<Double>
。
现在,我们想对所有结果进行一些计算。 我们有一份期货清单,我们想知道所有这些期货(最后一个)何时完成。 当然,我们可以在每个将来注册完成回调,并使用CountDownLatch
阻止直到调用所有回调。 我对此很懒,让我们利用现有的CompletableFuture.allOf()
。 不幸的是,它有两个小缺点-使用vararg而不是Collection
,并且不返回将来的合计结果,而是返回Void
。 通过汇总结果,我的意思是:如果我们提供List<CompletableFuture<Double>>
该方法应返回CompletableFuture<List<Double>>
,而不是CompletableFuture<Void>
! 幸运的是,使用一些粘合代码很容易修复:
private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {CompletableFuture<Void> allDoneFuture =CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));return allDoneFuture.thenApply(v ->futures.stream().map(future -> future.join()).collect(Collectors.<T>toList()));
}
仔细观察sequence()
参数和返回类型。 实现非常简单,诀窍是使用现有的allOf()
但是当allDoneFuture
完成时(这意味着所有基础期货都已完成),只需遍历所有期货并在每个期货上进行join()
(阻塞等待)。 但是,由于目前所有期货都已完成,因此保证此电话不会被阻止! 有了这种实用程序,我们终于可以完成我们的任务:
CompletableFuture<List<Double>> allDone = sequence(relevanceFutures);
CompletableFuture<OptionalDouble> maxRelevance = allDone.thenApply(relevances ->relevances.stream().mapToDouble(Double::valueOf).max()
);
这很容易–当allDone
完成后,应用我们的功能即可计算整个集合中的最大相关性。 maxRelevance
仍然是未来。 到JVM到达这一行时,可能尚未下载任何网站。 但是我们将业务逻辑封装在期货之上,并以事件驱动的方式将其堆叠。 代码保持可读性(不带lambda和普通Future
的版本至少要长两倍),但避免阻塞主线程。 当然, allDone
也可以作为中间步骤,我们可以对其进行进一步的转换,而实际上还没有结果。
缺点
Java 8中的CompletableFuture
是向前迈出的一大步。 从对异步任务的细微抽象到功能完善,功能丰富的实用程序。 但是,在玩了几天之后,我发现了一些小缺点:
- 返回前面提到的
CompletableFuture<Void>
CompletableFuture.allOf()
。 我认为可以这样说:如果我通过一组期货并希望等待所有这些期货,那么我也想在它们容易到达时提取结果。 使用CompletableFuture.anyOf()
甚至更糟。 如果我等待任何期货完成,那么我将无法想象传递不同类型的期货,比如说CompletableFuture<Car>
和CompletableFuture<Restaurant>
。 如果我不在乎哪个先完成,那么我该如何处理返回类型? 通常,您将传递同类期货的集合(例如CompletableFuture<Car>
),然后anyOf()
可以简单地返回该类型的期货(而不是再次代替CompletableFuture<Void>
)。 - 混合可设置和可听的抽象。 在番石榴中,有
ListenableFuture
和SettableFuture
扩展。ListenableFuture
允许注册回调,而SettableFuture
增加了从任意线程和上下文设置(解析)将来值的可能性。CompletableFuture
与SettableFuture
等效,但是没有等效于ListenableFuture
受限版本。 为什么会出问题呢? 如果API返回CompletableFuture
,然后有两个线程等待它完成(这没什么问题),那么其中一个线程可以解决此将来并唤醒其他线程,而只有API实现才可以执行此操作。 但是,当API尝试在以后解决将来时,对complete()
调用将被忽略。 它可能会导致真正令人讨厌的错误,在Guava中,将这两个责任分开可以避免。 - 在JDK中,
CompletableFuture
被忽略。 未对ExecutorService
进行改装以返回CompletableFuture
。 从字面上看,CompletableFuture
在JDK中未引用任何地方。 这是一个非常有用的类,与Future
向下兼容,但在标准库中并未真正推广。 - 膨胀的API(?)总共50种方法,大多数为三种形式。 拆分可设置和可听 (见上文)将有所帮助。 同样,恕我直言,诸如
runAfterBoth()
或runAfterEither()
类的某些方法runAfterBoth()
并不属于任何CompletableFuture
。fast.runAfterBoth(predictable, ...)
和predictable.runAfterBoth(fast, ...)
之间有区别吗? 否,但是API支持两者之一。 实际上,我相信runAfterBoth(fast, predictable, ...)
更好地表达我的意图。 -
CompletableFuture.getNow(T)
应该使用Supplier<T>
而不是原始引用。 在下面的示例中,无论将来是否完成,expensiveAlternative()
始终是代码:future.getNow(expensiveAlternative());
但是,我们可以轻松地调整此行为(我知道,这里有一个小的竞争条件,但是原始的getNow()也可以这种方式工作):
public static <T> T getNow(CompletableFuture<T> future,Supplier<T> valueIfAbsent) throws ExecutionException, InterruptedException {if (future.isDone()) {return future.get();} else {return valueIfAbsent.get();} }
使用此实用程序方法,我们可以避免在不需要时调用
expensiveAlternative()
:getNow(future, () -> expensiveAlternative()); //or: getNow(future, this::expensiveAlternative);
总体而言, CompletableFuture
是我们JDK腰带中的一款出色的新工具。 较小的API问题,有时由于有限的类型推断而导致语法过于冗长,这不会阻止您使用它。 至少它为更好的抽象和更健壮的代码奠定了坚实的基础。
翻译自: https://www.javacodegeeks.com/2013/05/java-8-completablefuture-in-action.html