java rx.observable_Rxjava2 Observable的条件操作符详解及实例

简要:

需求了解:

在使用 Rxjava 开发中,经常有一些各种条件的操作 ,如比较两个 Observable 谁先发射了数据、跳过指定条件的 Observable 等一系列的条件操作需求,那么很幸运, Rxjava 中已经有了很多条件操作符,一起来了解一下吧。

下面列出了一些Rxjava的用于条件操作符:

Amb:给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的所有数据。

DefaultIfEmpty:发射来自原始Observable的值,如果原始 Observable 没有发射任何数据项,就发射一个默认值。

SwitchIfEmpty:如果原始Observable没有发射数据时,发射切换一个指定的Observable继续发射数据。

SkipUntil:丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,然后发射原始 Observable 的剩余数据。

SkipWhile:丢弃原始 Observable 发射的数据,直到一个特定的条件为假,然后发射原始 Observable 剩余的数据。

TakeUntil:发射来自原始 Observable 的数据,直到第二个 Observable 发射了一个数据或一个通知。

1. Amb

给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的所有数据。

f40a4327a2c2adba4fc950a71788803f.png

解析: 对多个Observable进行监听,首先发射通知(包括数据)的Observable将会被观察者观察,发射这个Observable的所有数据。

示例代码:

// 创建Observable

Observable delayObservable = Observable.range(1, 5)

.delay(100, TimeUnit.MILLISECONDS); // 延迟100毫秒发射数据

Observable rangeObservable = Observable.range(6, 5);

// 创建Observable的集合

ArrayList> list = new ArrayList<>();

list.add(delayObservable);

list.add(rangeObservable);

// 创建Observable的数组

Observable[] array = new Observable[2];

array[0] = delayObservable;

array[1] = rangeObservable;

/**

* 1. ambWith(ObservableSource extends T> other)

* 与另外一个Observable比较,只发射首先发射通知的Observable的数据

*/

rangeObservable.ambWith(delayObservable)

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

System.out.println("--> accept(1): " + integer);

}

});

System.in.read();

System.out.println("------------------------------------------------");

/**

* 2. amb(Iterable extends ObservableSource extends T>> sources)

* 接受一个Observable类型的集合, 只发射集合中首先发射通知的Observable的数据

*/

Observable.amb(list)

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

System.out.println("--> accept(2): " + integer);

}

});

System.in.read();

System.out.println("------------------------------------------------");

/**

* 3. ambArray(ObservableSource extends T>... sources)

* 接受一个Observable类型的数组, 只发射数组中首先发射通知的Observable的数据

*/

Observable.ambArray(array)

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

System.out.println("--> accept(3): " + integer);

}

});

System.in.read();

输出:

--> accept(1): 6

--> accept(1): 7

--> accept(1): 8

--> accept(1): 9

--> accept(1): 10

------------------------------------------------

--> accept(2): 6

--> accept(2): 7

--> accept(2): 8

--> accept(2): 9

--> accept(2): 10

------------------------------------------------

--> accept(3): 6

--> accept(3): 7

--> accept(3): 8

--> accept(3): 9

--> accept(3): 10

2. DefaultIfEmpty

发射来自原始Observable的值,如果原始 Observable 没有发射数据项,就发射一个默认值。

b41b8f8b7cce3d8f01c94ea216fec477.png

解析: DefaultIfEmpty 简单的精确地发射原始Observable的值,如果原始Observable没有发射任何数据正常终止(以 onCompleted 的形式), DefaultIfEmpty 返回的Observable就发射一个你提供的默认值。如果你需要发射更多的数据,或者切换备用的Observable,你可以考虑使用 switchIfEmpty 操作符 。

示例代码:

/**

* defaultIfEmpty(@NotNull T defaultItem)

* 如果原始Observable没有发射任何数据正常终止(以 onCompleted 的形式),

* DefaultIfEmpty 返回的Observable就发射一个你提供的默认值defaultItem。

*/

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

