3章 RxJava操作符

本篇文章已授权微信公众号 YYGeeker 独家发布转载请标明出处

CSDN学院课程地址

  • RxJava2从入门到精通-初级篇:edu.csdn.net/course/deta…
  • RxJava2从入门到精通-中级篇:edu.csdn.net/course/deta…
  • RxJava2从入门到精通-进阶篇:edu.csdn.net/course/deta…
  • RxJava2从入门到精通-源码分析篇:edu.csdn.net/course/deta…

3. RxJava操作符

RxJava操作符也是其精髓之一,可以通过一个简单的操作符,实现复杂的业务逻辑,甚至还可以将操作符组合起来(即RxJava的组合过程),完成更为复杂的业务需求。比如我们前面用到的.create().subscribeOn().observeOn().subscribe()都是RxJava的操作符之一,下面我们将对RxJava的操作符进行分析

掌握RxJava操作符前,首先要学会看得懂RxJava的图片,图片是RxJava主导的精髓,下面我们通过例子说明

这张图片我们先要分清楚概念上的东西,上下两行横向的直线区域代表着事件流,上面一行(上游)是我们的被观察者Observable,下面一行(下游)是我们的观察者Observer,事件流就是从上游的被观察者发送给下游的观察者的。而中间一行的flatMap区域则是我们的操作符部分,它可以对我们的数据进行变换操作。最后,数据流则是图片上的圆形、方形、菱形等区域,也是从上游流向下游的,不同的形状代表着不同的数据类型

这张图片并不是表示没有被观察者Observable,而是Create方法本身就是创建了被观察者,所以可以将被观察者的上游省略。在进行事件的onNext()分发后,执行onComplete()事件,这样就表示事件流已经结束,后续如果上游继续发事件,则下游表示不接收。当事件流的onCompleted()或者onError()正好被调用过一次后,此后就不能再调用观察者的任何其它回调方法

在理解RxJava操作符之前,需要将这几个概念弄明白,整个操作符的章节都是围绕这几个概念进行的

  • 事件流:通过发射器发射的事件,从发射事件到结束事件的过程,这一过程称为事件流
  • 数据流:通过发射器发射的数据,从数据输入到数据输出的过程,这一过程称为数据流
  • 被观察者:事件流的上游,即Observable,事件流开始的地方和数据流发射的地方
  • 观察者:事件流的下游,即Observer,事件流结束的地方和数据流接收的地方

3.1 Creating Observables (创建操作符)

1、create

Observable最原始的创建方式,创建出一个最简单的事件流,可以使用发射器发射特定的数据类型

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {e.onNext(i);}e.onComplete();}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onComplete
复制代码

2、from

创建一个事件流并发出特定类型的数据流,其发射的数据流类型有如下几个操作符

