深入解析kubernetes controller-runtime

🚀 优质资源分享 🚀

学习路线指引(点击解锁)知识定位人群定位
🧡 Python实战微信订餐小程序 🧡进阶级本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。
💛Python量化交易实战💛入门级手把手带你打造一个易扩展、更安全、效率更高的量化交易系统

Overview

controller-runtime 是 Kubernetes 社区提供可供快速搭建一套 实现了controller 功能的工具,无需自行实现Controller的功能了;在 KubebuilderOperator SDK 也是使用 controller-runtime 。本文将对 controller-runtime 的工作原理以及在不同场景下的使用方式进行简要的总结和介绍。

controller-runtime structure

controller-runtime 主要组成是需要用户创建的 ManagerReconciler 以及 Controller Runtime 自己启动的 CacheController

  • Manager:是用户在初始化时创建的,用于启动 Controller Runtime 组件
  • Reconciler:是用户需要提供来处理自己的业务逻辑的组件(即在通过 code-generator 生成的api-like而实现的controller中的业务处理部分)。
  • Cache:一个缓存,用来建立 InformerApiServer 的连接来监听资源并将被监听的对象推送到queue中。
  • Controller: 一方面向 Informer 注册 eventHandler,另一方面从队列中获取数据。controller 将从队列中获取数据并执行用户自定义的 Reconciler 功能。

image

图:controller-runtime structure
image

图:controller-runtime flowchart
由图可知,Controller会向 Informer 注册一些列eventHandler;然后Cache启动Informer(informer属于cache包中),与ApiServer建立监听;当Informer检测到资源变化时,将对象加入queue,Controller 将元素取出并在用户端执行 Reconciler。

Controller引入

我们从 controller-rumtime项目的 example 进行引入看下,整个架构都是如何实现的。

可以看到 example 下的实际上实现了一个 reconciler 的结构体,实现了 Reconciler 抽象和 Client 结构体

type reconciler struct {client.Clientscheme *runtime.Scheme
}

那么来看下 抽象的 Reconciler 是什么,可以看到就是抽象了 Reconcile 方法,这个是具体处理的逻辑过程

type Reconciler interface {Reconcile(context.Context, Request) (Result, error)
}

下面在看下谁来实现了这个 Reconciler 抽象

type Controller interface {reconcile.Reconciler // 协调的具体步骤,通过ns/name\// 通过predicates来评估来源数据,并加入queue中(放入队列的是reconcile.Requests)Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error// 启动controller,类似于自定义的Run()Start(ctx context.Context) errorGetLogger() logr.Logger
}

controller structure

在 controller-runtime\pkg\internal\controller\controller.go 中实现了这个 Controller

type Controller struct {Name string // controller的标识MaxConcurrentReconciles int // 并发运行Reconciler的数量,默认1// 实现了reconcile.Reconciler的调节器, 默认DefaultReconcileFuncDo reconcile.Reconciler// makeQueue会构建一个对应的队列,就是返回一个限速队列MakeQueue func() workqueue.RateLimitingInterface// MakeQueue创造出来的,在出入队列就是操作的这个Queue workqueue.RateLimitingInterface// 用于注入其他内容// 已弃用SetFields func(i interface{}) errormu sync.Mutex// 标识开始的状态Started bool// 在启动时传递的上下文,用于停止控制器ctx context.Context// 等待缓存同步的时间 默认2分钟CacheSyncTimeout time.Duration// 维护了eventHandler predicates,在控制器启动时启动startWatches []watchDescription// 日志构建器,输出入日志LogConstructor func(request *reconcile.Request) logr.Logger// RecoverPanic为是否对reconcile引起的panic恢复RecoverPanic bool
}

看完了controller的structure,接下来看看controller是如何使用的

injection

Controller.Watch 实现了注入的动作,可以看到 watch() 通过参数将 对应的事件函数传入到内部

