Volcano Controller控制器源码解析

Volcano Controller控制器源码解析

本文从源码的角度分析Volcano Controller相关功能的实现。

本篇Volcano版本为v1.8.0。

Volcano项目地址: https://github.com/volcano-sh/volcano

controller命令main入口: cmd/controller-manager/main.go

controller相关代码目录: pkg/controllers
更多文章访问: https://www.cyisme.top

整体实现并不复杂, 而且项目比较简洁、风格一致(与k8s controller代码风格也一致)。可以作为学习开发k8s controller的一个参考。
在这里插入图片描述

代码风格

controller需要实现framework中interface的定义。

type Controller interface {Name() string// 初始化Initialize(opt *ControllerOption) error// 运行Run(stopCh <-chan struct{})
}

Initialize方法作为根据option初始化controller的入口, 像infomer设置、queue设置、cache设置等都在这里完成。

func (jf *jobflowcontroller) Initialize(opt *framework.ControllerOption) error {// clientjf.kubeClient = opt.KubeClientjf.vcClient = opt.VolcanoClient// informerjf.jobFlowInformer = informerfactory.NewSharedInformerFactory(jf.vcClient, 0).Flow().V1alpha1().JobFlows()jf.jobFlowSynced = jf.jobFlowInformer.Informer().HasSyncedjf.jobFlowLister = jf.jobFlowInformer.Lister()jf.jobFlowInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:    jf.addJobFlow,UpdateFunc: jf.updateJobFlow,})// 参数jf.maxRequeueNum = opt.MaxRequeueNumif jf.maxRequeueNum < 0 {jf.maxRequeueNum = -1}// queuejf.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())// 入队的工具函数jf.enqueueJobFlow = jf.enqueue// 处理队列中数据的处理函数jf.syncHandler = jf.handleJobFlow// ...
}

Run方法会运行多个goroutine, 执行操作

func (jf *jobflowcontroller) Run(stopCh <-chan struct{}) {defer jf.queue.ShutDown()go jf.jobFlowInformer.Informer().Run(stopCh)go jf.jobTemplateInformer.Informer().Run(stopCh)go jf.jobInformer.Informer().Run(stopCh)cache.WaitForCacheSync(stopCh, jf.jobSynced, jf.jobFlowSynced, jf.jobTemplateSynced)// 使用 k8s pkg中的util , 与k8s controller的风格一致go wait.Until(jf.worker, time.Second, stopCh)klog.Infof("JobFlowController is running ...... ")<-stopCh
}

worker会负责处理队列中的数据, 交给handler处理。 vocalno中所有的controller外层都是这执行逻辑(可能会有细微差别), 具体的handler 是差异化的。所以后面的controller介绍也不会再提这一部分, 会着重handler的实现。

func (jf *jobflowcontroller) worker() {// 代理一层for jf.processNextWorkItem() {}
}func (jf *jobflowcontroller) processNextWorkItem() bool {// 获取数据obj, shutdown := jf.queue.Get()if shutdown {// Stop workingreturn false}defer jf.queue.Done(obj)req, ok := obj.(*apis.FlowRequest)if !ok {klog.Errorf("%v is not a valid queue request struct.", obj)return true}// 具体处理handlererr := jf.syncHandler(req)jf.handleJobFlowErr(err, obj)return true
}

Queue Controller

Queue Controler主要监听三个资源对象:

  • Queue
  • PodGroup
  • Command

控制器会监听他们的状态,用以更新Queue资源的状态,从而实现依据Queue资源的调度。

func (c *queuecontroller) Initialize(opt *framework.ControllerOption) error {// 省略部分代码queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:    c.addQueue,UpdateFunc: c.updateQueue,DeleteFunc: c.deleteQueue,})pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{// 省略部分代码})if utilfeature.DefaultFeatureGate.Enabled(features.QueueCommandSync) {c.cmdInformer = factory.Bus().V1alpha1().Commands()c.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{FilterFunc: func(obj interface{}) bool {switch v := obj.(type) {case *busv1alpha1.Command:return IsQueueReference(v.TargetObject)default:return false}},Handler: cache.ResourceEventHandlerFuncs{AddFunc: c.addCommand,},})c.cmdLister = c.cmdInformer.Lister()c.cmdSynced = c.cmdInformer.Informer().HasSynced}// 省略部分代码
}

监听到的消息会放到队列中, 队列是通过k8s pkg中的WorkQueue实现的。

type queuecontroller struct {// 省略部分代码// ...// queues that need to be updated.queue        workqueue.RateLimitingInterfacecommandQueue workqueue.RateLimitingInterface// queue name -> podgroup namespace/namepodGroups map[string]map[string]struct{}// 省略部分代码// ...
}

queuecontroller.queue接收apis.Request对象作为消息,queuecontroller.commandQueue接收busv1alpha1.Command对象作为消息。

在经过queuecontroller.handlerCommand方法处理后, queuecontroller.commandQueue中的busv1alpha1.Command对象转换成apis.Request事件,放到queuecontroller.queue中统一处理。

