[k8s源码]9.workqueue

client-go 是一个库,提供了与 Kubernetes API 服务器交互的基础设施。它提供了诸如 Informer、Lister、ClientSet 等工具,用于监听、缓存和操作 Kubernetes 资源。而自定义控制器则利用这些工具来实现特定的业务逻辑和自动化任务。业务逻辑实现:client-go 不包含特定的业务逻辑。自定义控制器允许实现特定于您的应用程序或需求的逻辑。扩展 Kubernetes:通过自定义控制器,可以扩展 Kubernetes 的功能,处理自定义资源或实现特定的自动化任务。响应资源变化:自定义控制器可以监听特定资源的变化,并据此执行相应的操作。

而这里的workqueue是costromer Controller的一部分:

逻辑

当我们创建一个workqueue的时候,到底发生了什么

queue := workqueue.New()

该方法调用的new方法又调用了NewWithConfig()以及newQueueWithConfig().可以看到逐级返回以后,返回的是一个type类型的数据。 

func New() *Type {return NewWithConfig(QueueConfig{Name: "",})
}
func NewTyped[T comparable]() *Typed[T] {return NewTypedWithConfig(TypedQueueConfig[T]{Name: "",})
}func NewWithConfig(config QueueConfig) *Type {return NewTypedWithConfig(config)
}func NewTypedWithConfig[T comparable](config TypedQueueConfig[T]) *Typed[T] {return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod)
}// newQueueWithConfig constructs a new named workqueue
// with the ability to customize different properties for testing purposes
func newQueueWithConfig[T comparable](config TypedQueueConfig[T], updatePeriod time.Duration) *Typed[T] {var metricsFactory *queueMetricsFactoryif config.MetricsProvider != nil {metricsFactory = &queueMetricsFactory{metricsProvider: config.MetricsProvider,}} else {metricsFactory = &globalMetricsFactory}if config.Clock == nil {config.Clock = clock.RealClock{}}if config.Queue == nil {config.Queue = DefaultQueue[T]()}return newQueue(config.Clock,config.Queue,metricsFactory.newQueueMetrics(config.Name, config.Clock),updatePeriod,)
}
TypedInterface

Interface 被标记为废弃(Deprecated),并建议使用 TypedInterface 代替。这种变化主要是因为 Go 语言引入了泛型特性。TypedInterface[T comparable] 使用了泛型,T 是一个类型参数,它必须是可比较的(comparable)。泛型允许在编译时进行类型检查,提供了更好的类型安全性。使用 TypedInterface[T] 可以在编译时捕获类型错误,而不是在运行时。

这里最后返回了一个newQueue,而它的定义如下:

func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMetrics, updatePeriod time.Duration) *Typed[T] {t := &Typed[T]{clock:                      c,queue:                      queue,dirty:                      set[T]{},processing:                 set[T]{},cond:                       sync.NewCond(&sync.Mutex{}),metrics:                    metrics,unfinishedWorkUpdatePeriod: updatePeriod,}// Don't start the goroutine for a type of noMetrics so we don't consume// resources unnecessarilyif _, ok := metrics.(noMetrics); !ok {go t.updateUnfinishedWorkLoop()}return t
}

那么Type类型到底是什么:Type 是 Typed[any] 的一个别名。这意味着 Type 可以在任何使用 Typed[any] 的地方使用,它们是完全等价的。

type Type = Typed[any]
type Typed[t comparable] struct {queue Queue[t]// dirty defines all of the items that need to be processed.dirty set[t]// Things that are currently being processed are in the processing set.// These things may be simultaneously in the dirty set. When we finish// processing something and remove it from this set, we'll check if// it's in the dirty set, and if so, add it to the queue.processing set[t]cond *sync.CondshuttingDown booldrain        boolmetrics queueMetricsunfinishedWorkUpdatePeriod time.Durationclock                      clock.WithTicker
}type empty struct{}
type t interface{}
type set[t comparable] map[t]empty

 这里有两个set,一个是process一个是dirty,一个项目可能同时存在于这两个集合中。这是因为一个正在处理的项目(在 processing 中)可能在处理过程中被标记为需要重新处理(因此也在 dirty 中)。如果它在 dirty 集合中,说明在处理过程中它被标记为需要重新处理。这时,系统会将它重新加入到处理队列中。

这里的t是一个空接口,允许存储任何形式的kubernetes资源。

这里还定义了接口,而Type实现了这个接口。

