EndpointSubset
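An EndpointSubset is a group of addresses that share a common set of ports. The expanded set of endpoints is the Cartesian product of Addresses (Pod IP addresses) and Ports (port names and port numbers).
A typical EndpointSubset looks like this: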
Name: "test",
Subsets: [
  {
    Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}],
    Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}]
  }
]
The subset above expands to the following set of endpoints:
a: [ 10.10.1.1:8675, 10.10.2.2:8675 ]
b: [ 10.10.1.1:309, 10.10.2.2:309 ]
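The expansion above can be reproduced in a few lines of Go. This is a minimal sketch, not the Kubernetes implementation: the three types are simplified stand-ins for the v1 API types, and `Expand` is a hypothetical helper introduced only for illustration. It also assumes IPv4 addresses, so it joins address and port with a plain colon.

```go
package main

import "fmt"

// EndpointAddress, EndpointPort and EndpointSubset are simplified stand-ins
// for the v1 API types; only the fields needed for the expansion are modeled.
type EndpointAddress struct {
	IP string
}

type EndpointPort struct {
	Name string
	Port int32
}

type EndpointSubset struct {
	Addresses []EndpointAddress
	Ports     []EndpointPort
}

// Expand (hypothetical helper) returns, per port name, one "ip:port" entry
// for every address in the subset: the Cartesian product Addresses x Ports.
func Expand(s EndpointSubset) map[string][]string {
	out := make(map[string][]string, len(s.Ports))
	for _, p := range s.Ports {
		for _, a := range s.Addresses {
			out[p.Name] = append(out[p.Name], fmt.Sprintf("%s:%d", a.IP, p.Port))
		}
	}
	return out
}

func main() {
	subset := EndpointSubset{
		Addresses: []EndpointAddress{{IP: "10.10.1.1"}, {IP: "10.10.2.2"}},
		Ports:     []EndpointPort{{Name: "a", Port: 8675}, {Name: "b", Port: 309}},
	}
	expanded := Expand(subset)
	for _, p := range subset.Ports {
		fmt.Println(p.Name, expanded[p.Name])
	}
}
```

Two addresses and two ports yield four endpoints in total, grouped by port name.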
EndpointController
First, look at the Endpoints controller object, which is the core object that implements the Endpoints functionality.
// Controller manages selector-based service endpoints.
type Controller struct {
	client           clientset.Interface
	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// serviceLister is able to list/get services and is populated by the shared informer passed to
	// NewEndpointController.
	serviceLister corelisters.ServiceLister
	// servicesSynced returns true if the service shared informer has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	servicesSynced cache.InformerSynced

	// podLister is able to list/get pods and is populated by the shared informer passed to
	// NewEndpointController.
	podLister corelisters.PodLister
	// podsSynced returns true if the pod shared informer has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podsSynced cache.InformerSynced

	// endpointsLister is able to list/get endpoints and is populated by the shared informer passed to
	// NewEndpointController.
	endpointsLister corelisters.EndpointsLister
	// endpointsSynced returns true if the endpoints shared informer has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	endpointsSynced cache.InformerSynced

	// Services that need to be updated. A channel is inappropriate here,
	// because it allows services with lots of pods to be serviced much
	// more often than services with few pods; it also would cause a
	// service that's inserted multiple times to be processed more than
	// necessary.
	queue workqueue.TypedRateLimitingInterface[string]

	// workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes.
	workerLoopPeriod time.Duration

	// triggerTimeTracker is an util used to compute and export the EndpointsLastChangeTriggerTime
	// annotation.
	triggerTimeTracker *endpointsliceutil.TriggerTimeTracker

	endpointUpdatesBatchPeriod time.Duration
}
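Initialization
NewEndpointController initializes the Endpoints controller and returns the instance. The controller subscribes to change events for three resources: Service, Pod, and Endpoints (for Endpoints, only deletions).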
// NewEndpointController returns a new *Controller.
func NewEndpointController(ctx context.Context, podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
	endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller {
	broadcaster := record.NewBroadcaster(record.WithContext(ctx))
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"})

	e := &Controller{
		client: client,
		queue: workqueue.NewTypedRateLimitingQueueWithConfig(
			workqueue.DefaultTypedControllerRateLimiter[string](),
			workqueue.TypedRateLimitingQueueConfig[string]{
				Name: "endpoint",
			},
		),
		workerLoopPeriod: time.Second,
	}

	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: e.onServiceUpdate,
		UpdateFunc: func(old, cur interface{}) {
			e.onServiceUpdate(cur)
		},
		DeleteFunc: e.onServiceDelete,
	})
	e.serviceLister = serviceInformer.Lister()
	e.servicesSynced = serviceInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    e.addPod,
		UpdateFunc: e.updatePod,
		DeleteFunc: e.deletePod,
	})
	e.podLister = podInformer.Lister()
	e.podsSynced = podInformer.Informer().HasSynced

	endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: e.onEndpointsDelete,
	})
	e.endpointsLister = endpointsInformer.Lister()
	e.endpointsSynced = endpointsInformer.Informer().HasSynced

	e.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker()
	e.eventBroadcaster = broadcaster
	e.eventRecorder = recorder

	e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod

	return e
}
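The queue holds Service keys, so a Pod event has to be translated into the keys of the Services that select that Pod before anything is enqueued. The handler bodies are not shown in this article, so the following is only a sketch of that idea under stated assumptions: `Pod`, `Service`, `matches`, and `ServiceKeysForPod` are hypothetical, simplified names, and treating a Service without a selector as matching nothing is a simplification chosen for the example.

```go
package main

import (
	"fmt"
	"sort"
)

// Service and Pod are hypothetical, stripped-down models of the API objects.
type Service struct {
	Namespace string
	Name      string
	Selector  map[string]string
}

type Pod struct {
	Namespace string
	Labels    map[string]string
}

// matches reports whether every key/value pair of selector appears in labels.
// An empty selector matches nothing here (a simplification for this sketch).
func matches(selector, labels map[string]string) bool {
	if len(selector) == 0 {
		return false
	}
	for k, v := range selector {
		if got, ok := labels[k]; !ok || got != v {
			return false
		}
	}
	return true
}

// ServiceKeysForPod returns the sorted "namespace/name" keys of all Services
// in the Pod's namespace whose selector matches the Pod's labels.
func ServiceKeysForPod(pod Pod, services []Service) []string {
	keys := []string{}
	for _, svc := range services {
		if svc.Namespace == pod.Namespace && matches(svc.Selector, pod.Labels) {
			keys = append(keys, svc.Namespace+"/"+svc.Name)
		}
	}
	sort.Strings(keys)
	return keys
}

func main() {
	services := []Service{
		{Namespace: "default", Name: "web", Selector: map[string]string{"app": "web"}},
		{Namespace: "default", Name: "db", Selector: map[string]string{"app": "db"}},
	}
	pod := Pod{Namespace: "default", Labels: map[string]string{"app": "web", "tier": "front"}}
	// Each returned key would be added to the work queue.
	fmt.Println(ServiceKeysForPod(pod, services))
}
```

Enqueuing keys rather than objects keeps the queue small and lets the sync step always read the latest state from the informer cache.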
Starting the controller
Following the call chain of the initializer NewEndpointController leads to the place where the controller is started and run.
// cmd/kube-controller-manager/app/core.go
func startEndpointsController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
	go endpointcontroller.NewEndpointController(
		ctx,
		controllerContext.InformerFactory.Core().V1().Pods(),
		controllerContext.InformerFactory.Core().V1().Services(),
		controllerContext.InformerFactory.Core().V1().Endpoints(),
		controllerContext.ClientBuilder.ClientOrDie("endpoint-controller"),
		controllerContext.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,
	).Run(ctx, int(controllerContext.ComponentConfig.EndpointController.ConcurrentEndpointSyncs))
	return nil, true, nil
}
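Core logic
Controller.Run performs the actual startup: it starts event recording, waits for the informer caches to sync, launches the workers, and then blocks until the context is cancelled.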
// Run will not return until stopCh is closed. workers determines how many
// endpoints will be handled in parallel.
func (e *Controller) Run(ctx context.Context, workers int) {
	defer utilruntime.HandleCrash()

	// Start events processing pipeline.
	e.eventBroadcaster.StartStructuredLogging(3)
	e.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: e.client.CoreV1().Events("")})
	defer e.eventBroadcaster.Shutdown()

	defer e.queue.ShutDown()

	logger := klog.FromContext(ctx)
	logger.Info("Starting endpoint controller")
	defer logger.Info("Shutting down endpoint controller")

	if !cache.WaitForNamedCacheSync("endpoint", ctx.Done(), e.podsSynced, e.servicesSynced, e.endpointsSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, e.worker, e.workerLoopPeriod)
	}

	go func() {
		defer utilruntime.HandleCrash()
		e.checkLeftoverEndpoints()
	}()

	<-ctx.Done()
}
e.worker is essentially an endless polling loop: it keeps taking Service keys (namespace/name strings) off the queue and syncs the corresponding Service.
// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service
// at the same time.
func (e *Controller) worker(ctx context.Context) {
	for e.processNextWorkItem(ctx) {
	}
}

func (e *Controller) processNextWorkItem(ctx context.Context) bool {
	eKey, quit := e.queue.Get()
	if quit {
		return false
	}
	defer e.queue.Done(eKey)

	logger := klog.FromContext(ctx)
	err := e.syncService(ctx, eKey)
	e.handleErr(logger, err, eKey)

	return true
}
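The Get / process / repeat shape of the worker, together with the deduplication that the struct comment gives as the reason for using a queue instead of a channel, can be illustrated with a toy queue. This is a sketch under assumptions, not the client-go workqueue: `keyQueue` and `Drain` are hypothetical names, the queue is single-goroutine, and unlike the real queue its `Get` does not block but reports quit as soon as the queue is empty so that the example terminates.

```go
package main

import "fmt"

// keyQueue is a toy FIFO of Service keys that ignores a key already waiting.
type keyQueue struct {
	items  []string
	queued map[string]bool
}

func newKeyQueue() *keyQueue {
	return &keyQueue{queued: map[string]bool{}}
}

// Add enqueues key unless it is already waiting to be processed.
func (q *keyQueue) Add(key string) {
	if q.queued[key] {
		return
	}
	q.queued[key] = true
	q.items = append(q.items, key)
}

// Get pops the oldest key. quit is true when nothing is left
// (a simplification: the real queue blocks until shutdown).
func (q *keyQueue) Get() (key string, quit bool) {
	if len(q.items) == 0 {
		return "", true
	}
	key = q.items[0]
	q.items = q.items[1:]
	delete(q.queued, key)
	return key, false
}

// Drain mimics the worker loop: take a key, sync it, repeat until quit.
// It returns the keys that synced without error, in processing order.
func Drain(q *keyQueue, syncFn func(key string) error) []string {
	processed := []string{}
	for {
		key, quit := q.Get()
		if quit {
			return processed
		}
		if err := syncFn(key); err != nil {
			// The real controller hands the error to handleErr; this toy skips the key.
			continue
		}
		processed = append(processed, key)
	}
}

func main() {
	q := newKeyQueue()
	q.Add("default/web")
	q.Add("default/db")
	q.Add("default/web") // duplicate while still queued: ignored
	fmt.Println(Drain(q, func(key string) error { return nil }))
}
```

Because a key that is already waiting is not queued twice, a Service with many changing Pods is synced once per pass rather than once per Pod event.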
syncService
The Controller's work-item callback is syncService, the core method of the Endpoints controller. As the method name suggests, the object the controller primarily cares about is the Service.
func (e *Controller) syncService(ctx context.Context, key string) error {
	startTime := time.Now()
	logger := klog.FromContext(ctx)
	// Parse the Service's namespace and name from the key.
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	defer func() {
		logger.V(4).Info("Finished syncing service endpoints", "service", klog.KRef(namespace, name), "startTime", time.Since(startTime))
	}()

	// Fetch the Service object.
	service, err := e.serviceLister.Services(namespace).Get(name)
	if err != nil {
		if !errors.IsNotFound(err) {
			return err
		}

		// Delete the corresponding endpoint, as the service has been deleted.
		// TODO: Please note that this will delete an endpoint when a
		// service is deleted. However, if we're down at the time when
		// the service is deleted, we will miss that deletion, so this
		// doesn't completely solve the problem. See #6877.
		err = e.client.CoreV1().Endpoints(namespace).Delete(ctx, name, metav1.DeleteOptions{})
		if err != nil && !errors.IsNotFound(err) {
			return err
		}
		e.triggerTimeTracker.DeleteService(namespace, name)
		return nil
	}

	// The Service is of type ExternalName: return immediately.
	if service.Spec.Type == v1.ServiceTypeExternalName {
		// services with Type ExternalName receive no endpoints from this controller;
		// Ref: https://issues.k8s.io/105986
		return nil
	}

	// The Service has a nil label selector, so the controller cannot match
	// any Pods to it and manages no Endpoints for it: return immediately.
	if service.Spec.Selector == nil {
		// services without a selector receive no endpoints from this controller;
		// these services will receive the endpoints that are created out-of-band via the REST API.
		return nil
	}

	logger.V(5).Info("About to update endpoints for service", "service", klog.KRef(namespace, name))
	// List the Pods matched by the Service's label selector.
	pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
	if err != nil {
		// Since we're getting stuff from a local cache, it is
		// basically impossible to get this error.
		return err
	}

	// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
	// state of the trigger time tracker gets updated even if the sync turns out
	// to be no-op and we don't update the endpoints object.
	endpointsLastChangeTriggerTime := e.triggerTimeTracker.
		ComputeEndpointLastChangeTriggerTime(namespace, service, pods)

	// Initialize the endpoint subsets.
	subsets := []v1.EndpointSubset{}
	// Running count of ready endpoints.
	var totalReadyEps int
	// Running count of not-ready endpoints.
	var totalNotReadyEps int

	// Iterate over the Pod list.
	for _, pod := range pods {
		// ShouldPodBeInEndpoints returns false when:
		//   - the pod is in a terminal phase (phase == v1.PodFailed || phase == v1.PodSucceeded)
		//   - the pod has no IP assigned yet
		//   - the pod is being deleted and includeTerminating is false
		if !endpointsliceutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
			logger.V(5).Info("Pod is not included on endpoints for Service", "pod", klog.KObj(pod), "service", klog.KObj(service))
			continue
		}

		// Build an EndpointAddress for the Pod.
		ep, err := podToEndpointAddressForService(service, pod)
		if err != nil {
			// this will happen, if the cluster runs with some nodes configured as dual stack and some as not
			// such as the case of an upgrade..
			logger.V(2).Info("Failed to find endpoint for service with ClusterIP on pod with error", "service", klog.KObj(service), "clusterIP", service.Spec.ClusterIP, "pod", klog.KObj(pod), "error", err)
			continue
		}

		epa := *ep
		if endpointsliceutil.ShouldSetHostname(pod, service) {
			epa.Hostname = pod.Spec.Hostname
		}

		// Allow headless service not to have ports.
		if len(service.Spec.Ports) == 0 {
			if service.Spec.ClusterIP == api.ClusterIPNone {
				// Add a new subset for this address; no ports are passed (nil).
				subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(logger, subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
				// No need to repack subsets for headless service without ports.
			}
		} else {
			for i := range service.Spec.Ports {
				servicePort := &service.Spec.Ports[i]
				portNum, err := podutil.FindPort(pod, servicePort)
				if err != nil {
					logger.V(4).Info("Failed to find port for service", "service", klog.KObj(service), "error", err)
					continue
				}
				// Build an EndpointPort from the Service port and the resolved port number.
				epp := endpointPortFromServicePort(servicePort, portNum)

				var readyEps, notReadyEps int
				// Append the address/port pair to the subsets.
				subsets, readyEps, notReadyEps = addEndpointSubset(logger, subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
				// Accumulate the ready count.
				totalReadyEps = totalReadyEps + readyEps
				// Accumulate the not-ready count.
				totalNotReadyEps = totalNotReadyEps + notReadyEps
			}
		}
	}
	// Repack into the final, canonical subsets (the new endpoint set).
	subsets = endpoints.RepackSubsets(subsets)

	// Fetch the Endpoints object for this Service from the informer cache.
	// This is the current (old) endpoint set.
	// See if there's actually an update here.
	currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
	if err != nil {
		if !errors.IsNotFound(err) {
			return err
		}
		currentEndpoints = &v1.Endpoints{
			ObjectMeta: metav1.ObjectMeta{
				Name:   service.Name,
				Labels: service.Labels,
			},
		}
	}

	// If the current Endpoints object has no resource version, it does not
	// exist yet and must be created.
	createEndpoints := len(currentEndpoints.ResourceVersion) == 0

	// Compare the sorted subsets and labels
	// Remove the HeadlessService label from the endpoints if it exists,
	// as this won't be set on the service itself
	// and will cause a false negative in this diff check.
	// But first check if it has that label to avoid expensive copies.
	compareLabels := currentEndpoints.Labels
	if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok {
		compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService)
	}
	// When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints
	// updates caused by Pod updates that we don't care, e.g. annotation update.
	// Compare the new and old endpoint sets. If they do not differ and the
	// Endpoints object does not need to be created, return immediately.
	if !createEndpoints &&
		endpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
		apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
		capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
		logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service))
		return nil
	}

	// Deep-copy the current Endpoints object and set the latest fields on the copy.
	newEndpoints := currentEndpoints.DeepCopy()
	newEndpoints.Subsets = subsets
	newEndpoints.Labels = service.Labels
	if newEndpoints.Annotations == nil {
		newEndpoints.Annotations = make(map[string]string)
	}

	if !endpointsLastChangeTriggerTime.IsZero() {
		newEndpoints.Annotations[v1.EndpointsLastChangeTriggerTime] =
			endpointsLastChangeTriggerTime.UTC().Format(time.RFC3339Nano)
	} else { // No new trigger time, clear the annotation.
		delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
	}

	if truncateEndpoints(newEndpoints) {
		newEndpoints.Annotations[v1.EndpointsOverCapacity] = truncated
	} else {
		delete(newEndpoints.Annotations, v1.EndpointsOverCapacity)
	}

	if newEndpoints.Labels == nil {
		newEndpoints.Labels = make(map[string]string)
	}

	if !helper.IsServiceIPSet(service) {
		newEndpoints.Labels = utillabels.CloneAndAddLabel(newEndpoints.Labels, v1.IsHeadlessService, "")
	} else {
		newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService)
	}

	logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
	if createEndpoints {
		// No previous endpoints, create them
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
	} else {
		// Pre-existing
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
	}
	if err != nil {
		if createEndpoints && errors.IsForbidden(err) {
			// A request is forbidden primarily for two reasons:
			// 1. namespace is terminating, endpoint creation is not allowed by default.
			// 2. policy is misconfigured, in which case no service would function anywhere.
			// Given the frequency of 1, we log at a lower level.
			logger.V(5).Info("Forbidden from creating endpoints", "error", err)

			// If the namespace is terminating, creates will continue to fail. Simply drop the item.
			if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
				return nil
			}
		}

		if createEndpoints {
			e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToCreateEndpoint", "Failed to create endpoint for service %v/%v: %v", service.Namespace, service.Name, err)
		} else {
			e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToUpdateEndpoint", "Failed to update endpoint %v/%v: %v", service.Namespace, service.Name, err)
		}

		return err
	}
	return nil
}
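The first check inside the Pod loop decides whether a Pod may contribute an address at all: a Pod is skipped when it is in a terminal phase, has no IP yet, or is being deleted while terminating Pods are not to be included. A minimal sketch of that predicate follows. The `Pod` type and `ShouldInclude` are hypothetical simplifications (for instance, `Deleting` stands in for a non-nil deletion timestamp), not the `endpointsliceutil` code itself.

```go
package main

import "fmt"

// Pod is a hypothetical, reduced model of the fields the filter looks at.
type Pod struct {
	Phase    string   // "Pending", "Running", "Succeeded" or "Failed"
	IPs      []string // empty until the Pod has been assigned an IP
	Deleting bool     // stands in for a non-nil DeletionTimestamp
}

// ShouldInclude reports whether the Pod may appear in the Endpoints object.
func ShouldInclude(pod Pod, includeTerminating bool) bool {
	// Terminal Pods never serve traffic again.
	if pod.Phase == "Failed" || pod.Phase == "Succeeded" {
		return false
	}
	// Without an IP there is no address to publish.
	if len(pod.IPs) == 0 {
		return false
	}
	// A Pod that is being deleted is kept only when terminating Pods are wanted.
	if !includeTerminating && pod.Deleting {
		return false
	}
	return true
}

func main() {
	running := Pod{Phase: "Running", IPs: []string{"10.10.1.1"}}
	terminating := Pod{Phase: "Running", IPs: []string{"10.10.2.2"}, Deleting: true}
	fmt.Println(ShouldInclude(running, false))
	fmt.Println(ShouldInclude(terminating, false))
	fmt.Println(ShouldInclude(terminating, true))
}
```

In syncService the second argument is service.Spec.PublishNotReadyAddresses, so a Service that publishes not-ready addresses also keeps Pods that are shutting down.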
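From the source of Controller.syncService, we can see that each sync performs the following steps:
- Look up the Service identified by the key argument (if the Service no longer exists, delete its Endpoints object and return)
- List the Pods matched by the Service's label selector
- Compute the new endpoint set from the Service and the Pod list
- Fetch the Service's current (old) endpoint set from the informer cache
- If the new set and the old set do not differ, no update is needed and the method returns
- Otherwise, create or update the Endpoints object, depending on whether the current Endpoints object already has a resource version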
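The final decision in these steps (skip, create, or update) can be condensed into one small function. This is a sketch under assumptions rather than the controller's code: `Endpoints`, `Action`, and `Decide` are hypothetical names, subsets are flattened to plain "ip:port" strings, and the capacity-annotation check that the real method also performs is left out.

```go
package main

import (
	"fmt"
	"sort"
)

// Endpoints is a hypothetical, flattened model of the Endpoints object.
type Endpoints struct {
	ResourceVersion string            // empty means the object does not exist yet
	Subsets         []string          // "ip:port" entries, order not significant
	Labels          map[string]string // labels copied from the Service
}

type Action string

const (
	Skip   Action = "skip"
	Create Action = "create"
	Update Action = "update"
)

// sameSet compares two string slices while ignoring order.
func sameSet(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	x := append([]string(nil), a...)
	y := append([]string(nil), b...)
	sort.Strings(x)
	sort.Strings(y)
	for i := range x {
		if x[i] != y[i] {
			return false
		}
	}
	return true
}

// sameLabels reports whether two label maps hold identical pairs.
func sameLabels(a, b map[string]string) bool {
	if len(a) != len(b) {
		return false
	}
	for k, v := range a {
		if w, ok := b[k]; !ok || w != v {
			return false
		}
	}
	return true
}

// Decide picks the API operation for one sync pass.
func Decide(current Endpoints, desired []string, serviceLabels map[string]string) Action {
	if current.ResourceVersion == "" {
		return Create // nothing stored yet
	}
	if sameSet(current.Subsets, desired) && sameLabels(current.Labels, serviceLabels) {
		return Skip // stored state already matches
	}
	return Update
}

func main() {
	stored := Endpoints{ResourceVersion: "7", Subsets: []string{"10.10.1.1:8675"}}
	fmt.Println(Decide(Endpoints{}, []string{"10.10.1.1:8675"}, nil))
	fmt.Println(Decide(stored, []string{"10.10.1.1:8675"}, nil))
	fmt.Println(Decide(stored, []string{"10.10.1.1:8675", "10.10.2.2:8675"}, nil))
}
```

Checking for a no-op before writing keeps Pod updates that do not change the endpoint set from causing an API write.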