emitter.onComplete(); // 不发射任何数据,直接发射完成通知

}

}).defaultIfEmpty("No Data emitter!!!")

.subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe");

}

@Override

public void onNext(String s) {

System.out.println("--> onNext: " + s);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError: " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete");

}

});

输出:

--> onSubscribe

--> onNext: No Data emitter!!!

--> onComplete

3. SwitchIfEmpty

如果原始Observable没有发射数据时,发射切换一个指定的Observable继续发射数据。

9d017aaf3a3e11f5fe727c06510ba824.png

解析: 如果原始 Observable 没有发射数据时,发射切换指定的 other 继续发射数据。

示例代码:

/**

* switchIfEmpty(ObservableSource other)

* 如果原始Observable没有发射数据时,发射切换指定的other继续发射数据

*/

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

emitter.onComplete(); // 不发射任何数据,直接发射完成通知

}

}).switchIfEmpty(Observable.just(888)) // 如果原始Observable没有发射数据项,默认发射备用的Observable,发射数据项888

.subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe");

}

@Override

public void onNext(Integer integer) {

System.out.println("--> onNext: " + integer);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError: " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete");

}

});

输出:

--> onSubscribe

--> onNext: 888

--> onComplete

4. SkipUntil

丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,然后发射原始 Observable 的剩余数据。

94fa5f808fd92f5bcd361b647a5b7952.png

示例代码:

/**

* skipUntil(ObservableSource other)

* 丢弃原始Observable发射的数据,直到other发射了一个数据,然后发射原始Observable的剩余数据。

*/

Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS)

// 丢弃2000毫秒的原始Observable发射的数据,接受后面的剩余部分数据

.skipUntil(Observable.timer(2000, TimeUnit.MILLISECONDS))

.subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe");

}

@Override

public void onNext(Long aLong) {

System.out.println("--> onNext: " + aLong);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError: " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete");

}

});

System.in.read();

输出:

--> onSubscribe

--> onNext: 5

--> onNext: 6

--> onNext: 7

--> onNext: 8

--> onNext: 9

--> onNext: 10

--> onComplete

5. SkipWhile

丢弃原始 Observable 发射的数据,直到一个特定的条件为假,然后发射原始 Observable 剩余的数据。

5b76f7677d1af3622a1a85130ac5a679.png

示例代码:

/**

* skipWhile(Predicate super T> predicate)

* 丢弃原始 Observable 发射的数据,直到函数predicate的条件为假,然后发射原始Observable剩余的数据。

*/

Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS)

.skipWhile(new Predicate() {

@Override

public boolean test(Long aLong) throws Exception {

if (aLong > 5) {

return false; // 当原始数据大于5时,发射后面的剩余部分数据

}

return true; // 丢弃原始数据项

}

}).subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe");

}

@Override

public void onNext(Long aLong) {

System.out.println("--> onNext: " + aLong);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError: " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete");

}

});

System.in.read();

输出:

--> onSubscribe

--> onNext: 6

--> onNext: 7

--> onNext: 8

--> onNext: 9

--> onNext: 10

--> onComplete

6. TakeUntil

发射来自原始 Observable 的数据,直到第二个 Observable 发射了一个数据或一个通知。

d7cb642fcee7caaad142803632a97a30.png

6.1 takeUntil(ObservableSource other)

TakeUntil 订阅并开始发射原始 Observable,它还监视你提供的第二个 Observable。如果第二个 Observable 发射了一项数据或者发射了一个终止通知,TakeUntil 返回的 Observable 会停止发射原始 Observable 并终止。

a021e38a2dbc89b1ce6f9581d27887aa.png

解析: 第二个Observable发射一项数据或一个 onError 通知或一个 onCompleted 通知都会导致 takeUntil 停止发射数据。

示例代码:

// 创建Observable,发送数字1~10,每间隔200毫秒发射一个数据

Observable observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS);

/**

* 1. takeUntil(ObservableSource other)

* 发射来自原始Observable的数据,直到other发射了一个数据或一个通知后停止发射原始Observable并终止。

*/