type Interface interface {Add(item interface{})Len() intGet() (item interface{}, shutdown bool)Done(item interface{})ShutDown()ShutDownWithDrain()ShuttingDown() bool
}
dirty队列

添加任务:当有新任务时,首先检查它是否已经在 dirty 中。如果不在,就添加进去。开始处理:当开始处理一个任务时,将它从 dirty 中移除。重新添加:如果一个正在处理的任务需要重新处理,就把它再次加入 dirty。dirty 帮助工作队列系统更高效地管理需要处理的任务,避免重复工作,并能快速决定是否需要添加新任务到处理队列中。 

各种类型的queue

在k8s.io/client-go/util/workqueue中查看。

从上面的例子可以看到,一个queue是有很多参数的,如果只是简单的通过new来创建,很多参数都是默认的参数。

限速队列

k8s.io/client-go/util/workqueue/default-rate-limiters.go

限速队列应用得非常广泛,比如在我们做一些操作失败后希望重试几次,但是立刻重试很有可能还是会失败,这个时候我们可以延迟一段时间再重试,而且失败次数越多延迟时间越长,这个其实就是限速。首先我们需要来了解下限速器

type RateLimiter TypedRateLimiter[any]type TypedRateLimiter[T comparable] interface {// When gets an item and gets to decide how long that item should waitWhen(item T) time.Duration// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for failing// or for success, we'll stop tracking itForget(item T)// NumRequeues returns back how many failures the item has hadNumRequeues(item T) int
}

TypedBucketRateLimiter (令牌桶限速器)
这个限速器基于令牌桶算法。想象一个固定容量的桶,桶里装着令牌。令牌以固定的速率被加入到桶中。当一个请求(或任务)到来时,它需要从桶中获取一个令牌。如果桶中有令牌,请求可以立即处理。如果桶是空的,请求必须等待直到新的令牌被加入。这种方法可以很好地控制平均处理速率,同时允许短时间的突发流量。
TypedItemExponentialFailureRateLimiter (指数退避限速器)
这个限速器根据失败次数增加等待时间:每次失败,等待时间会指数增加(基础延迟 * 2^失败次数)。有一个最大延迟时间,防止等待时间无限增长。
TypedItemFastSlowRateLimiter (快慢双速限速器)
这个限速器有两种速率:快速和慢速:在最初的几次尝试中使用快速延迟。超过设定的尝试次数后,切换到慢速延迟。适用于需要快速重试几次,然后如果仍然失败就减慢重试频率的场景。
TypedMaxOfRateLimiter (最大值限速器)
这个限速器组合了多个其他限速器:
它包含一个限速器的列表。当需要决定延迟时间时,它会询问所有的限速器。然后返回所有限速器中最长的延迟时间。这允许你组合多种限速策略,总是使用最保守(最慢)的那个。
TypedWithMaxWaitRateLimiter (最大等待时间限速器)

从代码中可以看到,有一个基础的RateLimiter的接口interface,然后其余的结构体都是这个端口的实现:

type TypedBucketRateLimiter[T comparable] struct {*rate.Limiter
}
ype TypedItemExponentialFailureRateLimiter[T comparable] struct {failuresLock sync.Mutexfailures     map[T]intbaseDelay time.DurationmaxDelay  time.Duration
}
type TypedItemFastSlowRateLimiter[T comparable] struct {failuresLock sync.Mutexfailures     map[T]intmaxFastAttempts intfastDelay       time.DurationslowDelay       time.Duration
}
type TypedMaxOfRateLimiter[T comparable] struct {limiters []TypedRateLimiter[T]
}
type TypedWithMaxWaitRateLimiter[T comparable] struct {limiter  TypedRateLimiter[T]maxDelay time.Duration
}

他们的new函数(部分)

func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {return NewTypedWithMaxWaitRateLimiter[any](limiter, maxDelay)
}func NewTypedWithMaxWaitRateLimiter[T comparable](limiter TypedRateLimiter[T], maxDelay time.Duration) TypedRateLimiter[T] {return &TypedWithMaxWaitRateLimiter[T]{limiter: limiter, maxDelay: maxDelay}
}

接口实现:

