Kubernetes CRD开发模式及源码实现深入剖析-Kubernetes商业环境实战

专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。留言请关注《数据云技术社区》公众号。

1 CRD资源扩展

  • CRD 即CustomResourceDefinition,是 kubernetes 极力推荐的资源扩展方式。
  • 基于 CRD 技术,用户能将自定义资源注册到 kubernetes 系统,并像使用原生资源(如 pod、statefulset )一样对自定义资源进行创建、查看、修改、删除等操作,实现了类似于插件式的功能增强。

2 CRD开发步骤

1.根据go.mod设定,建立目录$GOPATH/src/k8s.io module k8s.io/sample-controller
2.设置代理export GOPROXY=https://goproxy.io
3.加载依赖包,自动归档到vendor目录go mod vendor -v
4.设置update-codegen.sh"${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \k8s.io/sample-controller/pkg/client k8s.io/sample-controller/pkg/apis \samplecontroller:v1alpha1 \--output-base "$(dirname "${BASH_SOURCE[0]}")/../../.." \--go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt5.自定义控制器及main.go6.编译sample-controller生成可执行程序sample-controllergo build -o sample-controller .7.运行sample-controller./sample-controller -kubeconfig=$HOME/.kube/congig8.注册 CRD 资源 crd.yamlapiVersion: apiextensions.k8s.io/v1beta1kind: CustomResourceDefinitionmetadata:name: foos.samplecontroller.k8s.iospec:group: samplecontroller.k8s.ioversion: v1alpha1names:kind: Fooplural: foosscope: Namespacedkubectl create -f artifacts/examples/crd.yaml
9.创建 CRD 资源 example-foo.yamlapiVersion: samplecontroller.k8s.io/v1alpha1kind: Foometadata:name: example-foospec:deploymentName: example-fooreplicas: 1kubectl create -f artifacts/examples/example-foo.yaml10.查看 Foo 的部署情况kubectl get deployments
复制代码

3 CRD项目自定义控制器开发(以sample-controller为例)

3.1 main.go开发

  • 读取 kubeconfig 配置
  • 初始化kubeClient,监听普通事件
  • 初始化exampleClient, 监听Foo 事件
  • 初始化kubeInformerFactory和exampleInformerFactory
  • 初始化自定义 Controller
  • 开启 Controller
func main() {flag.Parse()// set up signals so we handle the first shutdown signal gracefullystopCh := signals.SetupSignalHandler()//读取 kubeconfig 配置,cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)if err != nil {klog.Fatalf("Error building kubeconfig: %s", err.Error())}//监听普通事件kubeClient, err := kubernetes.NewForConfig(cfg)if err != nil {klog.Fatalf("Error building kubernetes clientset: %s", err.Error())}//一个监听 Foo 事件exampleClient, err := clientset.NewForConfig(cfg)if err != nil {klog.Fatalf("Error building example clientset: %s", err.Error())}kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)//基于 Client(kubeClient, exampleClient)、Informer 初始化自定义 Controller,//监听 Deployment 以及 Foos 资源变化controller := NewController(kubeClient, exampleClient,kubeInformerFactory.Apps().V1().Deployments(),exampleInformerFactory.Samplecontroller().V1alpha1().Foos())// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)// Start method is non-blocking and runs all registered informers in a dedicated goroutine.kubeInformerFactory.Start(stopCh)exampleInformerFactory.Start(stopCh)//开启 Controllerif err = controller.Run(2, stopCh); err != nil {klog.Fatalf("Error running controller: %s", err.Error())}
}
复制代码

3.2 Controller处理事件逻辑

  • Controller 的关键成员即两个事件的 Listener(appslisters.DeploymentLister、listers.FooLister)这两个成员将由 main 函数传入参数进行初始化。
  • 为了缓冲事件处理,这里使用队列暂存事件,相关成员即为 workqueue.RateLimitingInterface record.EventRecorder 用于记录事件。