observable.takeUntil(Observable.timer(1000, TimeUnit.MILLISECONDS)) // 1000毫秒后停止发射原始数据

.subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe(1)");

}

@Override

public void onNext(Long aLong) {

System.out.println("--> onNext(1): " + aLong);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError(1): " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete(1)");

}

});

System.in.read();

输出:

--> onSubscribe(1)

--> onNext(1): 1

--> onNext(1): 2

--> onNext(1): 3

--> onNext(1): 4

--> onNext(1): 5

--> onComplete(1)

6.2 takeUntil(Predicate stopPredicate)

每次发射数据后,通过一个谓词函数来判定是否需要终止发射数据。

9824fe7a3dd6238365c5f5df7e43c42e.png

解析: 每次发射数据后,通过一个谓词函数 stopPredicate 来判定是否需要终止发射数据,如果 stopPredicate 返回 true 怎表示停止发射原始Observable后面的数据,否则继续发射后面的数据。

示例代码:

/**

* 2. takeUntil(Predicate super T> stopPredicate)

* 每次发射数据后,通过一个谓词函数stopPredicate来判定是否需要终止发射数据

* 如果stopPredicate返回true怎表示停止发射后面的数据,否则继续发射后面的数据

*/

observable.takeUntil(new Predicate() {

@Override

public boolean test(Long aLong) throws Exception { // 函数返回false则为继续发射原始数据,true则停止发射原始数据

if(aLong > 5){

return true; // 满足条件后,停止发射数据

}

return false; // 继续发射数据

}

}).subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe(2)");

}

@Override

public void onNext(Long aLong) {

System.out.println("--> onNext(2): " + aLong);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError(2): " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete(2)");

}

});

System.in.read();

输出:

--> onSubscribe(2)

--> onNext(2): 1

--> onNext(2): 2

--> onNext(2): 3

--> onNext(2): 4

--> onNext(2): 5

--> onNext(2): 6

--> onComplete(2)

7. TakeWhile

发射原始Observable的数据,直到一个特定的条件,然后跳过剩余的数据。

b861d4b550206b343bb41b49b759ba77.png

解析: 发射原始 Observable 的数据,直到 predicate 的条件为 false ,然后跳过剩余的数据。

示例代码:

// 创建Observable,发送数字1~10,每间隔200毫秒发射一个数据

Observable observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS);

/**

* takeWhile(Predicate predicate)

* 发射原始Observable的数据,直到predicate的条件为false,然后跳过剩余的数据

*/

observable.takeWhile(new Predicate() {

@Override

public boolean test(Long aLong) throws Exception { // 函数返回值决定是否继续发射后续的数据

if(aLong > 5){

return false; // 满足条件后跳过后面的数据

}

return true; // 继续发射数据

}

}).subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe");

}

@Override

public void onNext(Long aLong) {

System.out.println("--> onNext: " + aLong);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError: " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete");

}

});

System.in.read();

输出:

--> onSubscribe(1)

--> onNext(1): 1

--> onNext(1): 2

--> onNext(1): 3

--> onNext(1): 4

--> onNext(1): 5

--> onComplete(1)

小结

本节主要介绍了Rxjava条件操作符可以根据不同的条件进行数据的发射,变换等相关行为。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

实例代码:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/356396.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux poll 和 select 机制

poll select 介绍 使用非阻塞 I/O 的应用程序常常使用 poll, select, 和 epoll 系统调用. poll, select 和 epoll 本质上有相同的功能: 每个允许一个进程来决定它是否可读或者写一个 或多个文件而不阻塞. 这些调用也可阻塞进程直到任何一个给定集合的文件描述符可用来 读或写.…

hprof 不大 泄露_HPROF –内存泄漏分析教程

