生产调度java程序原码_Rxjava的线程调度源码解析

代码调用

Observable.just(1)

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

}

});

直接进入主题,先看subscribe中调用了哪些方法

//Observable.java

public final Disposable subscribe(Consumer super T> onNext) {

return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());

}

public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError,

Action onComplete, Consumer super Disposable> onSubscribe) {

ObjectHelper.requireNonNull(onNext, "onNext is null");

ObjectHelper.requireNonNull(onError, "onError is null");

ObjectHelper.requireNonNull(onComplete, "onComplete is null");

ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

LambdaObserver ls = new LambdaObserver(onNext, onError, onComplete, onSubscribe);

subscribe(ls);

return ls;

}

public final void subscribe(Observer super T> observer) {

ObjectHelper.requireNonNull(observer, "observer is null");

try {

observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

subscribeActual(observer);

} catch (NullPointerException e) { // NOPMD

throw e;

} catch (Throwable e) {

Exceptions.throwIfFatal(e);

// can't call onError because no way to know if a Disposable has been set or not

// can't call onSubscribe because the call might have set a Subscription already

RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");

npe.initCause(e);

throw npe;

}

}

//最终调用了Observable的subscribeActual方法

protected abstract void subscribeActual(Observer super T> observer);

接下来我们看下subscribeOn方法中进行了什么操作

//Observable.java

public final Observable subscribeOn(Scheduler scheduler) {

ObjectHelper.requireNonNull(scheduler, "scheduler is null");

//这里返回了一个ObservableSubscribeOn对象,参考RxJavaPlugins.onAssembly方法,

//返回的值就是传入的值,再根据流式调用,

//即上面分析调用的subscribeActual方法,即是ObservableSubscribeOn的subscribeActual方法

return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));

}

接下来我们看ObservableSubscribeOn的subscribeActual方法

//ObservableSubscribeOn.java

public void subscribeActual(final Observer super T> observer) {

final SubscribeOnObserver parent = new SubscribeOnObserver(observer);

observer.onSubscribe(parent);

//这里生成了一个SubscribeTask,查看源码可知实现了Runnable接口

//这里调用了scheduler.scheduleDirect

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

}

看下scheduler.scheduleDirect,再次之前,我们先看下传入的Scheduler.io

查看传入的Schedule

public static Scheduler io() {

// 这里查看下IO

return RxJavaPlugins.onIoScheduler(IO);

}

//new IOTask

IO = RxJavaPlugins.initIoScheduler(new IOTask());

static final class IOTask implements Callable {

@Override

public Scheduler call() throws Exception {

return IoHolder.DEFAULT;

}

//由此可见,最后Schedulers.io就是IoScheduler

static final class IoHolder {

static final Scheduler DEFAULT = new IoScheduler();

}

//scheduler

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {

//这里生成一个Worker,但是createWorker是一个虚方法,有上可知

//这里调用了IoScheduler.createWorker,生成EventLoopWorker对象

final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

DisposeTask task = new DisposeTask(decoratedRun, w);

//调用了EventLoopWorker.schedule

w.schedule(task, delay, unit);

return task;

}

接下来看EventLoopWorker

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {

//取消注册

if (tasks.isDisposed()) {

// don't schedule, we are unsubscribed

return EmptyDisposable.INSTANCE;

}

//NewThreadWorker.scheduleActual

return threadWorker.scheduleActual(action, delayTime, unit, tasks);

}

真正进入线程调度的代码,在NewThreadWorker中

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {

Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {

if (!parent.add(sr)) {

return sr;

}

}

Future> f;

try {

if (delayTime <= 0) {

//executor是一个线程池

f = executor.submit((Callable)sr);

} else {

//存在延迟的

f = executor.schedule((Callable)sr, delayTime, unit);

}

sr.setFuture(f);

} catch (RejectedExecutionException ex) {

if (parent != null) {

parent.remove(sr);

}

RxJavaPlugins.onError(ex);

}

return sr;

}

所以到最后,真正进行线程调度的,其实是一个线程池,看完了subscribeOn,我们再来看看observeOn,首先看下AndroidSchedulers.mainThread()到底是哪个线程

public static Scheduler mainThread() {

return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);

}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(

new Callable() {

@Override public Scheduler call() throws Exception {

return MainHolder.DEFAULT;

}

});

