标题不是错误。 rx.Observable
1.x的io.reactivex.Observable
与2.x的io.reactivex.Observable
完全不同。 盲目升级rx
依赖关系并重命名项目中的所有导入将进行编译(稍作更改),但不能保证相同的行为。 在项目的早期, Observable
in 1.x中没有背压的概念,但后来包含了背压。 这到底是什么意思? 假设我们有一个流,每1毫秒产生一个事件,但是处理一个这样的项目需要1 秒 。 您会发现从长远来看,它不可能以这种方式工作:
import rx.Observable; //RxJava 1.x
import rx.schedulers.Schedulers;Observable.interval(1, MILLISECONDS).observeOn(Schedulers.computation()).subscribe(x -> sleep(Duration.ofSeconds(1)));
MissingBackpressureException
在几百毫秒内MissingBackpressureException
。 但是这个异常是什么意思呢? 好吧,基本上,这是一个安全网(或如果有的话,请进行健全性检查),以防止损害应用程序。 RxJava自动发现生产者溢出了消费者,并主动终止了流以避免进一步的损害。 那么,如果我们只是在这里和那里搜索并替换少量进口商品,该怎么办?
import io.reactivex.Observable; //RxJava 2.x
import io.reactivex.schedulers.Schedulers;Observable.interval(1, MILLISECONDS).observeOn(Schedulers.computation()).subscribe(x -> sleep(Duration.ofSeconds(1)));
例外不见了! 我们的吞吐量也是如此……一段时间后,应用程序停止运行,一直处于无休止的GC循环中。 您会看到,RxJava 1.x中的Observable
在各处都有断言(绑定队列,检查等),确保您没有在任何地方溢出。 例如,默认情况下,1.x中的observeOn()
运算符的队列限制为128个元素。 当在整个堆栈上正确实现背压时, observeOn()
运算符会要求上游传递不超过128个元素来填充其内部缓冲区。 然后,与此调度程序分开的线程(工作人员)从该队列中拾取事件。 当队列几乎变空时, observeOn()
运算符会要求更多( request()
方法)。 当生产者不遵守背压请求并发送比允许的更多的数据时,此机制就会破裂,从而使消耗者有效溢出。 observeOn()
运算符内的内部队列已满,而interval()
运算符仍在发出新事件-因为这就是interval()
要做的事情。
在1.x中Observable
到的发现了这种溢出,并通过MissingBackpressureException
快速失败。 字面上的意思是: 我尽了最大努力使系统保持健康状态,但是我的上游不尊重背压-缺少背压实现 。 但是,在2.x中Observable
到的没有这种安全机制。 希望你会成为一个好公民,生产缓慢或消费快速的人。 当系统运行Observable
,两个Observable
的行为相同。 但是,在1.x负载下,快速失败,而2.x则缓慢而痛苦地失败。
这是否意味着RxJava 2.x向后退? 恰恰相反! 在2.x中,有一个重要的区别:
-
Observable
不在乎背压,这极大地简化了其设计和实现。 根据定义,它应用于建模不支持背压的流,例如用户界面事件 -
Flowable
确实支持背压,并已采取所有安全措施。 换句话说,计算管道中的所有步骤都确保您不会溢出使用者。
2.x在可以支持背压的流(简单地说“ 如果需要可以减慢 ”)和不支持背压的流之间进行了重要区分。 从类型系统的角度来看,很清楚,我们正在处理哪种源及其保证。 那么我们应该如何将interval()
示例迁移到RxJava 2.x? 比您想像的容易:
Flowable.interval(1, MILLISECONDS).observeOn(Schedulers.computation()).subscribe(x -> sleep(Duration.ofSeconds(1)));
这么简单。 您可能会问自己一个问题,为什么Flowable
可以有一个interval()
运算符,按照定义,它不能支持背压? 假设所有interval()
均以恒定速率传递事件后,它就不会放慢速度! 好吧,如果您看一下interval()
的声明,您会注意到:
@BackpressureSupport(BackpressureKind.ERROR)
简而言之,这告诉我们,每当无法再保证背压时,RxJava都会处理它并抛出MissingBackpressureException
。 这正是我们运行Flowable.interval()
程序时发生的事情–它快速失败,而不是破坏整个应用程序的稳定性。
因此,总结起来,每当您从1.x看到Observable
时,您可能想要的是从2.x开始的Flowable
。 至少,除非您的流按定义不支持背压。 尽管名称相同,但这两个主要版本中的Observable
很大不同。 但是,一旦您进行搜索并从Observable
替换为Flowable
您将注意到迁移并不是那么简单。 这与API更改无关,区别更加深刻。
在2.x中没有直接等效于Observable.create()
简单Flowable.create()
。 过去我过度使用Observable.create()
工厂方法是一个错误。 create()
允许您以任意速率发出事件,而完全忽略了背压。 2.x拥有一些友好的功能来处理背压请求,但它们需要仔细设计流。 这将在下一个常见问题解答中介绍。
翻译自: https://www.javacodegeeks.com/2017/08/1-x-2-x-migration-observable-vs-observable-rxjava-faq.html