func (w TypedWithMaxWaitRateLimiter[T]) When(item T) time.Duration {delay := w.limiter.When(item)if delay > w.maxDelay {return w.maxDelay}return delay
}func (w TypedWithMaxWaitRateLimiter[T]) Forget(item T) {w.limiter.Forget(item)
}func (w TypedWithMaxWaitRateLimiter[T]) NumRequeues(item T) int {return w.limiter.NumRequeues(item)
}

 我们可以看到有的限速器需要一个基础限速器:NewTypedWithMaxWaitRateLimiter是从多个限速器中取得最大的限速时间。(这里函数名称不同,源代码里是NewTypedWithMaxWaitRateLimiter而实际演示代码是NewWithMaxWaitRateLimiter,这是因为源码读的是最新版,而实际安装的go是1.22,所以不一样,但是只有增加和缺少Type的区别)

baseRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second)
ratelimiter :=workqueue.NewWithMaxWaitRateLimiter(baseRateLimiter,10*time.Second)
ratelimitedQueue := workqueue.NewRateLimitingQueue(ratelimiter)
延迟队列 
type DelayingInterface interface {Interface// AddAfter adds an item to the workqueue after the indicated duration has passedAddAfter(item interface{}, duration time.Duration)
}
type delayingType struct {Interface// clock tracks time for delayed firingclock clock.Clock// stopCh lets us signal a shutdown to the waiting loopstopCh chan struct{}// stopOnce guarantees we only signal shutdown a single timestopOnce sync.Once// heartbeat ensures we wait no more than maxWait before firingheartbeat clock.Ticker// waitingForAddCh is a buffered channel that feeds waitingForAddwaitingForAddCh chan *waitFor// metrics counts the number of retriesmetrics retryMetrics
}func NewDelayingQueue() DelayingInterface {return NewDelayingQueueWithConfig(DelayingQueueConfig{})
}

具体实现:可以看到NewDelayingQueue()->NewDelayingQueueWithConfig{return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider)},然后有一个newDelayingQueue但是带有参数的方法,这里的new的n是小写的,代表这是一个私有的方法,可以看到最后返回的是一个delayingType。而NewDelayingQueue()返回的是一个interface。

func NewDelayingQueue() DelayingInterface {return NewDelayingQueueWithConfig(DelayingQueueConfig{})
}func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface {if config.Clock == nil {config.Clock = clock.RealClock{}}if config.Queue == nil {config.Queue = NewWithConfig(QueueConfig{Name:            config.Name,MetricsProvider: config.MetricsProvider,Clock:           config.Clock,})}return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider)
}func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType {ret := &delayingType{Interface:       q,clock:           clock,heartbeat:       clock.NewTicker(maxWait),stopCh:          make(chan struct{}),waitingForAddCh: make(chan *waitFor, 1000),metrics:         newRetryMetrics(name, provider),}go ret.waitingLoop()return ret
}
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {// don't add if we're already shutting downif q.ShuttingDown() {return}q.metrics.retry()// immediately add things with no delayif duration <= 0 {q.Add(item)return}select {case <-q.stopCh:// unblock if ShutDown() is calledcase q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:}
}

可以看到还有很多变种,但是最后都会调用 NewDelayingQueue但是带有参数的方法。


// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
// inject custom queue Interface instead of the default one
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {return NewDelayingQueueWithConfig(DelayingQueueConfig{Name:  name,Queue: q,})
}// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability.
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewNamedDelayingQueue(name string) DelayingInterface {return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name})
}// NewDelayingQueueWithCustomClock constructs a new named workqueue
// with ability to inject real or fake clock for testing purposes.
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {return NewDelayingQueueWithConfig(DelayingQueueConfig{Name:  name,Clock: clock,})
}

为什么newDelayingQueue返回的是type类型,而他的上级返回的是interface类型呢?以下面的代码为例:advancedAnimal实现的结构体包含一个interface:animal,和一个trick。这个trick字段是为了实现PerformTrick方法。

func (a *advancedAnimalType) PerformTrick() string {
    return a.trick
}

这个接收advancedAnimal的函数实现了PerformTrick(),所以可以看作是advancedAnimal实现了AdvancedAnimal的interface。 所以在下面的New函数中,虽然返回的是advancedAnimalType,但是最后NewAdvancedAnimal返回的是interface类型。

func NewAdvancedAnimal(config AdvancedAnimalConfig) AdvancedAnimal {
    if config.Animal == nil {
        config.Animal = NewAnimal(config.Species, config.Sound, config.Movement)
    }

    return &advancedAnimalType{
        Animal: config.Animal,
        trick:  config.Trick,
    }
}

