client-go中ListAndWatch机制,informer源码详解

文章首发地址: 学一下 (suxueit.com)icon-default.png?t=N7T8https://suxueit.com/article_detail/s9UMb44BWZdDRfKqFv22

先上一张,不知道是那个大佬画的图

图片

简单描述一下流程

client-go封装部分

以pod为例

  1. 、先List所有的Pod资源,然后通过已经获取的pod资源的最大版本去发起watch请求,watch持续接收api-server的事件推送,

  2. 将所有的pod写入到queue

  3. 从队列中取出pod

  4. 4和5将 取出的pod缓存到本地

  5. 调用用户自定义的资源处理函数【AddEventHandler】

用户自定义部分

  1. 将事件写入,自定义的工作队列

  2. 遍历队列,取出资源key

  3. 用key从缓存取出对应资源,进行逻辑处理

阅读完成后续部分,你会发现上面的流程是有一点问题的

list后会立刻写入队列,然后再发起watch,并将监控的事件入队

informer入口分析

通常我们写controller都会初始化一个informer,然后lister对应资源,或者给资源添加的hook点

// 开始运行informer
kubeInformerFactory.Start(stopCh)
//
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {f.lock.Lock()defer f.lock.Unlock()
​// 这里为什么是一个 数组?for informerType, informer := range f.informers {if !f.startedInformers[informerType] {// informer入口go informer.Run(stopCh)f.startedInformers[informerType] = true}}
}

面试问题

一个informer可以监听多个资源吗?

不能: 我们在使用时是看似是通过定义的一个informer客户端去监听多个资源【该informer不是实际意义上的informer,而是一个工厂函数】,实际上,该informer每监听一个资源会生成一个informer并存入工厂informer数组中,启动时再分别调用【goruntine】

因为 一个informers是可以listAndWatch多种资源的 当你调用 kubeInformerFactory.Core().V1().Pods().Lister() kubeInformerFactory.Core().V1().ConfigMaps().Lister() 会分别给 pods和configmap的资源类型生成一个informer

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {f.lock.Lock()defer f.lock.Unlock()
​informerType := reflect.TypeOf(obj)informer, exists := f.informers[informerType]if exists {return informer}
​resyncPeriod, exists := f.customResync[informerType]if !exists {resyncPeriod = f.defaultResync}
​informer = newFunc(f.client, resyncPeriod)// 通过类型将 资源的informer进行存储f.informers[informerType] = informer
​return informer
}

sharedIndexInformer分析

主要结构
type sharedIndexInformer struct {indexer    Indexercontroller Controller// 封装多个事件消费者的处理逻辑,client端通过AddEventHandler接口加入到事件消费Listener列表中processor             *sharedProcessorcacheMutationDetector MutationDetectorlisterWatcher ListerWatcherobjectType runtime.Objectstarted, stopped bool....
}
  • indexer: 本地缓存,底层的实现是threadSafeMap

  • controller: 内部调用Reflector进行ListAndWatch, 然后将事件发送给自定义事件消费者【往上获取apiserver事件,往下发送事件给定义的消费者】

  • processor: 封装多个事件消费者的处理逻辑,client端通过AddEventHandler接口加入到事件消费Listener列表中

    kubeLabelInformer.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: controller.enqueuePodFn,UpdateFunc: func(old, new interface{}) {newPod := new.(*covev1.Pod)oldPod := old.(*covev1.Pod)
    ​if newPod.ResourceVersion == oldPod.ResourceVersion {return}controller.enqueuePodFn(new)},DeleteFunc: controller.enqueuePodFn,})
    //  AddEventHandler 主要内容
    // handler 就是注册的函数listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)// 不能重复加入,所有判断是否已经开始了if !s.started {s.processor.addListener(listener)return}
  • listerWatcher:实现从apiserver进行ListAndWatch的对象,发起watch请求,将server推送的事件传入本地channel,等待消费

  • objectType: 该informer监听的资源类型,例如 Pods

