简述JAVA线程调度的原理,Rxjava原理(二)--线程调度

1. 创建线程池和线程管理策略分析

// 在开发中使用Rxjava来完成线程切换会调用到以下方法(还有几个就不一一列举了,原理一样的),那么就从这里开始分析

Schedulers.io()

Schedulers.computation()

Schedulers.newThread()

AndroidSchedulers.mainThread()

当我们调用以上方法中的任意一个,都会调到Schedulers类中,Schedulers使用策略模式封装了所有线程切换策略(因此后面以io()分析)。

// 1. Schedulers类中,静态创建IOTask(),当调用Schedulers.io()的时候,就是返回这个Callable.

static {

SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

IO = RxJavaPlugins.initIoScheduler(new IOTask());

TRAMPOLINE = TrampolineScheduler.instance();

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());

}

// 2.创建IoScheduler

static final class IOTask implements Callable {

@Override

public Scheduler call() throws Exception {

return IoHolder.DEFAULT;

}

}

static final class IoHolder {

static final Scheduler DEFAULT = new IoScheduler();

}

// 3.创建线程池

public IoScheduler(ThreadFactory threadFactory) {

this.threadFactory = threadFactory;

this.pool = new AtomicReference(NONE);

start();

}

public void start() {

// CachedWorkerPool任务池,里面持有任务队列和线程池

CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);

if (!pool.compareAndSet(NONE, update)) {

update.shutdown();

}

}

//4. CachedWorkerPool构造方法中创建线程池,并且暴露get()提供需要执行的任务

static final class CachedWorkerPool implements Runnable {

private final long keepAliveTime;

private final ConcurrentLinkedQueue expiringWorkerQueue;

final CompositeDisposable allWorkers;

private final ScheduledExecutorService evictorService;

private final Future> evictorTask;

private final ThreadFactory threadFactory;

CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {

......

if (unit != null) {

// 创建线程池

evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);

task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);

}

......

}

ThreadWorker get() {

.....

while (!expiringWorkerQueue.isEmpty()) {

// 任务队列不为空,从队列中取一个并返回

ThreadWorker threadWorker = expiringWorkerQueue.poll();

if (threadWorker != null) {

return threadWorker;

}

}

// 如果任务队列是空的,就创建一个并返回

ThreadWorker w = new ThreadWorker(threadFactory);

allWorkers.add(w);

return w;

}

......

}

用一张图可能说明得比较清楚一些。

ce19d5012d66

Schedulers调度过程.png

2. Rxjava上游任务在子线程中执行分析

// 上游线程切换使用过程

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter e) throws Exception {

e.onNext("JackOu");

}

})

// ObservableCreate.subscribeOn

.subscribeOn(Schedulers.io())

// ObservableSubscribeOn.subscribe

.subscribe(new Observer() {

......

@Override

public void onNext(String s) {

}

......

});

从上面使用过程的代码看下面的图,分析Rxjava封装任务和抛任务到线程池的过程。

ce19d5012d66

上游任务在线程池执行流程图.png

当我们一订阅(调用subscribe(Observer)方法)的时候,Rxjava将会把上游需要执行的任务和下游的观察者经过层层包裹,包裹好之后,就会得到一个Scheduler.Worker任务对象。当调用发射器的onNext的方式的时候,结合第一小节的图片,ObservableSubscribeOn就会将任务抛到线程池执行,在子线程中执行任务并且返回,从而完成线程切换功能。

3. Rxjava下游任务在主线程中执行分析

3.1 创建AndroidSchedulers.mainThread的过程

如第一节Schedulers的创建流程一样,当调用AndroidSchedulers.mainThread()之后,最终会创建HandlerScheduler。

// 1.创建HandlerScheduler,并且传入MainLooper

public final class AndroidSchedulers {

private static final class MainHolder {

// 创建HandlerScheduler

static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(

new Callable() {

@Override public Scheduler call() throws Exception {

return MainHolder.DEFAULT;

}

});

public static Scheduler mainThread() {

return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);

}

}

// 2.当创建任务的时候,创建HandlerWorker

final class HandlerScheduler extends Scheduler {

private final Handler handler;

HandlerScheduler(Handler handler) {

this.handler = handler;

}

@Override

public Worker createWorker() {

return new HandlerWorker(handler);

}

}

// 3.当执行任务的时候

private static final class HandlerWorker extends Worker {

private final Handler handler;

HandlerWorker(Handler handler) {

this.handler = handler;

}

@Override

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {

......

// 包装任务

run = RxJavaPlugins.onSchedule(run);

ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

// 创建Message包装任务

Message message = Message.obtain(handler, scheduled);

message.obj = this;

// 发送任务到MainLooper中,该任务就在主线程中执行了

handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

......

return scheduled;

}

}

