深入源码分析kubernetes informer机制(四)DeltaFIFO


[阅读指南]
这是该系列第四篇
基于kubernetes 1.27 stage版本
为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。


文章目录

  • client-go中的存储结构
  • DeltaFIFO
    • delta
    • 索引 key
    • queue push操作
      • delta push 去重
    • queue pop操作
  • 总结

client-go中的存储结构

如下图,clinet-go中定义了存储类型接口store,用来提供存储对象的基本能力。

queue继承了store接口,并提供了队列的能力,队列中可以保存需要增删改的存储对象的key,它会取出队头元素,调用PopProcessFunc处理。

queue的实现有两个:FIFOdeltaFIFO

deltaFIFO的不同点在于,deltaFIFO队列中,key对应的不是对象本身,而是对象的delta。

另外deltaFIFO除了通过add、update、delete添加元素,还有两种特殊的方式:replaced和sync。replaced一般发生在资源版本更新时,而sync由resync定时发起。

DeltaFIFO

下面是deltaFIFO数据结构的定义

type DeltaFIFO struct {// 并发读写锁lock sync.RWMutexcond sync.Cond// `items` maps a key to a Deltas.// 资源对象的key与对应的delta数组,每个数组至少都会有一个deltaitems map[string]Deltas// 按照FIFO队列顺序存储key,用来给pop()消费。// 该数组不会有重复值,并且所有元素都一定在items中queue []string// 生成key值的函数,默认是 MetaNamespaceKeyFunckeyFunc KeyFunc// 本地缓存中已知的所有资源对象的keyknownObjects KeyListerGetter......
}

delta

如前面所说,deltaFIFO中key映射的不是对象本身,是delta数组。

根据Delta数据结构的定义,delta包含了一个资源对象的变更类型及变更的内容。这里的Object不一定是完整的资源数据,大部分场景下只会有变更的部分信息。

type Delta struct {Type   DeltaTypeObject interface{}
}type DeltaType string
const (Added   DeltaType = "Added"Updated DeltaType = "Updated"Deleted DeltaType = "Deleted"Replaced DeltaType = "Replaced"Sync DeltaType = "Sync"
)

举个栗子,本地已经有了一个pod对象,

&Pod{Name:      "mypod",Namespace: "default",Labels:    map[string]string{"app": "web", "version": "0.0.1"},
}

此时mypod的 lable从web变成了app-server,reflector就会创建一个这样的delta对象放入FIFO队列中。

&Delta{Type:   "Updated",Object: &Pod{Name:      "mypod",Namespace: "default",Labels:    map[string]string{"app": "app-server"},},
}

索引 key

deltaFIFO队列中,存储的是delta的key值,通过key值可以在items map中获取到对应的delta对象。

这个key值在初始化FIFO时通过KeyFunction进行定义,使用者没有指定时,都会使用自带的命名函数 MetaNamespaceKeyFunc 进行命名,命名规则是

  • namespace不为空,key为/
  • namespace为空,key为

这里的name是在yaml资源配置中的matadata.name,比如上面的mypod。在同一个资源下,name在所有api version都一定是唯一的。

func MetaNamespaceKeyFunc(obj interface{}) (string, error) {if key, ok := obj.(ExplicitKey); ok {return string(key), nil}meta, err := meta.Accessor(obj)if len(meta.GetNamespace()) > 0 {return meta.GetNamespace() + "/" + meta.GetName(), nil}return meta.GetName(), nil
}

queue push操作

watcher监控的资源变更时,会调用deltaFIFO中Added、Updated、Deleted、Replaced、Sync方法,最终它们都会通过queueActionLocked 方法往deltaFIFO队列中加入对应类型的delta对象。

queueActionLocked 也就是deltaFIFO的入队操作。