private static final class MainHolder {

static final Scheduler DEFAULT

//从Looper.getMainLooper()可以看出,这里是获取了主线程的Looper

= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);

}

好确定了这个问题,我们再继续往下看

public final Observable observeOn(Scheduler scheduler) {

return observeOn(scheduler, false, bufferSize());

}

public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {

ObjectHelper.requireNonNull(scheduler, "scheduler is null");

ObjectHelper.verifyPositive(bufferSize, "bufferSize");

//生成一个新的ObservableObserverOn对象

return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));

}

接下去看ObservableObserveOn对象

protected void subscribeActual(Observer super T> observer) {

if (scheduler instanceof TrampolineScheduler) {

source.subscribe(observer);

} else {

//跟之前一样还是调用createWorker,从上面代码可知调用了HandlerScheduler.createWorker返回HandlerWorker

Scheduler.Worker w = scheduler.createWorker();

//这里有一个内部类对象ObserveOnObserver

source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));

}

}

//内部类ObserveOnObserver,以下是回调方法

public void onSubscribe(Disposable d) {

if (DisposableHelper.validate(this.upstream, d)) {

this.upstream = d;

if (d instanceof QueueDisposable) {

@SuppressWarnings("unchecked")

QueueDisposable qd = (QueueDisposable) d;

int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

if (m == QueueDisposable.SYNC) {

sourceMode = m;

queue = qd;

done = true;

downstream.onSubscribe(this);

//调用schedule

schedule();

return;

}

if (m == QueueDisposable.ASYNC) {

sourceMode = m;

queue = qd;

downstream.onSubscribe(this);

return;

}

}

queue = new SpscLinkedArrayQueue(bufferSize);

downstream.onSubscribe(this);

}

}

@Override

public void onNext(T t) {

if (done) {

return;

}

if (sourceMode != QueueDisposable.ASYNC) {

queue.offer(t);

}

//调用schedule

schedule();

}

@Override

public void onError(Throwable t) {

if (done) {

RxJavaPlugins.onError(t);

return;

}

error = t;

done = true;

//调用schedule

schedule();

}

@Override

public void onComplete() {

if (done) {

return;

}

done = true;

//调用schedule

schedule();

}

void schedule() {

if (getAndIncrement() == 0) {

//所以当回调的时候,最终是调用了worker.schedule

worker.schedule(this);

}

}

