本文记录用kubebuilder和code-generator开发k8s的crd控制器。
概览
和k8s.io/code-generator类似,是一个码生成工具,用于为你的CRD生成kubernetes-style API实现。区别在于:
Kubebuilder不会生成informers、listers、clientsets,而code-generator会。
Kubebuilder会生成Controller、Admission Webhooks,而code-generator不会。
Kubebuilder会生成manifests yaml,而code-generator不会。
Kubebuilder还带有一些其他便利性设施。
安装kubebuilder
https://github.com/kubernetes-sigs/kubebuilder/releases/tag/v3.13.0
在这里下载kubebuilder对应芯片版本:
本身作为一个二进制可执行文件,放入到/usr/local/bin目录下即可。
使用go mod
mkdir example
go mod init gateway
kubebuilder init --domain example.com
kubebuilder edit --multigroup=true
创建API
kubebuilder create api --group app --version v1 --kind Gateway
Create Resource [y/n]
y
Create Controller [y/n]
n
在example目录下创建了,api目录,制定了group和版本以及资源类型。
会自动生成apis/app/v1目录,里面有{crd}_types.go和zz_generated.deepcopy.go文件,如果需要配置和修改crd字段,可以修改{crd}_types.go中的crd spec结构体,见下图:
修改{crd}_types.go之后,需用重新执行make manifests重新生成crd,此时zz_generated.deepcopy.go也会同步更新。
生成RBAC manifests
在api/app/v1/目录创建rbac.go文件,加入以下内容:
// +kubebuilder:rbac:groups=app.example.com,resources=gateways,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=app.example.com,resources=gateways/status,verbs=get;update;patchpackage v1
生成crd manifests
make manifests
这一步会生成config/crd/bases目录,以及该目录下的crd yaml
使用code-generator
先下载code-generator
go get -u -v k8s.io/code-generator/...
这里要下下载很多依赖包
在后面的编译过程中,需要手动下载一些依赖包到$GOPATH下
同时需要升级go到1.21版本,已解决如下问题:
准备脚本
在hack目录下新增update-codegen.sh和verify-codegen.sh
其中update-codegen.sh如下
其中:
MODULE=gateway,这里和go.mod保持一致(go mod init gateway)
API_PKG=api,目前工程的api目录,有的可能是api
OUTPUT_PKG=generated/app:输出结果的目录(generated/{group},group是之前kubebuilder create api是指定的group参数)
GROUP_VERSION=app:v1,也是跟kubebuilder create api是的group和version保持一致
#!/usr/bin/env bashset -o errexit
set -o nounset
set -o pipefail# corresponding to go mod init <module>
MODULE=gateway
# api package
APIS_PKG=api
# generated output package
OUTPUT_PKG=generated/app
# group-version such as foo:v1alpha1
GROUP_VERSION=app:v1SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 /Users/luoyi/go/src/k8s.io/code-generator 2>/dev/null || echo /Users/luoyi/go/src/k8s.io/code-generator)}# generate the code with:
# --output-base because this script should also be able to run inside the vendor dir of
# k8s.io/kubernetes. The output-base is needed for the generators to output into the vendor dir
# instead of the $GOPATH directly. For normal projects this can be dropped.
bash "${CODEGEN_PKG}"/generate-groups.sh "client,lister,informer" \${MODULE}/${OUTPUT_PKG} ${MODULE}/${APIS_PKG} \${GROUP_VERSION} \--go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt \--output-base "${SCRIPT_ROOT}"
# --output-base "${SCRIPT_ROOT}/../../.." \
这个脚本主要就是制定了一些模块名和运行脚本的参数。本质上与进入到$GOPATH/src/k8s.io,执行如下命令一样:
./generate-groups.sh all /Users/luoyi/k8s/example/generated/app /Users/luoyi/k8s/example/api app:v1
其中,/Users/luoyi/k8s/example/generated/app指定了生成代码的路径,/Users/luoyi/k8s/example/api指定了作用路径,最后一个参数指定了group和版本。all表示client、informer和listener都生成。
在生成代码前还需要做些配置:
在{crd}_type.go上配置tag // +genclient,用于生成clienset
// +genclient
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status// Gateway is the Schema for the gateways API
type Gateway struct {metav1.TypeMeta `json:",inline"`metav1.ObjectMeta `json:"metadata,omitempty"`Spec GatewaySpec `json:"spec,omitempty"`
在apis/app/v1下新建doc.go,其中groupName要根据kubebuilder init和kubebuilder create api参数对应修改
// +groupName=app.example.com
package v1
在apis/app/v1下新建register.go,代码如下,无需修改
package v1import ("k8s.io/apimachinery/pkg/runtime/schema"
)// SchemeGroupVersion is group version used to register these objects.
var SchemeGroupVersion = GroupVersionfunc Resource(resource string) schema.GroupResource {return SchemeGroupVersion.WithResource(resource).GroupResource()
}
执行hack/update-codegen.sh可以得到clientset/lister/informer
./hack/update-codegen.sh
此时会得到example.com/gateway/generated/app目录,在目录下有clientset、liseters、informers
看到如下创建成功:
mv example.com/gateway/generated generated
将example.com/gateway/generated移到项目根目录即可
编写main.go
package mainimport ("flag""time""github.com/golang/glog""k8s.io/client-go/kubernetes""k8s.io/client-go/tools/clientcmd"// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"clientset "gateway/generated/app/clientset/versioned"informers "gateway/generated/app/informers/externalversions""gateway/signals"
)var (masterURL stringkubeconfig string
)func main() {flag.Parse()// 处理信号量stopCh := signals.SetupSignalHandler()// 处理入参cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)if err != nil {glog.Fatalf("Error building kubeconfig: %s", err.Error())}kubeClient, err := kubernetes.NewForConfig(cfg)if err != nil {glog.Fatalf("Error building kubernetes clientset: %s", err.Error())}studentClient, err := clientset.NewForConfig(cfg)if err != nil {glog.Fatalf("Error building example clientset: %s", err.Error())}studentInformerFactory := informers.NewSharedInformerFactory(studentClient, time.Second*30)//得到controllercontroller := NewController(kubeClient, studentClient,studentInformerFactory.App().V1().Gateways())//启动informergo studentInformerFactory.Start(stopCh)//controller开始处理消息if err = controller.Run(2, stopCh); err != nil {glog.Fatalf("Error running controller: %s", err.Error())}
}func init() {flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}
编写controller.go
package mainimport ("fmt""time""github.com/golang/glog"corev1 "k8s.io/api/core/v1""k8s.io/apimachinery/pkg/api/errors""k8s.io/apimachinery/pkg/util/runtime"utilruntime "k8s.io/apimachinery/pkg/util/runtime""k8s.io/apimachinery/pkg/util/wait""k8s.io/client-go/kubernetes""k8s.io/client-go/kubernetes/scheme"typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1""k8s.io/client-go/tools/cache""k8s.io/client-go/tools/record""k8s.io/client-go/util/workqueue"bolingcavalryv1 "gateway/api/app/v1"clientset "gateway/generated/app/clientset/versioned"studentscheme "gateway/generated/app/clientset/versioned/scheme"informers "gateway/generated/app/informers/externalversions/app/v1"listers "gateway/generated/app/listers/app/v1"
)const controllerAgentName = "student-controller"const (SuccessSynced = "Synced"MessageResourceSynced = "Student synced successfully"
)// Controller is the controller implementation for Student resources
type Controller struct {// kubeclientset is a standard kubernetes clientsetkubeclientset kubernetes.Interface// studentclientset is a clientset for our own API groupstudentclientset clientset.InterfacestudentsLister listers.GatewayListerstudentsSynced cache.InformerSyncedworkqueue workqueue.RateLimitingInterfacerecorder record.EventRecorder
}// NewController returns a new student controller
func NewController(kubeclientset kubernetes.Interface,studentclientset clientset.Interface,studentInformer informers.GatewayInformer) *Controller {utilruntime.Must(studentscheme.AddToScheme(scheme.Scheme))glog.V(4).Info("Creating event broadcaster")eventBroadcaster := record.NewBroadcaster()eventBroadcaster.StartLogging(glog.Infof)eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})controller := &Controller{kubeclientset: kubeclientset,studentclientset: studentclientset,studentsLister: studentInformer.Lister(),studentsSynced: studentInformer.Informer().HasSynced,workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Students"),recorder: recorder,}glog.Info("Setting up event handlers")// Set up an event handler for when Student resources changestudentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: controller.enqueueStudent,UpdateFunc: func(old, new interface{}) {oldStudent := old.(*bolingcavalryv1.Gateway)newStudent := new.(*bolingcavalryv1.Gateway)if oldStudent.ResourceVersion == newStudent.ResourceVersion {//版本一致,就表示没有实际更新的操作,立即返回return}controller.enqueueStudent(new)},DeleteFunc: controller.enqueueStudentForDelete,})return controller
}// 在此处开始controller的业务
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {defer runtime.HandleCrash()defer c.workqueue.ShutDown()glog.Info("开始controller业务,开始一次缓存数据同步")if ok := cache.WaitForCacheSync(stopCh, c.studentsSynced); !ok {return fmt.Errorf("failed to wait for caches to sync")}glog.Info("worker启动")for i := 0; i < threadiness; i++ {go wait.Until(c.runWorker, time.Second, stopCh)}glog.Info("worker已经启动")<-stopChglog.Info("worker已经结束")return nil
}func (c *Controller) runWorker() {for c.processNextWorkItem() {}
}// 取数据处理
func (c *Controller) processNextWorkItem() bool {obj, shutdown := c.workqueue.Get()if shutdown {return false}// We wrap this block in a func so we can defer c.workqueue.Done.err := func(obj interface{}) error {defer c.workqueue.Done(obj)var key stringvar ok boolif key, ok = obj.(string); !ok {c.workqueue.Forget(obj)runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))return nil}// 在syncHandler中处理业务if err := c.syncHandler(key); err != nil {return fmt.Errorf("error syncing '%s': %s", key, err.Error())}c.workqueue.Forget(obj)glog.Infof("Successfully synced '%s'", key)return nil}(obj)if err != nil {runtime.HandleError(err)return true}return true
}// 处理
func (c *Controller) syncHandler(key string) error {// Convert the namespace/name string into a distinct namespace and namenamespace, name, err := cache.SplitMetaNamespaceKey(key)if err != nil {runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))return nil}// 从缓存中取对象student, err := c.studentsLister.Gateways(namespace).Get(name)if err != nil {// 如果Student对象被删除了,就会走到这里,所以应该在这里加入执行if errors.IsNotFound(err) {glog.Infof("Student对象被删除,请在这里执行实际的删除业务: %s/%s ...", namespace, name)return nil}runtime.HandleError(fmt.Errorf("failed to list student by: %s/%s", namespace, name))return err}glog.Infof("这里是student对象的期望状态: %#v ...", student)glog.Infof("实际状态是从业务层面得到的,此处应该去的实际状态,与期望状态做对比,并根据差异做出响应(新增或者删除)")c.recorder.Event(student, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)return nil
}// 数据先放入缓存,再入队列
func (c *Controller) enqueueStudent(obj interface{}) {var key stringvar err error// 将对象放入缓存if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {runtime.HandleError(err)return}// 将key放入队列c.workqueue.AddRateLimited(key)
}// 删除操作
func (c *Controller) enqueueStudentForDelete(obj interface{}) {var key stringvar err error// 从缓存中删除指定对象key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)if err != nil {runtime.HandleError(err)return}//再将key放入队列c.workqueue.AddRateLimited(key)
}
信号处理
package signalsimport ("os""os/signal"
)var onlyOneSignalHandler = make(chan struct{})func SetupSignalHandler() (stopCh <-chan struct{}) {close(onlyOneSignalHandler) // panics when called twicestop := make(chan struct{})c := make(chan os.Signal, 2)signal.Notify(c, shutdownSignals...)go func() {<-cclose(stop)<-cos.Exit(1) // second signal. Exit directly.}()return stop
}
//go:build !windows
// +build !windowspackage signalsimport ("os""syscall"
)var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
最后运行:
./gateway -kubeconfig=$HOME/.kube/config -alsologtostderr=true
kubeconfig是k8s集群的默认配置路径。