其实真正将任务放在主线程中执行就是上面三个步骤,但是Rxjava增加了很多其他功能,例如解除订阅(将任务包装在Disposable中),增加hook功能(在任务外面在包装了ScheduledRunnable)等等,其最内层的本质就是我们需要执行的任务。细化的包裹情况如下图:

ce19d5012d66

主线程执行任务.png

4.多个线程切换,以哪个为准

如下面代码,我们作死得切换线程,那么哪些线程会最终执行我们的任务呢

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter e) throws Exception {

e.onNext("JackOu");

}

})

.subscribeOn(Schedulers.io()) // 上游切换,靠近上游的生效

.subscribeOn(Schedulers.newThread())

.subscribeOn(Schedulers.computation())

.observeOn(Schedulers.io())

.observeOn(Schedulers.computation())

.observeOn(AndroidSchedulers.mainThread()) // 下游切换,靠近下游的生效

.subscribe(new Observer() {

......

@Override

public void onNext(String s) {

}

......

});

我们可以从第二节和第三节看出,当我们每调用一次subscribeOn方法,上游就会多包装一层Scheduler,在订阅之后,解包裹的时候越靠近“待执行任务”的subscribeOn越后解包,所以最靠近任务的subscribeOn调用会是最终被执行,也就是最终被执行的线程。

因此我们可以总结得到:

总结一: 在多次调用线程切换的时候,第一次调用subscribeOn的线程切换会是最后执行任务的线程;最后调用observeOn切换的线程会是最后执行的线程。

总结二:从调用关系来看,越靠近上游的线程切换,将是最终执行任务的线程;越靠近下游的线程切换,将是最终执行任务的线程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/394930.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

不同的模块中定义同样的宏为不同的值合法吗_如何创建自定义的建模规范

本文摘要:主要介绍如何创建自定义的建模规范检查,以及在建模规范检查中,如何增加自动修正模型使之符合规范。比如我们想创建一个自定义的规则,对于constant模块,1. 如果value是参数的话,则输出数据类型必须…

wsdl文件是怎么生成的_C++ 动态库.dll的生成---超级详细!!!

怎么将建好的工程生成.dll工程?1、在C中打开工程2、运行结果:输出Print修改开始:1、打开属性。2、修改以下内容:目标文件扩展名,由.exe--》.dll,直接删除修改即可配置类型,由.exe--》.dll,下拉菜单可选择最…

HTTP基本认证(Basic Authentication)的JAVA示例

大家在登录网站的时候,大部分时候是通过一个表单提交登录信息。但是有时候浏览器会弹出一个登录验证的对话框,如下图,这就是使用HTTP基本认证。下面来看看一看这个认证的工作过程:第一步: 客户端发送http request 给服务器,服务器验证该用户…

python能print中文吗_python怎么print汉字

