rxjava 并行
通过RxJava 1.1.1中引入的Completable
抽象,如何并行执行阻止“仅副作用”(也称为void)任务的并行执行变得更加容易。 “
正如您可能已经注意到,阅读我的博客时,我主要专注于软件Craft.io和自动代码测试。 但是,此外,我还是持续交付和广义并发的狂热者。 最后一点从C中的纯线程和信号量到更高级别的解决方案(例如ReactiveX和actor模型)不等。 这次是全新RxJava 1.1.1 – rx.Completable
引入的非常方便(在特定情况下)功能的用例。 与我的许多博客条目类似,这也反映了我在处理实际任务和用例时遇到的实际事件。
要做的任务
想象一下,一个系统对来自不同来源的异步事件进行了非常复杂的处理。 过滤,合并,转换,分组,丰富等等。 RxJava非常适合这里,特别是如果我们想要React式的话。 假设我们已经实现了它(外观和效果很好),只剩下一件事了。 在开始处理之前,需要告知3个外部系统我们准备好接收消息。 对遗留系统的3个同步调用(通过RMI,JMX或SOAP)。 它们每个都可以持续几秒钟,我们需要等待所有它们之后才能开始。 幸运的是,它们已经实现,我们将它们视为可能成功(或因异常而失败)的黑匣子。 我们只需要调用它们(最好同时调用)并等待完成即可。
rx.Observable –方法1
触手可及的RxJava似乎是显而易见的方法。 首先,可以使用Observable
来包装作业执行:
private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;});
}
不幸的是(在我们的例子中) Observable
期望返回一些元素。 我们需要使用Void
并且尴尬地return null
(而不是仅仅引用方法job::execute
。
接下来,我们可以使用subscribeOn()
方法来使用另一个线程来执行我们的作业(而不是阻塞主/当前线程–我们不想顺序执行我们的作业)。 Schedulers.io()
为调度Schedulers.io()
提供了一组用于IO绑定工作的线程。
Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());
最后,我们需要等待所有它们完成(所有Obvervable
s完成)。 为此,可以调整zip功能。 它结合了Obserbable
拉链发射的物品的序列号。 在我们的例子中,我们只对每个Observable
到的作业中的第一个伪项目感兴趣(我们仅发出null
以满足API),并以阻塞的方式等待它们。 zip运算符中的zip函数需要返回某些内容,因此我们需要重复null
的变通方法。
Observable.zip(run1, run2, (r1, r2) -> return null).toBlocking().single();
显而易见, Observable
设计为Observable
使用值流,并且需要进行一些额外的工作才能将其调整为仅产生副作用(不返回任何值)操作。 当我们需要将仅具有副作用的操作与其他返回一些值的值合并(例如合并)时,情况变得更加糟糕–需要更丑陋的转换。 请参阅RxNetty API的实际用例 。
public void execute() {Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.zip(run1, run2, (r1, r2) -> null).toBlocking().single();
}private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;});
}
rx.Observable –方法2
可能会使用另一种方法。 代替生成人工项目,可以将我们的任务的空Observable
作为onComplete
操作执行。 这迫使我们从zip
操作切换到merge
。 结果,我们需要提供一个onNext
动作(对于空的Observable
永远不会执行),这肯定了我们试图破解该系统的信念。
public void execute() {Observable<Object> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Object> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.merge(run1, run2).toBlocking().subscribe(next -> {});
}private Observable<Object> rxJobExecute(Job job) {return Observable.empty().doOnCompleted(job::execute);
}
rx.Completable
RxJava 1.1.1解决了对不返回任何值的Observable的更好支持。 Completable
可以视为Observable
的简化版本,可以成功完成(发出onCompleted
事件)或失败( onError
)。 创建Completable
实例的最简单方法是使用fromAction
方法,该方法采用不返回任何值的Action0
(例如Runnable
)。
Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());
Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());
接下来,我们可以使用merge()
方法,该方法返回一个Completable
实例,该实例立即订阅所有下游Completable
,并在它们全部完成(或其中一个失败)时完成。 由于我们在外部调度程序中使用了subscribeOn
方法,因此所有作业都是并行执行的(在不同的线程中)。
Completable.merge(completable1, completable2).await();
await()
方法将阻塞,直到所有作业完成(如果发生错误,将重新抛出异常)。 纯净而简单。
public void execute() {Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());Completable.merge(completable1, completable2).await();
}
java.util.concurrent.CompletableFuture
有人可能会问:为什么不只使用CompletableFuture
? 这将是一个很好的问题。 Java 5中引入的纯Future
可能需要我们做更多的工作,而ListenableFuture
(来自Guava)和CompletableFuture
(来自Java 8)使其变得微不足道。
首先,我们需要运行/安排作业执行。 接下来,使用CompletableFuture.allOf()
方法,我们可以创建一个新的CompletableFuture
,它在所有作业完成时就完成了(我们以前没有看到过这个概念吗?)。 get()
方法只是阻止等待。
public void execute() {try {CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1::execute);CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2::execute);CompletableFuture.allOf(run1, run2).get();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException("Jobs execution failed", e);}
}
我们需要对受检查的异常做一些事情(很多时候我们不想使用它们来污染我们的API),但是总的来说,这看起来很明智。 但是,值得记住的是,当需要更复杂的链处理时, CompletableFuture
不足。 另外,在我们的项目中已经使用RxJava时,使用相同(或相似)的API而不是引入全新的东西通常会很有用。
摘要
多亏了rx.Completable
,使用RxJava仅完成副作用(不返回任何内容)任务的执行更加轻松。 在已经使用RxJava的代码库中,即使在简单情况下,它也可能比CompletableFuture
更受欢迎。 但是, Completable
提供了许多先进的操作员和技术,此外,还可以轻松地将它与Observable
混合使用,这使其功能更加强大。
要了解有关Completable
更多信息,您可能需要查看发行说明 。 对于那些想深入了解主题的人,Advanced RxJava博客( 第1部分和第2 部分 )上有关于Completable API的非常详细的介绍。
- 可以从GitHub获得代码示例的源代码。
顺便说一句,如果您总体上对RxJava感兴趣,我可以凭良心向您推荐一本书,该书目前由Tomasz Nurkiewicz和Ben Christensen – RxJava的React式编程编写 。
翻译自: https://www.javacodegeeks.com/2016/03/parallel-execution-blocking-tasks-rxjava-completable.html
rxjava 并行