public static void main(String[] args) {Observable.fromArray(new Integer[]{1, 2, 3, 4, 5}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

3、just

just操作符和from操作符很像,只是方法的参数有所差别,它可以接受多个参数

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

4、defer

defer与just的区别是,just是直接将发射当前的数据流,而defer会等到订阅的时候,才会去执行它的call()回调,再去发射当前的数据流。复杂点的理解就是:defer操作符是将一组数据流在原有的事件流基础上缓存一个新的事件流,直到有人订阅的时候,才会创建它缓存的事件流

public static void main(String[] args) {i = 10;Observable<Integer> just = Observable.just(i, i);Observable<Object> defer = Observable.defer(new Callable<ObservableSource<?>>() {@Overridepublic ObservableSource<?> call() throws Exception {//缓存新的事件流return Observable.just(i, i);}});i = 15;just.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});defer.subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println("onNext=" + (int) o);}});i = 20;defer.subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println("onNext=" + (int) o);}});
}
复制代码

输出

onNext=10
onNext=10
onNext=15
onNext=15
onNext=20
onNext=20
复制代码

5、interval

interval操作符是按固定的时间间隔发射一个无限递增的整数数据流,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行,interval默认在computation调度器上执行

public void interval() {Observable.interval(1, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
......
复制代码

6、range

range操作符发射一个范围内的有序整数数据流,你可以指定范围的起始和长度

public static void main(String[] args) {Observable.range(1, 5).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

7、repeat

repeat操作符可以重复发送指定次数的某个事件流,repeat操作符默认在trampoline调度器上执行

public static void main(String[] args) {Observable.just(1).repeat(5).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=1
onNext=1
onNext=1
onNext=1
onNext=1
复制代码

8、timer

timer操作符可以创建一个延时的事件流,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行,默认在computation调度器上执行

public void timer() {Observable.timer(5, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}
复制代码

输出

onNext=0
复制代码

9、小结

  1. create():创建最简单的事件流
  2. from():创建事件流,可发送不同类型的数据流
  3. just():创建事件流,可发送多个参数的数据流
  4. defer():创建事件流,可缓存可激活事件流
  5. interval():创建延时重复的事件流
  6. range():创建事件流,可发送范围内的数据流
  7. repeat():创建可重复次数的事件流
  8. timer():创建一次延时的事件流

补充:interval()、timer()、delay()的区别

  1. interval():用于创建事件流,周期性重复发送
  2. timer():用于创建事件流,延时发送一次
  3. delay():用于事件流中,可以延时某次事件流的发送

3.2 Transforming Observables (转换操作符)

1、map

map操作符可以将数据流进行类型转换

public static void main(String[] args) {Observable.just(1).map(new Function<Integer, String>() {@Overridepublic String apply(Integer integer) throws Exception {return "发送过来的数据会被变成字符串" + integer;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});
}
复制代码

输出

onNext=发送过来的数据会被变成字符串1
复制代码

2、flatMap

flatMap操作符将数据流进行类型转换,然后将新的数据流传递给新的事件流进行分发,这里通过模拟请求登录的延时操作进行说明,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void flatMap() {Observable.just(new UserParams("hensen", "123456")).flatMap(new Function<UserParams, ObservableSource<String>>() {@Overridepublic ObservableSource<String> apply(UserParams userParams) throws Exception {return Observable.just(userParams.username + "登录成功").delay(2, TimeUnit.SECONDS);}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println(s);}});
}public static class UserParams {public UserParams(String username, String password) {this.username = username;this.password = password;}public String username;public String password;
}
复制代码

输出

hensen登录成功
复制代码

补充:

  • concatMap与flatMap功能一样,唯一的区别就是concatMap是有序的,flatMap是乱序的

3、groupBy

groupBy操作符可以将发射出来的数据项进行分组,并将分组后的数据项保存在具有key-value映射的事件流中。groupBy具体的分组规则由groupBy操作符传递进来的函数参数Function所决定的,它可以将key和value按照Function的返回值进行分组,返回一个具有分组规则的事件流GroupedObservable,注意这里分组出来的事件流是按照原始事件流的顺序输出的,我们可以通过sorted()对数据项进行排序,然后输出有序的数据流。

public static void main(String[] args) {Observable.just("java", "c++", "c", "c#", "javaScript", "Android").groupBy(new Function<String, Character>() {@Overridepublic Character apply(String s) throws Exception {return s.charAt(0);//按首字母分组}}).subscribe(new Consumer<GroupedObservable<Character, String>>() {@Overridepublic void accept(final GroupedObservable<Character, String> characterStringGroupedObservable) throws Exception {//排序后,直接订阅输出key和valuecharacterStringGroupedObservable.sorted().subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext= key:" + characterStringGroupedObservable.getKey() + " value:" + s);}});}});
}
复制代码

输出

onNext= key:A value:Android
onNext= key:c value:c
onNext= key:c value:c#
onNext= key:c value:c++
onNext= key:j value:java
onNext= key:j value:javaScript
复制代码

4、scan

scan操作符会对发射的数据和上一轮发射的数据进行函数处理,并返回的数据供下一轮使用,持续这个过程来产生剩余的数据流。其应用场景有简单的累加计算,判断所有数据的最小值等

public static void main(String[] args) {Observable.just(8, 2, 13, 1, 15).scan(new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Exception {return integer < integer2 ? integer : integer2;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer item) throws Exception {System.out.println("onNext=" + item);}});
}
复制代码

输出

onNext=8
onNext=2
onNext=2
onNext=1
onNext=1
复制代码

5、buffer

buffer操作符可以将发射出来的数据流,在给定的缓存池中进行缓存,当缓存池中的数据项溢满时,则将缓存池的数据项进行输出,重复上述过程,直到将发射出来的数据全部发射出去。如果发射出来的数据不够缓存池的大小,则按照当前发射出来的数量进行输出。如果对buffer操作符设置了skip参数,则buffer每次缓存池溢满时,会跳过指定的skip数据项,然后再进行缓存和输出。

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).buffer(5).subscribe(new Consumer<List<Integer>>() {@Overridepublic void accept(List<Integer> integers) throws Exception {System.out.println("onNext=" + integers.toString());}
});
复制代码

输出

onNext=[1, 2, 3, 4, 5]
onNext=[6, 7, 8, 9]
复制代码

6、window

window操作符和buffer操作符在功能上实现的效果是一样的,但window操作符最大区别在于同样是缓存一定数量的数据项,window操作符最终发射出来的是新的事件流integerObservable,而buffer操作符发射出来的是新的数据流,也就是说,window操作符发射出来新的事件流中的数据项,还可以经过Rxjava其他操作符进行处理。

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).window(2, 1).subscribe(new Consumer<Observable<Integer>>() {@Overridepublic void accept(Observable<Integer> integerObservable) throws Exception {integerObservable.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});}});
}
复制代码

输出

onNext=1
onNext=2
onNext=2
onNext=3
onNext=3
onNext=4
onNext=4
onNext=5
onNext=5
onNext=6
onNext=6
onNext=7
onNext=7
onNext=8
onNext=8
onNext=9
onNext=9
复制代码

7、小结

  1. map():对数据流的类型进行转换
  2. flatMap():对数据流的类型进行包装成另一个数据流
  3. groupby():对所有的数据流进行分组
  4. scan():对上一轮处理过后的数据流进行函数处理
  5. buffer():缓存发射的数据流到一定数量,随后发射出数据流集合
  6. window():缓存发射的数据流到一定数量,随后发射出新的事件流

3.3 Filtering Observables (过滤操作符)

1、debounce

debounce操作符会去过滤掉发射速率过快的数据项,下面的例子onNext事件可以想象成按钮的点击事件,如果在2秒种内频繁的点击,则其点击事件会被忽略,当i为3的除数的时候,发射的事件的时间会超过规定忽略事件的时间,那么则允许触发点击事件。这就有点像我们频繁点击按钮,但始终只会触发一次点击事件,这样就不会导致重复去响应点击事件

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {for (int i = 0; i < 100; i++) {if (i % 3 == 0) {Thread.sleep(3000);} else {Thread.sleep(1000);}emitter.onNext(i);}}}).debounce(2, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=2
onNext=5
onNext=8
onNext=11
onNext=14
......
复制代码

2、distinct

distinct操作符会过滤重复发送的数据项

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).distinct().subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
复制代码

3、elementAt

elementAt操作符只取指定的角标的事件

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).elementAt(0).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=1
复制代码

4、filter

filter操作符可以过滤指定函数的数据项

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).filter(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer > 2;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=3
onNext=4
onNext=3
复制代码

5、first

first操作符只发射第一项数据项

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).first(7).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=1
复制代码

6、ignoreElements

ignoreElements操作符不发射任何数据,只发射事件流的终止通知

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).ignoreElements().subscribe(new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}
复制代码

输出

onComplete
复制代码

7、last

last操作符只发射最后一项数据

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).last(7).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=3
复制代码

8、sample

sample操作符会在指定的事件内从数据项中采集所需要的数据,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void sample() {Observable.interval(1, TimeUnit.SECONDS).sample(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}
复制代码

输出

onNext=2
onNext=4
onNext=6
onNext=8
复制代码

9、skip

skip操作符可以忽略事件流发射的前N项数据项,只保留之后的数据

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5, 6, 7, 8).skip(3).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer i) throws Exception {System.out.println("onNext=" + i);}});
}
复制代码

