k8s的operator基石:controller-runtime源码解析

写在之前

今天开始开更controller-runtime的源码阅读,笔者建议大家在阅读前了解以下知识,可能会帮助大家更好的理解源码逻辑。

1.client-go的基础使用
2. 使用kubebuilder搭建一个简单的controller-runtime环境
3.informer的基本思想

1.源码环境搭建

参考链接:https://book.kubebuilder.io/cronjob-tutorial/cronjob-tutorial

2.源码阅读

2.1 万物伊始,问题的关键是定位关键的问题

首先定位controller的核心代码逻辑,main.go,如果你是使用kububuilder生成的代码,该代码在cmd文件夹下。排除掉杂七杂八的flag解析、日志实体初始化逻辑后,main方法的核心逻辑大概有分为三个步骤:

  1. 构建manage
  2. 向manage中注册自定义的Reconciler方法
  3. 启动manage
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme:                 scheme,MetricsBindAddress:     metricsAddr,Port:                   9443,HealthProbeBindAddress: probeAddr,LeaderElection:         enableLeaderElection,LeaderElectionID:       "80807133.tutorial.kubebuilder.io",// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily// when the Manager ends. This requires the binary to immediately end when the// Manager is stopped, otherwise, this setting is unsafe. Setting this significantly// speeds up voluntary leader transitions as the new leader don't have to wait// LeaseDuration time first.//// In the default scaffold provided, the program ends immediately after// the manager stops, so would be fine to enable this option. However,// if you are doing or is intended to do any operation such as perform cleanups// after the manager stops then its usage might be unsafe.// LeaderElectionReleaseOnCancel: true,
})
....
if err = (&controller.CronJobReconciler{Client: mgr.GetClient(),Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {setupLog.Error(err, "unable to create controller", "controller", "CronJob")os.Exit(1)
}
......
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {setupLog.Error(err, "problem running manager")os.Exit(1)
}

接下来我按照这三条链路逐步进行代码分析。

2.2 ctrl.NewManager

这个方法的注释写的是returns a new Manager for creating Controllers.创建controller的管理器,主要是一些初始化的逻辑,构建controllerManager结构体

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme:                 scheme,MetricsBindAddress:     metricsAddr,Port:                   9443,HealthProbeBindAddress: probeAddr,LeaderElection:         enableLeaderElection,LeaderElectionID:       "80807133.tutorial.kubebuilder.io",// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily// when the Manager ends. This requires the binary to immediately end when the// Manager is stopped, otherwise, this setting is unsafe. Setting this significantly// speeds up voluntary leader transitions as the new leader don't have to wait// LeaseDuration time first.//// In the default scaffold provided, the program ends immediately after// the manager stops, so would be fine to enable this option. However,// if you are doing or is intended to do any operation such as perform cleanups// after the manager stops then its usage might be unsafe.// LeaderElectionReleaseOnCancel: true,
})

上述方法的入参由两部分组成,一部分是ctrl.GetConfigOrDie(),一部分是options,因k8s的client初始化的时候需要加载kubeconfig,所以猜测方法一是kubeconfig加载的核心流程。现在开始追踪ctrl.GetConfigOrDie()去看看kubeconfig是如何加载。

2.2.1 kubeconfig的加载逻辑

我们沿着代码执行链路一步步追踪加载kubeconfig的具体的实现位置。

// 这里是核心逻辑
func loadConfig(context string) (config *rest.Config, configErr error) {// If a flag is specified with the config location, use that// 1.这里的kubeconfig是哪里来的呢,这个参数不是当前function的私有参数,我们可以追踪这个参数的初始化位置,可以查询这个参数的来源if len(kubeconfig) > 0 {// loadConfigWithContext 这个方法似乎是加载client的核心逻辑,整个方法中引用了两次return loadConfigWithContext("", &clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfig}, context)}// 如果flag中没有传递kubeconfig,那么就从环境变量中获取kubeconfig文件的所在处,通过文件初始化rest.config//RecommendedConfigPathEnvVar = "KUBECONFIG",获取环境变量指向的kubeconfig所在位置kubeconfigPath := os.Getenv(clientcmd.RecommendedConfigPathEnvVar)if len(kubeconfigPath) == 0 {// 从容器中获取token初始化rest.config,参考2c, err := loadInClusterConfig()if err == nil {return c, nil}defer func() {if configErr != nil {log.Error(err, "unable to load in-cluster config")}}()}// 这里定义了kubeconfig加载的规则,会遍历~/.kube或者环境变量中定义的KUBECONFIG路径去加载配置文件loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()if _, ok := os.LookupEnv("HOME"); !ok {u, err := user.Current()if err != nil {return nil, fmt.Errorf("could not get current user: %w", err)}loadingRules.Precedence = append(loadingRules.Precedence, filepath.Join(u.HomeDir, clientcmd.RecommendedHomeDir, clientcmd.RecommendedFileName))}// 这里是核心的client初始化逻辑,参考3.return loadConfigWithContext("", loadingRules, context)
}

