RxJava学习记录

文章目录

  • 1. 总览
    • 1.1 基本原理
    • 1.2 导入包和依赖
  • 2. 操作符
    • 2.1 创建操作符
    • 2.2 转换操作符
    • 2.3 组合操作符
    • 2.4 功能操作符

1. 总览

1.1 基本原理

在这里插入图片描述
参考文献
在这里插入图片描述
构建流:每一步操作都会生成一个新的Observable节点(没错,包括ObserveOn和SubscribeOn线程变换操作),并将新生成的Observable返回,直到最后一步执行subscribe方法。编写Rxjava代码的过程其实就是构建一个一个Observable节点的过程
订阅流:从最后一个N5节点的订阅行为开始,依次执行前面各个节点真正的订阅方法。在每个节点的订阅方法中,都会生成一个新的Observer**,这个Observer会包含“下游”的Observer,这样当每个节点都执行完订阅(subscribeActual)后,也就生成了一串Observer,它们通过downstream,upstream引用连接
回调流: 当订阅流执行到最后,也就是第一个节点N0时,用onNext方法,两个作用,一个是把上个节点返回的数据进行一次map变换,另一个就是将map后的结果传递给下游。
小结:先从上到下把各个变换的Observable连成链(拼装流水线),然后在最后subscribe的时候,又从下到上通过每个Observable的OnSubscribe从最下的Subscriber对象开始连成链(流水线开始工作包装Subscriber),直到顶端,当顶端的Subscriber对象调用了onNext方法的时候,又从上往下调用Subscriber链的onNext(用户一层层拆开包装盒),里面执行了每个操作的变换逻辑。

1.2 导入包和依赖

implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

2. 操作符

添加链接描述

2.1 创建操作符

  • Create
    private void test1() {//被观察者Observable;观察者Observer/消费者consumer;通过subsribe订阅Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> emitter) throws Exception {emitter.onNext("1");
//                emitter.onError(new Throwable("异常模拟"));emitter.onComplete();}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {System.out.println("subscribe");}@Overridepublic void onNext(Object o) {System.out.println("onNext Observer " + o);}@Overridepublic void onError(Throwable e) {System.out.println("erro");}@Overridepublic void onComplete() {System.out.println("Complete Observer....");}});}private void test2() {Disposable d = Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> emitter) throws Exception {emitter.onNext("2");emitter.onError(new Throwable("模拟异常"));emitter.onComplete();}}).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println("Accept " + o);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("Accept " + throwable);}});}

Observer:
适合需要完整事件处理的场景,包括处理数据、错误和完成信号。
提供了更灵活的事件处理能力,可以根据需求实现对错误和完成事件的响应。
Consumer:
适合简单的场景,只需处理每个发出的数据项,而不需要关心错误或完成事件。
简化了代码结构,特别是在处理简单流时,使用起来更为便捷和直观。

  • 其他
    just 10个发射源
    from 将一个Iterable、一个Future、 或者一个数组,内部通过代理的方式转换成一个Observable
    interval操作符 创建一个按固定时间间隔发射整数序列的Observable,这个序列为一个无限递增的整数序列
    range操作符 发射一个范围内的有序整数序列,并且我们可以指定范围的起始和长度
    repeat操作符 重复发射原始Observable的数据序列,这个序列或者是无限的,或者通过repeat(n)指定重复次数

2.2 转换操作符

map
将源Observable发送的数据转换为一个新的Observable对象

    private void test3(){Observable.just("111").map(new Function<String, Object>() {@Overridepublic Object apply(String s) throws Exception {return "my name is " + s;}}).subscribe(ob);}//subscribe
//onNext Observer my name is 111
//Complete Observer....

flatmap
添加链接描述
在这里插入图片描述
将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里(但是是无序的)

    private void test4(){Disposable ob = Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);}}).flatMap(new Function<Integer, ObservableSource<String>>() {@Overridepublic ObservableSource<String> apply(Integer o) throws Exception {final List<String> list = new ArrayList<>();for (int i = 0; i < 3; i++) {list.add("I am value " + o);}return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);//为了无序 加了延迟}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String o) throws Exception {System.out.println(o);}});}//出现的 1 2 3会随机出现

concatMap
concatMap操作符类似于flatMap操作符,不同的一点是它按次序连接。

2.3 组合操作符

concat
concatArray
合并多个对象,按照一定的顺序
在这里插入图片描述
merge
在这里插入图片描述

2.4 功能操作符

SubscribeOn 改变调用它之前代码的线程,只有第一次有效
ObserveOn 改变调用它之后代码的线程, 可以多次调用

        Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> emitter) throws Exception {Log.d(TAG,"加了subscribeOn和observeOn: " + Thread.currentThread().getName());emitter.onNext("1111");emitter.onNext("22222");emitter.onComplete();}}).subscribeOn(Schedulers.newThread()) //1 进行创建和发射在子线程.observeOn(AndroidSchedulers.mainThread())// 2 在主线程消费;由于程序是test里面执行,所以不是main线程;后续改成了main是一样的道理.subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG,"onSubscribe " + Thread.currentThread().getName());}@Overridepublic void onNext(Object o) {Log.d(TAG,"onNext " + Thread.currentThread().getName());}@Overridepublic void onError(Throwable e) {Log.d(TAG,"onError " + Thread.currentThread().getName());}@Overridepublic void onComplete() {Log.d(TAG,"onComplete " + Thread.currentThread().getName());}});}

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
这一个onSubsribe 一直是在测试线程里


