k8s informer的list-watch机制剖析
1、list-watch场景:
client-go中的reflector模块首先会list apiserver获取某个资源的全量信息,然后根据list到的rv来watch资源的增量信息。希望使用client-go编写的控制器组件在与apiserver发生连接异常时,尽量的re-watch资源而不是re-list
2、list-watch主要做的三件事:
informer的list-watch逻辑主要做三个事情:
-
1、List部分逻辑:设置分页参数;执行list方法;将list结果同步进DeltaFIFO队列中,其实是调用store中的Replace方法。
-
2、定时同步:定时同步以协程的方式运行,使用定时器实现定期同步,Store中的Resync操作。
-
3、Watch部分逻辑:在for循环里;执行watch函数获取resultchan;监听resultchan中数据并处理;
过程细节剖析:
-
1、第一次list资源会设置资源版本号为空,旧版会设为0,拉完后就更新资源版本,后面watch的时候只要关心比这个资源版本大的资源。list的时候会把ListWatch对象包裹在pager对象里 ,这个对象的作用是控制分页查询,比如资源对象太多时,为了防止过大的网络IO,pager可以通过控制url的limit和continue参数来指定一次请求获取的资源数量。
-
2、watch的时候会开启一个死循环,ListerWatcher会返回要一个watch对象及其内部的一条channel,没有数据时则一直阻塞监听channel,只要有新资源变化就会停止阻塞,然后就根据事件类型往DeltaFIFO里面更新数据,最后会更新最新资源版本。
-
3、每次向apiserver发起watch请求,如果大概8分钟内都没有任何事件,则apiserver会主动断开连接,断开连接则会关闭watch对象的channel ,Reflector监听channel结束,然后会再次构建watch对象并发起watch请求。
-
4、ListAndWatch()会被Run()调用。Run()里面把ListAndWatch()包裹在了一个重试函数wait.Until()里面,ListAndWatch()正常情况下是死循环,一旦ListAndWatch()发送错误就会返回,wait.Until()在指定时间后又会重新执行ListAndWatch() 。这一步也叫所谓的ReList。再一次list资源时会尝试传入一个上次list到或最新watch到的资源版本,但并不保证可以成功list,比如watch到的Pod的资源版本和PodList的资源版本没有任何关联,Pod的更新不代表PodList的更新,这里只是尝试一下而已,如果list失败了就把url参数resourceVersion置为空,这样就能拉最新的列表。
概括:
- 通过list机制来获取全量资源,然后使用那个resourceversion并通过watch模式来增量更新,后续每次watch到新的变化后除了更新cache,还会更新resourceversion,并用新的resourceversion去watch
3、list-watch源码剖析:
// ListAndWatch 函数首先列出所有的对象,并在调用的时候获得资源版本,然后使用该资源版本来进行 watch 操作。
// 如果 ListAndWatch 没有初始化 watch 成功就会返回错误。
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)var resourceVersion stringoptions := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}// 1.List部分逻辑:设置分页参数;执行list方法;将list结果同步进DeltaFIFO队列中;if err := func() error {initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})defer initTrace.LogIfLong(10 * time.Second)var list runtime.Objectvar paginatedResult boolvar err errorlistCh := make(chan struct{}, 1)panicCh := make(chan interface{}, 1)go func() {defer func() {if r := recover(); r != nil {panicCh <- r}}()// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first// list request will return the full response.// 如果listerWatcher支持,则尝试以块的形式收集列表,如果不支持,则收集第一个列表请求将返回完整响应pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {return r.listerWatcher.List(opts)}))switch {case r.WatchListPageSize != 0:pager.PageSize = r.WatchListPageSizecase r.paginatedResult:// We got a paginated result initially. Assume this resource and server honor// paging requests (i.e. watch cache is probably disabled) and leave the default// pager size set.case options.ResourceVersion != "" && options.ResourceVersion != "0":// User didn't explicitly request pagination.//// With ResourceVersion != "", we have a possibility to list from watch cache,// but we do that (for ResourceVersion != "0") only if Limit is unset.// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly// switch off pagination to force listing from watch cache (if enabled).// With the existing semantic of RV (result is at least as fresh as provided RV),// this is correct and doesn't lead to going back in time.//// We also don't turn off pagination for ResourceVersion="0", since watch cache// is ignoring Limit in that case anyway, and if watch cache is not enabled// we don't introduce regression.pager.PageSize = 0}// 如果过期或者不合法 resourceversion 则进行重试list, paginatedResult, err = pager.List(context.Background(), options)if isExpiredError(err) || isTooLargeResourceVersionError(err) {r.setIsLastSyncResourceVersionUnavailable(true)// Retry immediately if the resource version used to list is unavailable.// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on// continuation pages, but the pager might not be enabled, the full list might fail because the// resource version it is listing at is expired or the cache may not yet be synced to the provided// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure// the reflector makes forward progress.list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})}close(listCh)}()select {case <-stopCh:return nilcase r := <-panicCh:panic(r)case <-listCh:}if err != nil {return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)}// We check if the list was paginated and if so set the paginatedResult based on that.// However, we want to do that only for the initial list (which is the only case// when we set ResourceVersion="0"). The reasoning behind it is that later, in some// situations we may force listing directly from etcd (by setting ResourceVersion="")// which will return paginated result, even if watch cache is enabled. However, in// that case, we still want to prefer sending requests to watch cache if possible.//// Paginated result returned for request with ResourceVersion="0" mean that watch// cache is disabled and there are a lot of objects of a given type. In such case,// there is no need to prefer listing from watch cache.if options.ResourceVersion == "0" && paginatedResult {r.paginatedResult = true}r.setIsLastSyncResourceVersionUnavailable(false) // list was successfulinitTrace.Step("Objects listed")// listMetaInterface, err := meta.ListAccessor(list)if err != nil {return fmt.Errorf("unable to understand list result %#v: %v", list, err)}// 获取资源版本号resourceVersion = listMetaInterface.GetResourceVersion()initTrace.Step("Resource version extracted")// 将资源对象转换为资源列表,讲runtime.Object 对象转换为[]runtime.Object对象items, err := meta.ExtractList(list)if err != nil {return fmt.Errorf("unable to understand list result %#v (%v)", list, err)}initTrace.Step("Objects extracted")// 将资源对象列表中的资源和版本号存储在store中if err := r.syncWith(items, resourceVersion); err != nil {return fmt.Errorf("unable to sync list result: %v", err)}initTrace.Step("SyncWith done")// 更新resourceVersion r.setLastSyncResourceVersion(resourceVersion)initTrace.Step("Resource version updated")return nil}(); err != nil {return err}// 2.定时同步:定时同步以协程的方式运行,使用定时器实现定期同步resyncerrc := make(chan error, 1)cancelCh := make(chan struct{})defer close(cancelCh)go func() {resyncCh, cleanup := r.resyncChan()defer func() {cleanup() // Call the last one written into cleanup}()for {select {case <-resyncCh:case <-stopCh:returncase <-cancelCh:return}// 如果ShouldResync 为nil或者调用返回true,则执行Store中的Resync操作if r.ShouldResync == nil || r.ShouldResync() {klog.V(4).Infof("%s: forcing resync", r.name)// 将indexer的数据和deltafifo进行同步if err := r.store.Resync(); err != nil {resyncerrc <- errreturn}}cleanup()resyncCh, cleanup = r.resyncChan()}}()// 3.在for循环里;执行watch函数获取resultchan;监听resultchan中数据并处理;for {// give the stopCh a chance to stop the loop, even in case of continue statements further down on errorsselect {case <-stopCh:return nildefault:}timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))options = metav1.ListOptions{ResourceVersion: resourceVersion,// We want to avoid situations of hanging watchers. Stop any wachers that do not// receive any events within the timeout window.TimeoutSeconds: &timeoutSeconds,// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.// Reflector doesn't assume bookmarks are returned at all (if the server do not support// watch bookmarks, it will ignore this field).AllowWatchBookmarks: true,}// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sentstart := r.clock.Now()w, err := r.listerWatcher.Watch(options)if err != nil {// If this is "connection refused" error, it means that most likely apiserver is not responsive.// It doesn't make sense to re-list all objects because most likely we will be able to restart// watch where we ended.// If that's the case begin exponentially backing off and resend watch request.// 如果这是“连接被拒绝”错误,则意味着 apiserver 很可能没有响应。// 重新列出所有对象是没有意义的,因为我们很可能能够重新启动// 看我们结束的地方。// 如果是这种情况,开始指数级后退并重新发送监视请求if utilnet.IsConnectionRefused(err) {<-r.initConnBackoffManager.Backoff().C()continue}return err}if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {if err != errorStopRequested {switch {case isExpiredError(err):// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already// has a semantic that it returns data at least as fresh as provided RV.// So first try to LIST with setting RV to resource version of last observed object.klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)default:klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)}}return nil}}
}4.4 LastSyncResourceVersion:获取上一次同步的资源版本func (r *Reflector) LastSyncResourceVersion() string {r.lastSyncResourceVersionMutex.RLock()defer r.lastSyncResourceVersionMutex.RUnlock()return r.lastSyncResourceVersion
}4.5 resyncChan:返回一个定时通道和清理函数,清理函数就是停止计时器。这边的定时重新同步是使用定时器实现的。func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {if r.resyncPeriod == 0 {return neverExitWatch, func() bool { return false }}// The cleanup function is required: imagine the scenario where watches// always fail so we end up listing frequently. Then, if we don't// manually stop the timer, we could end up with many timers active// concurrently.t := r.clock.NewTimer(r.resyncPeriod)return t.C(), t.Stop
}
4.6 syncWith:将从apiserver list的资源对象结果同步进DeltaFIFO队列中,调用队列的Replace方法实现。func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {found := make([]interface{}, 0, len(items))for _, item := range items {found = append(found, item)}return r.store.Replace(found, resourceVersion)
}4.7 watchHandler:watch的处理:接收watch的接口作为参数,watch接口对外方法是Stop和Resultchan,前者关闭结果通道,后者获取通道。func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {eventCount := 0// Stopping the watcher should be idempotent and if we return from this function there's no way// we're coming back in with the same watch interface.defer w.Stop()loop:for {select {case <-stopCh:return errorStopRequestedcase err := <-errc:return errcase event, ok := <-w.ResultChan():if !ok {break loop}if event.Type == watch.Error {return apierrors.FromObject(event.Object)}if r.expectedType != nil {if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))continue}}// 判断期待的类型和监听到的事件类型是否一致if r.expectedGVK != nil {if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))continue}}// 获取事件对象meta, err := meta.Accessor(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))continue}newResourceVersion := meta.GetResourceVersion()// 对事件类型进行判断,并进行对应操作switch event.Type {case watch.Added:err := r.store.Add(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))}case watch.Modified:err := r.store.Update(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))}case watch.Deleted:// TODO: Will any consumers need access to the "last known// state", which is passed in event.Object? If so, may need// to change this.err := r.store.Delete(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))}case watch.Bookmark:// 表示监听已在此处同步,只需更新// A `Bookmark` means watch has synced here, just update the resourceVersiondefault:utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))}*resourceVersion = newResourceVersion// 更新 resource version 版本, 下次使用该 resourceVersion 来 watch 监听.r.setLastSyncResourceVersion(newResourceVersion)if rvu, ok := r.store.(ResourceVersionUpdater); ok {rvu.UpdateResourceVersion(newResourceVersion)}eventCount++}}watchDuration := r.clock.Since(start)// 如果 watch 退出小于 一秒, 另外一条事件也没拿到, 则打条错误日志if watchDuration < 1*time.Second && eventCount == 0 {return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)}klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)return nil
}4.8 relistResourceVersion:relistResourceVersion 函数获得反射器 relist 的资源版本,如果资源版本非 0,
则表示根据资源版本号继续获取,当传输过程中遇到网络故障或者其他原因导致中断,下次再连接时,会根据资源版本号继续传输未完成的部分。
可以使本地缓存中的数据与Etcd集群中的数据保持一致,该函数实现如下所示:// 如果最后一次relist的结果是HTTP 410(Gone)状态码,则返回"",这样relist将通过quorum读取etcd中可用的最新资源版本。
// 返回使用 lastSyncResourceVersion,这样反射器就不会使用在relist结果或watch事件中watch到的资源版本更老的资源版本进行relist了
// 当 r.lastSyncResourceVersion 为 "" 时这里为 "0",当使用 r.lastSyncResourceVersion 失败时这里为 ""
// 区别是 "" 会直接请求到 etcd,获取一个最新的版本,而 "0" 访问的是 cache
// 第一次使用0,出错了使用"",否则用lastSyncResourceVersion
// 注意:第一次不会直接全量list etcd,是全量list apiserver
func (r *Reflector) relistResourceVersion() string {r.lastSyncResourceVersionMutex.RLock()defer r.lastSyncResourceVersionMutex.RUnlock()if r.isLastSyncResourceVersionUnavailable {// 因为反射器会进行分页List请求,如果 lastSyncResourceVersion 过期了,所有的分页列表请求就都会跳过 watch 缓存// 所以设置 ResourceVersion="",然后再次 List,重新建立反射器到最新的可用资源版本,从 etcd 中读取,保持一致性。return ""}if r.lastSyncResourceVersion == "" {// 反射器执行的初始 List 操作的时候使用0作为资源版本。return "0"}return r.lastSyncResourceVersion
}4.9 setLastSyncResourceVersion:用于存储已被Reflector处理的最新资源对象的ResourceVersion,r.setLastSyncResourceVersion方法用于更新该值。
lastSyncResourceVersion属性为Reflector struct的一个属性,
func (r *Reflector) setLastSyncResourceVersion(v string) {r.lastSyncResourceVersionMutex.Lock()defer r.lastSyncResourceVersionMutex.Unlock()r.lastSyncResourceVersion = v
}// setIsLastSyncResourceVersionUnavailable 设置是否返回具有lastSyncResourceVersion 的最后一个列表或监视请求“过期”或“资源版本太大”错误。
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {r.lastSyncResourceVersionMutex.Lock()defer r.lastSyncResourceVersionMutex.Unlock()r.isLastSyncResourceVersionUnavailable = isUnavailable
}