1)在当前代码文件中检索到了kubeconfig这个参数的初始化逻辑,该参数是从名为kubeconfig的flag中解析获得的,只要在operator的启动命令中传递了kubeconfig的flag标识,就可以解析到逻辑中

func init() {RegisterFlags(flag.CommandLine)
}const KubeconfigFlagName = "kubeconfig"
func RegisterFlags(fs *flag.FlagSet) {if fs == nil {fs = flag.CommandLine}// KubeconfigFlagName的值是kubeconfigif f := fs.Lookup(KubeconfigFlagName); f != nil {kubeconfig = f.Value.String()} else {fs.StringVar(&kubeconfig, KubeconfigFlagName, "", "Paths to a kubeconfig. Only required if out-of-cluster.")}
}

2) 从pod中初始化rest.config

//InClusterConfig 返回一个配置对象,该对象使用 kubernetes 提供给 pod 的服务帐户。
//它适用于希望在 kubernetes 上运行的 pod 内运行的客户端。如果从不在 kubernetes 环境中运行的进程调用,
//它将返回 ErrNotInCluster。
func InClusterConfig() (*Config, error) {const (tokenFile  = "/var/run/secrets/kubernetes.io/serviceaccount/token"rootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")// 如果好奇这两个值的含义,我可以在后文中贴上在pod中检索这两个环境变量的贴图host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")if len(host) == 0 || len(port) == 0 {return nil, ErrNotInCluster}token, err := os.ReadFile(tokenFile)if err != nil {return nil, err}tlsClientConfig := TLSClientConfig{}if _, err := certutil.NewPool(rootCAFile); err != nil {// 这一步主要是验证这个证书路径是否合法klog.Errorf("Expected to load root CA config from %s, but got err: %v", rootCAFile, err)} else {tlsClientConfig.CAFile = rootCAFile}// 返回rest.configreturn &Config{// TODO: switch to using cluster DNS.Host:            "https://" + net.JoinHostPort(host, port), //这里是吧host:port做ipv4和ipv6的格式转换TLSClientConfig: tlsClientConfig,BearerToken:     string(token),BearerTokenFile: tokenFile,}, nil
}

在这里插入图片描述
在这里插入图片描述
3)restconfig初始化逻辑
这里是client go初始化rest.config的标准function,这里不做解读了。

func loadConfigWithContext(apiServerURL string, loader clientcmd.ClientConfigLoader, context string) (*rest.Config, error) {return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loader,&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: apiServerURL,},CurrentContext: context,}).ClientConfig()
}

2.2.2 开始探索ctrl.NewManager 这个方法

这个方法的注释写的是returns a new Manager for creating Controllers.创建controller的管理器,主要是一些初始化的逻辑,构建controllerManager结构体.

func New(config *rest.Config, options Options) (Manager, error) {// Set default values for options fields// 2.1.设置options的默认值,这里是对于options中未被用户显示传入的参数进行进行默认值赋值options = setOptionsDefaults(options)// 2.2这里是对cluster默认初始化,cluster类就是与集群进行交互的实体类cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {clusterOptions.Scheme = options.SchemeclusterOptions.MapperProvider = options.MapperProviderclusterOptions.Logger = options.LoggerclusterOptions.SyncPeriod = options.SyncPeriodclusterOptions.Namespace = options.NamespaceclusterOptions.NewCache = options.NewCacheclusterOptions.NewClient = options.NewClientclusterOptions.ClientDisableCacheFor = options.ClientDisableCacheForclusterOptions.DryRunClient = options.DryRunClientclusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck})if err != nil {return nil, err}......return &controllerManager{stopProcedureEngaged:          pointer.Int64(0),cluster:                       cluster,runnables:                     runnables,errChan:                       errChan,recorderProvider:              recorderProvider,resourceLock:                  resourceLock,metricsListener:               metricsListener,metricsExtraHandlers:          metricsExtraHandlers,controllerOptions:             options.Controller,logger:                        options.Logger,elected:                       make(chan struct{}),port:                          options.Port,host:                          options.Host,certDir:                       options.CertDir,tlsOpts:                       options.TLSOpts,webhookServer:                 options.WebhookServer,leaderElectionID:              options.LeaderElectionID,leaseDuration:                 *options.LeaseDuration,renewDeadline:                 *options.RenewDeadline,retryPeriod:                   *options.RetryPeriod,healthProbeListener:           healthProbeListener,readinessEndpointName:         options.ReadinessEndpointName,livenessEndpointName:          options.LivenessEndpointName,gracefulShutdownTimeout:       *options.GracefulShutdownTimeout,internalProceduresStop:        make(chan struct{}),leaderElectionStopped:         make(chan struct{}),leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,}, nil
}

2.3 SetupWithManager 注册自定义的Reconciler

我们首先把Reconciler的接口定义贴上来,这是整个opeator中为数不多的需要自定义编码的位置,这里的reconcile是用户针对指定k8s资源的变动事件(增、删除、改)的自定义处理步骤,你可以理解为informer的eventHandler中的update、add、delete处理逻辑都放在 Reconcile(context.Context, Request) (Result, error)的方法实现中,由用户自己判断资源object处于哪一类事件的状态中,并执行相应的处理逻辑。

