深入源码分析kubernetes informer机制(二)Reflector


[阅读指南]
这是该系列第二篇
基于kubernetes 1.27 stage版本
为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。


文章目录

  • Reflector是什么
  • 整体结构
  • 工作流程
    • list拉取数据
    • 缓存resync操作
    • watch监听操作
  • 总结

Reflector是什么

reflector在informer中就像是一个对外的窗口,它与api-server建立连接,监听和获取来自api-server的资源变化信息,并把这些信息放进deltaFIFO中,交给下一个环节处理。

整体结构

与api-server进行交互,通过list获取指定的全量资源,watch监听指定的资源变化事件,并将这些事件放入delta FIFO队列中。
结构与交互如下图

// 省略了部分字段,只留下我们关注的
type Reflector struct {// name identifies this reflector. By default it will be a file:line if possible.name string// reflector对象需要监控的资源类型,比如上一节workqueue中的&v1.Pod{}expectedType reflect.Type// deltaFIFO 队列存储对象store Store// 实现list/watchlisterWatcher ListerWatcher// 上次更新的资源版本号,用来判断当前的node的资源状况lastSyncResourceVersion string......
}

工作流程

reflecter主函数比较简单,循环同步运行ListAndWatch直到收到stop信号。

func (r *Reflector) Run(stopCh <-chan struct{}) {wait.BackoffUntil(func() {if err := r.ListAndWatch(stopCh); err != nil {r.watchErrorHandler(r, err)}}, r.backoffManager, true, stopCh)
}

ListAndWatch主要做了这几件事:

  1. 通过stream或者chunk方式拉取全量list数据
  2. 开启一个协程进行缓存resync操作。
  3. 循环执行watch监听操作
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {...fallbackToList := !r.UseWatchList// stream式同步if r.UseWatchList {w, err = r.watchList(stopCh)...if err != nil {...fallbackToList = truew = nil}}// chunk式同步if fallbackToList {err = r.list(stopCh)if err != nil {return err}}...go r.startResync(stopCh, cancelCh, resyncerrc)return r.watch(w, stopCh, resyncerrc)
}

接下来咱一步步来看。

list拉取数据

ListAndWatch拉取全量数据时,出现了两种数据拉取的方式,list /watchstream list /watch

stream list是 kubernetes 1.27 引入的新方案,通过 ENABLE_CLIENT_GO_WATCH_LIST_ALPHA 变量可以启用stream list,默认会使用原有的list/watch。后续会单独开一篇介绍stream list方案,详情可以通过KEP-3157了解

前者在初始化时list拉取全量数据,通过watch更新增量变化。
后者可以通过watch 请求的方式获取list数据,从而减轻大规模集群初始化list数据时的资源消耗。
在建立watch连接时,携带如下两个参数即可告知服务器使用streaming list进行一致性读取。
sendInitialEvents=true
resourceVersionMatch=NotOlderThan

常规的list流程借用这个博主画的时序图来看下。
在这里插入图片描述

缓存resync操作

resync负责定期将本地的缓存重新加入deltaFIFO队列,确保本地缓存与controller的数据一致性。

国内太多博客没了解清楚就介绍这一部分是与api-server交互,进行relist。实际上resync完全没有涉及到服务端的部分,他就是一个本地缓存的同步机制。与服务端的交互使用list/watch已经完全可以确保资源一致性了,基本不怎么需要进行relist操作,并且对于节点非常多的大集群来说,list非常消耗资源,何况是定期relist呢。

关于resync机制的介绍,不在这里展开,详细看下一篇笔记。

watch监听操作

watch的实现非常巧妙,它利用了http的chunk编码传输机制建立长连接,来实现动态的数据监听,可以了解分块传输编码。
同样借用一张时序图来看下watch的流程
在这里插入图片描述

reflector通过Watcher监听api-server端的数据delta事件,并将这些事件放入deltaFIFO中统一处理。

