云原生学习路线导航页(持续更新中)
- 本文是 Kubernetes operator学习 系列第一篇,主要对client-go进行学习,从源码阅读角度,学习client-go各个组件的实现原理、如何协同工作等
- 参考视频:Bilibili 2022年最新k8s编程operator篇,UP主:白丁云原生
- 本文引用声明
- 部分内容来自 白丁云原生 所提供 资料:链接: https://pan.baidu.com/s/1BibLAishAFJLeTyYCLnlbA 提取码: en2p
- 部分来自:https://zhuanlan.zhihu.com/p/573982128
- 并根据个人理解进行了汇总和修改
1.为什么要学习 client-go
- 为了适应更多的业务场景,k8s提供了很多的扩展点,用于满足更复杂的需求
- K8s的扩展点如下:
- ① kubectl
- 用户通过kubectl与ApiServer进行交互,kubectl提供了插件,可以扩展kubectl的行为,但是这些插件只能影响用户本地的环境
- ② API Server
- 处理所有的请求,可以对用户请求进行 身份认证、基于其内容阻止请求、编辑请求内容、处理删除操作等等。
- 这个扩展点应该说的是:用户可以 自定义 API Server
- ③ k8s提供的内置资源
- 我们无法修改,只能通过 annotation、label 控制他们
- ④ CRD
- 自定义资源,配合 自定义控制器Custom Controller,扩展k8s的特定业务场景
- ⑤ scheduler
- 调度器,决定k8s把Pod放到哪个节点执行。k8s提供了多种方式扩展调度行为
- ⑥ Controller Manager
- 实际上也是k8s的一个客户端,通过与API Server交互。k8s的每种资源都有对应的控制器,都属于ControllerManager
- client-go本质上就是一个与apiserver交互的库,所以Controller Manager也是通过 client-go 库与 API Server交互的
- ⑦ Custom Controller
- 自定义控制器,可以控制 内置资源,也可以控制自定义资源CRD
- ⑧ kubelet
- 使用CNI:使得k8s可以使用不同技术,连接Pod网络
- 使用CSI:使得k8s可以支持不同的存储类型
- 使用CRI:使得k8s可以支持不同的容器运行时
- ⑨ client-go
- 一个通用的Golang库,用于和 apiserver 交互
- 不管是k8s的各个组件,还是我们自己为CRD开发Custom Controller,都需要使用 client-go 与 API Server 进行通信
综上所述,要想学习 Operator,使用 CRD + Custom Controller 扩展kubernetes功能,必须先学习 Client-go 库,学会如何与APIServer进行交互
2.client-go与kubernetes版本对应关系
我们假设前提:kubernetes版本为 v1.x.y
- kubernetes版本 >= v1.17.0时,client-go 版本使用 v0.x.y
- kubernetes版本 < v1.17.0时,client-go 版本使用 v1.x.y
3.client-go架构
3.1.client-go 源代码目录介绍
- github地址:https://github.com/kubernetes/client-go
- discovery:用于发现API Server都是支持哪些API。kubectl apiversions使用了同样的机制
- dynamic:包含了kubernetes dynamic client的逻辑,可以操作任意的k8s资源API对象,包括内置的、自定义的资源对象
- informers:包含了所有内置资源的informer,便于操作k8s的资源对象
- kubernetes:包含了访问Kubernetes API的 所有ClientSet
- listers:包含了所有内置资源的lister,用于读取缓存中k8s资源对象的信息
- plugin/pkg/client/auth:包含所有可选的认证插件,用于从外部获取credential(凭证)
- tools:包含一系列工具,编写控制器时会用到很多里面的工具方法
- transport:包含了创建连接、认证的逻辑,会被上层的ClientSet使用
3.2.client-go 架构
图片参考来源:https://zhuanlan.zhihu.com/p/573982128
下面先介绍各组件整体的运转流程,然后对 client-go 和 一个 CRDController 应该包含哪些组件进行详细介绍。
- 整体流程简介:
- Reflector会持续监听k8s集群中指定资源类型的API,当发现变动和更新时,就会创建一个发生变动的 对象副本,并将其添加到队列DeltaFIFO中
- Informer监听DeltaFIFO队列,取出对象,做两件事:
- (1)将对象加入Indexer,Indexer 会将 [对象key, 对象] 存入一个线程安全的Cache中
- (2)根据对象的 资源类型和操作,找到对应 Controller 预先提供的 Resource Event Handler,调用Handler,将对象的Key加入该 Controller 的 Workqueue
- Controller 的循环函数 ProcessItem,监听到 Workqueue 有数据了,就会取出一个key,交给处理函数Worker,Worker 会根据 Key,使用 Indexer reference 从 Cache 中 获取 该key对应的 真实对象。然后就可以进行调谐了。
- 注意点
- DeltaFIFO 中 存的是 对象副本
- Cache 中 存的是 [对象key, 对象] 的映射
- Workqueue 中存的是 对象Key
- CRDController 中,使用Informer对象,是为了向其中添加一些 Resource Event Handlers
- CRDController 中,使用Indexer对象,是为了根据对象Key,获取对象实例
- client-go组件
- Reflector
- reflector会一直监听kubernetes中指定资源类型的API,实现监听的函数就是ListAndWatch。这种监听机制既适用于k8s的内建资源,也适用于自定义资源。
- 当reflector通过监听API发现资源对象实例存在新的 notification 时,它就会使用 listing API 获取这个新的实例对象,并将其放入 watchHandler 函数内的 DeltaFIFO 中;
- Informer
- Informer 会从 Delta FIFO 中取出对象。实现这个功能的方法对应源码中的 processLoop;
- Informer 取出对象后,根据Resource类型,调用对应的 Resource Event Handler 回调函数,该函数实际上由某个具体的 Controller 提供,函数中会获取对象的 key,并将 key 放入到 该Controller 内部的 Workqueue 中,等候处理。
- Indexer 和 Thread Safe Store
- Indexer 会提供对象的索引功能,通常是基于对象Key来创建索引。默认索引函数是MetaNamespaceKeyFunc, 它生成的索引键为/格式。
- Indexer 维护着一个线程安全的 Cache,即 Thread Safe Store。存储的是[对象key, 对象],用对象Key可以进行获取对象实例。
- Resource Event Handlers reference
- 这实际上是所有Controller的Resource Event Handlers的引用。
- 这些 handlers 由具体的Controller提供,就是 Informer 的回调函数。Informer 会根据资源的类型,调用对应Controller的 handler 方法
- handler 通常都是用于将资源对象的key放入到 该Controller 内部的 Workqueue 中,等候处理。
- Reflector
- 自定义控制器组件
- Informer reference
- Informer reference 是 Informer 实例对象的引用,用于操作和处理自定义资源对象
- 我们编写自定义控制器时,需要引用自己需要的Informer,向其中加入一系列 Resource Event Handlers
- Indexer reference
- Indexer reference 是 Indexer实例对象的引用,用于根据对象Key索引资源对象
- 我们编写自定义控制器时,应该创建Indexer的引用,将对象Key传给它,就可以获取想要处理的对象
- NewIndexerInformer函数
- client-go中的基本控制器提供了 NewIndexerInformer 函数,用于创建Informer和Indexer。
- 可以直接使用NewIndexerInformer 函数,或者也可以使用工厂方法来创建Informer
- Resource Event Handlers
- 由具体的 Controller 给 Client-go 的Informer 提供的回调函数,获取待处理对象的key,并将key放入到Workqueue中。
- Workqueue
- 此队列是 具体的Controller 内部创建的队列,用于暂时存储从Resource event handler 中 传递过来的,待处理对象的Key。
- Resource event handler 函数通常会获取待处理对象的key,并将key放入到这个workqueue中。
- Process Item
- 这个函数为循环函数,它不断从 Work queue 中取出对象的key,并使用 Indexer Reference 获取这个key对应的具体资源对象,然后根据资源的变化,做具体的调谐 Reconcile 动作。
- Informer reference
3.3.使用client-go编写Controller的步骤
- 根据2.2中所述,编写一个 自定义Controller,需要实现如下功能。
- 先从client-go中获取对应资源的 Informer
- 提供 一系列的 Resource event handlers,并加入对应的Informer,供该informer回调
- 提供一个 Workqueue 队列,存储待处理的对象的Key
- 提供一个 循环函数 ProcessItem,不断从 Workqueue 中取出对象的key,交给 处理函数 Worker
- 提供一个 处理函数 Worker,根据对象Key,使用对应资源的Indexer,获取到该对象的实例,根据对象的属性变化,做真正的调谐过程。
4.client-go的client组件
4.1.Client类型
- 我们知道,client-go 可以实现与 kubernetes 的通信。如何实现的呢?
- client-go 主要提供了4种 client 组件:
RESTClient
:最基础的客户端,提供最基本的封装,可以通过它组装与API Server即时通讯时 的 urlClientset
:是一个Client的集合,在Clientset中包含了所有K8S内置资源 的 Client,通过Clientset便可以很方便的操作如Pod、Service这些资源dynamicClient
:动态客户端,可以操作任意K8S的资源,包括CRD定义的资源DiscoveryClient
:用于发现K8S提供的资源组、资源版本和资源信息,比如:kubectl api-resources
- 4种client分别对应源码目录:
4.2.RESTClient
4.2.1.RESTClient结构体
type RESTClient struct {// base is the root URL for all invocations of the clientbase *url.URL// versionedAPIPath is a path segment connecting the base URL to the resource rootversionedAPIPath string// content describes how a RESTClient encodes and decodes responses.content ClientContentConfig// creates BackoffManager that is passed to requests.createBackoffMgr func() BackoffManager// rateLimiter is shared among all requests created by this client unless specifically// overridden.rateLimiter flowcontrol.RateLimiter// warningHandler is shared among all requests created by this client.// If not set, defaultWarningHandler is used.warningHandler WarningHandler// Set specific behavior of the client. If not set http.DefaultClient will be used.Client *http.Client
}
4.2.2.RESTClient常用方法
- RESTClientFor()
- 位置:rest/config.go 文件
- 函数签名:
func RESTClientFor(config *Config) (*RESTClient, error)
,直接 rest点 调用 - 该方法是用于 创建一个 RESTClient 实例
- 接收一个
rest.Config
类型参数,Config中包含了 限速器、编解码器 等- RESTClientFor 方法内部会从 Config 中取出这些配置,设置给RESTClient 实例
- 这样RESTClient 实例就具有了 限速、编解码 等多种功能
- 因此,我们创建Config的时候,可以手动设置这些功能,下面的示例中会展示。
- RESTClient实例的常用方法
- /rest/client.go 中有一个接口 Interface
// Interface captures the set of operations for generically interacting with Kubernetes REST apis. type Interface interface {GetRateLimiter() flowcontrol.RateLimiterVerb(verb string) *RequestPost() *RequestPut() *RequestPatch(pt types.PatchType) *RequestGet() *RequestDelete() *RequestAPIVersion() schema.GroupVersion }
- RESTClient 实现了这个接口,因此具有所有的方法,用于发送各种类型的请求
- 另外,Interface 每个方法的返回值都是 Request 类型,Request 类型的各种方法,很多的返回值也是 Request,这样就可以实现 链式编程 了
- /rest/client.go 中有一个接口 Interface
4.2.3.Request 和 Result 常用方法
- Request 位于 /rest/request.go
func (r *Request) Namespace(namespace string) *Request
:设置 当前Resquest 访问的 namespacefunc (r *Request) Resource(resource string) *Request
:设置 当前Resquest 想要访问的资源类型func (r *Request) Name(resourceName string) *Request
:设置 当前Resquest 想要访问的资源的名称func (r *Request) Do(ctx context.Context) Result
:格式化并执行请求。返回一个 Result 对象,以便于处理响应。
- Result 也位于 /rest/request.go
4.2.4.RESTClient使用示例
- 需求:获取default命名空间下的所有pod,并打印所有pod的name
package mainimport ("context"v1 "k8s.io/api/core/v1""k8s.io/client-go/kubernetes/scheme""k8s.io/client-go/rest""k8s.io/client-go/tools/clientcmd" )func main() {// 在你机器的homeDir下,放入集群的config文件,用于连接集群(可以直接从集群master的~/.kube/config拷贝过来)config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)if err != nil {panic(err)}// 设置默认 GroupVersion(我要操作的是pod,不属于任何的group,所以使用了SchemeGroupVersion。你要操作什么,就写什么GroupVersion即可)config.GroupVersion = &v1.SchemeGroupVersion// 设置编解码器(后面的 Into方法 就是使用它完成 反序列化 的)config.NegotiatedSerializer = scheme.Codecs// 设置 API 根的子路径config.APIPath = "/api"// 创建一个 RESTClientrestClient, err := rest.RESTClientFor(config)if err != nil {panic(err)}// 创建一个Pod,用于接收请求结果pods := v1.PodList{}// 链式编程 发送请求,并反序列化结果到pod中err = restClient.Get().Namespace(v1.NamespaceDefault).Resource("pods").Do(context.TODO()).Into(&pods)if err != nil {panic(err)}// 打印pod名称for _, pod := range pods.Items {println(pod.Name)} }