type Animal interface {Speak() stringMove() string
}
// 扩展的 AdvancedAnimal 接口
type AdvancedAnimal interface {AnimalPerformTrick() string
}
// 基本的动物实现
type basicAnimal struct {species stringsound   stringmovement string
}
func (a *basicAnimal) Speak() string {return a.sound
}
func (a *basicAnimal) Move() string {return a.movement
}
// 高级动物实现
type advancedAnimalType struct {Animaltrick string
}func (a *advancedAnimalType) PerformTrick() string {return a.trick
}
// 创建基本动物的函数
func NewAnimal(species, sound, movement string) Animal {return &basicAnimal{species: species,sound:   sound,movement: movement,}
}
// 创建高级动物的函数
func NewAdvancedAnimal(config AdvancedAnimalConfig) AdvancedAnimal {if config.Animal == nil {config.Animal = NewAnimal(config.Species, config.Sound, config.Movement)}return &advancedAnimalType{Animal: config.Animal,trick:  config.Trick,}
}
// 配置结构体
type AdvancedAnimalConfig struct {Animal   AnimalSpecies  stringSound    stringMovement stringTrick    string
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/875014.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【ROS2】演示:为有损网络使用服务质量设置

目录 背景 先决条件 运行演示 命令行选项 添加网络流量 背景 请阅读有关 QoS 设置的文档页面&#xff0c;以获取有关 ROS 2 中可用支持的背景信息。 在这个演示中&#xff0c;我们将生成一个发布相机图像的节点和另一个订阅图像并在屏幕上显示图像的节点。然后&#xff0c;我们…

Fedora40安装telnet-server启用telnet服务

Fedora40安装telnet-server启用telnet服务 安装 telnet-server sudo yum install telnet-server或 sudo dnf install telnet-server启用服务 fedora40 或 CentosStream9 不能用 yum或dnf安装xinetd, telnet-server 的服务名为: telnet.socket 启用 telnet.socket.service …

三、基础语法2(30小时精通C++和外挂实战)

三、基础语法2&#xff08;30小时精通C和外挂实战&#xff09; B-02内联函数B-04内联函数与宏B-05_constB-06引用B-07引用的本质B-08-汇编1-X86-X64汇编B-09-汇编2-内联汇编B-10-汇编3-MOV指令C-02-汇编5-其他常见指令C-05-汇编8-反汇编分析C-07-const引用、特点 B-02内联函数 …

TreeSelect增加可筛选功能

TreeSelect官方可筛选示例 <template><el-tree-selectv-model"value":data"data"filterablestyle"width: 240px"/><el-divider /><el-divider />filter node method:<el-tree-selectv-model"value":data&q…

数据安全传输--加密算法

目录 古典加密算法与近代加密算法对比 算法分类 对称加密 常见的对称加密算法 在对称加密算法中密钥共享是一个很麻烦的问题 非对称加密 非对称加密过程 常见非对称加密算法 对称加密和非对称加密两者对比结论 DH算法 身份认证和数据认证技术 hash算法 hash算法特点…

PySide(PyQt),自定义图标按钮

1、在Qt Designer中新建画面&#xff0c;并放置3个按钮&#xff08;QPushButton&#xff09;和一个分组框&#xff08;QGroupBox&#xff09;小部件&#xff0c;分别命名为btn_1&#xff0c; btn_2&#xff0c;btn_3和btnStation。 2、将所有小部件的显示文字内容删除。 3、将…

论文复现:Predictive Control of Networked Multiagent Systems via Cloud Computing

Predictive Control of Networked Multiagent Systems via Cloud Computing论文复现 文章目录 Predictive Control of Networked Multiagent Systems via Cloud Computing论文复现论文摘要系统参数初始化系统模型观测器预测过程控制器设计系统的整体框图仿真结果 论文摘要 翻译…

杰发科技Bootloader(2)—— 基于7840的Keil配置地址

序 在7840的sample代码里面有一个简单的Boot跳转APP的示例 PFlash地址从0开始 DFlash的地址从1000000开始 Boot解析 他的boot地址配置为0 Boot的代码主要是这几行&#xff0c;主要作用就是Flash的跳转 int main(void) {SystemClock_Config();InitDebug();printf("demo…

NSAT-8000与Chroma8000相比,有什么独特优势?

在电源模块的广泛应用推动下&#xff0c;测试效率成为行业关注的焦点。纳米软件响应这一需求&#xff0c;推出了NSAT-8000电源自动测试系统&#xff0c;其0代码操作模式大幅简化了测试流程。那么与Chroma 8000系统相比&#xff0c;有什么不同呢&#xff1f; 一、测试项目搭建 C…

nacos get changed dataId error, code: 403

nacos get changed dataId error, code: 403问题解决 问题出现原因&#xff1a;解决办法&#xff1a;需要在运行项目的配置添加权限账号和密码,重启服务 问题出现原因&#xff1a; 由于nacosserver开启了权限验证&#xff0c;项目启动时出现异常 nacos.core.auth.caching.ena…

数据结构->线性结构->顺序存储->静态链表

一、思路 链表由节点组成。 1、分析需求&#xff0c;画图&#xff1a; 2、定义学生结构体&#xff0c;包含姓名、年龄、性别和下一个学生的指针&#xff1a; #include <stdio.h> #define N 20// 定义性别枚举类型&#xff0c;固定值&#xff0c;不是男就是女 typedef e…

torchscript接口

一、定义 定义script、eager、onnx 模式对比案例生成的模型可以被c调用接口解读 二、实现 定义 可以在高性能环境libtorch&#xff08;C &#xff09;中直接加载&#xff0c;实现模型推理&#xff0c;而无需Pytorch训练框架依赖无需代码&#xff0c;直接加载模型&#xff0c…

国中水务:果汁能救“水”吗?

喝下汇源果汁有什么&#xff08;“功效”&#xff09;&#xff1f;这家公司最有发言权。 今天我们聊聊——国中水务。 最近&#xff0c;国中水务公告称拟通过收购&#xff0c;间接控股北京汇源&#xff0c;即将把“垂涎已久”的汇源收入囊中。 两家的故事得从几年前说起&#…

学习大数据DAY21 Linux基本指令2

目录 思维导图 搜索查看查找类 find 从指定目录查找文件 head 与 tail 查看行 cat 查看内容 more 查看大内容 grep 过滤查找 history 查看已经执行过的历史命令 wc 统计文件 du 查看空间 管道符号 | 配合命令使用 上机练习 4 解压安装类 zip unzip 压缩解压 tar …

git跨库合并

1、背景 A为开发环境的代码仓库&#xff0c;B为生产环境的代码仓库。A和B之间不能通信。开发人员的本地电脑可以和A、B通信。 目的 上线时&#xff0c;需要将A代码合并B代码。 2、实现 2.1 添加远程仓库 2.1.1 代码方式 在B代码仓库中,将A添加为远程仓库。 git remote …

【保姆级教程】油猴脚本的安装使用

目录 前言 一、油猴简介 1. 核心功能 2. 应用场景 3. 安全性与兼容性 4. 社区生态 二、教学开始&#xff08;嫌麻烦直接目录跳转开始学习&#xff09; 1.插件安装&#xff08;以Microsoft Edge浏览器为例&#xff09; 2.获取脚本 3.大展身手 三、扩展&#xff08;脚…

2024年7月23日(samba DNS)

​ 回顾 1、关闭防火墙&#xff0c;关闭selinux systemctl stop firewalld systemctl disable firewalld setenforce 0 2、修改静态IP地址 vim /etc/sysconfig/network-scripts/ifcfg-ens33 #修改uuid的目的是为了保证网络的唯一性 3、重启网络服务 systemctl restart netwo…

Ansible的脚本-----playbook剧本【上】

目录 1.playbook剧本组成 2.playbook剧本实战演练 2.1 实战演练一&#xff1a;给被管理主机安装httpd服务 2.2 实战演练二&#xff1a;定义、引用变量 2.3 实战演练三&#xff1a;指定远程主机sudo切换用户 2.4 实战演练四&#xff1a;when条件判断 2.5 实战演练五&…

【Matlab 传感器布局优化】基于群智能算法的wsn覆盖优化研究

一 背景介绍 无线传感器网络&#xff08;Wireless Sensor Network, WSN&#xff09;作为远程环境监测系统应用的关键技术&#xff0c;能够在有限的能源供应下提供高效的传感和通信服务。覆盖控制是保证高效通信和可靠数据传输的重要手段。鉴于复杂的物理环境限制了节点部署方式…

文本编辑三巨头(grep)

目录 正则表达式 元字符 grep 案例 我在编写脚本的时候发现&#xff0c;三个文本编辑的命令&#xff08;grep、sed、awk&#xff0c;被称为文本编辑三剑客&#xff0c;我习惯叫它三巨头&#xff09;用的还挺多的&#xff0c;说实话我一开始学的时候也有些懵&#xff0c;主要…