【原】Spark中Master源码分析(一)

Master作为集群的Manager,对于集群的健壮运行发挥着十分重要的作用。下面,我们一起了解一下Master是听从Client(Leader)的号召,如何管理好Worker的吧。

1.家当(静态属性)

1.设置一个守护单线程的消息发送器,
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
2.根据sparkConf得到hadoopConf
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
3.一个bool类型的标识,如果设置为true,那么app的执行将会尽量分步到尽可能多的worker上,否则app的执行将会先用完一个worker的资源,然后再使用下一个worker的资源
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
4.设置执行app默认的最大核数为Int类型的最大值
private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
5.还有一些关于worker、driver、app等的字段信息,都比较简单,限于篇幅限制就不一一列出了

2.技能(方法)

由于Master上本质上是一个RpcEndpoint,所以我们按照它的生命周期进行介绍。如果不明白,请看文章

Spark Rpc通信源码分析 http://www.cnblogs.com/yourarebest/p/5297157.html

1.构造函数就是Master默认的主构造器
2.onStart方法,主要功能是启动Jetty的WebUI服务,Rest服务、选出持久化引擎及持久化代理

override def onStart(): Unit = {
logInfo("Starting Spark master at " + masterUrl)
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
webUi = new MasterWebUI(this, webUiPort)
//启动JettyServer并绑定webUI端口号
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
//forwardMessageThread线程每1min中检查Worker是否宕了
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
//启动Rest服务,默认端口6066
if (restServerEnabled) {
val port = conf.getInt("spark.master.rest.port", 6066)
restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
}
//返回绑定的端口号
restServerBoundPort = restServer.map(.start())
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
//当metrics系统启动后,将master和app的metrics servlet的hadnler给webui
masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
//序列化Spark的配置文件
val serializer = new JavaSerializer(conf)
//支持三种持久化引擎,将Spark的配置参数持久化,便于以后恢复使用
val (persistenceEngine
, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
}

3.onStop方法,停止master的metrics系统、停止app的metrics系统、取消异步执行的任务、停止WebUi服务、停止rest服务以及持久化引擎和选举代理的停止。

override def onStop() {
masterMetricsSystem.report()
applicationMetricsSystem.report()
//避免异步发出的CompleteRecovery消息导致master的重启
if (recoveryCompletionTask != null) {
recoveryCompletionTask.cancel(true)
}
if (checkForWorkerTimeOutTask != null) {
checkForWorkerTimeOutTask.cancel(true)
}
forwardMessageThread.shutdownNow()
webUi.stop()
restServer.foreach(_.stop())
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
persistenceEngine.close()
leaderElectionAgent.stop()
}

还有一个重要的方法receive方法,留到下一篇吧。

转载于:https://www.cnblogs.com/yourarebest/p/5312965.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/458153.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

XML——XML介绍和基本语法

from:https://blog.csdn.net/gavin_john/article/details/51511180 1.XML历史 gml(1969)->sgml(1985)->html(1993)->xml(1998) 1969 gml(通用标记语言),主要目的是要在不同的机器之间进行通信的数据规范1985 sgml(标准通用标记语言)1993 htm…

Tomcat7.0安装配置

很久没有通过博客对学习所得进行记录了。 现在将使用Tomcat的一些经验和心得写到这里,作为记录和备忘。如果有朋友看到,也请不吝赐教。 首先,我个人使用的是apache-tomcat-7.0.27你可以下载使用,前提条件你需要安装JDK1.6或者1.7都…

TIFF图像文件格式详解

from:https://www.cnblogs.com/gywei/p/3393816.html 1 什么是TIFF? TIFF是Tagged Image File Format的缩写。在现在的标准中,只有TIFF存在, 其他的提法已经舍弃不用了。做为一种标记语言,TIFF与其他文件格式最大的不…

java 抽象工厂模式简单实例

抽象工厂模式:提供一个创建一系列的相关的或者依赖的对象的接口,无需指定它们的具体实现类,具体的时间分别在子类工厂中产生。 类似于工厂模式:隔离了具体类的生产实现,使得替换具体的工厂实现类很容易。包含有以下模块…

图像处理之积分图应用三(基于NCC快速相似度匹配算法)

from:https://blog.csdn.net/jia20003/article/details/53021614 图像处理之积分图应用三(基于NCC快速相似度匹配算法) 基于Normalized cross correlation(NCC)用来比较两幅图像的相似程度已经是一个常见的图像处理手段。在工业生产环节检测…

深入浅出地理解机器人手眼标定

from:https://blog.csdn.net/qq_16481211/article/details/79764730 所谓手眼系统,就是人眼镜看到一个东西的时候要让手去抓取,就需要大脑知道眼镜和手的坐标关系。如果把大脑比作B,把眼睛比作A,把手比作C,如果A和B的…

centos 6.5 安装 mongodb

官方给出的链接地址:https://docs.mongodb.org/manual/tutorial/install-mongodb-on-red-hat/ 安装后重要的日志 win10 上使用mongochef连接不上数据库 解决方案: 修改 /etc/mongod.conf 将bindIP 改为0.0.0.0 监听外网转载于:https://www.cnblogs.com/l…

scala学习资料

1. scala-sbt 构建工具: http://www.scala-sbt.org/0.13/docs/zh-cn/Directories.html 2. 资料: http://www.ibm.com/developerworks/cn/java/j-lo-funinscala2/ https://www.zhihu.com/question/34548588?sortcreated http://nerd-is.in/2013-09/scala…

opencv3/C++ 机器学习-SVM应用实例:药品(胶囊)识别与分类

from:https://blog.csdn.net/akadiao/article/details/79278072 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/akadiao/article/details/79278072 问题描述: 现对6种不同颜色药品(胶囊…

Elasticsearch 搜索不到数据问题(_mapping 设置)

需求 由于 kibana3 中,不支持直接在请求的 url 中设置搜索的 type (是不是我不知道???)。 为了支持特定 type 的搜索,所以我设置了个下每个 panel 的查询语句,让它增加一个&#xff…

SVM之交叉验证【转】

交叉验证(CrossValidation)方法思想简介 以下简称交叉验证(Cross Validation)为CV.CV是用来验证分类器的性能一种统计分析方法,基本思想是把在某种意义下将原始数据(dataset)进行分组,一部分做为训练集(train set),另一部分做为验证集(validation set),首先用训练集对分类器进…

linux命令学习-1-less

less 工具也是对文件或其它输出进行分页显示的工具,应该说是linux正统查看文件内容的工具,功能极其强大。less 的用法比起 more 更加的有弹性。在 more 的时候,我们并没有办法向前面翻, 只能往后面看,但若使用了 less …

python问题汇总

问题1:如何解决python3中numpy报错No module named numpy 打开terminal pip3 install numpy 问题2:ModuleNotFoundError No module named matplotlib 打开terminal pip3 install matplotlib

jspspy database help

.转载于:https://www.cnblogs.com/outline/p/5316051.html

SVM 调参策略

转自:SVM 调参策略:https://blog.csdn.net/u014484783/article/details/78220646 SVM 怎样能得到好的结果 1. 对数据做归一化(simple scaling) 2. 应用 RBF kernel 3. 用cross-validation和grid-search 得到最优的c和g 4. 用…

美好的⼀天 从ActionTab开始 美观、智能、⾼效的新标签⻚ iTab 新标签页iTab新标签页Atop100工具推荐

文章目录 ActionTabiTab 新标签页iTab新标签页,小组件,起始页,标签页,日历,股票,浏览器扩展 https://www.actiontab.cn/ ActionTab 收费???? iTab 新标签页iT…

Oracle学习之merge

--使用merge语句 create table new as select * from emp where 10; insert into new (empno,ename) select empno,ename from emp where deptno10;merge into new n using emp e on (n.empnoe.empno) when matched then update set n.sale.salwhen not matched then insert (…

机器学习中的算法(2)-支持向量机(SVM)基础

from:http://www.cnblogs.com/LeftNotEasy/archive/2011/05/18/2034566.html 版权声明: 本文由LeftNotEasy发布于http://leftnoteasy.cnblogs.com, 本文可以被全部的转载或者部分使用,但请注明出处,如果有问题,请联系wheeleastgm…

HDU 2586 How far away ?【LCA】

题目链接: http://acm.hdu.edu.cn/showproblem.php?pid2586 题意: 无向图,给定边及边权重,任意两点之间都有一条唯一的道路,道路上每个点只能出现一次。给定询问,求询问的结点之间的距离。 分析&#xff1…

深入理解拉格朗日乘子法(Lagrange Multiplier) 和KKT条件

from:https://blog.csdn.net/xianlingmao/article/details/7919597 在求取有约束条件的优化问题时,拉格朗日乘子法(Lagrange Multiplier) 和KKT条件是非常重要的两个求取方法,对于等式约束的优化问题,可以应用拉格朗日乘子法去求…