和一般的入队不同的是,新加入的delta不是直接加入到队尾,队列queue数组中保存的是delta的key。所以入队的操作是这样的

  1. 获取delta对应的key值(还记得keyfunc吗,又是它)
  2. 如果delta所属的资源key已经在队列中,直接将delta添加到key对应到deltas数组末尾。更新已存在的资源delta并不会影响他的key在队列中的位置。
  3. 如果delta所属的资源key不在队列中,就将key添加到队列末尾,并在items中关联key和delta
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {id, err := f.KeyOf(obj)// 自定义的转换函数。可以在delta事件被处理之前完成一些预处理// 常见的用法是用来过滤一些处理程序不关注的资源对象、以及处理数据格式等if f.transformer != nil {obj, err = f.transformer(obj)}// 将新的delta放入资源key对应的delta数组末尾// 如果原本的key不存在,就是创建了一个新的数组,并将新的delta放入其中oldDeltas := f.items[id]newDeltas := append(oldDeltas, Delta{actionType, obj})// 对delta数组中的delta去重newDeltas = dedupDeltas(newDeltas)// 判断key是否已经在队列中,并且更新key对应的delta数组if len(newDeltas) > 0 {if _, exists := f.items[id]; !exists {f.queue = append(f.queue, id)}f.items[id] = newDeltasf.cond.Broadcast()}return nil
}

delta push 去重

上一节提到,delta进行push操作时,会对加入的delta进行去重。去重逻辑目前只针对两个delete类型的delta有效:当delta数组中倒数第一个和第二个delta都是delete类型时,将会去掉其中一个

func dedupDeltas(deltas Deltas) Deltas {n := len(deltas)if n < 2 {return deltas}a := &deltas[n-1]b := &deltas[n-2]if out := isDup(a, b); out != nil {deltas[n-2] = *outreturn deltas[:n-1]}return deltas
}// 判断a、b两个delta是否重复
// 目前暂时只有两个delete类型的delta会被判定为重复。
func isDup(a, b *Delta) *Delta {if out := isDeletionDup(a, b); out != nil {return out}return nil
}// 判定两个delta是否都是deleted类型
func isDeletionDup(a, b *Delta) *Delta {if b.Type != Deleted || a.Type != Deleted {return nil}if _, ok := b.Object.(DeletedFinalStateUnknown); ok {return a}return b
}

举个小小的例子来回顾一下delta push操作。假设queue中有3个pod对象,对应了不同的变更事件,如下所示。

此时watcher监听到资源发生变化:

  1. pod2收到了updated事件
  2. pod1收到了deleted事件
  3. pod3收到了deleted事件

于是,三个delta入队成功后的队列图如下

pod1已有一个deleted事件,再次收到deleted后,经过dedupDeltas去重,最终只保留一个deleted。

pod3虽然有两个deleted事件,但是他们并不是连续的事件,不会被去重

queue pop操作

deltaFIFO出队的操作和普通的队列出队类似,从队头取出一个资源对象key,并删除items中key对应的deltas数组。

pop出队时,会调用传参PopProcessFunc对出队元素进行处理。

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {f.lock.Lock()defer f.lock.Unlock()for {for len(f.queue) == 0 {// 队列为空时阻塞if f.closed {return nil, ErrFIFOClosed}f.cond.Wait()}// 取出队首的资源对象keyid := f.queue[0]f.queue = f.queue[1:]// 获取key对应的deltas数组item, ok := f.items[id]// 执行pop处理函数,处理delta事件,如果处理失败了,资源对象会被重新加入到队列中。// 但是如果队列中存在相同的对象,资源对象会被丢弃。err := process(item, isInInitialList)if e, ok := err.(ErrRequeue); ok {f.addIfNotPresent(id, item)err = e.Err}return item, err}
}

这里一开始有个小疑问,如果资源的delta处理失败了,并且队列中又出现了同样的资源key,这部分delta数据不就丢失了吗?

但是仔细看出队入队公用一个锁,pop处理对象时不会有新的对象入队,所以理论上不会出现在addIfNotPresent时,key是persent的情况。而deltaFIFO入队的逻辑,也不会存在一个队列有两个相同的key的情况,所以不会有丢失的问题,addIfNotPresent应该只是加多一层保障。如果理解有问题,欢迎大佬们指正。

回顾一下pop的调用方processLoop,调用pop时传入PopProcessFunc(c.config.Process))。

系列第一篇介绍informer时提到过,c.config.Process最终调用的是processDeltas函数,它包含了数据同步到存储,以及调用注册的用户函数两个操作。

func (c *controller) processLoop() {for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {...}}
}// 数据处理函数
func processDeltas(handler ResourceEventHandler,clientState Store,deltas Deltas,isInInitialList bool,
) error {// from oldest to newestfor _, d := range deltas {obj := d.Object// 区分事件类型进行处理switch d.Type {case Sync, Replaced, Added, Updated:// 同步存储数据if old, exists, err := clientState.Get(obj); err == nil && exists {if err := clientState.Update(obj); err != nil {return err}// 回调用户函数handler.OnUpdate(old, obj)} else {// 同步存储数据if err := clientState.Add(obj); err != nil {return err}// 回调用户函数handler.OnAdd(obj, isInInitialList)}case Deleted:// 同步存储数据if err := clientState.Delete(obj); err != nil {return err}// 回调用户函数handler.OnDelete(obj)}}return nil
}

