写在之前
今天开始开更controller-runtime的源码阅读,笔者建议大家在阅读前了解以下知识,可能会帮助大家更好的理解源码逻辑。
1.client-go的基础使用
2. 使用kubebuilder搭建一个简单的controller-runtime环境
3.informer的基本思想
1.源码环境搭建
参考链接:https://book.kubebuilder.io/cronjob-tutorial/cronjob-tutorial
2.源码阅读
2.1 万物伊始,问题的关键是定位关键的问题
首先定位controller的核心代码逻辑,main.go,如果你是使用kububuilder生成的代码,该代码在cmd文件夹下。排除掉杂七杂八的flag解析、日志实体初始化逻辑后,main方法的核心逻辑大概有分为三个步骤:
- 构建manage
- 向manage中注册自定义的Reconciler方法
- 启动manage
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme,MetricsBindAddress: metricsAddr,Port: 9443,HealthProbeBindAddress: probeAddr,LeaderElection: enableLeaderElection,LeaderElectionID: "80807133.tutorial.kubebuilder.io",// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily// when the Manager ends. This requires the binary to immediately end when the// Manager is stopped, otherwise, this setting is unsafe. Setting this significantly// speeds up voluntary leader transitions as the new leader don't have to wait// LeaseDuration time first.//// In the default scaffold provided, the program ends immediately after// the manager stops, so would be fine to enable this option. However,// if you are doing or is intended to do any operation such as perform cleanups// after the manager stops then its usage might be unsafe.// LeaderElectionReleaseOnCancel: true,
})
....
if err = (&controller.CronJobReconciler{Client: mgr.GetClient(),Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {setupLog.Error(err, "unable to create controller", "controller", "CronJob")os.Exit(1)
}
......
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {setupLog.Error(err, "problem running manager")os.Exit(1)
}
接下来我按照这三条链路逐步进行代码分析。
2.2 ctrl.NewManager
这个方法的注释写的是returns a new Manager for creating Controllers.创建controller的管理器,主要是一些初始化的逻辑,构建controllerManager结构体
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme,MetricsBindAddress: metricsAddr,Port: 9443,HealthProbeBindAddress: probeAddr,LeaderElection: enableLeaderElection,LeaderElectionID: "80807133.tutorial.kubebuilder.io",// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily// when the Manager ends. This requires the binary to immediately end when the// Manager is stopped, otherwise, this setting is unsafe. Setting this significantly// speeds up voluntary leader transitions as the new leader don't have to wait// LeaseDuration time first.//// In the default scaffold provided, the program ends immediately after// the manager stops, so would be fine to enable this option. However,// if you are doing or is intended to do any operation such as perform cleanups// after the manager stops then its usage might be unsafe.// LeaderElectionReleaseOnCancel: true,
})
上述方法的入参由两部分组成,一部分是ctrl.GetConfigOrDie(),一部分是options,因k8s的client初始化的时候需要加载kubeconfig,所以猜测方法一是kubeconfig加载的核心流程。现在开始追踪ctrl.GetConfigOrDie()去看看kubeconfig是如何加载。
2.2.1 kubeconfig的加载逻辑
我们沿着代码执行链路一步步追踪加载kubeconfig的具体的实现位置。
// 这里是核心逻辑
func loadConfig(context string) (config *rest.Config, configErr error) {// If a flag is specified with the config location, use that// 1.这里的kubeconfig是哪里来的呢,这个参数不是当前function的私有参数,我们可以追踪这个参数的初始化位置,可以查询这个参数的来源if len(kubeconfig) > 0 {// loadConfigWithContext 这个方法似乎是加载client的核心逻辑,整个方法中引用了两次return loadConfigWithContext("", &clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfig}, context)}// 如果flag中没有传递kubeconfig,那么就从环境变量中获取kubeconfig文件的所在处,通过文件初始化rest.config//RecommendedConfigPathEnvVar = "KUBECONFIG",获取环境变量指向的kubeconfig所在位置kubeconfigPath := os.Getenv(clientcmd.RecommendedConfigPathEnvVar)if len(kubeconfigPath) == 0 {// 从容器中获取token初始化rest.config,参考2c, err := loadInClusterConfig()if err == nil {return c, nil}defer func() {if configErr != nil {log.Error(err, "unable to load in-cluster config")}}()}// 这里定义了kubeconfig加载的规则,会遍历~/.kube或者环境变量中定义的KUBECONFIG路径去加载配置文件loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()if _, ok := os.LookupEnv("HOME"); !ok {u, err := user.Current()if err != nil {return nil, fmt.Errorf("could not get current user: %w", err)}loadingRules.Precedence = append(loadingRules.Precedence, filepath.Join(u.HomeDir, clientcmd.RecommendedHomeDir, clientcmd.RecommendedFileName))}// 这里是核心的client初始化逻辑,参考3.return loadConfigWithContext("", loadingRules, context)
}
1)在当前代码文件中检索到了kubeconfig这个参数的初始化逻辑,该参数是从名为kubeconfig的flag中解析获得的,只要在operator的启动命令中传递了kubeconfig的flag标识,就可以解析到逻辑中
func init() {RegisterFlags(flag.CommandLine)
}const KubeconfigFlagName = "kubeconfig"
func RegisterFlags(fs *flag.FlagSet) {if fs == nil {fs = flag.CommandLine}// KubeconfigFlagName的值是kubeconfigif f := fs.Lookup(KubeconfigFlagName); f != nil {kubeconfig = f.Value.String()} else {fs.StringVar(&kubeconfig, KubeconfigFlagName, "", "Paths to a kubeconfig. Only required if out-of-cluster.")}
}
2) 从pod中初始化rest.config
//InClusterConfig 返回一个配置对象,该对象使用 kubernetes 提供给 pod 的服务帐户。
//它适用于希望在 kubernetes 上运行的 pod 内运行的客户端。如果从不在 kubernetes 环境中运行的进程调用,
//它将返回 ErrNotInCluster。
func InClusterConfig() (*Config, error) {const (tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"rootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")// 如果好奇这两个值的含义,我可以在后文中贴上在pod中检索这两个环境变量的贴图host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")if len(host) == 0 || len(port) == 0 {return nil, ErrNotInCluster}token, err := os.ReadFile(tokenFile)if err != nil {return nil, err}tlsClientConfig := TLSClientConfig{}if _, err := certutil.NewPool(rootCAFile); err != nil {// 这一步主要是验证这个证书路径是否合法klog.Errorf("Expected to load root CA config from %s, but got err: %v", rootCAFile, err)} else {tlsClientConfig.CAFile = rootCAFile}// 返回rest.configreturn &Config{// TODO: switch to using cluster DNS.Host: "https://" + net.JoinHostPort(host, port), //这里是吧host:port做ipv4和ipv6的格式转换TLSClientConfig: tlsClientConfig,BearerToken: string(token),BearerTokenFile: tokenFile,}, nil
}
3)restconfig初始化逻辑
这里是client go初始化rest.config的标准function,这里不做解读了。
func loadConfigWithContext(apiServerURL string, loader clientcmd.ClientConfigLoader, context string) (*rest.Config, error) {return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loader,&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: apiServerURL,},CurrentContext: context,}).ClientConfig()
}
2.2.2 开始探索ctrl.NewManager 这个方法
这个方法的注释写的是returns a new Manager for creating Controllers.创建controller的管理器,主要是一些初始化的逻辑,构建controllerManager结构体.
func New(config *rest.Config, options Options) (Manager, error) {// Set default values for options fields// 2.1.设置options的默认值,这里是对于options中未被用户显示传入的参数进行进行默认值赋值options = setOptionsDefaults(options)// 2.2这里是对cluster默认初始化,cluster类就是与集群进行交互的实体类cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {clusterOptions.Scheme = options.SchemeclusterOptions.MapperProvider = options.MapperProviderclusterOptions.Logger = options.LoggerclusterOptions.SyncPeriod = options.SyncPeriodclusterOptions.Namespace = options.NamespaceclusterOptions.NewCache = options.NewCacheclusterOptions.NewClient = options.NewClientclusterOptions.ClientDisableCacheFor = options.ClientDisableCacheForclusterOptions.DryRunClient = options.DryRunClientclusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck})if err != nil {return nil, err}......return &controllerManager{stopProcedureEngaged: pointer.Int64(0),cluster: cluster,runnables: runnables,errChan: errChan,recorderProvider: recorderProvider,resourceLock: resourceLock,metricsListener: metricsListener,metricsExtraHandlers: metricsExtraHandlers,controllerOptions: options.Controller,logger: options.Logger,elected: make(chan struct{}),port: options.Port,host: options.Host,certDir: options.CertDir,tlsOpts: options.TLSOpts,webhookServer: options.WebhookServer,leaderElectionID: options.LeaderElectionID,leaseDuration: *options.LeaseDuration,renewDeadline: *options.RenewDeadline,retryPeriod: *options.RetryPeriod,healthProbeListener: healthProbeListener,readinessEndpointName: options.ReadinessEndpointName,livenessEndpointName: options.LivenessEndpointName,gracefulShutdownTimeout: *options.GracefulShutdownTimeout,internalProceduresStop: make(chan struct{}),leaderElectionStopped: make(chan struct{}),leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,}, nil
}
2.3 SetupWithManager 注册自定义的Reconciler
我们首先把Reconciler的接口定义贴上来,这是整个opeator中为数不多的需要自定义编码的位置,这里的reconcile是用户针对指定k8s资源的变动事件(增、删除、改)的自定义处理步骤,你可以理解为informer的eventHandler中的update、add、delete处理逻辑都放在 Reconcile(context.Context, Request) (Result, error)的方法实现中,由用户自己判断资源object处于哪一类事件的状态中,并执行相应的处理逻辑。
type Reconciler interface {Reconcile(context.Context, Request) (Result, error)
}
在main方法中的应用位置是:
if err = (&controller.CronJobReconciler{Client: mgr.GetClient(),Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {setupLog.Error(err, "unable to create controller", "controller", "CronJob")os.Exit(1)
}
我们追踪SetupWithManager的代码逻辑,从NewControllerManagedBy到For,再到Owns都是Builder这个结构体的构建过程,这里不做展开,这里只介绍Complete的代码逻辑。
func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {.....// 核心逻辑return ctrl.NewControllerManagedBy(mgr).For(&batchv1.CronJob{}).Owns(&kbatch.Job{}).Complete(r)
}
跳过一些冗余的代码逻辑,我们快进到核心逻辑。在下面的Build方法中,主要执行了两块核心逻辑:
- 核心逻辑一:初始化controller
- 核心逻辑二:初始化watch逻辑
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {if r == nil {return nil, fmt.Errorf("must provide a non-nil Reconciler")}if blder.mgr == nil {return nil, fmt.Errorf("must provide a non-nil Manager")}if blder.forInput.err != nil {return nil, blder.forInput.err}// Set the ControllerManagedBy// 核心逻辑一:初始化controllerif err := blder.doController(r); err != nil {return nil, err}// Set the Watch// 核心逻辑二:初始化watch逻辑if err := blder.doWatch(); err != nil {return nil, err}return blder.ctrl, nil
}
我们沿着这两条链路逐条进行分析。
2.3.1 controller的初始化
func (blder *Builder) doController(r reconcile.Reconciler) error {...... 构建ctrlOptions// Build the controller and return.// 初始化controllerblder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)return err
}
继续深入newController看一下这个结构体是怎么初始化的,下面是核心实现代码,主要做了两件事情
- 1.初始化一个controller结构体
- 2.把controller添加到了controllerManage中
func New(name string, mgr manager.Manager, options Options) (Controller, error) {c, err := NewUnmanaged(name, mgr, options)if err != nil {return nil, err}// Add the controller as a Manager componentsreturn c, mgr.Add(c)
}
mgr.Add©这个逻辑比较简单,就是把新生成的controller添加到manager中的缓存结构体中,我们来看看NewUnmanaged中的controller
初始化字段有哪些:
func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {// Create controller with dependencies setreturn &controller.Controller{// 这个就是自定义的ReconcilerDo: options.Reconciler,// 这个有一个队列,记住,后面要考MakeQueue: func() workqueue.RateLimitingInterface {return workqueue.NewRateLimitingQueueWithConfig(options.RateLimiter, workqueue.RateLimitingQueueConfig{Name: name,})},MaxConcurrentReconciles: options.MaxConcurrentReconciles,CacheSyncTimeout: options.CacheSyncTimeout,Name: name,LogConstructor: options.LogConstructor,RecoverPanic: options.RecoverPanic,LeaderElected: options.NeedLeaderElection,}, nil
}
2.3.2 doWatch初始化watch逻辑
代码实现展示在下文,这里有两个问题没有解释,一个是
source.Kind这个结构体有什么作用,一个就是allPredicates这个实体也没有解释。我们先留个坑,目前只有定义,在实际调用的时候我们在进行解释。
func (blder *Builder) doWatch() error {// Reconcile typeif blder.forInput.object != nil {obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)if err != nil {return err}// 这一个是什么source?src := source.Kind(blder.mgr.GetCache(), obj)// 这里是事件处理的eventHandler结构体,定义了队列处理逻辑hdler := &handler.EnqueueRequestForObject{}allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)allPredicates = append(allPredicates, blder.forInput.predicates...)// 看起来这里是核心逻辑执行的位置if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {return err}}......后续逻辑大同小异
我们继续深入blder.ctrl.Watch的逻辑中
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {c.mu.Lock()defer c.mu.Unlock()if !c.Started {// 首次调用这个方法的时候进入到这个逻辑中,看起来只是把src、handler、predicate包装成一个机构体存储在watches中c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})return nil}c.LogConstructor(nil).Info("Starting EventSource", "source", src)return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
关于下面的src.Start,这个步骤虽然在我们的核心流程中没有执行,但是这个方法的实现似乎有助于我们理解source.Kind这个数据结构,我们就浪费点时间,进入到start中看一下:
- 启动了一个定时任务,循环检索献相应的informer是否已经被声明
- 往这个informer中添加handler的处理逻辑
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,prct ...predicate.Predicate) error {....省略无关逻辑go func() {var (i cache.InformerlastErr error)// Tries to get an informer until it returns true,// 这个步骤一直循环在寻找一个相应的informerif err := wait.PollUntilContextCancel(ctx, 10*time.Second, true, func(ctx context.Context) (bool, error) {// Lookup the Informer from the Cache and add an EventHandler which populates the Queuei, lastErr = ks.Cache.GetInformer(ctx, ks.Type)if lastErr != nil {kindMatchErr := &meta.NoKindMatchError{}switch {case errors.As(lastErr, &kindMatchErr):log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start","kind", kindMatchErr.GroupKind)case runtime.IsNotRegisteredError(lastErr):log.Error(lastErr, "kind must be registered to the Scheme")default:log.Error(lastErr, "failed to get informer from cache")}return false, nil // Retry.}return true, nil}); err != nil {if lastErr != nil {ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr)return}ks.started <- errreturn}// 对informer添加handler处理逻辑_, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())if err != nil {ks.started <- errreturn}if !ks.Cache.WaitForCacheSync(ctx) {// Would be great to return something more informative hereks.started <- errors.New("cache did not sync")}close(ks.started)}()return nil
}
看到这里,可能大家已经迷糊了,不要着急,我们一步一步的进行分析,因为不仅是大家迷糊,我也迷糊,这乱七八糟的是什么啊。首先看一下这个cache的真实实现是什么?
func New(cfg *rest.Config, opts Options) (Cache, error) {....配置项解析newCacheFunc := newCache(cfg, opts)var defaultCache Cacheif len(opts.DefaultNamespaces) > 0 {//这一步暂时忽略defaultConfig := optionDefaultsToConfig(&opts)defaultCache = newMultiNamespaceCache(newCacheFunc, opts.Scheme, opts.Mapper, opts.DefaultNamespaces, &defaultConfig)} else {// 这一步就是构建的informerCache的实体类defaultCache = newCacheFunc(optionDefaultsToConfig(&opts), corev1.NamespaceAll)}if len(opts.ByObject) == 0 {return defaultCache, nil}// 分类将不同资源的informer对象保存在这个结构体中delegating := &delegatingByGVKCache{scheme: opts.Scheme,//这里的ByObject已经通过追踪是在初始化的options中的cache字段配置的caches: make(map[schema.GroupVersionKind]Cache, len(opts.ByObject)),defaultCache: defaultCache,}for obj, config := range opts.ByObject {gvk, err := apiutil.GVKForObject(obj, opts.Scheme)if err != nil {return nil, fmt.Errorf("failed to get GVK for type %T: %w", obj, err)}var cache Cacheif len(config.Namespaces) > 0 {cache = newMultiNamespaceCache(newCacheFunc, opts.Scheme, opts.Mapper, config.Namespaces, nil)} else {cache = newCacheFunc(byObjectToConfig(config), corev1.NamespaceAll)}delegating.caches[gvk] = cache}return delegating, nil
}//这个方法就是构建一个informerCache的实体类
func newCache(restConfig *rest.Config, opts Options) newCacheFunc {return func(config Config, namespace string) Cache {return &informerCache{scheme: opts.Scheme,Informers: internal.NewInformers(restConfig, &internal.InformersOpts{HTTPClient: opts.HTTPClient,Scheme: opts.Scheme,Mapper: opts.Mapper,ResyncPeriod: *opts.SyncPeriod,Namespace: namespace,Selector: internal.Selector{Label: config.LabelSelector,Field: config.FieldSelector,},Transform: config.Transform,UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false),NewInformer: opts.newInformer,}),readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer,}}
}
这一步骤看起来和sharedInformerFactory比较像,里面定义了一个map缓存,保存了GVK和对应的informer的单例实现。现在我们可以根据delegatingByGVKCache来寻找GetInformer的具体实现方法是什么了,从下面的三个方法来看,这一步骤主要是用来检索实体结构体对应的informerCache实体。
func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {// 这一步是从delegatingByGVKCache的map缓存中获取informerCache的结构体cache, err := dbt.cacheForObject(obj)if err != nil {return nil, err}// 这一步真正实现是在informerCache的方法中体现的return cache.GetInformer(ctx, obj, opts...)
}
func (dbt *delegatingByGVKCache) cacheForObject(o runtime.Object) (Cache, error) {gvk, err := apiutil.GVKForObject(o, dbt.scheme)if err != nil {return nil, err}gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")return dbt.cacheForGVK(gvk), nil
}func (dbt *delegatingByGVKCache) cacheForGVK(gvk schema.GroupVersionKind) Cache {if specific, hasSpecific := dbt.caches[gvk]; hasSpecific {return specific}return dbt.defaultCache
}func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {gvk, err := apiutil.GVKForObject(obj, ic.scheme)if err != nil {return nil, err}// 这一步有兴趣的读者可以深入了解,这里主要是进行数据的转换,转换的结果就是返回了一个sharedIndexedInformer_, i, err := ic.Informers.Get(ctx, gvk, obj, applyGetOptions(opts...))if err != nil {return nil, err}return i.Informer, nil
}
到这里,似乎我们就可以继续分析_, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs()),这行的代码逻辑了。这里就和informer串起来了。
func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *EventHandler {return &EventHandler{ctx: ctx,handler: handler,queue: queue,predicates: predicates,}
}
func (e *EventHandler) HandlerFuncs() cache.ResourceEventHandlerFuncs {return cache.ResourceEventHandlerFuncs{AddFunc: e.OnAdd,UpdateFunc: e.OnUpdate,DeleteFunc: e.OnDelete,}
}
这三个处理逻辑是差不多的,我们看一下这里OnAdd这个方法的实现逻辑,以点代面去看一下具体的执行逻辑。
// OnAdd creates CreateEvent and calls Create on EventHandler.
func (e *EventHandler) OnAdd(obj interface{}) {c := event.CreateEvent{}// Pull Object out of the objectif o, ok := obj.(client.Object); ok {c.Object = o} else {log.Error(nil, "OnAdd missing Object","object", obj, "type", fmt.Sprintf("%T", obj))return}//上面的内容索然无味,这里值得注意一下,先放直接放结论,之前我们没有分析predicates这个结构体,看起来这里是filer,用来过滤一些不关注的事件for _, p := range e.predicates {if !p.Create(c) {return}}// Invoke create handlerctx, cancel := context.WithCancel(e.ctx)defer cancel()// e.handler.Create(ctx, c, e.queue)
}
既然提到了,我们去找一下这个predicates的实现位置,他是在Builder结构体提供的一个WithEventFilter方法设置的,具体的使用方式的构建一个predicate.Predicate实体类,在这个实体类中定义不同事件类型的filter逻辑,可以过滤掉一些我们不关系的变更事件。
type Predicate interface {// Create returns true if the Create event should be processedCreate(event.CreateEvent) bool// Delete returns true if the Delete event should be processedDelete(event.DeleteEvent) bool// Update returns true if the Update event should be processedUpdate(event.UpdateEvent) bool// Generic returns true if the Generic event should be processedGeneric(event.GenericEvent) bool
}
我们继续追踪 e.handler.Create(ctx, c, e.queue)这个方法,看看里面发生了什么,这里的handler的真实实现位置是hdler := &handler.EnqueueRequestForObject{}。
// Create implements EventHandler.
func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {if evt.Object == nil {enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)return}//这里是往队列中添加了一个request事件q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.Object.GetName(),Namespace: evt.Object.GetNamespace(),}})
}
现在我们可以总结一下这里的source.start做了一些什么事情了。当事件的informer监听到资源发生变换时,会触发一个handler.EnqueueRequestForObject{}的事件处理逻辑,将这个事封装成reconcile.Request{}结构体放置到controller对应的限速队列中去。
2.4 mgr.Start(ctrl.SetupSignalHandler())
启动controllerManage。我们仅截取关键代码进行分析。
2.4.1 cluster初始化=informer初始化
....
// Start and wait for caches.
if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {if err != nil {return fmt.Errorf("failed to start caches: %w", err)}
}
这里的cluster我们在上文中已经见识过他的初始化逻辑了。
return &cluster{config: originalConfig,httpClient: options.HTTPClient,scheme: options.Scheme,cache: cache,fieldIndexes: cache,client: clientWriter,apiReader: clientReader,recorderProvider: recorderProvider,mapper: mapper,logger: options.Logger,
}, nil
我们来看一下这里的cm.add做了些什么事情,他往cm的runnables这个结构体中添加了一个Runnable实体,Runnable接口中只包含了一个Start()方法。
func (cm *controllerManager) add(r Runnable) error {return cm.runnables.Add(r)
}
我们先看一下runnables里面的定义,看起来是对Runnable进行了分类存放。
type runnables struct {HTTPServers *runnableGroupWebhooks *runnableGroupCaches *runnableGroupLeaderElection *runnableGroupOthers *runnableGroup
}type runnableGroup struct {ctx context.Contextcancel context.CancelFuncstart sync.MutexstartOnce sync.Oncestarted boolstartQueue []*readyRunnablestartReadyCh chan *readyRunnablestop sync.RWMutexstopOnce sync.Oncestopped bool// errChan is the error channel passed by the caller// when the group is created.// All errors are forwarded to this channel once they occur.errChan chan error// ch is the internal channel where the runnables are read off from.ch chan *readyRunnable// wg is an internal sync.WaitGroup that allows us to properly stop// and wait for all the runnables to finish before returning.wg *sync.WaitGroup
}
我们看一下Add方法,印证了我们之前的猜测
func (r *runnables) Add(fn Runnable) error {switch runnable := fn.(type) {case *server:return r.HTTPServers.Add(fn, nil)case hasCache:return r.Caches.Add(fn, func(ctx context.Context) bool {return runnable.GetCache().WaitForCacheSync(ctx)})case webhook.Server:return r.Webhooks.Add(fn, nil)case LeaderElectionRunnable:if !runnable.NeedLeaderElection() {return r.Others.Add(fn, nil)}return r.LeaderElection.Add(fn, nil)default:return r.LeaderElection.Add(fn, nil)}
}
cluster属于hasCache的实现,我们看看他做了些什么事情。把runnable和runnale启动检验的逻辑包装到readyRunnable这个实体,然后做了两件事情:
- 如果没有启动,把runnable放到startQueue这个队列中
- 如果启动了,把runnable放到 r.ch这个channel通道中
接下来就是启动的步骤,我们看看start中做了些什么事情。
func (r *runnableGroup) Start(ctx context.Context) error {var retErr errorr.startOnce.Do(func() {defer close(r.startReadyCh)// Start the internal reconciler.// 启动reconcile,这里启动所有的自定义逻辑go r.reconcile()// Start the group and queue up all// the runnables that were added prior.r.start.Lock()r.started = true//还记得上文Add逻辑吗,这里是将startQueue中的readyRunnable实体塞到ch这个channel中for _, rn := range r.startQueue {rn.signalReady = truer.ch <- rn}r.start.Unlock()// If we don't have any queue, return.if len(r.startQueue) == 0 {return}// Wait for all runnables to signal.// 判断是否所有的runnables都已经启动for {select {case <-ctx.Done():if err := ctx.Err(); !errors.Is(err, context.Canceled) {retErr = err}// 这里判断readyRunnable是不是已经启动了,如果启动了就从startQueue中删除case rn := <-r.startReadyCh:for i, existing := range r.startQueue {if existing == rn {// Remove the item from the start queue.r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)break}}// We're done waiting if the queue is empty, return.if len(r.startQueue) == 0 {return}}}})return retErr
}
我们回过头来分析一下,r.reconcile()做了些什么事情
func (r *runnableGroup) reconcile() {for runnable := range r.ch {....// Start the runnable.go func(rn *readyRunnable) {go func() {//如果检查执行完毕后if rn.Check(r.ctx) {if rn.signalReady {//传入到startReadyCh的channel中来r.startReadyCh <- rn}}}().....// 执行start函数if err := rn.Start(r.ctx); err != nil {r.errChan <- err}}(runnable)}
}
这里cluster的start函数执行逻辑是
func (c *cluster) Start(ctx context.Context) error {defer c.recorderProvider.Stop(ctx)return c.cache.Start(ctx)
}
我们找到这个start的具体实现逻辑,追踪下去,我们就见到了核心的informer.Run(ip.ctx.Done()),这个就是原生的informer的用法。
2.4.2 自定义的的reconciler的启动
如果还记得SetUpWithManager的逻辑,我们知道自定义的reconciler被包装成了Controller实体放到了cm.runnables.LeaderElection这个分组中了,如果,我们继续追踪Start方法内的代码逻辑.
{ctx, cancel := context.WithCancel(context.Background())cm.leaderElectionCancel = cancelgo func() {// 如果没有抢占资源锁,就继续等待if cm.resourceLock != nil {if err := cm.startLeaderElection(ctx); err != nil {cm.errChan <- err}} else {// Treat not having leader election enabled the same as being elected.// 如果选主成功,执行核心启动逻辑if err := cm.startLeaderElectionRunnables(); err != nil {cm.errChan <- err}close(cm.elected)}}()
}
最后一步了,cm.startLeaderElectionRunnables(),我们追踪一下核心启动逻辑,与cluster是不是异曲同工。
func (r *runnableGroup) Start(ctx context.Context) error {var retErr errorr.startOnce.Do(func() {defer close(r.startReadyCh)// Start the internal reconciler.go r.reconcile()// Start the group and queue up all// the runnables that were added prior.r.start.Lock()r.started = truefor _, rn := range r.startQueue {rn.signalReady = truer.ch <- rn}r.start.Unlock()// If we don't have any queue, return.if len(r.startQueue) == 0 {return}// Wait for all runnables to signal.for {select {case <-ctx.Done():if err := ctx.Err(); !errors.Is(err, context.Canceled) {retErr = err}case rn := <-r.startReadyCh:for i, existing := range r.startQueue {if existing == rn {// Remove the item from the start queue.r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)break}}// We're done waiting if the queue is empty, return.if len(r.startQueue) == 0 {return}}}})return retErr
}
我们深入看一下关键是Controller的start方法是什么。
func (c *Controller) Start(ctx context.Context) error {.....c.Queue = c.MakeQueue()......err := func() error {defer c.mu.Unlock()// TODO(pwittrock): Reconsider HandleCrashdefer utilruntime.HandleCrash()// NB(directxman12): launch the sources *before* trying to wait for the// caches to sync so that they have a chance to register their intendeded// caches.for _, watch := range c.startWatches {c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))// 这个start是不是很眼熟,就是上文分析的Kind的start方法,主要从informer中监听事件放到queue中if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {return err}}......wg.Add(c.MaxConcurrentReconciles)for i := 0; i < c.MaxConcurrentReconciles; i++ {go func() {defer wg.Done()// Run a worker thread that just dequeues items, processes them, and marks them done.// It enforces that the reconcileHandler is never invoked concurrently with the same object.for c.processNextWorkItem(ctx) {}}()}....}()if err != nil {return err}.......
}
我们之前之说生产者是怎么往controller的队列中添加数据的,而没有说队列中的事件是怎么消费数据的,核心逻辑就在processNextWorkItem这个方法中,
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {.......// 这里的c.Reconcile就是controllerresult, err := c.Reconcile(ctx, req)switch {case err != nil:if errors.Is(err, reconcile.TerminalError(nil)) {ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()} else {c.Queue.AddRateLimited(req)}ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()if !result.IsZero() {log.Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes reqeueuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler")}log.Error(err, "Reconciler error")case result.RequeueAfter > 0:log.V(5).Info(fmt.Sprintf("Reconcile done, requeueing after %s", result.RequeueAfter))// The result.RequeueAfter request will be lost, if it is returned// along with a non-nil error. But this is intended as// We need to drive to stable reconcile loops before queuing due// to result.RequestAfterc.Queue.Forget(obj)c.Queue.AddAfter(req, result.RequeueAfter)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()case result.Requeue:log.V(5).Info("Reconcile done, requeueing")c.Queue.AddRateLimited(req)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()default:log.V(5).Info("Reconcile successful")// Finally, if no error occurs we Forget this item so it does not// get queued again until another change happens.c.Queue.Forget(obj)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()}
}
我们先分析c.Reconcile做了些什么事情,主要是调用了controller中的Do属性的Reconcile方法,而Do就是我们自定义的Reconcile实体属性。
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
......return c.Do.Reconcile(ctx, req)
}
现在回过头来分析c.reconcileHandler(ctx, obj).
switch {// 如果上一步执行有异常报错,1)忽略,添加通知 2)重新入队
case err != nil:if errors.Is(err, reconcile.TerminalError(nil)) {ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()} else {c.Queue.AddRateLimited(req)}.......// 如果返回的result有RequeueAfter这个属性字段,选择在一段时间后延迟入队
case result.RequeueAfter > 0:log.V(5).Info(fmt.Sprintf("Reconcile done, requeueing after %s", result.RequeueAfter))// The result.RequeueAfter request will be lost, if it is returned// along with a non-nil error. But this is intended as// We need to drive to stable reconcile loops before queuing due// to result.RequestAfterc.Queue.Forget(obj)c.Queue.AddAfter(req, result.RequeueAfter)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
case result.Requeue:// 如果返回的result,直接入队log.V(5).Info("Reconcile done, requeueing")c.Queue.AddRateLimited(req)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
default:// 默认情况下,从队列中移除log.V(5).Info("Reconcile successful")// Finally, if no error occurs we Forget this item so it does not// get queued again until another change happens.c.Queue.Forget(obj)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
}
至此,controller-runtime的源码分析完毕.
遗留
workqueue.RateLimitingInterface这个队列的分析没写