func (c *queuecontroller) handleCommand(cmd *busv1alpha1.Command) error {// 接受处理, 删除commanderr := c.vcClient.BusV1alpha1().Commands(cmd.Namespace).Delete(context.TODO(), cmd.Name, metav1.DeleteOptions{})if err != nil {// 省略部分代码}// command对象中会有ownerReference, 从中提取queue对象名称req := &apis.Request{QueueName: cmd.TargetObject.Name,// CommandIssuedEvent是内部事件类型, 用户引发命令时, 会触发该事件Event:     busv1alpha1.CommandIssuedEvent,Action:    busv1alpha1.Action(cmd.Action),}// 将command事件转换成request事件,放到queue中c.enqueueQueue(req)return nil
}

queuecontroller.handleQueuequeuecontroller.queue的事件处理函数,主要是根据request事件的类型,调用不同的处理函数更新Queue资源的状态。

func (c *queuecontroller) handleQueue(req *apis.Request) error {// 这里的queue是k8s中的Queue资源对象queue, err := c.queueLister.Get(req.QueueName)if err != nil {// 省略部分代码}// 根据queue当前的状态, 生成不同执行器queueState := queuestate.NewState(queue)// 执行操作if err := queueState.Execute(req.Action); err != nil {// 省略部分代码}return nil
}

Queue资源有4中状态(QueueState), 四种状态分别对应四种执行器:

  • Open --> openState
  • Closed --> closedState
  • Closing --> closingState
  • Unknown --> unknownState

closeState执行器为例,代码实现如下:(其他的执行器实现类似,不再举例)