输出

onNext=4
onNext=5
onNext=6
onNext=7
onNext=8
复制代码

10、skipLast

skipLast操作符可以抑制事件流发射的后N项数据

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5, 6, 7, 8).skipLast(3).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer i) throws Exception {System.out.println("onNext=" + i);}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

11、take

take操作符可以在事件流中只发射前面的N项数据

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5, 6, 7, 8).take(3).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer i) throws Exception {System.out.println("onNext=" + i);}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
复制代码

12、takeLast

takeLast操作符事件流只发射数据流的后N项数据项,忽略前面的数据项

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5, 6, 7, 8).takeLast(3).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer i) throws Exception {System.out.println("onNext=" + i);}});
}
复制代码

输出

onNext=6
onNext=7
onNext=8
复制代码

还有一个操作符叫takeLastBuffer,它和takeLast类似,,唯一的不同是它把所有的数据项收集到一个List再发射,而不是依次发射一个

13、小结

  1. debounce():事件流只发射规定范围时间内的数据项
  2. distinct():事件流只发射不重复的数据项
  3. elementAt():事件流只发射第N个数据项
  4. filter():事件流只发射符合规定函数的数据项
  5. first():事件流只发射第一个数据项
  6. ignoreElements():忽略事件流的发射,只发射事件流的终止事件
  7. last():事件流只发射最后一项数据项
  8. sample():事件流对指定的时间间隔进行数据项的采样
  9. skip():事件流忽略前N个数据项
  10. skipLast():事件流忽略后N个数据项
  11. take():事件流只发射前N个数据项
  12. takeLast():事件流只发射后N个数据项

3.4 Combining Observables (组合操作符)

1、merge/concat

merge操作符可以合并两个事件流,如果在merge操作符上增加延时发送的操作,那么就会导致其发射的数据项是无序的,会跟着发射的时间点进行合并。虽然是将两个事件流合并成一个事件流进行发射,但在最终的一个事件流中,发射出来的却是两次数据流。由于concat操作符和merge操作符的效果是一样的,这里只举一例

merge和concat的区别

  • merge():合并后发射的数据项是无序的
  • concat():合并后发射的数据项是有序的
public static void main(String[] args) {Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");Observable.merge(just1, just2).subscribe(new Consumer<Serializable>() {@Overridepublic void accept(Serializable serializable) throws Exception {System.out.println("onNext=" + serializable.toString());}});
}
复制代码

输出

onNext=A
onNext=B
onNext=C
onNext=D
onNext=E
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

2、zip

zip操作符是将两个数据流进行指定的函数规则合并

public static void main(String[] args) {Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");Observable.zip(just1, just2, new BiFunction<String, String, String>() {@Overridepublic String apply(String s, String s2) throws Exception {return s + s2;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});
}
复制代码

输出

onNext=A1
onNext=B2
onNext=C3
onNext=D4
onNext=E5
复制代码

3、startWith

startWith操作符是将另一个数据流合并到原数据流的开头

public static void main(String[] args) {Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");just1.startWith(just2).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=A
onNext=B
onNext=C
onNext=D
onNext=E
复制代码

4、join

join操作符是有时间期限的合并操作符,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void join() {Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);just1.join(just2, new Function<String, ObservableSource<Long>>() {@Overridepublic ObservableSource<Long> apply(String s) throws Exception {return Observable.timer(3, TimeUnit.SECONDS);}}, new Function<Long, ObservableSource<Long>>() {@Overridepublic ObservableSource<Long> apply(Long l) throws Exception {return Observable.timer(8, TimeUnit.SECONDS);}}, new BiFunction<String, Long, String>() {@Overridepublic String apply(String s, Long l) throws Exception {return s + l;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});
}
复制代码

join操作符有三个函数需要设置

  • 第一个函数:规定just2的过期期限
  • 第二个函数:规定just1的过期期限
  • 第三个函数:规定just1和just2的合并规则

由于just2的期限只有3秒的时间,而just2延时1秒发送一次,所以just2只发射了2次,其输出的结果就只能和just2输出的两次进行合并,其输出格式有点类似我们的排列组合

onNext=A0
onNext=B0
onNext=C0
onNext=D0
onNext=E0
onNext=A1
onNext=B1
onNext=C1
onNext=D1
onNext=E1
复制代码

5、combineLatest

conbineLatest操作符会寻找其他事件流最近发射的数据流进行合并,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public static String[] str = {"A", "B", "C", "D", "E"};public void combineLatest() {Observable<String> just1 = Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {@Overridepublic String apply(Long aLong) throws Exception {return str[(int) (aLong % 5)];}});Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);Observable.combineLatest(just1, just2, new BiFunction<String, Long, String>() {@Overridepublic String apply(String s, Long l) throws Exception {return s + l;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});
}
复制代码

