Spark source code analysis: DAGScheduler, TaskScheduler, and how an Executor runs a task

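Abstract

Spark's scheduling is something I have long wanted to understand properly: how the directed acyclic graph (DAG) is built, how tasks are scheduled, how the lazy evaluation of RDDs comes about and is carried out, and at which point on the executor an RDD's compute method calls the functions we define. These topics are both fundamental and hard. After spending some time on them I finally worked out how they fit together, and I am writing it down here so that I can keep refining it later. Spark schedules at two levels: DAGScheduler and TaskScheduler. DAGScheduler turns a job into a set of interdependent stages, then hands each stage to TaskScheduler as a TaskSet, and TaskScheduler distributes the tasks. The details are covered step by step below; the article is fairly long.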

Contents

1. The RDD logical execution chain
2. How Spark splits jobs and stages
3. Scheduling in DAGScheduler
4. Scheduling in TaskScheduler
5. How the executor runs a task and the functions we define

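The RDD logical execution chain

Spark is said to evaluate lazily and to generate stages from the DAG of RDDs. That DAG is built from dependencies: every transformation applied to an RDD produces one or more child RDDs, and creating a new RDD also creates the dependency between it and its parent. RDDs are therefore linked together through their Dependency objects. RDD dependencies are not the focus here; if you want more detail, a web search will turn up plenty. In brief, there are three kinds: OneToOneDependency (1:1), RangeDependency (a range of parent partitions mapped one-to-one onto a range of child partitions, as in union), and ShuffleDependency (m:n). OneToOneDependency and RangeDependency are both kinds of NarrowDependency. The ratios are stated at the granularity of RDD partitions.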

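The most essential thing a dependency keeps is a reference to the parent RDD; its rdd method simply returns that parent. The result is a structure like a linked list: starting from the last RDD and following the dependencies, you can trace back to every ancestor RDD.
Let us take map as an example to see how a dependency is created.
[Figure: source of RDD.map]

Calling map actually creates a MapPartitionsRDD.
[Figure: source of MapPartitionsRDD]
The primary constructor of MapPartitionsRDD in turn passes values up to RDD; the parent RDD is the one referred to by this in the map call above. Next, look at the constructor of the RDD class:
[Figure: auxiliary constructor of RDD]
It in turn calls the primary constructor of RDD.
[Figure: primary constructor of RDD]
So the dependencies are all created in the RDD constructors.
Chaining transformations in this way builds up the DAG of RDDs:
[Figure: DAG of RDDs]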
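
The backward links described above can be sketched without Spark at all. The following is a toy model under stated assumptions, not Spark's real classes: `Node`, `derive`, and `lineage` are hypothetical names, and a `parents` list stands in for the Dependency objects.

```scala
// Toy model of RDD lineage (NOT Spark's real classes): every node records its
// parents when it is constructed, and that record is all the "DAG" consists of.
object LineageSketch {
  // Hypothetical stand-in for an RDD; `parents` plays the role of dependencies.
  final case class Node(name: String, parents: List[Node] = Nil) {
    // A "transformation" only creates a child that points back at this node.
    def derive(childName: String): Node = Node(childName, List(this))
  }

  // Walk backwards from the last node and collect every ancestor name once.
  def lineage(last: Node): List[String] = {
    def loop(pending: List[Node], seen: List[String]): List[String] = pending match {
      case Nil => seen.reverse
      case head :: tail if seen.contains(head.name) => loop(tail, seen)
      case head :: tail => loop(head.parents ::: tail, head.name :: seen)
    }
    loop(List(last), Nil)
  }

  def main(args: Array[String]): Unit = {
    val filtered = Node("source").derive("mapped").derive("filtered")
    println(lineage(filtered).mkString(" <- "))
  }
}
```

Only the last node is needed to recover the whole chain, which is why the scheduler can start from the final RDD of a job.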
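
How Spark splits jobs and stages

Splitting an application into jobs is simple: the number of jobs equals the number of action operators in the application, one job per action. This is visible in the source code: a transformation creates one or more RDDs, whereas an action triggers the submission of a job.
The map transformation above, for example, looks like this:
[Figure: source of RDD.map]
whereas an action looks like this:
[Figure: source of an action operator]
The job is submitted through the runJob method. Stages are split according to whether a shuffle takes place, which is covered in detail later.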
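
The difference between a transformation and an action can be illustrated with a small stand-alone sketch. It assumes a hypothetical `Lazy` class (not part of Spark) in which `map` only composes functions and `collect` plays the role of the action that actually runs them.

```scala
// Toy illustration of lazy transformations versus an eager action (NOT Spark code).
object LazySketch {
  // Hypothetical mini "RDD": the data plus a composed function that has not run yet.
  final class Lazy[A, B](data: Seq[A], f: A => B) {
    // Transformation: compose functions, touch no data.
    def map[C](g: B => C): Lazy[A, C] = new Lazy(data, f.andThen(g))
    // Action: the only place where the recorded functions are applied.
    def collect(): Seq[B] = data.map(f)
  }

  def from[A](data: Seq[A]): Lazy[A, A] = new Lazy(data, (a: A) => a)

  def main(args: Array[String]): Unit = {
    var calls = 0
    val pipeline = from(Seq(1, 2, 3)).map { x => calls += 1; x * 2 }
    println(s"calls before the action: $calls")
    println(pipeline.collect())
    println(s"calls after the action: $calls")
  }
}
```

The user function is not invoked until `collect` is called, which mirrors why only an action leads to a runJob call.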

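Scheduling in DAGScheduler

When we submit a job to a Spark cluster from a client and the resource manager is YARN, the client asks for a machine on which to run the driver process. There is no concrete class called driver in Spark; the driver is the process that runs the user's code, hosts DAGScheduler and TaskScheduler, and tracks the state of running tasks. Remember that the user's main function runs in the driver, while the RDD transformations are executed on other machines. In short, the driver is responsible for scheduling and distributing work. This section follows the path from an action operator through stage splitting to the end of DAGScheduler's work.
When the driver process runs the user's main function, it first creates a SparkContext object. This is the entry point for interacting with the Spark cluster. It initializes much of the environment needed at runtime, most importantly DAGScheduler and TaskScheduler.
[Figure: SparkContext initializing DAGScheduler and TaskScheduler]
We use the following RDD logical execution graph to analyze the whole DAGScheduler process.
[Figure: RDD logical execution graph]
Because DAGScheduler runs in the driver process, we start from the point where the driver runs the user's main function. In the graph above, RDD9 is the last RDD and an action is called on it, which triggers job submission: the action calls SparkContext's runJob, which, after passing through several runJob overloads, calls DAGScheduler's runJob.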

SparkContext has the following runJob method:

----------------------------------------------

def runJob[T, U: ClassTag](
rdd: RDD[T], // rdd is RDD9 from the logical execution graph mentioned above
func: (TaskContext, Iterator[T]) => U, // the function passed in by the action called on RDD9
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}

----------------------------------------------

DAGScheduler's runJob:

----------------------------------------------

def runJob[T, U](
rdd: RDD[T], //RDD9
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
// submitJob returns a JobWaiter for the job; we block on it until the job completes.
// The code after that only logs the outcome.
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}

----------------------------------------------

The source of submitJob:

----------------------------------------------

def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Check that the requested partitions exist
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}

val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
// Return immediately if the job is running 0 tasks
return new JobWaiter[U](this, jobId, 0, resultHandler)
}

assert(partitions.size > 0)
// Wrap the job in a JobSubmitted event and post it to the event queue; a dedicated
// thread in Spark processes the submitted job
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}

----------------------------------------------
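
The hand-off at the end of submitJob, where an event is posted and the caller waits for completion, can be sketched as a queue drained by a single thread. This is a simplified stand-in under stated assumptions: `Loop`, `runJobs`, and the `Stop` event are hypothetical names, and a `CountDownLatch` takes the place of the JobWaiter.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}

// Toy event loop (NOT Spark's implementation): callers post events,
// one dedicated thread takes them off the queue and handles them in order.
object EventLoopSketch {
  sealed trait Event
  final case class JobSubmitted(jobId: Int) extends Event
  case object Stop extends Event

  final class Loop(handle: JobSubmitted => Unit) {
    private val queue = new LinkedBlockingQueue[Event]()
    private val worker = new Thread("event-loop-sketch") {
      override def run(): Unit = {
        var running = true
        while (running) {
          queue.take() match {
            case job: JobSubmitted => handle(job)
            case Stop => running = false
          }
        }
      }
    }
    worker.setDaemon(true)
    def start(): Unit = worker.start()
    def post(event: Event): Unit = queue.put(event)
    def stopAndJoin(): Unit = { post(Stop); worker.join() }
  }

  // Posts n jobs, blocks until all were handled, and returns the ids in handling order.
  def runJobs(n: Int): List[Int] = {
    val done = new CountDownLatch(n) // stand-in for the JobWaiter
    val handled = scala.collection.mutable.ListBuffer.empty[Int] // written only by the loop thread
    val loop = new Loop(job => { handled += job.jobId; done.countDown() })
    loop.start()
    (0 until n).foreach(id => loop.post(JobSubmitted(id)))
    done.await()       // the caller blocks here, as runJob blocks on its waiter
    loop.stopAndJoin() // join() makes the buffer safe to read from this thread
    handled.toList
  }

  def main(args: Array[String]): Unit = println(runJobs(3))
}
```

Posting is cheap and never runs the handler on the caller's thread, which is the point of routing submissions through an event loop.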

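Alongside DAGScheduler there is a class named DAGSchedulerEventProcessLoop, which receives and handles DAGScheduler's events.
[Figure: source of DAGSchedulerEventProcessLoop]
What we posted is a JobSubmitted object, so the first case runs and calls handleJobSubmitted. A word on stage types first: Spark has two kinds of stage, ShuffleMapStage and ResultStage. The stage containing the last RDD is the ResultStage; a stage whose output feeds a shuffle is a ShuffleMapStage.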

----------------------------------------------

private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_], // corresponds to RDD9
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// Create the ResultStage first.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))

val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
}

----------------------------------------------

createResultStage above is where RDDs are turned into stages. It looks like this:

----------------------------------------------

/*
 * Creating the ResultStage calls the helper functions below
 */
private def createResultStage(
rdd: RDD[_], // RDD9 in the graph above
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}

/**
 * Builds the parent stages that the ResultStage depends on.
 */
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

/**
 * Uses a depth-first traversal to find the wide (shuffle) dependencies among the ancestors
 * of the RDD the action was called on.
 * This is the key method; once it is understood, the rest follows easily. It helps to read it
 * next to the RDD logical execution graph above: for that graph, the ShuffleDependency
 * objects returned are the one between RDD2 and RDD1 and the one between RDD7 and RDD6.
 * If there is a chain A <- B <- C in which both edges are shuffle dependencies, then for C
 * only the shuffle dependency on B is returned, not the one on A.
 */
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  // Holds the shuffle dependencies that are found
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  // RDDs that have already been visited
  val visited = new HashSet[RDD[_]]
  // Stack of RDDs still to visit
  val waitingForVisit = new ArrayStack[RDD[_]]
  // Push the final RDD (RDD9 in the graph)
  waitingForVisit.push(rdd)
  // Loop until the stack is empty
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    // Only process RDDs that have not been visited yet
    if (!visited(toVisit)) {
      // Mark it as visited
      visited += toVisit
      // Walk its dependencies; each dependency holds a reference to the parent RDD
      toVisit.dependencies.foreach {
        // A shuffle dependency goes into the result set
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          // Otherwise push the parent RDD onto the stack and keep traversing
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}

/**
 * Creates a ShuffleMapStage. For the two shuffle dependencies found above, two
 * ShuffleMapStages are created.
 */
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  // This RDD is RDD1 or RDD6
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  // Check whether this ShuffleMapStage itself has parent shuffle stages
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // Create the ShuffleMapStage; the lines after it update the scheduler's bookkeeping
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}

----------------------------------------------

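From the source code above, read together with the RDD logical execution graph, we can see that this job has three stages: one ResultStage and two ShuffleMapStages. The RDD of one ShuffleMapStage is RDD1 and that of the other is RDD6. This completes the splitting of the RDDs into stages. Once splitting is done, the last step of handleJobSubmitted is to call the method that submits the stage.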
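
The traversal in getShuffleDependencies can be tried out on a toy graph. The sketch below is an assumption-laden simplification, not Spark code: `Rdd`, `nearestShuffleParents`, and `countStages` are hypothetical names, and a shuffle boundary is identified with the parent RDD behind it rather than with a ShuffleDependency object. It uses the A <- B <- C chain from the comment above.

```scala
// Toy version of the shuffle-boundary search (NOT Spark's real classes).
object StageSplitSketch {
  // Hypothetical toy RDD: parents are listed separately by dependency kind.
  final case class Rdd(name: String, narrow: List[Rdd] = Nil, shuffle: List[Rdd] = Nil)

  // Follow narrow edges depth-first and stop at the first shuffle edge on each path.
  // Returns the parent RDDs sitting directly behind the nearest shuffle boundaries.
  def nearestShuffleParents(rdd: Rdd): Set[Rdd] = {
    var found = Set.empty[Rdd]
    var visited = Set.empty[Rdd]
    var stack = List(rdd)
    while (stack.nonEmpty) {
      val current = stack.head
      stack = stack.tail
      if (!visited(current)) {
        visited += current
        found ++= current.shuffle       // shuffle edge: record it, do not walk past it
        stack = current.narrow ::: stack // narrow edge: keep walking in the same stage
      }
    }
    found
  }

  // One stage per shuffle boundary reachable from the final RDD, plus the result stage.
  def countStages(finalRdd: Rdd): Int = {
    var boundaries = Set.empty[Rdd]
    def visit(rdd: Rdd): Unit =
      nearestShuffleParents(rdd).foreach { parent =>
        if (!boundaries(parent)) { boundaries += parent; visit(parent) }
      }
    visit(finalRdd)
    boundaries.size + 1
  }

  def main(args: Array[String]): Unit = {
    val a = Rdd("A")
    val b = Rdd("B", shuffle = List(a))
    val c = Rdd("C", shuffle = List(b))
    println(nearestShuffleParents(c).map(_.name)) // only B, never A
    println(countStages(c))
  }
}
```

The recursion in `countStages` corresponds to createShuffleMapStage calling getOrCreateParentStages again for each boundary it finds.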

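The source of submitStage is fairly simple. It checks whether the parent stages of the current stage have finished. If they have not, it recursively submits the missing parent stages and puts the current stage in the waiting set; only when the parents have completed is the current stage submitted for execution.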

----------------------------------------------

private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}

----------------------------------------------
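
The parents-first order produced by this recursion can be sketched in a few lines. This is a synchronous simplification under stated assumptions: `Stage` and `submissionOrder` here are hypothetical, and every submitted stage is treated as finishing immediately, whereas in Spark a child waits in waitingStages until its parents complete.

```scala
// Toy model of the submitStage recursion (NOT Spark's real classes).
object SubmitOrderSketch {
  // Hypothetical toy stage: an id plus the stages whose output it needs.
  final case class Stage(id: Int, parents: List[Stage] = Nil)

  // Returns the stage ids in the order in which their tasks could start.
  def submissionOrder(finalStage: Stage): List[Int] = {
    var finished = Vector.empty[Int]
    def submit(stage: Stage): Unit = {
      if (!finished.contains(stage.id)) {
        // Missing parents are visited in id order, like getMissingParentStages(...).sortBy(_.id)
        val missing = stage.parents.filterNot(p => finished.contains(p.id)).sortBy(_.id)
        missing.foreach(submit)
        finished :+= stage.id // assumption: the stage completes as soon as it is submitted
      }
    }
    submit(finalStage)
    finished.toList
  }

  def main(args: Array[String]): Unit = {
    val s0 = Stage(0)
    val s1 = Stage(1)
    val result = Stage(2, List(s1, s0))
    println(submissionOrder(result))
  }
}
```

For the three-stage job discussed above, the two ShuffleMapStages come out before the ResultStage.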

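Next is the source of the method that submits tasks. Of the three stages above, the first two (the ShuffleMapStages) are submitted first, so we read the code with the stage containing RDD1 in mind.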

----------------------------------------------

private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
stage.pendingPartitions.clear()

// Work out which partitions still need to be computed
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
val properties = jobIdToActiveJob(jobId).properties

runningStages += stage
// Record the stage with the output commit coordinator, including how many partitions
// the stage has to compute
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}

// Look up the preferred locations of each partition, i.e. which machines in the cluster hold its data.
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
// Record the partitions to compute and their preferred locations for this stage attempt in latestInfo
stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

// Serialize the stage's RDD and function so they can be sent from the driver to the workers
var taskBinary: Broadcast[Array[Byte]] = null
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}

taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
// In the case of a failure during serialization, abort the stage.
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString, Some(e))
runningStages -= stage

// Abort execution
return
case NonFatal(e) =>
abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}

// Build the tasks of the TaskSet: a ShuffleMapStage yields ShuffleMapTasks, a ResultStage yields ResultTasks
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}

case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}

} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}

// Submit the tasks to TaskScheduler
if (tasks.size > 0) {
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
// the stage as completed here in case there are no tasks to run
markStageAsFinished(stage, None)

val debugString = stage match {
case stage: ShuffleMapStage =>
s"Stage ${stage} is actually done; " +
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})"
case stage : ResultStage =>
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
}
logDebug(debugString)

submitWaitingChildStages(stage)

}
}

----------------------------------------------

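This completes the scheduling done by DAGScheduler.

Scheduling in TaskScheduler

To understand task scheduling in Spark, note first that it depends on the resource manager: different resource managers come with different scheduler backends, and the backend keeps track of the cluster's resources. Spark provides a base backend class that handles the interaction between the driver and the executors: CoarseGrainedSchedulerBackend. It works with the cluster resource manager, such as YARN, and collects basic resource information about the worker machines. When tasks are ready to be scheduled, the driver asks it for resources, and suitable worker nodes are assigned to run the tasks. TaskSchedulerImpl implements the task scheduling itself; the default scheduling policy is FIFO, and fair scheduling can be used instead.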

When a TaskSet is submitted, a TaskSetManager is created to manage its tasks and is then added to the scheduling pool.

----------------------------------------------

override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
// Create the TaskSetManager; the lines that follow update the scheduler's state
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
// Add the TaskSetManager to the scheduling pool.
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true

}
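// Ask the scheduler backend to make resource offers so that the tasks can be scheduled
backend.reviveOffers()
}

/*
 * This method is in CoarseGrainedSchedulerBackend. The driver sends a ReviveOffers message
 * to request resources.
 */
override def reviveOffers() {
driverEndpoint.send(ReviveOffers)
}

----------------------------------------------

When the message is received, the following method is called.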

----------------------------------------------

override def receive: PartialFunction[Any, Unit] = {
case StatusUpdate(executorId, taskId, state, data) =>
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.freeCores += scheduler.CPUS_PER_TASK
makeOffers(executorId)
case None =>
// Ignoring the update since we don't know about the executor.
logWarning(s"Ignored task status update ($taskId state $state) " +
s"from unknown executor with ID $executorId")
}
}
// The message we sent matches this case
case ReviveOffers =>
makeOffers()

case KillTask(taskId, executorId, interruptThread) =>
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
case None =>
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}
}

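    /*
     * Collects information about the executors that are currently alive and wraps each one
     * in a WorkerOffer. It then calls resourceOffers in TaskSchedulerImpl, which picks the
     * executors that can satisfy the request and run the pending tasks.
     */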
    private def makeOffers() {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq
    launchTasks(scheduler.resourceOffers(workOffers))
    }

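    /*
     * Given the offers from executors with free resources, this method picks the ones that
     * meet the requirements of the pending tasks and returns TaskDescription objects, which
     * describe a task together with the executor it was assigned to.
     */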
    def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    // Check whether each executor is already known and register the ones that are not
    for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
    hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
    hostToExecutors(o.host) += o.executorId
    executorAdded(o.executorId, o.host)
    executorIdToHost(o.executorId) = o.host
    executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
    newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
    hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
    }
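    // Shuffle the offers so that tasks are not always placed on the same workers, which would overload some of them.
    val shuffledOffers = Random.shuffle(offers)
    // For each offer, create a buffer whose initial capacity equals its core count; the cores bound how many tasks can run in parallel on that executor.
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    // Free CPU cores of each offer
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    // Take the task sets from the scheduling pool in the order given by the scheduling algorithm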
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
    taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
    taskSet.executorAdded()
    }
    }
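    // The loop below offers resources to each task set in increasing order of locality level, so each task is placed
    // with the best data locality available. When running on YARN with --deploy-mode client, the locality level of
    // every launched task shows up in the driver log. This is also where each task is matched with a worker.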
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    for (taskSet <- sortedTaskSets) {
    var launchedAnyTask = false
    var launchedTaskAtCurrentMaxLocality = false
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
    do {
    launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
    taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
    launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    } while (launchedTaskAtCurrentMaxLocality)
    }
    if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    }
    }

    if (tasks.size > 0) {
    hasLaunchedTask = true
    }
    // Return the TaskDescription objects
    return tasks
    }

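    /*
     * This is where a task is actually matched with a worker. It also shows that one worker
     * can run several tasks; how they are spread out depends on the scheduling algorithm.
     */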
    private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
    var launchedTask = false
    // Loop over all offers and look for a task suited to each executor
    for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // Assign a task only if the remaining CPU cores cover the minimum a task needs.
    if (availableCpus(i) >= CPUS_PER_TASK) {
    try {
    // resourceOffer decides, for the given locality level, which task (if any) to launch on this executor. tasks(i) is a buffer sized to the executor's core count.
    for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
    tasks(i) += task
    val tid = task.taskId
    taskIdToTaskSetManager(tid) = taskSet
    taskIdToExecutorId(tid) = execId
    executorIdToRunningTaskIds(execId).add(tid)
    availableCpus(i) -= CPUS_PER_TASK
    assert(availableCpus(i) >= 0)
    launchedTask = true
    }
    } catch {
    case e: TaskNotSerializableException =>
    logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
    // Do not offer resources for this task, but don't throw an error to allow other
    // task sets to be submitted.
    return launchedTask
    }
    }
    }
    return launchedTask
    }

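----------------------------------------------

That completes the binding of the tasks in a TaskSet to worker machines. What remains is sending the tasks to the executors. makeOffers() calls launchTasks, which sends each task to the executor chosen for it. With that, scheduling in TaskScheduler is done.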
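
The core-counting part of resourceOfferSingleTaskSet can be illustrated in isolation. The sketch below rests on several assumptions: `Offer` and `assign` are hypothetical names, locality is ignored entirely, and the offers are not shuffled so that the result is deterministic. As in the source above, each executor receives at most one task per pass, and passes repeat until nothing more can be launched.

```scala
// Toy placement of tasks onto executors by free cores (NOT Spark's implementation).
object OfferSketch {
  final case class Offer(executorId: String, cores: Int)

  def assign(
      taskIds: List[Int],
      offers: List[Offer],
      cpusPerTask: Int = 1): Map[String, List[Int]] = {
    val free = offers.map(_.cores).toArray                  // like availableCpus
    val assigned = Array.fill(offers.size)(List.empty[Int]) // like tasks(i)
    var pending = taskIds
    var launched = true
    while (launched && pending.nonEmpty) {                  // like the do/while in resourceOffers
      launched = false
      for (i <- offers.indices) {
        if (pending.nonEmpty && free(i) >= cpusPerTask) {
          assigned(i) = assigned(i) :+ pending.head
          pending = pending.tail
          free(i) -= cpusPerTask
          launched = true
        }
      }
    }
    offers.map(_.executorId).zip(assigned.toList).toMap
  }

  def main(args: Array[String]): Unit = {
    // Five tasks, three cores in total: two tasks stay pending until cores free up.
    println(assign(List(0, 1, 2, 3, 4), List(Offer("e1", 2), Offer("e2", 1))))
  }
}
```

Tasks that do not fit are simply left for a later round of offers, which in Spark happens when a finished task frees its cores and makeOffers runs again.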

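How the executor runs a task and the functions we define

Once TaskScheduler has scheduled a task, the task has to run on a worker machine. The worker runs a backend process that communicates with the driver and the resource manager: CoarseGrainedExecutorBackend. Its main function is shown below; it mostly parses arguments and then calls run.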

----------------------------------------------

def main(args: Array[String]) {
var driverUrl: String = null
var executorId: String = null
var hostname: String = null
var cores: Int = 0
var appId: String = null
var workerUrl: Option[String] = None
val userClassPath = new mutable.ListBuffer[URL]()
var argv = args.toList
while (!argv.isEmpty) {
argv match {
case ("--driver-url") :: value :: tail =>
driverUrl = value
argv = tail
case ("--executor-id") :: value :: tail =>
executorId = value
argv = tail
case ("--hostname") :: value :: tail =>
hostname = value
argv = tail
case ("--cores") :: value :: tail =>
cores = value.toInt
argv = tail
case ("--app-id") :: value :: tail =>
appId = value
argv = tail
case ("--worker-url") :: value :: tail =>
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
workerUrl = Some(value)
argv = tail
case ("--user-class-path") :: value :: tail =>
userClassPath += new URL(value)
argv = tail
case Nil =>
case tail =>
// scalastyle:off println
System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
// scalastyle:on println
printUsageAndExit()
}
}
if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
appId == null) {
printUsageAndExit()
}
run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
System.exit(0)
}
    /*
     * As the code shows, run only configures and creates the environment that tasks need in
     * order to run.
     */
    private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {

    Utils.initDaemon(log)

    SparkHadoopUtil.get.runAsSparkUser { () =>
    // Debug code
    Utils.checkHost(hostname)

    // Bootstrap to fetch the driver's Spark properties.
    val executorConf = new SparkConf
    val port = executorConf.getInt("spark.executor.port", 0)
    val fetcher = RpcEnv.create(
    "driverPropsFetcher",
    hostname,
    port,
    executorConf,
    new SecurityManager(executorConf),
    clientMode = true)
    val driver = fetcher.setupEndpointRefByURI(driverUrl)
    val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig)
    val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
    fetcher.shutdown()

    // Create SparkEnv using properties we fetched from the driver.
    val driverConf = new SparkConf()
    for ((key, value) <- props) {
    // this is required for SSL in standalone mode
    if (SparkConf.isExecutorStartupConf(key)) {
    driverConf.setIfMissing(key, value)
    } else {
    driverConf.set(key, value)
    }
    }
    if (driverConf.contains("spark.yarn.credentials.file")) {
    logInfo("Will periodically update credentials from: " +
    driverConf.get("spark.yarn.credentials.file"))
    SparkHadoopUtil.get.startCredentialUpdater(driverConf)
    }

    val env = SparkEnv.createExecutorEnv(
    driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)

    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
    env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
    workerUrl.foreach { url =>
    env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
    }
    env.rpcEnv.awaitTermination()
    SparkHadoopUtil.get.stopCredentialUpdater()
    }
    }

----------------------------------------------

The call sequence for running a task is as follows:
[Figure: call sequence for running a task on the executor]

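As we saw, TaskScheduler finishes its scheduling by sending a message over RPC, as shown in the figure below. Once the backend on the worker machine has started, it communicates with the driver process over RPC, and when it receives a LaunchTask message it runs the code below.
[Figure: the LaunchTask RPC message]
The receive method handles many cases, running different code depending on the message received. In our case the message is LaunchTask.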

----------------------------------------------

override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}

case RegisterExecutorFailed(message) =>
exitExecutor(1, "Slave registration failed: " + message)
// This case runs when a task is launched.
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
// Deserialize the task description first
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
// Then launch the task.
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask)
}

case KillTask(taskId, _, interruptThread) =>
if (executor == null) {
exitExecutor(1, "Received KillTask command but executor was null")
} else {
executor.killTask(taskId, interruptThread)
}

case StopExecutor =>
stopping.set(true)
logInfo("Driver commanded a shutdown")
// Cannot shutdown here because an ack may need to be sent back to the caller. So send
// a message to self to actually do the shutdown.
self.send(Shutdown)

case Shutdown =>
stopping.set(true)
new Thread("CoarseGrainedExecutorBackend-stop-executor") {
override def run(): Unit = {
// executor.stop() will call SparkEnv.stop() which waits until RpcEnv stops totally.
// However, if executor.stop() runs in some thread of RpcEnv, RpcEnv won't be able to
// stop until executor.stop() returns, which becomes a dead-lock (See SPARK-14180).
// Therefore, we put this line in a new thread.
executor.stop()
}
}.start()
}

----------------------------------------------

Now the relevant Executor source. For each task it creates a TaskRunner and hands it to a thread pool for execution.

----------------------------------------------

def launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer): Unit = {
val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}

----------------------------------------------
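
The pattern in launchTask, which wraps the work in a Runnable, registers it in a map, and hands it to a thread pool, can be sketched on its own. Everything here is a hypothetical stand-in: `Runner` plays the role of TaskRunner, `runAll` is an illustrative helper, and the choice of a cached thread pool is an assumption of this sketch.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Toy version of "one Runnable per task on a thread pool" (NOT Spark's Executor).
object LaunchSketch {
  // Hypothetical stand-in for TaskRunner: runs a body and records the result.
  final class Runner(val taskId: Long, body: () => Int) extends Runnable {
    @volatile var result: Option[Int] = None
    override def run(): Unit = { result = Some(body()) }
  }

  def runAll(bodies: Seq[() => Int]): Map[Long, Int] = {
    val running = new ConcurrentHashMap[Long, Runner]()
    val pool = Executors.newCachedThreadPool()
    val runners = bodies.zipWithIndex.map { case (body, i) =>
      val runner = new Runner(i.toLong, body)
      running.put(runner.taskId, runner) // track the task first, as launchTask does
      pool.execute(runner)               // then let a pool thread call run()
      runner
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    runners.map { r =>
      r.taskId -> r.result.getOrElse(sys.error(s"task ${r.taskId} did not finish"))
    }.toMap
  }

  def main(args: Array[String]): Unit =
    println(runAll(Seq(() => 1 + 1, () => 21 * 2)))
}
```

Keeping the runner in a map keyed by task id is what lets a later message, such as KillTask, find the running task again.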

As the definition below shows, TaskRunner is simply a Runnable that a pool thread executes, so let us look at its run method.

[Figure: definition of TaskRunner]

----------------------------------------------

override def run(): Unit = {
// Set up what the thread needs in order to run
val threadMXBean = ManagementFactory.getThreadMXBean
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
// Set the context class loader of the current thread
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
// Report the new task state
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStart: Long = 0
var taskStartCpu: Long = 0
startGCTime = computeTotalGcTime()

try {

// Deserialize the task together with its dependencies
val (taskFiles, taskJars, taskProps, taskBytes) =
Task.deserializeWithDependencies(serializedTask)

// Must be set before updateDependencies() is called, in case fetching dependencies
// requires access to properties contained within (e.g. for access control).
Executor.taskDeserializationProps.set(taskProps)

// Update the files and JARs the task depends on
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
task.localProperties = taskProps
task.setTaskMemoryManager(taskMemoryManager)

// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
if (killed) {
// Throw an exception rather than returning, because returning within a try{} block
// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
// exception will be caught by the catch block, leading to an incorrect ExceptionFailure
// for the task.
throw new TaskKilledException
}

logDebug("Task " + taskId + "'s epoch is " + task.epoch)

// Pass the task's epoch to the map output tracker, which tracks where map outputs live
env.mapOutputTracker.updateEpoch(task.epoch)

// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
var threwException = true

// task.run below is what actually runs the task; it in turn calls runTask. As mentioned earlier, a job is split into
// two kinds of stage, ShuffleMapStage and ResultStage, and there are two matching task types, ShuffleMapTask and ResultTask; each type runs its own runTask.
val value = try {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
res
} finally {
//下面就是根据上面的运行结果,来进行一些判断和日志的打出
val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()

        if (freedMemory > 0 && !threwException) {
          val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
          if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
            throw new SparkException(errMsg)
          } else {
            logWarning(errMsg)
          }
        }

        if (releasedLocks.nonEmpty && !threwException) {
          val errMsg =
            s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
              releasedLocks.mkString("[", ", ", "]")
          if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) {
            throw new SparkException(errMsg)
          } else {
            logWarning(errMsg)
          }
        }
      }
      val taskFinish = System.currentTimeMillis()
      val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L

      // If the task has been killed, let's fail it.
      if (task.killed) {
        throw new TaskKilledException
      }

      val resultSer = env.serializer.newInstance()
      val beforeSerialization = System.currentTimeMillis()
      val valueBytes = resultSer.serialize(value)
      val afterSerialization = System.currentTimeMillis()

      // Deserialization happens in two parts: first, we deserialize a Task object, which
      // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
      task.metrics.setExecutorDeserializeTime(
        (taskStart - deserializeStartTime) + task.executorDeserializeTime)
      task.metrics.setExecutorDeserializeCpuTime(
        (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
      // We need to subtract Task.run()'s deserialization time to avoid double-counting
      task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
      task.metrics.setExecutorCpuTime(
        (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
      task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
      task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)

      // Note: accumulator updates must be collected after TaskMetrics is updated
      val accumUpdates = task.collectAccumulatorUpdates()
      // TODO: do not serialize value twice
      val directResult = new DirectTaskResult(valueBytes, accumUpdates)
      val serializedDirectResult = ser.serialize(directResult)
      val resultSize = serializedDirectResult.limit

      // directSend = sending directly back to the driver
      val serializedResult: ByteBuffer = {
        if (maxResultSize > 0 && resultSize > maxResultSize) {
          logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
            s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
            s"dropping it.")
          ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
        } else if (resultSize > maxDirectResultSize) {
          val blockId = TaskResultBlockId(taskId)
          env.blockManager.putBytes(
            blockId,
            new ChunkedByteBuffer(serializedDirectResult.duplicate()),
            StorageLevel.MEMORY_AND_DISK_SER)
          logInfo(
            s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
          ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
        } else {
          logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
          serializedDirectResult
        }
      }

      execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

    } catch {
      case ffe: FetchFailedException =>
        val reason = ffe.toTaskFailedReason
        setTaskFinishedAndClearInterruptStatus()
        execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

      case _: TaskKilledException =>
        logInfo(s"Executor killed $taskName (TID $taskId)")
        setTaskFinishedAndClearInterruptStatus()
        execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

      case _: InterruptedException if task.killed =>
        logInfo(s"Executor interrupted and killed $taskName (TID $taskId)")
        setTaskFinishedAndClearInterruptStatus()
        execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

      case CausedBy(cDE: CommitDeniedException) =>
        val reason = cDE.toTaskFailedReason
        setTaskFinishedAndClearInterruptStatus()
        execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

      case t: Throwable =>
        // Attempt to exit cleanly by informing the driver of our failure.
        // If anything goes wrong (or this was a fatal exception), we will delegate to
        // the default uncaught exception handler, which will terminate the Executor.
        logError(s"Exception in $taskName (TID $taskId)", t)

        // Collect latest accumulator values to report back to the driver
        val accums: Seq[AccumulatorV2[_, _]] =
          if (task != null) {
            task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
            task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
            task.collectAccumulatorUpdates(taskFailed = true)
          } else {
            Seq.empty
          }

        val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))

        val serializedTaskEndReason = {
          try {
            ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
          } catch {
            case _: NotSerializableException =>
              // t is not serializable so just send the stacktrace
              ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
          }
        }
        setTaskFinishedAndClearInterruptStatus()
        execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)

        // Don't forcibly exit unless the exception was inherently fatal, to avoid
        // stopping other tasks unnecessarily.
        if (Utils.isFatalError(t)) {
          SparkUncaughtExceptionHandler.uncaughtException(t)
        }

    } finally {
      runningTasks.remove(taskId)
    }
  }
}

----------------------------------------------
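The way the run method above decides how to ship a finished result can be condensed into a small decision function. The following is a minimal sketch and not Spark's own code: ResultRouting, ResultRoute and routeResult are hypothetical names introduced for illustration, and only the order of the two size comparisons is taken from the listing.

```scala
// Hypothetical model of the three outcomes at the end of TaskRunner.run.
object ResultRouting {
  sealed trait ResultRoute
  // Result exceeds maxResultSize: it is dropped, only a block id and size are reported.
  case object Dropped extends ResultRoute
  // Result is too big for a direct reply: it is stored via the block manager.
  case object ViaBlockManager extends ResultRoute
  // Result is small enough to travel inside the status update itself.
  case object Direct extends ResultRoute

  // Same comparison order as the listing: maxResultSize first
  // (ignored when it is not positive), then maxDirectResultSize.
  def routeResult(resultSize: Long, maxResultSize: Long, maxDirectResultSize: Long): ResultRoute =
    if (maxResultSize > 0 && resultSize > maxResultSize) Dropped
    else if (resultSize > maxDirectResultSize) ViaBlockManager
    else Direct
}
```

With made-up thresholds of 100 and 50 bytes, a 10-byte result would be routed as Direct, a 60-byte result as ViaBlockManager, and a 200-byte result as Dropped.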

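As mentioned earlier, a job is split into two kinds of stages, ShuffleMapStage and ResultStage, and there are two corresponding Task types: ShuffleMapTask and ResultTask. Task.run calls runTask, which both of these Task classes override, so each task type executes its own runTask method.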
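The relationship between run and runTask is a template method. Below is a minimal sketch of that shape, assuming nothing beyond what the text states; MiniTask, MiniShuffleMapTask and MiniResultTask are hypothetical stand-ins, and their bodies are placeholders rather than what the real tasks do.

```scala
// Hypothetical, heavily simplified stand-ins for Spark's Task hierarchy.
abstract class MiniTask[T](val partitionId: Int) {
  // Shared bookkeeping belongs in run(); it is final, so subclasses cannot replace it.
  final def run(): T = {
    // (The real Task.run sets up the task context and metrics before this call.)
    runTask()
  }

  // Each concrete task type supplies its own body.
  protected def runTask(): T
}

// Stands in for ShuffleMapTask: produces output for the next stage and returns a status.
class MiniShuffleMapTask(pid: Int) extends MiniTask[String](pid) {
  override protected def runTask(): String = s"map-status-for-partition-$partitionId"
}

// Stands in for ResultTask: applies the action's function and returns its value.
class MiniResultTask(pid: Int, data: Seq[Int]) extends MiniTask[Int](pid) {
  override protected def runTask(): Int = data.sum
}
```

The caller only ever invokes run(), which is why the executor code above does not need to know which kind of task it is holding.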
The runTask method of ShuffleMapTask:

----------------------------------------------

override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  // First, some initialization
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // Deserialize the (rdd, dep) pair. The rdd here is the last RDD of the stage,
  // i.e. the one right before the shuffle, as discussed earlier.
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  // Write out the last RDD of the pre-shuffle stage through the shuffle writer.
  // Nothing here visibly runs the functions we wrote, calls compute, or pipelines
  // the RDDs; the explanation follows below.
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}

----------------------------------------------

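Here is a detailed answer to the question raised in the comment above (highlighted in red in the original post). RDDs form a directed acyclic graph through their dependencies; starting from the last RDD and following its dependencies, we can walk backwards to all of its parent RDDs. Where no shuffle is involved, the RDDs form a pipeline. The benefit of a pipeline is that the intermediate results of the RDDs do not have to be stored: the functions we defined are chained together directly, so data flows from input to output without materializing anything in between, which saves both time and space. We also know that an RDD's intermediate result can be persisted to memory or disk, and Spark keeps track of where it is.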
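The pipelining effect can be observed with plain Scala iterators, which is the abstraction the rdd.iterator call in the listing returns. This is a minimal sketch using only the standard library, not Spark; PipelineDemo and its trace buffer are hypothetical names introduced to record the order in which the two user functions run.

```scala
import scala.collection.mutable.ArrayBuffer

object PipelineDemo {
  // Records the order in which elements pass through each user function.
  val trace = ArrayBuffer.empty[String]

  def run(): List[Int] = {
    trace.clear()
    val input: Iterator[Int] = Iterator(1, 2, 3)
    // Two chained "transformations". Iterator.map is lazy, so nothing runs yet.
    val doubled = input.map { x => trace += s"double($x)"; x * 2 }
    val plusOne = doubled.map { x => trace += s"inc($x)"; x + 1 }
    // Consuming the final iterator pulls one element at a time through both functions.
    plusOne.toList
  }
}
```

After PipelineDemo.run(), the trace alternates between the two functions (double(1), inc(2), double(2), inc(4), and so on) instead of listing all the doubling calls first, which shows that no intermediate collection is built between the two steps.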

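From the analysis above, we can see that in the executor, the rdd.iterator(partition, context) call inside runTask is exactly where the backward walk through the RDD chain begins: iterator ends up calling the RDD's compute, which in turn asks its parent RDD for an iterator, and so on up the chain. The shuffle process and the execution of ResultTask's runTask will be covered in later follow-ups.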
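The backward walk that starts at rdd.iterator(partition, context) can be modelled in a few lines. What follows is a toy sketch under stated assumptions, not Spark's implementation: MiniRDD, SourceRDD, MappedRDD, the persisted flag and the calls log are all hypothetical, and the cache is a plain in-memory map standing in for real persistence.

```scala
import scala.collection.mutable

object BacktrackDemo {
  // Log of which RDD was asked for data, and how it answered.
  val calls = mutable.ArrayBuffer.empty[String]

  abstract class MiniRDD[T](val name: String) {
    // Hypothetical stand-in for a storage level other than "none".
    var persisted: Boolean = false
    private val cache = mutable.Map.empty[Int, Vector[T]]

    // Subclasses describe how to produce the data of one partition.
    protected def compute(partition: Int): Iterator[T]

    // Entry point: serve cached data if present, otherwise compute it.
    final def iterator(partition: Int): Iterator[T] = cache.get(partition) match {
      case Some(data) =>
        calls += s"$name:cache-hit"
        data.iterator
      case None =>
        calls += s"$name:compute"
        val it = compute(partition)
        if (persisted) {
          val data = it.toVector
          cache(partition) = data
          data.iterator
        } else it
    }
  }

  class SourceRDD(data: Seq[Int]) extends MiniRDD[Int]("source") {
    protected def compute(partition: Int): Iterator[Int] = data.iterator
  }

  class MappedRDD[T, U](parent: MiniRDD[T], f: T => U, label: String) extends MiniRDD[U](label) {
    // The user function is applied to the parent's iterator: this is the backward step.
    protected def compute(partition: Int): Iterator[U] = parent.iterator(partition).map(f)
  }
}
```

Asking the last RDD of a source -> doubled -> result chain for its iterator logs result:compute, doubled:compute, source:compute, in that order. If doubled is marked as persisted, a second request stops at doubled with a cache hit and never reaches the source.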

Reposted from: https://blog.51cto.com/9269309/2091219
