java rx.observable_Rxjava2 Observable的条件操作符详解及实例

简要:

需求了解:

在使用 Rxjava 开发中,经常有一些各种条件的操作 ,如比较两个 Observable 谁先发射了数据、跳过指定条件的 Observable 等一系列的条件操作需求,那么很幸运, Rxjava 中已经有了很多条件操作符,一起来了解一下吧。

下面列出了一些Rxjava的用于条件操作符:

Amb:给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的所有数据。

DefaultIfEmpty:发射来自原始Observable的值,如果原始 Observable 没有发射任何数据项,就发射一个默认值。

SwitchIfEmpty:如果原始Observable没有发射数据时,发射切换一个指定的Observable继续发射数据。

SkipUntil:丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,然后发射原始 Observable 的剩余数据。

SkipWhile:丢弃原始 Observable 发射的数据,直到一个特定的条件为假,然后发射原始 Observable 剩余的数据。

TakeUntil:发射来自原始 Observable 的数据,直到第二个 Observable 发射了一个数据或一个通知。

1. Amb

给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的所有数据。

f40a4327a2c2adba4fc950a71788803f.png

解析: 对多个Observable进行监听,首先发射通知(包括数据)的Observable将会被观察者观察,发射这个Observable的所有数据。

示例代码:

// 创建Observable

Observable delayObservable = Observable.range(1, 5)

.delay(100, TimeUnit.MILLISECONDS); // 延迟100毫秒发射数据

Observable rangeObservable = Observable.range(6, 5);

// 创建Observable的集合

ArrayList> list = new ArrayList<>();

list.add(delayObservable);

list.add(rangeObservable);

// 创建Observable的数组

Observable[] array = new Observable[2];

array[0] = delayObservable;

array[1] = rangeObservable;

/**

* 1. ambWith(ObservableSource extends T> other)

* 与另外一个Observable比较,只发射首先发射通知的Observable的数据

*/

rangeObservable.ambWith(delayObservable)

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

System.out.println("--> accept(1): " + integer);

}

});

System.in.read();

System.out.println("------------------------------------------------");

/**

* 2. amb(Iterable extends ObservableSource extends T>> sources)

* 接受一个Observable类型的集合, 只发射集合中首先发射通知的Observable的数据

*/

Observable.amb(list)

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

System.out.println("--> accept(2): " + integer);

}

});

System.in.read();

System.out.println("------------------------------------------------");

/**

* 3. ambArray(ObservableSource extends T>... sources)

* 接受一个Observable类型的数组, 只发射数组中首先发射通知的Observable的数据

*/

Observable.ambArray(array)

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

System.out.println("--> accept(3): " + integer);

}

});

System.in.read();

输出:

--> accept(1): 6

--> accept(1): 7

--> accept(1): 8

--> accept(1): 9

--> accept(1): 10

------------------------------------------------

--> accept(2): 6

--> accept(2): 7

--> accept(2): 8

--> accept(2): 9

--> accept(2): 10

------------------------------------------------

--> accept(3): 6

--> accept(3): 7

--> accept(3): 8

--> accept(3): 9

--> accept(3): 10

2. DefaultIfEmpty

发射来自原始Observable的值,如果原始 Observable 没有发射数据项,就发射一个默认值。

b41b8f8b7cce3d8f01c94ea216fec477.png

解析: DefaultIfEmpty 简单的精确地发射原始Observable的值,如果原始Observable没有发射任何数据正常终止(以 onCompleted 的形式), DefaultIfEmpty 返回的Observable就发射一个你提供的默认值。如果你需要发射更多的数据,或者切换备用的Observable,你可以考虑使用 switchIfEmpty 操作符 。

示例代码:

/**

* defaultIfEmpty(@NotNull T defaultItem)

* 如果原始Observable没有发射任何数据正常终止(以 onCompleted 的形式),

* DefaultIfEmpty 返回的Observable就发射一个你提供的默认值defaultItem。

*/

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