func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {c.mu.Lock()defer c.mu.Unlock()// 使用SetFields来完成注入操作if err := c.SetFields(src); err != nil {return err}if err := c.SetFields(evthdler); err != nil {return err}for _, pr := range prct {if err := c.SetFields(pr); err != nil {return err}}// 如果Controller还未启动,那么将这些动作缓存到本地if !c.Started {c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})return nil}c.LogConstructor(nil).Info("Starting EventSource", "source", src)return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

启动操作实际上为informer注入事件函数

type Source interface {// start 是Controller 调用,用以向 Informer 注册 EventHandler, 将 reconcile.Requests(一个入队列的动作) 排入队列。Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,prct ...predicate.Predicate) error {// Informer should have been specified by the user.if is.Informer == nil {return fmt.Errorf("must specify Informer.Informer")}is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})return nil
}

我们知道对于 eventHandler,实际上应该是一个 onAddonUpdate 这种类型的函数,queue则是workqueue,那么 Predicates 是什么呢?

通过追踪可以看到定义了 Predicate 抽象,可以看出Predicate 是Watch到的事件时什么类型的,当对于每个类型的事件,对应的函数就为 true,在 eventHandler 中,这些被用作,事件的过滤。

// Predicate filters events before enqueuing the keys.
type Predicate interface {// Create returns true if the Create event should be processedCreate(event.CreateEvent) bool// Delete returns true if the Delete event should be processedDelete(event.DeleteEvent) bool// Update returns true if the Update event should be processedUpdate(event.UpdateEvent) bool// Generic returns true if the Generic event should be processedGeneric(event.GenericEvent) bool
}

在对应的动作中,可以看到这里作为过滤操作

func (e EventHandler) OnAdd(obj interface{}) {c := event.CreateEvent{}// Pull Object out of the objectif o, ok := obj.(client.Object); ok {c.Object = o} else {log.Error(nil, "OnAdd missing Object","object", obj, "type", fmt.Sprintf("%T", obj))return}for _, p := range e.Predicates {if !p.Create(c) {return}}// Invoke create handlere.EventHandler.Create(c, e.Queue)
}

上面就看到了,对应是 EventHandler.Create 进行添加的,那么这些动作具体是在做什么呢?

在代码 pkg/handler ,可以看到这些操作,类似于create,这里将ns/name放入到队列中。

func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {if evt.Object == nil {enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)return}q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name:      evt.Object.GetName(),Namespace: evt.Object.GetNamespace(),}})
}

unqueue

上面看到了,入队的动作实际上都是将 ns/name 加入到队列中,那么出队列时又做了些什么呢?

通过 controller.Start() 可以看到controller在启动后都做了些什么动作

func (c *Controller) Start(ctx context.Context) error {c.mu.Lock()if c.Started {return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")}c.initMetrics()// Set the internal context.c.ctx = ctxc.Queue = c.MakeQueue() // 初始化queuego func() { // 退出时,让queue关闭<-ctx.Done()c.Queue.ShutDown()}()wg := &sync.WaitGroup{}err := func() error {defer c.mu.Unlock()defer utilruntime.HandleCrash()// 启动informer前,将之前准备好的 evnetHandle predictates source注册for _, watch := range c.startWatches {c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))// 上面我们看过了,start就是真正的注册动作if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {return err}}// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer cachesc.LogConstructor(nil).Info("Starting Controller")// startWatches上面我们也看到了,是evnetHandle predictates source被缓存到里面,// 这里是拿出来将其启动for _, watch := range c.startWatches {syncingSource, ok := watch.src.(source.SyncingSource)if !ok {continue}if err := func() error {// use a context with timeout for launching sources and syncing caches.sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)defer cancel()// WaitForSync waits for a definitive timeout, and returns if there// is an error or a timeoutif err := syncingSource.WaitForSync(sourceStartCtx); err != nil {err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")return err}return nil}(); err != nil {return err}}// which won't be garbage collected if we hold a reference to it.c.startWatches = nil// Launch workers to process resourcesc.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)wg.Add(c.MaxConcurrentReconciles)// 启动controller消费端的线程for i := 0; i < c.MaxConcurrentReconciles; i++ {go func() {defer wg.Done()for c.processNextWorkItem(ctx) {}}()}c.Started = truereturn nil}()if err != nil {return err}<-ctx.Done() // 阻塞,直到上下文关闭c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")wg.Wait() // 等待所有线程都关闭c.LogConstructor(nil).Info("All workers finished")return nil
}

通过上面的分析,可以看到,每个消费的worker线程,实际上调用的是 processNextWorkItem 下面就来看看他究竟做了些什么?

func (c *Controller) processNextWorkItem(ctx context.Context) bool {obj, shutdown := c.Queue.Get() // 从队列中拿取数据if shutdown {return false}defer c.Queue.Done(obj)// 下面应该是prometheus指标的一些东西ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)// 获得的对象通过reconcileHandler处理c.reconcileHandler(ctx, obj)return true
}

