SparkContext源码分析
standalone模式===》》创建一个TaskSchedulerImpl
1、 底层通过操作SchedulerBackend,针对不同种类的cluster(standalone、yarn。mesoso(亚马逊))调度task
2、 他也可以通过一个LoaclBackend,并且将isLocal设置为true,来在本地模式下工作
3、 他负责处理一下通用的逻辑,比如说决定多个job的调度顺序(FIFO),启动推测任务执行
4、 客户端首先应该调用它的initialize()方法和start()方法,然后通过runTasks()方法提交tasksets
创建SparkDeploySchedulerBackend()
initializer方法中创建一个Pool调度池,FIFO、FAIR
taskScher。start()方法=====》调用了一下SparkDeploySchedulerBackend的start方法
此时:val AppDesc = newApplicationDescription(sc.appName、maxCores,sc.executorMemory,command,appUIaddress)
创建一个ApplicationDescription,非常重要!它代表了当前执行的Application的一下情况,包括Application最大需要多少CPU core 每个slave上需要多大内存。
创建APPclient(Application与spark之间通信)
一个借口。
它负责接收一个spark master的url,以及一个ApplicationDescription,和一个集群事件的监听器,以及各种事件发生时,监听器的回调函数!
start()方法,创建一个clientActor
调用registerWithMaster()里面调用tryRegisterAllMasters(),里面去连接所有的master。
DAGScheduler:实现了面向stage的调度机制的高层次的调度层,他会为每一个job计算一个stage的DAG(有向无环图),追踪RDD和stage的输出是否被物化(写入磁盘或者内存等地方),并且寻找一个最少消耗(最优、最小)调度机制来运行job,他会将stage作为tasksets提交到底层的TaskScheduler上,来在集群上运行他们(task)。
除了处理stage的DAG,还负责决定运行每个task的最佳位置,基于当前的缓存状态,将这些最佳位置提交给底层的TaskSchedulerImpl,此外,他会处理由于shuffle输出文件丢失导致的失败,在这种情况下,旧的stage可能会被重新提交,一个stage内部的失败,如果不是由于shuffle文件丢失导致的,会被TaskScheduler处理,他会多次重复每一个task,知道最后实在不行,才会去取消整个stage。
SparkUI:jetty工具类。