我想使用Java 8 CompletableFuture和Rx-Java Observable探索一个简单的分散聚集场景。
场景很简单–产生大约10个任务,每个任务返回一个字符串,最终将结果收集到一个列表中。
顺序的
其顺序版本如下:
public void testSequentialScatterGather() throws Exception {List<String> list =IntStream.range(0, 10).boxed().map(this::generateTask).collect(Collectors.toList());logger.info(list.toString());
}private String generateTask(int i) {Util.delay(2000);return i + "-" + "test";
}
随着CompletableFuture
可以使用称为supplyAsync的实用程序方法来使方法返回CompletableFuture,我正在使用此方法的一种变体,它接受要使用的显式Executor ,而且我故意为其中一个输入抛出异常:
private CompletableFuture<String> generateTask(int i,ExecutorService executorService) {return CompletableFuture.supplyAsync(() -> {Util.delay(2000);if (i == 5) {throw new RuntimeException("Run, it is a 5!");}return i + "-" + "test";}, executorService);
}
现在分散任务:
List<CompletableFuture<String>> futures =IntStream.range(0, 10).boxed().map(i -> this.generateTask(i, executors).exceptionally(t -> t.getMessage())).collect(Collectors.toList());
在分散任务结束时,结果是CompletableFuture列表。 现在,要从中获取String列表有些棘手,这里我使用Stackoverflow中建议的一种解决方案:
CompletableFuture<List<String>> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
这里仅使用CompletableFuture.allOf方法来构成下一步操作,一旦所有分散的任务都完成了,则一旦完成任务,期货就会再次流式传输并收集到一个字符串列表中。
然后可以异步显示最终结果:
result.thenAccept(l -> {logger.info(l.toString());
});
使用Rx-java Observable
使用Rx-java进行分散收集相对比CompletableFuture版本更干净,因为Rx-java提供了更好的方法将结果组合在一起,这也是执行分散任务的方法:
private Observable<String> generateTask(int i, ExecutorService executorService) {return Observable.<String>create(s -> {Util.delay(2000);if ( i == 5) {throw new RuntimeException("Run, it is a 5!");}s.onNext( i + "-test");s.onCompleted();}).onErrorReturn(e -> e.getMessage()).subscribeOn(Schedulers.from(executorService));
}
并分散任务:
List<Observable<String>> obs =IntStream.range(0, 10).boxed().map(i -> generateTask(i, executors)).collect(Collectors.toList());
我又有了一个Observable的列表,而我需要的是一个结果列表,Observable提供了一个合并方法来做到这一点:
Observable<List<String>> merged = Observable.merge(obs).toList();
可以订阅并在可用时打印结果:
merged.subscribe(l -> logger.info(l.toString()));
翻译自: https://www.javacodegeeks.com/2015/08/using-java-8-completablefuture-and-rx-java-observable.html