informer.run都干了什么
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()
​// 定义了 DeltaFIFO 队列fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KnownObjects:          s.indexer,EmitDeltaTypeReplaced: true,})
​cfg := &Config{Queue: fifo,// listand watch 的接入口ListerWatcher:    s.listerWatcher,ObjectType:       s.objectType,FullResyncPeriod: s.resyncCheckPeriod,RetryOnError:     false,ShouldResync:     s.processor.shouldResync,
​// Process 是将事件发送给本地注册的事件处理函数的入口Process:           s.HandleDeltas,WatchErrorHandler: s.watchErrorHandler,}
​func() {// 这里为什么要加锁呢?// 猜测: 可能是防止有人不规范使用 informer,在多个goruntine中启动Start,导致多次初始化s.startedLock.Lock()defer s.startedLock.Unlock()
​s.controller = New(cfg)s.controller.(*controller).clock = s.clocks.started = true}()
​// Separate stop channel because Processor should be stopped strictly after controllerprocessorStopCh := make(chan struct{})var wg wait.Groupdefer wg.Wait()              // Wait for Processor to stopdefer close(processorStopCh) // Tell Processor to stop
​// 这里如果使用的是 kubebuild和代码生成,默认使用的是 defaultCacheMutationDetectorwg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)// 运行 sharedProcessorwg.StartWithChannel(processorStopCh, s.processor.run)
​defer func() {s.startedLock.Lock()defer s.startedLock.Unlock()s.stopped = true // Don't want any new listeners}()s.controller.Run(stopCh)
}

这里主要讲解一下

wg.StartWithChannel(processorStopCh, s.processor.run)

记得我们上面分析了,processor: 封装多个事件消费者的处理逻辑,client端通过AddEventHandler接口加入到事件消费Listener列表中,这里就开始运行Listeners

运行两个函数:

for _, listener := range p.listeners {// 内部会定时运行 1 秒运行一次去获取p.wg.Start(listener.run)p.wg.Start(listener.pop)
}
  • listener.run 从channel【nextCh】中读取数据,然后去触发注册的函数

    图片

  • 将数据从channel【addch】发送到 nextCh 【后面还会有将事件发送到channel【addCh】的操作】

    图片

controller分析

func (c *controller) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()r := NewReflector(c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,)....// 运行 Reflector ,进行listAndWatch// 先进行list,将list的数据存入 队列,并存入队列自带的缓存item【map结构】// 然后watch// 对服务端推送的事件进行解码后,会将事件存入 queue【包括queue和item】// 详细见: 后面的reflector分析wg.StartWithChannel(stopCh, r.Run)
​// 循环执行processLoop// 内部调用 //err := process(item) process就是 HandleDeltas// 执行 HandleDeltas// HandleDeltas这里做两件事// 1,将数据存到 本地缓存,也就是  ThreadSafeStore【实际开发中就可以通过: lister直接获取】// 2、只是将事件通过distribute 函数发送到了一个channel【Addch】wait.Until(c.processLoop, time.Second, stopCh)wg.Wait()
}

processLoop——》c.config.Queue.Pop——》HandleDeltas

Pop函数

id := f.queue[0]f.queue = f.queue[1:]if f.initialPopulationCount > 0 {f.initialPopulationCount--}// 获取对象item, ok := f.items[id]if !ok {// This should never happenklog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)continue}// 删除items中的缓存delete(f.items, id)
​// 调用process,前面的 sharedIndexInformer.HandleDeltas,// 将事件发送到本地注册的处理函数err := process(item)// 如果处理失败 从新加入队列if e, ok := err.(ErrRequeue); ok {f.addIfNotPresent(id, item)err = e.Err}

HandleDeltas函数

加载失败

面试题

什么情况下资源对象会在DeltaFIFO存在,但是缓存中不存在【ThreadSafeStore】

答: 写入队列和写入缓存是有先后顺序的,事件到达后会先写入队列,再通过队列的Pop方法进行处理,写入缓存,

扩展: 但是在实际使用中并不影响,因为自定的代码中的队列是在写入缓存后才会有事件通知到我们注册的handler,这时才会添加事件进入我们定义的队列,并开始运行代码更新资源

因为:HandleDeltas函数和 Watch写入队列是异步的,而且肯定是等Watch写入队列后,才会调度HandleDeltas进行缓存写入所有这个中间会有延迟

会不会出现缓存中有,而队列中没有的情况?

答: 是的,确实会有这种情况

1、队列中的事件处理后就会被清理,所有总是会出现这种情况的

Reflector分析

reflector做三件事

  1. 启动的时候向apiserver发起List请求,获取所有监听的资源,放入 DeltaFIFO

  2. 进行resync,定期将item中的资源,重新同步到queue中

  3. watch资源,通过rest接口发起watch请求,并等待apiserver推送的数据,

type DeltaFIFO struct {// 缓存资源对象items map[string]Deltas// 采用slice作为队列queue []string// 基于下面两个参数可以判断资源是否同步完成// 只要添加数据就会设置为 truepopulated bool// 第一次镜像replace时会设置 为资源数量【List阶段同步数据到队列调用的是 DeltaFIFO的replace】// 调用Pop时会initialPopulationCount--,Pod时会调用HandleDeltas,将数据同步到自定义的队列中,第一批插入的数据都Pop完成后,initialPopulationCount==0.说明同步完成initialPopulationCount int
}