hprof 不大 泄露本文将为您提供有关如何通过生成和分析Sun HotSpot JVM HPROF堆转储文件来分析JVM内存泄漏问题的教程。 一个现实的案例研究将用于此目的&#xff1a;Weblogic 9.2内存泄漏影响Weblogic Admin服务器。 环境规格 Java EE服务器&#xff1a;Oracle Weblogic Ser…

投资银行对Java进行的二十大核心面试问答

这是在金融领域&#xff08;主要是大型投资银行&#xff09;共享Java核心访谈问题和答案的新系列。 在JP Morgan&#xff0c;Morgan Stanley&#xff0c;Barclays或Goldman Sachs上会问许多这些Java面试问题。 银行主要从多线程 &#xff0c; 集合 &#xff0c;序列化&#xff…

java 支付重复问题_Airbnb支付系统如何在分布式环境下避免重复打款

原文链接&#xff1a;https://medium.com/airbnb-engineering/avoiding-double-payments-in-a-distributed-payments-system-2981f6b070bbAirbnb一直在将其基础架构迁移到面向服务的体系结构(SOA)。 SOA具有许多优势&#xff0c;例如使开发人员能够专业化并具有更快迭代的能力。…

myeclipse配置java8_MyEclipse 2017配置Tomcat8

MyEclipse 2017的配置其实跟MyEclipse 2015的比较一致&#xff0c;再往之前的就有出入了。不得不吐槽其卡&#xff0c;跟Eclipse不在一个量级上呀。。。一、配置转自&#xff1a;http://www.xuexila.com/diannao/diy/2788109.html工具/原料myeclipse 2017tomcat8 服务器方法/步…

MySQL在Django框架下的基本操作(MySQL在Linux下配置)

注&#xff1a;本文已迁移至CSDN&#xff0c;后续的更新也会在CSDN。 http://blog.csdn.net/houchaoqun_xmu/article/details/53813633 http://blog.csdn.net/houchaoqun_xmu 【原】本文根据实际操作主要介绍了Django框架下MySQL的一些常用操作&#xff0c;核心内容如下&#x…

Clean Sheet – Windows 10的人体工程学Eclipse主题

Clean Sheet是适用于Windows 10的符合人体工程学的Eclipse主题。它基于干净&#xff0c;低眩光的外观和感觉&#xff0c;旨在减轻视觉疲劳和眼睛疲劳。 它融合了均衡的颜色选择&#xff0c;可以突出谐波语法并注重可读性。 除自定义滚动条外&#xff0c;它还努力满足现代美学要…

CentOS上安装MyCat-MySQL

1、安装JDK&#xff0c;要求JDK7以上。 2、下载MyCat&#xff0c;地址。 3、解压Mycat-server-1.6-RELEASE-20161028204710-linux.tar.gz&#xff0c;到usr/local/Mycat目录下。 4、添加用户Mycat。 [rootlocalhost Desktop]# useradd Mycat [rootlocalhost Desktop]# passwd M…

Java中的Redis的思维导图_Redis思维导图

常见相关问题Redis 有哪些功能&#xff1f;数据缓存功能分布式锁的功能支持数据持久化支持事务支持消息队列Redis 为什么是单线程的&#xff1f;因为 cpu 不是 Redis 的瓶颈&#xff0c;Redis 的瓶颈最有可能是机器内存或者网络带宽。既然单线程容易实现&#xff0c;而且 cpu 又…

在Java EE应用程序中实现自动重试

最初&#xff0c;我想将此博客称为“ 具有拦截器驱动的重试策略的灵活超时 ”&#xff0c;但后来我认为它太“繁重”。 该声明以及修改后的标题应该&#xff08;希望&#xff09;使您了解此帖子可能谈论的内容;-) 触发 这篇文章主要由我在较早的一篇文章中收到的评论/问题之一…

2016-12-17 新浪博客服务器挂掉了,所有博客页面都无法打开

今天&#xff08;2016-12-17 10:20&#xff09;早晨从百度上检索到一篇新浪博客文章&#xff0c;点进去之后显示无法访问此网址&#xff0c;如下截图 去新浪博客主页看了看&#xff0c;主页是可以打开的 但是点进去任何一篇文章&#xff0c;都无法打开此页面 应该是新浪博客的服…

