【Rxjava详解】(七)线程调度原理

rxjava最终章

// 创建一个被观察者,在后台线程执行网络请求Observable<String> observable = Observable.just("Network Response").subscribeOn(Schedulers.io()).doOnNext(result -> {// 模拟网络请求的耗时操作try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Network request executed on: " + Thread.currentThread().getName());});// 创建一个观察者,在新线程更新UIObserver<String> observer = new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {System.out.println("Observer subscribed on: " + Thread.currentThread().getName());}@Overridepublic void onNext(String result) {System.out.println("Observer received result: " + result + " on: " + Thread.currentThread().getName());}@Overridepublic void onError(Throwable e) {System.out.println("Observer received error: " + e.getMessage());}@Overridepublic void onComplete() {System.out.println("Observer completed on: " + Thread.currentThread().getName());}};// 订阅观察者observable.observeOn(Schedulers.newThread()).subscribe(observer);

输出结果:

Observer subscribed on: main
Network request executed on: RxCachedThreadScheduler-1
Observer received result: Network Response on: RxNewThreadScheduler-1
Observer completed on: RxNewThreadScheduler-1

subscribeOn源码分析

直接看源码吧

public final Observable<T> subscribeOn(Scheduler scheduler) {ObjectHelper.requireNonNull(scheduler, "scheduler is null");// 创建一个新的 Observable 对象,该对象会在指定的 Scheduler 上执行订阅操作return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}// ObservableSubscribeOn 类实现了 ObservableOperator 接口,用于将订阅操作切换到指定的 Scheduler 上执行
static final class ObservableSubscribeOn<T> extends Observable<T> implements ObservableConverter<T, Observable<T>> {final ObservableSource<T> source;final Scheduler scheduler;ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {this.source = source;this.scheduler = scheduler;}@Overridepublic Observable<T> apply(Observable<T> upstream) {return new ObservableSubscribeOn<>(upstream, scheduler);}@Overrideprotected void subscribeActual(Observer<? super T> observer) {// 创建一个 SubscribeOnObserver 对象,用于在指定的 Scheduler 上执行订阅操作SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);// 将 SubscribeOnObserver 对象传递给指定的 Scheduler,以便在 Scheduler 上执行订阅操作scheduler.scheduleDirect(new SubscribeTask(parent));}// SubscribeTask 实现了 Runnable 接口,用于在指定的 Scheduler 上执行订阅任务final class SubscribeTask implements Runnable {private final SubscribeOnObserver<T> parent;SubscribeTask(SubscribeOnObserver<T> parent) {this.parent = parent;}@Overridepublic void run() {// 订阅操作的核心方法,调用 source.subscribe() 方法,执行实际的订阅操作source.subscribe(parent);}}// SubscribeOnObserver 实现了 Observer 接口,用于将订阅事件转发给实际的观察者static final class SubscribeOnObserver<T> implements Observer<T>, Disposable {final Observer<? super T> downstream;volatile boolean disposed;Disposable upstream;SubscribeOnObserver(Observer<? super T> downstream) {this.downstream = downstream;}@Overridepublic void onSubscribe(Disposable d) {if (DisposableHelper.validate(this.upstream, d)) {this.upstream = d;downstream.onSubscribe(this);}}@Overridepublic void onNext(T value) {downstream.onNext(value);}@Overridepublic void onError(Throwable e) {downstream.onError(e);}@Overridepublic void onComplete() {downstream.onComplete();}@Overridepublic void dispose() {disposed = true;upstream.dispose();}@Overridepublic boolean isDisposed() {return disposed;}}
}

subscribeOn 操作符返回一个新的 Observable 对象 ObservableSubscribeOn,该对象包装了原始的 Observable 对象和指定的 Scheduler

当调用 subscribe 方法时,会创建一个 SubscribeOnObserver 对象,并将其传递给指定的 SchedulerscheduleDirect 方法。SubscribeOnObserver 实现了 Observer 接口,用于将订阅事件转发给实际的观察者。

SubscribeTaskrun 方法中,调用 source.subscribe(parent) 方法执行实际的订阅操作。这样,订阅操作就会在指定的 Scheduler 上执行,从而实现了切换订阅操作的线程。

总结:subscribeOn 操作符通过创建一个新的 Observable 对象,并在指定的 Scheduler 上执行订阅操作,从而实现了切换订阅操作线程

observeOn源码分析

高能预警长文来袭!

public final Observable<T> observeOn(Scheduler scheduler) {ObjectHelper.requireNonNull(scheduler, "scheduler is null");// 创建一个新的 Observable 对象,该对象会在指定的 Scheduler 上执行观察操作return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler));
}// ObservableObserveOn 类实现了 ObservableOperator 接口,用于将观察操作切换到指定的 Scheduler 上执行
static final class ObservableObserveOn<T> extends Observable<T> implements ObservableConverter<T, Observable<T>> {final ObservableSource<T> source;final Scheduler scheduler;ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler) {this.source = source;this.scheduler = scheduler;}@Overridepublic Observable<T> apply(Observable<T> upstream) {return new ObservableObserveOn<>(upstream, scheduler);}@Overrideprotected void subscribeActual(Observer<? super T> observer) {// 创建一个 ObserveOnObserver 对象,用于在指定的 Scheduler 上执行观察操作Observer<? super T> target = scheduler.createWorker().schedule(new ObserveOnObserver<>(observer, scheduler), 0, TimeUnit.MILLISECONDS);// 调用 source.subscribe() 方法,将 ObserveOnObserver 对象传递给原始的 Observable 对象,执行实际的观察操作source.subscribe(target);}// ObserveOnObserver 实现了 Observer 接口,用于将观察事件转发给实际的观察者static final class ObserveOnObserver<T> implements Observer<T>, Disposable {final Observer<? super T> downstream;final Scheduler scheduler;Disposable upstream;// 用于存放观察事件的队列final SimpleQueue<T> queue;// 用于标识观察者是否已经完成volatile boolean done;// 用于标识观察者是否已经取消订阅volatile boolean disposed;Throwable error;ObserveOnObserver(Observer<? super T> downstream, Scheduler scheduler) {this.downstream = downstream;this.scheduler = scheduler;// 使用 SpscLinkedArrayQueue 创建一个无界队列this.queue = new SpscLinkedArrayQueue<>(bufferSize());}@Overridepublic void onSubscribe(Disposable d) {if (DisposableHelper.validate(this.upstream, d)) {this.upstream = d;downstream.onSubscribe(this);}}@Overridepublic void onNext(T value) {if (done) {return;}// 将观察事件放入队列queue.offer(value);// 调度队列中的观察事件进行消费schedule();}@Overridepublic void onError(Throwable e) {if (done) {RxJavaPlugins.onError(e);return;}error = e;done = true;// 调度队列中的观察事件进行消费schedule();}@Overridepublic void onComplete() {if (done) {return;}done = true;// 调度队列中的观察事件进行消费schedule();}@Overridepublic void dispose() {disposed = true;upstream.dispose();scheduler.dispose();}@Overridepublic boolean isDisposed() {return disposed;}// 调度队列中的观察事件进行消费void schedule() {// 如果已经在调度中,则直接返回if (getAndIncrement() != 0) {return;}// 循环消费队列中的观察事件for (;;) {if (disposed) {return;}// 从队列中取出观察事件T v;try {v = queue.poll();} catch (Throwable e) {Exceptions.throwIfFatal(e);disposed = true;upstream.dispose();onError(e);scheduler.dispose();return;}// 如果观察事件为空,表示队列已经消费完毕if (v == null) {// 检查是否已经完成观察if (done) {Throwable ex = error;if (ex != null) {downstream.onError(ex);} else {downstream.onComplete();}scheduler.dispose();}return;}// 将观察事件发送给实际的观察者downstream.onNext(v);// 如果队列中还有更多的观察事件,则继续消费}}}
}

subscribeActual 方法中,创建一个 ObserveOnObserver 对象,并使用指定的 SchedulercreateWorker 方法创建一个 Worker。然后,调用 schedule 方法将 ObserveOnObserver 对象传递给 Worker,以便在指定的 Scheduler 上执行观察操作。最后,调用 source.subscribe 方法,将 ObserveOnObserver 对象传递给原始的 Observable 对象,执行实际的观察操作。

ObserveOnObserver 中,使用 SpscLinkedArrayQueue 创建一个无界队列 queue,用于存放观察事件。当收到观察事件时,将其放入队列,并调用 schedule 方法进行消费。schedule 方法首先判断当前是否已经在调度中,如果是,则直接返回;否则,循环从队列中取出观察事件,并发送给实际的观察者。如果队列中已经没有观察事件,则检查是否已经完成观察,如果是,则发送 onErroronComplete 事件给实际的观察者

总结:observeOn 操作符通过创建一个新的 Observable 对象,并在指定的 Scheduler 上执行观察操作,同时,使用无界队列缓存观察事件,通过循环消费队列中的观察事件,实现异步观察

Schedulers类

Schedulers 类是用于提供不同类型的调度器(Scheduler)的工具类,前面的文章提到过他的几个相关方法和参数的使用。

public final class Schedulers {// 静态内部类,用于实现调度器的工厂方法static final class Factory {final AtomicReference<Scheduler> computationScheduler = new AtomicReference<>();final AtomicReference<Scheduler> ioScheduler = new AtomicReference<>();final AtomicReference<Scheduler> newThreadScheduler = new AtomicReference<>();final AtomicReference<Scheduler> singleScheduler = new AtomicReference<>();Scheduler createComputationScheduler() {// 创建计算调度器,如果已经存在则直接返回,否则创建新的计算调度器for (;;) {Scheduler current = computationScheduler.get();if (current != null) {return current;}// 创建新的计算调度器Scheduler newInstance = createComputationScheduler0();if (computationScheduler.compareAndSet(null, newInstance)) {return newInstance;}}}Scheduler createComputationScheduler0() {// 创建计算调度器return new ComputationScheduler();}Scheduler createIoScheduler() {// 创建 IO 调度器,如果已经存在则直接返回,否则创建新的 IO 调度器for (;;) {Scheduler current = ioScheduler.get();if (current != null) {return current;}// 创建新的 IO 调度器Scheduler newInstance = createIoScheduler0();if (ioScheduler.compareAndSet(null, newInstance)) {return newInstance;}}}Scheduler createIoScheduler0() {// 创建 IO 调度器return new IoScheduler();}Scheduler createNewThreadScheduler() {// 创建新线程调度器,如果已经存在则直接返回,否则创建新的新线程调度器for (;;) {Scheduler current = newThreadScheduler.get();if (current != null) {return current;}// 创建新的新线程调度器Scheduler newInstance = createNewThreadScheduler0();if (newThreadScheduler.compareAndSet(null, newInstance)) {return newInstance;}}}Scheduler createNewThreadScheduler0() {// 创建新线程调度器return new NewThreadScheduler();}Scheduler createSingleScheduler() {// 创建单线程调度器,如果已经存在则直接返回,否则创建新的单线程调度器for (;;) {Scheduler current = singleScheduler.get();if (current != null) {return current;}// 创建新的单线程调度器Scheduler newInstance = createSingleScheduler0();if (singleScheduler.compareAndSet(null, newInstance)) {return newInstance;}}}Scheduler createSingleScheduler0() {// 创建单线程调度器return new SingleScheduler();}}// 创建调度器工厂private static final Schedulers.Factory DEFAULT_SCHEDULER_FACTORY = new Schedulers.Factory();// 私有构造函数,防止实例化private Schedulers() {throw new IllegalStateException("No instances!");}// 获取计算调度器public static Scheduler computation() {return DEFAULT_SCHEDULER_FACTORY.createComputationScheduler();}// 获取 IO 调度器public static Scheduler io() {return DEFAULT_SCHEDULER_FACTORY.createIoScheduler();}// 获取新线程调度器public static Scheduler newThread() {return DEFAULT_SCHEDULER_FACTORY.createNewThreadScheduler();}// 获取单线程调度器public static Scheduler single() {return DEFAULT_SCHEDULER_FACTORY.createSingleScheduler();}// 获取当前线程调度器public static Scheduler trampoline() {return TrampolineScheduler.instance();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/169447.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【教学类-06-08】20231125(55格版)X-Y之间“减法-题”(以10-20之间为例)(必须X>Y,题目少)

图片展示 需求&#xff1a; 20以内减法&#xff0c;不需要再练习其中10以内部分&#xff0c;改为10-20以内的减法&#xff0c;X-Y大于10&#xff0c;小于20的所有减法题。 代码展示&#xff1a; “-”减法 X-Y 之间的所有减法-题&#xff08;如10-20之间的所有减法&#xff0…

TDA笔记:夏克林老师,南洋理工大学

TDA比传统的统计方法有优势&#xff1a;benchmark中展现了这种优势 laplacian矩阵 多种单纯复形构造方式&#xff0c;可以构造出不同表征 二部图&#xff1a;Dowker complex Tor algebra可以用到多大数据 目前较新

Python基础教程之循环结构详解,循环结构逻辑解析。

文章目录 前言一、While循环二、While…else…循环三、for循环四、for…else…循环五、循环体结束语句六、嵌套循环关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②Python实战案例③…

NX二次开发UF_CURVE_ask_curve_turn_angle 函数介绍

文章作者&#xff1a;里海 来源网站&#xff1a;https://blog.csdn.net/WangPaiFeiXingYuan UF_CURVE_ask_curve_turn_angle Defined in: uf_curve.h int UF_CURVE_ask_curve_turn_angle(tag_t curve, double orientation [ 3 ] , double * angle ) overview 概述 Returns …

[架构之路-250]:目标系统 - 设计方法 - 软件工程 - 需求工程 - 需求开发:如何用图形表达需求,面向对象需求分析OOA与UML视图

目录 一、面向对象需求分析 1.1 面向对象的基本概念 1.2 什么是面向对象的需求分析 2.3 什么是UML图 2.4 UML视图 2.4 UML图与UML视图的关系 2.5 UML图与面向对象需求分析的关系 二、需求分析相关的UML图形与视图&#xff1a;14视图 2.1 用例模型与用例图&#xff1a;…

Unity优化——脚本优化策略1

Hello&#xff0c;大家好&#xff0c;这里是七七&#xff0c;今天来给大家介绍的是Unity脚本中的一些优化策略。 目录 一、最快方法获取组件 二、移除空的回调定义 三、缓存组件引用 四、共享计算输出 五、Update、Coroutines和InvokeRepeating 一、最快方法获取组件 Ge…

面试题:工作中做过 JVM 调优吗?怎么做的?

文章目录 前言cpu占用过高死锁内存泄漏上面只是其中一种处理方法 总结 前言 最近很多小伙伴跟我说&#xff0c;自己学了不少JVM的调优知识&#xff0c;但是在实际工作中却不知道何时对JVM进行调优。今天&#xff0c;我就为大家介绍几种JVM调优的场景。 在阅读本文时&#xff…

github使用token认证

向github提交代码时报错&#xff1a;Support for password authentication was removed on August 13, 2021. Please use a personal access token instead。大概意思就是&#xff0c;原先的密码凭证从2021年8月13日开始就不能用了&#xff0c;后续必须使用个人访问令牌&#x…

死磕Nacos系列:Nacos在我的SpringCloud项目中做了什么?

Nacos服务注册 我们一个SpringCloud项目中集成了Nacos&#xff0c;当项目启动成功后&#xff0c;就可以在Nacos管理界面上看到我们项目的注册信息&#xff0c;还可以看到项目的健康状态等等信息&#xff1a; 那Nacos是什么时候进行了哪些操作的呢&#xff1f;今天我们来一探究…

【Web安全】sql注入绕过技法

sql注入绕过技法 1. 注释符号绕过 原理&#xff1a;SQL注释符号&#xff08;如--, /* */&#xff09;可以用来忽略查询的一部分&#xff0c;特别是在注入点之后的部分。这对于绕过需要闭合的查询或移除查询余下部分的情况特别有用。 -- 注释内容 # 注释内容 /*注释内容*/ ;2…

redis运维(二十一)redis 的扩展应用 lua(三)

一 redis 的扩展应用 lua redis加载lua脚本文件 ① 调试lua脚本 redis-cli 通过管道 --pipe 快速导入数据到redis中 ② 预加载方式 1、错误方式 2、正确方式 "案例讲解" ③ 一次性加载 执行命令&#xff1a; redis-cli -a 密码 --eval Lua脚本路径 key …

希尔伯特变换-matlab仿真

希尔伯特变换&#xff08;hilbert transform&#xff09;简介 在信号处理中我们常见的有傅里叶变换&#xff0c;用来分析频域信息&#xff0c;还有拉普拉斯变换和z变换&#xff0c;用于系统分析系统响应。短时傅里叶分析和小波分析用于时频分析。希尔伯特变换似乎听到的比较少…

jQuery_04 jQuery选择器应用

jQuery中的选择器 1.基本选择器 1.1 id $("#id值") id名称 1.2 class $(".class值") class名称 1.3 标签选择器 $("标签名字") 标签名称 1.4 所有选择器 $("*") 所有标签 1.5 组合选择器 …

Selenium技巧大揭秘:动态数据、分页和Cookie的获取利器

背景&#xff1a; ​ 昨天我们讲了讲关于seleium的一些基础操作&#xff0c;今天讲讲如何将seleium和爬虫结合起来&#xff0c;可以使用selenium获取网页的动态加载数据&#xff0c;可以使用selenium获得cookie&#xff0c;这两个是比较常用的。我将一一展开。 实战案例&…

基于浣熊算法优化概率神经网络PNN的分类预测 - 附代码

基于浣熊算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于浣熊算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于浣熊优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针对PNN神经网络的光滑…

网络运维与网络安全 学习笔记2023.11.24

网络运维与网络安全 学习笔记 第二十五天 今日目标 DHCP中继代理、三层交换机DHCP、子网划分的原理、子网划分的应用 项目需求分析、技术方案选型、网络拓扑绘制 基础交换网络设计、内网优化、连接外网服务器 DHCP中继代理 DHCP中继概述 场景&#xff1a; DHCP客户端与DH…

Java LCR 089 打家劫舍

题目链接&#xff1a;打家劫舍 定义一个数组 dp&#xff0c;其中 dp[i] 表示从第 0 间房子到第 i 间房子&#xff08;包括第 i 间&#xff09;能够偷窃到的最高金额。 对于第 i 间房子有两种选择&#xff0c;偷或不偷&#xff1a; 偷就不能偷第 i - 1 间房子&#xff1a; dp[i]…

中职网安-Linux操作系统渗透测-Server2130(环境加qq)

B-9:Linux操作系统渗透测 任务环境说明:  服务器场景:Server2130  服务器场景操作系统:Linux(关闭链接) 1.通过本地PC中渗透测试平台Kali对靶机场景进行系统服务及版本扫描渗透测试,并将该操作显示结果中Apache服务对应的版本信息字符串作为Flag值提交; 2.…

中间件渗透测试-Server2131(解析+环境)

B-10&#xff1a;中间件渗透测试 需要环境的加qq 任务环境说明&#xff1a; 服务器场景&#xff1a;Server2131&#xff08;关闭链接&#xff09; 服务器场景操作系统&#xff1a;Linux Flag值格式&#xff1a;Flag&#xff5b;Xxxx123&#xff5d;&#xff0c;括…

【Netty专题】Netty调优及网络编程中一些问题补充(面向面试学习)

目录 前言阅读对象阅读导航笔记正文一、如何选择序列化框架1.1 基本介绍1.2 在网络编程中如何选择序列化框架1.3 常用Java序列化框架比较 二、Netty调优2.1 CONNECT_TIMEOUT_MILLIS&#xff1a;客户端连接时间2.2 SO_BACKLOG&#xff1a;最大同时连接数2.3 TCP_NODELAY&#xf…