输出

onNext=A0
onNext=B0
onNext=B1
onNext=C1
onNext=C2
onNext=D2
onNext=D3
onNext=E3
onNext=E4
onNext=A4
onNext=A5
复制代码

6、小结

  1. merge()/concat():无序/有序的合并两个数据流
  2. zip():两个数据流的数据项合并成一个数据流一同发出
  3. startWith():将待合并的数据流放在自身前面一同发出
  4. join():将数据流进行排列组合发出,不过数据流都是有时间期限的
  5. combineLatest():合并最近发射出的数据项成数据流一同发出

3.5 Error Handling Operators(错误处理操作符)

1、onErrorReturn

onErrorReturn操作符表示当错误发生时,它会忽略onError的回调且会发射一个新的数据项并回调onCompleted()

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if(i == 4){e.onError(new Exception("onError crash"));}e.onNext(i);}}}).onErrorReturn(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable throwable) throws Exception {return -1;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
复制代码

2、onErrorResumeNext

onErrorResumeNext操作符表示当错误发生时,它会忽略onError的回调且会发射一个新的事件流并回调onCompleted()

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if(i == 4){e.onError(new Exception("onError crash"));}e.onNext(i);}}}).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {@Overridepublic ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {return Observable.just(-1);}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
复制代码

3、onExceptionResumeNext

onExceptionResumeNext操作符表示当错误发生时,如果onError收到的Throwable不是一个Exception,它会回调onError方法,且不会回调备用的事件流,如果onError收到的Throwable是一个Exception,它会回调备用的事件流进行数据的发射

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if(i == 4){e.onError(new Exception("onException crash"));//e.onError(new Error("onError crash"));}e.onNext(i);}}}).onExceptionResumeNext(new ObservableSource<Integer>() {@Overridepublic void subscribe(Observer<? super Integer> observer) {//备用事件流observer.onNext(8);}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=8
复制代码

4、retry

retry操作符表示当错误发生时,发射器会重新发射

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if (i == 4) {e.onError(new Exception("onError crash"));}e.onNext(i);}}}).retry(1).onErrorReturn(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable throwable) throws Exception {return -1;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
复制代码
  • retry():表示重试无限次
  • retry(long times):表示重试指定次数
  • retry(Func predicate):可以根据函数参数中的Throwable类型和重试次数决定本次需不需要重试

5、retryWhen

retryWhen操作符和retry操作符相似,区别在于retryWhen将错误Throwable传递给了函数进行处理并产生新的事件流进行处理,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

private static int retryCount = 0;
private static int maxRetries = 2;public void retryWhen(){Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if (i == 4) {e.onError(new Exception("onError crash"));}e.onNext(i);}}}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Throwable throwable) throws Exception {if (++retryCount <= maxRetries) {// When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).System.out.println("get error, it will try after " + 1 + " seconds, retry count " + retryCount);return Observable.timer(1, TimeUnit.SECONDS);}return Observable.error(throwable);}});}}).onErrorReturn(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable throwable) throws Exception {return -1;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 1
onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 2
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
复制代码

6、小结

  • onErrorReturn():当错误发生时,它会忽略onError的回调且会发射一个新的数据项并回调onCompleted()
  • onErrorResumeNext():当错误发生时,它会忽略onError的回调且会发射一个新的事件流并回调onCompleted()
  • onExceptionResumeNext():当错误发生时,如果onError收到的Throwable不是一个Exception,它会回调onError方法,且不会回调备用的事件流,如果onError收到的Throwable是一个Exception,它会回调备用的事件流进行数据的发射
  • retry():当错误发生时,发射器会重新发射
  • retryWhen():当错误发生时,根据Tharowble类型决定发射器是否重新发射

3.6 Observable Utility Operators(辅助性操作符)

1、delay

delay操作符可以延时某次事件发送的数据流,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void deley() {Observable.just(1, 2, 3, 4, 5).delay(2, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

delay和delaySubscription的效果是一样的,只不过delay是对数据流的延时,而delaySubscription是对事件流的延时

2、do

do操作符可以监听整个事件流的生命周期,do操作符分为多个类型,而且每个类型的作用都不同

  1. doOnNext():接收每次发送的数据项
  2. doOnEach():接收每次发送的数据项
  3. doOnSubscribe():当事件流被订阅时被调用
  4. doOnDispose():当事件流被释放时被调用
  5. doOnComplete():当事件流被正常终止时被调用
  6. doOnError():当事件流被异常终止时被调用
  7. doOnTerminate():当事件流被终止之前被调用,无论正常终止还是异常终止都会调用
  8. doFinally():当事件流被终止之后被调用,无论正常终止还是异常终止都会调用
public static void main(String[] args) {Observable.just(1, 2, 3).doOnNext(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("doOnNext");}}).doOnEach(new Consumer<Notification<Integer>>() {@Overridepublic void accept(Notification<Integer> integerNotification) throws Exception {System.out.println("doOnEach");}}).doOnSubscribe(new Consumer<Disposable>() {@Overridepublic void accept(Disposable disposable) throws Exception {System.out.println("doOnSubscribe");}}).doOnDispose(new Action() {@Overridepublic void run() throws Exception {System.out.println("doOnDispose");}}).doOnTerminate(new Action() {@Overridepublic void run() throws Exception {System.out.println("doOnTerminate");}}).doOnError(new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("doOnError");}}).doOnComplete(new Action() {@Overridepublic void run() throws Exception {System.out.println("doOnComplete");}}).doFinally(new Action() {@Overridepublic void run() throws Exception {System.out.println("doFinally");}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

doOnSubscribe
doOnNext
doOnEach
onNext=1
doOnNext
doOnEach
onNext=2
doOnNext
doOnEach
onNext=3
doOnEach
doOnTerminate
doOnComplete
doFinally
复制代码

3、materialize/dematerialize

materialize操作符将发射出的数据项转换成为一个Notification对象,而dematerialize操作符则是跟materialize操作符相反,这两个操作符有点类似我们Java对象的装箱和拆箱功能

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).materialize().subscribe(new Consumer<Notification<Integer>>() {@Overridepublic void accept(Notification<Integer> integerNotification) throws Exception {System.out.println("onNext=" + integerNotification.getValue());}});Observable.just(1, 2, 3, 4, 5).materialize().dematerialize().subscribe(new Consumer<Object>() {@Overridepublic void accept(Object object) throws Exception {System.out.println("onNext=" + object.toString());}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=null
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

输出的时候,materialize会输出多个null,是因为null的事件为onCompleted事件,而dematerialize把onCompleted事件给去掉了,这个原因也可以从图片中看出来

4、serialize

serialize操作符可以将异步执行的事件流进行同步操作,直到事件流结束

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).serialize().subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

5、timeInterval

timeInterval操作符可以将发射的数据项转换为带有时间间隔的数据项,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void timeInterval(){Observable.interval(2, TimeUnit.SECONDS).timeInterval(TimeUnit.SECONDS).subscribe(new Consumer<Timed<Long>>() {@Overridepublic void accept(Timed<Long> longTimed) throws Exception {System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time());}});
}
复制代码

输出

onNext=0 timeInterval=2
onNext=1 timeInterval=2
onNext=2 timeInterval=2
onNext=3 timeInterval=2
onNext=4 timeInterval=2
复制代码

6、timeout

timeout操作符表示当发射的数据项超过了规定的限制时间,则发射onError事件,这里直接让程序超过规定的限制时间,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void timeOut(){Observable.interval(2, TimeUnit.SECONDS).timeout(1, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}});
}
复制代码

