事件流 数据流
如何组合和变换流
如何订阅任何可观察的数据流
目录
一.源码解析
二.基本使用
步骤
一、创建Observable
二、创建Observer
三、使用subscribe进行订阅
四、取消订阅 Disposable
五种Observable(被观察者)
Observable 和 Observer
Cold Observable
定义:
1. 对应操作符:
Hot Observable
定义:
对应操作符:
冷流 to 热流
ConnectableObservable
(需要调用publish,connect)
Subject / Processor(支持背压)
Subject
Processor
热流 to 冷流
reCount操作符
share操作符
Flowable(背压) 和 Subscriber
背压
Single 和 SingleObserver
Completable 和 CompletableObserver
Maybe 和 MaybeObserver
转换器 - Transformer
操作符
创建操作符
线程操作符
subscribeOn
observeOn()
变换|过滤操作符
条件|布尔操作符
合并操作符|连接操作符
其他操作符
do
compose
线程调度
三.实战使用
四.注意事项:
一.源码解析
todo
二.基本使用
步骤
一、创建Observable
Observable的字面意思是被观察者,使用RxJava时需要创建一个被观察者,它会决定什么时候触发事件以及触发怎样的事件。
有点类似在上游发送命令,并且可以指定同异步或者操作模块的顺序与次数。
二、创建Observer
Observer即观察者,可以在未来某个时刻响应Observable的通知,它可以在不同的线程中执行任务。
三、使用subscribe进行订阅
创建了Observable和Observer之后,使用subscribe将他们链接起来,使得整个上下游能衔接起来实现链式调用。
四、取消订阅 Disposable
Observable.subscribe()方法会返回一个Disposable对象,可以用它来取消订阅
CompositeDisposable 复合订阅,可以add很多Disposable然后一起取消
五种Observable(被观察者)
类型 | 描述 |
Observable | 能够发射0或n个数据,并以成功或错误事件终止。 |
Flowable | 能够发射0或n个数据,并以成功或错误事件终止。支持背压,可以控制数据源发射的速度。 |
Single | 只发射单个数据或错误事件。 |
Completable | 从来不发射数据,只处理 onComplete 和 onError事件。 可以看成Rx的Runnbale。 |
Maybe | 能够发射0或者1个数据,要么成功,要么失败。有点类似Optional。 |
Observable 和 Observer
Cold Observable
定义:
只有观察者订阅了,才开始执行发射数据的代码。并且Cold Observable和Observer只能是一对一的关系。当有多个不同的订阅者时,消息是重新完整发送的,也就是说,对于Cold Observeable来说,有多个Observer存在的时候,各自的事件是独立的。
多个订阅的sunbscribe(或者说观察者)事件各自独立。
1. 对应操作符:
just,create,range,fromXXX
Hot Observable
定义:
Hot Observable 无论有没有观察者订阅,事件始终都会发生。当Hot Observable有多个订阅者时,Hot Observable与订阅者们的关系是一对多的关系。可以与多个订阅者共享信息。
多个订阅的sunbscribe(或者说观察者)共享同一事件。
对应操作符:
..
冷流 to 热流
ConnectableObservable
(需要调用publish,connect)
Subject / Processor(支持背压)
Subject
Subject类型 | 功能描述 |
AsyncSubject | 接受onComplete之前的最后一个数据 |
BehaviorSubject | 接收到订阅前的最后一条数据和订阅后的所有数据。 |
PublishSubject | 接收到订阅之后的所有数据 |
ReplaySubject | 接收到所有的数据,包括订阅之前的所有数据和订阅之后的所有数据。 |
Processor
支持背压!
Processor和Subject用法一样,只是Processor支持背压。
它也包含4中类型:AsyncProcessor, BehaviorProcessor,ReplayProcessor,PublishProcessor。
用法同Subject一样。
热流 to 冷流
reCount操作符
share操作符
Flowable(背压) 和 Subscriber
Rxjava2.0,Observable不再支持背压,改由Flowable来支持背压
背压
在RxJava中,会遇到被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息,这就是典型的背压(Back Pressure)场景。
BackPressure经常被翻译为背压,背压的字面意思比较晦涩,难以理解。它是指在异步场景下,被观察者发送事件速度远快于观察者处理的速度,从而导致下游的buffer溢出,这种现象叫做背压。
背压只在异步场景出现,即被观察者和观察者处于不同的线程中。
RxJava2 背压_rxjava 背压_xiaopangcame的博客-CSDN博客
Flowable.create<Int>({for (num in 0..127) {it.onNext(num)}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe {LogUtil.d("subscribe : " + it)}
Flowable的异步缓存池默认大小128
背压策略BackpressureStrategy | 说明 |
MISSING | 没有指定策略,不会对通过OnNext发送的数据做处理,需要下游通过背压操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLastest())指定背压策略 |
ERROR | 如果放入Flowable的异步缓存池里的数据超限了。则会抛出MissingBackPressureException异常 |
BUFFER | 表示Flowable的异步缓存池同Observable的一样,没有固定大小,不会抛出MissingBackPressureException,但是会OOM |
DROP | 如果Flowable的异步缓存池满了,就丢弃 |
LASTEST | 和DROP一样,如果缓存池满了,就会丢掉将要放入缓存池中的数据 但是无论如何,会将最后一条数据强行放入缓存池中(此时总数变为缓存池大小+1) |
Single 和 SingleObserver
Completable 和 CompletableObserver
Maybe 和 MaybeObserver
转换器 - Transformer
Transformer能够将一个 Observable/Flowable/SIngle/Completable/Maybe 对象转换为另一个 Observable/Flowable/SIngle/Completable/Maybe对象,与调用一系列的内联操作符一摸一样。
与compose操作符结合使用
示例1. 将发射Int的Observable转换为发射字符串
Observable.just(1, 2, 3, 4, 5, 6).compose { upstream ->upstream.map { integer ->integer.toString()}}.subscribe { s -> print(s) }
示例2.封装切换线程操作
object RxJavaUtils {//封装切换线程操作fun <T> observableToMain(): ObservableTransformer<T, T> {return ObservableTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}}@SuppressLint("CheckResult") fun main() {Observable.just(1, 2, 3, 4, 5, 6).compose(RxJavaUtils.observableToMain()).subscribe { s -> print(s) }}
操作符
创建操作符
创建操作符 | 功能 |
just | |
from | |
create | |
defer | |
range | |
interval | |
timer | |
empty | |
error | |
never |
线程操作符
subscribeOn
subscribeOn()指示对数据的操作运行在特定的线程调度器Scheduler上
subscribeOn()指定工作执行所在的线程池,它的位置无关紧要,它可以在流的任何位置,
如果流中有多个实例subscribeOn,则只有第一个具有实际效果。
observeOn()
指定下游操作运行在特定的线程调度器Scheduler上
变换|过滤操作符
变换操作符 | 描述 | |
map | 对原始Observable发射的每一项数据应用一个函数,执行变换操作 | |
flatMap | 将发射源变为多个Observable,然后将他们组合后放进一个单独的Observable | 和map的区别: 这两个在本质上是一样的,都是 map 操作,即对流形式的传入数据进行处理返回一个数据。但是区别方面从字面上就可以体现出来,flatMap 比 map 多了一个 flat 操作,也就是 “展平/扁平化” 处理的意思。 所以 flatMap 是一个 map 和一个 flat 操作的组合。其首先将一个函数应用于元素,然后将其展平,当你需要将 [[a,b,c],[d,e,f],[x,y,z]] 具有两个级别的数据结构转换为 [a,b,c,d,e,f,x,y,z] 这样单层的数据结构时,就选择使用 flatMap 处理。如果是 [a,b,c,d,e,f,x,y,z] 转换为大写 [A,B,C,D,E,F,X,Y,Z] 这样单层转换,就使用 map 即可。 |
switchMap | ||
scan | ||
groupBy | 将一个Observable拆分为一些Observables集合 | 比如区分1~9中哪些是奇数哪些是偶数 |
buffer | 将数据缓存并发出 | 打印0~10,buffer 2 :变成了1-2,3-4.。。五个数组 |
window | 将Observable的数据分解为多个Observable窗口发出 | |
cast |
过滤操作符 | 描述 | |
distinct | 过滤掉重复的数据项,只允许还没发射过的数据项通过 | 1,2,3,3,4,4,5,5-> 1,2,3,4,5 |
distinctUntilChanged | 与distinct的区别是,只判断一个数据和它的直接前驱是否不同 | 1,2,3,3,5-> 1,2,5 |
filter | 按规则过滤 | |
takeLast | ||
last | ||
....太多 |
条件|布尔操作符
条件操作符 | 说明 |
amb | 给定多个Observable,只让第一个发射数据的Observable发射全部数据 |
defaultIfEmpty | 发射来自原始Observable的数据,如果原始Observable没有发射数据,则发射一个默认数据 |
skipUntil | 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的数据 |
skipWhile | 丢弃原始Observable的数据,直到一个特定的条件为假,然后再发射原始Observable的数据 |
takeUntil | 发射原始Observable的数据,直到第二个Observable发射了一个数据 |
takeWhile | 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据 |
布尔操作符 | 说明 |
all | 判断是否所有的数据项都满足条件 |
contains | 判断Observable是否会发射一个指定的值 |
exists 和 isEmpty | 判断Observable是否发射了一个值 |
sequenceEqual | 判断两个Observable发射的序列是否相等 |
合并操作符|连接操作符
合并操作符 | 说明 | |
startWith | 在数据序列的开头增加一项数据 | |
merge | 将多个Observable合并为一个 | 1,2,3 merge 4,5,6 -> 1,2,3,4,5,6 |
mergeDelayError | 将多个Observable合并为一个,让没有错误的Observable完成后再发射错误 | |
zip | 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果 (需要原始的Observable中每一个都发射了数据时触发) | 1,2,3 zip 4,5,6 -> 5,7,9 |
combineLast | 当两个Observable中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果 (原始Observable中任意一个发射了数据时就触发) | 如果A发送了,B没发送咋办? |
join 和 groupJoin | 无论何时,如果一个Observable发射了一个数据项,就需要在另一个Observable发射的数据项定义的时间窗口内,将两个Observable合并发射 | 1,2,3 join 4,5,6-> 1:4 1:5 1:6 2:4 2:5 2:6 3:4 3:5 3:6 |
switchOnnext | 将一个发射Observable的Observable转换成另一个Observable,后者发射这些Observable最近发送的数据 |
连接操作符 | 说明 | |
ConnectableObservable.connect() | 用来触发ConnectableObservable发送数据 | |
Observable.publish() | 将一个Observable转换为一个可连接的Observable(ConnectableObservable) | |
Observable.replay() | 确保所有的订阅者看到相同的数据序列,即使他们在Observable开始发射数据之后才订阅 | 有点像ReplaySubject? replay 之后会被包装成ConnectableObservable ConnectableObservable的线程切换只能通过replay操作实现,普通的subscribeOn和observerOn在ConnectableObservable中不起作用。 replay可以通过指定线程方式来切换线程。 |
ConnectableObservable.refCount() | 让一个可连接的Observable表现得像一个普通的Observable |
其他操作符
do
可以给Observable的生命周期各个阶段加上一系列回调监听。doOnNext, doAfterNext, doOnComplete,都是void函数。
compose
与转换器结合使用
线程调度
角色 | 描述 |
Scheduler | 线程任务调度器 |
Worker | 线程任务的具体执行者 |
三.实战使用
四.注意事项:
behaviorSubject.hide 方法,转化为Observable,防止被从外部更改状态,只暴露状态的修改结果给外部
Observable.hide的用法_observable hide_Flying Rookie的博客-CSDN博客