type Reconciler interface {Reconcile(context.Context, Request) (Result, error)
}

在main方法中的应用位置是:

if err = (&controller.CronJobReconciler{Client: mgr.GetClient(),Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {setupLog.Error(err, "unable to create controller", "controller", "CronJob")os.Exit(1)
}

我们追踪SetupWithManager的代码逻辑,从NewControllerManagedBy到For,再到Owns都是Builder这个结构体的构建过程,这里不做展开,这里只介绍Complete的代码逻辑。

func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {.....// 核心逻辑return ctrl.NewControllerManagedBy(mgr).For(&batchv1.CronJob{}).Owns(&kbatch.Job{}).Complete(r)
}

跳过一些冗余的代码逻辑,我们快进到核心逻辑。在下面的Build方法中,主要执行了两块核心逻辑:

  • 核心逻辑一:初始化controller
  • 核心逻辑二:初始化watch逻辑
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {if r == nil {return nil, fmt.Errorf("must provide a non-nil Reconciler")}if blder.mgr == nil {return nil, fmt.Errorf("must provide a non-nil Manager")}if blder.forInput.err != nil {return nil, blder.forInput.err}// Set the ControllerManagedBy// 核心逻辑一:初始化controllerif err := blder.doController(r); err != nil {return nil, err}// Set the Watch// 核心逻辑二:初始化watch逻辑if err := blder.doWatch(); err != nil {return nil, err}return blder.ctrl, nil
}

我们沿着这两条链路逐条进行分析。

2.3.1 controller的初始化

func (blder *Builder) doController(r reconcile.Reconciler) error {...... 构建ctrlOptions// Build the controller and return.// 初始化controllerblder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)return err
}

继续深入newController看一下这个结构体是怎么初始化的,下面是核心实现代码,主要做了两件事情

  • 1.初始化一个controller结构体
  • 2.把controller添加到了controllerManage中

func New(name string, mgr manager.Manager, options Options) (Controller, error) {c, err := NewUnmanaged(name, mgr, options)if err != nil {return nil, err}// Add the controller as a Manager componentsreturn c, mgr.Add(c)
}

mgr.Add©这个逻辑比较简单,就是把新生成的controller添加到manager中的缓存结构体中,我们来看看NewUnmanaged中的controller
初始化字段有哪些:

func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {// Create controller with dependencies setreturn &controller.Controller{// 这个就是自定义的ReconcilerDo: options.Reconciler,// 这个有一个队列,记住,后面要考MakeQueue: func() workqueue.RateLimitingInterface {return workqueue.NewRateLimitingQueueWithConfig(options.RateLimiter, workqueue.RateLimitingQueueConfig{Name: name,})},MaxConcurrentReconciles: options.MaxConcurrentReconciles,CacheSyncTimeout:        options.CacheSyncTimeout,Name:                    name,LogConstructor:          options.LogConstructor,RecoverPanic:            options.RecoverPanic,LeaderElected:           options.NeedLeaderElection,}, nil
}

2.3.2 doWatch初始化watch逻辑

代码实现展示在下文,这里有两个问题没有解释,一个是
source.Kind这个结构体有什么作用,一个就是allPredicates这个实体也没有解释。我们先留个坑,目前只有定义,在实际调用的时候我们在进行解释。

func (blder *Builder) doWatch() error {// Reconcile typeif blder.forInput.object != nil {obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)if err != nil {return err}// 这一个是什么source?src := source.Kind(blder.mgr.GetCache(), obj)// 这里是事件处理的eventHandler结构体,定义了队列处理逻辑hdler := &handler.EnqueueRequestForObject{}allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)allPredicates = append(allPredicates, blder.forInput.predicates...)// 看起来这里是核心逻辑执行的位置if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {return err}}......后续逻辑大同小异

我们继续深入blder.ctrl.Watch的逻辑中

func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {c.mu.Lock()defer c.mu.Unlock()if !c.Started {// 首次调用这个方法的时候进入到这个逻辑中,看起来只是把src、handler、predicate包装成一个机构体存储在watches中c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})return nil}c.LogConstructor(nil).Info("Starting EventSource", "source", src)return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

关于下面的src.Start,这个步骤虽然在我们的核心流程中没有执行,但是这个方法的实现似乎有助于我们理解source.Kind这个数据结构,我们就浪费点时间,进入到start中看一下:

  • 启动了一个定时任务,循环检索献相应的informer是否已经被声明
  • 往这个informer中添加handler的处理逻辑
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,prct ...predicate.Predicate) error {....省略无关逻辑go func() {var (i       cache.InformerlastErr error)// Tries to get an informer until it returns true,// 这个步骤一直循环在寻找一个相应的informerif err := wait.PollUntilContextCancel(ctx, 10*time.Second, true, func(ctx context.Context) (bool, error) {// Lookup the Informer from the Cache and add an EventHandler which populates the Queuei, lastErr = ks.Cache.GetInformer(ctx, ks.Type)if lastErr != nil {kindMatchErr := &meta.NoKindMatchError{}switch {case errors.As(lastErr, &kindMatchErr):log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start","kind", kindMatchErr.GroupKind)case runtime.IsNotRegisteredError(lastErr):log.Error(lastErr, "kind must be registered to the Scheme")default:log.Error(lastErr, "failed to get informer from cache")}return false, nil // Retry.}return true, nil}); err != nil {if lastErr != nil {ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr)return}ks.started <- errreturn}// 对informer添加handler处理逻辑_, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())if err != nil {ks.started <- errreturn}if !ks.Cache.WaitForCacheSync(ctx) {// Would be great to return something more informative hereks.started <- errors.New("cache did not sync")}close(ks.started)}()return nil
}

看到这里,可能大家已经迷糊了,不要着急,我们一步一步的进行分析,因为不仅是大家迷糊,我也迷糊,这乱七八糟的是什么啊。首先看一下这个cache的真实实现是什么?

func New(cfg *rest.Config, opts Options) (Cache, error) {....配置项解析newCacheFunc := newCache(cfg, opts)var defaultCache Cacheif len(opts.DefaultNamespaces) > 0 {//这一步暂时忽略defaultConfig := optionDefaultsToConfig(&opts)defaultCache = newMultiNamespaceCache(newCacheFunc, opts.Scheme, opts.Mapper, opts.DefaultNamespaces, &defaultConfig)} else {// 这一步就是构建的informerCache的实体类defaultCache = newCacheFunc(optionDefaultsToConfig(&opts), corev1.NamespaceAll)}if len(opts.ByObject) == 0 {return defaultCache, nil}// 分类将不同资源的informer对象保存在这个结构体中delegating := &delegatingByGVKCache{scheme:       opts.Scheme,//这里的ByObject已经通过追踪是在初始化的options中的cache字段配置的caches:       make(map[schema.GroupVersionKind]Cache, len(opts.ByObject)),defaultCache: defaultCache,}for obj, config := range opts.ByObject {gvk, err := apiutil.GVKForObject(obj, opts.Scheme)if err != nil {return nil, fmt.Errorf("failed to get GVK for type %T: %w", obj, err)}var cache Cacheif len(config.Namespaces) > 0 {cache = newMultiNamespaceCache(newCacheFunc, opts.Scheme, opts.Mapper, config.Namespaces, nil)} else {cache = newCacheFunc(byObjectToConfig(config), corev1.NamespaceAll)}delegating.caches[gvk] = cache}return delegating, nil
}//这个方法就是构建一个informerCache的实体类
func newCache(restConfig *rest.Config, opts Options) newCacheFunc {return func(config Config, namespace string) Cache {return &informerCache{scheme: opts.Scheme,Informers: internal.NewInformers(restConfig, &internal.InformersOpts{HTTPClient:   opts.HTTPClient,Scheme:       opts.Scheme,Mapper:       opts.Mapper,ResyncPeriod: *opts.SyncPeriod,Namespace:    namespace,Selector: internal.Selector{Label: config.LabelSelector,Field: config.FieldSelector,},Transform:             config.Transform,UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false),NewInformer:           opts.newInformer,}),readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer,}}
}

这一步骤看起来和sharedInformerFactory比较像,里面定义了一个map缓存,保存了GVK和对应的informer的单例实现。现在我们可以根据delegatingByGVKCache来寻找GetInformer的具体实现方法是什么了,从下面的三个方法来看,这一步骤主要是用来检索实体结构体对应的informerCache实体。

func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {// 这一步是从delegatingByGVKCache的map缓存中获取informerCache的结构体cache, err := dbt.cacheForObject(obj)if err != nil {return nil, err}// 这一步真正实现是在informerCache的方法中体现的return cache.GetInformer(ctx, obj, opts...)
}
func (dbt *delegatingByGVKCache) cacheForObject(o runtime.Object) (Cache, error) {gvk, err := apiutil.GVKForObject(o, dbt.scheme)if err != nil {return nil, err}gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")return dbt.cacheForGVK(gvk), nil
}func (dbt *delegatingByGVKCache) cacheForGVK(gvk schema.GroupVersionKind) Cache {if specific, hasSpecific := dbt.caches[gvk]; hasSpecific {return specific}return dbt.defaultCache
}func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {gvk, err := apiutil.GVKForObject(obj, ic.scheme)if err != nil {return nil, err}// 这一步有兴趣的读者可以深入了解,这里主要是进行数据的转换,转换的结果就是返回了一个sharedIndexedInformer_, i, err := ic.Informers.Get(ctx, gvk, obj, applyGetOptions(opts...))if err != nil {return nil, err}return i.Informer, nil
}

到这里,似乎我们就可以继续分析_, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs()),这行的代码逻辑了。这里就和informer串起来了。


func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *EventHandler {return &EventHandler{ctx:        ctx,handler:    handler,queue:      queue,predicates: predicates,}
}
func (e *EventHandler) HandlerFuncs() cache.ResourceEventHandlerFuncs {return cache.ResourceEventHandlerFuncs{AddFunc:    e.OnAdd,UpdateFunc: e.OnUpdate,DeleteFunc: e.OnDelete,}
}

这三个处理逻辑是差不多的,我们看一下这里OnAdd这个方法的实现逻辑,以点代面去看一下具体的执行逻辑。

// OnAdd creates CreateEvent and calls Create on EventHandler.
func (e *EventHandler) OnAdd(obj interface{}) {c := event.CreateEvent{}// Pull Object out of the objectif o, ok := obj.(client.Object); ok {c.Object = o} else {log.Error(nil, "OnAdd missing Object","object", obj, "type", fmt.Sprintf("%T", obj))return}//上面的内容索然无味,这里值得注意一下,先放直接放结论,之前我们没有分析predicates这个结构体,看起来这里是filer,用来过滤一些不关注的事件for _, p := range e.predicates {if !p.Create(c) {return}}// Invoke create handlerctx, cancel := context.WithCancel(e.ctx)defer cancel()// e.handler.Create(ctx, c, e.queue)
}

既然提到了,我们去找一下这个predicates的实现位置,他是在Builder结构体提供的一个WithEventFilter方法设置的,具体的使用方式的构建一个predicate.Predicate实体类,在这个实体类中定义不同事件类型的filter逻辑,可以过滤掉一些我们不关系的变更事件。

type Predicate interface {// Create returns true if the Create event should be processedCreate(event.CreateEvent) bool// Delete returns true if the Delete event should be processedDelete(event.DeleteEvent) bool// Update returns true if the Update event should be processedUpdate(event.UpdateEvent) bool// Generic returns true if the Generic event should be processedGeneric(event.GenericEvent) bool
}

我们继续追踪 e.handler.Create(ctx, c, e.queue)这个方法,看看里面发生了什么,这里的handler的真实实现位置是hdler := &handler.EnqueueRequestForObject{}。

// Create implements EventHandler.
func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {if evt.Object == nil {enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)return}//这里是往队列中添加了一个request事件q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name:      evt.Object.GetName(),Namespace: evt.Object.GetNamespace(),}})
}

现在我们可以总结一下这里的source.start做了一些什么事情了。当事件的informer监听到资源发生变换时,会触发一个handler.EnqueueRequestForObject{}的事件处理逻辑,将这个事封装成reconcile.Request{}结构体放置到controller对应的限速队列中去。

2.4 mgr.Start(ctrl.SetupSignalHandler())

启动controllerManage。我们仅截取关键代码进行分析。

2.4.1 cluster初始化=informer初始化

....
// Start and wait for caches.
if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {if err != nil {return fmt.Errorf("failed to start caches: %w", err)}
}

这里的cluster我们在上文中已经见识过他的初始化逻辑了。

return &cluster{config:           originalConfig,httpClient:       options.HTTPClient,scheme:           options.Scheme,cache:            cache,fieldIndexes:     cache,client:           clientWriter,apiReader:        clientReader,recorderProvider: recorderProvider,mapper:           mapper,logger:           options.Logger,
}, nil

我们来看一下这里的cm.add做了些什么事情,他往cm的runnables这个结构体中添加了一个Runnable实体,Runnable接口中只包含了一个Start()方法。

func (cm *controllerManager) add(r Runnable) error {return cm.runnables.Add(r)
}

我们先看一下runnables里面的定义,看起来是对Runnable进行了分类存放。

type runnables struct {HTTPServers    *runnableGroupWebhooks       *runnableGroupCaches         *runnableGroupLeaderElection *runnableGroupOthers         *runnableGroup
}type runnableGroup struct {ctx    context.Contextcancel context.CancelFuncstart        sync.MutexstartOnce    sync.Oncestarted      boolstartQueue   []*readyRunnablestartReadyCh chan *readyRunnablestop     sync.RWMutexstopOnce sync.Oncestopped  bool// errChan is the error channel passed by the caller// when the group is created.// All errors are forwarded to this channel once they occur.errChan chan error// ch is the internal channel where the runnables are read off from.ch chan *readyRunnable// wg is an internal sync.WaitGroup that allows us to properly stop// and wait for all the runnables to finish before returning.wg *sync.WaitGroup
}

我们看一下Add方法,印证了我们之前的猜测

func (r *runnables) Add(fn Runnable) error {switch runnable := fn.(type) {case *server:return r.HTTPServers.Add(fn, nil)case hasCache:return r.Caches.Add(fn, func(ctx context.Context) bool {return runnable.GetCache().WaitForCacheSync(ctx)})case webhook.Server:return r.Webhooks.Add(fn, nil)case LeaderElectionRunnable:if !runnable.NeedLeaderElection() {return r.Others.Add(fn, nil)}return r.LeaderElection.Add(fn, nil)default:return r.LeaderElection.Add(fn, nil)}
}

cluster属于hasCache的实现,我们看看他做了些什么事情。把runnable和runnale启动检验的逻辑包装到readyRunnable这个实体,然后做了两件事情:

  • 如果没有启动,把runnable放到startQueue这个队列中
  • 如果启动了,把runnable放到 r.ch这个channel通道中

接下来就是启动的步骤,我们看看start中做了些什么事情。

func (r *runnableGroup) Start(ctx context.Context) error {var retErr errorr.startOnce.Do(func() {defer close(r.startReadyCh)// Start the internal reconciler.// 启动reconcile,这里启动所有的自定义逻辑go r.reconcile()// Start the group and queue up all// the runnables that were added prior.r.start.Lock()r.started = true//还记得上文Add逻辑吗,这里是将startQueue中的readyRunnable实体塞到ch这个channel中for _, rn := range r.startQueue {rn.signalReady = truer.ch <- rn}r.start.Unlock()// If we don't have any queue, return.if len(r.startQueue) == 0 {return}// Wait for all runnables to signal.// 判断是否所有的runnables都已经启动for {select {case <-ctx.Done():if err := ctx.Err(); !errors.Is(err, context.Canceled) {retErr = err}// 这里判断readyRunnable是不是已经启动了,如果启动了就从startQueue中删除case rn := <-r.startReadyCh:for i, existing := range r.startQueue {if existing == rn {// Remove the item from the start queue.r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)break}}// We're done waiting if the queue is empty, return.if len(r.startQueue) == 0 {return}}}})return retErr
}

我们回过头来分析一下,r.reconcile()做了些什么事情

func (r *runnableGroup) reconcile() {for runnable := range r.ch {....// Start the runnable.go func(rn *readyRunnable) {go func() {//如果检查执行完毕后if rn.Check(r.ctx) {if rn.signalReady {//传入到startReadyCh的channel中来r.startReadyCh <- rn}}}().....// 执行start函数if err := rn.Start(r.ctx); err != nil {r.errChan <- err}}(runnable)}
}

这里cluster的start函数执行逻辑是

func (c *cluster) Start(ctx context.Context) error {defer c.recorderProvider.Stop(ctx)return c.cache.Start(ctx)
}

我们找到这个start的具体实现逻辑,追踪下去,我们就见到了核心的informer.Run(ip.ctx.Done()),这个就是原生的informer的用法。

2.4.2 自定义的的reconciler的启动

如果还记得SetUpWithManager的逻辑,我们知道自定义的reconciler被包装成了Controller实体放到了cm.runnables.LeaderElection这个分组中了,如果,我们继续追踪Start方法内的代码逻辑.

{ctx, cancel := context.WithCancel(context.Background())cm.leaderElectionCancel = cancelgo func() {// 如果没有抢占资源锁,就继续等待if cm.resourceLock != nil {if err := cm.startLeaderElection(ctx); err != nil {cm.errChan <- err}} else {// Treat not having leader election enabled the same as being elected.// 如果选主成功,执行核心启动逻辑if err := cm.startLeaderElectionRunnables(); err != nil {cm.errChan <- err}close(cm.elected)}}()
}

最后一步了,cm.startLeaderElectionRunnables(),我们追踪一下核心启动逻辑,与cluster是不是异曲同工。

func (r *runnableGroup) Start(ctx context.Context) error {var retErr errorr.startOnce.Do(func() {defer close(r.startReadyCh)// Start the internal reconciler.go r.reconcile()// Start the group and queue up all// the runnables that were added prior.r.start.Lock()r.started = truefor _, rn := range r.startQueue {rn.signalReady = truer.ch <- rn}r.start.Unlock()// If we don't have any queue, return.if len(r.startQueue) == 0 {return}// Wait for all runnables to signal.for {select {case <-ctx.Done():if err := ctx.Err(); !errors.Is(err, context.Canceled) {retErr = err}case rn := <-r.startReadyCh:for i, existing := range r.startQueue {if existing == rn {// Remove the item from the start queue.r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)break}}// We're done waiting if the queue is empty, return.if len(r.startQueue) == 0 {return}}}})return retErr
}

我们深入看一下关键是Controller的start方法是什么。

func (c *Controller) Start(ctx context.Context) error {.....c.Queue = c.MakeQueue()......err := func() error {defer c.mu.Unlock()// TODO(pwittrock): Reconsider HandleCrashdefer utilruntime.HandleCrash()// NB(directxman12): launch the sources *before* trying to wait for the// caches to sync so that they have a chance to register their intendeded// caches.for _, watch := range c.startWatches {c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))// 这个start是不是很眼熟,就是上文分析的Kind的start方法,主要从informer中监听事件放到queue中if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {return err}}......wg.Add(c.MaxConcurrentReconciles)for i := 0; i < c.MaxConcurrentReconciles; i++ {go func() {defer wg.Done()// Run a worker thread that just dequeues items, processes them, and marks them done.// It enforces that the reconcileHandler is never invoked concurrently with the same object.for c.processNextWorkItem(ctx) {}}()}....}()if err != nil {return err}.......
}

我们之前之说生产者是怎么往controller的队列中添加数据的,而没有说队列中的事件是怎么消费数据的,核心逻辑就在processNextWorkItem这个方法中,

func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {.......// 这里的c.Reconcile就是controllerresult, err := c.Reconcile(ctx, req)switch {case err != nil:if errors.Is(err, reconcile.TerminalError(nil)) {ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()} else {c.Queue.AddRateLimited(req)}ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()if !result.IsZero() {log.Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes reqeueuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler")}log.Error(err, "Reconciler error")case result.RequeueAfter > 0:log.V(5).Info(fmt.Sprintf("Reconcile done, requeueing after %s", result.RequeueAfter))// The result.RequeueAfter request will be lost, if it is returned// along with a non-nil error. But this is intended as// We need to drive to stable reconcile loops before queuing due// to result.RequestAfterc.Queue.Forget(obj)c.Queue.AddAfter(req, result.RequeueAfter)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()case result.Requeue:log.V(5).Info("Reconcile done, requeueing")c.Queue.AddRateLimited(req)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()default:log.V(5).Info("Reconcile successful")// Finally, if no error occurs we Forget this item so it does not// get queued again until another change happens.c.Queue.Forget(obj)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()}
}

我们先分析c.Reconcile做了些什么事情,主要是调用了controller中的Do属性的Reconcile方法,而Do就是我们自定义的Reconcile实体属性。

func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
......return c.Do.Reconcile(ctx, req)
}

现在回过头来分析c.reconcileHandler(ctx, obj).

switch {// 如果上一步执行有异常报错,1)忽略,添加通知 2)重新入队
case err != nil:if errors.Is(err, reconcile.TerminalError(nil)) {ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()} else {c.Queue.AddRateLimited(req)}.......// 如果返回的result有RequeueAfter这个属性字段,选择在一段时间后延迟入队
case result.RequeueAfter > 0:log.V(5).Info(fmt.Sprintf("Reconcile done, requeueing after %s", result.RequeueAfter))// The result.RequeueAfter request will be lost, if it is returned// along with a non-nil error. But this is intended as// We need to drive to stable reconcile loops before queuing due// to result.RequestAfterc.Queue.Forget(obj)c.Queue.AddAfter(req, result.RequeueAfter)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
case result.Requeue:// 如果返回的result,直接入队log.V(5).Info("Reconcile done, requeueing")c.Queue.AddRateLimited(req)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
default:// 默认情况下,从队列中移除log.V(5).Info("Reconcile successful")// Finally, if no error occurs we Forget this item so it does not// get queued again until another change happens.c.Queue.Forget(obj)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
}

至此,controller-runtime的源码分析完毕.

遗留

workqueue.RateLimitingInterface这个队列的分析没写

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/658721.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代理模式(静态代理、JDK 动态代理、CGLIB 动态代理)

代理模式(静态代理、JDK 动态代理、CGLIB 动态代理) 一、代理模式概述1. 生活中的代理案例2. 为什么要使用代理3. 代理模式在 Java 中的应用4. 概述5. 生活中代理图示二、代理的实现方式1. Java 中代理图示2. 静态代理2.1 案例2.2 实现案例2.3 静态代理存在的问题三、动态代理…

SpringBoot的默认组件扫描

本篇博客主要探究&#xff1a;为什么SpringBoot项目中我们没有配置组件扫描的包&#xff0c;为什么它会默认扫描启动类所在的包&#xff1f; 一、访问与启动类所在同一包下的接口 我们先来看一个简单的接口&#xff1a; 我们可以观察到&#xff0c;HelloController这个类处在…

福布斯财富增长榜前十富豪身价暴增3.5万亿!他们致富的秘诀究竟是?

按照《福布斯》最新的数据显示&#xff0c;今年全球前十位财富增长最多的富豪的身家总共增加了4900亿美元&#xff08;约3.5万人民币&#xff09;&#xff0c;大家可能对于3.5万亿没什么概念&#xff0c;但是换算一下&#xff0c;中国一共才14亿人&#xff0c;如果把这3.5万亿平…

jenkins部署(docker)

docker部署&#xff0c;避免安装tomcat 1.拉镜像 docker pull jenkins/jenkins2.宿主机创建文件夹 mkdir -p /lzp/jenkins_home chmod 777 /lzp/jenkins_home/3.启动容器 docker run -d -p 49001:8080 -p 49000:50000 --privilegedtrue -v /lzp/jenkins_home:/var/jenkins_…

BUUCTF-Real-[PHP]XXE

目录 1、原理 2、XXE漏洞产生的原因 3、开始复现 paylaod 复现 4、flag 1、原理 XML数据在传输过程中&#xff0c;攻击者强制XML解析器去访问攻击者指定的资源内容&#xff08;本地/远程&#xff09;&#xff0c;外部实体声明关键字SYSTEM会令XML解析器读取数据&#xf…

【Axure教程0基础入门】00Axure9汉化版下载、安装、汉化、注册+01制作线框图

写在前面&#xff1a;在哔哩哔哩上面找到的Axure自学教程0基础入门课程&#xff0c;播放量最高&#xff0c;5个多小时。课程主要分为4个部分&#xff0c;快速入门、动态面板、常用动效、项目设计。UP主账号【Song老师产品经理课堂】。做个有素质的白嫖er&#xff0c;一键三连必…

【C/C++】深入理解--函数重载(什么是函数重载?为什么要有函数重载?)

目录 一、前言 二、 函数重载 &#x1f34e;什么是函数重载 &#x1f350;函数重载的条件 &#x1f347;函数重载的注意点 &#x1f349;为什么要有函数重载 &#x1f353;为何C语言不支持函数重载&#xff0c;反倒C可以&#xff1f; &#x1f4a6; Linux环境下演示函数重…

Cocos creator 动作系统

动作系统简介 是用于控制物体运动的一套系统&#xff0c;完全依赖代码进行实现&#xff0c;动态调节节点的移动。 移动 cc.moveTo 移动到某个坐标&#xff08;x,y&#xff09; //1秒时间内&#xff0c;移动到0,0let action1 cc.moveTo(1,0,0)this.node.runAction(action1)c…

基于单片机的烟草干燥温度控制系统设计

摘 要&#xff1a;烟草干燥研究一直备受国内外烟草工作者的重视&#xff0c;在烟草干燥的方法中热风管处理法是利用热空气对流使烟草达到干燥的效果&#xff0c;这样可以控制烟草干燥时的温度&#xff0c;使烟草能够更好更快地干燥&#xff0c;因此温度的检测和控制是很重要的。…

写静态页面——浮动练习

0、效果&#xff1a; 1、html代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>浮动…

C++-类和对象(2)

目录 5.类的作用域 6.类的实例化 7.类对象模型 7.1 如何计算类对象的大小 7.2 类对象的存储方式猜测 1.对象中包含类的各个成员 ​编辑 2.代码只保存一份&#xff0c;在对象中保存存放代码的地址 ​编辑 3.只保存成员变量&#xff0c;成员函数存放在公共的代码段 ​编辑…

C++进阶(八)红黑树

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、红黑树的概念二、红黑树的性质三、红黑树结构四、红黑树的插入操作1、情况一2、情况二3、…

【C++】构造函数和析构函数详解

目录 前言 类中的六个默认成员函数 构造函数 概念 特性 析构函数 概念 特性&#xff1a; 前言 类中的六个默认成员函数 如果一个类中什么成员都没有&#xff0c;简称为空类。 空类中真的什么都没有吗&#xff1f;并不是&#xff0c;任何类在什么都不写时&#xff0c;编…

【Cookie反爬虫】某采购网站动态Cookie加点选验证码校验分析与实战

文章目录 1. 写在前面2. 请求分析3. JS反混淆4. 深度分析 【作者主页】&#xff1a;吴秋霖 【作者介绍】&#xff1a;Python领域优质创作者、阿里云博客专家、华为云享专家。长期致力于Python与爬虫领域研究与开发工作&#xff01; 【作者推荐】&#xff1a;对JS逆向感兴趣的朋…

字符串转换const char* , char*,QByteArray,QString,string相互转换,支持中文

文章目录 1.char * 与 const char * 的转换2.QByteArray 与 char* 的转换3.QString 与 QByteArray 的转换4.QString 与 string 的转换5.QString与const string 的转换6.QString 与 char* 的转换 在开发中&#xff0c;经常会遇到需要将数据类型进行转换的情况&#xff0c;下面依…

【C++入门到精通】特殊类的设计 | 单例模式 [ C++入门 ]

阅读导航 引言一、设计模式概念&#xff08;了解&#xff09;二、单例模式1. 饿汉模式&#xff08;1&#xff09;概念&#xff08;2&#xff09;模拟实现&#xff08;3&#xff09;优缺点&#xff08;4&#xff09;适用场景 2. 懒汉模式&#xff08;1&#xff09;概念&#xff…

金田金业教你如何看懂国际黄金价格走势图

对于黄金投资者来说&#xff0c;看懂国际黄金价格走势图是至关重要的。通过观察走势图&#xff0c;可以了解金价的实时动态&#xff0c;预测未来的走势&#xff0c;从而做出相应的投资决策。本文将详细解析如何看懂国际黄金价格走势图。 一、国际黄金价格走势图的基本构成 国…

【JavaEE】UDP协议与TCP协议

作者主页&#xff1a;paper jie_博客 本文作者&#xff1a;大家好&#xff0c;我是paper jie&#xff0c;感谢你阅读本文&#xff0c;欢迎一建三连哦。 本文于《JavaEE》专栏&#xff0c;本专栏是针对于大学生&#xff0c;编程小白精心打造的。笔者用重金(时间和精力)打造&…

NoSQL数据库简介

NoSQL数据库简介 Brief Introduction to NoSQL Databases By JacksonML 1. 什么是SQL&#xff1f; 在了解NoSQL之前&#xff0c;先简要介绍一下SQL。 SQL是 Structured Query Language&#xff08;结构化查询语言&#xff09;的缩写。 SQL在关系型数据中广泛使用&#xf…

大数据学习之Redis,十大数据类型的具体应用(三)

目录 3.7 Redis位图&#xff08;bitmap&#xff09; 概念 需求 是什么 说明 能干嘛? 基本命令 3.7 Redis位图&#xff08;bitmap&#xff09; 概念 由0和1状态表现的二进制位的bit数组 需求 用户是否登陆过&#xff1f;Y / N 广告是否被点击过&#xff1f; 钉钉打…