冷热复位
我自己对“热和冷可观测”的理解还很不稳定,但这是我到目前为止所了解的!
冷观测
考虑一个返回rx-java Observable的API:
import obs.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;public class Service1 {private static final Logger logger = LoggerFactory.getLogger(Service1.class);public Observable<String> operation() {return Observable.<String>create(s -> {logger.info("Start: Executing slow task in Service 1");Util.delay(1000);s.onNext("data 1");logger.info("End: Executing slow task in Service 1");s.onCompleted();}).subscribeOn(Schedulers.computation());}
}
现在,首先要注意的是,典型的Observable在订阅之前不会做任何事情:
所以基本上,如果我要这样做:
Observable<String> op1 = service1.operation();
除非通过以下方式在Observable上进行订阅,否则不会打印或返回任何内容:
Observable<String> op1 = service1.operation();CountDownLatch latch = new CountDownLatch(1);op1.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());latch.await();
因此,现在,如果此Observable上有多个订阅,会发生什么情况:
Observable<String> op1 = service1.operation();CountDownLatch latch = new CountDownLatch(3);op1.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());op1.subscribe(s -> logger.info("From Subscriber 2: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());op1.subscribe(s -> logger.info("From Subscriber 3: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());latch.await();
有了冷的可观察到的代码,代码将再次被调用并再次发出项目,这在我的机器上得到了:
06:04:07.206 [RxComputationThreadPool-2] INFO o.b.Service1 - Start: Executing slow task in Service 1
06:04:07.208 [RxComputationThreadPool-3] INFO o.b.Service1 - Start: Executing slow task in Service 1
06:04:08.211 [RxComputationThreadPool-2] INFO o.b.BasicObservablesTest - From Subscriber 2: data 1
06:04:08.211 [RxComputationThreadPool-1] INFO o.b.BasicObservablesTest - From Subscriber 1: data 1
06:04:08.211 [RxComputationThreadPool-3] INFO o.b.BasicObservablesTest - From Subscriber 3: data 1
06:04:08.213 [RxComputationThreadPool-2] INFO o.b.Service1 - End: Executing slow task in Service 1
06:04:08.214 [RxComputationThreadPool-1] INFO o.b.Service1 - End: Executing slow task in Service 1
06:04:08.214 [RxComputationThreadPool-3] INFO o.b.Service1 - End: Executing slow task in Service 1
热可观察–使用ConnectableObservable
另一方面,Hot Observable确实不需要订阅即可开始发射项目。 一种实现Hot Observable的方法是使用ConnectableObservable ,它是一个Observable,它在调用connect方法之前不会发出项目,但是一旦开始发出项目,它的任何订阅者只能从订阅点获取项目。 因此,再次回顾前面的示例,但使用ConnectableObservable代替:
Observable<String> op1 = service1.operation();ConnectableObservable<String> connectableObservable = op1.publish();CountDownLatch latch = new CountDownLatch(3);connectableObservable.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());connectableObservable.subscribe(s -> logger.info("From Subscriber 2: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());connectableObservable.subscribe(s -> logger.info("From Subscriber 3: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());connectableObservable.connect();latch.await();
并打印以下内容:
06:07:23.852 [RxComputationThreadPool-3] INFO o.b.Service1 - Start: Executing slow task in Service 1
06:07:24.860 [RxComputationThreadPool-3] INFO o.b.ConnectableObservablesTest - From Subscriber 1: data 1
06:07:24.862 [RxComputationThreadPool-3] INFO o.b.ConnectableObservablesTest - From Subscriber 2: data 1
06:07:24.862 [RxComputationThreadPool-3] INFO o.b.ConnectableObservablesTest - From Subscriber 3: data 1
06:07:24.862 [RxComputationThreadPool-3] INFO o.b.Service1 - End: Executing slow task in Service 1
热点可观察–使用主题
将冷的Observable转换为高温的另一种方法是使用Subject 。 主题既表现为可观察者,又表现为观察者,有不同类型的主题具有不同的行为。 在这里,我使用一个名为PublishSubject的Subject,它具有Pub / Sub行为–这些项目被发送给所有在其上监听的订阅者。 因此,随着PublishSubject的引入,代码如下所示:
Observable<String> op1 = service1.operation();PublishSubject<String> publishSubject = PublishSubject.create();op1.subscribe(publishSubject);CountDownLatch latch = new CountDownLatch(3);publishSubject.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());publishSubject.subscribe(s -> logger.info("From Subscriber 2: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());publishSubject.subscribe(s -> logger.info("From Subscriber 3: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());latch.await();
了解如何将PublishSubject作为Observable的订阅者引入,而其他订阅者则如何订阅PublishSubject。 输出将类似于ConnectableObservable的输出。
从本质上来说,这就是我对“热可观察”的理解程度。 因此,总而言之,Cold和Hot Observable之间的区别在于订户何时获得发射的项目以及何时发射项目–使用Cold Observable,它们在订阅并通常获得所有发射的项目时发射,一个Hot Observable,项目将在没有订阅服务器的情况下发出,而订阅者通常会在订阅点之后获得项目。
参考
- http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html
- Rx-java上的优秀Javadoc – http://reactivex.io/RxJava/javadoc/index.html
翻译自: https://www.javacodegeeks.com/2015/03/hot-and-cold-rx-java-observable.html
冷热复位