1. **Observable 的创建和订阅**:- 在 `subscribe()` 方法中,你创建了一个 `Observer` 对象,并将其订阅到了 `Observable` 对象上。2. **onSubscribe 方法执行**:- 当 `subscribe()` 方法被调用后,`Observer` 对象的 `onSubscribe` 方法会立即执行。这是因为 `onSubscribe` 是 `Observer` 接口的一部分,它负责接收 `Disposable` 对象,表示订阅关系,而不是响应数据流本身。3. **异步操作执行**:- 然后,`Observable` 中的异步操作开始执行。在你的例子中,通过 `Observable.create()` 创建了一个新的数据流,该数据流会在新线程(通过 `subscribeOn(Schedulers.newThread())` 指定的线程)中执行。这意味着 `Observable.create()` 中的代码块会在新线程中运行,而不会阻塞主线程。4. **数据流发射和消费**:- 在新线程中,`ObservableEmitter` 会发射数据项(通过 `emitter.onNext()` 发送数据)并在合适的时机调用 `onComplete()` 或者 `onError()`,表示数据流的结束。5. **observeOn 切换到主线程**:- 通过 `observeOn(AndroidSchedulers.mainThread())`,确保在数据流中的消费者部分(即 `Observer` 的 `onNext()`, `onError()`, `onComplete()` 方法)在主线程中执行。这个切换保证了在主线程更新UI或处理数据,从而避免了在主线程中执行耗时操作而导致的UI阻塞问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/42172.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

asp.netWebForm(.netFramework) CSRF漏洞

asp.netWebForm(.netFramework) CSRF漏洞 CSRF&#xff08;Cross-Site Request Forgery&#xff09;跨站请求伪造是一种常见的 Web 应用程序安全漏 洞&#xff0c;攻击者通过诱使已认证用户在受信任的网站上执行恶意操作&#xff0c;从而利用用户的身份 执行未经授权的操作。攻…

echarts实现3D饼图

先看下最终效果 实现思路 使用echarts-gl的曲面图&#xff08;surface&#xff09;类型 通过parametric绘制曲面参数实现3D效果 代码实现 <template><div id"surfacePie"></div> </template> <script setup>import {onMounted} fro…

简单的找到自己需要的flutter ui 模板

简单的找到自己需要的flutter ui 模板 网站 https://flutterawesome.com/ 简介 我原本以为会很难用 实际上不错 很简单 打开后界面类似于,右上角可以搜索 点击view github 相当简单 很oks

RabbitMq,通过prefetchCount限制消费并发数