总结

还是用上一节的例子,小结回顾一下整体的流程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/38961.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式

本文主要介绍设计模式的主要设计原则和常用设计模式。 一、UML画图 1.类图 2.时序图 二、设计模式原则 1.单一职责原则 就是一个方法、一个类只做一件事&#xff1b; 2.开闭原则 就是软件的设计应该对拓展开放&#xff0c;对修改关闭&#xff0c;这在java中体现最明显的就…

Docker本地镜像发布到阿里云

1. 本地镜像发布到阿里云 2. 镜像的生成方法 OPTIONS说明&#xff1a; -a :提交的镜像作者&#xff1b; -m :提交时的说明文字&#xff1b; 本次案例centosubuntu两个&#xff0c;当堂讲解一个&#xff0c;家庭作业一个&#xff0c;请大家务必动手&#xff0c;亲自实操。 docke…

Gradio部署应用到服务器不能正常访问

用Gradio部署一个基于ChatGLM-6B的应用&#xff0c;发布到团队的服务器上&#xff08;局域网&#xff0c;公网不能访问&#xff09;&#xff0c;我将gradio应用发布到服务器的9001端口 import gradio as gr with gr.Blocks() as demo:......demo.queue().launch(server_port90…

ad+硬件每日学习十个知识点(34)23.8.14 (DCDC详细设计,续流二极管的选择,COMP引脚的环路设计)

文章目录 1.二极管的rrm电压和rms电压有什么不同2.DCDC续流二极管的选择3.充电电容4.COMP引脚的环路设计5.DCDC设计总结6.多路并联7.相位匹配8.工作模式9.低温输出偏离10.电源负载与效率11.降压升压模块 1.二极管的rrm电压和rms电压有什么不同 答&#xff1a; 二极管的 RRM &a…

redis主从复制、哨兵服务、持久化、数据类型

Top NSD DBA DAY10 案例1&#xff1a;配置主从复制案例2&#xff1a;配置带验证的主从复制案例3&#xff1a;哨兵服务案例4&#xff1a;使用RDB文件恢复数据案例5&#xff1a;AOF案例6&#xff1a;字符类型案例7&#xff1a;列表类型案例8&#xff1a;散列类型案例9&#xff…

Linux交叉编译opencv并移植ARM端

Linux交叉编译opencv并移植ARM端 - 知乎 一、安装交叉编译器 目标平台为arm7l&#xff0c;此为32位ARM架构&#xff0c;要安装合适的编译器 sudo apt install arm-linux-gnueabihf-gcc sudo apt install arm-linux-gnueabihf-g注意&#xff1a;64位ARM架构的编译器与32位ARM架…

【MyBatis】查询数据库

目录 一、什么是MyBatis 二、MyBatis框架的搭建 1、搭建MyBatis框架 2、设置MyBaits项目的配置 三、使用MyBatis完成数据库的操作 1、MyBatis程序中sql语句的即时执行和预编译 1.1、即时执行&#xff08;${}&#xff09; 1.2、预编译&#xff08;#{}&#xff09; 1.3、即…

tomcat设置PermSize

最近tomcat老是报错,查看了日志出现PermGen 内存不够用,重启tomcat后查询使用情况 通过启动参数发现没有设置 PermGen,继续通过jmap查看 jmap -heap 21179 发现99%已使用,而且默认是30.5M,太小了,这里设置成256M 1. 创建setenv.sh文件 在/usr/local/tomcat/bin目录下创建一个…

解锁编程的新契机:深入探讨Kotlin Symbol Processor (KSP)的编写

解锁编程的新契机&#xff1a;深入探讨Kotlin Symbol Processor (KSP)的编写 1. 引言 随着软件开发领域的不断发展&#xff0c;新的工具和技术不断涌现&#xff0c;以满足开发者在构建高效、可维护和创新性的代码方面的需求。Kotlin Symbol Processor&#xff08;KSP&#xf…

从零开始,快速打造租车服务小程序的分享