// Controller is the controller implementation for Foo resources
type Controller struct {kubeclientset kubernetes.Interfacesampleclientset clientset.InterfacedeploymentsLister appslisters.DeploymentListerdeploymentsSynced cache.InformerSyncedfoosLister        listers.FooListerfoosSynced        cache.InformerSynced// workqueue is a rate limited work queue. This is used to queue work to be// processed instead of performing it as soon as a change happens. This// means we can ensure we only process a fixed amount of resources at a// time, and makes it easy to ensure we are never processing the same item// simultaneously in two different workers.workqueue workqueue.RateLimitingInterface// recorder is an event recorder for recording Event resources to the// Kubernetes API.recorder record.EventRecorder
}//Controller.go
controller := &Controller{kubeclientset:     kubeclientset,sampleclientset:   sampleclientset,deploymentsLister: deploymentInformer.Lister(),deploymentsSynced: deploymentInformer.Informer().HasSynced,foosLister:        fooInformer.Lister(),foosSynced:        fooInformer.Informer().HasSynced,workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),recorder:          recorder,
}//main.go初始化入参
func NewController(kubeclientset kubernetes.Interface,sampleclientset clientset.Interface,deploymentInformer appsinformers.DeploymentInformer,fooInformer informers.FooInformer) *Controller
复制代码

3.3 自定义Controller事件处理函数

  • 设置对 Foo 资源变化的事件处理函数(Add、Update 均通过 enqueueFoo 处理)
  • 设置对 Deployment 资源变化的事件处理函数(Add、Update、Delete 均通过 handleObject 处理)
  • 引出enqueueFoo 以及 handleObject 的实现
// Set up an event handler for when Foo resources changefooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: controller.enqueueFoo,UpdateFunc: func(old, new interface{}) {controller.enqueueFoo(new)},})// Set up an event handler for when Deployment resources change. This// handler will lookup the owner of the given Deployment, and if it is// owned by a Foo resource will enqueue that Foo resource for// processing. This way, we don't need to implement custom logic for// handling Deployment resources. More info on this pattern:deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: controller.handleObject,UpdateFunc: func(old, new interface{}) {newDepl := new.(*appsv1.Deployment)oldDepl := old.(*appsv1.Deployment)if newDepl.ResourceVersion == oldDepl.ResourceVersion {// Periodic resync will send update events for all known Deployments.// Two different versions of the same Deployment will always have different RVs.return}controller.handleObject(new)},DeleteFunc: controller.handleObject,})
复制代码

3.4 enqueueFoo 以及 handleObject 实现

  • enqueueFoo 就是解析 Foo 资源为namespace/name 形式的字符串,然后入队
  • handleObject 监听了所有实现了 metav1 的资源,但只过滤出 owner 是 Foo 的,将其解析为 namespace/name 入队。不是 Foo不做处理。
// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Foo.
func (c *Controller) enqueueFoo(obj interface{}) {var key stringvar err errorif key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {utilruntime.HandleError(err)return}c.workqueue.AddRateLimited(key)
}// handleObject will take any resource implementing metav1.Object and attempt
// to find the Foo resource that 'owns' it. It does this by looking at the
// objects metadata.ownerReferences field for an appropriate OwnerReference.
// It then enqueues that Foo resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
func (c *Controller) handleObject(obj interface{}) {var object metav1.Objectvar ok boolif object, ok = obj.(metav1.Object); !ok {tombstone, ok := obj.(cache.DeletedFinalStateUnknown)if !ok {utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))return}object, ok = tombstone.Obj.(metav1.Object)if !ok {utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))return}klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())}klog.V(4).Infof("Processing object: %s", object.GetName())if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {// If this object is not owned by a Foo, we should not do anything more// with it.if ownerRef.Kind != "Foo" {return}foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)if err != nil {klog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)return}c.enqueueFoo(foo)return}
}
复制代码