1.问题:项目瓶颈,通过rabbitMq来异步上传图片,由于并发上传的图片过多导致阿里OSS异常, 解决方法:通过prefetchCount限制图片上传OSS的并发数量 2.定义消费者 Component AllArgsConstructor Slf4j public class ReceiveFaceImageEvent {private final UPloadService uploadSe…

【见刊通知】MVIPIT 2023机器视觉、图像处理与影像技术国际会议

MVIPIT 2023&#xff1a;https://ieeexplore.ieee.org/xpl/conhome/10578343/proceeding 入库Ei数据库需等20-50天左右 第二届会议征稿启动&#xff08;MVIPIT 2024&#xff09; The 2nd International Conference on Machine Vision, Image Processing & Imaging Techn…

MacOS和Windows中怎么安装Redis

希望文章能给到你启发和灵感&#xff5e; 如果觉得文章对你有帮助的话&#xff0c;点赞 关注 收藏 支持一下博主吧&#xff5e; 阅读指南 开篇说明一、基础环境说明1.1 硬件环境1.2 软件环境 二、MacOS中Redis的安装2.1 HomeBrew 安装&#xff08;推荐&#xff09;2.2 通过官方…

70.WEB渗透测试-信息收集- WAF、框架组件识别(10)

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a; 易锦网校会员专享课 上一个内容&#xff1a;69.WEB渗透测试-信息收集- WAF、框架组件识别&#xff08;9&#xff09; 关于waf相应的识…

arcgis js 4.x实现类似openalayers加载tilewms图层效果

一、普通wms与tilewms区别 相同点&#xff1a;都是加载WMS服务。 不同点&#xff1a;TitleWMS会把当前可视窗口根据网格&#xff08;开发者可以在调用OpenLayers api的时候自定义&#xff09;切分&#xff0c;一片一片地返回回来&#xff0c;在前端进行整合。而ImageWMS则不会…

Springboot 配置 log4j 时的注意事项

感谢博主 https://www.cnblogs.com/fishlittle/p/17950944 依赖 SpringBoot 的 starter 自带的是 logback 日志&#xff0c;若要使用 log4j2 日志&#xff0c;需要引入对应依赖。logback 日志和 log4j2 日志都是对 slf4j 门面的实现&#xff0c;只能存在一个&#xff0c;且必…

江协科技51单片机学习- p25 无源蜂鸣器

&#x1f680;write in front&#x1f680; &#x1f50e;大家好&#xff0c;我是黄桃罐头&#xff0c;希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流 &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐️ 留言&#x1f4dd;​…

环信IM实现小米、oppo推送详细步骤

本文教大家集成环信IM后如何实现小米、oppo推送。 一、小米推送 步骤一、在小米开放平台创建应用。 在 小米开放平台 创建应用&#xff0c;开启推送服务。详见小米官方网站的 推送服务接入指南。 步骤二、上传推送证书。 注册完成后&#xff0c;需要在环信即时通讯云控制台…

WebSocket 双向通信

WebSocket 是一种在前端开发中用于实现双向通信的网络技术。它与传统的 HTTP 请求-响应模式不同&#xff0c;允许客户端和服务器之间实时、双向的数据传输。 1. 实时性 能够实现数据的即时推送和接收&#xff0c;无需轮询服务器&#xff0c;大大降低了延迟。 2. 双向通信 客…

LeetCode-刷题记录-前缀和合集(本篇blog会持续更新哦~)

一、前缀和&#xff08;Prefix Sum&#xff09;算法概述 前缀和算法通过预先计算数组的累加和&#xff0c;可以在常数时间内回答多个区间和相关的查询问题&#xff0c;是解决子数组和问题中的重要工具。 它的基本思想是通过预先计算和存储数组的前缀和&#xff0c;可以在 O(1)…

初步理解六__《面向互联网大数据的威胁情报 并行挖掘技术研究 》

初步理解 六 STIX 提出了一种标准化的网络威胁情报格式(Structured Threat Information eXpression, STIX) gtp STIX&#xff08;Structured Threat Information eXpression&#xff09;是一种用于标准化描述和共享网络威胁情报的格式和语言。它的设计目标是提供一个通用的…

7.8作业

一、思维导图 二、 1】按值修改 2】按值查找&#xff0c;返回当前节点的地址 &#xff08;先不考虑重复&#xff0c;如果有重复&#xff0c;返回第一个&#xff09; 3】反转 4】销毁链表 //按值修改 int value_change(linklistptr H,datatype e,int value) {if(HNULL||empty(H…

Greenplum(二)【SQL】

前言 Greenplum 的剩余部分主要其实主要就是 DDL 和之前学的 MySQL 不大一样&#xff0c;毕竟 Greenplum 是基于 PostgreSQL 数据库的&#xff0c;不过那些 DML 和 MySQL、Hive 基本上大差不差&#xff0c;所以就没有必要浪费时间了。 1、DDL 1.1、库操作 1.1.1、创建数据库…

python爬虫加入进度条

安装tqdm和requests库 pip install tqdm -i https://pypi.tuna.tsinghua.edu.cn/simplepip install requests -i https://pypi.tuna.tsinghua.edu.cn/simple带进度条下载 import time # 引入time模块&#xff0c;用于处理时间相关的功能 from tqdm import * # 从tqdm包中…

算法力扣刷题 三十六【二叉树迭代遍历】

前言 记录三十五 介绍了二叉树基础&#xff0c;和递归法模版及遍历方式&#xff1b; 递归&#xff1a;代码简单&#xff0c;但要想清楚三步&#xff1a; 确定参数和返回值&#xff1b;确定终止条件&#xff0c;并return什么&#xff1f;&#xff1b;终止条件外的逻辑&#xf…

【AI大模型】赋能儿童安全:楼层与室内定位实践与未来发展

文章目录 引言第一章&#xff1a;AI与室内定位技术1.1 AI技术概述1.2 室内定位技术概述1.3 楼层定位的挑战与解决方案 第二章&#xff1a;儿童定位与安全监控的需求2.1 儿童安全问题的现状2.2 智能穿戴设备的兴起 第三章&#xff1a;技术实现细节3.1 硬件设计与选择传感器选择与…

SpringSecurity中文文档(Servlet Authorization Architecture )

Authorization 在确定了用户将如何进行身份验证之后&#xff0c;还需要配置应用程序的授权规则。 Spring Security 中的高级授权功能是其受欢迎的最有说服力的原因之一。无论您选择如何进行身份验证(无论是使用 Spring Security 提供的机制和提供者&#xff0c;还是与容器或其…