RxJava 2.0 全体系梳理(持续更新ing)

事件流 数据流

如何组合和变换流

如何订阅任何可观察的数据流

目录

一.源码解析

二.基本使用

步骤

一、创建Observable

二、创建Observer

三、使用subscribe进行订阅

四、取消订阅 Disposable

五种Observable(被观察者)

Observable 和 Observer

Cold Observable

定义:

1. 对应操作符:

Hot Observable

定义:

对应操作符:

冷流 to 热流

ConnectableObservable

(需要调用publish,connect)

Subject / Processor(支持背压)

Subject

Processor

热流 to 冷流

reCount操作符

share操作符

Flowable(背压) 和 Subscriber

背压

Single 和 SingleObserver

Completable 和 CompletableObserver

Maybe 和 MaybeObserver

转换器 - Transformer

操作符

创建操作符

线程操作符

subscribeOn

observeOn()

变换|过滤操作符

条件|布尔操作符

合并操作符|连接操作符

其他操作符

do

compose

线程调度

三.实战使用

四.注意事项:


一.源码解析

todo

二.基本使用

步骤

一、创建Observable

Observable的字面意思是被观察者,使用RxJava时需要创建一个被观察者,它会决定什么时候触发事件以及触发怎样的事件。

有点类似在上游发送命令,并且可以指定同异步或者操作模块的顺序与次数。

二、创建Observer

Observer即观察者,可以在未来某个时刻响应Observable的通知,它可以在不同的线程中执行任务。

三、使用subscribe进行订阅

创建了Observable和Observer之后,使用subscribe将他们链接起来,使得整个上下游能衔接起来实现链式调用。

四、取消订阅 Disposable

Observable.subscribe()方法会返回一个Disposable对象,可以用它来取消订阅

CompositeDisposable 复合订阅,可以add很多Disposable然后一起取消

五种Observable(被观察者)

类型

描述

Observable

能够发射0或n个数据,并以成功或错误事件终止。

Flowable

能够发射0或n个数据,并以成功或错误事件终止。支持背压,可以控制数据源发射的速度。

Single

只发射单个数据或错误事件。

Completable

从来不发射数据,只处理 onComplete 和 onError事件。 可以看成Rx的Runnbale。

Maybe

能够发射0或者1个数据,要么成功,要么失败。有点类似Optional。

Observable 和 Observer

Cold Observable

定义:

只有观察者订阅了,才开始执行发射数据的代码。并且Cold Observable和Observer只能是一对一的关系。当有多个不同的订阅者时,消息是重新完整发送的,也就是说,对于Cold Observeable来说,有多个Observer存在的时候,各自的事件是独立的。

多个订阅的sunbscribe(或者说观察者)事件各自独立。

1. 对应操作符:

just,create,range,fromXXX

Hot Observable

定义:

Hot Observable 无论有没有观察者订阅,事件始终都会发生。当Hot Observable有多个订阅者时,Hot Observable与订阅者们的关系是一对多的关系。可以与多个订阅者共享信息。

多个订阅的sunbscribe(或者说观察者)共享同一事件。

对应操作符:

..

冷流 to 热流

ConnectableObservable
(需要调用publish,connect)

Subject / Processor(支持背压)
Subject

Subject类型

功能描述

AsyncSubject

接受onComplete之前的最后一个数据

BehaviorSubject

接收到订阅前的最后一条数据和订阅后的所有数据。

PublishSubject

接收到订阅之后的所有数据

ReplaySubject

接收到所有的数据,包括订阅之前的所有数据和订阅之后的所有数据。

Processor

支持背压!

Processor和Subject用法一样,只是Processor支持背压。
它也包含4中类型:AsyncProcessor, BehaviorProcessor,ReplayProcessor,PublishProcessor。
用法同Subject一样。

热流 to 冷流

reCount操作符
share操作符

Flowable(背压) 和 Subscriber

