本文以下面代码为例逐步解析
Observable.just("数据源").map(new Function<String, Integer>() {@Overridepublic Integer apply(String s) throws Exception {return 1;}}).filter(integer -> {return integer == 1;}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Object o) {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}
从just开始
public static <T> Observable<T> just(T item) {ObjectHelper.requireNonNull(item, "item is null");return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));}
返回了一个将传入的参数封装成了一个 ObservableJust对象
其他的Rxjava创建操作符类似:比如create(), just(),fromArray(),fromIterable(),timer(),interval()等
ObservableJust类
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {private final T value;public ObservableJust(final T value) { //将传入的参数赋值给valuethis.value = value;}//重点方法 稍后看@Overrideprotected void subscribeActual(Observer<? super T> observer) {ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);observer.onSubscribe(sd);sd.run();}@Overridepublic T call() {return value;}
}
map方法
由于just方法返回了一个ObservableJust对象,所以调用链的map方法调用的ObservableJust对象的map方法
但是我们看到ObservableJust类中并没有map方法,所以去看他的父类Observable
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {ObjectHelper.requireNonNull(mapper, "mapper is null");return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));}
在他的父类Observable中看到,map()依然是返回了一个ObservableMap对象,这个对象将当前对象(也就是上一步的ObservableJust对象)和map()传入的参数一起封装了起来 从上面的调用链来看就是这一段代码:
ObservableMap类
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {final Function<? super T, ? extends U> function;public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {super(source); //这里的source也就是上一步的ObservableJust对象this.function = function; //这里的function就是map就是map()传入的参数}//这个方法一样待会分析@Overridepublic void subscribeActual(Observer<? super U> t) {source.subscribe(new MapObserver<T, U>(t, function));}@Overridepublic void onNext(T t) {if (done) {return;}if (sourceMode != NONE) {downstream.onNext(null);return;}U v;try {v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {fail(ex);return;}downstream.onNext(v);}}这时候发现ObservableMap和上面的ObservableJust类一样,都实现了subscribeActual()
filter方法
接着继续分析调用链上的方法filter,一样我们去ObservableMap父类里去找这个方法,他的父类AbstractObservableWithUpstream里面没有这个方法,但是AbstractObservableWithUpstream跟ObservableJust一样继承自Observable
public final Observable<T> filter(Predicate<? super T> predicate) {ObjectHelper.requireNonNull(predicate, "predicate is null");return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));}看到没 filter和前两个方法还是一样的套路,返回了一个ObservableFilter对象,不出意外这个ObservableFilter里面肯定也有一个subscribeActual方法,并且也是直接或者间接继承自Observable
public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> {final Predicate<? super T> predicate;public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {super(source); this.predicate = predicate;}@Overridepublic void subscribeActual(Observer<? super T> observer) {source.subscribe(new FilterObserver<T>(observer, predicate));}static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {final Predicate<? super T> filter;FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {super(actual);this.filter = filter;}@Overridepublic void onNext(T t) {if (sourceMode == NONE) {boolean b;try {b = filter.test(t);} catch (Throwable e) {fail(e);return;}if (b) {downstream.onNext(t);}} else {downstream.onNext(null);}}
}
一模一样的套路
subscribeOn
public final Observable<T> subscribeOn(Scheduler scheduler) {ObjectHelper.requireNonNull(scheduler, "scheduler is null");return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));}public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {final Scheduler scheduler;public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {super(source); this.scheduler = scheduler;}@Overridepublic void subscribeActual(final Observer<? super T> observer) {final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);observer.onSubscribe(parent);parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));}static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {private static final long serialVersionUID = 8094547886072529208L;final Observer<? super T> downstream;final AtomicReference<Disposable> upstream;SubscribeOnObserver(Observer<? super T> downstream) {this.downstream = downstream;this.upstream = new AtomicReference<Disposable>();}@Overridepublic void onSubscribe(Disposable d) {DisposableHelper.setOnce(this.upstream, d);}@Overridepublic void onNext(T t) {downstream.onNext(t);}@Overridepublic void onError(Throwable t) {downstream.onError(t);}@Overridepublic void onComplete() {downstream.onComplete();}@Overridepublic void dispose() {DisposableHelper.dispose(upstream);DisposableHelper.dispose(this);}@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());}void setDisposable(Disposable d) {DisposableHelper.setOnce(this, d);}}final class SubscribeTask implements Runnable {private final SubscribeOnObserver<T> parent;SubscribeTask(SubscribeOnObserver<T> parent) {this.parent = parent;}@Overridepublic void run() {source.subscribe(parent);}}
}
这个subscribeOn用于切换上游线程:
主要是这一句parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
scheduler就是我们传入的Schedulers.io(),上面代码可以看到SubscribeTask是一个Runnable,run()里调用的sourcesource.subscribe(parent),还记得source吗,source就是调用链上一步返回的对象,也就是上一步的
ObservableFilter;
去看看Schedulers.io()返回的是个什么类
public static Scheduler io() {return RxJavaPlugins.onIoScheduler(IO);}看到他返回的是一个Scheduler,去Scheduler中找scheduleDirect
@NonNullpublic Disposable scheduleDirect(@NonNull Runnable run) {return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);}public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {final Worker w = createWorker();final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);DisposeTask task = new DisposeTask(decoratedRun, w);w.schedule(task, delay, unit);return task;}继续往下追踪会发现最终将这个Runable经过各种封装,最后提交到一个线程池(ScheduledExecutorService)中去执行任务,这样就实现了SubscribeOn上游数据源代码的线程切换
至于下游代码线程切换来看ObserveOn
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {ObjectHelper.requireNonNull(scheduler, "scheduler is null");ObjectHelper.verifyPositive(bufferSize, "bufferSize");return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));}public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {final Scheduler scheduler;final boolean delayError;final int bufferSize;public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {super(source);this.scheduler = scheduler;this.delayError = delayError;this.bufferSize = bufferSize;}@Overrideprotected void subscribeActual(Observer<? super T> observer) {if (scheduler instanceof TrampolineScheduler) {source.subscribe(observer);} else {Scheduler.Worker w = scheduler.createWorker();source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));}}
}
ObserveOnObserver类:
mplements Observer<T>, Runnable {private static final long serialVersionUID = 6576896619930983584L;final Observer<? super T> downstream;final Scheduler.Worker worker;final boolean delayError;final int bufferSize;SimpleQueue<T> queue;Disposable upstream;Throwable error;volatile boolean done;volatile boolean disposed;int sourceMode;boolean outputFused;ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {this.downstream = actual;this.worker = worker;this.delayError = delayError;this.bufferSize = bufferSize;}@Overridepublic void onSubscribe(Disposable d) {if (DisposableHelper.validate(this.upstream, d)) {this.upstream = d;if (d instanceof QueueDisposable) {@SuppressWarnings("unchecked")QueueDisposable<T> qd = (QueueDisposable<T>) d;int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);if (m == QueueDisposable.SYNC) {sourceMode = m;queue = qd;done = true;downstream.onSubscribe(this);schedule();return;}if (m == QueueDisposable.ASYNC) {sourceMode = m;queue = qd;downstream.onSubscribe(this);return;}}queue = new SpscLinkedArrayQueue<T>(bufferSize);downstream.onSubscribe(this);}}@Overridepublic void onNext(T t) {if (done) {return;}if (sourceMode != QueueDisposable.ASYNC) {queue.offer(t);}schedule();}@Overridepublic void onError(Throwable t) {if (done) {RxJavaPlugins.onError(t);return;}error = t;done = true;schedule();}@Overridepublic void onComplete() {if (done) {return;}done = true;schedule();}@Overridepublic void dispose() {if (!disposed) {disposed = true;upstream.dispose();worker.dispose();if (!outputFused && getAndIncrement() == 0) {queue.clear();}}}@Overridepublic boolean isDisposed() {return disposed;}void schedule() {if (getAndIncrement() == 0) {worker.schedule(this);}}void drainNormal() {int missed = 1;final SimpleQueue<T> q = queue;final Observer<? super T> a = downstream;for (;;) {if (checkTerminated(done, q.isEmpty(), a)) {return;}for (;;) {boolean d = done;T v;try {v = q.poll();} catch (Throwable ex) {Exceptions.throwIfFatal(ex);disposed = true;upstream.dispose();q.clear();a.onError(ex);worker.dispose();return;}boolean empty = v == null;if (checkTerminated(d, empty, a)) {return;}if (empty) {break;}a.onNext(v);}missed = addAndGet(-missed);if (missed == 0) {break;}}}void drainFused() {int missed = 1;for (;;) {if (disposed) {return;}boolean d = done;Throwable ex = error;if (!delayError && d && ex != null) {disposed = true;downstream.onError(error);worker.dispose();return;}downstream.onNext(null);if (d) {disposed = true;ex = error;if (ex != null) {downstream.onError(ex);} else {downstream.onComplete();}worker.dispose();return;}missed = addAndGet(-missed);if (missed == 0) {break;}}}@Overridepublic void run() {if (outputFused) {drainFused();} else {drainNormal();}}boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {if (disposed) {queue.clear();return true;}if (d) {Throwable e = error;if (delayError) {if (empty) {disposed = true;if (e != null) {a.onError(e);} else {a.onComplete();}worker.dispose();return true;}} else {if (e != null) {disposed = true;queue.clear();a.onError(e);worker.dispose();return true;} elseif (empty) {disposed = true;a.onComplete();worker.dispose();return true;}}}return false;}@Overridepublic int requestFusion(int mode) {if ((mode & ASYNC) != 0) {outputFused = true;return ASYNC;}return NONE;}@Nullable@Overridepublic T poll() throws Exception {return queue.poll();}@Overridepublic void clear() {queue.clear();}@Overridepublic boolean isEmpty() {return queue.isEmpty();}}
不想看代码直接总结,从ObserveOnObserver类中发现他的onSubscribe,onNext,onError,OnNext方法都调用了schedule(),追踪schedule()发现,最终同样是把任务交给了线程池处理,在本例子中由于传递的是AndroidSchedulers.mainThread(),所以下游是切换到主线程执行,这里是用了Handler将任务提交给主线程
final class HandlerScheduler extends Scheduler {private final Handler handler;private final boolean async;HandlerScheduler(Handler handler, boolean async) {this.handler = handler;this.async = async;}@Override@SuppressLint("NewApi") // Async will only be true when the API is available to call.public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {if (run == null) throw new NullPointerException("run == null");if (unit == null) throw new NullPointerException("unit == null");run = RxJavaPlugins.onSchedule(run);ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);Message message = Message.obtain(handler, scheduled);if (async) {message.setAsynchronous(true);}handler.sendMessageDelayed(message, unit.toMillis(delay));return scheduled;}
}
终于到了最后一步suscribe
这里调用的是Observable的subscribe
public final void subscribe(Observer<? super T> observer) {ObjectHelper.requireNonNull(observer, "observer is null");try {observer = RxJavaPlugins.onSubscribe(this, observer);ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");subscribeActual(observer);//重点看这里} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// can't call onError because no way to know if a Disposable has been set or not// can't call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}}
看到subscribeActual()方法没,原来subscribe()里会调用subscribeActual;
现在往回追溯:
在subscribe方法中会调用当前对象的subscribeActual(),所以往回追溯他首先会去调ObservableObserveOn的subscribeActual(),参数就是最终传入的Observer
回忆一下ObservableObserveOn的subscribeActual()
@Overrideprotected void subscribeActual(Observer<? super T> observer) {if (scheduler instanceof TrampolineScheduler) {source.subscribe(observer);} else {Scheduler.Worker w = scheduler.createWorker();source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));}}
继续将Observer封装成ObserveOnObserver,然后调用source.subcribe(),source还记得吧,就是调用链上一步的返回的对象,也就是ObservableSubscribeOn,这个类没有实现subscribe,但是他的父类有这个方法,那不就是Observable的subcribe()吗?是的,也就是跟调用链最后一步调用的subcribe()是同一个方法,只不过他的参数是基于下游的参数的进一步封装,那么同样我他会调用到susscribeActual()
@Overridepublic void subscribeActual(final Observer<? super T> observer) { //这里的Observer就是将下游封装后的Observer//将oberser继续封装final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);observer.onSubscribe(parent);//经过刚才的分析 这里是将任务交给线程池处理,所以去看SubscribeTask的run()parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));}// SubscribeTask的run()@Overridepublic void run() {source.subscribe(parent); //同样继续调用source.subscribe,那么他也是同意是调用到调用链上一步返回对象的subscribeActual(),,也就是ObservableFilter对象对象}
不出意外ObservableFilter对象里也是将Observer继续封装,然后调用source.subscribe
@Overridepublic void subscribeActual(Observer<? super T> observer) {source.subscribe(new FilterObserver<T>(observer, predicate));}
现在来到了第一步ObservableJust的subscribeActual():
@Overrideprotected void subscribeActual(Observer<? super T> observer) {ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value); //将Observer和value进行封装,value就是我们第一步传入的数据源了observer.onSubscribe(sd);sd.run();}//ScalarDisposable的run方法@Overridepublic void run() {if (get() == START && compareAndSet(START, ON_NEXT)) {observer.onNext(value); //这里开始把数据源往下游传, value指数据源 observer就是下游一步一步封装的Observer啦if (get() == ON_NEXT) {lazySet(ON_COMPLETE);observer.onComplete();}}}
还记得回溯时封装的那些Observer吗?分别是MapObserver,FilterObserver,SubscribeOnObserver,ObserveOnObserver以及调用链上最后一步我们自己自定义的Observer
分别再看他们的onNext(),其他方法套路一致
MapObserver:
@Overridepublic void onNext(T t) {if (done) {return;}if (sourceMode != NONE) {downstream.onNext(null);return;}U v;try {//处理数据源,将数据源转换成想要的类型v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {fail(ex);return;}// 继续调用下游Observer的onNextdownstream.onNext(v);}
FilterObserver
@Overridepublic void onNext(T t) {if (sourceMode == NONE) {boolean b;try {// 数据判断b = filter.test(t);} catch (Throwable e) {fail(e);return;}//满足过滤条件继续调用下游onNextif (b) {downstream.onNext(t);}} else {downstream.onNext(null);}}
SubscribeOnObserver
由于subscribeOn只是起到切换上游线程的作用,所以对下游他不做任何操作,继续调用下游的onNext
@Overridepublic void onNext(T t) {downstream.onNext(t);}
ObserveOnObserver:
@Overridepublic void onNext(T t) {if (done) {return;}if (sourceMode != QueueDisposable.ASYNC) {queue.offer(t);}schedule(); //切换下游线程,将任务交给线程池或者主线程handler,然后调用下游onNext}
最终就调到我们自定义的onNext()啦,整个流程就结束了
总结一下
Rxjava的链式调用整个流程就是从下到上,由上而下
每一步的操作符都是将上游对象作为source封装成新的Observable,然后继续往下传递,直到最后的subsribe方法反向开始调用source.subscribe然后调用到每个soource对象的subscriActual(),每一步的subscribActual()又会将下游传递来的Observer一步步封装,直到传递到最上游,在最上游开始再一步步调用封装好的Observe的相关方法,这样就实现了将数据源传递到下游。
切换上游线程:
创建一个Task,继承自Runable,在Runable的run()里调用source.subscribe(),然后将这个Runable进一步封装,根据传递的参数创建对应的线程池或者主线程Handler,将Runable提交给线程池或者Handler去执行
切换下游线程:
封装的Observer的onSubscribe,onNext,onError,OnNext方法都调用了schedule(),追踪schedule()发现,最终同样是把任务交给了线程池处理,在本例子中由于传递的是AndroidSchedulers.mainThread(),所以下游是切换到主线程执行