冷热复位_冷热rx-java可观察

冷热复位

我自己对“热和冷可观测”的理解还很不稳定,但这是我到目前为止所了解的!

冷观测

考虑一个返回rx-java Observable的API:

import obs.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;public class Service1 {private static final Logger logger = LoggerFactory.getLogger(Service1.class);public Observable<String> operation() {return Observable.<String>create(s -> {logger.info("Start: Executing slow task in Service 1");Util.delay(1000);s.onNext("data 1");logger.info("End: Executing slow task in Service 1");s.onCompleted();}).subscribeOn(Schedulers.computation());}
}

现在,首先要注意的是,典型的Observable在订阅之前不会做任何事情:

所以基本上,如果我要这样做:

Observable<String> op1 = service1.operation();

除非通过以下方式在Observable上进行订阅,否则不会打印或返回任何内容:

Observable<String> op1 = service1.operation();CountDownLatch latch = new CountDownLatch(1);op1.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());latch.await();

因此,现在,如果此Observable上有多个订阅,会发生什么情况:

Observable<String> op1 = service1.operation();CountDownLatch latch = new CountDownLatch(3);op1.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());op1.subscribe(s -> logger.info("From Subscriber 2: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());op1.subscribe(s -> logger.info("From Subscriber 3: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());latch.await();

有了冷的可观察到的代码,代码将再次被调用并再次发出项目,这在我的机器上得到了:

06:04:07.206 [RxComputationThreadPool-2] INFO  o.b.Service1 - Start: Executing slow task in Service 1
06:04:07.208 [RxComputationThreadPool-3] INFO  o.b.Service1 - Start: Executing slow task in Service 1
06:04:08.211 [RxComputationThreadPool-2] INFO  o.b.BasicObservablesTest - From Subscriber 2: data 1
06:04:08.211 [RxComputationThreadPool-1] INFO  o.b.BasicObservablesTest - From Subscriber 1: data 1
06:04:08.211 [RxComputationThreadPool-3] INFO  o.b.BasicObservablesTest - From Subscriber 3: data 1
06:04:08.213 [RxComputationThreadPool-2] INFO  o.b.Service1 - End: Executing slow task in Service 1
06:04:08.214 [RxComputationThreadPool-1] INFO  o.b.Service1 - End: Executing slow task in Service 1
06:04:08.214 [RxComputationThreadPool-3] INFO  o.b.Service1 - End: Executing slow task in Service 1

热可观察–使用ConnectableObservable

另一方面,Hot Observable确实不需要订阅即可开始发射项目。 一种实现Hot Observable的方法是使用ConnectableObservable ,它是一个Observable,它在调用connect方法之前不会发出项目,但是一旦开始发出项目,它的任何订阅者只能从订阅点获取项目。 因此,再次回顾前面的示例,但使用ConnectableObservable代替:

Observable<String> op1 = service1.operation();ConnectableObservable<String> connectableObservable =  op1.publish();CountDownLatch latch = new CountDownLatch(3);connectableObservable.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());connectableObservable.subscribe(s -> logger.info("From Subscriber 2: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());connectableObservable.subscribe(s -> logger.info("From Subscriber 3: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());connectableObservable.connect();latch.await();

并打印以下内容:

06:07:23.852 [RxComputationThreadPool-3] INFO  o.b.Service1 - Start: Executing slow task in Service 1
06:07:24.860 [RxComputationThreadPool-3] INFO  o.b.ConnectableObservablesTest - From Subscriber 1: data 1
06:07:24.862 [RxComputationThreadPool-3] INFO  o.b.ConnectableObservablesTest - From Subscriber 2: data 1
06:07:24.862 [RxComputationThreadPool-3] INFO  o.b.ConnectableObservablesTest - From Subscriber 3: data 1
06:07:24.862 [RxComputationThreadPool-3] INFO  o.b.Service1 - End: Executing slow task in Service 1

热点可观察–使用主题

将冷的Observable转换为高温的另一种方法是使用Subject 。 主题既表现为可观察者,又表现为观察者,有不同类型的主题具有不同的行为。 在这里,我使用一个名为PublishSubject的Subject,它具有Pub / Sub行为–这些项目被发送给所有在其上监听的订阅者。 因此,随着PublishSubject的引入,代码如下所示:

Observable<String> op1 = service1.operation();PublishSubject<String> publishSubject = PublishSubject.create();op1.subscribe(publishSubject);CountDownLatch latch = new CountDownLatch(3);publishSubject.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());publishSubject.subscribe(s -> logger.info("From Subscriber 2: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());publishSubject.subscribe(s -> logger.info("From Subscriber 3: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());latch.await();

了解如何将PublishSubject作为Observable的订阅者引入,而其他订阅者则如何订阅PublishSubject。 输出将类似于ConnectableObservable的输出。

从本质上来说,这就是我对“热可观察”的理解程度。 因此,总而言之,Cold和Hot Observable之间的区别在于订户何时获得发射的项目以及何时发射项目–使用Cold Observable,它们在订阅并通常获得所有发射的项目时发射,一个Hot Observable,项目将在没有订阅服务器的情况下发出,而订阅者通常会在订阅点之后获得项目。

参考

  1. http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html
  2. Rx-java上的优秀Javadoc – http://reactivex.io/RxJava/javadoc/index.html

翻译自: https://www.javacodegeeks.com/2015/03/hot-and-cold-rx-java-observable.html

冷热复位

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/337924.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C/C++语言重要语法之输入输出

点击上方蓝字关注我&#xff0c;了解更多咨询C语言是一种编译式的、通用的、大小写敏感的编程语言&#xff0c;完全支持面向对象开发。基本的输入输出cin和cout在C语言中&#xff0c;标准的键盘输入和屏幕输出功能分别使用scanf()和printf()两个函数实现。在C语言中&#xff0c…

mysql int number_Oracle/MySQL decimal/int/number 转字符串

有时客户需要流水数据&#xff0c;当导出为excel的时候&#xff0c;客户编号等很长数字的栏位&#xff0c;被excel变成科学记数法&#xff0c;无法正常查看。因此&#xff0c;需要将Oracle/MySQL中的decimal/int 转 varchar&#xff0c;这样在excel中就可以放心查看了。Oracle的…

C语言的“递归函数”这么难理解,为什么不丢弃它呢?

点击上方蓝字关注我&#xff0c;了解更多咨询变量就是在程序运行期间其值可以变化的量。每个变量都属于一种类型&#xff0c;每种类型都定义了变量的格式和行为。因此&#xff0c;一个变量应该有属于自己的名称&#xff0c;并且在内存中占有存储空间&#xff0c;其中&#xff0…

apache lucene_Apache Lucene的结构

apache lucene无可估量的高贵的Apache软件基金会&#xff08;Apache Software Foundation&#xff09;产生了许多巨大的产品&#xff08;Ant&#xff0c;CouchDB&#xff0c;Hadoop&#xff0c;JMeter&#xff0c;Maven&#xff0c;OpenOffice&#xff0c;Subversion等&#xf…

mysql 排序 过滤_【MYSQL】-3 排序与过滤

上周加入数据蛙二期培训&#xff0c;结束了孤独战斗的现状。断断续续自学了3个月(当然看了各种视频和各种书&#xff0c;一把辛酸泪。。。)&#xff0c;现在选择报班&#xff0c;主要还是觉得一个靠谱的组织和团队&#xff0c;可以极大缓解我学习过程中不时闪现的焦虑和无助&am…

构造函数 构造代码块_构造函数必须没有代码

构造函数 构造代码块构造函数中应完成多少工作&#xff1f; 在构造函数内部进行一些计算然后封装结果似乎是合理的。 这样&#xff0c;当对象方法需要结果时&#xff0c;我们将准备好它们。 听起来是个好方法&#xff1f; 不&#xff0c;这不对。 这是一个坏主意&#xff0c;原…

C语言按位逻辑运算符总结-与、或、非、异或

点击上方蓝字关注我&#xff0c;了解更多咨询C中有按位逻辑运算符&#xff1a;按位取反、按位与、按位或、按位异或。这4个运算符可以用于整型&#xff0c;包括char类型。按位操作针对每一个位进行操作&#xff0c;不影响左右两边的位。4个运算符的作用总结如下&#xff1a;一、…

java quartz没执行完_quartz 防止上一任务未执行完毕,下一时间点重复执行

/*** 订单监控类* 定时扫描所有待付款订单&#xff0c;超时自动取消* Created by huangbaidong* 2017/3/29.*/Componentpublic classOrderMonitorJob {ResourceprivateRedisUtil redisUtil;ResourceprivateBsdOrderService bsdOrderService;ResourceprivateBsdDFKOrderCacheMan…

th:each嵌套_难题:嵌套的computeIfAbsent

th:each嵌套总览 Java 8库在地图上有一个新方法&#xff0c;computeIfAbsent。 这是将地图转换为与键关联的对象的缓存的非常有用的方法。 但是&#xff0c;您可能没有考虑过一种组合。 如果您在内部调用computeIfAbsent会发生什么。 map.computeIfAbsent(Key.Hello, s ->…

java图论_玩转算法系列--图论精讲 面试升职必备(Java版)

第1章 和bobo老师一起&#xff0c;玩转图论算法欢迎大家来到我的新课程&#xff1a;《玩转图论算法》。在这个课程中&#xff0c;我们将一起完整学习图论领域的经典算法&#xff0c;培养大家的图论建模能力。通过这个课程的学习&#xff0c;你将能够真正地&#xff0c;玩转图论…

C语言的本质——位运算

点击上方蓝字关注我&#xff0c;了解更多咨询位运算是指按二进制进行的运算。在系统软件中&#xff0c;常常需要处理二进制位的问题。C语言提供了6个位操作运算符。这些运算符只能用于整型操作数&#xff0c;即只能用于带符号或无符号的char,short,int与long类型。C语言提供的位…

servlet异步_关于Servlet和异步Servlet

servlet异步Servlet API是Java EE标准的一部分&#xff0c;自1998年正式发布2.1规范以来&#xff0c;一直是基于Java的企业体系结构的重要组成部分。 它是一种自以为是的API&#xff0c;用于服务围绕一些基本概念构建的请求/响应协议&#xff1a; 兼容的容器 &#xff0c;这是…

Java创新型模式_java设计模式--创建型模式(一)

2016-04-24 10:10:34创建型模式&#xff1a;工厂方法模式、抽象工厂模式、单例模式、建造者模式、原型模式注意&#xff1a;工厂模式可以分为三类&#xff1a; 1)简单工厂模式(Simple Factory) 2)工厂方法模式(Factory Method) 3)抽象工厂模式(Abstract Factory)这三种模式从上…

原来这就是C语言的基本结构—循环结构?!

点击上方蓝字关注我&#xff0c;了解更多咨询今天我们就着重说说循环结构。循环结构分为三种&#xff0c;分别是for、while、dowhile;我们首先说第一种&#xff1a;for循环..他的代码格式为&#xff1a;for(判断的数值初始化;判断条件;改变判断数值大小){循环语句块&#xff1b…

可视化编码_编码:可视化位图

可视化编码在过去的一个月左右的时间里&#xff0c;我每天花费一些时间来阅读Neo4j代码库的新部分&#xff0c;以使其更加熟悉&#xff0c;而我最喜欢的类之一是Bits类&#xff0c;该类可以完成所有底层工作&#xff0c;到磁盘。 特别是&#xff0c;我喜欢它的toString方法&am…

java count 在哪一类里_java 5线程中 Semaphore信号灯,CyclicBarrier类,CountDownLatch计数器以及Exchanger类使用...

先来讲解一下Semaphore信号灯的作用:可以维护当前访问自身的线程个数&#xff0c;并提供了同步机制&#xff0c;使用semaphore可以控制同时访问资源的线程个数例如&#xff0c;实现一个文件允许的并发访问数。请看下面的演示代码:1 public classSemaphoreTest2 {3 public stati…

spring aop示例_Spring查找方法示例

spring aop示例当一个bean依赖于另一个bean时&#xff0c;我们使用setter属性或通过构造函数注入bean。 getter方法将向我们返回已设置的引用&#xff0c;但是假设您每次调用getter方法时都想要一个依赖bean的新实例&#xff0c;那么您可能将不得不采用另一种方法。 在本文中…

java hive查询_java程序调用hive查询的一个异常

最近在java程序中调用hive做查询时&#xff0c;碰到一个异常&#xff0c;被困扰了许久&#xff0c;经过几番调试&#xff0c;逐步把问题定位清楚。在异常描述前先给出异常信息&#xff1a;java.sql.SQLException: Error while processing statement: FAILED: Execution Error, …

C/C++入门易错点及常用小技巧

点击上方蓝字关注我&#xff0c;了解更多咨询C语言诞生至今已有30多个年头了&#xff0c;主要集中在需要运行效率比较高的行业&#xff0c;比如现在的游戏开发以及高效服务器等等。C学习难度比其它语言都要高&#xff0c;这是不可否认的&#xff0c;其学习难度主要在于它的复杂…

quasar_Quasar和Akka –比较

quasaractor模型是用于容错和高度可扩展系统的设计模式。 角色是独立的工作程序模块&#xff0c;它们仅通过消息传递与其他角色进行通信&#xff0c;可以与其他角色隔离而失败&#xff0c;但是可以监视其他角色的故障并在发生这种情况时采取一些恢复措施。 角色是简单&#xff…