3.4 Controler核心run处理函数

  • 在main.go中同步,等待 Informer 同步完成,并发执行runWorker,处理队列内事件。
  • runWorker引出processNextWorkItem,processNextWorkItem引出syncHandler。
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {defer utilruntime.HandleCrash()defer c.workqueue.ShutDown()// Start the informer factories to begin populating the informer cachesklog.Info("Starting Foo controller")// Wait for the caches to be synced before starting workersklog.Info("Waiting for informer caches to sync")if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {return fmt.Errorf("failed to wait for caches to sync")}klog.Info("Starting workers")// Launch two workers to process Foo resourcesfor i := 0; i < threadiness; i++ {go wait.Until(c.runWorker, time.Second, stopCh)}klog.Info("Started workers")<-stopChklog.Info("Shutting down workers")return nil
}// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {for c.processNextWorkItem() {}
}// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {obj, shutdown := c.workqueue.Get()if shutdown {return false}// We wrap this block in a func so we can defer c.workqueue.Done.err := func(obj interface{}) error {// We call Done here so the workqueue knows we have finished// processing this item. We also must remember to call Forget if we// do not want this work item being re-queued. For example, we do// not call Forget if a transient error occurs, instead the item is// put back on the workqueue and attempted again after a back-off// period.defer c.workqueue.Done(obj)var key stringvar ok bool// We expect strings to come off the workqueue. These are of the// form namespace/name. We do this as the delayed nature of the// workqueue means the items in the informer cache may actually be// more up to date that when the item was initially put onto the// workqueue.if key, ok = obj.(string); !ok {// As the item in the workqueue is actually invalid, we call// Forget here else we'd go into a loop of attempting to// process a work item that is invalid.c.workqueue.Forget(obj)utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))return nil}// Run the syncHandler, passing it the namespace/name string of the// Foo resource to be synced.if err := c.syncHandler(key); err != nil {// Put the item back on the workqueue to handle any transient errors.c.workqueue.AddRateLimited(key)return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())}// Finally, if no error occurs we Forget this item so it does not// get queued again until another change happens.c.workqueue.Forget(obj)klog.Infof("Successfully synced '%s'", key)return nil}(obj)if err != nil {utilruntime.HandleError(err)return true}return true
}
复制代码

3.5 Controler核心逻辑syncHandler自定义处理函数

  • syncHandler 的处理逻辑大体如下:
  • 根据 namespace/name 获取 foo 资源
  • 根据 foo,获取其 Deployment 名称,进而获取 deployment 资源(没有就为其创建)
  • 根据 foo 的 Replicas 更新 deployment 的 Replicas(如果不匹配)
  • 更新 foo 资源的状态为最新 deployment 的状态(其实就是 AvailableReplicas)
  • 由此,可知 foo 的实现实体其实就是 deployment
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {// Convert the namespace/name string into a distinct namespace and namenamespace, name, err := cache.SplitMetaNamespaceKey(key)if err != nil {utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))return nil}// Get the Foo resource with this namespace/namefoo, err := c.foosLister.Foos(namespace).Get(name)if err != nil {// The Foo resource may no longer exist, in which case we stop// processing.if errors.IsNotFound(err) {utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))return nil}return err}deploymentName := foo.Spec.DeploymentNameif deploymentName == "" {// We choose to absorb the error here as the worker would requeue the// resource otherwise. Instead, the next time the resource is updated// the resource will be queued again.utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))return nil}// Get the deployment with the name specified in Foo.specdeployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)// If the resource doesn't exist, we'll create itif errors.IsNotFound(err) {deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))}// If an error occurs during Get/Create, we'll requeue the item so we can// attempt processing again later. This could have been caused by a// temporary network failure, or any other transient reason.if err != nil {return err}// If the Deployment is not controlled by this Foo resource, we should log// a warning to the event recorder and retif !metav1.IsControlledBy(deployment, foo) {msg := fmt.Sprintf(MessageResourceExists, deployment.Name)c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)return fmt.Errorf(msg)}// If this number of the replicas on the Foo resource is specified, and the// number does not equal the current desired replicas on the Deployment, we// should update the Deployment resource.if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))}// If an error occurs during Update, we'll requeue the item so we can// attempt processing again later. THis could have been caused by a// temporary network failure, or any other transient reason.if err != nil {return err}// Finally, we update the status block of the Foo resource to reflect the// current state of the worlderr = c.updateFooStatus(foo, deployment)if err != nil {return err}c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)return nil
}
复制代码