那么下面看看 reconcileHandler 做了些什么

func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {// Update metrics after processing each itemreconcileStartTS := time.Now()defer func() {c.updateMetrics(time.Since(reconcileStartTS))}()// 检查下取出的数据是否为reconcile.Request,在之前enqueue时了解到是插入的这个类型的值req, ok := obj.(reconcile.Request)if !ok {// 如果错了就忘记c.Queue.Forget(obj)c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)return}log := c.LogConstructor(&req)log = log.WithValues("reconcileID", uuid.NewUUID())ctx = logf.IntoContext(ctx, log)// 这里调用了自己在实现controller实现的Reconcile的动作result, err := c.Reconcile(ctx, req)switch {case err != nil:c.Queue.AddRateLimited(req)ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()log.Error(err, "Reconciler error")case result.RequeueAfter > 0:c.Queue.Forget(obj)c.Queue.AddAfter(req, result.RequeueAfter)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()case result.Requeue:c.Queue.AddRateLimited(req)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()default:c.Queue.Forget(obj)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()}
}

通过对example中的 Reconcile 查找其使用,可以看到,调用他的就是上面我们说道的 reconcileHandler ,到这里我们就知道了,controller 的运行流为 Controller.Start() > Controller.processNextWorkItem > Controller.reconcileHandler > Controller.Reconcile 最终到达了我们自定义的业务逻辑处理 Reconcile

image

Manager

在上面学习 controller-runtime 时了解到,有一个 Manager 的组件,这个组件是做什么呢?我们来分析下。

Manager 是用来创建与启动 controller 的(允许多个 controller 与 一个 manager 关联),Manager会启动分配给他的所有controller,以及其他可启动的对象。

在 example 看到,会初始化一个 ctrl.NewManager

func main() {ctrl.SetLogger(zap.New())mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})if err != nil {setupLog.Error(err, "unable to start manager")os.Exit(1)}// in a real controller, we'd create a new scheme for thiserr = api.AddToScheme(mgr.GetScheme())if err != nil {setupLog.Error(err, "unable to add scheme")os.Exit(1)}err = ctrl.NewControllerManagedBy(mgr).For(&api.ChaosPod{}).Owns(&corev1.Pod{}).Complete(&reconciler{Client: mgr.GetClient(),scheme: mgr.GetScheme(),})if err != nil {setupLog.Error(err, "unable to create controller")os.Exit(1)}err = ctrl.NewWebhookManagedBy(mgr).For(&api.ChaosPod{}).Complete()if err != nil {setupLog.Error(err, "unable to create webhook")os.Exit(1)}setupLog.Info("starting manager")if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {setupLog.Error(err, "problem running manager")os.Exit(1)}
}

这个 manager 就是 controller-runtime\pkg\manager\manager.go 下的 Manager, Manager 通过初始化 Caches 和 Clients 等共享依赖,并将它们提供给 Runnables。

type Manager interface {// 提供了与APIServer交互的方式,如incluster,indexer,cache等cluster.Cluster// Runnable 是任意可允许的cm中的组件,如 webhook,controller,Caches,在new中调用时,// 可以看到是传入的是一个controller,这里可以启动的是带有Start()方法的,通过调用Start()// 来启动组件Add(Runnable) error// 实现选举方法。当elected关闭,则选举为leaderElected() <-chan struct{}// 这为一些列健康检查和指标的方法,和我们关注的没有太大关系AddMetricsExtraHandler(path string, handler http.Handler) errorAddHealthzCheck(name string, check healthz.Checker) errorAddReadyzCheck(name string, check healthz.Checker) error// Start将启动所有注册进来的控制器,直到ctx取消。如果有任意controller报错,则立即退出// 如果使用了 LeaderElection,则必须在此返回后立即退出二进制文件,Start(ctx context.Context) error// GetWebhookServer returns a webhook.ServerGetWebhookServer() *webhook.Server// GetLogger returns this manager's logger.GetLogger() logr.Logger// GetControllerOptions returns controller global configuration options.GetControllerOptions() v1alpha1.ControllerConfigurationSpec
}

controller-manager

controllerManager 则实现了这个manager的抽象

