目录
1、Master接收注册的主要对象
2、Master接收Worker的注册
3、Master接收Driver的注册
4、Master处理Driver状态变化
5、Master接收Application的注册
6、Master处理Executor状态变化
1、Master接收注册的主要对象
Master主要接受注册的对象是:Application,Driver,Worker。
注意:Executor不是注册给master而是注册给Driver中的SchedulerBackend
2、Master接收Worker的注册
Worker启动后主动向Master注册,所以在生产环境下不需要重启集群就能够使用新的Worker。
(1)worker是一个消息循环体,因为继承了ThreadSafeRPCEndpoint。他启动的时候有一大堆我们找到onStart方法。
(2)这里面调用了registerWithMaster,这里面用了tryRegisterAllMaster方法向所有的master提交。
(3)在具体注册向所有的master提交的时候,是用线程池的中一个线程来提交。然后就获得了masterEndpoint,获得了masterEndpoint之后,将其作为参数传入registerWithMaster方法。masterRpcAddresses是所有的master的地址,这里的map是进行对master进行逐个注册。这里发送给所有的master而不时发送给active是因为:如果不这样做的话,worker启动的时候就要搞明白谁是active的master,这样他的负担就加重了,就不符合强内聚,弱耦合的架构设计。
(4)在registerWithMaster的时候传进去的消息体就是RegisterWorker,它是个case class
这个worker是自己的引用,master可以通过这个ref来通信
(5)worker发出的消息经过通信之后,master会接收到注册请求,worker发送的是RegisterWorker,master的消息接收器,接收到了这个消息,如下
这个代码逻辑是:收到了worker启动注册的信息之后,首先判断一下自己的状态,如果是standby那肯定没戏。然后判断idToWorker这个HashMap中有没有这个worker,他的key就代表了worker本身字符串级别的描述,WorkerInfo和注册的信息基本是一致的。所以WorkerInfo就包含了对worker所又了解的内容。也就是说他构建了一个内存数据结构,包含了所有已经注册的信息,如果已经注册过的话就不会注册。
如果master不是standby,worker也没注册过的话我们会构建workerInfo,这里是接收通过模式匹配匹配到的具体的worker发过来的信息并报存。在这基础之上,把workerInfo作为参数传入到registerWorker,执行具体的注册的过程。
注册其实就是登记一下,保留信息。这里会过滤掉workState为dead的work,也就是说如果worker已经dead掉了,将来某段时间注册的话,是不会接受的(因为曾经已经dead掉了,现在突然活了,认为是不可以思议的事情)。
也就是首先会判断worker的状态是否是DEAD的状态则直接过滤掉,对于UNKOWN状态的内容会调用removeWorker清理(包括清理该worker下的executors和drivers),因为在具体的机器上,这个节点向executor和driver被当前的worker,也就是要remove的worker生成和管理的,worker挂了没人管理了。如果都没问题就加入到workers,idToWorker,addressWorker这3个内存数据结构中
(6)回到receiveAndReply中注册完毕之后,persistenceEngine持久化引擎要把注册的worker持久化起来(HA)。然后就reply回复。再然后Schedule()调度
3、Master接收Driver的注册
Driver注册给Master,Master会将Driver的信息放入内存缓存中,然后默认加入等待调度的队列,然后再次用持久化引擎将Driver持久化,然后使用schedule进行调度
(1)在Client启动的时候会创建一个ClientEndpoint。
一个RpcEndpoint经历的过程依次是:构建->onStart→receive→onStop。其中onStart在接收任务消息前调用,receive和receiveAndReply分别用来接收另一个RpcEndpoint(也可以是本身)send和ask过来的消息。在他的onStart方法中调用ayncSendToMasterAndForwardReply方法向每个master发送RequestSubmitDriver这个case class他的参数是driverDescription也是个case class包含了driver的信息
(2)master在接收到client发送过来的RequestSubmitDriver消息后,创建一个DriverInfo,然后persistenceEngine持久化引擎要把driver持久化起来,并且加入到waitingDrivers这个等待队列中
4、Master处理Driver状态变化
Driver的状态有SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED, ERROR。当进行改变时会发送DriverStateChanged的消息给Master。如果状态是ERROR、FINISHED、KILLED、FAILED这几种情况,则调用removeDriver。
removeDriver里面首先根据driverId来find看下有没有这个driver,你说你的状态发生变化我要看你曾经有没有登记,如果没有登记的话就打印日志如果有的话就从drivers中把当前这个driver去掉,同时看下completedDrivers的个数是不是>=RETAED_DRIVERS的个数
5、Master接收Application的注册
Application本身就是通过spark-submit的方式提交application的时候是通过schedulerBackend注册,然后将Application的信息放入内存缓存中,然后application加入等待调度的application队列,然后再次用持久化引擎将drievr持久化,然后使用schedule进行调度。
注意:注册的时候是先注册Driver然后再注册Application
在schedulerBackend的实现类StandaloneSchedulerBackend中的start方法中会创建一个StandaloneAppClient。
在StandaloneAppClient发送RegisterApplication的消息向master注册
Master收到RegisterApplication消息后Application的信息放入内存缓存中,然后application加入等待调度的application队列,然后再次用持久化引擎将drievr持久化,然后使用schedule进行调度。
6、Master处理Executor状态变化
当Executor状态发送变化时也同时会更新在Master端的注册信息,收到的消息是ExecutorStateChanged
首先查一下已注册的executors中有没有这个executor。任何获得executorState信息 如果是RUNNING的状态resetRetryCount重置一下次数。就是Executor挂掉的时候会尝试一定次数的重启(最多重试10次)。这里只是记录次数。
这个时候Executor的状态发生变化,他会告诉driver,send给他,就是driver要知道这个内容。如果是状态是Finish的话就把他remove掉,executor的lost大多是有Shuffle的输出导致的