输出

onError
复制代码

7、timestamp

timestamp操作符会给每个发射的数据项带上时间戳,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void timeStamp() {Observable.interval(2, TimeUnit.SECONDS).timestamp(TimeUnit.MILLISECONDS).subscribe(new Consumer<Timed<Long>>() {@Overridepublic void accept(Timed<Long> longTimed) throws Exception {System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time());}});
}
复制代码

输出

onNext=0 timeInterval=1525755132132
onNext=1 timeInterval=1525755134168
onNext=2 timeInterval=1525755136132
onNext=3 timeInterval=1525755138132
复制代码

8、using

using操作符可以让你的事件流存在一次性的数据项,即用完就将资源释放掉

using操作符接受三个参数:

  • 一个用户创建一次性资源的工厂函数
  • 一个用于创建一次性事件的工厂函数
  • 一个用于释放资源的函数
public static class UserBean {String name;int age;public UserBean(String name, int age) {this.name = name;this.age = age;}
}public static void main(String[] args) {Observable.using(new Callable<UserBean>() {@Overridepublic UserBean call() throws Exception {//从网络中获取某个对象return new UserBean("俊俊俊", 22);}}, new Function<UserBean, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(UserBean userBean) throws Exception {//拿出你想要的资源return Observable.just(userBean.name);}}, new Consumer<UserBean>() {@Overridepublic void accept(UserBean userBean) throws Exception {//释放对象userBean = null;}}).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println("onNext=" + o.toString());}});
}
复制代码

输出

onNext=俊俊俊
复制代码

9、to

to操作符可以将数据流中的数据项进行集合的转换,to操作符分为多个类型,而且每个类型的作用都不同

  1. toList():转换成List类型的集合
  2. toMap():转换成Map类型的集合
  3. toMultimap():转换成一对多(即<A类型,List<B类型>>)的Map类型的集合
  4. toSortedList():转换成具有排序的List类型的集合
public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).toList().subscribe(new Consumer<List<Integer>>() {@Overridepublic void accept(List<Integer> integers) throws Exception {System.out.println("onNext=" + integers.toString());}});
}
复制代码

输出

onNext=[1, 2, 3, 4, 5]
复制代码

10、小结

  1. delay():延迟事件发射的数据项
  2. do():监听事件流的生命周期
  3. materialize()/dematerialize():对事件流进行装箱/拆箱
  4. serialize():同步事件流的发射
  5. timeInterval():对事件流增加时间间隔
  6. timeout():对事件流增加限定时间
  7. timestamp():对事件流增加时间戳
  8. using():对事件流增加一次性的资源
  9. to():对数据流中的数据项进行集合的转换

3.7 Conditional and Boolean Operators(条件和布尔操作符)

1、all

all操作符表示对所有数据项进行校验,如果所有都通过则返回true,否则返回false

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).all(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer > 0;}}).subscribe(new Consumer<Boolean>() {@Overridepublic void accept(Boolean aBoolean) throws Exception {System.out.println("onNext=" + aBoolean);}});
}
复制代码