4 Controler核心逻辑注册

  • pkg/samplecontroller/v1alpha1/register.go(处理类型 Schema)
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: samplecontroller.GroupName, Version: "v1alpha1"}// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {return SchemeGroupVersion.WithKind(kind).GroupKind()
}// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {return SchemeGroupVersion.WithResource(resource).GroupResource()
}var (SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)AddToScheme   = SchemeBuilder.AddToScheme
)// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {scheme.AddKnownTypes(SchemeGroupVersion,&Foo{},&FooList{},)metav1.AddToGroupVersion(scheme, SchemeGroupVersion)return nil
}
复制代码

5 总结

整个开发遇到各种问题,比如:包结构和相对路径,GO代理,以及K8s CRD 官方提供函数等,通过反复的实践,终于掌握整套Kubernetes CRD开发模式,以此进行留记,方便查阅。

专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。留言请关注《数据云技术社区》公众号。

转载于:https://juejin.im/post/5d57f750f265da03b94ff11a

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/363132.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

禁止浏览器缓存

近期在改页面的时候看到了页面中有response.setHeader("Pragma","No-cache"); 这样的语句&#xff0c;不知道是做什么的&#xff0c;经查得知原来是禁止浏览器缓存的&#xff0c;现把Html&#xff0c;Asp&#xff0c;Php&#xff0c;Jsp4种语言禁止浏览器缓…

Spring @Transactional实际如何工作?

在本文中&#xff0c;我们将深入探讨Spring事务管理。 我们将讨论Transactional在Transactional如何真正工作。 其他即将发布的帖子将包括&#xff1a; 如何使用传播和隔离等功能 主要陷阱是什么以及如何避免它们 JPA和事务管理 重要的是要注意&#xff0c;JPA本身不提供任何…

python3-开发进阶-RESTful 软件架构风格

一. 什么是RESTful REST与技术无关&#xff0c;代表的是一种软件架构风格&#xff0c;REST是Representational State Transfer的简称&#xff0c;中文翻译为“表征状态转移”REST从资源的角度类审视整个网络&#xff0c;它将分布在网络中某个节点的资源通过URL进行标识&#x…

mysql卸载重装总是卡在starting server这一选项

因为自己不小心把msyql给下载了&#xff0c;重装了一个5.7版本的可是在安装时卡在starting server这一部分&#xff0c;运行不下去。重写卸载重装仍然不成功&#xff0c;还是卡在starting server.无法继续下面的安装&#xff0c;查看日志也没有报错信息。 问题分析&#xff1a;…

两款开源Web功能测试工具

参考自&#xff1a;http://www.ibm.com/developerworks/cn/web/wa-aj-testing/twill:官网&#xff1a;http://twill.idyll.org/安装&#xff1a;easy_install twill运行&#xff1a;twill-sh文档&#xff1a;http://twill.idyll.org/windmill安装&#xff1a;easy_install wind…

箭头函数与普通函数的区别

箭头函数&#xff1a; let fun () > {console.log(lalalala); } 普通函数&#xff1a; function fun() {console.log(lalla); } 箭头函数相当于匿名函数&#xff0c;并且简化了函数定义。箭头函数有两种格式&#xff0c;一种只包含一个表达式&#xff0c;连{ ... }和return…

前端基础-HTML标记语言

阅读目录 一、 HTML标签与文档结构二、 HTML标签详细语法与注意点三、 HTML中标签分类四、 HTML注释 一、 HTML标签与文档结构 HTML作为一门标记语言&#xff0c;是通过各种各样的标签来标记网页内容的。我们学习HTML主要就是学习的HTML标签。 那什么是标签呢&#xff1f; #…

cf1199解题报告

目录 cf1199解题报告ABCDEFcf1199解题报告 发一波水题。 A 模拟 #include <bits/stdc.h> #define ll long long using namespace std; const int _1e67; int n,x,y,a[_]; int main() {scanf("%d%d%d",&n,&x,&y);for(int i1;i<n;i) scanf("…

带有Gradle的Docker容器分为4个步骤

您是否需要通过Java Web应用程序创建Docker映像&#xff1f; 您在使用Gradle吗&#xff1f; 如果是这样&#xff0c;那么您距Docker nivana仅4步之遥。 对于此示例&#xff0c;我将使用一个简单的Spring Boot应用程序。 您可以在我的名为galoshe的Github存储库中找到所有源代码…

