Spark-Luanch Driver

1.SparkSubmit.scala
主要调用M-prepareSubmitEnvironment,该方法更根据用户定义的参数,匹配不同client,去调用不同clientApp。(ps:本次讲ClientApp  也就是standalone)
在M-runMain通过  调用M-Utils.classForName 反射的方式调用 ClientApp 的 M-main (ps:如果是localhost 或者是client 直接反射用户的定义的main)
几种提交方式
// Following constants are visible for testing.
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"
private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS ="org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments,conf: Option[HadoopConfiguration] = None): (Seq[String], Seq[String], SparkConf, String)
2.ClientApp.scala   
最后driver粗粒度就是DriverWrapper
通过Rpc 发送给driver
override def onStart(): Unit = {driverArgs.cmd match {case "launch" =>val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"asyncSendToMasterAndForwardReply[SubmitDriverResponse](RequestSubmitDriver(driverDescription))
3.Master.scala
master 接受之后,放入map缓存中,调用M-schedule,根据资源选择一个work,向该work发送启动LaunchDriver的消息
case RequestSubmitDriver(description) =>if (state != RecoveryState.ALIVE) {val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +"Can only accept driver submissions in ALIVE state."context.reply(SubmitDriverResponse(self, false, None, msg))} else {logInfo("Driver submitted " + description.command.mainClass)val driver = createDriver(description)persistenceEngine.addDriver(driver)waitingDrivers += driverdrivers.add(driver)schedule()// TODO: It might be good to instead have the submission client poll the master to determine//       the current status of the driver. For now it's simply "fire and forget".context.reply(SubmitDriverResponse(self, true, Some(driver.id),s"Driver successfully submitted as ${driver.id}"))}
}private def schedule(): Unit = {if (state != RecoveryState.ALIVE) {return}// Drivers take strict precedence over executorsval shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))val numWorkersAlive = shuffledAliveWorkers.sizevar curPos = 0for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers// We assign workers to each waiting driver in a round-robin fashion. For each driver, we// start from the last worker that was assigned a driver, and continue onwards until we have// explored all alive workers.var launched = falsevar numWorkersVisited = 0while (numWorkersVisited < numWorkersAlive && !launched) {val worker = shuffledAliveWorkers(curPos)numWorkersVisited += 1if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {launchDriver(worker, driver)waitingDrivers -= driverlaunched = true}curPos = (curPos + 1) % numWorkersAlive}}startExecutorsOnWorkers()
}private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {logInfo("Launching driver " + driver.id + " on worker " + worker.id)worker.addDriver(driver)driver.worker = Some(worker)worker.endpoint.send(LaunchDriver(driver.id, driver.desc))driver.state = DriverState.RUNNING
}

 

4.Work.scala
work接受消息之后,new DriverRunner() 调用该对象的M-start
case LaunchDriver(driverId, driverDesc) =>logInfo(s"Asked to launch driver $driverId")val driver = new DriverRunner(conf,driverId,workDir,sparkHome,driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),self,workerUri,securityMgr)drivers(driverId) = driverdriver.start()

 

5.DriverRunner.scala
该对象中,M-start 中new 了一个线程,调用prepareAndRunDriver 最后通过 ProcessBuilder调用 DriverWrapper 的main(step2中的)
private[worker] def start() = {new Thread("DriverRunner for " + driverId) {override def run() {var shutdownHook: AnyRef = nulltry {shutdownHook = ShutdownHookManager.addShutdownHook { () =>logInfo(s"Worker shutting down, killing driver $driverId")kill()}// prepare driver jars and run driverval exitCode = prepareAndRunDriver()// set final state depending on if forcibly killed and process exit codefinalState = if (exitCode == 0) {Some(DriverState.FINISHED)} else if (killed) {Some(DriverState.KILLED)} else {Some(DriverState.FAILED)}} catch {case e: Exception =>kill()finalState = Some(DriverState.ERROR)finalException = Some(e)} finally {if (shutdownHook != null) {ShutdownHookManager.removeShutdownHook(shutdownHook)}}// notify worker of final driver state, possible exception
      worker.send(DriverStateChanged(driverId, finalState.get, finalException))}}.start()
}private[worker] def prepareAndRunDriver(): Int = {val driverDir = createWorkingDirectory()val localJarFilename = downloadUserJar(driverDir)def substituteVariables(argument: String): String = argument match {case "{{WORKER_URL}}" => workerUrlcase "{{USER_JAR}}" => localJarFilenamecase other => other}// TODO: If we add ability to submit multiple jars they should also be added hereval builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)runDriver(builder, driverDir, driverDesc.supervise)
}private[worker] def prepareAndRunDriver(): Int = {val driverDir = createWorkingDirectory()val localJarFilename = downloadUserJar(driverDir)def substituteVariables(argument: String): String = argument match {case "{{WORKER_URL}}" => workerUrlcase "{{USER_JAR}}" => localJarFilenamecase other => other}// TODO: If we add ability to submit multiple jars they should also be added hereval builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)runDriver(builder, driverDir, driverDesc.supervise)
}6.DriverWrapper.scala (粗粒度Driver client)
开始调用用户指定 jar 和main 真正开始执行我们所写的代码
def main(args: Array[String]) {args.toList match {/** IMPORTANT: Spark 1.3 provides a stable application submission gateway that is both* backward and forward compatible across future Spark versions. Because this gateway* uses this class to launch the driver, the ordering and semantics of the arguments* here must also remain consistent across versions.*/case workerUrl :: userJar :: mainClass :: extraArgs =>val conf = new SparkConf()val host: String = Utils.localHostName()val port: Int = sys.props.getOrElse("spark.driver.port", "0").toIntval rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf))logInfo(s"Driver address: ${rpcEnv.address}")rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))val currentLoader = Thread.currentThread.getContextClassLoaderval userJarUrl = new File(userJar).toURI().toURL()val loader =if (sys.props.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)} else {new MutableURLClassLoader(Array(userJarUrl), currentLoader)}Thread.currentThread.setContextClassLoader(loader)setupDependencies(loader, userJar)// Delegate to supplied main classval clazz = Utils.classForName(mainClass)val mainMethod = clazz.getMethod("main", classOf[Array[String]])mainMethod.invoke(null, extraArgs.toArray[String])rpcEnv.shutdown()case _ =>// scalastyle:off printlnSystem.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")// scalastyle:on printlnSystem.exit(-1)}
}

 