Rxjava2.0,Observable不再支持背压,改由Flowable来支持背压

背压

在RxJava中,会遇到被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息,这就是典型的背压(Back Pressure)场景。

BackPressure经常被翻译为背压,背压的字面意思比较晦涩,难以理解。它是指在异步场景下,被观察者发送事件速度远快于观察者处理的速度,从而导致下游的buffer溢出,这种现象叫做背压。

背压只在异步场景出现,即被观察者和观察者处于不同的线程中。

RxJava2 背压_rxjava 背压_xiaopangcame的博客-CSDN博客

Flowable.create<Int>({for (num in 0..127) {it.onNext(num)}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe {LogUtil.d("subscribe : " + it)}

Flowable的异步缓存池默认大小128

背压策略BackpressureStrategy

说明

MISSING

没有指定策略,不会对通过OnNext发送的数据做处理,需要下游通过背压操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLastest())指定背压策略

ERROR

如果放入Flowable的异步缓存池里的数据超限了。则会抛出MissingBackPressureException异常

BUFFER

表示Flowable的异步缓存池同Observable的一样,没有固定大小,不会抛出MissingBackPressureException,但是会OOM

DROP

如果Flowable的异步缓存池满了,就丢弃

LASTEST

和DROP一样,如果缓存池满了,就会丢掉将要放入缓存池中的数据

但是无论如何,会将最后一条数据强行放入缓存池中(此时总数变为缓存池大小+1)

Single 和 SingleObserver

Completable 和 CompletableObserver

Maybe 和 MaybeObserver

转换器 - Transformer

Transformer能够将一个 Observable/Flowable/SIngle/Completable/Maybe 对象转换为另一个 Observable/Flowable/SIngle/Completable/Maybe对象,与调用一系列的内联操作符一摸一样。

与compose操作符结合使用

示例1. 将发射Int的Observable转换为发射字符串

  Observable.just(1, 2, 3, 4, 5, 6).compose { upstream ->upstream.map { integer ->integer.toString()}}.subscribe { s -> print(s) }

示例2.封装切换线程操作

object RxJavaUtils {//封装切换线程操作fun <T> observableToMain(): ObservableTransformer<T, T> {return ObservableTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}}@SuppressLint("CheckResult")
fun main() {Observable.just(1, 2, 3, 4, 5, 6).compose(RxJavaUtils.observableToMain()).subscribe { s -> print(s) }}

操作符

创建操作符

创建操作符

功能

just

from

create

defer

range

interval

timer

empty

error

never

线程操作符

subscribeOn

subscribeOn()指示对数据的操作运行在特定的线程调度器Scheduler上

subscribeOn()指定工作执行所在的线程池,它的位置无关紧要,它可以在流的任何位置,

如果流中有多个实例subscribeOn,则只有第一个具有实际效果。

observeOn()

指定下游操作运行在特定的线程调度器Scheduler上

变换|过滤操作符

变换操作符

描述

map

对原始Observable发射的每一项数据应用一个函数,执行变换操作

flatMap

将发射源变为多个Observable,然后将他们组合后放进一个单独的Observable

和map的区别:

这两个在本质上是一样的,都是 map 操作,即对流形式的传入数据进行处理返回一个数据。但是区别方面从字面上就可以体现出来,flatMap 比 map 多了一个 flat 操作,也就是 “展平/扁平化” 处理的意思。

所以 flatMap 是一个 map 和一个 flat 操作的组合。其首先将一个函数应用于元素,然后将其展平,当你需要将 [[a,b,c],[d,e,f],[x,y,z]] 具有两个级别的数据结构转换为 [a,b,c,d,e,f,x,y,z] 这样单层的数据结构时,就选择使用 flatMap 处理。如果是 [a,b,c,d,e,f,x,y,z] 转换为大写 [A,B,C,D,E,F,X,Y,Z] 这样单层转换,就使用 map 即可。

switchMap

scan

groupBy

将一个Observable拆分为一些Observables集合

比如区分1~9中哪些是奇数哪些是偶数

buffer

将数据缓存并发出

打印0~10,buffer 2 :变成了1-2,3-4.。。五个数组

window

将Observable的数据分解为多个Observable窗口发出

cast

过滤操作符

描述

distinct

过滤掉重复的数据项,只允许还没发射过的数据项通过

1,2,3,3,4,4,5,5->

1,2,3,4,5

distinctUntilChanged

与distinct的区别是,只判断一个数据和它的直接前驱是否不同

1,2,3,3,5->

1,2,5

filter

按规则过滤

takeLast

last

....太多

条件|布尔操作符

条件操作符

说明

amb

给定多个Observable,只让第一个发射数据的Observable发射全部数据

defaultIfEmpty

发射来自原始Observable的数据,如果原始Observable没有发射数据,则发射一个默认数据

skipUntil

丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的数据

skipWhile

丢弃原始Observable的数据,直到一个特定的条件为假,然后再发射原始Observable的数据

takeUntil

发射原始Observable的数据,直到第二个Observable发射了一个数据

takeWhile

发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据

布尔操作符

说明

all

判断是否所有的数据项都满足条件

contains

判断Observable是否会发射一个指定的值

exists 和 isEmpty

判断Observable是否发射了一个值

sequenceEqual

判断两个Observable发射的序列是否相等

合并操作符|连接操作符

合并操作符

说明

startWith

在数据序列的开头增加一项数据

merge

将多个Observable合并为一个

1,2,3 merge 4,5,6 ->

1,2,3,4,5,6

mergeDelayError

将多个Observable合并为一个,让没有错误的Observable完成后再发射错误

zip

使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果

(需要原始的Observable中每一个都发射了数据时触发)

1,2,3 zip 4,5,6 ->

5,7,9

combineLast

当两个Observable中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果

(原始Observable中任意一个发射了数据时就触发)

如果A发送了,B没发送咋办?

join 和 groupJoin

无论何时,如果一个Observable发射了一个数据项,就需要在另一个Observable发射的数据项定义的时间窗口内,将两个Observable合并发射

1,2,3 join 4,5,6->

1:4

1:5

1:6

2:4

2:5

2:6

3:4

3:5

3:6

switchOnnext

将一个发射Observable的Observable转换成另一个Observable,后者发射这些Observable最近发送的数据

连接操作符

说明

ConnectableObservable.connect()

用来触发ConnectableObservable发送数据

Observable.publish()

将一个Observable转换为一个可连接的Observable(ConnectableObservable)

Observable.replay()

确保所有的订阅者看到相同的数据序列,即使他们在Observable开始发射数据之后才订阅

有点像ReplaySubject?

replay 之后会被包装成ConnectableObservable

ConnectableObservable的线程切换只能通过replay操作实现,普通的subscribeOn和observerOn在ConnectableObservable中不起作用。

replay可以通过指定线程方式来切换线程。

ConnectableObservable.refCount()

让一个可连接的Observable表现得像一个普通的Observable

其他操作符

do

可以给Observable的生命周期各个阶段加上一系列回调监听。doOnNext, doAfterNext, doOnComplete,都是void函数。

compose

与转换器结合使用

线程调度

角色

描述

Scheduler

线程任务调度器

Worker

线程任务的具体执行者

三.实战使用


 

四.注意事项:

behaviorSubject.hide 方法,转化为Observable,防止被从外部更改状态,只暴露状态的修改结果给外部

Observable.hide的用法_observable hide_Flying Rookie的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/193.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第二次CCF计算机软件能力认证

第一题&#xff1a;相邻数对 给定 n 个不同的整数&#xff0c;问这些数中有多少对整数&#xff0c;它们的值正好相差 1。 输出格式 输入的第一行包含一个整数 n&#xff0c;表示给定整数的个数。 第二行包含所给定的 n 个整数。 输出格式 输出一个整数&#xff0c;表示值正好相…

KMP算法

KMP KMP 算法是一个快速查找匹配串的算法&#xff0c;它的作用其实就是本题问题&#xff1a;如何快速在「原字符串」中找到「匹配字符串」。 而 KMP 算法的复杂度为 O(mn)实际上是O(N),因为O(M)不可能大于O(N) KMP 之所以能够在 O(mn)复杂度内完成查找&#xff0c;是因为其能…

uniapp:tabBar配置后不显示的问题

问题描述 uniapp的tabBar配置后不显示 问题解决 tabBar.list 数组的第一项必须和pages配置中的第一项要相同 参考文章 uniapp tabBar配置后不显示&#xff0c;无效的问题

巴斯夫与长三角物理研究中心开展合作,专注固态和钠离子电池领域

“巴斯夫&#xff0c;全球知名化学公司&#xff0c;宣布与长三角物理研究中心合作&#xff0c;在江苏溧阳市成立联合研究中心&#xff0c;专注于固态电池和钠离子电池的科研。” 根据巴斯夫官方微博消息&#xff0c;新成立的研究中心名为“巴斯夫–长三角物理研究中心新能源汽车…

高德地图的使用

JS API 结合 Vue 使用 高德地图 jsapi 下载、引入 npm add amap/amap-jsapi-loaderimport AMapLoader from amap/amap-jsapi-loader 使用2.0版本的loader需要在window对象下先配置 securityJsCode JS API 安全密钥使用 JS API 使用 script 标签同步加载增加代理服务器设置…

【计算机网络】网络编程套接字(二)

文章目录 网络编程套接字&#xff08;二&#xff09;简单TCP服务器实现创建套接字服务器绑定服务器监听服务器接收连接服务器处理请求 简单TCP客户端实现创建套接字客户端发起连接客户端发起请求 服务器简单测试服务器简单测评多进程版TCP服务器捕捉SIGCHLD信号孙子进程提供服务…

【RuoYi-Cloud-Plus】学习笔记 09 - Sentinel(四)熔断降级知识整理

文章目录 前言参考目录版本说明学习笔记1、包结构2、DegradeSlot3、DegradeRule4、DegradeRuleManager5、CircuitBreaker5.1 CircuitBreaker.State6、AbstractCircuitBreaker6.1、AbstractCircuitBreaker#fromCloseToOpen6.2、AbstractCircuitBreaker#fromHalfOpenToOpen6.3、A…

【Android知识笔记】系统进程(二)

AMS ActivityManagerService(以下简称AMS) 主要负责四大组件的启动、切换、调度以及应用进程的管理和调度工作。所有的APP应用都需要与AMS打交道,ActivityManager的组成主要分为以下几个部分: 服务代理:由ActivityManagerProxy实现,用于与Server端提供的系统服务进行进程…

支付宝接入

支付宝接入 python-alipay-sdk pycryptodome一、电脑网站支付 1.1 获取支付宝密钥 沙箱网址 1.APPID 2.应用私钥 3.支付宝公钥1.2 存放密钥 在与 settings.py 的同级目录下创建 pem 文件夹pem 文件夹下创建 app_private_key.pem 和 alipay_public_key.pem app_private_key…

神经网络初谈

文章目录 简介神经网络的发展历程神经网络的初生神经网络的第一次折戟神经网络的新生&#xff0c;Hinton携BP算法登上历史舞台命途多舛&#xff0c;神经网络的第二次寒冬神经网络的重生&#xff0c;黄袍加身&#xff0c;一步封神神经网络的未来&#xff0c;众说纷纭其他时间点 …

STM32 Proteus仿真LCD12864俄罗斯方块-FZ0063

STM32 Proteus仿真LCD12864俄罗斯方块-FZ0063 Proteus仿真小实验&#xff1a; STM32 Proteus仿真LCD12864俄罗斯方块-FZ0063 功能&#xff1a; 硬件组成&#xff1a;STM32F103R6单片机 LCD12864显示器多个按键 1.标准俄罗斯方块经典游戏玩法&#xff0c;带计时&#xff0c…

Kong 服务和路由的添加

管理服务 这里参考DB-less-Mode&#xff0c;因为使用的是yaml配置文件的形式&#xff0c;所以所有的相关配置只需要往初始化的kong.yml文件中添加就可以了&#xff0c;就像nginx的配置文件 DB-less-Mode 创建服务 vim /etc/kong/kong.yml services: - name: my-service #…

MySQL---表数据高效率查询(简述)

目录 前言 一、聚合查询 &#x1f496;聚合函数 &#x1f496;GROUP BY子句 &#x1f496;HAVING 二、联合查询 &#x1f496;内连接 &#x1f496;外连接 &#x1f496;自连接 &#x1f496;子查询 &#x1f496;合并查询 &#x1f381;博主介绍&#xff1a;博客名…

Idea 修改默认 Maven 为自己的

每次我们打开新项目时,都要去配置一遍 maven,很麻烦,其实可以去修改 idea 里面默认的 maven 配置,这样后面不管是打开新项目还是老项目,就都是用的自己的 maven 了. 1.文件->新项目设置->新项目的设置 File->Other Settings -> Settings for New Project 2.然后和…

git下载源码及环境搭建之数据库(二)

学习目标&#xff1a; 数据库 新项目使用 数据库文件 的配置 及相关属性的设置 步骤&#xff1a; 数据库 下图所示为开发时所用数据库 第一步&#xff1a;新建一个数据库 注意&#xff1a; 字符集与排序规则我们应该选择utf-8 相关 选中新创建的表&#xff0c;点击备份—还…

MySQL单表查询练习题

目录 第一题 第二题 第三题 第一题 1.创建数据表pet&#xff0c;并对表进行插入、更新与删除操作&#xff0c;pet表结构如表8.3所示。 (1&#xff09;首先创建数据表pet&#xff0c;使用不同的方法将表8.4中的记录插入到pet表中。 mysql> create table pet( name varchar(…

centos7.9php8swoole5swoft2环境安装遇到确实redis扩展的解决办法

1、环境介绍 运行系统&#xff1a;centos7.9 php版本&#xff1a;php8.0.29 swoole版本&#xff1a;swoole5 swoft版本&#xff1a;swoft2.02、遇到的问题 The requested PHP extension ext-redis * is missing from your system. Install or enable PHPs redis extension。这…

python爬虫哪个库用的最多

目录 常用的python爬虫库有哪些 1. Requests&#xff1a; 2. BeautifulSoup&#xff1a; 3. Scrapy&#xff1a; 4. Selenium&#xff1a; 5. Scrapy-Redis&#xff1a; 哪个爬虫库用的最多 Scrapy示例代码 总结 常用的python爬虫库有哪些 Python拥有许多常用的爬虫库…

Java反射与“整活--(IOC容器)”

文章目录 前言反射什么是反射基本操作获取类对象获取类属性获取类方法方法的执行对构造方法的操作 注解定义获取注解 整活&#xff08;IOC容器&#xff09;项目结构IOC/DI流程ApplicationContextBeanDefinitionReaderBeanDefinitionBeanWrappergetBean&#xff08;&#xff09;…

C(一致性) A(可用性) P(分区容错性)中一致性和可用性的理解 和 BASE

怎么理解&#xff1a;一致性和可用性不可兼得呢? 现在有3个节点node1、node2、node3,其中node3因为网络原因暂时不可用了&#xff0c;但是&#xff0c;依然有些请求已经到达了, node1和node2数据是同步的,node3节点虽然存活&#xff0c;但是因为网络原因&#xff0c;并没有同…