emitter.onComplete(); // 不发射任何数据,直接发射完成通知

}

}).defaultIfEmpty("No Data emitter!!!")

.subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe");

}

@Override

public void onNext(String s) {

System.out.println("--> onNext: " + s);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError: " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete");

}

});

输出:

--> onSubscribe

--> onNext: No Data emitter!!!

--> onComplete

3. SwitchIfEmpty

如果原始Observable没有发射数据时,发射切换一个指定的Observable继续发射数据。

9d017aaf3a3e11f5fe727c06510ba824.png

解析: 如果原始 Observable 没有发射数据时,发射切换指定的 other 继续发射数据。

示例代码:

/**

* switchIfEmpty(ObservableSource other)

* 如果原始Observable没有发射数据时,发射切换指定的other继续发射数据

*/

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

emitter.onComplete(); // 不发射任何数据,直接发射完成通知

}

}).switchIfEmpty(Observable.just(888)) // 如果原始Observable没有发射数据项,默认发射备用的Observable,发射数据项888

.subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe");

}

@Override

public void onNext(Integer integer) {

System.out.println("--> onNext: " + integer);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError: " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete");

}

});

输出:

--> onSubscribe

--> onNext: 888

--> onComplete

4. SkipUntil

丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,然后发射原始 Observable 的剩余数据。

94fa5f808fd92f5bcd361b647a5b7952.png

示例代码:

/**

* skipUntil(ObservableSource other)

* 丢弃原始Observable发射的数据,直到other发射了一个数据,然后发射原始Observable的剩余数据。

*/

Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS)

// 丢弃2000毫秒的原始Observable发射的数据,接受后面的剩余部分数据

.skipUntil(Observable.timer(2000, TimeUnit.MILLISECONDS))

.subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe");

}

@Override

public void onNext(Long aLong) {

System.out.println("--> onNext: " + aLong);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError: " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete");

}

});

System.in.read();

输出:

--> onSubscribe

--> onNext: 5

--> onNext: 6

--> onNext: 7

--> onNext: 8

--> onNext: 9

--> onNext: 10

--> onComplete

5. SkipWhile

丢弃原始 Observable 发射的数据,直到一个特定的条件为假,然后发射原始 Observable 剩余的数据。

5b76f7677d1af3622a1a85130ac5a679.png

示例代码:

/**

* skipWhile(Predicate super T> predicate)

* 丢弃原始 Observable 发射的数据,直到函数predicate的条件为假,然后发射原始Observable剩余的数据。

*/

Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS)

.skipWhile(new Predicate() {

@Override

public boolean test(Long aLong) throws Exception {

if (aLong > 5) {

return false; // 当原始数据大于5时,发射后面的剩余部分数据

}

return true; // 丢弃原始数据项

}

}).subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe");

}

@Override

public void onNext(Long aLong) {

System.out.println("--> onNext: " + aLong);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError: " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete");

}

});

System.in.read();

输出:

--> onSubscribe

--> onNext: 6

--> onNext: 7

--> onNext: 8

--> onNext: 9

--> onNext: 10

--> onComplete

6. TakeUntil

发射来自原始 Observable 的数据,直到第二个 Observable 发射了一个数据或一个通知。

d7cb642fcee7caaad142803632a97a30.png

6.1 takeUntil(ObservableSource other)

TakeUntil 订阅并开始发射原始 Observable,它还监视你提供的第二个 Observable。如果第二个 Observable 发射了一项数据或者发射了一个终止通知,TakeUntil 返回的 Observable 会停止发射原始 Observable 并终止。

a021e38a2dbc89b1ce6f9581d27887aa.png

解析: 第二个Observable发射一项数据或一个 onError 通知或一个 onCompleted 通知都会导致 takeUntil 停止发射数据。

示例代码:

