RxJava 操作符的原理

今天再一次看Rxjava的几个操作符时发现对于操作符到底做了什么事不是很清楚,使用just,create等操作符创建一个Observable,和使用filter、map等操作符对Observable发送的数据进行转换有什么区别和联系?filter和map这样的操作符最终是如何对数据进行处理的?带着这几个问题再次对对应的源码进行查看并做以下记录。

Just、create等 创建操作符:

首先看下just操作符的源码,做了什么:

Observable.just("","").subscribe({/// 观察者对象代码...
})
   public static <T> Observable<T> just(T item1, T item2) {ObjectHelper.requireNonNull(item1, "The first item is null");ObjectHelper.requireNonNull(item2, "The second item is null");return fromArray(item1, item2);}
 public static <T> Observable<T> fromArray(T... items) {ObjectHelper.requireNonNull(items, "items is null");if (items.length == 0) {return empty();} elseif (items.length == 1) {return just(items[0]);}return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));}

just操作符调用了一个多参数的just方法,传入创建参数,然后调用fromArray方法处理,

最终调用RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items))组装了一个对象返回。

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {Function<? super Observable, ? extends Observable> f = onObservableAssembly;if (f != null) {return apply(f, source);}return source;}

onAssembly() 方法在满足内部字段不为null的时候会调用一次apply方法处理,否则则直接把创建传入的Observable返回。最后在调用Observable的订阅方法subscribe的时候执行实现类的subscribeActual方法逻辑。

实现类ObservableFromArray来处理传入的数组类型数据。

//继承了 Observable 抽象类
public final class ObservableFromArray<T> extends Observable<T> {final T[] array;public ObservableFromArray(T[] array) {this.array = array;}@Overridepublic void subscribeActual(Observer<? super T> s) {// 内部 通过 FromArrayDisposable来处理数据FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);s.onSubscribe(d);if (d.fusionMode) {return;}// 调用数据处理的方法d.run();}static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {//订阅的时候传入的观察者对象final Observer<? super T> actual;final T[] array;int index;boolean fusionMode;volatile boolean disposed;FromArrayDisposable(Observer<? super T> actual, T[] array) {this.actual = actual;this.array = array;}@Overridepublic int requestFusion(int mode) {if ((mode & SYNC) != 0) {fusionMode = true;return SYNC;}return NONE;}@Nullable@Overridepublic T poll() {int i = index;T[] a = array;if (i != a.length) {index = i + 1;return ObjectHelper.requireNonNull(a[i], "The array element is null");}return null;}@Overridepublic boolean isEmpty() {return index == array.length;}@Overridepublic void clear() {index = array.length;}@Overridepublic void dispose() {disposed = true;}@Overridepublic boolean isDisposed() {return disposed;}void run() {T[] a = array;int n = a.length;// 遍历数组,不断的发送传入的数组中的元素调用onNext方法for (int i = 0; i < n && !isDisposed(); i++) {T value = a[i];if (value == null) {actual.onError(new NullPointerException("The " + i + "th element is null"));return;}actual.onNext(value);}if (!isDisposed()) {actual.onComplete();}}}
}

ObservableFromArray 类中的 subscribeActual方法,调用FromArrayDisposable进行处理。

在看下调用subscribe方法订阅的时候做了什么:

 //public abstract class Observable<T> 类中的方法 subscribepublic final void subscribe(Observer<? super T> observer) {ObjectHelper.requireNonNull(observer, "observer is null");try {observer = RxJavaPlugins.onSubscribe(this, observer);ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");// 调用了 subscribeActual 把观察者传入,subscribeActual这个方法就在操作符的// 实现类中实现,比如just操作符的实现类 ObservableFromArraysubscribeActual(observer);} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// can't call onError because no way to know if a Disposable has been set or not// can't call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}}

可以看到我们订阅的时候调用了 Observable中的 方法subscribe,里边调用了其抽象方法subscribeActual(observer);传入了观察者对象。而 subscribeActual方法的具体实现就在各个操作符的实现类中。比如上面说的just操作符的实现类ObservableFromArray,实现了just的具体逻辑。

take 、filter、map等转换操作符是如何实现的

// take和filter操作符,是如何实现的?
sub.take(1)
sub.filter { 1==1 }
public final Observable<T> take(long count) {if (count < 0) {throw new IllegalArgumentException("count >= 0 required but it was " + count);}//take 操作符最后创建了 ObservableTake对象return RxJavaPlugins.onAssembly(new ObservableTake<T>(this, count));}
public final Observable<T> filter(Predicate<? super T> predicate) {ObjectHelper.requireNonNull(predicate, "predicate is null");// filter 操作符,创建了ObservableFilter对象return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));}

以上两个操作符最后都使用了RxJavaPlugins这个类的onAssembly方法进行组装。各自创建了一个实现类的对象。

// ObservableTake类
public final class ObservableTake<T> extends AbstractObservableWithUpstream<T, T> {final long limit;public ObservableTake(ObservableSource<T> source, long limit) {super(source);this.limit = limit;}@Overrideprotected void subscribeActual(Observer<? super T> observer) {source.subscribe(new TakeObserver<T>(observer, limit));}static final class TakeObserver<T> implements Observer<T>, Disposable {final Observer<? super T> actual;boolean done;Disposable subscription;long remaining;TakeObserver(Observer<? super T> actual, long limit) {this.actual = actual;this.remaining = limit;}@Overridepublic void onSubscribe(Disposable s) {if (DisposableHelper.validate(this.subscription, s)) {subscription = s;if (remaining == 0) {done = true;s.dispose();EmptyDisposable.complete(actual);} else {actual.onSubscribe(this);}}}@Overridepublic void onNext(T t) {// 处理take的参数,不为0的时候就onNext方法中就自减1,否则才调用onNext传数据,if (!done && remaining-- > 0) {boolean stop = remaining == 0;actual.onNext(t);if (stop) {onComplete();}}}@Overridepublic void onError(Throwable t) {if (done) {RxJavaPlugins.onError(t);return;}done = true;subscription.dispose();actual.onError(t);}@Overridepublic void onComplete() {if (!done) {done = true;subscription.dispose();actual.onComplete();}}@Overridepublic void dispose() {subscription.dispose();}@Overridepublic boolean isDisposed() {return subscription.isDisposed();}}
}

可以看到ObservableTake处理了take操作符参数的逻辑,在onNext中判断参数是否是0,不为0则不断的自减,直到0才调用onNext方法发送数据。

//ObservableFilter filter操作符实现
public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> {final Predicate<? super T> predicate;public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {super(source);this.predicate = predicate;}@Overridepublic void subscribeActual(Observer<? super T> s) {source.subscribe(new FilterObserver<T>(s, predicate));}static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {final Predicate<? super T> filter;FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {super(actual);this.filter = filter;}@Overridepublic void onNext(T t) {if (sourceMode == NONE) {boolean b;try {//调用filter操作符的方法获取一个boolean的值                    b = filter.test(t);} catch (Throwable e) {fail(e);return;}if (b) {// 满足条件才调用 onNext方法actual.onNext(t);}} else {actual.onNext(null);}}@Overridepublic int requestFusion(int mode) {return transitiveBoundaryFusion(mode);}@Nullable@Overridepublic T poll() throws Exception {for (;;) {T v = qs.poll();if (v == null || filter.test(v)) {return v;}}}}
}

同样的filter操作符的实现中,通过获取filter的值,进行了判断,满足条件的才调用onNext发送数据。

如果我们对一个数据调用了多个操作符处理:

// 连续调用 just 创建,然后调用take和map,最后使用subscribe订阅
Observable.just(1,5,6,7,8,10,).filter { it%2 == 0 }.map {println("map == : $it")it * 2}.subscribe {println("subscribe -result== : $it")}

从上面的分析实际上可以知道:每个操作符都返回了一个Observable对象:

由上一个操作符返回传给下一个操作符继续调用下一个操作符的实现逻辑。每个满足的数据都是一个个的进行处理,最终交给订阅者接收。

所以,本质上操作符就是我们对原始创建的Observable对象做的一次转换,每个操作符的实现都是Observable抽象类的一个子类,中间通过对操作符逻辑实现转换,调用观察者对象的onNext方法发送数据,满足操作符的条件就发送,否则做对应的处理(过滤、转换、取值等)。

 参考文献

RxJava(10-操作符原理&自定义操作符)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/707358.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Python】Code2flow学习笔记

1 Code2flow介绍 Code2flow是一个代码可视化工具库&#xff0c;旨在帮助开发人员更好地理解和分析代码&#xff1a; 可以将Python代码转换为流程图&#xff0c;以直观的方式展示代码的执行流程和逻辑结构。具有简单易用、高度可定制化和美观的特点&#xff0c;适用于各种代码…

人工智能与网络安全

目录 概述 人工智能在网络安全中的应用 威胁检测 自动化响应

Groovy(第九节) Groovy 之单元测试

JUnit 利用 Java 对 Song 类进行单元测试 默认情况下 Groovy 编译的类属性是私有的,所以不能直接在 Java 中访问它们,必须像下面这样使用 setter: 编写这个测试用例余下的代码就是小菜一碟了。测试用例很好地演示了这样一点:用 Groovy 所做的一切都可以轻易地在 Java 程序…

MySQL中的 left join 使用场景介绍及注意事项

left join 1. 使用场景 在MySQL中多表联查有多种方式&#xff0c;使用left join 一般是想保留某张表的数据完整&#xff0c;也就是说在查询结果中指定表行数不会随查询条件改变。 2. 语法 select &#xff08;所需要的列&#xff09; from 表1 left join 表2 on 表1.关联…

算法--动态规划(线性DP、区间DP)

这里写目录标题 tip数组下标从0开始还是从1开始 线性DP数学三角形介绍算法思想例题代码 最长上升子序列介绍算法思想例题代码 最长公共子序列介绍算法思想例题代码 编辑距离介绍例题代码 区间DP问题石子合并介绍算法思想例题代码 tip 数组下标从0开始还是从1开始 如果代码中涉…

Opencv实战(3)详解霍夫变换

霍夫变换 Opencv实战系列指路前文&#xff1a; Opencv(1)读取与图像操作 Opencv(2)绘图与图像操作 文章目录 霍夫变换1.霍夫线变换1.1 原理1.2 HoughLines() 2.霍夫圆变换2.1 原理2.2 HoughCircles() 最基本的霍夫变换是从黑白图像中检测直线(线段) 霍夫变换(Hough Transform…

【vue】什么是虚拟Dom,怎么实现虚拟DOM,虚拟DOM一定更快吗

什么是虚拟Dom 虚拟 DOM 基于虚拟节点 VNode&#xff0c;VNode 本质上是一个对象&#xff0c;VDOM 就是VNode 组成的 废话&#xff0c;js 中所有的东西都是对象 虚拟DOM 为什么快&#xff0c;做了哪些优化 批量更新 多个DOM合并更新减少浏览器的重排和重绘局部更新 通过新VDO…

【PHP设计模式08】装饰模式

【装饰模式】 装饰模式,又称装饰器模式 或 装饰者模式 或 油漆工模式,通过创建一个“装饰对象”,在不改变原有类和使用继承的情况下,动态地扩展一个对象的功能,比直接生成子类继承更加灵活,可以通过多个不同的具体装饰类,创建多个不同的行为组合。 结构: 抽象构件…

Spring中的ApplicationContext.publishEvent

简单理解 其实就是监听处理。比如找工作平台上&#xff0c;雇主 employer 发布自己的雇佣条件&#xff0c;目的是平台中有符合条件的求职者时&#xff0c;及时向雇主推荐。求职者发布简历&#xff0c;当平台发现某个求职者比较符合条件&#xff0c;就触发被动&#xff0c;推荐…

selenium元素等待及滚动条滚动

selenium三大等待&#xff0c;sleep&#xff08;强制&#xff09;、implicitlyWait&#xff08;隐式等待&#xff09;、WebDriverWait&#xff08;显式等待&#xff09;&#xff0c;主要记一下最后面的WebDriverWait。 WebDriverWait是三大等待中最常用也是最好用的一种等待方…

docker 容器修改端口和目录映射

一、容器修改端口映射 一般在运行容器时&#xff0c;我们都会通过参数 -p&#xff08;使用大写的-P参数则会随机选择宿主机的一个端口进行映射&#xff09;来指定宿主机和容器端口的映射&#xff0c;例如 docker run -it -d --name [container-name] -p 8088:80 [image-name]…

vue3的echarts从后端获取数据,用于绘制图表

场景需求&#xff1a;后端采用flask通过pymysql从数据库获取数据&#xff0c;并返回给前端。前端vue3利用axios获取数据并运用到echarts绘制图表。 第一步&#xff0c;vue中引入echarts 首先vue下载echarts npm install echarts 然后在main.js文件写如下代码 import {create…

东芝工控机维修东芝电脑PC机维修FA3100A

TOSHIBA东芝工控机维修电脑控制器PC机FA3100A MODEL8000 UF8A11M 日本东芝TOSHIBA IA controller维修SYU7209A 001 FXMC12/FXMC11;BV86R-T2GKR-DR7YF-8CPPY-4T3QD; CPU处理单元是可编程逻辑控制器的控制部分。它按照可编程逻辑控制器系统程序赋予的功能接收并存储从编程器键入…

C++知识点总结(22):模拟算法真题 ★★★☆☆《安全警报》

安全警报 1. 审题 题目描述 Z市最大的金融公司&#xff1a;太平洋金融遭到了入侵&#xff0c;一名黑客潜入到了公司中&#xff0c;公司紧急启动安保程序&#xff0c;将大楼封锁&#xff0c;并安排作为安全主管的你对楼层进行搜查。所以你准备写一个程序&#xff0c;输入搜查楼…

基于 LVGL 使用 SquareLine Studio 快速设计 UI 界面

目录 简介注册与软件获取工程配置设计 UI导出源码板级验证更多内容 简介 SquareLine Studio 是一款专业的 UI 设计软件&#xff0c;它与 LVGL&#xff08;Light and Versatile Graphics Library&#xff0c;轻量级通用图形库&#xff09;紧密集成。LVGL 是一个轻量化的、开源的…

K8S之Deployment的介绍和使用

Deployment的理论和实操 Deployment控制器&#xff1a;概念、原理解读概述工作原理 编写Deployment资源清单文件使用案例&#xff1a;创建一个web站点Deployment管理pod&#xff1a;扩容、缩容通过deployment管理应用&#xff0c;实现扩容&#xff0c;把副本数变成3通过deploym…

135 Linux 系统编程12,linux命令重定向,dup 和dup2,fcntl实现dup和dup2 ,进程和程序概念,虚拟内存和物理内存映射关系,pcb进程块详解

一 linux 命令中重定向&#xff0c;使用>实现 通过 大于号 将前面的内容写入到某一个地方 cat main.c > b.txt //cat main 本身的意思是 显示main.c的值&#xff0c;后面加上 > b.txt 会将显示在屏幕上的字符全部写到b.txt中 例子&#xff1a;将 ls -l 的内容 通…

JavaScript最新实现城市级联操作,json格式的数据

前置知识&#xff1a; <button onclick"doSelect()">操作下拉列表</button><hr>学历&#xff1a;<select id"degree"><option value"0">--请选择学历--</option><option value"1">专科<…

配置前端项目到 github-pages

Quickstart for GitHub Pages - GitHub Docs

【Day59】代码随想录之动态规划_647回文子串_516最长回文子序列

文章目录 动态规划理论基础动规五部曲&#xff1a;出现结果不正确&#xff1a; 1. 647回文子串2. 516最长回文子序列 动态规划理论基础 动规五部曲&#xff1a; 确定dp数组 下标及dp[i] 的含义。递推公式&#xff1a;比如斐波那契数列 dp[i] dp[i-1] dp[i-2]。初始化dp数组…