随着移动互联网的发展&#xff0c;小程序成为了企业推广和服务的重要手段之一。租车服务行业也不例外&#xff0c;通过打造一款租车服务小程序&#xff0c;企业可以更好地与用户进行互动和交流&#xff0c;提供更方便快捷的租车服务。本文将介绍如何利用第三方制作平台/工具快速…

PHP实现在线年龄计算器

1. 输入日期查询年龄 2. php laravel框架实现 代码 /*** 在线年龄计算器*/public function ageDateCal(){// 输入的生日时间$birthday $this->request(birthday);// 当前时间$currentDate date(Y-m-d);// 计算周岁$age date_diff(date_create($birthday), date_create($…

Eleastisearch5.2.2利用镜像迁移构建实例后ES非健康状态

正常迁移完成后启动服务&#xff0c;查看ES非健康状态 此时观察ES集群状态&#xff1a;curl -XGET -u elastic:xxx localhost:9200/_cluster/health?pretty 注意到"active_shards_percent_as_number" : 88.8888 该项的值不产生变化;集群状态"status" : “…

SQL注入之Oracle注入

SQL注入之Oracle注入 7.1 SQL注入之Oracle环境搭建 前言 Oracle Database&#xff0c;又名Oracle RDBMS&#xff0c;或简称Oracle。是甲骨文公司的一款关系数据库管理系统。它是在数据库领域一直处于领先地位的产品。可以说Oracle数据库系统是世界上流行的关系数据库管理系统…

(el-Form)操作(不使用 ts):Element-plus 中 Form 表单组件校验规则等的使用

Ⅰ、Element-plus 提供的 Form 表单组件与想要目标情况的对比&#xff1a; 1、Element-plus 提供 Form 表单组件情况&#xff1a; 其一、Element-plus 自提供的 Form 代码情况为(示例的代码)&#xff1a; // Element-plus 自提供的代码&#xff1a; // 此时是使用了 ts 语言环…

6.3 社会工程学攻击

数据参考&#xff1a;CISP官方 目录 社会工程学攻击概念社会工程学攻击利用的人性 “弱点”典型社会工程学攻击方式社会工程学攻击防护 一、社会工程学攻击概念 什么是社会工程学攻击 也被称为 "社交工程学" 攻击利用人性弱点 (本能反应、贪婪、易于信任等) 进…

栈存储结构详解

目录 栈存储结构详解 进栈和出栈 栈的具体实现 栈的应用 什么是队列&#xff08;队列存储结构&#xff09; 栈存储结构详解 同顺序表和链表一样&#xff0c;栈也是用来存储逻辑关系为 "一对一" 数据的线性存储结构&#xff0c;如图 1 所示。 图 1 栈存储结构示意…

HTML5的介绍和基本框架

目录 HTML5 HTML5介绍 HTML5的DOCTYPE声明 HTML5基本骨架 html标签 head标签 body标签 title标签 meta标签 在vscode中写出第一个小框架 HTML5 HTML5介绍 HTML5是用来描述网页的一种语言&#xff0c;被称为超文本标记语言。用HTML5编写的文件&#xff0c;后缀以.ht…

设备加密狗

场景描述 随着科技的飞速发展&#xff0c;越来越多的智能设备走进生产加工车间。例如智能雕刻机、钣金机、 榫槽机、钻孔机、磨刀机等等。 目前市场的智能设备具有一个共同的特点&#xff0c;内置嵌入操作系统&#xff0c;如windows或者linux系统。设备制造商提供智能设备出…

连续两年增收不增利,比亚迪电子靠新能源汽车业务再次起飞?

在净利润连续两年下挫之后&#xff0c;比亚迪电子&#xff08;00285.HK&#xff09;终于迎来了好消息。 不久前比亚迪电子发布2023年中期盈利预告显示&#xff0c;上半年净利润同比增加115%-146%&#xff08;2022年上半年的净利润显示6.34亿元&#xff09;。 这主要受益于大客…

包管理工具 nvm npm nrm yarn cnpm npx pnpm详解

包管理工具 nvm npm yarn cnpm npx pnpm npm、cnpm、yarn、pnpm、npx、nvm的区别&#xff1a;https://blog.csdn.net/weixin_53791978/article/details/122533843 npm、cnpm、yarn、pnpm、npx、nvm的区别&#xff1a;https://blog.csdn.net/weixin_53791978/article/details/1…