Task Submission Flow
Overview
Having covered the startup flows of Spark's Master and Worker, the next thing to run is the Executor process on the Worker. This article continues by analyzing the Executor startup and task submission flow.
Spark-submit
A job is submitted to the cluster with spark-submit.
The launch script starts the main class; taking WordCount as an example: `spark-submit --class cn.apache.spark.WordCount`
- `bin/spark-class` -> `org.apache.spark.deploy.SparkSubmit`, whose main method is invoked
- doRunMain receives the main class of the user-defined Spark application: `cn.apache.spark.WordCount`
- A reference to that class is obtained via reflection: `mainClass = Utils.classForName(childMainClass)`
- The main method of `cn.apache.spark.WordCount` is then invoked via reflection
Let's look at SparkSubmit's main method:
```scala
def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    printStream.println(appArgs)
  }
  // Match on the action type
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
```
Here the action is SUBMIT, so the submit method is called:
```scala
private[spark] def submit(args: SparkSubmitArguments): Unit = {
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    // ...
    try {
      proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
        override def run(): Unit = {
          // childMainClass is the fully qualified name of the class containing your app's main method
          runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
        }
      })
    } catch {
      // ...
    }
  }
  // ...
  // Invoke the doRunMain defined above
  doRunMain()
}
```
submit calls doRunMain(), which in turn calls runMain. Let's look at runMain:
```scala
private def runMain(/* ... */) {
  // ...
  try {
    // Load the class via reflection
    mainClass = Class.forName(childMainClass, true, loader)
  } catch {
    // ...
  }
  // Get a handle to the main method via reflection
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  if (!Modifier.isStatic(mainMethod.getModifiers)) {
    throw new IllegalStateException("The main method in the given main class must be static")
  }
  // ...
  try {
    // Invoke the app's main method
    mainMethod.invoke(null, childArgs.toArray)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  }
}
```
This is the core of the flow: as the comments above make clear, the main method of the class we wrote is invoked via reflection. That covers the overall submission flow.
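To make the reflection step concrete, here is a minimal, self-contained sketch (not Spark source; the class name is just the example used above and must be on the classpath at runtime) that loads a class by name and invokes its static main, mirroring what runMain does:

```scala
import java.lang.reflect.Modifier

object ReflectiveLauncher {
  def main(args: Array[String]): Unit = {
    // Hypothetical example: load the user's main class by its fully qualified name
    val childMainClass = "cn.apache.spark.WordCount"
    val loader = Thread.currentThread().getContextClassLoader
    val mainClass = Class.forName(childMainClass, true, loader)

    // Look up the static main(Array[String]) method
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers),
      "The main method in the given main class must be static")

    // Invoke it, passing the whole argument array as the single Array[String] parameter,
    // just like SparkSubmit.runMain does with childArgs
    mainMethod.invoke(null, args.asInstanceOf[AnyRef])
  }
}
```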
SparkSubmit sequence diagram
Executor Startup Flow
Once SparkSubmit has invoked our program's main method via reflection, our own code starts to run. Every Spark program needs to create a SparkContext object, so that is where we pick up.
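For reference, a driver like the WordCount used in this article might look roughly as follows (an illustrative sketch against the Spark 1.x RDD API; the input and output paths are placeholders passed in as arguments):

```scala
package cn.apache.spark

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Creating the SparkContext is what kicks off everything analyzed below
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)

    sc.textFile(args(0))                 // e.g. an HDFS input path
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))           // e.g. an HDFS output path

    sc.stop()
  }
}
```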
The SparkContext constructor is long; the parts we care about are the following:
```scala
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
  // ...
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    // Delegates to SparkEnv.createDriverEnv
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  }

  // createSparkEnv is called here and returns a SparkEnv object, which holds many
  // important components, most notably the ActorSystem
  private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
  SparkEnv.set(env)

  // Create and start the scheduler
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

  // Create the DAGScheduler
  dagScheduler = new DAGScheduler(this)

  // Start the TaskScheduler
  taskScheduler.start()
  // ...
}
```
The SparkContext constructor mainly does three things: it creates a SparkEnv, a TaskScheduler, and a DAGScheduler. Let's first look at what createTaskScheduler does:
```scala
// Create a TaskScheduler based on the given master URL
private def createTaskScheduler(/* ... */) = {
  // ...
  // Match on the URL to pick the deployment mode
  master match {
    // ...
    // Spark Standalone mode
    case SPARK_REGEX(sparkUrl) =>
      // First create the TaskScheduler
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      // Very important: the scheduler backend
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      // Initialize the scheduler; the default scheduling mode is FIFO
      scheduler.initialize(backend)
      (backend, scheduler)
    // ...
  }
}
```
The master URL is matched to Standalone mode, and a SparkDeploySchedulerBackend and a TaskSchedulerImpl are created. These two objects are important: they are the core of starting task scheduling. scheduler.initialize(backend) is then called to initialize the scheduler.
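As a side note, the Standalone branch is selected by a regex match on the master URL. A rough sketch of that matching (the pattern mirrors Spark's SPARK_REGEX; the host names are made up):

```scala
object MasterUrlDemo {
  // Pattern roughly matching Spark's SPARK_REGEX for standalone master URLs
  val SPARK_REGEX = """spark://(.*)""".r

  def main(args: Array[String]): Unit = {
    val master = "spark://node1:7077,node2:7077"
    master match {
      case SPARK_REGEX(sparkUrl) =>
        // Multiple masters (for HA) are comma-separated; each gets the spark:// prefix back
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        println(masterUrls.mkString(", "))   // spark://node1:7077, spark://node2:7077
      case _ =>
        println(s"Not a standalone master URL: $master")
    }
  }
}
```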
With initialization complete, we return to the SparkContext constructor, which next calls taskScheduler.start() to start the TaskScheduler. Let's look at the start method:
```scala
override def start() {
  // Call start on the backend implementation
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    import sc.env.actorSystem.dispatcher
    sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
        SPECULATION_INTERVAL milliseconds) {
      Utils.tryOrExit { checkSpeculatableTasks() }
    }
  }
}
```
Here backend is the SparkDeploySchedulerBackend, so its start method is called:
```scala
override def start() {
  // CoarseGrainedSchedulerBackend's start method; it creates the DriverActor
  super.start()

  // The endpoint for executors to talk to us
  // Everything below prepares the arguments for launching the Java child process
  val driverUrl = AkkaUtils.address(
    AkkaUtils.protocol(actorSystem),
    SparkEnv.driverActorSystemName,
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port"),
    CoarseGrainedSchedulerBackend.ACTOR_NAME)
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

  // When testing, expose the parent class path to the child. This is processed by
  // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
  // when the assembly is built with the "*-provided" profiles enabled.
  val testingClassPath =
    if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
    } else {
      Nil
    }

  // Start executors with a few necessary configs for registering with the scheduler
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  // Assemble the command; it will eventually launch the
  // org.apache.spark.executor.CoarseGrainedExecutorBackend child process
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  // ApplicationDescription wraps up the important parameters
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    appUIAddress, sc.eventLogDir, sc.eventLogCodec)
  // Create the ClientActor
  client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
  // Start the ClientActor
  client.start()

  waitForRegistration()
}
```
This assembles the parameters for launching an Executor: the class name plus arguments are wrapped into an ApplicationDescription, which is passed into a newly created AppClient whose start method is then called.
AppClient creation sequence diagram
The AppClient start method
Next, let's focus on the start method:
```scala
def start() {
  // Just launch an actor; it will call back into the listener.
  actor = actorSystem.actorOf(Props(new ClientActor))
}
```
start creates a ClientActor that communicates with the Master; its preStart lifecycle method is then called to register with the Master. Let's look at preStart:
```scala
override def preStart() {
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  try {
    // The ClientActor registers with the Master
    registerWithMaster()
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      context.stop(self)
  }
}
```
This eventually calls the following method to register with all Masters:
```scala
def tryRegisterAllMasters() {
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    // Obtain a reference to the Master via actorSelection
    val actor = context.actorSelection(masterAkkaUrl)
    // Send the Master an asynchronous message to register the app
    actor ! RegisterApplication(appDescription)
  }
}
```
The app-registration message sent by the ClientActor carries the ApplicationDescription, which contains the requested resources, the Executor class to launch, and its parameters.
The Master's receiver
```scala
case RegisterApplication(description) => {
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    // Create the app; sender is the ClientActor
    val app = createApplication(description, sender)
    // Register the app
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // Persist the app
    persistenceEngine.addApplication(app)
    // Reply to the ClientActor that the app was registered successfully
    sender ! RegisteredApplication(app.id, masterUrl)
    // TODO schedule the resources
    schedule()
  }
}
```
registerApplication(app)
```scala
def registerApplication(app: ApplicationInfo): Unit = {
  val appAddress = app.driver.path.address
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  // Put the app into the Master's bookkeeping collections
  applicationMetricsSystem.registerSource(app.appSource)
  apps += app
  idToApp(app.id) = app
  actorToApp(app.driver) = app
  addressToApp(appAddress) = app
  waitingApps += app
}
```
The Master stores the received information in its collections, persists it, and sends a RegisteredApplication message back to the ClientActor. It then runs schedule(), which walks the workers collection and calls launchExecutor for each chosen worker; a simplified sketch of the scheduling idea follows, with launchExecutor itself shown after it.
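The post does not show schedule() itself. As a hedged, simplified sketch of the idea (a toy model, not Spark's actual Master code), the spread-out strategy hands free cores to a waiting app round-robin across the usable workers, and every worker that received cores is then sent a LaunchExecutor message:

```scala
object ScheduleSketch {
  case class Worker(id: String, coresFree: Int)

  // Spread `coresWanted` cores round-robin across workers that still have free cores,
  // roughly like the Master's spread-out scheduling.
  def assignCores(workers: Seq[Worker], coresWanted: Int): Map[String, Int] = {
    val usable = workers.filter(_.coresFree > 0).sortBy(-_.coresFree)
    if (usable.isEmpty) return Map.empty
    val assigned = Array.fill(usable.length)(0)
    var toAssign = math.min(coresWanted, usable.map(_.coresFree).sum)
    var pos = 0
    while (toAssign > 0) {
      if (usable(pos).coresFree - assigned(pos) > 0) {   // this worker still has a spare core
        assigned(pos) += 1
        toAssign -= 1
      }
      pos = (pos + 1) % usable.length
    }
    usable.map(_.id).zip(assigned).filter(_._2 > 0).toMap
  }

  def main(args: Array[String]): Unit = {
    val workers = Seq(Worker("worker-1", 4), Worker("worker-2", 2), Worker("worker-3", 3))
    // An app asking for 6 cores gets 2 cores on each of the three workers here;
    // the Master would then send each chosen worker a LaunchExecutor message.
    println(assignCores(workers, 6))
  }
}
```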
```scala
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // Record how many resources are now used on this worker
  worker.addExecutor(exec)
  // The Master sends the Worker a message to launch an Executor
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  // The Master tells the ClientActor that the executor has been started
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
```
Here the Master sends the Worker the message to launch an Executor:
`worker.actor ! LaunchExecutor(masterUrl,exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)`
application.desc contains the launch information for the Executor class.
```scala
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  // ...
  appDirectories(appId) = appLocalDirs
  // Create an ExecutorRunner; this is important, it holds the Executor's
  // launch configuration and parameters
  val manager = new ExecutorRunner(
    appId,
    execId,
    appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
    cores_,
    memory_,
    self,
    workerId,
    host,
    webUi.boundPort,
    publicAddress,
    sparkHome,
    executorDir,
    akkaUrl,
    conf,
    appLocalDirs,
    ExecutorState.LOADING)
  executors(appId + "/" + execId) = manager
  // Start the ExecutorRunner
  manager.start()
  // ...
```
The Worker's receiver gets the launch-Executor message; the appDesc object holds the Command, the Executor implementation class, and its parameters.
manager.start() creates a thread:
```scala
def start() {
  // Start a thread
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    // A child thread helps the Worker launch the Executor child process
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()

  // Shutdown hook that kills actors on shutdown.
  shutdownHook = new Thread() {
    override def run() {
      killProcess(Some("Worker shutting down"))
    }
  }
  Runtime.getRuntime.addShutdownHook(shutdownHook)
}
```
The thread calls fetchAndRunExecutor(); let's look at that method:
```scala
def fetchAndRunExecutor() {
  try {
    // Launch the process
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
      sparkHome.getAbsolutePath, substituteVariables)
    // Build the command
    val command = builder.command()
    logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))

    builder.directory(executorDir)
    builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    // Launch the child process
    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      command.mkString("\"", "\" \"", "\""), "=" * 40)

    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    val exitCode = process.waitFor()
    // ...
  }
}
```
Here the class name and arguments are assembled into a command (the details of the assembly don't matter here). builder.start() then launches a child process via the system runtime, and that process's main class is CoarseGrainedExecutorBackend. At this point the Executor process is up and running.
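For reference, launching a child JVM through java.lang.ProcessBuilder, which is what the builder returned by buildProcessBuilder ultimately is, looks roughly like this (an illustrative sketch; the main class and arguments are placeholders, not the exact command Spark assembles):

```scala
import java.io.File
import scala.collection.JavaConverters._

object LaunchChildProcessSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder command: run some main class on the current JVM's classpath
    val javaBin = new File(new File(System.getProperty("java.home"), "bin"), "java").getAbsolutePath
    val command = Seq(javaBin,
      "-cp", System.getProperty("java.class.path"),
      "org.example.SomeMainClass",        // hypothetical main class
      "--executor-id", "0")

    val builder = new ProcessBuilder(command.asJava)
    builder.directory(new File("."))                       // working directory, like executorDir
    builder.environment().put("SPARK_LOCAL_DIRS", "/tmp")  // extra environment for the child
    builder.redirectErrorStream(true)

    // Start the child process and wait for it to exit, as fetchAndRunExecutor does
    val process = builder.start()
    val exitCode = process.waitFor()
    println(s"child exited with code $exitCode")
  }
}
```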
Executor creation sequence diagram
Starting the Executor's task-scheduling objects
Once the Executor process starts, its main method runs first; the main code is as follows:
```scala
// Entry point of the Executor process
def main(args: Array[String]) {
  // ...
  // Parse the arguments
  while (!argv.isEmpty) {
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
      case ("--hostname") :: value :: tail =>
        hostname = value
        argv = tail
      case ("--cores") :: value :: tail =>
        cores = value.toInt
        argv = tail
      case ("--app-id") :: value :: tail =>
        appId = value
        argv = tail
      case ("--worker-url") :: value :: tail =>
        // Worker url is used in spark standalone mode to enforce fate-sharing with worker
        workerUrl = Some(value)
        argv = tail
      case ("--user-class-path") :: value :: tail =>
        userClassPath += new URL(value)
        argv = tail
      case Nil =>
      case tail =>
        System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
        printUsageAndExit()
    }
  }

  if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
      appId == null) {
    printUsageAndExit()
  }

  // Run the Executor backend
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}
```
It then executes the run method:
```scala
private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {
  // ...
  // Create the CoarseGrainedExecutorBackend actor through the actorSystem;
  // CoarseGrainedExecutorBackend communicates with the DriverActor
  env.actorSystem.actorOf(
    Props(classOf[CoarseGrainedExecutorBackend],
      driverUrl, executorId, sparkHostPort, cores, userClassPath, env),
    name = "Executor")
  // ...
  env.actorSystem.awaitTermination()
}
```
run creates the CoarseGrainedExecutorBackend actor, which will communicate with the DriverActor; its preStart lifecycle method is then invoked:
```scala
override def preStart() {
  logInfo("Connecting to driver: " + driverUrl)
  // The Executor establishes a connection to the DriverActor
  driver = context.actorSelection(driverUrl)
  // The Executor sends a registration message to the DriverActor
  driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
```
The Executor sends the DriverActor a registration message: `driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)`
When the DriverActor's receiver gets the message:
```scala
def receiveWithLogging = {
  // The registration message sent from the Executor to the DriverActor
  case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
    Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
    if (executorDataMap.contains(executorId)) {
      sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
    } else {
      logInfo("Registered executor: " + sender + " with ID " + executorId)
      // The DriverActor replies to the Executor that registration succeeded
      sender ! RegisteredExecutor
      addressToExecutorId(sender.path.address) = executorId
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      val (host, _) = Utils.parseHostPort(hostPort)
      // Wrap up the Executor's information
      val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
      // This must be synchronized because variables mutated
      // in this block are read when requesting executors
      CoarseGrainedSchedulerBackend.this.synchronized {
        // Add the Executor's info object to the collection
        executorDataMap.put(executorId, data)
        if (numPendingExecutors > 0) {
          numPendingExecutors -= 1
          logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
        }
      }
      listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
      // Later used to offer resources and run the real work
      makeOffers()
    }
```
In its receiver the DriverActor wraps up the Executor's information, stores it in a map, and replies to the CoarseGrainedExecutorBackend with `sender ! RegisteredExecutor`:
```scala
override def receiveWithLogging = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    val (hostname, _) = Utils.parseHostPort(hostPort)
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
```
On receiving the message, CoarseGrainedExecutorBackend creates an Executor object to prepare for task execution. At this point the creation of the Executor is complete; the next post will cover task scheduling.
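Finally, as a self-contained recap of the Akka messaging pattern used throughout this registration flow, here is a minimal sketch with made-up actor and message names standing in for CoarseGrainedExecutorBackend, the DriverActor, RegisterExecutor, and RegisteredExecutor (not Spark code):

```scala
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}

// Illustrative messages standing in for RegisterExecutor / RegisteredExecutor
case class Register(id: String)
case object Registered

// Stand-in for the driver-side actor (the DriverActor inside CoarseGrainedSchedulerBackend)
class DriverSideActor extends Actor {
  def receive = {
    case Register(id) =>
      println(s"driver: registering executor $id")
      sender() ! Registered                 // acknowledge, like `sender ! RegisteredExecutor`
  }
}

// Stand-in for CoarseGrainedExecutorBackend
class ExecutorSideActor(driverPath: String, id: String) extends Actor {
  var driver: ActorSelection = _

  override def preStart(): Unit = {
    driver = context.actorSelection(driverPath)   // look up the driver by its URL
    driver ! Register(id)                         // register, like `driver ! RegisterExecutor(...)`
  }

  def receive = {
    case Registered =>
      println(s"executor $id: successfully registered with driver")
      // here Spark would create the Executor object and wait for tasks
  }
}

object HandshakeDemo extends App {
  val system = ActorSystem("demo")
  system.actorOf(Props(new DriverSideActor), "driver")
  system.actorOf(Props(new ExecutorSideActor("/user/driver", "executor-0")), "executor")
}
```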