ImagesSprite V1.1.1 Beta发布

ImagesSprite v 1.1 发布后有网友提出样式显示了&#xff0c;但不能复制&#xff0c;现做调整如下&#xff08;20090804&#xff09;&#xff1a; 1.修正主界面下方css样式不能复制。 2.修正生成时透明色没有保存。【下载ImagesSprite V1.1.1 Beta】- 点击:4次转载于:https://w…

使用Spark构建简单的RESTful API

免责声明 &#xff1a;这篇文章是关于名为Spark的Java微型Web框架的&#xff0c;而不是关于数据处理引擎Apache Spark的 。 在此博客文章中&#xff0c;我们将看到如何使用Spark构建简单的Web服务。 如免责声明中所述&#xff0c;Spark是受Ruby框架Sinatra启发的Java微型Web框…

cf1206解题报告

目录 cf1206解题报告ABCDE&#xff0c;Fcf1206解题报告 A 模拟 #include <bits/stdc.h> using namespace std; int n,m,a[1010],b[12910]; int dsr[1111]; int main() {scanf("%d",&n);for(int i1;i<n;i) scanf("%d",&a[i]),dsr[a[i]]1;s…

canvas时钟

<!DOCTYPE html><html><head lang"en"><meta charset"UTF-8"><title></title></head><body><canvas id"canvas" width"500" height"500">您的浏览器不支持&#xff…

第七周

这个作业属于哪个课程C语言程序设计 &#xff08;第三版&#xff09;这个作业要求在哪里2019春季第七周作业我的课程目标学习指针的运用这个作业在哪个具体方面帮助我实现目标这个作业让我知道了指针实用性参考文献无一、2019春第七周作业&#xff08;基础题&#xff09; 7-2 自…

很久没来了,嘿嘿 问候一下大家,O(∩_∩)O哈哈~

很久没来了&#xff0c;嘿嘿。发现上次发帖正好是一个月以前哦&#xff0c;嘿嘿。最近职位火热招聘&#xff0c;有意向的快联系哦&#xff0c;嘿嘿1. BI Technical PM BI Technical PM Job Location: Redmond Oversea education or working background. 2 …

Java EE 8 –为更多设备提供更多应用程序

如果我不喜欢夏天的一件事&#xff0c;那就是事实是没有太多要分享或谈论的新闻。 谁决定将Java Day Tokyo置于这一年的无聊时间里&#xff0c;谁干得不错&#xff0c;就给我一个机会撰写有关新的和即将到来的Java EE 8规范的博客文章&#xff0c;其中包含了更多的思想和建议。…

全局预处理与执行,作用域与作用域链

一.全局预处理 <!DOCTYPE html><html><head lang"en"><meta charset"UTF-8"><title></title></head><body><p>全局预处理&#xff1a;首先会创建一个词法环境(Lexical Environment),然后扫面全局里…

2090. 「ZJOI2016」旅行者 分治,最短路

2090. 「ZJOI2016」旅行者 链接 loj 思路 \((l,mid)(mid1,r)\).考虑跨过mid的贡献。 假设选的中间那条线的点为gzy,贡献为\(dis(x,gzy)dis(gzy,y)\) 那就计算n遍最短路,一次分治为\(n^2mlog{nm}\) 设Sn*m.矩阵的长度是不定的&#xff0c;每次取最长的边进行分治是最好的&#x…

Xshell连接Linux慢问题解决办法

由于各种原因&#xff0c;经常更换网络环境&#xff0c;然后发现&#xff0c;每次更换网络环境后&#xff0c;xshell连接虚拟机的rhel或者CentOS都几乎是龟速.... 今天专门查了一下解决方案&#xff1a; 原来是ssh的服务端在连接时会自动检测dns环境是否一致导致的&#xff0c;…

Gradle入门:依赖管理

即使不是没有可能&#xff0c;创建没有任何外部依赖关系的现实应用程序也是一项挑战。 这就是为什么依赖性管理是每个软件项目中至关重要的部分的原因。 这篇博客文章描述了我们如何使用Gradle管理项目的依赖关系。 我们将学习配置已使用的存储库和所需的依赖项。 我们还将通过…