今天就为大家分享一篇python中使用print输出中文的方法,具有很好的参考价值,希望对大家有所帮助。看Python简明教程,学习使用print打印字符串,试了下打印中文,不行。(推荐学习:Python视频教程&a…

oracle 11gogg,【OGG】Oracle GoldenGate 11g (二) GoldenGate 11g 单向同步配置 上

Oracle GoldenGate 11g (二)GoldenGate 11g 单向同步配置 上ItemSource SystemTarget SystemPlatformRHEL6.4 - 64bitRHEL6.4 - 64bitHostnamerhel64.oracle.comora11g.oracle.comDatabaseOracle 11.2.0.3Oracle 11.2.0.3Character SetAL32UTF8AL32UTF8ORACLE_SIDPRODEMREPList…

Centos7-卸载自带的jdk 安装jdk8

卸载JDK Centos7一般都会带有自己的openjdk,我们一般都回用oracle的jdk,所以要卸载 步骤一:查询系统是否以安装jdk #rpm -qa|grep java 或 #rpm -qa|grep jdk 或 #rpm -qa|grep gcj 步骤二:卸载已安装的jdk #rpm -e --nodeps java-1.8.0-openjdk…

iOS开发UIScrollView的底层实现

起始 做开发也有一段时间了,经历了第一次完成项目的激动,也经历了天天调用系统的API的枯燥,于是就有了探索底层实现的想法。 关于scrollView的思考 在iOS开发中我们会大量用到scrollView这个控件,我们使用的tableView/collectionv…

oracle查看登录时间黑屏,oracle 11g默认用户名、密码解锁 以及安装后重启黑屏问题.doc...

oracle 11g默认用户名、密码解锁 以及安装后重启黑屏问题.doc还剩3页未读,继续阅读下载文档到电脑,马上远离加班熬夜!亲,喜欢就下载吧,价低环保!内容要点:遇的同学,参考一下解决办法…

第六十二节,html分组元素

html分组元素 学习要点: 1.分组元素总汇 2.分组元素解析 本章主要探讨HTML5中分组元素的用法。所谓分组,就是用来组织相关内容的HTML5元素,清晰有效的进行归类。 一.分组元素总汇 为了页面的排版需要,HTML5提供了几种语…

WebSocket 实战--转

原文地址:http://www.ibm.com/developerworks/cn/java/j-lo-WebSocket/ WebSocket 前世今生 众所周知,Web 应用的交互过程通常是客户端通过浏览器发出一个请求,服务器端接收请求后进行处理并返回结果给客户端,客户端浏览器将信息呈…

mongodb 安装、启动

MongoDB 之 你得知道MongoDB是个什么鬼 MongoDB - 1 最近有太多的同学向我提起MongoDB,想要学习MongoDB,还不知道MongoDB到底是什么鬼,或者说,知道是数据库,知道是文件型数据库,但是不知道怎么来用 那么好,所谓千呼万唤始出来,现在我就拉给你们看: 一.初识MongoDB 之 什么东西都…

[转载]PSCAD调用MATLAB/SIMULINK之接口元件设计

原文地址:PSCAD调用MATLAB/SIMULINK之接口元件设计作者:luckyhappier1)接口元件 接口元件包括Graphics,Parameters和Script。注意:变量要与DSDYN要一致(PSCAD根据变量名区别变量)。 2)Circuit 定…

oracle数字类型ef映射,Entity Framework 学习中级篇5—使EF支持Oracle9i - ♂风车车.Net - 博客园...

从Code MSDN上下载下来的EFOracleProvider不支持Oracle9i.但是,目前我所使用的还是Oracle9i。为此,对EFOracleProvider修改了以下,以便使其支持Oracle9i.下面说说具体修改地方.(红色部分为添加或修改的代码部分)一,修改EFOracleProvider1,修改EFOraclePr…

Oracle 数据库之最:你见过最高的 SQL Version 是多少?

Oracle数据库中执行的SQL,很多时候会因为种种原因产生多个不同的执行版本,一个游标的版本过多很容易引起数据库的性能问题,甚至故障。 有时候一个SQL的版本数量可能多达数万个,以下是我之前在"云和恩墨大讲堂”分享过的一个案…

C 怎么读取Cpp文件_opencv从yaml文件中读取矩阵(c++)

PS:由于我是新手,因此记录的比较罗里吧嗦,本文也属于一个没有任何技术的编程积累。在SLAM系统中,经常需要从配置文件中读取参数文件,读取整型,浮点型都是比较常见的操作,在读取矩阵卡了一下,记录…

3.SFB标准版前端安装

SFB服务器准备部分:1.修改服务器名称,sfb加入域,用域管理员账户登录2.配置服务器IP地址,DNS3.安装Windows组件Add-WindowsFeature NET-Framework-Core, RSAT-ADDS, Windows-Identity-Foundation, Web-Server, Web-Static-Content,…

向spark standalone集群提交任务

文档链接 #切换到spark安装目录,执行下面一条命令,192.168.0.10是master的ip, examples/src/main/python/pi.py 是python 文件的路径 ./bin/spark-submit --master spark://192.168.0.106:7077 examples/src/main/python/pi.py任务已经执行完毕,耗时10秒 转载于:https://www.c…

python excelwriter保存路径_Python和Excel 终于可以互通了!!

点击“开发者技术前线”,选择“星标🔝”在看|星标|留言, 真爱作者:小天真_5eed 链接:https://www.jianshu.com/p/6ecf414f3372今天为大家分享一篇使用python将大量数据导出到Excel中的技巧心得,可以让Python和Excel…

MySQL 导出数据

2019独角兽企业重金招聘Python工程师标准>>> 1、导出整个数据库 mysqldump -u 用户名 -p 数据库名 > 存放位置比如: mysqldump -u root -p project > c:/a.sql 2.导出一个表的结构,并且带表中的数据 mysqldump -u 用户名 -p 数据库名 …

单片机STM8S测量电压电路_单片机电路设计中的10个难点

单片机是嵌入式系统的核心元件,使用单片机的电路要复杂得多,但在更改和添加新功能时,带有单片机的电路更加容易实现,这也正是电器设备使用单片机的原因。那么在单片机电路的设计中需要注意的难点有哪些?嵌入式ARM开发 …