type controllerManager struct {sync.Mutexstarted boolstopProcedureEngaged *int64errChan              chan errorrunnables            *runnablescluster cluster.Cluster// recorderProvider 用于记录eventhandler source predictaterecorderProvider *intrec.Provider// resourceLock forms the basis for leader electionresourceLock resourcelock.Interface// 在退出时是否关闭选举租约leaderElectionReleaseOnCancel bool// 一些指标性的,暂时不需要关注metricsListener net.ListenermetricsExtraHandlers map[string]http.HandlerhealthProbeListener net.ListenerreadinessEndpointName stringlivenessEndpointName stringreadyzHandler *healthz.HandlerhealthzHandler *healthz.Handler// 有关controller全局参数controllerOptions v1alpha1.ControllerConfigurationSpeclogger logr.Logger// 用于关闭 LeaderElection.Run(...) 的信号leaderElectionStopped chan struct{}// 取消选举,在失去选举后,必须延迟到gracefulShutdown之后os.exit()leaderElectionCancel context.CancelFunc// leader取消选举elected chan struct{}port inthost stringcertDir stringwebhookServer *webhook.ServerwebhookServerOnce sync.Once// 非leader节点强制leader的等待时间leaseDuration time.Duration// renewDeadline is the duration that the acting controlplane will retry// refreshing leadership before giving up.renewDeadline time.Duration// LeaderElector重新操作的时间retryPeriod time.Duration// gracefulShutdownTimeout 是在manager停止之前让runnables停止的持续时间。gracefulShutdownTimeout time.Duration// onStoppedLeading is callled when the leader election lease is lost.// It can be overridden for tests.onStoppedLeading func()shutdownCtx context.ContextinternalCtx    context.ContextinternalCancel context.CancelFuncinternalProceduresStop chan struct{}
}

workflow

了解完ControllerManager之后,我们通过 example 来看看 ControllerManager 的workflow

func main() {ctrl.SetLogger(zap.New())// New一个managermgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})if err != nil {setupLog.Error(err, "unable to start manager")os.Exit(1)}// in a real controller, we'd create a new scheme for thiserr = api.AddToScheme(mgr.GetScheme())if err != nil {setupLog.Error(err, "unable to add scheme")os.Exit(1)}err = ctrl.NewControllerManagedBy(mgr).For(&api.ChaosPod{}).Owns(&corev1.Pod{}).Complete(&reconciler{Client: mgr.GetClient(),scheme: mgr.GetScheme(),})if err != nil {setupLog.Error(err, "unable to create controller")os.Exit(1)}err = ctrl.NewWebhookManagedBy(mgr).For(&api.ChaosPod{}).Complete()if err != nil {setupLog.Error(err, "unable to create webhook")os.Exit(1)}setupLog.Info("starting manager")if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {setupLog.Error(err, "problem running manager")os.Exit(1)}
}
  • 通过 manager.New() 初始化一个manager,这里面会初始化一些列的manager的参数
  • 通过 ctrl.NewControllerManagedBy 注册 controller 到manager中
    • ctrl.NewControllerManagedBy 是 builder的一个别名,构建出一个builder类型的controller
    • builder 中的 ctrl 就是 controller
  • 启动manager

builder

下面看来看下builder在构建时做了什么

// Builder builds a Controller.
type Builder struct {forInput         ForInputownsInput        []OwnsInputwatchesInput     []WatchesInputmgr              manager.ManagerglobalPredicates []predicate.Predicatectrl             controller.ControllerctrlOptions      controller.Optionsname             string
}

我们看到 example 中是调用了 For() 动作,那么这个 For() 是什么呢?

通过注释,我们可以看到 For() 提供了 调解对象类型,ControllerManagedBy 通过 reconciling object 来相应对应create/delete/update 事件。调用 For() 相当于调用了 Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{})