// 创建Observable,发送数字1~10,每间隔200毫秒发射一个数据

Observable observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS);

/**

* 1. takeUntil(ObservableSource other)

* 发射来自原始Observable的数据,直到other发射了一个数据或一个通知后停止发射原始Observable并终止。

*/

observable.takeUntil(Observable.timer(1000, TimeUnit.MILLISECONDS)) // 1000毫秒后停止发射原始数据

.subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe(1)");

}

@Override

public void onNext(Long aLong) {

System.out.println("--> onNext(1): " + aLong);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError(1): " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete(1)");

}

});

System.in.read();

输出:

--> onSubscribe(1)

--> onNext(1): 1

--> onNext(1): 2

--> onNext(1): 3

--> onNext(1): 4

--> onNext(1): 5

--> onComplete(1)

6.2 takeUntil(Predicate stopPredicate)

每次发射数据后,通过一个谓词函数来判定是否需要终止发射数据。

9824fe7a3dd6238365c5f5df7e43c42e.png

解析: 每次发射数据后,通过一个谓词函数 stopPredicate 来判定是否需要终止发射数据,如果 stopPredicate 返回 true 怎表示停止发射原始Observable后面的数据,否则继续发射后面的数据。

示例代码:

/**

* 2. takeUntil(Predicate super T> stopPredicate)

* 每次发射数据后,通过一个谓词函数stopPredicate来判定是否需要终止发射数据

* 如果stopPredicate返回true怎表示停止发射后面的数据,否则继续发射后面的数据

*/

observable.takeUntil(new Predicate() {

@Override

public boolean test(Long aLong) throws Exception { // 函数返回false则为继续发射原始数据,true则停止发射原始数据

if(aLong > 5){

return true; // 满足条件后,停止发射数据

}

return false; // 继续发射数据

}

}).subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe(2)");

}

@Override

public void onNext(Long aLong) {

System.out.println("--> onNext(2): " + aLong);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError(2): " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete(2)");

}

});

System.in.read();

输出:

--> onSubscribe(2)

--> onNext(2): 1

--> onNext(2): 2

--> onNext(2): 3

--> onNext(2): 4

--> onNext(2): 5

--> onNext(2): 6

--> onComplete(2)

7. TakeWhile

发射原始Observable的数据,直到一个特定的条件,然后跳过剩余的数据。

b861d4b550206b343bb41b49b759ba77.png

解析: 发射原始 Observable 的数据,直到 predicate 的条件为 false ,然后跳过剩余的数据。

示例代码:

// 创建Observable,发送数字1~10,每间隔200毫秒发射一个数据

Observable observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS);

/**

* takeWhile(Predicate predicate)

* 发射原始Observable的数据,直到predicate的条件为false,然后跳过剩余的数据

*/

observable.takeWhile(new Predicate() {

@Override

public boolean test(Long aLong) throws Exception { // 函数返回值决定是否继续发射后续的数据

if(aLong > 5){

return false; // 满足条件后跳过后面的数据

}

return true; // 继续发射数据

}

}).subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe");

}

@Override

public void onNext(Long aLong) {

System.out.println("--> onNext: " + aLong);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError: " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete");

}

});

System.in.read();

输出:

--> onSubscribe(1)

--> onNext(1): 1

--> onNext(1): 2

--> onNext(1): 3

--> onNext(1): 4

--> onNext(1): 5

--> onComplete(1)

小结

本节主要介绍了Rxjava条件操作符可以根据不同的条件进行数据的发射,变换等相关行为。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

实例代码:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/356396.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux poll 和 select 机制

poll select 介绍 使用非阻塞 I/O 的应用程序常常使用 poll, select, 和 epoll 系统调用. poll, select 和 epoll 本质上有相同的功能: 每个允许一个进程来决定它是否可读或者写一个 或多个文件而不阻塞. 这些调用也可阻塞进程直到任何一个给定集合的文件描述符可用来 读或写.…