type closedState struct {queue *v1beta1.Queue
}
func (cs *closedState) Execute(action v1alpha1.Action) error {switch action {// 开启动作case v1alpha1.OpenQueueAction:return OpenQueue(cs.queue, func(status *v1beta1.QueueStatus, podGroupList []string) {status.State = v1beta1.QueueStateOpen})// 关闭动作case v1alpha1.CloseQueueAction:return SyncQueue(cs.queue, func(status *v1beta1.QueueStatus, podGroupList []string) {status.State = v1beta1.QueueStateClosed})// 默认动作default:return SyncQueue(cs.queue, func(status *v1beta1.QueueStatus, podGroupList []string) {specState := cs.queue.Status.Stateif specState == v1beta1.QueueStateOpen {status.State = v1beta1.QueueStateOpenreturn}if specState == v1beta1.QueueStateClosed {status.State = v1beta1.QueueStateClosedreturn}status.State = v1beta1.QueueStateUnknown})}
}

Queue资源volcano中有4种动作(Action), 执行器中将根据动作执行不同的操作:

  • EnqueueJob (这个动作执行器中没有用到)
  • SyncQueue (这个动作执行器中执行默认操作)
  • OpenQueue
  • CloseQueue

实际上, 对应这三个动作会有三个处理函数,他们被定义为QueueActionFn类型

type QueueActionFn func(queue *v1beta1.Queue, fn UpdateQueueStatusFn) error

因为Queue资源可以重复的Close或者Open, 所以其实执行器中并没有拦截或者限制这种操作, 而是比较简单的对状态进行重置。

操作调用的函数如下:

  • closedStateclosingState状态执行器中
OpenQueueAction
OpenQueue
CloseQueueAction
SyncQueue
Other
  • openState状态执行器中
OpenQueueAction
SyncQueue
CloseQueueAction
CloseQueue
Other
  • unknownState状态执行器中
OpenQueueAction
OpenQueue
CloseQueueAction
CloseQueue
Other
SyncQueue

可以看出, 执行逻辑:

  • 如果当前状态与预期状态一致, 则调用SyncQueue同步状态
  • 如果当前状态与预期状态不一致, 则调用OpenQueue或者CloseQueue更新状态
  • 如果状态未知, 则调用SyncQueue同步状态

然后来看一下具体的函数实现

// syncQueue主要是更新queue中podgroup的状态计数
func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error {// 获取queue中的podgrouppodGroups := c.getPodGroups(queue.Name)queueStatus := schedulingv1beta1.QueueStatus{}for _, pgKey := range podGroups {// 获取podgroup对象pg, err := c.pgLister.PodGroups(ns).Get(name)// 更新计数器switch pg.Status.Phase {case schedulingv1beta1.PodGroupPending:queueStatus.Pending++case schedulingv1beta1.PodGroupRunning:queueStatus.Running++case schedulingv1beta1.PodGroupUnknown:queueStatus.Unknown++case schedulingv1beta1.PodGroupInqueue:queueStatus.Inqueue++}}// updateStateFn是在执行器中定义的函数, 用于更新queue的状态if updateStateFn != nil {updateStateFn(&queueStatus, podGroups)} else {queueStatus.State = queue.Status.State}// 省略部分代码// ...// 调用api更新queue的状态if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {}return nil
}
func (c *queuecontroller) openQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error {newQueue := queue.DeepCopy()newQueue.Status.State = schedulingv1beta1.QueueStateOpen// 这里调用Update没有看懂, copy出来的对应应该除了状态,其他的都是一样的// 而Update方法是更新对象, 而不是更新状态if queue.Status.State != newQueue.Status.State {if _, err := c.vcClient.SchedulingV1beta1().Queues().Update(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.OpenQueueAction),fmt.Sprintf("Open queue failed for %v", err))return err}c.recorder.Event(newQueue, v1.EventTypeNormal, string(v1alpha1.OpenQueueAction), "Open queue succeed")} else {return nil}// 获取queue对象q, err := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), newQueue.Name, metav1.GetOptions{})newQueue = q.DeepCopy()// 执行操作if updateStateFn != nil {updateStateFn(&newQueue.Status, nil)} else {return fmt.Errorf("internal error, update state function should be provided")}// 调用api更新queue的状态if queue.Status.State != newQueue.Status.State {if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {}}return nil
}
// closeQueue与之类似, 不再举例

PodGroup Controller

PodGroup Controller比较简单, 它负责为未指定PodGroup的Pod分配PodGroup。

func (pg *pgcontroller) processNextReq() bool {// 省略部分代码// 获取pod对象pod, err := pg.podLister.Pods(req.podNamespace).Get(req.podName)// 根据调度器名称过滤if !commonutil.Contains(pg.schedulerNames, pod.Spec.SchedulerName) {return true}// 如果pod已经有podgroup, 则不再处理if pod.Annotations != nil && pod.Annotations[scheduling.KubeGroupNameAnnotationKey] != "" {return true}// 为pod分配podgroupif err := pg.createNormalPodPGIfNotExist(pod); err != nil {// AddRateLimited将在一段时间后重新添加req到队列中pg.queue.AddRateLimited(req)return true}// 省略部分代码
}
func (pg *pgcontroller) createNormalPodPGIfNotExist(pod *v1.Pod) error {// pgname将以”podgroup-“开头pgName := helpers.GeneratePodgroupName(pod)if _, err := pg.pgLister.PodGroups(pod.Namespace).Get(pgName); err != nil {// podgroup不存在, 则创建if !apierrors.IsNotFound(err) {return err}// 省略了一些从pod中继承赋值的代码obj := &scheduling.PodGroup{ObjectMeta: metav1.ObjectMeta{// podgroup的ownerReference是podOwnerReferences: newPGOwnerReferences(pod),},Spec: scheduling.PodGroupSpec{// 最小成员数为1MinMember:         1,},Status: scheduling.PodGroupStatus{// 状态为pendingPhase: scheduling.PodGroupPending,},}// 继承pod的owner信息,写入到annotationspg.inheritUpperAnnotations(pod, obj)// 继承pod annotationsif queueName, ok := pod.Annotations[scheduling.QueueNameAnnotationKey]; ok {obj.Spec.Queue = queueName}// 省略annotations继承的代码// ...// 创建podgroupif _, err := pg.vcClient.SchedulingV1beta1().PodGroups(pod.Namespace).Create(context.TODO(), obj, metav1.CreateOptions{}); err != nil {}}// 如果存在pg,则更新pod的annotationsreturn pg.updatePodAnnotations(pod, pgName)
}

JobFlow Controller

JobFlow是在volcano 1.8之后引入的CRD对象, 它配合JobTemplate使用,用于vcjob任务的编排。

JobFlow Controller主要监听JobFlowJob两个对象的变化, 并更新JobFlow的状态。

func (jf *jobflowcontroller) Initialize(opt *framework.ControllerOption) error {// ...jf.jobFlowInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:    jf.addJobFlow,UpdateFunc: jf.updateJobFlow,})jf.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{UpdateFunc: jf.updateJob,})// 省略部分代码
}

Job更新时,会判断是否属于JobFlow, 如果是,则将关联的JobFlow加入到队列。

func (jf *jobflowcontroller) updateJob(oldObj, newObj interface{}) {// ...if newJob.ResourceVersion == oldJob.ResourceVersion {return}jobFlowName := getJobFlowNameByJob(newJob)if jobFlowName == "" {return}req := &apis.FlowRequest{Namespace:   newJob.Namespace,JobFlowName: jobFlowName,Action:      jobflowv1alpha1.SyncJobFlowAction,Event:       jobflowv1alpha1.OutOfSyncEvent,}jf.queue.Add(req)
}

放入队列的apis.FlowRequest对象,最终会由handleJobFlow函数处理, 然后根据当前JobFlow的状态,生成并调用不同执行器。(这里的运行逻辑和Queue的差不多)

func (jf *jobflowcontroller) handleJobFlow(req *apis.FlowRequest) error {// 省略部分代码// ...jobflow, err := jf.jobFlowLister.JobFlows(req.Namespace).Get(req.JobFlowName)// 根据jobflow的状态, 生成不同的执行器jobFlowState := jobflowstate.NewState(jobflow)if err := jobFlowState.Execute(req.Action); err != nil {}return nil
}

JobFlow有5种状态(Flow Phase), 分别对应5种执行器::

  • Succeed --> succeedState
  • Terminating --> terminatingState (这个状态的执行器并没有实际动作,因为资源即将释放)
  • Failed --> failedState (这个状态的执行器并没有实际动作,因为状态异常)
  • Running --> runningState
  • Pending --> pendingState

JobFlow目前只有1种动作SyncJobFlow(Action), 由SyncJobFlow函数执行具体操作。

func (jf *jobflowcontroller) syncJobFlow(jobFlow *v1alpha1flow.JobFlow, updateStateFn state.UpdateJobFlowStatusFn) error {// ...// 如果当前jobflow的状态为succeed, 且job的保留策略为delete, 则删除所有由jobflow创建的jobif jobFlow.Spec.JobRetainPolicy == v1alpha1flow.Delete && jobFlow.Status.State.Phase == v1alpha1flow.Succeed {if err := jf.deleteAllJobsCreatedByJobFlow(jobFlow); err != nil {}return nil}// 根据jobflow中声明的jobtemplate创建job, 声明顺序即为创建顺序if err := jf.deployJob(jobFlow); err != nil {}// 获取jobflow下所有job的状态jobFlowStatus, err := jf.getAllJobStatus(jobFlow)if err != nil {return err}// 更新jobflow的状态jobFlow.Status = *jobFlowStatusupdateStateFn(&jobFlow.Status, len(jobFlow.Spec.Flows))_, err = jf.vcClient.FlowV1alpha1().JobFlows(jobFlow.Namespace).UpdateStatus(context.Background(), jobFlow, metav1.UpdateOptions{})return nil
}
func (jf *jobflowcontroller) deployJob(jobFlow *v1alpha1flow.JobFlow) error {for _, flow := range jobFlow.Spec.Flows {jobName := getJobName(jobFlow.Name, flow.Name)if _, err := jf.jobLister.Jobs(jobFlow.Namespace).Get(jobName); err != nil {if errors.IsNotFound(err) {// 如果job没有依赖, 则直接创建if flow.DependsOn == nil || flow.DependsOn.Targets == nil {// createJob根据jobtemplat创建job// 创建已经存在的job, 不会报错if err := jf.createJob(jobFlow, flow); err != nil {return err}} else {// 有依赖则判断依赖的job是否已经完成// 任何一个依赖的job未完成都不会创建flag, err := jf.judge(jobFlow, flow)if flag {if err := jf.createJob(jobFlow, flow); err != nil {return err}}}continue}return err}}return nil
}

Job Controller

Jobvolcano中的核心资源对象, 为了避免与k8s中的Job对象混淆, 也会称之为vcjob或者vj

Job Controller监听多个资源对象的变更事件:

func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {// ...cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:    cc.addJob,UpdateFunc: cc.updateJob,DeleteFunc: cc.deleteJob,})cc.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{FilterFunc: func(obj interface{}) bool {switch v := obj.(type) {case *busv1alpha1.Command:if v.TargetObject != nil &&v.TargetObject.APIVersion == batchv1alpha1.SchemeGroupVersion.String() &&v.TargetObject.Kind == "Job" {return true}return falsedefault:return false}},Handler: cache.ResourceEventHandlerFuncs{AddFunc: cc.addCommand,},},)cc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:    cc.addPod,UpdateFunc: cc.updatePod,DeleteFunc: cc.deletePod,})cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{UpdateFunc: cc.updatePodGroup,})// ...
}

vcjob的处理量会比较大, 所以Job Controller会启动多个worker来处理事件, 每个worker会有属于自己的queue

func (cc *jobcontroller) Run(stopCh <-chan struct{}) {// ...// commandQueue是用于处理busv1alpha1.Command对象的队列// 与Queue Controller中类似, 最终会转换成apis.Request对象, 放入queue中go wait.Until(cc.handleCommands, 0, stopCh)var i uint32// 启动多个workerfor i = 0; i < cc.workers; i++ {go func(num uint32) {wait.Until(func() {cc.worker(num)},time.Second,stopCh)}(i)}// cache用于缓存资源状态go cc.cache.Run(stopCh)// 处理错误taskgo wait.Until(cc.processResyncTask, 0, stopCh)// ...
}

新的事件会通过getWokerQueue函数来获取对应的queue, 然后放入队列中。

func (cc *jobcontroller) getWorkerQueue(key string) workqueue.RateLimitingInterface {// ...hashVal = fnv.New32()hashVal.Write([]byte(key))val = hashVal.Sum32()// 通过hash值取模来获取queuequeue := cc.queueList[val%cc.workers]return queue
}

command事件转换成request事件的过程与Queue Controller类似, 这里不再赘述。 queue中的数据处理是由processNextReq函数接收的。

func (cc *jobcontroller) processNextReq(count uint32) bool {// 获取queue, queue的数量与worker数量相同并一一对应queue := cc.queueList[count]req := obj.(apis.Request)key := jobcache.JobKeyByReq(&req)if !cc.belongsToThisRoutine(key, count) {// 这里做了校验, 如果key不属于当前worker, 则重新放入queue中queueLocal := cc.getWorkerQueue(key)queueLocal.Add(req)return true}jobInfo, err := cc.cache.Get(key)// state.NewState 这个名字见过很多次了, 用于生成执行器st := state.NewState(jobInfo)if st == nil {return true}// 获取当前需要执行的动作action := applyPolicies(jobInfo.Job, &req)// 非同步动作, 记录事件if action != busv1alpha1.SyncJobAction {cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf("Start to execute action %s ", action))}// 执行动作if err := st.Execute(action); err != nil {// 如果执行失败, 则根据重试次数, 决定是否重新放入queue中。// maxRequeueNum -1, 表示无限重试if cc.maxRequeueNum == -1 || queue.NumRequeues(req) < cc.maxRequeueNum {queue.AddRateLimited(req)return true}}// 如果执行成功, 则删除queue中的事件queue.Forget(req)return true
}

vcjob有10种状态(JobPhase), 对应8种执行器:

  • Pending --> pendingState
  • Aborting --> abortingState
  • Aborted --> abortedState
  • Running --> runningState
  • Restarting --> restartingState
  • Completing --> completingState
  • Terminating --> terminatingState
  • Terminated、Failed、Completed --> terminatedState

abortedState为例, 代码实现如下:

func (as *abortedState) Execute(action v1alpha1.Action) error {switch action {case v1alpha1.ResumeJobAction:return KillJob(as.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {status.State.Phase = vcbatch.Restartingstatus.RetryCount++return true})default:return KillJob(as.job, PodRetainPhaseSoft, nil)}
}

vcjob有11种动作(Action), 执行器中将根据动作执行不同的操作:

  • AbortJob 如果设置此操作,整个工作将被中止;所有作业的Pod都将被驱逐,并且不会重新创建任何Pod
  • RestartJob 如果设置了此操作,整个作业将重新启动
  • RestartTask 如果设置此操作,则仅重新启动任务;默认操作。
  • TerminateJob 如果设置了此操作,整个工作将被终止并且无法恢复;所有作业的Pod都将被驱逐,并且不会重新创建任何Pod。
  • CompleteJob 如果设置此操作,未完成的pod将被杀死,作业完成。
  • ResumeJob 恢复中止的工作。
  • SyncJob 同步Job/Pod状态的操作。(内部动作)
  • EnqueueJob 同步作业入队状态的操作。(内部动作)
  • SyncQueue 同步队列状态的操作。(内部动作)
  • OpenQueue 打开队列的操作。(内部动作)
  • CloseQueue 关闭队列的操作。(内部动作)

实际上, 对应这些动作会有不同的处理函数,他们被定义为ActionFn类型和KillActionFn类型。 这两个类型被声明为SyncJobKillJob的函数,并被执行器调用。

type ActionFn func(job *apis.JobInfo, fn UpdateStatusFn) error
type KillActionFn func(job *apis.JobInfo, podRetainPhase PhaseMap, fn UpdateStatusFn) error
var (// SyncJob将根据Job的规范创建或删除Pod。SyncJob ActionFn// KillJob 将杀死状态不在podRetainPhase中的pod.KillJob KillActionFn
)

操作调用的函数如下:(虽然不同动作调用的操作可能相同, 但是会更新不同的状态信息)

  • pendingStaterunningState状态执行器中:
RestartJobAction
KillJob
AbortJobAction
CompleteJobAction
TerminateJobAction
Other
SyncJob
  • restartingState状态执行器中, 直接调用KillJob
  • finishedState为最终状态, 所以不会执行任何动作。
  • terminatingState 直接调用KillJob
  • abortingStateabortedState状态执行器中:
ResumeJobAction
KillJob
Other
  • completingState直接调用KillJob

可以看出, 执行逻辑:

  • 如果是干预vcjob状态的动作, 则调用KillJob
  • 反之, 则调用SyncJob

然后来看一下具体实现函数。

killJob

killJob对应删除pod的操作。

遍历pod执行动作
设置job状态
执行删除插件
更新job状态
删除podgroup
func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, updateStatus state.UpdateStatusFn) error {// job已经处于删除状态, 则不再处理if job.DeletionTimestamp != nil {return nil}// 状态计数器, 用于更新job的状态var pending, running, terminating, succeeded, failed, unknown int32taskStatusCount := make(map[string]batch.TaskState)for _, pods := range jobInfo.Pods {for _, pod := range pods {total++if pod.DeletionTimestamp != nil {// pod处于删除状态, 则不再处理continue}maxRetry := job.Spec.MaxRetrylastRetry := false// 判断是否是最后一次重试if job.Status.RetryCount >= maxRetry-1 {lastRetry = true}// 如果是最后一次重试, 则保留失败和成功的podretainPhase := podRetainPhaseif lastRetry {// var PodRetainPhaseSoft = PhaseMap{//     v1.PodSucceeded: {},//     v1.PodFailed:    {},// }retainPhase = state.PodRetainPhaseSoft}_, retain := retainPhase[pod.Status.Phase]// 如果不保留pod, 则删除podif !retain {err := cc.deleteJobPod(job.Name, pod)if err == nil {terminating++continue}// 失败放入重试队列errs = append(errs, err)cc.resyncTask(pod)}// 更新状态计数器classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown)calcPodStatus(pod, taskStatusCount)}}if len(errs) != 0 {return fmt.Errorf("failed to kill %d pods of %d", len(errs), total)}// 更新job的状态计数job = job.DeepCopy()job.Status.Version++job.Status.Pending = pendingjob.Status.Running = runningjob.Status.Succeeded = succeededjob.Status.Failed = failedjob.Status.Terminating = terminatingjob.Status.Unknown = unknownjob.Status.TaskStatusCount = taskStatusCount// 更新运行持续时间job.Status.RunningDuration = &metav1.Duration{Duration: time.Since(jobInfo.Job.CreationTimestamp.Time)}// 更新job的状态if updateStatus != nil {if updateStatus(&job.Status) {job.Status.State.LastTransitionTime = metav1.Now()jobCondition := newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)job.Status.Conditions = append(job.Status.Conditions, jobCondition)}}// 执行删除插件if err := cc.pluginOnJobDelete(job); err != nil {return err}// 调用api更新job的状态newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})if err != nil {return err}if e := cc.cache.Update(newJob); e != nil {return e}// 删除podgrouppgName := job.Name + "-" + string(job.UID)if err := cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Delete(context.TODO(), pgName, metav1.DeleteOptions{}); err != nil {if !apierrors.IsNotFound(err) {return err}}return nil
}

syncJob

syncJob对应创建pod的操作。

遍历task统计add/del的pod
创建pod
删除pod
更新job状态
func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {if jobInfo.Job.DeletionTimestamp != nil {return nil}// ...// 获取job的queue信息queueInfo, err := cc.GetQueueInfo(job.Spec.Queue)if err != nil {return err}var jobForwarding bool// ExtendClusters 这个属性没有找到介绍, 好像只在这里用到了if len(queueInfo.Spec.ExtendClusters) != 0 {jobForwarding = trueif len(job.Annotations) == 0 {job.Annotations = make(map[string]string)}job.Annotations[batch.JobForwardingKey] = "true"job, err = cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{})if err != nil {return err}}// 初始化jobif !isInitiated(job) {// initiateJob中会更新job状态、调用add插件、更新podgroupif job, err = cc.initiateJob(job); err != nil {return err}} else {// initOnJobUpdate会调用add插件、更新podgroupif err = cc.initOnJobUpdate(job); err != nil {return err}}// ... 省略 queueInfo.Spec.ExtendClusters 的处理var syncTask boolpgName := job.Name + "-" + string(job.UID)if pg, _ := cc.pgLister.PodGroups(job.Namespace).Get(pgName); pg != nil {if pg.Status.Phase != "" && pg.Status.Phase != scheduling.PodGroupPending {syncTask = true}// ...}var jobCondition batch.JobCondition// 如果包含刚创建的podgroup, 则更新job状态if !syncTask {if updateStatus != nil {if updateStatus(&job.Status) {job.Status.State.LastTransitionTime = metav1.Now()jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)job.Status.Conditions = append(job.Status.Conditions, jobCondition)}}newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})// ...return nil}// ... 省略一些计数声明代码// ...waitCreationGroup := sync.WaitGroup{}// 遍历job中的taskfor _, ts := range job.Spec.Tasks {// ...var podToCreateEachTask []*v1.Pod// 根据副本数, 创建或删除podfor i := 0; i < int(ts.Replicas); i++ {podName := fmt.Sprintf(jobhelpers.PodNameFmt, job.Name, name, i)if pod, found := pods[podName]; !found {newPod := createJobPod(job, tc, ts.TopologyPolicy, i, jobForwarding)if err := cc.pluginOnPodCreate(job, newPod); err != nil {return err}podToCreateEachTask = append(podToCreateEachTask, newPod)waitCreationGroup.Add(1)} else {delete(pods, podName)if pod.DeletionTimestamp != nil {atomic.AddInt32(&terminating, 1)continue}// 更新状态计数器classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown)calcPodStatus(pod, taskStatusCount)}}// 统计需要创建和删除的podpodToCreate[ts.Name] = podToCreateEachTaskfor _, pod := range pods {podToDelete = append(podToDelete, pod)}}// 创建podfor taskName, podToCreateEachTask := range podToCreate {if len(podToCreateEachTask) == 0 {continue}go func(taskName string, podToCreateEachTask []*v1.Pod) {taskIndex := jobhelpers.GetTasklndexUnderJob(taskName, job)if job.Spec.Tasks[taskIndex].DependsOn != nil {// 统一判断依赖关系是否满足需求, 不满足则不创建podif !cc.waitDependsOnTaskMeetCondition(taskName, taskIndex, podToCreateEachTask, job) {for _, pod := range podToCreateEachTask {go func(pod *v1.Pod) {defer waitCreationGroup.Done()}(pod)}return}}// 执行创建for _, pod := range podToCreateEachTask {go func(pod *v1.Pod) {defer waitCreationGroup.Done()newPod, err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})if err != nil && !apierrors.IsAlreadyExists(err) {appendError(&creationErrs, fmt.Errorf("failed to create pod %s, err: %#v", pod.Name, err))} else {classifyAndAddUpPodBaseOnPhase(newPod, &pending, &running, &succeeded, &failed, &unknown)calcPodStatus(pod, taskStatusCount)}}(pod)}}(taskName, podToCreateEachTask)}// 等待创建完成waitCreationGroup.Wait()if len(creationErrs) != 0 {return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))}// 删除podfor _, pod := range podToDelete {go func(pod *v1.Pod) {defer waitDeletionGroup.Done()err := cc.deleteJobPod(job.Name, pod)if err != nil {appendError(&deletionErrs, err)cc.resyncTask(pod)} else {klog.V(3).Infof("Deleted Task <%s> of Job <%s/%s>",pod.Name, job.Namespace, job.Name)atomic.AddInt32(&terminating, 1)}}(pod)}// 等待删除完成waitDeletionGroup.Wait()if len(deletionErrs) != 0 {return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))}job.Status = batch.JobStatus{State: job.Status.State,Pending:             pending,Running:             running,Succeeded:           succeeded,Failed:              failed,Terminating:         terminating,Unknown:             unknown,Version:             job.Status.Version,MinAvailable:        job.Spec.MinAvailable,TaskStatusCount:     taskStatusCount,ControlledResources: job.Status.ControlledResources,Conditions:          job.Status.Conditions,RetryCount:          job.Status.RetryCount,}// 更新job状态if updateStatus != nil && updateStatus(&job.Status) {job.Status.State.LastTransitionTime = metav1.Now()jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)job.Status.Conditions = append(job.Status.Conditions, jobCondition)}// 调用api更新job状态newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})if err != nil {klog.Errorf("Failed to update status of Job %v/%v: %v",job.Namespace, job.Name, err)return err}if e := cc.cache.Update(newJob); e != nil {return e}return nil
}

其他控制器

其他一些控制器因为逻辑比较简单,就不再从代码解析了:

  • jobTemplate controller 监听vcjobjobtemplate, 用于更新jobtemplate 状态中的JobDependsOnList, 即有哪些vcjob依赖于该jobtemplatejobTemplate被官方称之为vcjob的套壳(jobTemplate.spec = vcjob.spec), 目的是为了职责区分。
  • gc controller 监听具有.spec.ttlSecondsAfterFinished属性的vcjob, ttl过期则删除job

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/596081.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一加 Buds 3正式发布:普及旗舰音质 一加用户首选

1月4日&#xff0c;一加新品发布会正式推出旗下新款耳机一加 Buds 3。延续一加经典美学&#xff0c;秉承音质完美主义追求&#xff0c;一加 Buds 3全面普及一加旗舰耳机体验&#xff0c;其搭载旗舰同款“超清晰同轴双单元”&#xff0c;配备49dB 4000Hz超宽频主动降噪&#xff…

万界星空科技MES系统中的生产管理

MES系统能够帮助企业实现生产计划管理、生产过程控制、产品质量管理、车间库存管理、项目看板管理等&#xff0c;提高企业制造执行能力。 万界星空MES系统特点&#xff1a; 1. 采用强大数据采集引擎、整合数据采集渠道&#xff08;RFID、条码设备、PLC、Sensor、IPC、PC等&…

【中小型企业网络实战案例 七】配置限速

相关学习文章&#xff1a; 【中小型企业网络实战案例 一】规划、需求和基本配置 【中小型企业网络实战案例 二】配置网络互连互通【中小型企业网络实战案例 三】配置DHCP动态分配地址 【中小型企业网络实战案例 四】配置OSPF动态路由协议【中小型企业网络实战案例 五】配置可…

Hubery-个人项目经历记录

研究生期间很有幸的进入到了崔老师的组&#xff0c;从此也就进入到了分析人体生理信号的领域&#xff0c;充满挑战的同时也充满了乐趣。借着CSDN整理一下近几年来参与的项目&#xff0c;这里蕴含着我各种美好的回忆&#xff0c;也作为一个展示自己的平台吧。 开始之前&#xff…

作业--day41

利用模板类完成顺序表 #include <iostream>using namespace std;//模板类 template <typename T> class SeqTab{T arr[20];int maxsize; public:SeqTab():maxsize(0){}void Insert(T a);void Search(T a);void Delete(int index);void Show(); };//尾插 template …

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之线性布局容器Column组件

鸿蒙&#xff08;HarmonyOS&#xff09;项目方舟框架&#xff08;ArkUI&#xff09;之线性布局容器Column组件 一、操作环境 操作系统: Windows 10 专业版、IDE:DevEco Studio 3.1、SDK:HarmonyOS 3.1 二、Column组件 沿垂直方向布局的容器。 子组件 可以包含子组件。 接…

完美解决Github 2fa二次验证问题

完美解决Github 2fa二次验证问题 原文阅读 https://onedayxyy.cn/docs/github-2fa 前言 你的 Github 账户可能被封禁! 教你应对 Github 最新的 2FA 二次验证! 无地区限制, 无额外设备的全网最完美方案 1、2FA 的定义 双因素身份验证 (2FA) 是一种身份和访管理安全方法&…

物流实时数仓:数仓搭建(DWS)一

系列文章目录 物流实时数仓&#xff1a;采集通道搭建 物流实时数仓&#xff1a;数仓搭建 物流实时数仓&#xff1a;数仓搭建&#xff08;DIM&#xff09; 物流实时数仓&#xff1a;数仓搭建&#xff08;DWD&#xff09;一 物流实时数仓&#xff1a;数仓搭建&#xff08;DWD&am…

遥测终端机——连接智能世界的桥梁

在当今的智能化时代&#xff0c;数据的重要性日益凸显。各个行业都需要对数据进行实时监测、处理和分析&#xff0c;以提升生产效率、优化运营管理。遥测终端机作为连接智能世界的桥梁&#xff0c;正逐渐成为各行业的必备设备。 遥测终端机是一种集数据采集、存储、传输和管理于…

小红书12月内容趋势分析

为洞察小红书平台的内容创作趋势及品牌营销策略&#xff0c;新红推出12月月度榜单&#xff0c;从创作者、品牌、热搜词多方面入手&#xff0c;解析月榜数据&#xff0c;为从业者提供参考。 以下为12月部分榜单解析&#xff0c;想要查看更多行业榜单&#xff0c;创作优质内容&am…

vue3+Cesium 添加地面测控站台

效果 cesiumRadar.js import * as Cesium from cesium; export function addentities(viewer, res, index) {viewer.entities.add({id: index,position: Cesium.Cartesian3.fromDegrees(res[0], res[1]),wall: {positions: new Cesium.CallbackProperty(() > {return Cesiu…

kubernetes(K8s)的使用和常用命令

K8S kubernetes&#xff0c;由于k和s之间有8个字符&#xff0c;所以简称k8s&#xff0c;是一个全新的基于容器技术的分布式架构领先方案&#xff0c;是谷歌严格保密十几年的秘密武器----Borg系统的一个开源版本&#xff0c;于2015年7月发布第一个正式版本&#xff0c;它的本质…

【电商项目实战】实现订单超时支付取消

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的专栏《电商项目实战》。&#x1f3af;&#x1f3af; &am…

大创项目推荐 深度学习卷积神经网络垃圾分类系统 - 深度学习 神经网络 图像识别 垃圾分类 算法 小程序

文章目录 0 简介1 背景意义2 数据集3 数据探索4 数据增广(数据集补充)5 垃圾图像分类5.1 迁移学习5.1.1 什么是迁移学习&#xff1f;5.1.2 为什么要迁移学习&#xff1f; 5.2 模型选择5.3 训练环境5.3.1 硬件配置5.3.2 软件配置 5.4 训练过程5.5 模型分类效果(PC端) 6 构建垃圾…

网络通信(9)-C#TCP服务端实例

本文使用Socket在C#语言环境下完成TCP服务端的实例。 实例完成的功能: 服务器能够连接多个客户端显示在列表中,实现实时刷新。 服务器接收客户端的字符串数据。 选中列表中的客户端发送字符串数据。 在VS中创建C# Winform项目,编辑界面,如下: UI文件 namespace MyTc…

多元线性回归案例--客户价值模型

文章目录 step 1&#xff1a;读取数据step 2&#xff1a;搭建模型step 3&#xff1a;构造回归方程step 4&#xff1a;评估模型 利用多元线性回归模型可以根据多个因素来预测客户价值&#xff0c;当模型搭建完成后&#xff0c;便可对不同价值的客户采用不同的业务策略。 这里以信…

RuoYi-Cloud-Plus使用minio进行文件上传图片后无法预览解决_修改minio配置minio桶权限---SpringCloud工作笔记198

在文件管理的位置,发现刚刚上传的图片文件,会显示 预览图片失败 后来经过多方查看,发现是minio的配置的问题 可以从这里: 可以看到首先登录RuoYi-Cloud-Plus系统然后,打开文件管理页面可以看到,当上传了图片文件以后 显示文件展示中,文件预览失败,那么这个时候,去修改minio的配…

python识别验证码+灰度图片base64转换图片

一、为后面识别验证码准备 1、base64转换为图片&#xff0c;保存本地、并且置灰 上文中的base64,后面的就是包含Base64编码的PNG图像的字符串复制下来 import base64 from PIL import Image import io# 这里是你的Base64编码的字符串 base64_data "iVBORw0KGgoAAAANSUhE…

鸿鹄电子招投标系统:源码级别解析电子招投标的精髓

招投标管理系统是一个集门户管理、立项管理、采购项目管理、采购公告管理、考核管理、报表管理、评审管理、企业管理、采购管理和系统管理于一体的综合性应用平台。它适用于招标代理、政府采购、企业采购和工程交易等业务的企业&#xff0c;旨在提高项目管理的效率和质量。该系…

大数据HCIE成神之路之特征工程——特征选择

特征选择 1.1 特征选择 - Filter方法1.1.1 实验任务1.1.1.1 实验背景1.1.1.2 实验目标1.1.1.3 实验数据解析1.1.1.4 实验思路 1.1.2 实验操作步骤 1.2 特征选择 - Wrapper方法1.2.1 实验任务1.2.1.1 实验背景1.2.1.2 实验目标1.2.1.3 实验数据解析1.2.1.4 实验思路 1.2.2 实验操…