//最终看一下HandlerWorker的schedule方法,一看源码,老朋友了,Handler就不解释了

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {

if (run == null) throw new NullPointerException("run == null");

if (unit == null) throw new NullPointerException("unit == null");

if (disposed) {

return Disposables.disposed();

}

run = RxJavaPlugins.onSchedule(run);

ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

Message message = Message.obtain(handler, scheduled);

message.obj = this; // Used as token for batch disposal of this worker's runnables.

if (async) {

message.setAsynchronous(true);

}

handler.sendMessageDelayed(message, unit.toMillis(delay));

// Re-check disposed state for removing in case we were racing a call to dispose().

if (disposed) {

handler.removeCallbacks(scheduled);

return Disposables.disposed();

}

return scheduled;

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/563467.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux 触摸屏测试源码_Linux触摸屏驱动

问题二&#xff1a;echo "ac_cv_func_malloc_0_nonnullyes" >arm-linux.cache//避免检查ac_cv_func_malloc_0_nonnull若出现提示: undefined reference to rpl_malloc解决&#xff1a;发现config.h.in和config.h里定义了#undef malloc#undef realloc把这两个用//注…

java有没有number数据类型_Java基本数据类型之Number

数据类型byte&#xff1a;byte数据类型是8位、有符号的&#xff0c;以二进制补码表示的整数&#xff1b;最小值是-128(-2^7)&#xff1b;最大值是127(2^7-1)&#xff1b;byte类型用在大型数组中节约空间&#xff0c;主要代替整数&#xff0c;因为byte变量占用的空间只有int类型…

java中main缺少主体_缺少方法主体,或声明了摘要

我收到此错误消息&#xff1a;线程“主”中的异常java.lang.RuntimeException&#xff1a;无法编译的源代码-错误的符号类型&#xff1a;PetTest.main(PetTest.java:18)上的Pet.saySomething Java结果&#xff1a;1这是我所拥有的&#xff1a;对于Speak课堂&#xff0c;public …

java获取b站动态列表地址_爬虫入门(三)爬取b站搜索页视频分析(动态页面,DBUtils存储)...

这一次终于到了分析b站视频了。开始体会到写博客非常占用学技术的时间&#xff0c;但是还是希望能总结&#xff0c;沉淀下来。工具&#xff1a;使用Webmaigc框架&#xff0c;DBUtils&#xff0c;C3P0连接池。分析过程&#xff1a;b站的搜索页面是这样的。如果浏览器右键查看源代…

python a和b字符串和占位符输出_Python占位符的使用与format函数字符串格式化详解...

Python字符串格式化01字符串的格式化分类字符串的格式化方法共两种&#xff1a;占位符(%)与format方式。占位符方式在Python2比较常见&#xff0c;随着Python3到来&#xff0c;format方式变得广泛起来&#xff0c;format函数常与print()函数结合使用&#xff0c;具备很强的格式…

python list tuple 消耗_Python内存消耗:dict VS元组列表

在这种情况下&#xff0c;你实际上得到了一个不完整的内存使用图片。字典的总大小以不规则的间隔增加一倍以上&#xff0c;如果在字典大小增加后比较这两个结构的大小&#xff0c;它会再次变大。一个带有递归大小函数的简单脚本(见下面的代码)显示了一个非常清晰的模式&#xf…

python 项目构建工具_GitHub - shjlone/emake: 你见过的最简单的 GCC/CLANG 项目构建工具(python3版本)...

python3实现版本PrefaceGNU Make 太麻烦&#xff1f;Makefile 写起来太臃肿&#xff1f;头文件依赖生成搞不定&#xff1f;多核同时编译太麻烦&#xff1f;Emake 帮你解决这些问题&#xff1a;使用简单&#xff1a;设定源文件&#xff0c;设定编译参数和输出目标就行了&#xf…

18135usm_佳能PZ-E1+EF-S 18-135mm f/3.5-5.6 IS USM镜头 小型工作室的利器

EF-S 18-135mm f/3.5-5.6 IS USM 在大神眼里据对是属于狗头系列的 哈哈哈 但是这货如果搭配佳能的 PZ-E1 在配合佳能80D 那绝对是小型视频工作室的首选 &#xff01;&#xff01;&#xff01;mxcpTB2rqUOg80kpuFjSsppXXcGTXXa_!!104284319.jpg (156.5 KB, 下载次数: 1)2017-3-…

开启php缩略图,PHP生成缩略图

//参数1 文件名 参数2 缩放比例function _thumb($_filename,$_percent){ob_clean();//生成png标头文件header(Content-type:image/png);$_nexplode(., $_filename);//获取文件的信息,宽和高list($_width,$_height)getimagesize($_filename);//生成缩略后的大小$_new_wid…

php项目中sql,php – 大括号{}在SQL查询中做了什么?

有关双引号字符串语法,请参见http://www.php.net/manual/de/language.types.string.php#language.types.string.parsing.花括号用于复杂的变量表达式.它们由PHP解释,而不是由SQL接口解释.$query "SELECT * FROM users WHERE user$_POST[username] AND password$_POST[pas…

php获取本机ip外网地址,php获取本机ip(远程IP地址)

例子&#xff0c;php获取用户IP地址。复制代码 代码示例:// 111111111111echo $_SERVER[REMOTE_ADDR];// 2222222222222function get_local_ip() {$preg "/\A((([0-9]?[0-9])|(1[0-9]{2})|(2[0-4][0-9])|(25[0-5]))\.){3}(([0-9]?[0-9])|(1[0-9]{2})|(2[0-4][0-9])|(25…

php打png图片水印颜色失真,ThinkPHP水印功能实现修复PNG透明水印并增加JPEG图片质量可调整...

/**———————————————————-* 为图片添加水印———————————————————-* static public———————————————————-* param string $source 原文件名* param string $water 水印图片* param string $$savename 添加水印后的图片名…

java服务器要二次编译,ecology项目二次开发环境搭建

ecology项目二次开发环境搭建Submitted By Weaver文档版本控制文档简要信息&#xff1a;文档主题(Title)ecology项目二次开发环境搭建作者(Author)审批者 (To Be Approved By)说明 (Comments)ecology项目二次开发环境搭建文件名称 (File Name)文档版本历史&#xff1a;序号日期…

matlab基础试题,MATLAB基础试题题目及答案,课程2020最新期末考试题库,章节测验答案...

【判断题】手指第一关节应该始终保持支撑。【单选题】如果陈先生购买了一套 200 万的住房(家庭首套住房)&#xff0c;房屋面积为 105 平方米&#xff0c;容积率为 3.0&#xff0c; 房价低于当地平均价格&#xff0c;则他要缴纳的契税为( )万元。在做高抬指练习时注意下键速度要…

乘法口诀表编程php视频,PHP学习之制作乘法口诀表

进入学习php语言状态。昨日看了一天视频。才能够把这个小口诀表实现。好难啊。里面有个口诀表的函数、echo "";for ($a1;$a<9;$a){ //循环输出a a默认值是1 a的值不会大于9 且不断加一for ($b1;$b<$a;$b){//{b默认值是1 且b值不能大于a值 也就是九 加一运算/*一…

列车matlab模型,【国家级精品课程】-中南大学-数学建模-lingo-matlab-优化建模-数模培训-全国赛论文-京沪线列车调度模型(B题)...

【国家级精品课程】-中南大学-数学建模-lingo-matlab-优化建模-数模培训-全国赛论文-京沪线列车调度模型(B题) 答卷编号&#xff1a; 答卷编号&#xff1a; 论文题目&#xff1a; 京沪线列车调度模型(B题) 参赛队员&#xff1a; 1. 唐欢 电话&#xff1a;13100251389 2. 任礼秋…

如何学习matlab 知乎,知乎日报

利用记忆软件 Anki 进行复习&#xff1a;先看一下效果&#xff1a;通过编程(Matlab 程序 main.m)把一集美剧中的每一句话的中英文和语音都分离出来&#xff0c;输入 Anki 进行复习。制作每一集的学习包的大概流程如下&#xff1a;下面是详细步骤&#xff1a;* 流程只适用于装了…

php javascript对象,JavaScript 对象

JavaScript 对象JavaScript 对象是拥有属性和方法的数据。真实生活中的对象&#xff0c;属性和方法真实生活中&#xff0c;一辆汽车是一个对象。对象有它的属性&#xff0c;如重量和颜色等&#xff0c;方法有启动停止等:对象属性方法car.name Fiatcar.model 500car.weight 8…

oracle12c 删除pdb用户,oracle 12c pdb测试:创建、开关、删除

pdb测试&#xff1a;创建、开关、删除-----------------------ORACLE12C中提出来CDB和PDB的概念他们可以分别理解为容器和插件(PDB插入在CDB中)CDB的管理和传统数据库区别不大--确认当前cdbSQL> select name,cdb from v$database;NAME CDB--------- ---ZARADB YES--…

oracle 备份批处理,windows下oracle自动备份批处理

上一篇日志中&#xff0c;我将windows下的备份流程分成了五步走。本日志就结合上篇的理论来个实例。该实例是来自于我现网的中的一个备份示例(不要想着弄我密码啊&#xff0c;内容改过的&#xff0c;呵呵)。费话少说&#xff0c;直接上脚本&#xff1a;eche off//下面一部分是设…