如何使用 client-go 监听自定义资源
在 Kubernetes 中使用 client-go
监听自定义资源(Custom Resource,简称 CR)需要借助 Dynamic Client 或 Custom Informer,因为 client-go
的标准 Clientset
只支持内置资源(如 Pod、Deployment)。自定义资源由 CustomResourceDefinition(CRD)或 Operator 定义,监听它们需要动态处理其 Group、Version 和 Resource(GVR)。以下是详细步骤和实现方法。
前提条件
- CRD 已部署:确保你的自定义资源定义(CRD)已在集群中注册。
- 示例:
myresource.example.com/v1
,Kind 为MyResource
。
- 示例:
- 依赖:
client-go
(推荐与集群版本匹配,例如v0.28.0
对应 Kubernetes 1.28)。- 添加依赖:
go get k8s.io/client-go@v0.28.0
- 权限:确保 ServiceAccount 有权访问 CRD(通过 RBAC 配置)。
方法 1:使用 Dynamic Client 和 Informer
Dynamic Client
是 client-go
提供的通用客户端,支持任意资源类型。结合 SharedInformer
,可以监听自定义资源。
步骤
-
初始化 Dynamic Client:
package mainimport ("context""log""time""k8s.io/apimachinery/pkg/apis/meta/v1/unstructured""k8s.io/apimachinery/pkg/runtime/schema""k8s.io/client-go/dynamic""k8s.io/client-go/rest""k8s.io/client-go/tools/cache""k8s.io/client-go/tools/clientcmd" )func getDynamicClient() dynamic.Interface {config, err := rest.InClusterConfig()if err != nil {config, err = clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")if err != nil {log.Fatalf("Failed to create config: %v", err)}}client, err := dynamic.NewForConfig(config)if err != nil {log.Fatalf("Failed to create dynamic client: %v", err)}return client }
-
定义 GVR:
- 指定自定义资源的 Group、Version 和 Resource。
- 示例:
myresource.example.com/v1
,资源名为myresources
。gvr := schema.GroupVersionResource{Group: "example.com",Version: "v1",Resource: "myresources", }
-
创建 Dynamic Informer:
func main() {client := getDynamicClient()// 创建 Dynamic Informer Factoryfactory := dynamicinformer.NewDynamicSharedInformerFactory(client, time.Minute*30)informer := factory.ForResource(gvr).Informer()// 添加事件处理函数informer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {unstructuredObj := obj.(*unstructured.Unstructured)name := unstructuredObj.GetName()namespace := unstructuredObj.GetNamespace()log.Printf("CR Added: %s/%s", namespace, name)},UpdateFunc: func(oldObj, newObj interface{}) {oldUnstructured := oldObj.(*unstructured.Unstructured)newUnstructured := newObj.(*unstructured.Unstructured)log.Printf("CR Updated: %s/%s", newUnstructured.GetNamespace(), newUnstructured.GetName())},DeleteFunc: func(obj interface{}) {unstructuredObj := obj.(*unstructured.Unstructured)log.Printf("CR Deleted: %s/%s", unstructuredObj.GetNamespace(), unstructuredObj.GetName())},})// 启动 Informerctx, cancel := context.WithCancel(context.Background())defer cancel()factory.Start(ctx.Done())factory.WaitForCacheSync(ctx.Done())// 保持运行<-ctx.Done() }
-
获取缓存数据:
lister := factory.ForResource(gvr).Lister() items, err := lister.List(labels.Everything()) if err != nil {log.Printf("Failed to list CRs: %v", err) } else {for _, item := range items {unstructuredObj := item.(*unstructured.Unstructured)log.Printf("Current CR: %s/%s", unstructuredObj.GetNamespace(), unstructuredObj.GetName())} }
说明
- GVR:通过
kubectl api-resources
查看自定义资源的准确 GVR。 - Unstructured:自定义资源以
unstructured.Unstructured
类型返回,需手动解析字段。 - 依赖:需要导入
k8s.io/client-go/dynamic/informer
。
方法 2:生成类型化客户端和 Informer(推荐生产环境)
如果你的 CRD 有明确的 Go 类型(通过代码生成器生成),可以使用类型化的客户端和 Informer。这种方法需要更多前期工作,但更安全和直观。
步骤
-
生成代码:
- 使用
controller-tools
或k8s.io/code-generator
生成 CRD 的客户端代码。 - 示例 CRD 文件(
myresource_v1.yaml
):apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata:name: myresources.example.com spec:group: example.comnames:kind: MyResourceplural: myresourcesscope: Namespacedversions:- name: v1served: truestorage: trueschema:openAPIV3Schema:type: objectproperties:spec:type: objectproperties:replicas:type: integer
- 生成命令:
mkdir -p pkg/apis/example.com/v1 controller-gen crd paths=./pkg/apis/example.com/v1 output:crd:dir=./manifests controller-gen object paths=./pkg/apis/example.com/v1 k8s.io/code-generator/generate-groups.sh all ./pkg/client ./pkg/apis example.com:v1
- 使用
-
注册类型:
- 在
pkg/apis/example.com/v1/types.go
中定义类型:package v1import (metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" )type MyResourceSpec struct {Replicas int32 `json:"replicas"` }type MyResource struct {metav1.TypeMeta `json:",inline"`metav1.ObjectMeta `json:"metadata,omitempty"`Spec MyResourceSpec `json:"spec,omitempty"` }type MyResourceList struct {metav1.TypeMeta `json:",inline"`metav1.ListMeta `json:"metadata,omitempty"`Items []MyResource `json:"items"` }
- 在
-
创建 Informer:
package mainimport ("context""log""time""k8s.io/client-go/tools/cache"examplev1 "your/module/pkg/apis/example.com/v1"exampleclientset "your/module/pkg/client/clientset/versioned"exampleinformers "your/module/pkg/client/informers/externalversions" )func main() {config, err := rest.InClusterConfig()if err != nil {log.Fatalf("Failed to create config: %v", err)}client, err := exampleclientset.NewForConfig(config)if err != nil {log.Fatalf("Failed to create clientset: %v", err)}factory := exampleinformers.NewSharedInformerFactory(client, time.Minute*30)informer := factory.Example().V1().MyResources().Informer()informer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {myResource := obj.(*examplev1.MyResource)log.Printf("MyResource Added: %s/%s, Replicas: %d", myResource.Namespace, myResource.Name, myResource.Spec.Replicas)},UpdateFunc: func(oldObj, newObj interface{}) {newResource := newObj.(*examplev1.MyResource)log.Printf("MyResource Updated: %s/%s", newResource.Namespace, newResource.Name)},DeleteFunc: func(obj interface{}) {myResource := obj.(*examplev1.MyResource)log.Printf("MyResource Deleted: %s/%s", myResource.Namespace, myResource.Name)},})ctx, cancel := context.WithCancel(context.Background())defer cancel()factory.Start(ctx.Done())factory.WaitForCacheSync(ctx.Done())<-ctx.Done() }
说明
- 类型安全:使用生成的类型(如
*examplev1.MyResource
),避免手动解析。 - 依赖:需要自定义的客户端包(
pkg/client
)。 - 复杂度:前期生成代码较繁琐,但长期维护更方便。
注意事项
- CRD 注册:
- 确保 CRD 已应用(
kubectl apply -f myresource_v1.yaml
)。 - 检查:
kubectl get crd myresources.example.com
- 确保 CRD 已应用(
- 权限:
- 为 ServiceAccount 配置 RBAC:
apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata:name: myresource-reader rules: - apiGroups: ["example.com"]resources: ["myresources"]verbs: ["get", "list", "watch"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata:name: myresource-reader-binding subjects: - kind: ServiceAccountname: defaultnamespace: default roleRef:kind: ClusterRolename: myresource-readerapiGroup: rbac.authorization.k8s.io
- 为 ServiceAccount 配置 RBAC:
- 性能:
- 使用
WithNamespace
或标签过滤减少监听范围。
- 使用
- 错误处理:
- Watch 失败时,Informer 会自动重试,需关注日志。
验证
-
创建自定义资源:
apiVersion: example.com/v1 kind: MyResource metadata:name: test-resourcenamespace: default spec:replicas: 3
kubectl apply -f test-resource.yaml
-
运行程序,观察日志输出:
MyResource Added: default/test-resource, Replicas: 3
总结
- Dynamic Client:
- 适合快速实现,无需生成代码。
- 使用
unstructured.Unstructured
处理数据。
- 类型化客户端:
- 适合生产环境,类型安全,需生成代码。
- 使用特定类型(如
*MyResource
)操作。
- 选择建议:
- 测试或简单场景:Dynamic Client。
- 长期项目或 Operator:类型化客户端。
如果你有具体的 CRD 定义或需求(例如监听特定字段),可以告诉我,我会进一步定制代码!
---