1.SparkSubmit.scala
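SparkSubmit mainly calls the method prepareSubmitEnvironment, which inspects the user-supplied arguments (master URL and deploy mode) and picks the matching client application class. (Note: this post follows ClientApp, i.e. standalone cluster mode.)
Inside the method runMain, the chosen class is loaded reflectively with Utils.classForName and the ClientApp entry point is invoked. (Note: in local mode or client deploy mode, the user's own main class is invoked reflectively instead.)
The submission classes for the different cluster managers: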
    // Following constants are visible for testing.
    private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
      "org.apache.spark.deploy.yarn.YarnClusterApplication"
    private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
    private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
    private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
      "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"

    private[deploy] def prepareSubmitEnvironment(
        args: SparkSubmitArguments,
        conf: Option[HadoopConfiguration] = None)
        : (Seq[String], Seq[String], SparkConf, String)
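The choice of submission class can be sketched roughly as follows. This is a simplified illustration, not the actual body of prepareSubmitEnvironment: the object SubmitClassSketch, the function childMainClass and its useRest parameter are hypothetical names, and the class names are written as plain strings so the sketch has no Spark dependency.

```scala
object SubmitClassSketch {
  // Class names taken from the constants above, written as strings.
  val YarnCluster = "org.apache.spark.deploy.yarn.YarnClusterApplication"
  val RestCluster = "org.apache.spark.deploy.rest.RestSubmissionClientApp"
  val StandaloneCluster = "org.apache.spark.deploy.ClientApp"
  val KubernetesCluster = "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"

  /** Hypothetical, simplified selection: returns the class that would be run reflectively. */
  def childMainClass(
      master: String,
      deployMode: String,
      useRest: Boolean,
      userMainClass: String): String = {
    if (deployMode != "cluster") {
      // local or client deploy mode: the user's own main class is run directly
      userMainClass
    } else if (master.startsWith("spark://")) {
      // standalone cluster mode: REST gateway or the RPC-based ClientApp
      if (useRest) RestCluster else StandaloneCluster
    } else if (master.startsWith("yarn")) {
      YarnCluster
    } else if (master.startsWith("k8s://")) {
      KubernetesCluster
    } else {
      throw new IllegalArgumentException(s"Master not covered by this sketch: $master")
    }
  }
}
```

With a spark:// master in cluster mode and the REST path disabled, the sketch returns the ClientApp class name, which is the path the rest of this post follows.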
2.ClientApp.scala
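At a coarse level, the driver process that eventually gets launched is DriverWrapper: it is set as the main class of the driver command.
The driver description is then sent over RPC to the Master (not to the driver itself, which does not exist yet).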
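    override def onStart(): Unit = {
      driverArgs.cmd match {
        case "launch" =>
          val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
          // ... (construction of driverDescription is omitted in this excerpt)
          asyncSendToMasterAndForwardReply[SubmitDriverResponse](
            RequestSubmitDriver(driverDescription))
        // ... (remaining cases are omitted in this excerpt)
      }
    }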
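The request/response exchange with the Master can be sketched without Spark's RPC layer. Everything here is a stand-in: the case classes are trimmed-down versions of the real messages, fakeMaster is a hypothetical function that plays the Master, and "driver-0001" is a made-up id. The argument list built in describe follows the order DriverWrapper expects (worker URL, user jar, user main class, then the user's own arguments).

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SubmitRpcSketch {
  // Simplified stand-ins for the real messages (fields are assumptions).
  final case class DriverDescription(mainClass: String, userJar: String, appArgs: Seq[String])
  final case class RequestSubmitDriver(description: DriverDescription)
  final case class SubmitDriverResponse(success: Boolean, driverId: Option[String], message: String)

  type Master = RequestSubmitDriver => Future[SubmitDriverResponse]

  /** Wraps the user's class: the launched main class is DriverWrapper, the user class is an argument. */
  def describe(userMainClass: String, userJar: String, userArgs: Seq[String]): DriverDescription =
    DriverDescription(
      "org.apache.spark.deploy.worker.DriverWrapper",
      userJar,
      Seq("{{WORKER_URL}}", "{{USER_JAR}}", userMainClass) ++ userArgs)

  /** Hypothetical Master: accepts submissions only while it is alive. */
  def fakeMaster(alive: Boolean): Master = _ => Future {
    if (alive) SubmitDriverResponse(true, Some("driver-0001"), "Driver successfully submitted as driver-0001")
    else SubmitDriverResponse(false, None, "Can only accept driver submissions in ALIVE state.")
  }

  /** Sends the request and waits for the reply; the real client forwards the reply asynchronously. */
  def submit(master: Master, description: DriverDescription): SubmitDriverResponse =
    Await.result(master(RequestSubmitDriver(description)), 5.seconds)
}
```

Blocking with Await keeps the sketch short; the real client endpoint stays asynchronous and reacts to the reply when it arrives.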
3.Master.scala
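After the Master receives the request, it creates the driver, records it in the persistence engine and in its in-memory collections (waitingDrivers and drivers), and calls the method schedule. schedule picks an alive Worker with enough free memory and cores and sends that Worker a LaunchDriver message.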
    case RequestSubmitDriver(description) =>
      if (state != RecoveryState.ALIVE) {
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
          "Can only accept driver submissions in ALIVE state."
        context.reply(SubmitDriverResponse(self, false, None, msg))
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        schedule()

        // TODO: It might be good to instead have the submission client poll the master to determine
        //       the current status of the driver. For now it's simply "fire and forget".
        context.reply(SubmitDriverResponse(self, true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}"))
      }

    private def schedule(): Unit = {
      if (state != RecoveryState.ALIVE) {
        return
      }
      // Drivers take strict precedence over executors
      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
      val numWorkersAlive = shuffledAliveWorkers.size
      var curPos = 0
      for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
        // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
        // start from the last worker that was assigned a driver, and continue onwards until we have
        // explored all alive workers.
        var launched = false
        var numWorkersVisited = 0
        while (numWorkersVisited < numWorkersAlive && !launched) {
          val worker = shuffledAliveWorkers(curPos)
          numWorkersVisited += 1
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
            launchDriver(worker, driver)
            waitingDrivers -= driver
            launched = true
          }
          curPos = (curPos + 1) % numWorkersAlive
        }
      }
      startExecutorsOnWorkers()
    }

    private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
      logInfo("Launching driver " + driver.id + " on worker " + worker.id)
      worker.addDriver(driver)
      driver.worker = Some(worker)
      worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
      driver.state = DriverState.RUNNING
    }
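To make the round-robin placement easier to follow, here is a stripped-down sketch of the same loop. Worker and Driver are hypothetical stand-ins for WorkerInfo and DriverInfo, and the resource bookkeeping is an assumption about what launching a driver does to the worker's free memory and cores; the sketch returns the placements instead of sending LaunchDriver messages.

```scala
import scala.collection.mutable
import scala.util.Random

object ScheduleSketch {
  // Hypothetical, simplified stand-ins for WorkerInfo and DriverInfo.
  final class Worker(val id: String, var memoryFree: Int, var coresFree: Int)
  final case class Driver(id: String, mem: Int, cores: Int)

  /** Places waiting drivers on workers round-robin; returns driverId -> workerId. */
  def schedule(
      workers: Seq[Worker],
      waiting: mutable.ArrayBuffer[Driver],
      rng: Random): Map[String, String] = {
    val shuffled = rng.shuffle(workers) // spread load across workers
    val numWorkers = shuffled.size
    val placed = mutable.LinkedHashMap.empty[String, String]
    var curPos = 0
    for (driver <- waiting.toList) { // iterate over a copy, we remove while looping
      var launched = false
      var visited = 0
      // Start from the position after the last assignment and visit each worker at most once.
      while (visited < numWorkers && !launched) {
        val worker = shuffled(curPos)
        visited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          worker.memoryFree -= driver.mem // assumed bookkeeping for a launched driver
          worker.coresFree -= driver.cores
          placed(driver.id) = worker.id
          waiting -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkers
      }
    }
    placed.toMap
  }
}
```

A driver that fits on no worker simply stays in the waiting buffer and is considered again the next time scheduling runs.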
4.Worker.scala
After the Worker receives the LaunchDriver message, it creates a new DriverRunner, registers it under the driver id, and calls its start method.
    case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
      drivers(driverId) = driver
      driver.start()
5.DriverRunner.scala
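In this class, the method start spawns a new thread that calls prepareAndRunDriver. That method downloads the user jar, substitutes the placeholders in the command, and finally uses a ProcessBuilder to launch the main method of DriverWrapper (the main class set in step 2) as a separate process.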
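The pattern described above (placeholder substitution, running the driver on its own thread, mapping the exit code to a final state) can be sketched in isolation. This is an illustration only: DriverRunnerSketch and its members are hypothetical names, and the launch parameter stands in for building and running the child process, so no real process is started.

```scala
object DriverRunnerSketch {
  sealed trait DriverState
  case object Finished extends DriverState
  case object Killed extends DriverState
  case object Failed extends DriverState

  /** Replaces the two placeholders in the driver command; other arguments pass through. */
  def substituteVariables(workerUrl: String, localJar: String): String => String = {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJar
    case other => other
  }

  /** Exit code 0 means success; otherwise distinguish a forced kill from a failure. */
  def finalState(exitCode: Int, killed: Boolean): DriverState =
    if (exitCode == 0) Finished else if (killed) Killed else Failed

  /** Runs `launch` (a stand-in for the child process) on its own thread and reports the outcome. */
  def start(driverId: String, launch: () => Int, onExit: DriverState => Unit): Thread = {
    val thread = new Thread("DriverRunner for " + driverId) {
      override def run(): Unit = onExit(finalState(launch(), false))
    }
    thread.start()
    thread
  }
}
```

Running the driver on a dedicated thread keeps the Worker's message loop free while the child process is alive; the final state is reported back only when that process exits.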
    private[worker] def start() = {
      new Thread("DriverRunner for " + driverId) {
        override def run() {
          var shutdownHook: AnyRef = null
          try {
            shutdownHook = ShutdownHookManager.addShutdownHook { () =>
              logInfo(s"Worker shutting down, killing driver $driverId")
              kill()
            }

            // prepare driver jars and run driver
            val exitCode = prepareAndRunDriver()

            // set final state depending on if forcibly killed and process exit code
            finalState = if (exitCode == 0) {
              Some(DriverState.FINISHED)
            } else if (killed) {
              Some(DriverState.KILLED)
            } else {
              Some(DriverState.FAILED)
            }
          } catch {
            case e: Exception =>
              kill()
              finalState = Some(DriverState.ERROR)
              finalException = Some(e)
          } finally {
            if (shutdownHook != null) {
              ShutdownHookManager.removeShutdownHook(shutdownHook)
            }
          }

          // notify worker of final driver state, possible exception
          worker.send(DriverStateChanged(driverId, finalState.get, finalException))
        }
      }.start()
    }

    private[worker] def prepareAndRunDriver(): Int = {
      val driverDir = createWorkingDirectory()
      val localJarFilename = downloadUserJar(driverDir)

      def substituteVariables(argument: String): String = argument match {
        case "{{WORKER_URL}}" => workerUrl
        case "{{USER_JAR}}" => localJarFilename
        case other => other
      }

      // TODO: If we add ability to submit multiple jars they should also be added here
      val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
        driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

      runDriver(builder, driverDir, driverDesc.supervise)
    }
6.DriverWrapper.scala (coarse-grained driver client)
DriverWrapper loads the user-specified jar and invokes the user's main method; this is the point where the code we wrote actually starts running.
    def main(args: Array[String]) {
      args.toList match {
        /*
         * IMPORTANT: Spark 1.3 provides a stable application submission gateway that is both
         * backward and forward compatible across future Spark versions. Because this gateway
         * uses this class to launch the driver, the ordering and semantics of the arguments
         * here must also remain consistent across versions.
         */
        case workerUrl :: userJar :: mainClass :: extraArgs =>
          val conf = new SparkConf()
          val host: String = Utils.localHostName()
          val port: Int = sys.props.getOrElse("spark.driver.port", "0").toInt
          val rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf))
          logInfo(s"Driver address: ${rpcEnv.address}")
          rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))

          val currentLoader = Thread.currentThread.getContextClassLoader
          val userJarUrl = new File(userJar).toURI().toURL()
          val loader =
            if (sys.props.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
              new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)
            } else {
              new MutableURLClassLoader(Array(userJarUrl), currentLoader)
            }
          Thread.currentThread.setContextClassLoader(loader)
          setupDependencies(loader, userJar)

          // Delegate to supplied main class
          val clazz = Utils.classForName(mainClass)
          val mainMethod = clazz.getMethod("main", classOf[Array[String]])
          mainMethod.invoke(null, extraArgs.toArray[String])

          rpcEnv.shutdown()

        case _ =>
          // scalastyle:off println
          System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
          // scalastyle:on println
          System.exit(-1)
      }
    }
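The two key moves in DriverWrapper, destructuring the argument list and delegating to the user's main through reflection, can be sketched with the JDK alone. This is a minimal illustration under assumptions: DriverWrapperSketch, WrapperArgs, parse, loaderFor and invokeMain are hypothetical names, and a plain java.net.URLClassLoader (parent-first) stands in for Spark's MutableURLClassLoader; the child-first variant is not reproduced here.

```scala
import java.io.File
import java.net.URLClassLoader

object DriverWrapperSketch {
  final case class WrapperArgs(
      workerUrl: String,
      userJar: String,
      mainClass: String,
      extraArgs: List[String])

  /** Mirrors the `workerUrl :: userJar :: mainClass :: extraArgs` pattern; None means "print usage". */
  def parse(args: List[String]): Option[WrapperArgs] = args match {
    case workerUrl :: userJar :: mainClass :: extraArgs =>
      Some(WrapperArgs(workerUrl, userJar, mainClass, extraArgs))
    case _ => None
  }

  /** Parent-first loader that can also see classes inside the user jar. */
  def loaderFor(userJar: String, parent: ClassLoader): ClassLoader =
    new URLClassLoader(Array(new File(userJar).toURI.toURL), parent)

  /** Looks up the static main(String[]) of `mainClass` and invokes it with `args`. */
  def invokeMain(mainClass: String, args: Array[String], loader: ClassLoader): Unit = {
    val clazz = Class.forName(mainClass, true, loader)
    val mainMethod = clazz.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, args) // receiver is null because main is static
    ()
  }
}
```

Because the first three arguments are positional, fewer than three arguments fall through to the usage branch, while any arguments after the main class are passed to the user program untouched.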