转载于:https://www.cnblogs.com/chouc/p/10885678.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/362687.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大风大浪大鱼

一群年轻人常在一泓深潭边钓鱼&#xff0c;而有一个渔夫总是在潭上边水流湍急的河里捕鱼。 年轻人觉得这渔夫可笑&#xff0c;在大风大浪的河里怎么会捕到鱼呢?有一天&#xff0c;年轻人忍不住去问渔夫&#xff1a;“鱼能在这么湍急的地方停留吗?”渔夫说&#xff0c;当然不能…

清空表单时出现问题

打开页面时报警告&#xff1a; 解决办法&#xff1a; &#xff08;1&#xff09;npm i default-passive-events -S &#xff08;2&#xff09;main.js中加入&#xff1a;import ‘default-passive-events’ 参考&#xff1a;https://www.jianshu.com/p/23850d4cade8 出现原…

JQuery Ajax 使用FormData上传文件对象

FormData部分: 先new FormData对象 :let somedata new FormData(),然后将数据添加进去&#xff0c;这里我们使用append()进行添加。 这里举一个上传头像的例子&#xff1a; let token localStorage.token; let img $(".file")..get(0).files[0]; let somedat…

[探索][管理]《现在,发现你的优势》

此书是我迄今为止看过最棒的一本书&#xff01;&#xff01;&#xff01; 为什么这么说&#xff0c;因为就像前言彼得德鲁克所说的一样 -- 大部分的人都不知道他们的优势何在。此书的宗旨就是测试你的优势是什么&#xff0c;并且发展你的优势。为何要去改变你不擅长的东西呢&am…

box2d——1.tiles瓦片积木

【调试渲染】 将TestCpp里Box2DTestBed的GLES-Render.h/cpp加入到项目中。声明绘制变量&#xff1a;GLESDebugDrawmDebugDraw。 【创建世界】 // 依据重力创建世界b2Vec2 gravity;gravity.Set(0.0f, -10.0f);mWorld new b2World(gravity);// 设置调试渲染和碰撞侦听mWorld-&…

如何在JSF中实现自定义密码强度指示器

使用JavaScript验证密码强度是一项常见任务。 在本文中&#xff0c;我将展示如何向基于JSF的Web应用程序添加密码强度指示器。 的 PrimeFaces中的密码组件已经具有密码强度的反馈指示符&#xff0c;但是它有两个主要缺点&#xff1a; 反馈指示器没有响应&#xff08;固定宽度…

CSS 学习路线(二)选择器

选择器 规则结构: 分两个基本部分 选择器(selector)和声明块(declaration block) 组成 声明块:由一个或多个声明组成,每一个声明都是属性-值对 选择器分为:元素选择器,类选择器,后代选择器,通配选择器,ID选择器,属性选择器,伪类选择器.子元素选择器,相邻兄弟选择器. 元素选…

关于vue打包的问题

一、vue-cli2 二、vue-cli3 一、vue-cli2 错误提示&#xff1a; npm ERR! code ELIFECYCLE npm ERR! errno 1 npm ERR! hewelry1.0.0 build: node build/build.js npm ERR! Exit status 1 npm ERR! npm ERR! Failed at the hewelry1.0.0 build script. npm ERR! This is prob…