输出

onNext=true
复制代码

2、contains

contains操作符表示事件流中发射的数据项当中是否包含有指定的数据项

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).contains(2).subscribe(new Consumer<Boolean>() {@Overridepublic void accept(Boolean aBoolean) throws Exception {System.out.println("onNext=" + aBoolean);}});
}
复制代码

输出

onNext=true
复制代码

3、amb

amb操作符在多个事件流中只发射最先发出数据的事件流,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void amb(){List<Observable<Integer>> list = new ArrayList<>();list.add(Observable.just(1, 2, 3).delay(3, TimeUnit.SECONDS));list.add(Observable.just(4, 5, 6).delay(2, TimeUnit.SECONDS));list.add(Observable.just(7, 8, 9).delay(1, TimeUnit.SECONDS));Observable.amb(list).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=7
onNext=8
onNext=9
复制代码

4、defaultIfEmpty

defaultIfEmpty操作符会在事件流没有发射任何数据时,发射一个指定的默认值

public static void main(String[] args) {Observable.empty().defaultIfEmpty(-1).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println("onNext=" + o.toString());}});
}
复制代码

输出

onNext=-1
复制代码

5、sequenceEqual

sequenceEqual操作符可以判断两个数据流是否完全相等

public static void main(String[] args) {Observable<Integer> just1 = Observable.just(1, 2, 3);Observable<Integer> just2 = Observable.just(1, 2, 3);Observable.sequenceEqual(just1, just2).subscribe(new Consumer<Boolean>() {@Overridepublic void accept(Boolean aBoolean) throws Exception {System.out.println("onNext=" + aBoolean);}});
}
复制代码

输出

onNext=true
复制代码

6、skipUntil/skipWhile

skipUtils操作符是在两个事件流发射的时候,第一个事件流会等到第二个事件流开始发射的时候,第一个事件流才开始发射出数据项,它会忽略之前发射过的数据项,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void skipUntil(){Observable<Long> just1 = Observable.interval(1, TimeUnit.SECONDS);Observable<Integer> just2 = Observable.just(8).delay(3, TimeUnit.SECONDS);just1.skipUntil(just2).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}
复制代码

输出

onNext=2
onNext=3
onNext=4
onNext=5
......
复制代码

skipWhile操作符是在一个事件流中,从第一项数据项开始判断是否符合某个特定条件,如果判断值返回true,则不发射该数据项,继续从下一个数据项执行同样的判断,直到某个数据项的判断值返回false时,则终止判断,发射剩余的所有数据项。需要注意的是,这里只要一次判断为false则后面的所有数据项都不判断

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).skipWhile(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer < 3;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=3
onNext=4
onNext=5
复制代码

7、takeUntil/takeWhile

takeUntil操作符跟skipUntil类似,skip表示跳过的意思,而take表示取值的意思,takeUntil操作符是在两个事件流发射的时候,第一个事件流会等到第二个事件流开始发射的时候,第一个事件流停止发射数据项,它会忽略之后的数据项,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void takeUntil(){Observable<Long> just1 = Observable.interval(1, TimeUnit.SECONDS);Observable<Integer> just2 = Observable.just(8).delay(3, TimeUnit.SECONDS);just1.takeUntil(just2).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}
复制代码

输出

onNext=0
onNext=1
复制代码

takeWhile操作符是在一个事件流中,从第一项数据项开始判断是否符合某个特定条件,如果判断值返回true,则发射该数据项,继续从下一个数据项执行同样的判断,直到某个数据项的判断值返回false时,则终止判断,且剩余的所有数据项不会发射。需要注意的是,这里只要一次判断为false则后面的所有数据项都不判断

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 0).takeWhile(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer < 3;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=1
onNext=2
复制代码

8、小结

  1. all():对所有数据项进行校验
  2. contains():所有数据项是否包含指定数据项
  3. amb():多个事件流中,只发射最先发出的事件流
  4. defaultIfEmpty():如果数据流为空则发射默认数据项
  5. sequenceEqual():判断两个数据流是否完全相等
  6. skipUntil():当两个事件流发射时,第一个事件流的数据项会等到第二个事件流开始发射时才进行发射
  7. skipWhile():当发射的数据流达到某种条件时,才开始发射剩余所有数据项
  8. takeUntil():当两个事件流发射时,第一个事件流的数据项会等到第二个事件流开始发射时终止发射
  9. takeWhile():当发射的数据流达到某种条件时,才停止发射剩余所有数据项

3.8 Mathematical and Aggregate Operators(数学运算及聚合操作符)

数学运算操作符比较简单,对于数学运算操作符会放在小结中介绍,下面是对聚合操作符做介绍

1、reduce

reduce操作符跟scan操作符是一样的,会对发射的数据和上一轮发射的数据进行函数处理,并返回的数据供下一轮使用,持续这个过程来产生剩余的数据流。reduce与scan的唯一区别在于reduce只输出最后的结果,而scan会输出每一次的结果,这点从图片中也能看出来

public static void main(String[] args) {Observable.just(8, 2, 13, 1, 15).reduce(new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Exception {return integer < integer2 ? integer : integer2;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer item) throws Exception {System.out.println("onNext=" + item);}});
}
复制代码