hprof 不大 泄露_HPROF –内存泄漏分析教程

hprof 不大 泄露本文将为您提供有关如何通过生成和分析Sun HotSpot JVM HPROF堆转储文件来分析JVM内存泄漏问题的教程。 一个现实的案例研究将用于此目的&#xff1a;Weblogic 9.2内存泄漏影响Weblogic Admin服务器。 环境规格 Java EE服务器&#xff1a;Oracle Weblogic Ser…

java枚举怎么编译不行的_java枚举类型

public classTestEnum {/*最普通的枚举*/public enumColorSelect {red, green, yellow, blue;}/*枚举也可以象一般的类一样添加方法和属性,你可以为它添加静态和非静态的属性或方法,这一切都象你在一般的类中做的那样.*/public enumSeason {//枚举列表必须写在最前面&#xff0…

eclipse光标变成黑块变粗解决办法

就是按下了键盘的insert按键转载于:https://www.cnblogs.com/panxuejun/p/6170717.html

投资银行对Java进行的二十大核心面试问答

这是在金融领域&#xff08;主要是大型投资银行&#xff09;共享Java核心访谈问题和答案的新系列。 在JP Morgan&#xff0c;Morgan Stanley&#xff0c;Barclays或Goldman Sachs上会问许多这些Java面试问题。 银行主要从多线程 &#xff0c; 集合 &#xff0c;序列化&#xff…

java 支付重复问题_Airbnb支付系统如何在分布式环境下避免重复打款

原文链接&#xff1a;https://medium.com/airbnb-engineering/avoiding-double-payments-in-a-distributed-payments-system-2981f6b070bbAirbnb一直在将其基础架构迁移到面向服务的体系结构(SOA)。 SOA具有许多优势&#xff0c;例如使开发人员能够专业化并具有更快迭代的能力。…

用注解方式写定时任务

spring里加上 </context:component-scan><!-- 任务自动扫描 --><task:annotation-driven/><!-- 扫描位置 --><context:annotation-config/> <context:component-scan base-package"com.xxx.xxx"/>然后在类上面添加注解 Compon…

myeclipse配置java8_MyEclipse 2017配置Tomcat8

MyEclipse 2017的配置其实跟MyEclipse 2015的比较一致&#xff0c;再往之前的就有出入了。不得不吐槽其卡&#xff0c;跟Eclipse不在一个量级上呀。。。一、配置转自&#xff1a;http://www.xuexila.com/diannao/diy/2788109.html工具/原料myeclipse 2017tomcat8 服务器方法/步…

moxy json介绍_MOXy作为您的JAX-RS JSON提供程序–服务器端

moxy json介绍在之前的系列文章中&#xff0c;我介绍了如何利用EclipseLink JAXB&#xff08;MOXy&#xff09;创建RESTful数据访问服务。 在本文中&#xff0c;我将介绍在服务器端利用MOXy的新JSON绑定添加对基于JAXB映射的JSON消息的支持有多么容易。 MOXy作为您的JAX-RS JSO…

mysql教程清华课后答案_mysql学习之路_sql

查看数据库&#xff1a;Show databases;查看指定部分数据库&#xff1a;模糊查询Show databases like ‘patten’;--paatten是匹配模式%&#xff1a;表示是匹配模式_&#xff1a;表示匹配单个字符&#xff1b;查看数据库创建语句&#xff1a;Show create database 数据库名[库选…

那么您想做微服务吗? 请观看微服务以防万一

Bert Ertman在本次有关微服务的演示中提供了许多有效的观点。 您是否正在考虑在项目&#xff0c;团队或公司中遵循此路径&#xff1f; 请拿起咖啡&#xff0c;记事本&#xff08;如果您喜欢手写笔记&#xff09;并观看。 看到许多公司和团队确实很痛苦&#xff0c;精力浪费&…

MDX中Filter 与Exist的区别

获得一个集合&#xff0c;这个一般用来筛选出一个自定义的set&#xff0c;比如在中国的餐厅 该set返回所有MSDNteam下并且在Fact Thread度量上有记录的products 用Exists实现 select Exists( [Dim Queue].[Product - Queue].MEMBERS, [Dim Engineer].[Team - Engineer].[Team N…

java chsftp.get 追加_Java SFTP上传使用JSch,但如何覆盖当前文件?

我试图使用JSch将两个文件上传到具有SFTP的服务器.如果目录为空,则上传文件可以正常工作,但我想一遍又一遍地上传相同的文件(只需更改内部的id),但是我无法弄清楚如何做到这一点. JSch中有一些静态参数叫做OVERWRITE,但是我无法找到如何使用它.任何人都关心我如何添加这个设置&…

MySQL在Django框架下的基本操作(MySQL在Linux下配置)

注&#xff1a;本文已迁移至CSDN&#xff0c;后续的更新也会在CSDN。 http://blog.csdn.net/houchaoqun_xmu/article/details/53813633 http://blog.csdn.net/houchaoqun_xmu 【原】本文根据实际操作主要介绍了Django框架下MySQL的一些常用操作&#xff0c;核心内容如下&#x…

Clean Sheet – Windows 10的人体工程学Eclipse主题

Clean Sheet是适用于Windows 10的符合人体工程学的Eclipse主题。它基于干净&#xff0c;低眩光的外观和感觉&#xff0c;旨在减轻视觉疲劳和眼睛疲劳。 它融合了均衡的颜色选择&#xff0c;可以突出谐波语法并注重可读性。 除自定义滚动条外&#xff0c;它还努力满足现代美学要…

php mysql插入的数据有引号_php – 由于’引号’的不同,数据没有插入到mysql数据库中...

您不需要在查询中封装表,除非它们有空格或者它们是保留字.INSERT INTO lms.test2 (trn) VALUES (17)// This makes no real sense to the db. It should be:INSERT INTO lms.test2 (trn) VALUES (17)如果列trn接受数字,它应该是&#xff1a;INSERT INTO lms.test2 (trn) VALUES…

CentOS上安装MyCat-MySQL

1、安装JDK&#xff0c;要求JDK7以上。 2、下载MyCat&#xff0c;地址。 3、解压Mycat-server-1.6-RELEASE-20161028204710-linux.tar.gz&#xff0c;到usr/local/Mycat目录下。 4、添加用户Mycat。 [rootlocalhost Desktop]# useradd Mycat [rootlocalhost Desktop]# passwd M…

Java中的Redis的思维导图_Redis思维导图

常见相关问题Redis 有哪些功能&#xff1f;数据缓存功能分布式锁的功能支持数据持久化支持事务支持消息队列Redis 为什么是单线程的&#xff1f;因为 cpu 不是 Redis 的瓶颈&#xff0c;Redis 的瓶颈最有可能是机器内存或者网络带宽。既然单线程容易实现&#xff0c;而且 cpu 又…

在Java EE应用程序中实现自动重试

最初&#xff0c;我想将此博客称为“ 具有拦截器驱动的重试策略的灵活超时 ”&#xff0c;但后来我认为它太“繁重”。 该声明以及修改后的标题应该&#xff08;希望&#xff09;使您了解此帖子可能谈论的内容;-) 触发 这篇文章主要由我在较早的一篇文章中收到的评论/问题之一…

2016-12-17 新浪博客服务器挂掉了,所有博客页面都无法打开

今天&#xff08;2016-12-17 10:20&#xff09;早晨从百度上检索到一篇新浪博客文章&#xff0c;点进去之后显示无法访问此网址&#xff0c;如下截图 去新浪博客主页看了看&#xff0c;主页是可以打开的 但是点进去任何一篇文章&#xff0c;都无法打开此页面 应该是新浪博客的服…