func (r *Reflector) Run(stopCh <-chan struct{}) {wait.BackoffUntil(func() {// 开始进行ListAndWatchif err := r.ListAndWatch(stopCh); err != nil {r.watchErrorHandler(r, err)}}, r.backoffManager, true, stopCh)
​
}

List阶段

这里没啥可说的,就是请求数据写入队列

// 发起List,这里采用了分页获取【如果设置了chunk】
list, paginatedResult, err = pager.List(context.Background(), options)
// 将数据写入队列
if err := r.syncWith(items, resourceVersion); err != nil {return fmt.Errorf("unable to sync list result: %v", err)}
// 设置资源版本,防止后续网络断开需要重试的情况,可以从已经获取的版本开始获取
r.setLastSyncResourceVersion(resourceVersion)

Watch过程

  1. 指定资源版本通过rest请求apiserver进行Watch

  2. apiserver推送的数据会被Watch对象写入channel【result】

  3. 从Result这个channel中不断接收原生,将事件通过 switch 不同的类型调用不同的函数

第一阶段
options = metav1.ListOptions{// 该值会持续更新,如果网络异常导致 连续中断,则会从接收到的版本再次进行watchResourceVersion: resourceVersion,....
}
​
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
// 开始对资源进行watch, apiserver推送事件后,会将事件推送到一个 result的channel中,然后由后续的watchHandler进行处理
w, err := r.listerWatcher.Watch(options)
Watch对象的实现
retry := r.retryFn(r.maxRetries)
url := r.URL().String()
for {if err := retry.Before(ctx, r); err != nil {return nil, retry.WrapPreviousError(err)}// 构造请求req, err := r.newHTTPRequest(ctx)if err != nil {return nil, err}
​resp, err := client.Do(req)updateURLMetrics(ctx, r, resp, err)retry.After(ctx, r, resp, err)if err == nil && resp.StatusCode == http.StatusOK {// 返回流对象return r.newStreamWatcher(resp)}
}
流对象
// NewStreamWatcher creates a StreamWatcher from the given decoder.
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {sw := &StreamWatcher{source:   d,reporter: r,// It's easy for a consumer to add buffering via an extra// goroutine/channel, but impossible for them to remove it,// so nonbuffered is better.result: make(chan Event),// If the watcher is externally stopped there is no receiver anymore// and the send operations on the result channel, especially the// error reporting might block forever.// Therefore a dedicated stop channel is used to resolve this blocking.done: make(chan struct{}),}go sw.receive()return sw
}
sw.receive()

图片

从result这个channel获取数据,并调用对应的事件

会在这里循环读取数据,ResultChan()返回的就是 result 这个channel

图片

通过不同的事件类型,调用不同的队列方法 【store是前面定义的 DeltaFIFO】

同时还会将已经获取的 资源版本进行更新【这里传进来的是指针,所有更改后 外面会生效】

图片

reSync过程
// 这里进行重新 同步数据到队列中, 同步主要是为了 能够周期性的去触发我们自己写的代码更新资源状态go func() {resyncCh, cleanup := r.resyncChan()defer func() {cleanup() // Call the last one written into cleanup}()for {select {case <-resyncCh:case <-stopCh:returncase <-cancelCh:return}// 重新同步资源对象到队列中if r.ShouldResync == nil || r.ShouldResync() {klog.V(4).Infof("%s: forcing resync", r.name)if err := r.store.Resync(); err != nil {resyncerrc <- errreturn}}cleanup()// 是否可以进行同步: 这里是去重试时间开启了一个定时通知resyncCh, cleanup = r.resyncChan()}}()
​
// 进行重新同步
func (f *DeltaFIFO) Resync() error {f.lock.Lock()defer f.lock.Unlock()
​if f.knownObjects == nil {return nil}
​// fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{//      KnownObjects:          s.indexer,//      EmitDeltaTypeReplaced: true,//  })// 这里同步是去取 indexer里面的数据: indexer就是 threadSafeMapkeys := f.knownObjects.ListKeys()for _, k := range keys {if err := f.syncKeyLocked(k); err != nil {return err}}return nil
}

面试题

网络异常,会不会导致事件丢失?

不会,网络异常只会影响到Watch,这中间发送的事件无法接收到,但是一旦网络恢复,重新开始Watch,客户端会维护一个已经接收事件的版本号,当网络恢复,会从这个版本号开始进行watch资源,

如果客户端重启呢,会不会丢失事件?

是的,客户端重启肯定是会丢失事件的,但是并不影响controller的运行,重启会重新获取全量的资源列表,这时能够获取到最新的版本,controller的目标就是将用户期望spec,实际进行应用,所有只需要应用最新版本即可

存的资源版本号,是每个资源对象都有存吗,例如pod资源,pod1的版本号和pod2的版本号?

不用,只用存储一个版本号即可,因为资源的的版本号是递增的,只用记录最后一个同步的版本即可,

ListAndWatch中的watch过程,是每一个资源都有一个watch吗?

:不是,client-go采用的是采用的区间watch【同时watch满足条件的一批资源】,所以只需要一个watch请求,

扩展:是一类对象一个watch,例如pod,configmap,这是一类对象

k8s采用了多路复用,可以将一个controller发起的watch请求,通过一个连接进行发送,可以降低api-server的连接数量

为什么要进行重新同步?

1、重新同步是为了controller能够定期的去更新对应的资源

2、controller在处理事件时,如果需要等待或者处理错误,通过重新同步可以再次触发更新

listAndWatch的流程?

1、发起List请求,获取所有需要Watch的资源 2、将这些资源写入队列

3、定期的中缓存中取出 资源对象,写入队列

4、对资源进行Watch,如果有事件会同步到队列

5、从队列中取出资源,写入到缓存

6、调用用户定义的handler处理事件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/768594.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SQLiteC/C++接口详细介绍sqlite3_stmt类(八)

返回&#xff1a;SQLite—系列文章目录 上一篇&#xff1a;SQLiteC/C接口详细介绍sqlite3_stmt类&#xff08;七&#xff09; 下一篇&#xff1a; SQLiteC/C接口详细介绍sqlite3_stmt类&#xff08;九&#xff09; 27、sqlite3_column_int 函数 sqlite3_column_int 用于返…

0.2W超精密金属膜电阻器-陶瓷封装-塑封

高精度欧姆值&#xff0c;温度系数低 长期稳定性&#xff0c;无感设计 UPSC 欧姆范围&#xff1a;40 Ω 至 5 MΩ UPR 欧姆范围&#xff1a;10 Ω 至 5 MΩ 温度系数&#xff1a;2 ppm/C 至 25 ppm/C 符合 RoHS 规范 由EE1/10(RE55)成品高精密金属膜电阻器选配组装而成&am…

java数据结构与算法刷题-----LeetCode75. 颜色分类

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目录 1. 双指针两次遍历2. 三指针 1. 双指针两次遍历 解题思路&#…

以行动激发消费活力,加多宝引领高品质消费浪潮

2024年“315”期间&#xff0c;加多宝携手全国多地市场监督管理局、消费者协会等单位&#xff0c;围绕今年“激发消费活力”主题&#xff0c;积极配合各地相关政府部门开展系列宣传活动&#xff0c;以实际行动呼吁切实保护消费者合法权益&#xff0c;共建诚信消费环境&#xff…

Python综合实战案例-数据清洗分析

写在前面&#xff1a; 本次是根据前文讲解的爬虫、数据清洗、分析进行的一个纵隔讲解案例&#xff0c;也是对自己这段时间python爬虫、数据分析方向的一个总结。 本例设计一个豆瓣读书数据⽂件&#xff0c;book.xlsx⽂件保存的是爬取豆瓣⽹站得到的图书数据&#xff0c;共 6067…

在fstab文件中配置UUID方式自动挂载数据盘、swap、目录(**)

linux如何挂在硬盘&#xff0c;自动挂载和手动挂载&#xff08;详细说明&#xff09;https://gitcode.csdn.net/65eedcea1a836825ed7a06f4.html 解决linux重启后磁盘挂载失效的问题 https://blog.csdn.net/sugarbliss/article/details/107033034 linux /etc/fstab 文件详细说…

Android Audio相关

AudioManager AudioService的Bp端&#xff0c;调用AudioManager>AudioService&#xff08;代码实现&#xff09; AudioService 继承自IAudioService.Stub&#xff0c;为Bn端 AudioSystem AudioService功能实现都依赖于AudioSystem&#xff0c;AudioService通过AudioSys…

Attention Is All You Need若如爱因斯坦的相对论,Transformer模型则堪称E=MC^2之等量公式

Transformer模型已经成为当前所有自然语言处理NLP的标配&#xff0c;如GPT&#xff0c;Bert&#xff0c;Sora&#xff0c;LLama&#xff0c;Grok等。假如《Attention Is All You Need》类比为爱因斯坦的侠义相对论&#xff0c;Transformer模型则堪称EMC^2之等量公式。 看过论文…

[BT]BUUCTF刷题第6天(3.24)

第6天 Web [极客大挑战 2019]PHP Payload&#xff1a; O:4:"Name":3:{s:14:"%00Name%00username";s:5:"admin";s:14:"%00Name%00password";s:3:"100";}这道题考点是网站源码备份文件泄露和PHP反序列化&#xff0c;有篇介…

SpringBoot Starter解析

conditional注解解析 介绍 基于条件的注解作用: 根据是否满足某一个特定条件决定是否创建某个特定的bean意义: Springboot实现自动配置的关键基础能力 常见的conditional注解 ConditionalOnBean: 当容器中存在某个Bean才会生效ConditionalOnMissingBean: 不存在某个Bean才会…

管理自由,体验简单,使用安全 | 详解威联通全套多用户多权限管理方案【附TS-466C产品介绍】

管理自由&#xff0c;体验简单&#xff0c;使用安全 | 详解威联通全套多用户多权限管理方案【附TS-466C产品介绍】 哈喽小伙伴们好&#xff0c;我是Stark-C~。今天我们来解决一个之前评论区多次被提及的问题--多用户权限管理。 对于我们NAS用户来说&#xff0c;基本都会面临这…

docker 本地机 互通文件

查询容器name 查询容器Id 进行传输

QTabWidget的tabbar不同方向显示 文字方向设置 图标跟随变化 实现方式 qt控件绘制原理

先来看结果图&#xff1a;&#xff08;参考博客&#xff1a;QTabWidget中tab页文本水平或垂直设置_pyqt tab_widget.settabposition(qtabwidget.west) 字体-CSDN博客&#xff09; 从图中可知&#xff0c;"普通"是qt自己的样式&#xff0c;但是很明显&#xff0c;在垂…

最新Java面试题5【2024初级】

互联网大厂面试题 1&#xff1a;阿里巴巴Java面试题 2&#xff1a;阿里云Java面试题-实习生岗 3&#xff1a;腾讯Java面试题-高级 4&#xff1a;字节跳动Java面试题 5&#xff1a;字节跳动Java面试题-大数据方向 6&#xff1a;百度Java面试题 7&#xff1a;蚂蚁金服Java…

Excel打开CSV文件中文乱码问题

Excel的数据导入功能 直接用Excel打开下载的CSV文件&#xff0c;会看到汉字乱码&#xff0c;数字显示正常。如下图所示现象。 请先正常打开一份空白的excel文件&#xff0c;将鼠标定位在第一行第一列&#xff0c;这边鼠标定位的位置将决定后续打开的csv文件在excel中展示的位置…

【Python从入门到进阶】51、电影天堂网站多页面下载实战

接上篇《50、当当网Scrapy项目实战&#xff08;三&#xff09;》 上一篇我们讲解了使用Scrapy框架在当当网抓取多页书籍数据的效果&#xff0c;本篇我们来抓取电影天堂网站的数据&#xff0c;同样采用Scrapy框架多页面下载的模式来实现。 一、抓取需求 打开电影天堂网站&…

C语言之strsep用法实例(八十六)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

C++实现FFmpeg音视频实时拉流并播放

1.准备工作: 下载rtsp流媒体服务器rtsp-simple-server,安装go开发环境并编译 编译好后启动流媒体服务器 准备一个要推流的mp4视频文件,如db.mp4 使用ffmpeg开始推流 推流命令: ffmpeg -re -stream_loop -1 -i db.mp4 -c copy -rtsp_transport tcp -f rtsp rtsp://192.168.16…

Soot 安装和简单使用

目录 前言 一、Soot 的下载和安装 1.1 在命令行中使用 Soot 1.2 在项目中使用 Soot 二、使用 Soot 生成中间代码 (IR) 三、使用 Soot 进行 Java 类插桩 四、使用 Soot 生成控制流图 (CFG) 4.1 按语句划分的控制流程图 4.2 按基本块划分的控制流程图 五、Graphviz 工具…

Docker jupyter 容器中添加matplotlib 中文支持

本教程基于 jupyter/datascience-notebook&#xff0c;适用其他容器。 # 查看所有 Docker 容器 docker ps -a # 进入已经运行的 Jupyter 容器 docker exec -it CONTAINER_ID bash 本例中CONTAINER_ID为2e # 切换到 matplotlib 的字体目录&#xff08;find / -name "…