输出

onNext=1
复制代码

2、collect

collect操作符跟reduce操作符类似,只不过collect增加了一个可改变数据结构的函数供我们处理

public static void main(String[] args) {Observable.just(8, 2, 13, 1, 15).collect(new Callable<String>() {@Overridepublic String call() throws Exception {return "A";}}, new BiConsumer<String, Integer>() {@Overridepublic void accept(String s, Integer integer) throws Exception {System.out.println("onNext=" + s + "  " + integer);}}).subscribe(new BiConsumer<String, Throwable>() {@Overridepublic void accept(String s, Throwable throwable) throws Exception {System.out.println("onNext2=" + s);}});
}
复制代码

输出

onNext=A  8
onNext=A  2
onNext=A  13
onNext=A  1
onNext=A  15
onNext2=A
复制代码

3、小结

数学运算操作符的使用需要在gradle中添加rxjava-math的依赖

implementation 'io.reactivex:rxjava-math:1.0.0'
复制代码
  1. average():求所有数据项的平均值
  2. max/min():求所有数据项的最大或最小值
  3. sum():求所有数据项的总和
  4. reduce():对上一轮处理过后的数据流进行函数处理,只返回最后的结果
  5. collect():对上一轮处理过后的数据流进行函数处理,可改变原始的数据结构

3.9 Connectable Observable(连接操作符)

1、publish

publish操作符是将普通的事件流转化成可连接的事件流ConnectableObservable,它与普通的事件流不一样,ConnectableObservable在没有调用connect()进行连接的情况下,事件流是不会发射数据的

public static void main(String[] args) {ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();connectableObservable.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

复制代码

2、connect

connect操作符是将可连接的事件流进行连接并开始发射数据。这个方法需要注意的是,connect操作符必须在所有事件流被订阅后才开始发射数据。如果放在subscribe之前的话,则订阅者是无法收到数据的。如果后面还有订阅者将订阅此次事件流,则会丢失已经调用了connect后,发射出去的数据项

public static void main(String[] args) {ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();connectableObservable.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});connectableObservable.connect();
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

3、refCount

refCount操作符可以将可连接的事件流转换成普通的事件流

