几乎所有的Controller manager 和CRD Controller 都会使用Client-go 的Informer 函数,这样通过Watch 或者Get List 可以获取对应的Object,下面我们从源码分析角度来看一下Client go Informer 的机制。
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)controller := NewController(kubeClient, exampleClient,kubeInformerFactory.Apps().V1().Deployments(),exampleInformerFactory.Samplecontroller().V1alpha1().Foos())// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
kubeInformerFactory.Start(stopCh)
这里的例子是以https://github.com/kubernetes/sample-controller/blob/master/main.go
节选,主要以 k8s 默认的Deployment Informer
为例子。可以看到直接使用Client-go Informer 还是非常简单的,先不管NewCOntroller
函数里面执行了什么,顺着代码来看一下kubeInformerFactory.Start
都干了啥。
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {f.lock.Lock()defer f.lock.Unlock()for informerType, informer := range f.informers {if !f.startedInformers[informerType] {go informer.Run(stopCh)f.startedInformers[informerType] = true}}
}
可以看到这里遍历了f.informers
,而informers
的定义我们来看一眼数据结构
type sharedInformerFactory struct {client kubernetes.Interfacenamespace stringtweakListOptions internalinterfaces.TweakListOptionsFunclock sync.MutexdefaultResync time.DurationcustomResync map[reflect.Type]time.Durationinformers map[reflect.Type]cache.SharedIndexInformer// startedInformers is used for tracking which informers have been started.// This allows Start() to be called multiple times safely.startedInformers map[reflect.Type]bool
}
我们这里的例子,在运行的时候,f.informers
里面含有的内容如下
type *v1.Deployment informer &{0xc000379fa0 <nil> 0xc00038ccb0 {} 0xc000379f80 0xc00033bb00 30000000000 30000000000 0x28e5ec8 false false {0 0} {0 0}}
也就是说,每一种k8s 类型都会有自己的Informer函数。下面我们来看一下这个函数是在哪里注册的,这里以Deployment Informer 为例。
首先回到刚开始初始化kubeClient
的代码,
controller := NewController(kubeClient, exampleClient,kubeInformerFactory.Apps().V1().Deployments(),exampleInformerFactory.Samplecontroller().V1alpha1().Foos())deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: controller.handleObject,UpdateFunc: func(old, new interface{}) {newDepl := new.(*appsv1.Deployment)oldDepl := old.(*appsv1.Deployment)if newDepl.ResourceVersion == oldDepl.ResourceVersion {// Periodic resync will send update events for all known Deployments.// Two different versions of the same Deployment will always have different RVs.return}controller.handleObject(new)},DeleteFunc: controller.handleObject,})
注意这里的传参, kubeInformerFactory.Apps().V1().Deployments()
, 这句话的意思就是指创建一个只关注Deployment 的Informer.
controller := &Controller{kubeclientset: kubeclientset,sampleclientset: sampleclientset,deploymentsLister: deploymentInformer.Lister(),deploymentsSynced: deploymentInformer.Informer().HasSynced,foosLister: fooInformer.Lister(),foosSynced: fooInformer.Informer().HasSynced,workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),recorder: recorder,}
deploymentInformer.Lister()
这里就是初始化了一个Deployment Lister,下面来看一下Lister函数里面做了什么。
// NewFilteredDeploymentInformer constructs a new informer for Deployment type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {return cache.NewSharedIndexInformer(&cache.ListWatch{ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).List(options)},WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).Watch(options)},},&appsv1.Deployment{},resyncPeriod,indexers,)
}func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}func (f *deploymentInformer) Informer() cache.SharedIndexInformer {return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}func (f *deploymentInformer) Lister() v1.DeploymentLister {return v1.NewDeploymentLister(f.Informer().GetIndexer())
}
注意这里的Lister
函数,它调用了Informer
,然后触发了f.factory.InformerFor
,
这就最终调用了sharedInformerFactory InformerFor
函数,
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {f.lock.Lock()defer f.lock.Unlock()informerType := reflect.TypeOf(obj)informer, exists := f.informers[informerType]if exists {return informer}resyncPeriod, exists := f.customResync[informerType]if !exists {resyncPeriod = f.defaultResync}informer = newFunc(f.client, resyncPeriod)f.informers[informerType] = informerreturn informer
}
这里可以看到,informer = newFunc(f.client, resyncPeriod)
这句话最终完成了对于informer的创建,并且注册到了Struct object中,完成了前面我们的问题。
下面我们再回到informer start
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {f.lock.Lock()defer f.lock.Unlock()for informerType, informer := range f.informers {if !f.startedInformers[informerType] {go informer.Run(stopCh)f.startedInformers[informerType] = true}}
}
这里可以看到,它会遍历所有的informer,然后选择异步调用Informer 的RUN方法。我们来全局看一下Run方法
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)cfg := &Config{Queue: fifo,ListerWatcher: s.listerWatcher,ObjectType: s.objectType,FullResyncPeriod: s.resyncCheckPeriod,RetryOnError: false,ShouldResync: s.processor.shouldResync,Process: s.HandleDeltas,}func() {s.startedLock.Lock()defer s.startedLock.Unlock()s.controller = New(cfg)s.controller.(*controller).clock = s.clocks.started = true}()// Separate stop channel because Processor should be stopped strictly after controllerprocessorStopCh := make(chan struct{})var wg wait.Groupdefer wg.Wait() // Wait for Processor to stopdefer close(processorStopCh) // Tell Processor to stopwg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)wg.StartWithChannel(processorStopCh, s.processor.run)defer func() {s.startedLock.Lock()defer s.startedLock.Unlock()s.stopped = true // Don't want any new listeners}()s.controller.Run(stopCh)
}
首先它根据得到的 key 拆分函数和Store index 创建一个FIFO队列,这个队列是一个先进先出的队列,主要用来保存对象的各种事件。
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {f := &DeltaFIFO{items: map[string]Deltas{},queue: []string{},keyFunc: keyFunc,knownObjects: knownObjects,}f.cond.L = &f.lockreturn f
}
可以看到这个队列创建的比较简单,就是使用 Map 来存放数据,String 数组来存放队列的 Key。
后面根据client 创建的List 和Watch 函数,还有队列创建了一个 config,下面将根据这个config 来初始化controller. 这个controller是client-go 的Cache controller ,主要用来控制从 APIServer 获得的对象的 cache 以及更新对象。
下面主要关注这个函数调用
wg.StartWithChannel(processorStopCh, s.processor.run)
这里进行了真正的Listering 调用。
func (p *sharedProcessor) run(stopCh <-chan struct{}) {func() {p.listenersLock.RLock()defer p.listenersLock.RUnlock()for _, listener := range p.listeners {p.wg.Start(listener.run)p.wg.Start(listener.pop)}p.listenersStarted = true}()<-stopChp.listenersLock.RLock()defer p.listenersLock.RUnlock()for _, listener := range p.listeners {close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop}p.wg.Wait() // Wait for all .pop() and .run() to stop
}
主要看 run 方法,还记得前面已经把ADD UPDATE DELETE 注册了自定义的处理函数了吗。这里就实现了前面函数的触发
func (p *processorListener) run() {// this call blocks until the channel is closed. When a panic happens during the notification// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)// the next notification will be attempted. This is usually better than the alternative of never// delivering again.stopCh := make(chan struct{})wait.Until(func() {// this gives us a few quick retries before a long pause and then a few more quick retrieserr := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {for next := range p.nextCh {switch notification := next.(type) {case updateNotification:p.handler.OnUpdate(notification.oldObj, notification.newObj)case addNotification:p.handler.OnAdd(notification.newObj)case deleteNotification:p.handler.OnDelete(notification.oldObj)default:utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))}}// the only way to get here is if the p.nextCh is empty and closedreturn true, nil})// the only way to get here is if the p.nextCh is empty and closedif err == nil {close(stopCh)}}, 1*time.Minute, stopCh)
}
可以看到当p.nexhCh channel
接收到一个对象进入的时候,就会根据通知类型的不同,选择对应的用户注册函数去调用。那么这个channel 谁来向其中传入参数呢
func (p *processorListener) pop() {defer utilruntime.HandleCrash()defer close(p.nextCh) // Tell .run() to stopvar nextCh chan<- interface{}var notification interface{}for {select {case nextCh <- notification:// Notification dispatchedvar ok boolnotification, ok = p.pendingNotifications.ReadOne()if !ok { // Nothing to popnextCh = nil // Disable this select case}case notificationToAdd, ok := <-p.addCh:if !ok {return}if notification == nil { // No notification to pop (and pendingNotifications is empty)// Optimize the case - skip adding to pendingNotificationsnotification = notificationToAddnextCh = p.nextCh} else { // There is already a notification waiting to be dispatchedp.pendingNotifications.WriteOne(notificationToAdd)}}}
}
答案就是这个pop
函数,这里会从p.addCh
中读取增加的通知,然后转给p.nexhCh
并且保证每个通知只会读取一次。
下面就是最终的Controller run 函数,我们来看看到底干了什么
// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()go func() {<-stopChc.config.Queue.Close()}()r := NewReflector(c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,)r.ShouldResync = c.config.ShouldResyncr.clock = c.clockc.reflectorMutex.Lock()c.reflector = rc.reflectorMutex.Unlock()var wg wait.Groupdefer wg.Wait()wg.StartWithChannel(stopCh, r.Run)wait.Until(c.processLoop, time.Second, stopCh)
}
这里主要的就是wg.StartWithChannel(stopCh, r.Run)
,
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)wait.Until(func() {if err := r.ListAndWatch(stopCh); err != nil {utilruntime.HandleError(err)}}, r.period, stopCh)
}
这里就调用了r.ListAndWatch
方法,这个方法比较复杂,我们慢慢来看。
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {start := r.clock.Now()eventCount := 0// Stopping the watcher should be idempotent and if we return from this function there's no way// we're coming back in with the same watch interface.defer w.Stop()// update metricsdefer func() {r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))r.metrics.watchDuration.Observe(time.Since(start).Seconds())}()loop:for {select {case <-stopCh:return errorStopRequestedcase err := <-errc:return errcase event, ok := <-w.ResultChan():if !ok {break loop}if event.Type == watch.Error {return apierrs.FromObject(event.Object)}if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))continue}meta, err := meta.Accessor(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))continue}newResourceVersion := meta.GetResourceVersion()switch event.Type {case watch.Added:err := r.store.Add(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))}case watch.Modified:err := r.store.Update(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))}case watch.Deleted:// TODO: Will any consumers need access to the "last known// state", which is passed in event.Object? If so, may need// to change this.err := r.store.Delete(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))}default:utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))}*resourceVersion = newResourceVersionr.setLastSyncResourceVersion(newResourceVersion)eventCount++}}watchDuration := r.clock.Now().Sub(start)if watchDuration < 1*time.Second && eventCount == 0 {r.metrics.numberOfShortWatches.Inc()return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)}klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)return nil
}
这里就是真正调用watch 方法,根据返回的watch 事件,将其放入到前面创建的 FIFO 队列中。
最终调用了controller 的POP 方法
// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {if err == FIFOClosedError {return}if c.config.RetryOnError {// This is the safe way to re-enqueue.c.config.Queue.AddIfNotPresent(obj)}}}
}
前面是将 watch 到的对象加入到队列中,这里的goroutine 就是用来消费的。具体的消费函数就是前面创建的Process 函数
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {s.blockDeltas.Lock()defer s.blockDeltas.Unlock()// from oldest to newestfor _, d := range obj.(Deltas) {switch d.Type {case Sync, Added, Updated:isSync := d.Type == Syncs.cacheMutationDetector.AddObject(d.Object)if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {if err := s.indexer.Update(d.Object); err != nil {return err}s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)} else {if err := s.indexer.Add(d.Object); err != nil {return err}s.processor.distribute(addNotification{newObj: d.Object}, isSync)}case Deleted:if err := s.indexer.Delete(d.Object); err != nil {return err}s.processor.distribute(deleteNotification{oldObj: d.Object}, false)}}return nil
}
这个函数就是根据传进来的obj,先从自己的cache 中取一下,看是否存在,如果存在就代表是Update ,那么更新自己的队列后,调用用户注册的Update 函数,如果不存在,就调用用户的 Add 函数。
到此Client-go 的Informer 流程源码分析基本完毕。
原文链接
本文为云栖社区原创内容,未经允许不得转载。