php版本7历史,php的版本发展历史(1995-2020)

PHP一直作为Web开发中的统治力量而存在&#xff0c;在WEB服务端开发领域&#xff0c;全球份额始终保持在78%以上。PHP快速&#xff0c;非常强大&#xff0c;生态好&#xff0c;而且免费&#xff0c;是一个为WEB而生的编程语言&#xff0c;自从诞生起PHP就被大多数开发者称为世界…

netbeans7.4_NetBeans 7.2 beta:更快,更有用

netbeans7.4NetBeans 7.2的beta版本引起了极大的兴奋。 在本文中&#xff0c;我将简要介绍一下此版本令人兴奋的原因&#xff08;包括更好的性能&#xff0c;提供更多的提示以及集成FindBugs&#xff09;。 NetBeans 7.2 beta在典型的下载捆绑软件中可用&#xff0c;从较小的Ja…

C-Free 5.0编译失败问题解决办法

解决关于C-Free 5.0编译时提示错误&#xff1a;[Error] undefined reference to __dyn_tls_init_callback 解决办法&#xff1a; 因为错误提示的路径是C:\MinGW\..... 首先想到的是编译器出问题了&#xff0c;因为我在安装完C-Free 5.0后&#xff0c;重新装过MinGW编译器&#…

用Cucumber JVM编写BDD测试

Cucumber JVM是编写BDD测试的出色工具。在本文中&#xff0c;我想对Cucumber JVM的BDD进行介绍。 让我们开始吧… 什么是BDD&#xff1f; 简而言之&#xff0c;BDD试图解决“通过示例理解需求”的问题 BDD工具 有许多用于BDD的工具&#xff0c;有趣的是&#xff0c;您可以在…

计算器界面网格布局java,安卓案例:网格布局实现计算器界面

安卓案例&#xff1a;网格布局实现计算器界面一、网格布局(GridLayout)GridLayout布局使用虚细线将布局划分为行、列和单元格&#xff0c;也支持一个控件在行、列上都有交错排列。(一)继承关系图(二)常用属性1、针对布局的属性(1)rowCount&#xff1a;行数(2)columnCount&#…

如何让多文本内容只显示一行,其余用省略号来显示

在此需要用到三个属性配合使用&#xff1a; overflow: hidden;/*超出部分隐藏*/white-space: nowrap;/*不换行*/text-overflow:ellipsis;/*超出部分省略号显示*/实例&#xff1a;<p> 兴高采烈的破蛹 重获新生的冲动 寻找爱情世界 美梦  既然不是毛毛虫 就要壮烈的扑火 …

php正则替换p闭合标签,php正则替换标签的实现方法

php正则替换标签的实现方法&#xff1a;首先通过“strip_tags”函数剥去字符串中的HTML标签&#xff1b;然后利用正则表达式替换标签&#xff0c;代码语句如“pregreplace("/s/","",strpregreplace("/s/","")”。推荐&#xff1a;《PH…

POJ1201 区间

题目大意&#xff1a; 给定n个整数区间[ai,bi]和n个整数ci,求一个最小集合Z&#xff0c;满足|Z∩[ai,bi]|>ci(Z里边在闭区间[ai,bi]的个数不小于ci)。 多组数据&#xff1a; n&#xff08;1<n<50000&#xff09;区间的个数 n行&#xff1a; ai bi ci(0<ai<bi<…

vue.js 2.x 能否设置某个组件不被keep-alive 的解决方案

开发项目的时候&#xff0c;使用的是 vue 2.x 版本 搭配路由切换 vue-router &#xff0c;但是很多时候有些页面需要被缓存&#xff0c;有些页面不需要缓存&#xff0c;那么该如何进行一个处理 这里我使用了一个方案来解决这个问题 : 首先在入口页面进行全部缓存标签 即 <ke…