// 在这里向服务端发起watch请求,并接收和处理资源变更事件
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {...for {...// w == nil表示使用常规的list/watch方式,streaming 方式会创建特殊的watcherif w == nil {timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))options := metav1.ListOptions{// 上次同步的资源版本,也就是本地的资源版本。以此来获取增量的数据ResourceVersion: r.LastSyncResourceVersion(),// watch 超时时间,长时间没有接受任务事件的watcher会被关掉,避免长时间挂起。TimeoutSeconds: &timeoutSeconds,// watch书签,避免watch重启时请求api-server导致的消耗。AllowWatchBookmarks: true,}// 创建一个watch对象,监听api-server的资源变更事件,将接收到的事件丢进resultChan中w, err = r.listerWatcher.Watch(options)...}// 将resultChan中的取出放入FIFO 队列err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)// 失败重试逻辑...}
}

建立连接的逻辑在这一行
w, err = r.listerWatcher.Watch(options)

还是用上一篇workqueue来看看这个Watch实例的实现。
从Watch函数一路往上追溯,可以看到先是与server建立了http连接,再通过watch标记建立了watch连接,创建stream watcher对象,并拉起一个协程去处理监听到的事件信息。

  • 此后所有监听的delta事件都会经过receive协程进入到resultChan中。
// reflector调用的watch函数
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {return lw.WatchFunc(options)
}// watchFunc函数的定义
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {...watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {options.Watch = true // 向服务端请求chunk连接optionsModifier(&options)return c.Get().Namespace(namespace).Resource(resource).VersionedParams(&options, metav1.ParameterCodec).Watch(context.TODO()) // 这里调用了getter的watch函数// getter是controller初始化时建立的http客户端: clientset.CoreV1().RESTClient()}return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {...	url := r.URL().String()for {req, err := r.newHTTPRequest(ctx)resp, err := client.Do(req)if err == nil && resp.StatusCode == http.StatusOK {return r.newStreamWatcher(resp)}...}
}func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {sw := &StreamWatcher{source:   d,reporter: r,result: make(chan Event),done: make(chan struct{}),}go sw.receive() // 处理消息事件的协程return sw
}// 解析接收到的事件,并放到resultChan中等待后续处理。
func (sw *StreamWatcher) receive() {for {// 解析数据action, obj, err := sw.source.Decode()select {case <-sw.done:return// 将事件发送到resultChancase sw.result <- Event{Type:   action,Object: obj,}:}}
}
  • 进入resultChan的事件,由watchHandler取出再分类添加到FIFO队列中。
func watchHandler(start time.Time,w watch.Interface,	// watch实例store Store, // 存储对象 比如delta FIFO queue...
) error {...
loop:for {select {case <-stopCh:return errorStopRequestedcase err := <-errc:return err// 从ResultChan中取出变更事件,并放进队列中,比如delta FIFO队列中case event, ok := <-w.ResultChan():// 省略了一些资源过滤和错误处理...// 解析监听到的事件数据meta, err := meta.Accessor(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))continue}// 解析资源事件的版本resourceVersion := meta.GetResourceVersion()switch event.Type {case watch.Added: err := store.Add(event.Object) // 往队列添加add delta事件... // err handlecase watch.Modified: err := store.Update(event.Object) // 往队列添加update delta事件... // err handlecase watch.Deleted:	err := store.Delete(event.Object) // 往队列添加delete delta事件,在此之前会判断事件对应的资源对象是否存在... // err handlecase watch.Bookmark:...default:... // err handle}// 更新resourceVersion版本号,下一轮watch就不会再收到重复的更新事件setLastSyncResourceVersion(resourceVersion)if rvu, ok := store.(ResourceVersionUpdater); ok {rvu.UpdateResourceVersion(resourceVersion)}...}}...return nil
}

总结

用一个图来回顾下reflector各个模块的关系~
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/39743.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RocketMQ双主双从同步集群部署

&#x1f388; 作者&#xff1a;互联网-小啊宇 &#x1f388; 简介&#xff1a; CSDN 运维领域创作者、阿里云专家博主。目前从事 Kubernetes运维相关工作&#xff0c;擅长Linux系统运维、开源监控软件维护、Kubernetes容器技术、CI/CD持续集成、自动化运维、开源软件部署维护…

学习笔记十九:Pod常见的状态和重启策略

Pod常见的状态和重启策略 常见的pod状态第一阶段&#xff1a;第二阶段&#xff1a;扩展&#xff1a; pod重启策略测试Always重启策略正常停止容器内的tomcat服务非正常停止容器里的tomcat服务 测试never重启策略正常停止容器里的tomcat服务非正常停止容器里的tomcat服务 测试On…

Mac安装opencv后无法导入cv2的解决方法

前提条件&#xff1a;以下两个插件安装成功 pip install opencv-python pip install --user opencv-contrib-python 注&#xff1a;直接用pip install opencv-contrib-python如果报错&#xff0c;就加上“–user" 第一步&#xff1a; 设置–添加python解释器 第二步&am…

go语言恶意代码检测系统--对接前端可视化与算法检测部分

Malware Detect System 1 产品介绍 恶意代码检测系统。 2 产品描述 2.1 产品功能 功能点详细描述注册账号未注册用户注册成为产品用户&#xff0c;从而具备享有产品各项服务的资格登录账号用户登录产品&#xff0c;获得产品提供的各项服务上传恶意样本用户可以将上传自己的…

uniapp微信小程序消息订阅快速上手

一、微信公众平台小程序开通消息订阅并设置模板 这边的模板id和详细内容后续前后端需要使用 二、uniapp前端 需要是一个button触发 js&#xff1a; wx.getSetting({success(res){console.log(res)if(res.authSetting[scope.subscribeMessage]){// 业务逻辑}else{uni.request…

智安网络|深入比较:Sass系统与源码系统的差异及选择指南

随着前端开发的快速发展&#xff0c;开发人员需要使用更高效和灵活的工具来处理样式表。在这个领域&#xff0c;Sass系统和源码系统是两个备受关注的选项。 Sass系统 Sass&#xff08;Syntactically Awesome Style Sheets&#xff09;是一种CSS预处理器&#xff0c;它扩展了CS…

@Param详解

文章目录 背景什么是ParamParam的使用方法使用方法&#xff1a;遇到的问题及因Param解决了什么问题使用与不使用对比 Param是如何进行映射的总结 背景 最近在开发过程中&#xff0c;在写mapper接口是在参数前加了Param注解&#xff0c;但是在运行的时候就会报错&#xff0c;说…

更多openEuler镜像加入AWS Marketplace!

自2023年7月openEuler 22.03 LTS SP1正式登陆AWS Marketplace后&#xff0c;openEuler社区一直持续于在AWS上提供更多版本。 目前&#xff0c;openEuler22.03 LTS SP1 ,SP2两个版本及 x86 arm64两种架构的四个镜像均可通过AWS对外提供&#xff0c;且在亚太及欧洲15个Region开放…

wkhtmltopdf 与 .Net Core

wkhtmltopdf 是使用webkit引擎转化为pdf的开源小插件. 其有.NET CORE版本的组件,DinkToPdf,但该控件对跨平台支持有限。 故打算在Linux上安装相关插件直接调用. 准备工作 虚拟机&#xff1a;Linux version 3.10.0-1160.el7.x86_64 wkhtmltox开发包&#xff1a;wkhtmltox_0.12…

大数据Flink(六十):Flink 数据流和分层 API介绍

文章目录 Flink 数据流和分层 API介绍 一、​​​​​​​​​​​​​​Flink 数据流

ZooKeeper的应用场景(命名服务、分布式协调通知)

3 命名服务 命名服务(NameService)也是分布式系统中比较常见的一类场景&#xff0c;在《Java网络高级编程》一书中提到&#xff0c;命名服务是分布式系统最基本的公共服务之一。在分布式系统中&#xff0c;被命名的实体通常可以是集群中的机器、提供的服务地址或远程对象等一这…

iOS申请证书(.p12)和描述文件(.mobileprovision)

打包app时&#xff0c;经常会用到ios证书&#xff0c;但很多人都苦于没有苹果电脑&#xff0c;即使有苹果电脑的&#xff0c;也会觉得苹果电脑操作也很麻烦&#xff0c;这里记录一下&#xff0c;用香蕉云编&#xff0c;申请证书及描述文件的过程。 香蕉云编的地址&#xff1a;…

【C语言】每日一题(多数元素)

多数元素&#xff0c;链接奉上 方法 1.摩尔投票2.合理但错误的方法2.1暴力循环2.2排序求出中间元素中间元素 1.摩尔投票 先来简单的介绍摩尔投票&#xff1a; 摩尔投票是一种用来解决绝对众数问题的算法。 什么是绝对众数呢&#xff1f; 在一个集合中&#xff0c;如果一个元素…

每天一练:SpringBoot连接mq

目录 每天一练:Springboot连接rabbitmq 每天一练:Springboot连接rabbitmq 目录一、部署Rabbitmq&#xff1f;二、增加maven依赖三、连接RabbitMq四、发布和订阅消息总结 一、部署Rabbitmq&#xff1f; 这里rabbitmq采用docker安装部署。 拉取docker镜像 [root192 ~]# docker…

【ChatGLM】ChatGLM-6B模型Win+4GB显卡本地部署笔记

ChatGLM-6B是清华大学知识工程和数据挖掘小组发布的一个类似ChatGPT的开源对话机器人&#xff0c;由于该模型是经过约1T标识符的中英文训练&#xff0c;且大部分都是中文&#xff0c;因此十分适合国内使用。 预期环境 本机电脑备注&#xff1a; Win10专业版 32G内存256固态系统…

SAP动态安全库存简介

动态安全库存:跑需求计划时,ERP系统按设置的库存方式自动计算出满足一定时间内可保障生产的库存数量 SAP动态安全库存的计算公式:动态安全库存=平均日需求*覆盖范围。 平均日需求=特定时期内的总需求/特定时期内的工作天数 覆盖范围指在没又货物供应的情况下,库存可以维…

稀疏感知图像和体数据恢复的系统对象研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

STM32 F103C8T6学习笔记6:IIC通信__驱动MPU6050 6轴运动处理组件—一阶互补滤波

今日主要学习一款倾角传感器——MPU6050,往后对单片机原理基础讲的会比较少&#xff0c;更倾向于简单粗暴地贴代码&#xff0c;因为经过前些日子对MSP432的学习&#xff0c;对原理方面也有些熟络了&#xff0c;除了在新接触它时会对其引脚、时钟、总线等进行仔细一些的研究之外…

ATF(TF-A)安全通告 TFV-5 (CVE-2017-15031)

安全之安全(security)博客目录导读 ATF(TF-A)安全通告汇总 目录 一、ATF(TF-A)安全通告 TFV-5 (CVE-2017-15031) 二、CVE-2017-15031 一、ATF(TF-A)安全通告 TFV-5 (CVE-2017-15031) Title 未初始化或保存/恢复PMCR_EL0可能会泄露安全世界的时间信息 CVE ID CVE-2017-1503…

spark的standalone 分布式搭建

一、环境准备 集群环境hadoop11&#xff0c;hadoop12 &#xff0c;hadoop13 安装 zookeeper 和 HDFS 1、启动zookeeper -- 启动zookeeper(11,12,13都需要启动) xcall.sh zkServer.sh start -- 或者 zk.sh start -- xcall.sh 和zk.sh都是自己写的脚本-- 查看进程 jps -- 有…