上次我们熟悉了ListenableFuture
。 我答应介绍更高级的技术,即转换和链接。 让我们从简单的事情开始。 假设我们有从某些异步服务获得的ListenableFuture<String>
。 我们还有一个简单的方法:
Document parse(String xml) {//...
我们不需要String
,我们需要Document
。 一种方法是简单地解析Future
( 等待它)并在String
上进行处理。 但是,更优雅的解决方案是在结果可用后立即应用转换,并将我们的方法视为始终返回ListenableFuture<Document>
。 这很简单:
final ListenableFuture<String> future = //...final ListenableFuture<Document> documentFuture = Futures.transform(future, new Function<String, Document>() {@Overridepublic Document apply(String contents) {return parse(contents);}
});
或更可读:
final Function<String, Document> parseFun = new Function<String, Document>() {@Overridepublic Document apply(String contents) {return parse(contents);}
};final ListenableFuture<String> future = //...final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);
Java语法有一定的局限性,但请专注于我们刚刚做的事情。 Futures.transform()
不会等待基础的ListenableFuture<String>
应用parse()
转换。 相反,它在后台注册了一个回调,希望在给定将来完成时得到通知。 在适当的时候对我们动态透明地应用了此转换。 我们仍然有Future
,但是这次包装了Document
。
因此,让我们更进一步。 我们还有一个异步的,可能长时间运行的方法,用于计算给定Document
相关性 (无论在这种情况下是什么):
ListenableFuturecalculateRelevance(Document pageContents) {//...
我们可以以某种方式将其与我们已经拥有的ListenableFuture<Document>
吗? 第一次尝试:
final Function<Document, ListenableFuture<Double>> relevanceFun = new Function<Document, ListenableFuture<Double>>() {@Overridepublic ListenableFuture<Double> apply(Document input) {return calculateRelevance(input);}
};final ListenableFuture<String> future = //...
final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);
final ListenableFuture<ListenableFuture<Double>> relevanceFuture = Futures.transform(documentFuture, relevanceFun);
哎哟! Double
Future的未来,看起来不太好。 一旦我们解决了外部的未来,我们也需要等待内部的未来。 绝对不优雅。 我们可以做得更好吗?
final AsyncFunction<Document, Double> relevanceAsyncFun = new AsyncFunction<Document, Double>() {@Overridepublic ListenableFuture<Double> apply(Document pageContents) throws Exception {return calculateRelevance(pageContents);}
};final ListenableFuture<String> future = //comes from ListeningExecutorService
final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);
final ListenableFuture<Double> relevanceFuture = Futures.transform(documentFuture, relevanceAsyncFun);
请非常仔细地查看所有类型和结果。 注意Function
和AsyncFunction
之间的区别。 最初,我们有一个异步方法返回String
future。 后来,我们对其进行了转换,以将String
无缝转换为XML Document
。 内部将来完成后,此转换将异步发生。 具有Document
future,我们想调用一个需要Document
并返回Double
future的方法。
如果调用relevanceFuture.get()
,则我们的Future
对象将首先等待内部任务完成,其结果( String
-> Document
)将等待外部任务并返回Double
。 我们还可以在relevanceFuture
上注册回调,该回调将在外部任务( calculateRelevance()
)完成时触发。 如果您仍然在这里,那就是更加疯狂的转变。
请记住,所有这些都是循环发生的。 对于每个网站,我们都有ListenableFuture<String>
,我们将其异步转换为ListenableFuture<Double>
。 所以最后我们使用List<ListenableFuture<Double>>
。 这也意味着,为了提取所有结果,我们要么为每个ListenableFuture
注册侦听器,要么等待它们中的每个。 根本没有进步。 但是,如果我们可以轻松地从List<ListenableFuture<Double>>
为ListenableFuture<List<Double>>
怎么办? 仔细阅读-从期货清单到清单的未来。 换句话说,不是拥有一堆小小的期货,而是有一个将在所有子期货都完成后完成的期货–并且将结果一对一映射到目标列表。 猜猜,Guava可以做到这一点!
final List<ListenableFuture<Double>> relevanceFutures = //...;
final ListenableFuture<List<Double>> futureOfRelevance = Futures.allAsList(relevanceFutures);
当然,这里也没有等待。 包装器ListenableFuture<List<Double>>
将在其子期货之一完成时得到通知。 最后一个孩子ListenableFuture<Double>
完成时,外部将来也完成。 一切都是事件驱动的,对您完全隐藏。
你认为就是这样吗? 假设我们要计算整个集合中最大的相关性。 您可能现在已经知道,我们不会等待List<Double>
。 相反,我们将注册从List<Double>
到Double
!
final ListenableFuture<Double> maxRelevanceFuture = Futures.transform(futureOfRelevance, new Function<List<Double>, Double>() {@Overridepublic Double apply(List<Double> relevanceList) {return Collections.max(relevanceList);}
});
最后,我们可以侦听maxRelevanceFuture
完成事件,并使用JMS发送结果(异步!)。 如果您迷路了,这是完整的代码:
private Document parse(String xml) {return //...
}private final Function<String, Document> parseFun = new Function<String, Document>() {@Overridepublic Document apply(String contents) {return parse(contents);}
};private ListenableFuture<Double> calculateRelevance(Document pageContents) {return //...
}final AsyncFunction<Document, Double> relevanceAsyncFun = new AsyncFunction<Document, Double>() {@Overridepublic ListenableFuture<Double> apply(Document pageContents) throws Exception {return calculateRelevance(pageContents);}
};//...final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)
);final List<ListenableFuture<Double>> relevanceFutures = new ArrayList<>(topSites.size());
for (final URL siteUrl : topSites) {final ListenableFuture<String> future = pool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {return IOUtils.toString(siteUrl, StandardCharsets.UTF_8);}});final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);final ListenableFuture<Double> relevanceFuture = Futures.transform(documentFuture, relevanceAsyncFun);relevanceFutures.add(relevanceFuture);
}final ListenableFuture<List<Double>> futureOfRelevance = Futures.allAsList(relevanceFutures);
final ListenableFuture<Double> maxRelevanceFuture = Futures.transform(futureOfRelevance, new Function<List<Double>, Double>() {@Overridepublic Double apply(List<Double> relevanceList) {return Collections.max(relevanceList);}
});Futures.addCallback(maxRelevanceFuture, new FutureCallback<Double>() {@Overridepublic void onSuccess(Double result) {log.debug("Result: {}", result);}@Overridepublic void onFailure(Throwable t) {log.error("Error :-(", t);}
});
它值得吗? 是的 , 没有 。 是的 ,因为我们了解了一些与期货/承诺一起使用的非常重要的构造和原语:链接,映射(转换)和归约。 该解决方案在CPU利用率方面非常出色-无需等待,阻塞等。请记住, Node.js的最大优势在于其“无阻塞”策略。 在Netty期货中也无处不在。 最后但并非最不重要的一点是,它感觉非常实用 。
另一方面,主要是由于Java语法冗长和缺乏类型推断(是的,我们将很快进入Scala),代码似乎非常难以阅读,难以遵循和维护。 好吧,在某种程度上,这适用于所有消息驱动的系统。 但是,只要我们不发明更好的API和原语,我们就必须学习生存并利用异步,高度并行的计算。
如果您想进一步尝试ListenableFuture
,请不要忘记阅读官方文档 。
参考: NoBlogDefFound博客中来自我们的JCG合作伙伴 Tomasz Nurkiewicz的高级ListenableFuture功能 。
翻译自: https://www.javacodegeeks.com/2013/03/advanced-listenablefuture-capabilities.html