rxjava最终章
// 创建一个被观察者,在后台线程执行网络请求Observable<String> observable = Observable.just("Network Response").subscribeOn(Schedulers.io()).doOnNext(result -> {// 模拟网络请求的耗时操作try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Network request executed on: " + Thread.currentThread().getName());});// 创建一个观察者,在新线程更新UIObserver<String> observer = new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {System.out.println("Observer subscribed on: " + Thread.currentThread().getName());}@Overridepublic void onNext(String result) {System.out.println("Observer received result: " + result + " on: " + Thread.currentThread().getName());}@Overridepublic void onError(Throwable e) {System.out.println("Observer received error: " + e.getMessage());}@Overridepublic void onComplete() {System.out.println("Observer completed on: " + Thread.currentThread().getName());}};// 订阅观察者observable.observeOn(Schedulers.newThread()).subscribe(observer);
输出结果:
Observer subscribed on: main
Network request executed on: RxCachedThreadScheduler-1
Observer received result: Network Response on: RxNewThreadScheduler-1
Observer completed on: RxNewThreadScheduler-1
subscribeOn源码分析
直接看源码吧
public final Observable<T> subscribeOn(Scheduler scheduler) {ObjectHelper.requireNonNull(scheduler, "scheduler is null");// 创建一个新的 Observable 对象,该对象会在指定的 Scheduler 上执行订阅操作return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}// ObservableSubscribeOn 类实现了 ObservableOperator 接口,用于将订阅操作切换到指定的 Scheduler 上执行
static final class ObservableSubscribeOn<T> extends Observable<T> implements ObservableConverter<T, Observable<T>> {final ObservableSource<T> source;final Scheduler scheduler;ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {this.source = source;this.scheduler = scheduler;}@Overridepublic Observable<T> apply(Observable<T> upstream) {return new ObservableSubscribeOn<>(upstream, scheduler);}@Overrideprotected void subscribeActual(Observer<? super T> observer) {// 创建一个 SubscribeOnObserver 对象,用于在指定的 Scheduler 上执行订阅操作SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);// 将 SubscribeOnObserver 对象传递给指定的 Scheduler,以便在 Scheduler 上执行订阅操作scheduler.scheduleDirect(new SubscribeTask(parent));}// SubscribeTask 实现了 Runnable 接口,用于在指定的 Scheduler 上执行订阅任务final class SubscribeTask implements Runnable {private final SubscribeOnObserver<T> parent;SubscribeTask(SubscribeOnObserver<T> parent) {this.parent = parent;}@Overridepublic void run() {// 订阅操作的核心方法,调用 source.subscribe() 方法,执行实际的订阅操作source.subscribe(parent);}}// SubscribeOnObserver 实现了 Observer 接口,用于将订阅事件转发给实际的观察者static final class SubscribeOnObserver<T> implements Observer<T>, Disposable {final Observer<? super T> downstream;volatile boolean disposed;Disposable upstream;SubscribeOnObserver(Observer<? super T> downstream) {this.downstream = downstream;}@Overridepublic void onSubscribe(Disposable d) {if (DisposableHelper.validate(this.upstream, d)) {this.upstream = d;downstream.onSubscribe(this);}}@Overridepublic void onNext(T value) {downstream.onNext(value);}@Overridepublic void onError(Throwable e) {downstream.onError(e);}@Overridepublic void onComplete() {downstream.onComplete();}@Overridepublic void dispose() {disposed = true;upstream.dispose();}@Overridepublic boolean isDisposed() {return disposed;}}
}
subscribeOn
操作符返回一个新的Observable
对象ObservableSubscribeOn
,该对象包装了原始的Observable
对象和指定的Scheduler
。
当调用 subscribe
方法时,会创建一个 SubscribeOnObserver
对象,并将其传递给指定的 Scheduler
的 scheduleDirect
方法。SubscribeOnObserver
实现了 Observer
接口,用于将订阅事件转发给实际的观察者。
在 SubscribeTask
的 run
方法中,调用 source.subscribe(parent)
方法执行实际的订阅操作。这样,订阅操作就会在指定的 Scheduler
上执行,从而实现了切换订阅操作的线程。
总结:subscribeOn
操作符通过创建一个新的 Observable
对象,并在指定的 Scheduler
上执行订阅操作,从而实现了切换订阅操作线程
observeOn源码分析
高能预警长文来袭!
public final Observable<T> observeOn(Scheduler scheduler) {ObjectHelper.requireNonNull(scheduler, "scheduler is null");// 创建一个新的 Observable 对象,该对象会在指定的 Scheduler 上执行观察操作return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler));
}// ObservableObserveOn 类实现了 ObservableOperator 接口,用于将观察操作切换到指定的 Scheduler 上执行
static final class ObservableObserveOn<T> extends Observable<T> implements ObservableConverter<T, Observable<T>> {final ObservableSource<T> source;final Scheduler scheduler;ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler) {this.source = source;this.scheduler = scheduler;}@Overridepublic Observable<T> apply(Observable<T> upstream) {return new ObservableObserveOn<>(upstream, scheduler);}@Overrideprotected void subscribeActual(Observer<? super T> observer) {// 创建一个 ObserveOnObserver 对象,用于在指定的 Scheduler 上执行观察操作Observer<? super T> target = scheduler.createWorker().schedule(new ObserveOnObserver<>(observer, scheduler), 0, TimeUnit.MILLISECONDS);// 调用 source.subscribe() 方法,将 ObserveOnObserver 对象传递给原始的 Observable 对象,执行实际的观察操作source.subscribe(target);}// ObserveOnObserver 实现了 Observer 接口,用于将观察事件转发给实际的观察者static final class ObserveOnObserver<T> implements Observer<T>, Disposable {final Observer<? super T> downstream;final Scheduler scheduler;Disposable upstream;// 用于存放观察事件的队列final SimpleQueue<T> queue;// 用于标识观察者是否已经完成volatile boolean done;// 用于标识观察者是否已经取消订阅volatile boolean disposed;Throwable error;ObserveOnObserver(Observer<? super T> downstream, Scheduler scheduler) {this.downstream = downstream;this.scheduler = scheduler;// 使用 SpscLinkedArrayQueue 创建一个无界队列this.queue = new SpscLinkedArrayQueue<>(bufferSize());}@Overridepublic void onSubscribe(Disposable d) {if (DisposableHelper.validate(this.upstream, d)) {this.upstream = d;downstream.onSubscribe(this);}}@Overridepublic void onNext(T value) {if (done) {return;}// 将观察事件放入队列queue.offer(value);// 调度队列中的观察事件进行消费schedule();}@Overridepublic void onError(Throwable e) {if (done) {RxJavaPlugins.onError(e);return;}error = e;done = true;// 调度队列中的观察事件进行消费schedule();}@Overridepublic void onComplete() {if (done) {return;}done = true;// 调度队列中的观察事件进行消费schedule();}@Overridepublic void dispose() {disposed = true;upstream.dispose();scheduler.dispose();}@Overridepublic boolean isDisposed() {return disposed;}// 调度队列中的观察事件进行消费void schedule() {// 如果已经在调度中,则直接返回if (getAndIncrement() != 0) {return;}// 循环消费队列中的观察事件for (;;) {if (disposed) {return;}// 从队列中取出观察事件T v;try {v = queue.poll();} catch (Throwable e) {Exceptions.throwIfFatal(e);disposed = true;upstream.dispose();onError(e);scheduler.dispose();return;}// 如果观察事件为空,表示队列已经消费完毕if (v == null) {// 检查是否已经完成观察if (done) {Throwable ex = error;if (ex != null) {downstream.onError(ex);} else {downstream.onComplete();}scheduler.dispose();}return;}// 将观察事件发送给实际的观察者downstream.onNext(v);// 如果队列中还有更多的观察事件,则继续消费}}}
}
在
subscribeActual
方法中,创建一个ObserveOnObserver
对象,并使用指定的Scheduler
的createWorker
方法创建一个Worker
。然后,调用schedule
方法将ObserveOnObserver
对象传递给Worker
,以便在指定的Scheduler
上执行观察操作。最后,调用source.subscribe
方法,将ObserveOnObserver
对象传递给原始的Observable
对象,执行实际的观察操作。
在 ObserveOnObserver
中,使用 SpscLinkedArrayQueue
创建一个无界队列 queue
,用于存放观察事件。当收到观察事件时,将其放入队列,并调用 schedule
方法进行消费。schedule
方法首先判断当前是否已经在调度中,如果是,则直接返回;否则,循环从队列中取出观察事件,并发送给实际的观察者。如果队列中已经没有观察事件,则检查是否已经完成观察,如果是,则发送 onError
或 onComplete
事件给实际的观察者
总结:observeOn
操作符通过创建一个新的 Observable
对象,并在指定的 Scheduler
上执行观察操作,同时,使用无界队列缓存观察事件,通过循环消费队列中的观察事件,实现异步观察
Schedulers类
Schedulers
类是用于提供不同类型的调度器(Scheduler
)的工具类,前面的文章提到过他的几个相关方法和参数的使用。
public final class Schedulers {// 静态内部类,用于实现调度器的工厂方法static final class Factory {final AtomicReference<Scheduler> computationScheduler = new AtomicReference<>();final AtomicReference<Scheduler> ioScheduler = new AtomicReference<>();final AtomicReference<Scheduler> newThreadScheduler = new AtomicReference<>();final AtomicReference<Scheduler> singleScheduler = new AtomicReference<>();Scheduler createComputationScheduler() {// 创建计算调度器,如果已经存在则直接返回,否则创建新的计算调度器for (;;) {Scheduler current = computationScheduler.get();if (current != null) {return current;}// 创建新的计算调度器Scheduler newInstance = createComputationScheduler0();if (computationScheduler.compareAndSet(null, newInstance)) {return newInstance;}}}Scheduler createComputationScheduler0() {// 创建计算调度器return new ComputationScheduler();}Scheduler createIoScheduler() {// 创建 IO 调度器,如果已经存在则直接返回,否则创建新的 IO 调度器for (;;) {Scheduler current = ioScheduler.get();if (current != null) {return current;}// 创建新的 IO 调度器Scheduler newInstance = createIoScheduler0();if (ioScheduler.compareAndSet(null, newInstance)) {return newInstance;}}}Scheduler createIoScheduler0() {// 创建 IO 调度器return new IoScheduler();}Scheduler createNewThreadScheduler() {// 创建新线程调度器,如果已经存在则直接返回,否则创建新的新线程调度器for (;;) {Scheduler current = newThreadScheduler.get();if (current != null) {return current;}// 创建新的新线程调度器Scheduler newInstance = createNewThreadScheduler0();if (newThreadScheduler.compareAndSet(null, newInstance)) {return newInstance;}}}Scheduler createNewThreadScheduler0() {// 创建新线程调度器return new NewThreadScheduler();}Scheduler createSingleScheduler() {// 创建单线程调度器,如果已经存在则直接返回,否则创建新的单线程调度器for (;;) {Scheduler current = singleScheduler.get();if (current != null) {return current;}// 创建新的单线程调度器Scheduler newInstance = createSingleScheduler0();if (singleScheduler.compareAndSet(null, newInstance)) {return newInstance;}}}Scheduler createSingleScheduler0() {// 创建单线程调度器return new SingleScheduler();}}// 创建调度器工厂private static final Schedulers.Factory DEFAULT_SCHEDULER_FACTORY = new Schedulers.Factory();// 私有构造函数,防止实例化private Schedulers() {throw new IllegalStateException("No instances!");}// 获取计算调度器public static Scheduler computation() {return DEFAULT_SCHEDULER_FACTORY.createComputationScheduler();}// 获取 IO 调度器public static Scheduler io() {return DEFAULT_SCHEDULER_FACTORY.createIoScheduler();}// 获取新线程调度器public static Scheduler newThread() {return DEFAULT_SCHEDULER_FACTORY.createNewThreadScheduler();}// 获取单线程调度器public static Scheduler single() {return DEFAULT_SCHEDULER_FACTORY.createSingleScheduler();}// 获取当前线程调度器public static Scheduler trampoline() {return TrampolineScheduler.instance();}
}