public static void main(String[] args) {ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();connectableObservable.refCount().subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
复制代码

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码

4、replay

replay操作符将弥补connect操作符的缺陷,由于connect会让后面进行订阅的订阅者丢失之前发射出去的数据项,所以使用replay操作符可以将发射出去的数据项进行缓存,这样使得后面的订阅者都可以获得完整的数据项。这里需要注意的是,replay操作符不能和publish操作符同时使用,否则将不会发射数据。例子中,读者可以将replay操作符换成publish操作符,这时候的输出就会丢失前2秒发射的数据项

public void replay(){ConnectableObservable<Long> connectableObservable = Observable.interval(1, TimeUnit.SECONDS).replay();connectableObservable.connect();connectableObservable.delaySubscription(3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}
复制代码

输出

onNext=0
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
......
复制代码

5、小结

  1. publish():将普通的事件流转换成可连接的事件流
  2. connect():将可连接的事件流进行连接并发射数据
  3. refCount():将可连接的事件流转换成普通的事件流
  4. replay():缓存可连接的事件流中的所有数据项

转载于:https://juejin.im/post/5cd8dc55f265da039f0f3030

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/387704.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

virtualbox 使用

实现文件拖拽功能 1、设备 -- 安装增强功能 -- /bin/sh VboxLinuxaddition.run -- reboot 2、设备 -- 拖放 -- 双向 3、虚拟机 -- 设置 -- 存储 -- 控制器&#xff1a;SATA -- 勾选 使用主机输入输出&#xff08;I\O 缓存&#xff09; 4、虚拟机硬盘 -- 勾选固态驱动器 转载于…

linux安装mysql 5.6.33

.到MySQL官网下载mysql编译好的二进制安装包&#xff0c;在下载页面Select Platform:选项选择linux-generic&#xff0c;然后把页面拉到底部&#xff0c;64位系统下载Linux - Generic (glibc 2.5) (x86, 64-bit)&#xff0c;下载后文件名&#xff1a;mysql-5.6.33-linux-glibc2…

Go 函数特性和网络爬虫示例

爬取页面 这篇通过网络爬虫的示例&#xff0c;来了解 Go 语言的递归、多返回值、延迟函数调用、匿名函数等方面的函数特性。首先是爬虫的基础示例&#xff0c;下面两个例子展示通过 net/http 包来爬取页面的内容。 获取一个 URL 下面的程序展示从互联网获取信息&#xff0c;获…

Qt的安装和使用中的常见问题(详细版)

对于太长不看的朋友&#xff0c;可参考Qt的安装和使用中的常见问题&#xff08;简略版&#xff09;。 目录 1、引入2、Qt简介3、Qt版本 3.1 查看安装的Qt版本3.2 查看当前项目使用的Qt版本3.3 查看当前项目使用的QtCreator版本3.4 Linux命令行下查看和使用不同版本的Qt4、Qt模块…

python与C#的互相调用

python与C#的互相调用一、C#调用python新建一个项目&#xff0c;添加引用&#xff1a;IronPython.dll&#xff0c;Microsoft.Scripting.dll&#xff08;在IronPython的安装目录中&#xff09;。创建一个文本文件命名为hello.py,把该文件添加的当前的项目中,并设置为总是输出。#…

各行业大数据可视化界面参考

转载于:https://www.cnblogs.com/wangsongbai/p/10178096.html

mysql远程连接 Host * is not allowed to connect to this MySQL server

localhost改成% 进入mysql的BIN目录 代码如下 复制代码 mysql -u root -p mysql>use mysql; mysql>update user set host ’%where user ’root’; mysql>flush privileges; 具体分析 1、在本机登入mysql后&#xff0c;更改“mysql”数据库里的“user”表里的“h…

今日听闻这几款手机软件比较火爆 果然名不虚传!

如今的时代&#xff0c;智能手机已经成为我们生活中不可缺少的一部分&#xff0c;大家之所以这么爱玩手机&#xff0c;其实并不是手机本身有多么吸引人&#xff0c;而是安装在手机上的各种各样的APP&#xff0c;比如各种社交软件、音频软件、购物软件以及地图软件等等。下面我们…

setdefault()方法

setdefault()方法 描述 字典 setdefault() 方法和 get()方法类似,返回指定键的值&#xff0c;如果键不在字典中&#xff0c;将会添加键并将值设置为一个指定值&#xff0c;默认为None。 get() 和 setdefault() 区别&#xff1a; setdefault() 返回的键如果不在字典中&#xff0…

Hive2.1.1、Hadoop2.7.3 部署

本文以远程模式安装Hive2.1.1将hive的元数据放置在MySQL数据库中。 1 安装mysql数据库 sudo apt-get install mysql-server11 重启mysql服务使得配置文件生效 sudo service mysql restart11 创建hive专用账户 CREATE USER hive% IDENTIFIED BY 123456;11 给hive账户授予所有权限…

Django 的简单ajax

需要通过ajax实现局部刷新 js代码 $(#guo-sou-ajax).click(function(){ #获取id为guo-sou-ajax点击后的信号console.log($(this).attr("data-action")) $.ajax({ #调用ajaxurl: $(this).attr("data-action"), #url保存在标签里面的data-actio…

postman提取返回值

Postman是做接口测试的&#xff0c;但是很多接口并不是直接就能测&#xff0c;有的需要一些预处理。比如说身份认证&#xff0c;需要传递一个token。如果做网页测试&#xff0c;一般打开登陆界面的时候就会生成一个token&#xff0c;如果返回值是json格式&#xff0c;用Postman…

docker下用keepalived+Haproxy实现高可用负载均衡集群

启动keepalived后宿主机无法ping通用keepalived&#xff0c;报错&#xff1a; [rootlocalhost ~]# ping 172.18.0.15 PING 172.18.0.15 (172.18.0.15) 56(84) bytes of data. From 172.18.0.1 icmp_seq1 Destination Host Unreachable From 172.18.0.1 icmp_seq2 Destination H…

hadoop hive 2.1.1 将Hive启动为服务

我们之前使用的Shell方式与Hive交互只是Hive交互方式中的一种&#xff0c;还有一种就是将Hive启动为服务&#xff0c;然后运行在一个节点上&#xff0c;那么剩下的节点就可以使用客户端来连接它&#xff0c;从而也可以使用Hive的数据分析服务。 前台模式 可以使用下面的命令来将…

大数据学习要知道的十大发展趋势,以及学习大数据的几点建议

2016年&#xff0c;近40%的公司正在实施和扩展大数据技术应用&#xff0c;另有30%的公司计划在未来12个月内采用大数据技术&#xff0c;62.5%的公司现在至少有一个大数据项目投入生产&#xff0c;只有5.4%的公司没有大数据应用计划&#xff0c;或者是没有正在进行的大数据项目&…

pickle 模块

import pickle # class Elephant:def __init__(self, name, weight, height):self.name nameself.weight weightself.height heightdef tiaoxi(self):print(f"{self.name}大象特别喜欢调戏人")# e Elephant("宝宝", "185T", "175"…

Hiv:SQuirrel连接hive配置

熟悉了Sqlserver的sqlserver management studio、Oracle的PL/SQL可视化数据库查询分析工具&#xff0c;在刚开始使用hive、phoenix等类sql组件时&#xff0c;一直在苦苦搜寻是否也有类似的工具&#xff0c;不负所望&#xff0c;SQuirrel Sql client 可视化数据库工具基本可满足…

MariaDB 数据库索引详解(9)

MariaDB数据库管理系统是MySQL的一个分支,主要由开源社区在维护,采用GPL授权许可MariaDB的目的是完全兼容MySQL,包括API和命令行,MySQL由于现在闭源了,而能轻松成为MySQL的代替品.在存储引擎方面,使用XtraDB来代替MySQL的InnoDB,MariaDB由MySQL的创始人Michael Widenius主导开发…

Kettle连接Hive2的问题解决思路

在kettle上当选择好HIVE2连接时候有报错 org.pentaho.di.core.exception.KettleDatabaseException: Error occured while trying to connect to the databaseError connecting to database: (using class org.apache.hive.jdbc.HiveDriver)org/apache/http/client/CookieStore…

windows下cmd常用的命令

2019独角兽企业重金招聘Python工程师标准>>> windows下常用的命令指示行: windows下 CMD比较常见的命令1. gpedit.msc-----组策略 2. sndrec32-------录音机 3. Nslookup-------IP地址侦测器 4. explorer-------打开资源管理器 5. logoff---------注销命令 6. …