func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {if blder.forInput.object != nil {blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation")return blder}input := ForInput{object: object}for _, opt := range opts {opt.ApplyToFor(&input) //最终把我们要监听的对象每个 opts注册进去}blder.forInput = inputreturn blder
}

接下来是调用的 Owns() ,Owns() 看起来和 For() 功能是类似的。只是说属于不同,是通过Owns方法设置的

func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {input := OwnsInput{object: object}for _, opt := range opts {opt.ApplyToOwns(&input)}blder.ownsInput = append(blder.ownsInput, input)return blder
}

最后到了 Complete(),Complete 是完成这个controller的构建

// Complete builds the Application Controller.
func (blder *Builder) Complete(r reconcile.Reconciler) error {_, err := blder.Build(r)return err
}// Build 创建控制器并返回
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {if r == nil {return nil, fmt.Errorf("must provide a non-nil Reconciler")}if blder.mgr == nil {return nil, fmt.Errorf("must provide a non-nil Manager")}if blder.forInput.err != nil {return nil, blder.forInput.err}// Checking the reconcile type exist or notif blder.forInput.object == nil {return nil, fmt.Errorf("must provide an object for reconciliation")}// Set the ControllerManagedByif err := blder.doController(r); err != nil {return nil, err}// Set the Watchif err := blder.doWatch(); err != nil {return nil, err}return blder.ctrl, nil
}

这里面可以看到,会完成 doController 和 doWatch

doController会初始化好这个controller并返回

func (blder *Builder) doController(r reconcile.Reconciler) error {globalOpts := blder.mgr.GetControllerOptions()ctrlOptions := blder.ctrlOptionsif ctrlOptions.Reconciler == nil {ctrlOptions.Reconciler = r}// 通过检索GVK获得默认的名称gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())if err != nil {return err}// 设置并发,如果最大并发为0则找到一个// 追踪下去看似是对于没有设置时,例如会根据 app group中的 ReplicaSet设定// 就是在For()传递的一个类型的数量来确定并发的数量if ctrlOptions.MaxConcurrentReconciles == 0 {groupKind := gvk.GroupKind().String()if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 {ctrlOptions.MaxConcurrentReconciles = concurrency}}// Setup cache sync timeout.if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil {ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout}// 给controller一个name,如果没有初始化传递,则使用Kind做名称controllerName := blder.getControllerName(gvk)// Setup the logger.if ctrlOptions.LogConstructor == nil {log := blder.mgr.GetLogger().WithValues("controller", controllerName,"controllerGroup", gvk.Group,"controllerKind", gvk.Kind,)lowerCamelCaseKind := strings.ToLower(gvk.Kind[:1]) + gvk.Kind[1:]ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger {log := logif req != nil {log = log.WithValues(lowerCamelCaseKind, klog.KRef(req.Namespace, req.Name),"namespace", req.Namespace, "name", req.Name,)}return log}}// 这里就是构建一个新的控制器了,也就是前面说到的 manager.New()blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)return err
}

manager.New()

start Manager

接下来是manager的启动,也就是对应的 start() 与 doWatch()

通过下述代码我们可以看出来,对于 doWatch() 就是把 compete() 前的一些资源的事件函数都注入到controller 中

func (blder *Builder) doWatch() error {// 调解类型,这也也就是对于For的obj来说,我们需要的是什么结构的,如非结构化数据或metadata-only// metadata-only就是配置成一个GVK schema.GroupVersionKindtypeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)if err != nil {return err}&source.Kind{}// 一些准备工作,将对象封装为&source.Kind{}// src := &source.Kind{Type: typeForSrc}hdler := &handler.EnqueueRequestForObject{} // 就是包含obj的一个事件队列allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)// 这里又到之前说过的controller watch了// 将一系列的准备动作注入到cache 如 source eventHandler predicateif err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {return err}// 再重复 ownsInput 动作for _, own := range blder.ownsInput {typeForSrc, err := blder.project(own.object, own.objectProjection)if err != nil {return err}src := &source.Kind{Type: typeForSrc}hdler := &handler.EnqueueRequestForOwner{OwnerType:    blder.forInput.object,IsController: true,}allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)allPredicates = append(allPredicates, own.predicates...)if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {return err}}// 在对 ownsInput 进行重复的操作for _, w := range blder.watchesInput {allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)allPredicates = append(allPredicates, w.predicates...)// If the source of this watch is of type *source.Kind, project it.if srckind, ok := w.src.(*source.Kind); ok {typeForSrc, err := blder.project(srckind.Type, w.objectProjection)if err != nil {return err}srckind.Type = typeForSrc}if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {return err}}return nil
}

由于前两部 builder 的操作将 mgr 指针传入到 builder中,并且操作了 complete() ,也就是操作了 build() ,这代表了对 controller 完成了初始化,和事件注入(watch)的操作,所以 Start(),就是将controller启动

func (cm *controllerManager) Start(ctx context.Context) (err error) {cm.Lock()if cm.started {cm.Unlock()return errors.New("manager already started")}var ready booldefer func() {if !ready {cm.Unlock()}}()// Initialize the internal context.cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)// 这个channel代表了controller的停止stopComplete := make(chan struct{})defer close(stopComplete)// This must be deferred after closing stopComplete, otherwise we deadlock.defer func() {stopErr := cm.engageStopProcedure(stopComplete)if stopErr != nil {if err != nil {err = kerrors.NewAggregate([]error{err, stopErr})} else {err = stopErr}}}()// Add the cluster runnable.if err := cm.add(cm.cluster); err != nil {return fmt.Errorf("failed to add cluster to runnables: %w", err)}// 指标类if cm.metricsListener != nil {cm.serveMetrics()}if cm.healthProbeListener != nil {cm.serveHealthProbes()}if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {if !errors.Is(err, wait.ErrWaitTimeout) {return err}}// 等待informer同步完成if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {if !errors.Is(err, wait.ErrWaitTimeout) {return err}}// 非选举模式,runnable将在cache同步完成后启动if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {if !errors.Is(err, wait.ErrWaitTimeout) {return err}}// Start the leader election and all required runnables.{ctx, cancel := context.WithCancel(context.Background())cm.leaderElectionCancel = cancelgo func() {if cm.resourceLock != nil {if err := cm.startLeaderElection(ctx); err != nil {cm.errChan <- err}} else {// Treat not having leader election enabled the same as being elected.if err := cm.startLeaderElectionRunnables(); err != nil {cm.errChan <- err}close(cm.elected)}}()}ready = truecm.Unlock()select {case <-ctx.Done():// We are donereturn nilcase err := <-cm.errChan:// Error starting or running a runnablereturn err}
}

可以看到上面启动了4种类型的runnable,实际上就是对这runnable进行启动,例如 controller,cache等。

回顾一下,我们之前在使用code-generator 生成,并自定义controller时,我们也是通过启动 informer.Start() ,否则会报错。

最后可以通过一张关系图来表示,client-go与controller-manager之间的关系

image

Reference

diving controller runtime

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/400786.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构练习 00-自测3. 数组元素循环右移问题 (20)

一个数组A中存有N&#xff08;N>0&#xff09;个整数&#xff0c;在不允许使用另外数组的前提下&#xff0c;将每个整数循环向右移M&#xff08;M>0&#xff09;个位置&#xff0c;即将A中的数据由&#xff08;A0A1……AN-1&#xff09;变换为&#xff08;AN-M …… AN-1…

MSP432P401R TI Drivers 库函数学习笔记(一)导入工程模板

目录从Resource Explorer中导入从SDK导入测试平台&#xff1a;Code Composer Studio 10.4.0 MSP432P401R SimpleLink™ 微控制器 LaunchPad™ 开发套件 (MSP-EXP432P401R) 从Resource Explorer中导入 本文所用编程平台为Code Composer Studio 10.4.0 启动软件后在Help中打开…

ThreadLocal的简单理解

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

JMS 在 SpringBoot 中的使用

当前环境 Mac OS 10.11.xdocker 1.12.1JDK 1.8SpringBoot 1.5前言 基于之前一篇“一个故事告诉你什么是消息队列”&#xff0c;了解了消息队列的使用场景以及相关的特性。本文主要讲述消息服务在 JAVA 中的使用。 市面上的有关消息队列的技术选型非常多&#xff0c;如果我们的代…

MSP432P401R TI Drivers 库函数学习笔记(二)认识TI-RTOS (TI-POSIX)

目录简介TI-RTOSFreeRTOSPOSIX运行时对象查看器 (Runtime Object Viewer)TI-POSIX 介绍在源代码中使用 POSIXTI-POSIX支持的函数摘要线程函数调用的前后关系线程管理线程属性线程同步障碍属性条件变量条件变量属性互斥锁互斥属性读写锁定读写锁属性辅助函数调用的前后关系时钟消…

arcgis创建postgre企业级数据库

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

[floyd+路径输出]HDU1385 Minimum Transport Cost

题目链接 题目翻译: 有N个城市&#xff0c;然后直接给出这些城市之间的邻接矩阵&#xff0c;矩阵中-1代表那两个城市无道路相连&#xff0c;其他值代表路径长度。 如果一辆汽车经过某个城市&#xff0c;必须要交一定的钱(可能是过路费)。 现在要从a城到b城&#xff0c;花费为路…

MSP432P401R TI Drivers 库函数学习笔记(三)认识任务的创建及图形化配置

目录任务的简单创建图形化配置上手简简单单点个灯配置引脚建立任务实验结果整体代码main.cmyTask.cmyTask.h平台&#xff1a;Code Composer Studio 10.4.0 MSP432P401R SimpleLink™ 微控制器 LaunchPad™ 开发套件 (MSP-EXP432P401R) 任务的简单创建 根据上篇机翻的函数作用…

JavaScript实现自适应宽度的瀑布流

摘要: 主要介绍瀑布流的一种实现方法&#xff1a;绝对定位(css)javascriptajaxjson。简单一点如果不做滚动加载的话就是绝对定位(css)javascript了&#xff0c;ajax和json是滚动加载更多内容的时候用到的。 这样的布局并不陌生&#xff0c;从2011年Pinterest创立以来&#xff0…

.net web 开发平台- 表单设计器 一(web版)

如今为了适应需求的不断变化&#xff0c;动态表单设计器应运而生。它主要是为了满足界面的不断变化和提高开发速度。比如&#xff1a;一些页面客户可能也无法确定页面的终于布局&#xff0c;控件的位置&#xff0c;在哪种情况下显示或不显示等可能须要随时改动。为了应对这些需…

.NET程序配置文件操作(ini,cfg,config)

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

MSP432P401R TI Drivers 库函数学习笔记(四)GPIO

目录API头文件函数 (机翻)宏GPIO驱动程序api返回的通用状态代码GPIO_PinConfig输出引脚配置宏GPIO_PinConfig输入引脚配置宏GPIO_PinConfig中断配置宏特殊的GPIO_PinConfig配置宏类型别名示例配置引脚示例配置外部中断示例上机实战配置引脚main.cmyTask.cmyTask.h实验结果平台&…

# Vue3 toRef 和 toRefs 函数

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

二分检索函数lower_bound()和upper_bound()

二分检索函数lower_bound()和upper_bound() 一、说明 头文件&#xff1a;<algorithm> 二分检索函数lower_bound()和upper_bound() lower_bound()&#xff1a;找到大于等于某值的第一次出现upper_bound()&#xff1a;找到大于某值的第一次出现必须从小到大排序后才能用 内…

MSP432P401R TI Drivers 库函数学习笔记(五)PWM

目录API (机翻)函数上机实战配置引脚PWM初始化&#xff0c;实现简易呼吸灯的效果实验结果完整代码myPWM.cmyPWM.hmyTask.cmyTask.hmain.cmain.h平台&#xff1a;Code Composer Studio 10.4.0 MSP432P401R SimpleLink™ 微控制器 LaunchPad™ 开发套件 (MSP-EXP432P401R) API …

防火墙配置十大任务之十,构建虚拟防火墙

防火墙配置任务十构建虚拟防火墙任务拓扑图10.11.inside区域的交换机的基本配置&#xff0c;在交换机上开启vlan2&#xff0c;vlan3&#xff0c;vlan4.三个vlan。图10.22.outside区域的Internet基本配置。图10.33.交换机上连接防火墙接口的配置。图10.44.inside区域各个PC的主机…

使用纯 CSS 实现超酷炫的粘性气泡效果

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

MSP432P401R TI Drivers 库函数学习笔记(六)UART 串口

目录API (机翻)上机实战配置初始化和实验实验结果整体代码myUart.cmyUart.hmyTask.cmyTask.hmain.cmain.h平台&#xff1a;Code Composer Studio 10.4.0 MSP432P401R SimpleLink™ 微控制器 LaunchPad™ 开发套件 (MSP-EXP432P401R) API (机翻) 官方UART API 手册 void UA…

Linux 批量杀死进程(详细版本)

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

MSP432 库函数实现 PID 电机调角度、调速

目录引脚配置PWM引脚外部中断测量编码器引脚配置代码部分初始化编码器解读Encoder.cEncoder.h测速和控制部分卡尔曼滤波器&#xff0c;用于对所测速度进行滤波kalman.ckalman.h实验效果速度滤波效果控速效果控角效果平台&#xff1a;Code Composer Studio 10.4.0 MSP432P401R …