An Analysis of the Kubernetes pv-controller

Overview: The PV controller is one of the components of kube-controller-manager (kcm). It processes the PVC/PV objects in a cluster and drives their state transitions. This article analyzes the controller as implemented in Kubernetes 1.23.


Author | Mu Qi
Source | Alibaba Tech (official WeChat account)

Based on Kubernetes 1.23

I. Introduction

The PV controller is one of the components of kube-controller-manager (kcm). It processes the PVC/PV objects in the cluster and drives their state transitions.

II. pvController Initialization

The initialization code lives in pkg/controller/volume/persistentvolume/pv_controller_base.go. NewController mainly does the following:

  • Initializes the eventRecorder
  • Initializes the PersistentVolumeController object
  • Calls VolumePluginMgr.InitPlugins() to initialize the volume plugins; that code lives in pkg/volume/plugins.go
  • Creates informers to watch resources in the cluster; the following informers are initialized:

    • PersistentVolumeInformer
    • PersistentVolumeClaimInformer
    • StorageClassInformer
    • PodInformer
    • NodeInformer
  • Registers event handlers that enqueue PV & PVC events into volumeQueue & claimQueue respectively
  • Registers a custom indexer that indexes pods by PVC key, so the controller does not have to iterate over all pods on every lookup (see the sketch after this list)
  • Initializes the manager for the in-tree storage -> CSI migration features
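
The pod-by-PVC index deserves a quick illustration. Below is a minimal, self-contained sketch of such an index function (names are simplified for illustration; the real helper lives under pkg/controller/volume/common and also handles generic ephemeral volumes):

package indexers

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
)

// podPVCIndexFunc maps a pod to the "namespace/claimName" key of every PVC it
// references, so an informer indexer can answer "which pods use this PVC?"
// without scanning every pod in the cluster.
func podPVCIndexFunc(obj interface{}) ([]string, error) {
    pod, ok := obj.(*v1.Pod)
    if !ok {
        return []string{}, nil
    }
    keys := []string{}
    for _, podVolume := range pod.Spec.Volumes {
        if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
            keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
        }
    }
    return keys, nil
}

// Registration would look roughly like:
//   podInformer.Informer().AddIndexers(cache.Indexers{"pod-pvc-index": podPVCIndexFunc})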

NewController is invoked from the cmd/kube-controller-manager code; once initialization succeeds, go Run() is called right away to start the pvController.
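
A rough sketch of that wiring in cmd/kube-controller-manager/app/core.go (abridged from memory; the exact ControllerParameters field list is trimmed and may differ slightly):

func startPersistentVolumeBinderController(ctx ControllerContext) (http.Handler, bool, error) {
    plugins, err := ProbeControllerVolumePlugins(ctx.Cloud, ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
    if err != nil {
        return nil, true, fmt.Errorf("failed to probe volume plugins when starting persistentvolume controller: %v", err)
    }
    params := persistentvolumecontroller.ControllerParameters{
        KubeClient:     ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),
        SyncPeriod:     ctx.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration,
        VolumePlugins:  plugins,
        VolumeInformer: ctx.InformerFactory.Core().V1().PersistentVolumes(),
        ClaimInformer:  ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
        ClassInformer:  ctx.InformerFactory.Storage().V1().StorageClasses(),
        PodInformer:    ctx.InformerFactory.Core().V1().Pods(),
        NodeInformer:   ctx.InformerFactory.Core().V1().Nodes(),
        // ... event recorder, cloud provider, and migration settings elided
    }
    volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)
    if volumeControllerErr != nil {
        return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
    }
    go volumeController.Run(ctx.Stop)
    return nil, true, nil
}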

III. Running the Controller

// Run starts the pvController workers.
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer ctrl.claimQueue.ShutDown()
    defer ctrl.volumeQueue.ShutDown()

    klog.Infof("Starting persistent volume controller")
    defer klog.Infof("Shutting down persistent volume controller")

    if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
        return
    }

    ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)

    go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
    go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
    go wait.Until(ctrl.claimWorker, time.Second, stopCh)

    metrics.Register(ctrl.volumes.store, ctrl.claims, &ctrl.volumePluginMgr)

    <-stopCh
}

After the caches are synced, ctrl.resync, ctrl.volumeWorker, and ctrl.claimWorker start running periodically. Let's first look at the initializeCaches method.

func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) {
    // No apiserver round-trip here: the objects come from the local informer
    // cache and must not be modified by outside functions.
    volumeList, err := volumeLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
        return
    }
    for _, volume := range volumeList {
        // We must not mutate the shared volume object, so work on a deep copy.
        volumeClone := volume.DeepCopy()
        if _, err = ctrl.storeVolumeUpdate(volumeClone); err != nil {
            klog.Errorf("error updating volume cache: %v", err)
        }
    }

    claimList, err := claimLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
        return
    }
    for _, claim := range claimList {
        if _, err = ctrl.storeClaimUpdate(claim.DeepCopy()); err != nil {
            klog.Errorf("error updating claim cache: %v", err)
        }
    }
    klog.V(4).Infof("controller initialized")
}

type persistentVolumeOrderedIndex struct {
    store cache.Indexer
}

This method copies the listers' cached objects into the controller's own stores; PVs end up in a persistentVolumeOrderedIndex, a cache of PersistentVolumes indexed by AccessModes and ordered by storage capacity.
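
For reference, the index is built roughly as follows (abridged from pkg/controller/volume/persistentvolume/index.go; treat the details as approximate):

func newPersistentVolumeOrderedIndex() persistentVolumeOrderedIndex {
    return persistentVolumeOrderedIndex{cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"accessmodes": accessModesIndexFunc})}
}

// accessModesIndexFunc renders the volume's access modes as a single string
// (e.g. "RWO,ROX"), which becomes the index key.
func accessModesIndexFunc(obj interface{}) ([]string, error) {
    if pv, ok := obj.(*v1.PersistentVolume); ok {
        modes := v1helper.GetAccessModesAsString(pv.Spec.AccessModes)
        return []string{modes}, nil
    }
    return []string{""}, fmt.Errorf("object is not a persistent volume: %v", obj)
}

findBestMatchForClaim (used later in syncUnboundClaim) searches the volumes under the matching access-mode keys and, thanks to the capacity ordering, picks the smallest volume that still satisfies the claim's request.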

1 resync

func (ctrl *PersistentVolumeController) resync() {
    klog.V(4).Infof("resyncing PV controller")

    pvcs, err := ctrl.claimLister.List(labels.NewSelector())
    if err != nil {
        klog.Warningf("cannot list claims: %s", err)
        return
    }
    for _, pvc := range pvcs {
        ctrl.enqueueWork(ctrl.claimQueue, pvc)
    }

    pvs, err := ctrl.volumeLister.List(labels.NewSelector())
    if err != nil {
        klog.Warningf("cannot list persistent volumes: %s", err)
        return
    }
    for _, pv := range pvs {
        ctrl.enqueueWork(ctrl.volumeQueue, pv)
    }
}

This re-enqueues every PVC/PV in the cluster into the corresponding claimQueue & volumeQueue for reprocessing. The resyncPeriod equals a random factor multiplied by the base resync period configured when kcm starts (see the sketch below).
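
The jitter comes from kube-controller-manager itself; here is a minimal, self-contained sketch of the scheme (assuming a factor uniform in [1, 2), which matches kcm's ResyncPeriod helper as I recall it):

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// resyncPeriod returns a generator that yields minResyncPeriod multiplied by a
// random factor in [1, 2). Randomizing the period keeps the many controllers
// inside kcm from relisting against the apiserver in lock-step.
func resyncPeriod(minResyncPeriod time.Duration) func() time.Duration {
    return func() time.Duration {
        factor := rand.Float64() + 1
        return time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
    }
}

func main() {
    gen := resyncPeriod(12 * time.Hour)
    fmt.Println(gen()) // e.g. 17h23m..., somewhere in [12h, 24h)
}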

2 volumeWorker

An infinite loop that keeps processing the PersistentVolumes it takes from volumeQueue:

workFunc := func() bool {
    keyObj, quit := ctrl.volumeQueue.Get()
    if quit {
        return true
    }
    defer ctrl.volumeQueue.Done(keyObj)
    key := keyObj.(string)
    klog.V(5).Infof("volumeWorker[%s]", key)

    _, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
        return false
    }
    volume, err := ctrl.volumeLister.Get(name)
    if err == nil {
        // The volume still exists in informer cache, the event must have
        // been add/update/sync
        ctrl.updateVolume(volume)
        return false
    }
    if !errors.IsNotFound(err) {
        klog.V(2).Infof("error getting volume %q from informer: %v", key, err)
        return false
    }

    // The volume is not in informer cache, the event must have been
    // "delete"
    volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
    if err != nil {
        klog.V(2).Infof("error getting volume %q from cache: %v", key, err)
        return false
    }
    if !found {
        // The controller has already processed the delete event and
        // deleted the volume from its cache
        klog.V(2).Infof("deletion of volume %q was already processed", key)
        return false
    }
    volume, ok := volumeObj.(*v1.PersistentVolume)
    if !ok {
        klog.Errorf("expected volume, got %+v", volumeObj)
        return false
    }
    ctrl.deleteVolume(volume)
    return false
}

We mainly care about the ctrl.updateVolume(volume) call here.

updateVolume

updateVolume is the actual handler for volume events in the cluster; it mainly delegates the work to ctrl.syncVolume.
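
Before syncVolume runs, updateVolume first stores the new object version in the local cache and drops stale versions; a sketch of its shape (abridged, with the context plumbing elided):

func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume) {
    // Store the new volume version in the cache and do not process it if this
    // is an old version.
    new, err := ctrl.storeVolumeUpdate(volume)
    if err != nil {
        klog.Errorf("%v", err)
    }
    if !new {
        return
    }

    err = ctrl.syncVolume(volume)
    if err != nil {
        if errors.IsConflict(err) {
            // Version conflicts are common and the controller recovers easily.
            klog.V(3).Infof("could not sync volume %q: %+v", volume.Name, err)
        } else {
            klog.Errorf("could not sync volume %q: %+v", volume.Name, err)
        }
    }
}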

func (ctrl *PersistentVolumeController) syncVolume(ctx context.Context, volume *v1.PersistentVolume) error {
    klog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))
    ...
    // [Unit test set 4]
    if volume.Spec.ClaimRef == nil {
        // Volume is unused
        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)
        if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
            // Nothing was saved; we will fall back into the same
            // condition in the next call to this method
            return err
        }
        return nil
    } else /* pv.Spec.ClaimRef != nil */ {
        // Volume is bound to a claim.
        if volume.Spec.ClaimRef.UID == "" {
            // The PV is reserved for a PVC; that PVC has not yet been
            // bound to this PV; the PVC sync will handle it.
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
            if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
                // Nothing was saved; we will fall back into the same
                // condition in the next call to this method
                return err
            }
            return nil
        }
        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
        // Get the PVC by _name_
        var claim *v1.PersistentVolumeClaim
        claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
        obj, found, err := ctrl.claims.GetByKey(claimName)
        if err != nil {
            return err
        }
        if !found {
            if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
                obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)
                if err != nil && !apierrors.IsNotFound(err) {
                    return err
                }
                found = !apierrors.IsNotFound(err)
                if !found {
                    obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})
                    if err != nil && !apierrors.IsNotFound(err) {
                        return err
                    }
                    found = !apierrors.IsNotFound(err)
                }
            }
        }
        if !found {
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
            // Fall through with claim = nil
        } else {
            var ok bool
            claim, ok = obj.(*v1.PersistentVolumeClaim)
            if !ok {
                return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
            }
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))
        }
        if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {
            klog.V(4).Infof("Maybe cached claim: %s is not the newest one, we should fetch it from apiserver", claimrefToClaimKey(volume.Spec.ClaimRef))
            claim, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})
            if err != nil && !apierrors.IsNotFound(err) {
                return err
            } else if claim != nil {
                // Treat the volume as bound to a missing claim.
                if claim.UID != volume.Spec.ClaimRef.UID {
                    klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a newer UID than pv.ClaimRef, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
                    claim = nil
                } else {
                    klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a same UID with pv.ClaimRef", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
                }
            }
        }
        if claim == nil {
            if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
                // Also, log this only once:
                klog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
                if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
                    // Nothing was saved; we will fall back into the same condition
                    // in the next call to this method
                    return err
                }
            }
            if err = ctrl.reclaimVolume(volume); err != nil {
                // Release failed, we will fall back into the same condition
                // in the next call to this method
                return err
            }
            if volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimRetain {
                // volume is being retained, it references a claim that does not exist now.
                klog.V(4).Infof("PersistentVolume[%s] references a claim %q (%s) that is not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), volume.Spec.ClaimRef.UID)
            }
            return nil
        } else if claim.Spec.VolumeName == "" {
            if pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) {
                volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)
                ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)
                claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)
                ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)
                // Skipping syncClaim
                return nil
            }
            if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
                // The binding is not completed; let PVC sync handle it
                klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)
            } else {
                // Dangling PV; try to re-establish the link in the PVC sync
                klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)
            }
            ctrl.claimQueue.Add(claimToClaimKey(claim))
            return nil
        } else if claim.Spec.VolumeName == volume.Name {
            // Volume is bound to a claim properly, update status if necessary
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)
            if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
                // Nothing was saved; we will fall back into the same
                // condition in the next call to this method
                return err
            }
            return nil
        } else {
            // Volume is bound to a claim, but the claim is bound elsewhere
            if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {
                if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
                    // Also, log this only once:
                    klog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)
                    if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
                        // Nothing was saved; we will fall back into the same condition
                        // in the next call to this method
                        return err
                    }
                }
                if err = ctrl.reclaimVolume(volume); err != nil {
                    return err
                }
                return nil
            } else {
                if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
                    klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)
                    if err = ctrl.unbindVolume(volume); err != nil {
                        return err
                    }
                    return nil
                } else {
                    // The PV must have been created with this ptr; leave it alone.
                    klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)
                    if err = ctrl.unbindVolume(volume); err != nil {
                        return err
                    }
                    return nil
                }
            }
        }
    }
}

1. When the PV's Spec.ClaimRef is nil, the PV is unused; ctrl.updateVolumePhase is called to move the PV into the Available phase.

2. When the PV's Spec.ClaimRef is not nil, the PV is already bound (or reserved) to a PVC:

  • When Spec.ClaimRef.UID is empty, the PVC has not been bound to this PV yet; ctrl.updateVolumePhase moves the PV into the Available phase and the method returns, leaving the rest to the PVC's syncClaim.
  • The PVC referenced by Spec.ClaimRef is looked up in the pv_controller's cache.
  • If the PVC is not found:

    • The cache may simply be stale (for example under heavy cluster load), so the controller falls back to the informer cache, and if the PVC is still missing there, to the apiserver.
    • If a PV that is neither Released nor Failed still has no matching PVC after these lookups, the PVC was deleted. Recent Kubernetes versions then check the reclaimPolicy and transition the PV's state accordingly.
  • Once the PVC is found:

1) If the PVC's UID differs from Spec.ClaimRef.UID, the PVC the PV points to was most likely deleted and a new PVC with the same name was created right away, while the cache has not caught up; the controller double-checks against the apiserver. If the UIDs still differ after the double check, the PV is bound to a PVC that no longer exists: claim is set to nil and the "PVC not found" logic above takes over.

2) If the PVC's volumeName is empty:

  • Check whether the PVC's volumeMode matches the PV's volumeMode; if not, emit warning events on both objects.
  • If the PV carries the AnnBoundByController annotation ("pv.kubernetes.io/bound-by-controller"), the PVC/PV binding is still in progress.
  • Put the PVC into claimQueue and let claimWorker handle it.

3) If pvc.Spec.VolumeName equals the PV's name, simply set the PV to the Bound phase.

4) If pvc.Spec.VolumeName differs from the PV's name:

  • If the PV was dynamically provisioned and its ReclaimPolicy is Delete, the PVC has already been bound to a different PV; the PV is moved into the Released phase and waits for the deleter to remove it.
  • If the PV was not dynamically provisioned, unbind it: the controller clears the PV's ClaimRef binding via unbindVolume.
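
updateVolumePhase, which the branches above keep calling, is a small helper; a sketch of it (abridged — it essentially writes .status.phase through the apiserver and refreshes the local cache):

func (ctrl *PersistentVolumeController) updateVolumePhase(volume *v1.PersistentVolume, phase v1.PersistentVolumePhase, message string) (*v1.PersistentVolume, error) {
    if volume.Status.Phase == phase {
        // Nothing to do.
        return volume, nil
    }

    // Never mutate the shared informer copy; work on a clone.
    volumeClone := volume.DeepCopy()
    volumeClone.Status.Phase = phase
    volumeClone.Status.Message = message

    newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().UpdateStatus(context.TODO(), volumeClone, metav1.UpdateOptions{})
    if err != nil {
        return newVol, err
    }
    _, err = ctrl.storeVolumeUpdate(newVol)
    return newVol, err
}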

3 claimWorker

An infinite loop that keeps processing the PersistentVolumeClaims it takes from claimQueue:

workFunc := func() bool {
    keyObj, quit := ctrl.claimQueue.Get()
    if quit {
        return true
    }
    defer ctrl.claimQueue.Done(keyObj)
    key := keyObj.(string)
    klog.V(5).Infof("claimWorker[%s]", key)

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
        return false
    }
    claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
    if err == nil {
        // The claim still exists in informer cache, the event must have
        // been add/update/sync
        ctrl.updateClaim(claim)
        return false
    }
    if !errors.IsNotFound(err) {
        klog.V(2).Infof("error getting claim %q from informer: %v", key, err)
        return false
    }

    // The claim is not in informer cache, the event must have been "delete"
    claimObj, found, err := ctrl.claims.GetByKey(key)
    if err != nil {
        klog.V(2).Infof("error getting claim %q from cache: %v", key, err)
        return false
    }
    if !found {
        // The controller has already processed the delete event and
        // deleted the claim from its cache
        klog.V(2).Infof("deletion of claim %q was already processed", key)
        return false
    }
    claim, ok := claimObj.(*v1.PersistentVolumeClaim)
    if !ok {
        klog.Errorf("expected claim, got %+v", claimObj)
        return false
    }
    ctrl.deleteClaim(claim)
    return false
}

Again we mainly care about ctrl.updateClaim(claim). Like its volume counterpart, it mainly delegates to ctrl.syncClaim, which dispatches to ctrl.syncUnboundClaim or ctrl.syncBoundClaim depending on the PVC's state.
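
The dispatch hinges on the pv.kubernetes.io/bind-completed annotation; an abridged sketch of syncClaim (the CSI-migration annotation handling is trimmed):

func (ctrl *PersistentVolumeController) syncClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
    klog.V(4).Infof("synchronizing PersistentVolumeClaim[%s]: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))

    // A claim that has never completed binding goes through syncUnboundClaim;
    // everything else goes through syncBoundClaim.
    if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted) {
        return ctrl.syncUnboundClaim(ctx, claim)
    }
    return ctrl.syncBoundClaim(claim)
}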

syncUnboundClaim

func (ctrl *PersistentVolumeController) syncUnboundClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
    if claim.Spec.VolumeName == "" {
        // User did not care which PV they get.
        delayBinding, err := pvutil.IsDelayBindingMode(claim, ctrl.classLister)
        if err != nil {
            return err
        }

        // [Unit test set 1]
        volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
        if err != nil {
            klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
            return fmt.Errorf("error finding PV for claim %q: %w", claimToClaimKey(claim), err)
        }
        if volume == nil {
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
            switch {
            case delayBinding && !pvutil.IsDelayBindingProvisioning(claim):
                if err = ctrl.emitEventForUnboundDelayBindingClaim(claim); err != nil {
                    return err
                }
            case storagehelpers.GetPersistentVolumeClaimClass(claim) != "":
                if err = ctrl.provisionClaim(ctx, claim); err != nil {
                    return err
                }
                return nil
            default:
                ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
            }

            // Mark the claim as Pending and try to find a match in the next
            // periodic syncClaim
            if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                return err
            }
            return nil
        } else /* pv != nil */ {
            // Found a PV for this claim
            // OBSERVATION: pvc is "Pending", pv is "Available"
            claimKey := claimToClaimKey(claim)
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, volume.Name, getVolumeStatusForLogging(volume))
            if err = ctrl.bind(volume, claim); err != nil {
                metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
                return err
            }
            metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, nil)
            return nil
        }
    } else /* pvc.Spec.VolumeName != nil */ {
        klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)
        obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
        if err != nil {
            return err
        }
        if !found {
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
            if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                return err
            }
            return nil
        } else {
            volume, ok := obj.(*v1.PersistentVolume)
            if !ok {
                return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)
            }
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
            if volume.Spec.ClaimRef == nil {
                klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
                if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
                    klog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)
                    // send an event
                    msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)
                    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
                    // volume does not satisfy the requirements of the claim
                    if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                        return err
                    }
                } else if err = ctrl.bind(volume, claim); err != nil {
                    // On any error saving the volume or the claim, subsequent
                    // syncClaim will finish the binding.
                    return err
                }
                // OBSERVATION: pvc is "Bound", pv is "Bound"
                return nil
            } else if pvutil.IsVolumeBoundToClaim(volume, claim) {
                // User asked for a PV that is claimed by this PVC
                // OBSERVATION: pvc is "Pending", pv is "Bound"
                klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))
                // Finish the volume binding by adding claim UID.
                if err = ctrl.bind(volume, claim); err != nil {
                    return err
                }
                // OBSERVATION: pvc is "Bound", pv is "Bound"
                return nil
            } else {
                // User asked for a PV that is claimed by someone else
                // OBSERVATION: pvc is "Pending", pv is "Bound"
                if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController) {
                    klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))
                    claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
                    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)
                    // User asked for a specific PV, retry later
                    if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                        return err
                    }
                    return nil
                } else {
                    klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))
                    claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
                    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)
                    return fmt.Errorf("invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))
                }
            }
        }
    }
}

Let's walk through the overall flow:

  • If the PVC's volumeName is empty:

    • Determine whether the PVC uses delayed binding (WaitForFirstConsumer).
    • Call volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding) to find a matching PV.
  • If a volume is found:

    • Call ctrl.bind(volume, claim) to bind the pair.
  • If no volume is found:

    • If delayed binding applies but has not been triggered yet (no pod references the PVC), emit an event on the PVC.
    • If the PVC references a StorageClass, call ctrl.provisionClaim(ctx, claim), which (see the sketch after this list):
  1. Parses the PVC spec and locates the provisioner driver;
  2. Starts a goroutine;
  3. Calls ctrl.provisionClaimOperation(ctx, claim, plugin, storageClass) to do the actual provisioning work.
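
provisionClaim itself is a thin dispatch wrapper; an abridged sketch (metrics bookkeeping and some error paths trimmed, details from memory):

func (ctrl *PersistentVolumeController) provisionClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
    if !ctrl.enableDynamicProvisioning {
        return nil
    }
    klog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))

    // Resolve the provisionable plugin and StorageClass from the claim.
    opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
    plugin, storageClass, err := ctrl.findProvisionablePlugin(claim)
    if err != nil {
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, err.Error())
        // The controller retries provisioning on every syncUnboundClaim call.
        return nil
    }

    // scheduleOperation runs the closure in its own goroutine, deduplicated by
    // opName so the same claim is not provisioned twice concurrently.
    ctrl.scheduleOperation(opName, func() error {
        _, err := ctrl.provisionClaimOperation(ctx, claim, plugin, storageClass)
        return err
    })
    return nil
}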

provisionClaimOperation

func (ctrl *PersistentVolumeController) provisionClaimOperation(
    ctx context.Context,
    claim *v1.PersistentVolumeClaim,
    plugin vol.ProvisionableVolumePlugin,
    storageClass *storage.StorageClass) (string, error) {
    claimClass := storagehelpers.GetPersistentVolumeClaimClass(claim)
    klog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)

    pluginName := plugin.GetPluginName()
    if pluginName != "kubernetes.io/csi" && claim.Spec.DataSource != nil {
        strerr := fmt.Sprintf("plugin %q is not a CSI plugin. Only CSI plugin can provision a claim with a datasource", pluginName)
        return pluginName, fmt.Errorf(strerr)
    }
    provisionerName := storageClass.Provisioner

    // Add provisioner annotation to be consistent with external provisioner workflow
    newClaim, err := ctrl.setClaimProvisioner(ctx, claim, provisionerName)
    if err != nil {
        // Save failed, the controller will retry in the next sync
        klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
        return pluginName, err
    }
    claim = newClaim

    pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
    volume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
    if err != nil && !apierrors.IsNotFound(err) {
        klog.V(3).Infof("error reading persistent volume %q: %v", pvName, err)
        return pluginName, err
    }
    if err == nil && volume != nil {
        // Volume has been already provisioned, nothing to do.
        klog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))
        return pluginName, err
    }

    // Prepare a claimRef to the claim early (to fail before a volume is
    // provisioned)
    claimRef, err := ref.GetReference(scheme.Scheme, claim)
    if err != nil {
        klog.V(3).Infof("unexpected error getting claim reference: %v", err)
        return pluginName, err
    }

    // Gather provisioning options
    tags := make(map[string]string)
    tags[CloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace
    tags[CloudVolumeCreatedForClaimNameTag] = claim.Name
    tags[CloudVolumeCreatedForVolumeNameTag] = pvName

    options := vol.VolumeOptions{
        PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,
        MountOptions:                  storageClass.MountOptions,
        CloudTags:                     &tags,
        ClusterName:                   ctrl.clusterName,
        PVName:                        pvName,
        PVC:                           claim,
        Parameters:                    storageClass.Parameters,
    }

    // Refuse to provision if the plugin doesn't support mount options, creation
    // of PV would be rejected by validation anyway
    if !plugin.SupportsMountOption() && len(options.MountOptions) > 0 {
        strerr := fmt.Sprintf("Mount options are not supported by the provisioner but StorageClass %q has mount options %v", storageClass.Name, options.MountOptions)
        klog.V(2).Infof("Mount options are not supported by the provisioner but claim %q's StorageClass %q has mount options %v", claimToClaimKey(claim), storageClass.Name, options.MountOptions)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
        return pluginName, fmt.Errorf("provisioner %q doesn't support mount options", plugin.GetPluginName())
    }

    // Provision the volume
    provisioner, err := plugin.NewProvisioner(options)
    if err != nil {
        strerr := fmt.Sprintf("Failed to create provisioner: %v", err)
        klog.V(2).Infof("failed to create provisioner for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
        return pluginName, err
    }

    var selectedNode *v1.Node = nil
    if nodeName, ok := claim.Annotations[pvutil.AnnSelectedNode]; ok {
        selectedNode, err = ctrl.NodeLister.Get(nodeName)
        if err != nil {
            strerr := fmt.Sprintf("Failed to get target node: %v", err)
            klog.V(3).Infof("unexpected error getting target node %q for claim %q: %v", nodeName, claimToClaimKey(claim), err)
            ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
            return pluginName, err
        }
    }
    allowedTopologies := storageClass.AllowedTopologies

    opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
    volume, err = provisioner.Provision(selectedNode, allowedTopologies)
    opComplete(volumetypes.CompleteFuncParam{Err: &err})
    if err != nil {
        ctrl.rescheduleProvisioning(claim)
        strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
        klog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
        return pluginName, err
    }

    klog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))

    // Create Kubernetes PV object for the volume.
    if volume.Name == "" {
        volume.Name = pvName
    }
    // Bind it to the claim
    volume.Spec.ClaimRef = claimRef
    volume.Status.Phase = v1.VolumeBound
    volume.Spec.StorageClassName = claimClass

    // Add AnnBoundByController (used in deleting the volume)
    metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnBoundByController, "yes")
    metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned, plugin.GetPluginName())

    // Try to create the PV object several times
    for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
        klog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
        var newVol *v1.PersistentVolume
        if newVol, err = ctrl.kubeClient.CoreV1().PersistentVolumes().Create(context.TODO(), volume, metav1.CreateOptions{}); err == nil || apierrors.IsAlreadyExists(err) {
            // Save succeeded.
            if err != nil {
                klog.V(3).Infof("volume %q for claim %q already exists, reusing", volume.Name, claimToClaimKey(claim))
                err = nil
            } else {
                klog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))
                _, updateErr := ctrl.storeVolumeUpdate(newVol)
                if updateErr != nil {
                    // We will get an "volume added" event soon, this is not a big error
                    klog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, updateErr)
                }
            }
            break
        }
        // Save failed, try again after a while.
        klog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err)
        time.Sleep(ctrl.createProvisionedPVInterval)
    }

    if err != nil {
        strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
        klog.V(3).Info(strerr)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)

        var deleteErr error
        var deleted bool
        for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
            _, deleted, deleteErr = ctrl.doDeleteVolume(volume)
            if deleteErr == nil && deleted {
                // Delete succeeded
                klog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name)
                break
            }
            if !deleted {
                klog.Errorf("Error finding internal deleter for volume plugin %q", plugin.GetPluginName())
                break
            }
            // Delete failed, try again after a while.
            klog.V(3).Infof("failed to delete volume %q: %v", volume.Name, deleteErr)
            time.Sleep(ctrl.createProvisionedPVInterval)
        }

        if deleteErr != nil {
            strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), deleteErr)
            klog.V(2).Info(strerr)
            ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningCleanupFailed, strerr)
        }
    } else {
        klog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim))
        msg := fmt.Sprintf("Successfully provisioned volume %s using %s", volume.Name, plugin.GetPluginName())
        ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ProvisioningSucceeded, msg)
    }
    return pluginName, nil
}

The basic logic of provisionClaimOperation:

  • Check the driver: only CSI drivers are allowed to provision a claim that uses the dataSource field.
  • Add the claim.Annotations["volume.kubernetes.io/storage-provisioner"] = class.Provisioner annotation to the PVC.
  • Derive the PV name by the rule pvName = "pvc-" + pvc.UID (see the sketch after this list).
  • If a PV with that name is found, it has already been provisioned; skip provisioning.
  • Collect the basic PVC/PV information into options.
  • Validate the plugin: if the StorageClass carries mount options the plugin does not support, reject the provision request outright.
  • Call plugin.NewProvisioner(options) to create the provisioner, whose interface implements the Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) method. Note that this method is synchronous.
  • Provision returns a PersistentVolume instance.
  • Associate the new PV with the PVC via ClaimRef and try to create the PV object in the apiserver (retrying several times).
  • If creating the PV object fails, try to call the Delete method to clean up the provisioned volume resource.
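
The naming rule in the third bullet is a one-liner; a sketch of the helper (this matches pv_controller.go as I recall it):

// getProvisionedVolumeNameForClaim returns the deterministic PV name for a
// dynamically provisioned claim, e.g. "pvc-8a0e1f2b-..." — which is why every
// dynamically provisioned PV in a cluster is named after its claim's UID.
func (ctrl *PersistentVolumeController) getProvisionedVolumeNameForClaim(claim *v1.PersistentVolumeClaim) string {
    return "pvc-" + string(claim.UID)
}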

syncBoundClaim

func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {
    if claim.Spec.VolumeName == "" {
        // Claim was bound before but not any more.
        if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!"); err != nil {
            return err
        }
        return nil
    }
    obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
    if err != nil {
        return err
    }
    if !found {
        // Claim is bound to a non-existing volume.
        if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!"); err != nil {
            return err
        }
        return nil
    } else {
        volume, ok := obj.(*v1.PersistentVolume)
        if !ok {
            return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
        }
        klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
        if volume.Spec.ClaimRef == nil {
            // Claim is bound but volume has come unbound.
            // Or, a claim was bound and the controller has not received updated
            // volume yet. We can't distinguish these cases.
            // Bind the volume again and set all states to Bound.
            klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))
            if err = ctrl.bind(volume, claim); err != nil {
                // Objects not saved, next syncPV or syncClaim will try again
                return err
            }
            return nil
        } else if volume.Spec.ClaimRef.UID == claim.UID {
            // All is well
            // NOTE: syncPV can handle this so it can be left out.
            // NOTE: bind() call here will do nothing in most cases as
            // everything should be already set.
            klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))
            if err = ctrl.bind(volume, claim); err != nil {
                // Objects not saved, next syncPV or syncClaim will try again
                return err
            }
            return nil
        } else {
            // Claim is bound but volume has a different claimant.
            // Set the claim phase to 'Lost', which is a terminal
            // phase.
            if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly"); err != nil {
                return err
            }
            return nil
        }
    }
}

1) If pvc.Spec.VolumeName is empty, the PVC was bound before but has lost its PV reference; the claim is marked Lost, a warning event is emitted, and the method returns.

2) Otherwise, look up the PV the PVC is bound to in the cache:

  • If it is not found, the PVC is bound to a PV that no longer exists; mark the claim Lost, emit an event, and return.
  • If the PV is found:

    • Check pv.Spec.ClaimRef: if it is nil, the PV has not finished binding to the PVC; call ctrl.bind(volume, claim) to fix it.
    • If pv.Spec.ClaimRef.UID == pvc.UID, bind is called as well, but in most cases it returns immediately (everything has already been set up).
    • Otherwise the volume is bound to a different PVC: set the PVC's phase to Lost and emit an event.
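
Since ctrl.bind shows up in nearly every branch above, here is an abridged sketch of what it does (error handling and logging trimmed; treat the helper names as approximate):

func (ctrl *PersistentVolumeController) bind(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) error {
    // 1. Write ClaimRef (plus AnnBoundByController if needed) onto the PV.
    volume, err := ctrl.bindVolumeToClaim(volume, claim)
    if err != nil {
        return err
    }
    // 2. Move the PV into the Bound phase.
    if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
        return err
    }
    // 3. Write spec.volumeName (plus the bind-completed annotation) onto the PVC.
    claim, err = ctrl.bindClaimToVolume(claim, volume)
    if err != nil {
        return err
    }
    // 4. Move the PVC into the Bound phase.
    if _, err = ctrl.updateClaimStatus(claim, v1.ClaimBound, volume); err != nil {
        return err
    }
    return nil
}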

IV. Summary

Finally, a PVC/PV state-transition diagram sums things up: the sync loops above drive a PV through the Pending, Available, Bound, Released, and Failed phases, and a PVC through Pending, Bound, and Lost.

[Figure: PVC/PV state transition diagram]

This article is original content from Alibaba Cloud and may not be reproduced without permission.
