kubelet Source Code Study (1): How kubelet Works and the kubelet Startup Process

This article studies the source code of Kubernetes v1.22.4.

1. How kubelet works

1) kubelet's core job

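At the heart of kubelet is a control loop, SyncLoop (the large circle in the figure). The events that drive this loop include Pod update events, Pod lifecycle changes, kubelet's own periodic sync interval, and scheduled cleanup events.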

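kubelet also maintains many other sub control loops (the small circles in the figure), named xxxManager. For example, probeManager periodically checks the health of the containers in a Pod and supports three kinds of probes: livenessProbe, readinessProbe and startupProbe. statusManager maintains status information and pushes Pod status updates to the APIServer. containerRefManager manages container references and is used to report events such as container creation and failure.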

2) CRI and the container runtime

When kubelet drives the underlying container runtime, it does not call the Docker API directly. Instead, it goes through a set of gRPC interfaces called CRI (Container Runtime Interface).

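A CRI shim answers CRI requests and acts as the shim between kubelet and the container project. The CRI shim implements every interface defined by CRI and translates each CRI request into requests or operations against the backend container project.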

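Each container project can implement its own CRI shim and handle CRI requests itself. This gives Kubernetes a unified container abstraction layer, so any underlying container runtime can plug into Kubernetes freely.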

With Docker, dockershim handles CRI requests, turns them into Docker API requests and sends them to the Docker daemon.

The CRI interfaces fall into two groups:

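  • RuntimeService: container-related operations, such as creating, starting and deleting containers, and running exec commands
  • ImageManagerService: image-related operations, such as pulling and deleting images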
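To make the shim idea concrete, here is a toy sketch. `RuntimeService`, `ImageService`, `backend` and `shim` are hypothetical names used only for this illustration. The real interfaces in k8s.io/cri-api have many more methods and different signatures. The sketch shows the shim accepting container-level CRI-style calls on one side and issuing a backend's native calls on the other.

```go
package main

import (
	"fmt"
	"strings"
)

// RuntimeService and ImageService are hypothetical, heavily trimmed stand-ins
// for the two CRI service groups. They only deal with containers and images:
// there is no "CreatePod" method.
type RuntimeService interface {
	CreateContainer(sandboxID, name string) (string, error)
	StartContainer(containerID string) error
}

type ImageService interface {
	PullImage(ref string) (string, error)
}

// backend stands in for a container engine with its own native API.
type backend struct {
	calls []string
}

func (b *backend) native(op string, args ...string) {
	b.calls = append(b.calls, op+"("+strings.Join(args, ",")+")")
}

// shim implements both CRI-style interfaces by translating each call into a
// backend-native call, which is the role a CRI shim such as dockershim plays.
type shim struct {
	b      *backend
	nextID int
}

func (s *shim) CreateContainer(sandboxID, name string) (string, error) {
	s.nextID++
	id := fmt.Sprintf("ctr-%d", s.nextID)
	s.b.native("create", sandboxID, name, id)
	return id, nil
}

func (s *shim) StartContainer(containerID string) error {
	s.b.native("start", containerID)
	return nil
}

func (s *shim) PullImage(ref string) (string, error) {
	s.b.native("pull", ref)
	return ref, nil // a real runtime would return an image reference/digest
}

// Compile-time checks that shim satisfies both interfaces.
var (
	_ RuntimeService = (*shim)(nil)
	_ ImageService   = (*shim)(nil)
)
```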

The core CRI methods are shown in the figure below:

An important design principle of CRI is that the interface only cares about containers, not Pods: CRI has no method that directly creates or starts a Pod.

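PodSandboxManager includes the RunPodSandbox method. The PodSandbox here is not the Kubernetes Pod API object. It only carries the subset of Pod fields that matter to the container runtime, such as HostName, DnsConfig and CgroupParent. In other words, PodSandbox describes the fields Kubernetes needs to map the Pod concept onto the container runtime, i.e. a subset of the Pod object.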

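For example, after kubectl run creates a Pod named foo that contains two containers, A and B, dockershim (when Docker is the runtime) creates an Infra container (the pause container) named foo to hold the Network Namespace of the whole Pod.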

2. The kubelet startup process

The Run() method in pkg/kubelet/kubelet.go starts kubelet's modules:

```go
// pkg/kubelet/kubelet.go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		klog.InfoS("No API server defined - no node status update will be sent")
	}

	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}

	// Start the modules that do not depend on the container runtime
	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.ErrorS(err, "Failed to initialize internal modules")
		os.Exit(1)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		// (periodically sync the node status)
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		// Update the pod CIDR and runtime status, and do the first node status sync
		go kl.fastStatusUpdateOnce()

		// start syncing lease
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}
	// Periodically update the runtime status
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Set up iptables util rules
	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}

	// Start component sync loops.
	// (start the statusManager)
	kl.statusManager.Start()

	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}

	// Start the pod lifecycle event generator.
	// PLEG periodically relists all containers from the container runtime to refresh their state
	kl.pleg.Start()
	// Start the kubelet event loop, which keeps watching for external changes and acts on pods
	kl.syncLoop(updates, kl)
}
```

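The main logic of Run() is (steps 3 to 5 only run when an API server client is configured):

  1. Call kl.initializeModules() to start the modules that do not depend on the container runtime
  2. Start the volume manager
  3. Periodically sync the node status
  4. Call kl.fastStatusUpdateOnce() to update the pod CIDR and runtime status and perform the first node status sync
  5. Start the node lease controller to keep the node lease renewed
  6. Periodically run kl.updateRuntimeUp() to update the runtime status
  7. Start statusManager and runtimeClassManager
  8. Call kl.pleg.Start() to start PLEG, which periodically relists all containers from the container runtime to refresh their state
  9. Call kl.syncLoop() to start the kubelet event loop, which keeps watching for external changes and performs the corresponding Pod operations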
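Most of the modules above are started as background loops with `wait.Until` from k8s.io/apimachinery. The sketch below re-implements the basic idea so its behavior is easy to see: call `f`, wait `period` after it returns, and repeat until the stop channel is closed (`wait.NeverStop` is simply a channel that is never closed). It is a simplified assumption-level version without the jitter and crash handling of the real helper, and `runFor` is a hypothetical test helper.

```go
package main

import (
	"sync/atomic"
	"time"
)

// until calls f, waits period after f returns, and repeats until stop is
// closed. Simplified re-implementation of the idea behind wait.Until
// (no jitter, no panic recovery).
func until(f func(), period time.Duration, stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		default:
		}
		f()
		select {
		case <-stop:
			return
		case <-time.After(period):
		}
	}
}

// runFor starts f in the background with the given period (ms), lets it run
// for runMs milliseconds, stops it and returns how many times f ran.
func runFor(periodMs, runMs int) int64 {
	var calls int64
	stop := make(chan struct{})
	done := make(chan struct{})
	go func() {
		until(func() { atomic.AddInt64(&calls, 1) }, time.Duration(periodMs)*time.Millisecond, stop)
		close(done)
	}()
	time.Sleep(time.Duration(runMs) * time.Millisecond)
	close(stop)
	<-done
	return atomic.LoadInt64(&calls)
}
```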
1) initializeModules()
```go
// pkg/kubelet/kubelet.go
func (kl *Kubelet) initializeModules() error {
	// Prometheus metrics.
	metrics.Register(
		collectors.NewVolumeStatsCollector(kl),
		collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
	)
	metrics.SetNodeName(kl.nodeName)
	servermetrics.Register()

	// Setup filesystem directories.
	if err := kl.setupDataDirs(); err != nil {
		return err
	}

	// If the container logs directory does not exist, create it.
	if _, err := os.Stat(ContainerLogsDir); err != nil {
		if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
			return fmt.Errorf("failed to create directory %q: %v", ContainerLogsDir, err)
		}
	}

	// Start the image manager.
	kl.imageManager.Start()

	// Start the certificate manager if it was enabled.
	if kl.serverCertificateManager != nil {
		kl.serverCertificateManager.Start()
	}

	// Start out of memory watcher.
	if kl.oomWatcher != nil {
		if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
			return fmt.Errorf("failed to start OOM watcher: %w", err)
		}
	}

	// Start resource analyzer (refreshes volume stats into its cache)
	kl.resourceAnalyzer.Start()

	return nil
}
```

The main logic of initializeModules() is:

  1. Start imageManager (the concrete type is realImageGCManager)
  2. Start certificateManager (certificate related)
  3. Start oomWatcher
  4. Start the resource analyzer, which refreshes volume stats into its cache

The code of kl.imageManager.Start() is:

```go
// pkg/kubelet/images/image_gc_manager.go
func (im *realImageGCManager) Start() {
	go wait.Until(func() {
		// Initial detection make detected time "unknown" in the past.
		var ts time.Time
		if im.initialized {
			ts = time.Now()
		}
		// Detect all images and record which ones are in use (no images are deleted here)
		_, err := im.detectImages(ts)
		if err != nil {
			klog.InfoS("Failed to monitor images", "err", err)
		} else {
			im.initialized = true
		}
	}, 5*time.Minute, wait.NeverStop)

	// Start a goroutine periodically updates image cache.
	go wait.Until(func() {
		// Fetch the latest image list through the CRI
		images, err := im.runtime.ListImages()
		if err != nil {
			klog.InfoS("Failed to update image list", "err", err)
		} else {
			im.imageCache.set(images)
		}
	}, 30*time.Second, wait.NeverStop)
}
```

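realImageGCManager's Start() method launches two goroutines:

  1. Every 5 minutes, call detectImages(), which lists all images, finds the ones used by containers, and updates each image's records (first-detected and last-used times). It does not delete images itself; removing unused images is the job of the image GC manager's GarbageCollect() method
  2. Every 30 seconds, fetch the latest image list through the CRI and refresh the image cache with imageCache.set()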
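The record keeping done by `detectImages()` can be sketched as follows. This is a simplified model, not the real code: images are plain strings, the set of in-use images is passed in, and time is a logical `int64` clock (the real code uses `time.Time` and keeps separate detect and use timestamps). It shows that detection only maintains `firstDetected`/`lastUsed` records and forgets images that have disappeared; nothing is removed from disk here.

```go
package main

// imageRecord is a simplified version of the per-image record kept by the
// image GC manager; timestamps use a logical int64 clock for brevity.
type imageRecord struct {
	firstDetected int64
	lastUsed      int64
}

// detectImages updates records from the images currently on the node and the
// set of images used by containers. It only maintains records.
func detectImages(records map[string]*imageRecord, current []string, inUse map[string]bool, now int64) {
	present := make(map[string]bool, len(current))
	for _, img := range current {
		present[img] = true
		r, ok := records[img]
		if !ok {
			// New image: remember when it was first seen.
			r = &imageRecord{firstDetected: now}
			records[img] = r
		}
		if inUse[img] {
			// Image used by a container: bump its last-used time.
			r.lastUsed = now
		}
	}
	// Forget records of images that are no longer on the node.
	for img := range records {
		if !present[img] {
			delete(records, img)
		}
	}
}
```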
2) fastStatusUpdateOnce()
```go
// pkg/kubelet/kubelet.go
func (kl *Kubelet) fastStatusUpdateOnce() {
	for {
		time.Sleep(100 * time.Millisecond)
		node, err := kl.GetNode()
		if err != nil {
			klog.ErrorS(err, "Error getting node")
			continue
		}
		if len(node.Spec.PodCIDRs) != 0 {
			podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
			if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
				klog.ErrorS(err, "Pod CIDR update failed", "CIDR", podCIDRs)
				continue
			}
			// Update the runtime status
			kl.updateRuntimeUp()
			// Sync the node status
			kl.syncNodeStatus()
			return
		}
	}
}
```

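fastStatusUpdateOnce() loops every 100ms, trying to update the pod CIDR as soon as the node has been assigned one. After the pod CIDR is updated, it updates the runtime status and syncs the node status. It returns right after that first successful node status sync, so it only runs during kubelet startup.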
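The control flow of `fastStatusUpdateOnce()` (poll until the node has a pod CIDR, act once, then return) can be sketched like this. `pollThenRun`, `getCIDRs` and `onReady` are hypothetical names that stand in for `GetNode`/`updatePodCIDR` and `updateRuntimeUp`/`syncNodeStatus`.

```go
package main

import (
	"strings"
	"time"
)

// pollThenRun polls every interval until getCIDRs returns a non-empty list
// without error, then calls onReady exactly once with the comma-joined CIDRs
// and returns, mirroring the shape of fastStatusUpdateOnce.
func pollThenRun(interval time.Duration, getCIDRs func() ([]string, error), onReady func(podCIDRs string)) {
	for {
		time.Sleep(interval)
		cidrs, err := getCIDRs()
		if err != nil || len(cidrs) == 0 {
			continue // node not fetched yet or no pod CIDR assigned yet: try again
		}
		onReady(strings.Join(cidrs, ","))
		return
	}
}
```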

The code of kl.updateRuntimeUp() is:

```go
// pkg/kubelet/kubelet.go
// On its first successful run this initializes the runtime-dependent modules;
// every run updates runtimeState
func (kl *Kubelet) updateRuntimeUp() {
	kl.updateRuntimeMux.Lock()
	defer kl.updateRuntimeMux.Unlock()

	// Get the container runtime status
	s, err := kl.containerRuntime.Status()
	if err != nil {
		klog.ErrorS(err, "Container runtime sanity check failed")
		return
	}
	if s == nil {
		klog.ErrorS(nil, "Container runtime status is nil")
		return
	}
	// Periodically log the whole runtime status for debugging.
	klog.V(4).InfoS("Container runtime status", "status", s)
	// Check whether the runtime network is ready
	networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
	if networkReady == nil || !networkReady.Status {
		klog.ErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
		kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
	} else {
		// Set nil if the container runtime network is ready.
		kl.runtimeState.setNetworkState(nil)
	}
	// information in RuntimeReady condition will be propagated to NodeReady condition.
	// Check whether the runtime itself is ready
	runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
	// If RuntimeReady is not set or is false, report an error.
	if runtimeReady == nil || !runtimeReady.Status {
		klog.ErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
		kl.runtimeState.setRuntimeState(fmt.Errorf("container runtime not ready: %v", runtimeReady))
		return
	}
	kl.runtimeState.setRuntimeState(nil)
	// Initialize the runtime-dependent modules (only once)
	kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
	kl.runtimeState.setRuntimeSync(kl.clock.Now())
}
```

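updateRuntimeUp() fetches the container runtime status and checks whether the network and the runtime are ready. If the runtime is not ready, it records the error and returns early. Otherwise it calls kl.initializeRuntimeDependentModules() (only once, through oneTimeInitializer) to initialize the runtime-dependent modules, which starts cadvisor, containerManager, evictionManager, containerLogManager, pluginManager and shutdownManager. Finally, it records the runtime sync time.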
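The key pattern in `updateRuntimeUp()` is that it runs periodically but performs the runtime-dependent initialization only once, through a `sync.Once`, and only after the runtime reports ready. Here is a minimal sketch in which a hypothetical `kubeletLike` type stands in for `Kubelet` and a boolean replaces the real status check:

```go
package main

import (
	"errors"
	"sync"
)

// kubeletLike is a hypothetical, trimmed-down holder for the state that
// updateRuntimeUp touches.
type kubeletLike struct {
	mu         sync.Mutex
	once       sync.Once // plays the role of oneTimeInitializer
	runtimeErr error
	inits      int // how many times the runtime-dependent modules were initialized
}

func (k *kubeletLike) initializeRuntimeDependentModules() { k.inits++ }

// updateRuntimeUp records the runtime state; only when the runtime is ready
// does it (at most once) initialize the runtime-dependent modules.
func (k *kubeletLike) updateRuntimeUp(runtimeReady bool) {
	k.mu.Lock()
	defer k.mu.Unlock()
	if !runtimeReady {
		k.runtimeErr = errors.New("container runtime not ready")
		return
	}
	k.runtimeErr = nil
	k.once.Do(k.initializeRuntimeDependentModules)
}
```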

3) syncLoop()
```go
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.InfoS("Starting kubelet main sync loop")
	// The syncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	// Responsible for checking limits in resolv.conf
	// The limits do not have anything to do with individual pods
	// Since this is called in syncLoop, we don't need to call it anywhere else
	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf()
	}

	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.ErrorS(err, "Skipping pod synchronization")
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		// Run one iteration of the sync loop
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}
```

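syncLoop() calls syncLoopIteration() over and over in a loop. While runtimeState reports errors, it skips the iteration and sleeps with exponential backoff (starting at 100ms, doubling each time, capped at 5s).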
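The backoff in `syncLoop()` can be checked in isolation. The sketch below reuses the same constant values and the same `math.Min` expression (with `base`/`max` renamed so they don't shadow other identifiers, such as Go's built-in `max`) and lists the sleeps that happen while the runtime keeps failing: 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, then 5s from then on.

```go
package main

import (
	"math"
	"time"
)

// Same values as the backoff constants in syncLoop.
const (
	baseBackoff = 100 * time.Millisecond
	maxBackoff  = 5 * time.Second
	factor      = 2
)

// nextBackoff doubles d and caps it at maxBackoff, using syncLoop's expression.
func nextBackoff(d time.Duration) time.Duration {
	return time.Duration(math.Min(float64(maxBackoff), factor*float64(d)))
}

// backoffSequence returns the sleeps syncLoop would perform if the runtime
// reported errors for n consecutive iterations.
func backoffSequence(n int) []time.Duration {
	seq := make([]time.Duration, 0, n)
	d := baseBackoff
	for i := 0; i < n; i++ {
		seq = append(seq, d)
		d = nextBackoff(d)
	}
	return seq
}
```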

The channels used by syncLoopIteration() are covered in detail later. Here we only look at how syncLoopIteration() handles each of them.

1)configCh

```go
// pkg/kubelet/kubelet.go
// Watches several channels; whenever one has data, it is handed to the handler,
// which dispatches the work by calling dispatchWork
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// configCh carries pod changes from 3 sources (file, http, apiServer).
		// When a source's pods change (create/update/delete), the updated pods
		// and the kind of operation show up on this channel.
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", format.Pods(u.Pods))
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.ErrorS(nil, "Kubelet does not support snapshot update")
		default:
			klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
		}

		kl.sourcesReady.AddSource(u.Source)
	...
	}
	return true
}
```

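configCh is the channel that carries configuration events. It merges Pod changes from 3 different sources (file, http and APIServer). Whenever a source reports a change to Pod information (create/update/delete), the updated Pods and the kind of operation show up on this channel.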
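Here is a simplified sketch of the `configCh` branch: it receives one update, routes it by operation, and stops when the channel is closed. `podUpdate` and `recorder` are hypothetical stand-ins for `kubetypes.PodUpdate` and the `SyncHandler`, and operations are plain strings here rather than the real `kubetypes` constants.

```go
package main

import "fmt"

// podUpdate is a simplified, hypothetical stand-in for kubetypes.PodUpdate.
type podUpdate struct {
	Op   string
	Pods []string
}

// recorder records which handler method each update was routed to.
type recorder struct {
	calls []string
}

func (r *recorder) record(method string, pods []string) {
	r.calls = append(r.calls, fmt.Sprintf("%s%v", method, pods))
}

// iterate mirrors the configCh branch of syncLoopIteration: receive one
// update, route it by Op, and return false once the channel is closed.
func iterate(configCh <-chan podUpdate, r *recorder) bool {
	u, open := <-configCh
	if !open {
		return false // kubelet exits the sync loop here
	}
	switch u.Op {
	case "ADD":
		r.record("HandlePodAdditions", u.Pods)
	case "UPDATE", "DELETE": // DELETE is treated as UPDATE because of graceful deletion
		r.record("HandlePodUpdates", u.Pods)
	case "REMOVE":
		r.record("HandlePodRemoves", u.Pods)
	case "RECONCILE":
		r.record("HandlePodReconcile", u.Pods)
	default: // SET and unknown operations are only logged as errors by kubelet
		r.record("ignored:"+u.Op, u.Pods)
	}
	return true
}
```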

2)plegCh

```go
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	...
	case e := <-plegCh:
		if e.Type == pleg.ContainerStarted {
			// record the most recent time we observed a container start for this pod.
			// this lets us selectively invalidate the runtimeCache when processing a delete for this pod
			// to make sure we don't miss handling graceful termination for containers we reported as having started.
			kl.lastContainerStartedTime.Add(e.ID, time.Now())
		}
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
			}
		}

		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
	...
	}
	return true
}
```

Once kl.pleg.Start() has run, PLEG calls relist once per second, generates PodLifecycleEvents from the latest PodStatus, and pushes them into plegCh.

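syncLoop() obtains plegCh through kl.pleg.Watch() and passes it to syncLoopIteration(), which consumes the events in plegCh and hands the affected Pod to the handler, which dispatches the work by calling dispatchWork. For ContainerDied events, it also calls cleanUpContainersInPod() to clean up dead containers.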

3)syncCh

```go
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	...
	case <-syncCh:
		// Sync pods waiting for sync
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			break
		}
		klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", format.Pods(podsToSync))
		// Sync the due pods using their latest saved state
		handler.HandlePodSyncs(podsToSync)
	...
	}
	return true
}
```

syncCh is driven by a ticker created in syncLoop() that fires once per second. On each tick, all Pods waiting to be synced are synced.

4)kl.livenessManager.Updates(), kl.readinessManager.Updates(), kl.startupManager.Updates()

```go
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	...
	case update := <-kl.livenessManager.Updates():
		// When a liveness probe fails, re-sync the pod
		if update.Result == proberesults.Failure {
			handleProbeSync(kl, update, handler, "liveness", "unhealthy")
		}
	case update := <-kl.readinessManager.Updates():
		// When readiness changes, update the container and pod status
		ready := update.Result == proberesults.Success
		kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)

		status := ""
		if ready {
			status = "ready"
		}
		handleProbeSync(kl, update, handler, "readiness", status)
	case update := <-kl.startupManager.Updates():
		// When the startup state changes, update the container and pod status
		started := update.Result == proberesults.Success
		kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)

		status := "unhealthy"
		if started {
			status = "started"
		}
		handleProbeSync(kl, update, handler, "startup", status)
	...
	}
	return true
}
```

5)housekeepingCh

```go
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	...
	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			// If the sources aren't ready or volume manager has not yet synced the states,
			// skip housekeeping, as we may accidentally delete pods from unready sources.
			klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
		} else {
			start := time.Now()
			klog.V(4).InfoS("SyncLoop (housekeeping)")
			// Cleanup: terminate pod workers, delete unwanted pods, remove volumes and pod directories
			if err := handler.HandlePodCleanups(); err != nil {
				klog.ErrorS(err, "Failed cleaning pods")
			}
			duration := time.Since(start)
			if duration > housekeepingWarningDuration {
				klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
			}
			klog.V(4).InfoS("SyncLoop (housekeeping) end")
		}
	}
	return true
}
```

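housekeepingCh is also created by syncLoop() (housekeepingTicker). Every two seconds it triggers cleanup work, including terminating Pod workers, deleting unwanted Pods, and removing volumes and Pod directories. Housekeeping is skipped until all Pod sources are ready.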

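syncLoopIteration() watches the following channels and handles each kind of event differently:

  • configCh: Pod change events from the file, HTTP and APIServer sources
  • plegCh: the PLEG submodule calls relist once per second, generates PodLifecycleEvents from the latest PodStatus, and pushes them into plegCh
  • syncCh: a ticker channel that fires every second to sync the Pods that are due, using their latest saved state
  • kl.livenessManager.Updates(): when a liveness probe fails, the Pod is re-synced
  • kl.readinessManager.Updates(): when readiness changes, the container and Pod status are updated
  • kl.startupManager.Updates(): when the startup state changes, the container and Pod status are updated
  • housekeepingCh: triggers Pod cleanup work every two seconds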
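The `select` at the heart of `syncLoopIteration()` can be sketched with plain channels. This is a simplification: pods and events are strings, the tickers are `chan struct{}` instead of `<-chan time.Time`, the probe channels are omitted, and the hypothetical `iterationSketch` returns a label instead of calling a handler. Whichever channel is ready wins. If several are ready at once, Go's `select` picks one of them at random.

```go
package main

// iterationSketch imitates one pass of syncLoopIteration's select. It returns
// a label describing the branch that ran, and false when the loop should exit.
func iterationSketch(configCh <-chan string, plegCh <-chan string,
	syncCh, housekeepingCh <-chan struct{}, sourcesReady bool) (string, bool) {
	select {
	case u, open := <-configCh:
		if !open {
			return "config closed", false // kubelet exits the sync loop
		}
		return "config:" + u, true
	case e := <-plegCh:
		return "pleg:" + e, true
	case <-syncCh:
		return "sync", true
	case <-housekeepingCh:
		if !sourcesReady {
			// Cleanup is skipped until every pod source has reported in.
			return "housekeeping skipped", true
		}
		return "housekeeping", true
	}
}
```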
4) Summary

The kubelet startup process is shown in the figure below.

3. The channels used in syncLoopIteration()

1) configCh

The call flow around configCh is shown in the figure above. How syncLoopIteration() handles configCh was covered earlier. Here we look at how kubelet watches the APIServer and writes Pod changes into configCh.

```go
// pkg/kubelet/kubelet.go
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
	manifestURLHeader := make(http.Header)
	if len(kubeCfg.StaticPodURLHeader) > 0 {
		for k, v := range kubeCfg.StaticPodURLHeader {
			for i := range v {
				manifestURLHeader.Add(k, v[i])
			}
		}
	}

	// source of all configuration
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

	// Register the three sources: file, http and apiserver
	// define file config source
	if kubeCfg.StaticPodPath != "" {
		klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
		config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
	}

	// define url config source
	if kubeCfg.StaticPodURL != "" {
		klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
		config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
	}

	if kubeDeps.KubeClient != nil {
		klog.InfoS("Adding apiserver pod source")
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(kubetypes.ApiserverSource))
	}
	return cfg, nil
}
```

makePodSourceConfig() first initializes config.PodConfig and then adds three data sources: file, http and APIServer. Each call to cfg.Channel() creates the channel for the corresponding source.
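The pattern behind `cfg.Channel()` is one channel per source, one listener goroutine per channel, and every update forwarded to a merger together with its source name. It can be sketched as below. This is an illustrative re-implementation, not the real `pkg/util/config` type: the merger is a plain function, and `listen` is started directly rather than through `wait.Until`.

```go
package main

import "sync"

// Mux sketches the idea of pkg/util/config.Mux: each source gets its own
// channel, and a goroutine per channel forwards updates to merge.
type Mux struct {
	merge   func(source string, update interface{})
	lock    sync.Mutex
	sources map[string]chan interface{}
}

func newMux(merge func(source string, update interface{})) *Mux {
	return &Mux{merge: merge, sources: make(map[string]chan interface{})}
}

// Channel returns the channel for source, creating it and its listener on first use.
func (m *Mux) Channel(source string) chan interface{} {
	if source == "" {
		panic("Channel given an empty name")
	}
	m.lock.Lock()
	defer m.lock.Unlock()
	if ch, ok := m.sources[source]; ok {
		return ch
	}
	ch := make(chan interface{})
	m.sources[source] = ch
	go m.listen(source, ch)
	return ch
}

// listen forwards everything received on ch to merge, tagged with source.
func (m *Mux) listen(source string, ch <-chan interface{}) {
	for update := range ch {
		m.merge(source, update)
	}
}
```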

1)NewSourceApiserver()

We start with the part that watches the APIServer. The code of NewSourceApiserver() is:

```go
// pkg/kubelet/config/apiserver.go
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
	// Create a ListWatch for the pods bound to this node
	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))

	// The Reflector responsible for watching pods at the apiserver should be run only after
	// the node sync with the apiserver has completed.
	klog.InfoS("Waiting for node sync before watching apiserver pods")
	go func() {
		for {
			if nodeHasSynced() {
				klog.V(4).InfoS("node sync completed")
				break
			}
			time.Sleep(WaitForAPIServerSyncPeriod)
			klog.V(4).InfoS("node sync has not completed yet")
		}
		klog.InfoS("Watching apiserver")
		// Node sync has completed: start watching pods
		newSourceApiserverFromLW(lw, updates)
	}()
}

func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		// Write this node's current pods into the channel; listen() receives them later
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	// Build a reflector with the client-go API
	r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
	go r.Run(wait.NeverStop)
}
```

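newSourceApiserverFromLW() uses the client-go API to create a Reflector backed by an UndeltaStore. Whenever the Pods of the current Node change in the APIServer, send() writes the full current Pod list into the channel as a SET update.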
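Why does the apiserver source send `SET` updates? Because `cache.UndeltaStore` calls its push function with the complete current object list after every change, not with a delta. The sketch below imitates that behavior using pod names as objects. `undeltaStore` is a hypothetical simplified type, not client-go's.

```go
package main

import "sort"

// undeltaStore imitates cache.UndeltaStore: after every change it pushes the
// complete current list of objects (here: pod names), never a delta.
type undeltaStore struct {
	items map[string]struct{}
	push  func(all []string)
}

func newUndeltaStore(push func(all []string)) *undeltaStore {
	return &undeltaStore{items: make(map[string]struct{}), push: push}
}

// snapshot returns all current names, sorted for deterministic output.
func (s *undeltaStore) snapshot() []string {
	all := make([]string, 0, len(s.items))
	for name := range s.items {
		all = append(all, name)
	}
	sort.Strings(all)
	return all
}

func (s *undeltaStore) Add(name string) {
	s.items[name] = struct{}{}
	s.push(s.snapshot())
}

func (s *undeltaStore) Delete(name string) {
	delete(s.items, name)
	s.push(s.snapshot())
}
```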

2)cfg.Channel()

makePodSourceConfig() calls cfg.Channel() to create the channel for each source:

```go
// pkg/kubelet/config/config.go
// Registers a dedicated channel for each source
func (c *PodConfig) Channel(source string) chan<- interface{} {
	c.sourcesLock.Lock()
	defer c.sourcesLock.Unlock()
	c.sources.Insert(source)
	return c.mux.Channel(source)
}
```

```go
// pkg/util/config/config.go
func (m *Mux) Channel(source string) chan interface{} {
	if len(source) == 0 {
		panic("Channel given an empty name")
	}
	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()
	channel, exists := m.sources[source]
	if exists {
		return channel
	}
	newChannel := make(chan interface{})
	m.sources[source] = newChannel
	// Start a goroutine that listens for new data on the channel.
	// This channel ends up as the target of send() in newSourceApiserverFromLW:
	// when this node's pods change in the apiserver, send() writes into it,
	// and listen() keeps receiving from it
	go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
	return newChannel
}

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
	for update := range listenChannel {
		m.merger.Merge(source, update)
	}
}
```

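The channel created in Channel() is eventually passed to the send() function defined in newSourceApiserverFromLW(). When the Pods of the current Node change in the APIServer, send() writes into that channel. listen() keeps reading from the channel and calls Merge() to process each update it receives.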

```go
// pkg/kubelet/config/config.go
func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	seenBefore := s.sourcesSeen.Has(source)
	// Classify the pod changes by type
	adds, updates, deletes, removes, reconciles := s.merge(source, change)
	firstSet := !seenBefore && s.sourcesSeen.Has(source)

	// deliver update notifications
	switch s.mode {
	case PodConfigNotificationIncremental:
		// Push the pod changes into s.updates, which is configCh
		if len(removes.Pods) > 0 {
			s.updates <- *removes
		}
		if len(adds.Pods) > 0 {
			s.updates <- *adds
		}
		if len(updates.Pods) > 0 {
			s.updates <- *updates
		}
		if len(deletes.Pods) > 0 {
			s.updates <- *deletes
		}
		if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
			// Send an empty update when first seeing the source and there are
			// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
			// the source is ready.
			s.updates <- *adds
		}
		// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
		if len(reconciles.Pods) > 0 {
			s.updates <- *reconciles
		}
	...
	}
	return nil
}
```

Merge() classifies the Pod changes by type and finally pushes them into configCh. kl.syncLoopIteration() watches configCh and hands the updates to the handler, which dispatches the work by calling dispatchWork.
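Because the apiserver source always delivers full snapshots, `Merge()` (through `s.merge()`) has to work out what actually changed. At its core this is a diff between the previously seen pods of a source and the new snapshot. The sketch below makes simplifying assumptions: pods are keyed by UID, a string stands in for the pod spec, and the DELETE (graceful deletion) and RECONCILE (status-only) classification of the real code is left out. `diffSnapshot` is a hypothetical name.

```go
package main

import "sort"

// diffSnapshot compares the previous pods of a source (old) with a new full
// snapshot (cur), both keyed by UID with a spec fingerprint as value, and
// splits the result into adds, updates and removes (each sorted).
func diffSnapshot(old, cur map[string]string) (adds, updates, removes []string) {
	for uid, spec := range cur {
		prev, existed := old[uid]
		switch {
		case !existed:
			adds = append(adds, uid)
		case prev != spec:
			updates = append(updates, uid)
		}
	}
	for uid := range old {
		if _, ok := cur[uid]; !ok {
			removes = append(removes, uid)
		}
	}
	sort.Strings(adds)
	sort.Strings(updates)
	sort.Strings(removes)
	return adds, updates, removes
}
```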

The path by which data is written into configCh is shown in the figure below:


2) plegCh

PLEG is started by the following code:

```go
// pkg/kubelet/pleg/generic.go
func (g *GenericPLEG) Start() {
	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
```

Start() launches a goroutine that runs relist() once every relistPeriod (one second).

```go
// pkg/kubelet/pleg/generic.go
func (g *GenericPLEG) relist() {
	klog.V(5).InfoS("GenericPLEG: Relisting")

	if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
		metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
	}

	timestamp := g.clock.Now()
	defer func() {
		metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
	}()

	// Get all the pods.
	// (asks the runtime for all pods and containers on this node, ultimately via the CRI)
	podList, err := g.runtime.GetPods(true)
	if err != nil {
		klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
		return
	}

	g.updateRelistTime(timestamp)

	pods := kubecontainer.Pods(podList)
	// update running pod and container count
	updateRunningPodAndContainerMetrics(pods)
	g.podRecords.setCurrent(pods)

	// Compare the old and the current pods, and generate events.
	eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
	for pid := range g.podRecords {
		oldPod := g.podRecords.getOld(pid)
		pod := g.podRecords.getCurrent(pid)
		// Get all containers in the old and the new pod.
		allContainers := getContainersFromPods(oldPod, pod)
		for _, container := range allContainers {
			// Generate PodLifecycleEvents if the container's state changed
			events := computeEvents(oldPod, pod, &container.ID)
			for _, e := range events {
				updateEvents(eventsByPodID, e)
			}
		}
	}

	var needsReinspection map[types.UID]*kubecontainer.Pod
	if g.cacheEnabled() {
		needsReinspection = make(map[types.UID]*kubecontainer.Pod)
	}

	// If there are events associated with a pod, we should update the
	// podCache.
	for pid, events := range eventsByPodID {
		pod := g.podRecords.getCurrent(pid)
		if g.cacheEnabled() {
			// updateCache() will inspect the pod and update the cache. If an
			// error occurs during the inspection, we want PLEG to retry again
			// in the next relist. To achieve this, we do not update the
			// associated podRecord of the pod, so that the change will be
			// detect again in the next relist.
			// TODO: If many pods changed during the same relist period,
			// inspecting the pod and getting the PodStatus to update the cache
			// serially may take a while. We should be aware of this and
			// parallelize if needed.
			if err := g.updateCache(pod, pid); err != nil {
				// Rely on updateCache calling GetPodStatus to log the actual error.
				klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))

				// make sure we try to reinspect the pod during the next relisting
				needsReinspection[pid] = pod

				continue
			} else {
				// this pod was in the list to reinspect and we did so because it had events, so remove it
				// from the list (we don't want the reinspection code below to inspect it a second time in
				// this relist execution)
				delete(g.podsToReinspect, pid)
			}
		}
		// Update the internal storage and send out the events.
		g.podRecords.update(pid)

		// Map from containerId to exit code; used as a temporary cache for lookup
		containerExitCode := make(map[string]int)

		// Go through every event of this pod
		for i := range events {
			// Filter out events that are not reliable and no other components use yet.
			if events[i].Type == ContainerChanged {
				continue
			}
			select {
			// Push the event into kubelet's plegCh
			case g.eventChannel <- events[i]:
			default:
				metrics.PLEGDiscardEvents.Inc()
				klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
			}
			// Log exit code of containers when they finished in a particular event
			if events[i].Type == ContainerDied {
				// Fill up containerExitCode map for ContainerDied event when first time appeared
				if len(containerExitCode) == 0 && pod != nil && g.cache != nil {
					// Get updated podStatus
					status, err := g.cache.Get(pod.ID)
					if err == nil {
						for _, containerStatus := range status.ContainerStatuses {
							containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
						}
					}
				}
				if containerID, ok := events[i].Data.(string); ok {
					if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
						klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
					}
				}
			}
		}
	}

	if g.cacheEnabled() {
		// reinspect any pods that failed inspection during the previous relist
		if len(g.podsToReinspect) > 0 {
			klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
			for pid, pod := range g.podsToReinspect {
				if err := g.updateCache(pod, pid); err != nil {
					// Rely on updateCache calling GetPodStatus to log the actual error.
					klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
					needsReinspection[pid] = pod
				}
			}
		}

		// Update the cache timestamp.  This needs to happen *after*
		// all pods have been properly updated in the cache.
		g.cache.UpdateTime(timestamp)
	}

	// make sure we retain the list of pods that need reinspecting the next time relist is called
	g.podsToReinspect = needsReinspection
}
```

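The main logic of relist() is:

  1. Ask the runtime for all Pods and containers on the current Node (ultimately through the CRI)
  2. Iterate over all Pods, check each container for state changes, and generate a PodLifecycleEvent for every change
  3. For each Pod that has events, update the Pod cache (when enabled), then push each of its events (except ContainerChanged) into kubelet's plegCh; if the channel is full, the event is dropped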
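Steps 2 and 3 can be sketched as follows. `generateEvents` is modeled on the function of the same name in pkg/kubelet/pleg/generic.go, with states and events reduced to plain strings. `publish` is a hypothetical helper that shows the non-blocking send: `ContainerChanged` is filtered out, and when the channel buffer is full the event is dropped instead of blocking relist.

```go
package main

// Simplified container states (names follow pkg/kubelet/pleg).
const (
	stateRunning     = "running"
	stateExited      = "exited"
	stateUnknown     = "unknown"
	stateNonExistent = "non-existent"
)

// generateEvents returns no event if the state did not change, otherwise the
// event(s) describing the new state.
func generateEvents(oldState, newState string) []string {
	if oldState == newState {
		return nil
	}
	switch newState {
	case stateRunning:
		return []string{"ContainerStarted"}
	case stateExited:
		return []string{"ContainerDied"}
	case stateUnknown:
		return []string{"ContainerChanged"}
	case stateNonExistent:
		if oldState == stateExited {
			// The death was already reported by an earlier relist.
			return []string{"ContainerRemoved"}
		}
		return []string{"ContainerDied", "ContainerRemoved"}
	default:
		panic("unrecognized container state: " + newState)
	}
}

// publish sends events without blocking and returns how many were dropped.
func publish(ch chan<- string, events []string) (dropped int) {
	for _, e := range events {
		if e == "ContainerChanged" {
			continue // not reliable and not consumed, as in relist
		}
		select {
		case ch <- e:
		default:
			dropped++ // channel full: drop rather than block
		}
	}
	return dropped
}
```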
3) syncCh

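Every Pod that goes through syncLoopIteration() eventually ends up in managePodLoop(). After each sync, completeWork() puts the Pod's UID back into workQueue with a delay that depends on the sync result: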

```go
// pkg/kubelet/pod_workers.go
func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) {
	// Requeue the last update if the last sync returned error.
	switch {
	case syncErr == nil:
		// No error; requeue at the regular resync interval.
		p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
	case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg):
		// Network is not ready; back off for short period of time and retry as network might be ready soon.
		p.workQueue.Enqueue(pod.UID, wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor))
	default:
		// Error occurred during the sync; back off and then retry.
		p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
	}
	p.completeWorkQueueNext(pod.UID)
}
```

syncCh is driven by a ticker created in syncLoop(). Every second, getPodsToSync() takes the Pods whose delay has expired out of workQueue, and those Pods are synced.
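The queue-plus-ticker mechanism can be sketched as below. `workQueue` imitates the idea of the pod workers' delayed queue using a logical clock in seconds, and `requeueDelay` mirrors the three branches of `completeWork()` without jitter. The delay values (60s resync, 1s when the network is not ready, 10s for other errors), the error variables and the message check are assumptions for illustration. Check the defaults of your kubelet version.

```go
package main

import (
	"errors"
	"sort"
	"strings"
	"sync"
)

// workQueue sketches a delayed work queue: Enqueue records when a UID becomes
// due, GetWork returns and removes every UID that is due.
type workQueue struct {
	lock  sync.Mutex
	queue map[string]int64 // pod UID -> logical time (seconds) at which it is due
}

func newWorkQueue() *workQueue { return &workQueue{queue: make(map[string]int64)} }

// Enqueue makes uid due for sync delay seconds after now.
func (q *workQueue) Enqueue(uid string, now, delay int64) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.queue[uid] = now + delay
}

// GetWork returns (and removes) every UID due at now, sorted for determinism.
func (q *workQueue) GetWork(now int64) []string {
	q.lock.Lock()
	defer q.lock.Unlock()
	var due []string
	for uid, at := range q.queue {
		if at <= now {
			due = append(due, uid)
			delete(q.queue, uid)
		}
	}
	sort.Strings(due)
	return due
}

// Hypothetical error message and errors used to exercise requeueDelay.
const networkNotReadyMsg = "network is not ready"

var (
	errNetworkNotReady = errors.New(networkNotReadyMsg + ": cni config uninitialized")
	errSyncFailed      = errors.New("failed to sync pod")
)

// requeueDelay picks a delay the way completeWork does (values assumed).
func requeueDelay(syncErr error) int64 {
	switch {
	case syncErr == nil:
		return 60 // regular resync interval
	case strings.Contains(syncErr.Error(), networkNotReadyMsg):
		return 1 // short backoff: the network may be ready soon
	default:
		return 10 // generic error backoff
	}
}
```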

4) Summary

The core kubelet flow is shown in the figure below.