关于==和equals的区别和联系,面试这么回答就可以

长篇大论的话&#xff0c;我这里就不多写了&#xff0c;相信大家入门java 的时候就知道个大概了&#xff0c;这里想表述的是&#xff0c;如果面试官问你《关于和equals的区别》&#xff0c;该怎么回答完美呢&#xff1f;可以这样说 总结的来说&#xff1a; 1&#xff09;对于&a…

如何使用新的Apache Http Client发出HEAD请求

如果您已更新Apache HTTP Client代码以使用最新的库&#xff08;在撰写本文时&#xff0c;它是4.2.x版本的httpclient 4.3.5版本和httpcore 4.3.2版本&#xff09;&#xff0c;您会注意到某些类&#xff08;例如org.apache.http.impl.client.DefaultHttpClient或org.apache.htt…

Delphi中禁止WebBrowser右键的方法

usesMSHtml;//在控件标签additional中找到TApplicationEvents控件&#xff0c;拖到窗体上&#xff0e;在TApplicationEvents的OnMessage事件中加入以下代码&#xff1a;//替换右键菜单procedureTForm1.ApplicationEvents1Message(varMsg: tagMSG; varHandled: Boolean);varmPoi…

【前端框架-Vue-基础】$attr及$listeners实现跨多级组件的通信

父子 A 组件与 B 组件之间的通信&#xff1a; &#xff08;父子组件&#xff09; 如上图所示&#xff0c;A、B、C三个组件依次嵌套&#xff0c;按照 Vue 的开发习惯&#xff0c;父子组件通信可以通过以下方式实现&#xff1a; A to B 通过props的方式向子组件传递&#xff0c…

html笔记(一)html4+css2.0、css基础和属性、盒模型

w3c 官网 这里是 html4 的内容 大标题小节一、关于HTML1. 基本语法2. HTML常用标签3. 相对路径和绝对路径二、css基础1. 表单元素2. 创建样式表3. css语法4. css选择器三、css的相关属性1. 列表 li 独有的属性list-style2. 边框属性border3. overflow4. 浮动 float 遇到的坑5.…

JUnit:使用Java 8和Lambda表达式测试异常

在JUnit中&#xff0c;有许多方法可以在测试代码中测试异常&#xff0c;包括try-catch idiom JUnit Rule和catch-exception库。 从Java 8开始&#xff0c;我们还有另一种处理异常的方法&#xff1a;使用lambda表达式。 在这篇简短的博客文章中&#xff0c;我将演示一个简单的示…

动态语言和静态语言的比较

一 、静态语言的优势到底在哪&#xff1f; 来自robbin 摘自 http://www.javaeye.com/article/33971?page7 引用是像Java或者C&#xff03;这样强类型的准静态语言在实现复杂的业务逻辑、开发大型商业系统、以及那些生命周期很长的应用中也有着非常强的优势这是一个存在于大家心…

sqlserver2012——使用子查询

1 select A.成绩&#xff0c;A.分数,B.姓名 FROM 成绩信息 A, 学生信息 BWHERE A.学生编号B.学号 AND A.课程编号‘2’ AND A.考试编号‘0801’ AND A.分数 <( SELECT AVG(分数) FROM 成绩信息 A&#xff0c;学生信息 B where A.学生编号B.学号 AND A.课程编号‘2’ and A.…

返回顶部小火箭

代码如下&#xff1a; <!DOCTYPE html> <html> <head lang"en"><meta charset"UTF-8"><title></title><style>body {width: 2000px;}.top{position: fixed;right:50px;bottom:100px;display: none;}</style&…

最佳5本Java性能调优书籍–精选,必读

为什么Java开发人员应该阅读有关性能调优的书&#xff1f; 当我很久以前第一次面对这个问题时&#xff0c;我以为以后会做&#xff0c;但是我很长一段时间都没有回过头来。 仅当我在用Java编写的任务关键型服务器端财务应用程序中遇到严重的性能和可伸缩性问题时&#xff0c;我…

html笔记(三)html5+css3(html5、css3、文字相关)

W3school在线教程 html5css3基本不兼容ie678。 大标题小节一、html51. html4 和 html5 的区别2. 标签语义化3. 智能表单的使用和规范二、css3选择器1. 属性选择器2. 结构性伪类选择器&#xff08;层级选择器/符&#xff09;3. UI状态性伪类选择器4. 相邻兄弟选择器5. 其他选择…

美登杯 E、小花梨的数组* 线段树

操作过程中标记传递 询问的时候再计算 #include<bits/stdc.h> using namespace std; //input by bxd #define rep(i,a,b) for(int i(a);i<(b);i) #define repp(i,a,b) for(int i(a);i>(b);--i) #define RI(n) scanf("%d",&(n)) #define RII(n,m) sc…