作者 | leadersnowy
来源 | CSDN博客
kube-scheduler介绍
首先我们知道,kube-scheduler的根本工作任务是根据各种调度算法将Pod调度到最合适的工作节点上
一、整个调度流程分为两个阶段:
1、预选(Predicates):
输入是所有节点,输出是满足预选条件的节点。kube-scheduler根据预选策略过滤掉不满足策略的Nodes。例如,如果某节点的资源不足或者不满足预选策略的条件如“Node的label必须与Pod的Selector一致”时则无法通过预选。
2、优选(Priorities):
输入是预选阶段筛选出的节点,优选会根据优先策略为通过预选的Nodes进行打分排名,选择得分最高的Node。例如,资源越富裕、负载越小的Node可能具有越高的排名。
而整个调度过程是有很多因素影响的,包括节点状态(cpu,内存,磁盘),节点与pod的亲和性,节点标签等,而我们本次只做与磁盘相关的源码分析。
源码分析
2.1、磁盘预选(Predicates)
这一部分核心代码在pkg/scheduler/framework/plugins/nodevolumelimits/这个目录下,分两个文件
csi.go,non_csi.go 两个文件的核心思想基本一致,都是通过Filter来限制一个节点上的最大盘的数量,这个最大数量是默认配置在k8s源码中的。csi的就是限制一个节点上最多能挂几块csi的volume,而non_csi的相对比较复杂,因为牵扯到in-tree插件跟flexvolume插件,而且对每一种in-tree插件都进行了分别的可挂载盘的数量的配置,下面我们对于non-csi的进行具体的源码分析。
话不多说,直接上代码:
2.1.1 nonCSILimits
type nonCSILimits struct {name stringfilter VolumeFiltervolumeLimitKey v1.ResourceNamemaxVolumeFunc func(node *v1.Node) intcsiNodeLister storagelisters.CSINodeListerpvLister corelisters.PersistentVolumeListerpvcLister corelisters.PersistentVolumeClaimListerscLister storagelisters.StorageClassLister// The string below is generated randomly during the struct's initialization.// It is used to prefix volumeID generated inside the predicate() method to// avoid conflicts with any real volume.randomVolumeIDPrefix string
}
最核心的结构体nonCSILimits ,提供了几个成员
name 顾名思义,每一个nonCSILimits结构体变量的名称
filter VolumeFilter类型的变量,具体包括几个方法,FilterVolume(),FilterPersistentVolume(),MatchProvisioner()以及IsMigrated()具体调用我们后面再讲
volumeLimitKey 其实就是一个string类型,指定了几种类型的key
maxVolumeFunc() 一个获取该Limits的最大数量的方法
csiNodeLister csinode的监听对象
pvLister pv的监听对象
pvcLister pvc的监听对象
scLister sc的监听对象
randomVolumeIDPrefix 一个string类型,用于生成唯一的pvID
2.1.2 nonCSILimits初始化
func newNonCSILimits(filterName string,csiNodeLister storagelisters.CSINodeLister,scLister storagelisters.StorageClassLister,pvLister corelisters.PersistentVolumeLister,pvcLister corelisters.PersistentVolumeClaimLister,
) framework.Plugin {var filter VolumeFiltervar volumeLimitKey v1.ResourceNamevar name stringswitch filterName {case ebsVolumeFilterType:name = EBSNamefilter = ebsVolumeFiltervolumeLimitKey = v1.ResourceName(volumeutil.EBSVolumeLimitKey)case gcePDVolumeFilterType:name = GCEPDNamefilter = gcePDVolumeFiltervolumeLimitKey = v1.ResourceName(volumeutil.GCEVolumeLimitKey)case azureDiskVolumeFilterType:name = AzureDiskNamefilter = azureDiskVolumeFiltervolumeLimitKey = v1.ResourceName(volumeutil.AzureVolumeLimitKey)case indiskVolumeFilterType:name = IndiskNamefilter = indiskVolumeFiltervolumeLimitKey = v1.ResourceName(volumeutil.IndiskVolumeLimitKey)default:klog.Fatalf("Wrong filterName, Only Support %v %v %v %v %v", ebsVolumeFilterType,gcePDVolumeFilterType, azureDiskVolumeFilterType, cinderVolumeFilterType, indiskVolumeFilterType)return nil}pl := &nonCSILimits{name: name,filter: filter,volumeLimitKey: volumeLimitKey,maxVolumeFunc: getMaxVolumeFunc(filterName),csiNodeLister: csiNodeLister,pvLister: pvLister,pvcLister: pvcLister,scLister: scLister,randomVolumeIDPrefix: rand.String(32),}return pl
}
初始化时引用了大量的常量
const (// defaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE.// GCE instances can have up to 16 PD volumes attached.defaultMaxGCEPDVolumes = 16// defaultMaxAzureDiskVolumes defines the maximum number of PD Volumes for Azure.// Larger Azure VMs can actually have much more disks attached.// TODO We should determine the max based on VM sizedefaultMaxAzureDiskVolumes = 16// ebsVolumeFilterType defines the filter name for ebsVolumeFilter.ebsVolumeFilterType = "EBS"// gcePDVolumeFilterType defines the filter name for gcePDVolumeFilter.gcePDVolumeFilterType = "GCE"// azureDiskVolumeFilterType defines the filter name for azureDiskVolumeFilter.azureDiskVolumeFilterType = "AzureDisk"// cinderVolumeFilterType defines the filter name for cinderVolumeFilter.indiskVolumeFilterType = "Indisk"// ErrReasonMaxVolumeCountExceeded is used for MaxVolumeCount predicate error.ErrReasonMaxVolumeCountExceeded = "node(s) exceed max volume count"// KubeMaxPDVols defines the maximum number of PD Volumes per kubelet.KubeMaxPDVols = "KUBE_MAX_PD_VOLS"// MaxIndiskVolumes defines the maximum number of Indisk Volumes per node.MaxIndiskVolumes = "MAX_INDISK_VOLUMES"// DefaultIndiskVolumes defines the default number of Indisk Volumes per node.DefaultIndiskVolumes = "DEFAULT_INDISK_VOLUMES"
)
这些常量包括各intree存储类型的名字,以及默认的最大volume数量等信息,在nonCSILimits初始化时进行赋值。
除了这些常量之外,最关键的就是filter方法的初始化,看一下cinder的例子,是怎么区分这个volume是cinder的volume的。
var cinderVolumeFilter = VolumeFilter{FilterVolume: func(vol *v1.Volume) (string, bool) {if vol.Cinder != nil {return vol.Cinder.VolumeID, true}return "", false},FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {if pv.Spec.Cinder != nil {return pv.Spec.Cinder.VolumeID, true}return "", false},MatchProvisioner: func(sc *storage.StorageClass) (relevant bool) {if sc.Provisioner == csilibplugins.CinderInTreePluginName {return true}return false},IsMigrated: func(csiNode *storage.CSINode) bool {return isCSIMigrationOn(csiNode, csilibplugins.CinderInTreePluginName)},
}
可以看到,filer其实是通过对元数据的字段(vol.Cinder,pv.Spec.Cinder,sc.Provisioner)等进行判断来分辨是不是本类型的volume的。
如果不是intree的,可以通过其它的字段来进行过滤
Flexvolume的我们可以用这些字段:
vol.FlexVolume.Driver,pv.Spec.FlexVolume.Driver
CSI的我们可以用这些字段:
vol.CSI.Driver,pv.Spec.CSI.Driver
2.1.3 核心方法Filter
// Filter invoked at the filter extension point.
func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {// If a pod doesn't have any volume attached to it, the predicate will always be true.// Thus we make a fast path for it, to avoid unnecessary computations in this case.if len(pod.Spec.Volumes) == 0 {return nil}newVolumes := make(map[string]bool)if err := pl.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {return framework.NewStatus(framework.Error, err.Error())}// quick returnif len(newVolumes) == 0 {return nil}node := nodeInfo.Node()if node == nil {return framework.NewStatus(framework.Error, fmt.Sprintf("node not found"))}var csiNode *storage.CSINodevar err errorif pl.csiNodeLister != nil {csiNode, err = pl.csiNodeLister.Get(node.Name)if err != nil {// we don't fail here because the CSINode object is only necessary// for determining whether the migration is enabled or notklog.V(5).Infof("Could not get a CSINode object for the node: %v", err)}}// if a plugin has been migrated to a CSI driver, defer to the CSI predicateif pl.filter.IsMigrated(csiNode) {return nil}// count unique volumesexistingVolumes := make(map[string]bool)for _, existingPod := range nodeInfo.Pods {if err := pl.filterVolumes(existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, existingVolumes); err != nil {return framework.NewStatus(framework.Error, err.Error())}}numExistingVolumes := len(existingVolumes)// filter out already-mounted volumesfor k := range existingVolumes {delete(newVolumes, k)}numNewVolumes := len(newVolumes)maxAttachLimit := pl.maxVolumeFunc(node)volumeLimits := volumeLimits(nodeInfo)if maxAttachLimitFromAllocatable, ok := volumeLimits[pl.volumeLimitKey]; ok {maxAttachLimit = int(maxAttachLimitFromAllocatable)}if numExistingVolumes+numNewVolumes > maxAttachLimit {return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)}if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) {nodeInfo.TransientInfo.TransientLock.Lock()defer nodeInfo.TransientInfo.TransientLock.Unlock()nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumesnodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes}return nil
}
接下来看一下核心代码的逻辑
最开始是一些校验,首先校验这个pod有没有使用volume,再校验这个pod使用的volume是什么类型的volume,然后由对应类型的volume的filter进行处理。
看一下最核心的处理流程
numExistingVolumes := len(existingVolumes)// filter out already-mounted volumesfor k := range existingVolumes {delete(newVolumes, k)}numNewVolumes := len(newVolumes)maxAttachLimit := pl.maxVolumeFunc(node)volumeLimits := volumeLimits(nodeInfo)if maxAttachLimitFromAllocatable, ok := volumeLimits[pl.volumeLimitKey]; ok {maxAttachLimit = int(maxAttachLimitFromAllocatable)}if numExistingVolumes+numNewVolumes > maxAttachLimit {return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)}if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) {nodeInfo.TransientInfo.TransientLock.Lock()defer nodeInfo.TransientInfo.TransientLock.Unlock()nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumesnodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes}return nil
逻辑非常清楚
numExistingVolumes 表示该节点上已经有的volume数
numNewVolumes是该pod需要创建的volume数
maxAttachLimit 是该类型的volume在节点上所能创建的最大volume数
如果 numExistingVolumes+numNewVolumes > maxAttachLimit 则该节点不可调度
如果可以调度,则return nil,然后释放资源
到此,预选阶段的工作做完。
这里还需要注意一个地方
nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumesnodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes
AllocatableVolumesCount,RequestedVolumes这两个变量的赋值是给后面进行优选打分的时候用的
2.2 磁盘优选(Priorities)
优选打分的条件也很多,这里我们只处理跟存储相关的,具体源码在
pkg\scheduler\framework\plugins\noderesources\resource_allocation.go
pkg\scheduler\framework\plugins\noderesources\least_allocated.go
2.2.1 resource_allocation
resource_allocation 字体意思就可以理解,就是资源分配,而资源分配有多种方,比如least_allocated,most_allocated,requested_to_capacity_ratio都是资源分配的方法,而resource_allocation只是提供一个score方法,代码如下:
func (r *resourceAllocationScorer) score(pod *v1.Pod,nodeInfo *framework.NodeInfo) (int64, *framework.Status) {node := nodeInfo.Node()if node == nil {return 0, framework.NewStatus(framework.Error, "node not found")}if r.resourceToWeightMap == nil {return 0, framework.NewStatus(framework.Error, "resources not found")}requested := make(resourceToValueMap, len(r.resourceToWeightMap))allocatable := make(resourceToValueMap, len(r.resourceToWeightMap))for resource := range r.resourceToWeightMap {allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(nodeInfo, pod, resource)}var score int64// Check if the pod has volumes and this could be added to scorer function for balanced resource allocation.if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)} else {score = r.scorer(requested, allocatable, false, 0, 0)}if klog.V(10).Enabled() {if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {klog.Infof("%v -> %v: %v, map of allocatable resources %v, map of requested resources %v , allocatable volumes %d, requested volumes %d, score %d",pod.Name, node.Name, r.Name,allocatable, requested, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount,nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes,score,)} else {klog.Infof("%v -> %v: %v, map of allocatable resources %v, map of requested resources %v ,score %d,",pod.Name, node.Name, r.Name,allocatable, requested, score,)}}return score, nil
}
核心代码就一句
if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)} else {score = r.scorer(requested, allocatable, false, 0, 0)}
如果pod有volume并且BalanceAttachedNodeVolumes这个feature打开了,并且节点有TransientInfo
那么就走存储相关的打分,否则就不走。
我们看一下r.scorer的参数的后两个,就是我们在预选阶段最后赋值的两个参数
AllocatableVolumesCount 表示还可以创建的volume数量
RequestedVolumes 表示该pod需要的volume数量
2.2.2 least_allocated
least_allocated代码最少资源优先,也就是节点上资源越少,分越高
直接看源码
type resourceAllocationScorer struct {Name stringscorer func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64resourceToWeightMap resourceToWeightMap
}func leastResourceScorer(resToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap, bool, int, int) int64 {return func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {var nodeScore, weightSum int64for resource, weight := range resToWeightMap {resourceScore := leastRequestedScore(requested[resource], allocable[resource])nodeScore += resourceScore * weightweightSum += weight}return nodeScore / weightSum}
}// The unused capacity is calculated on a scale of 0-MaxNodeScore
// 0 being the lowest priority and `MaxNodeScore` being the highest.
// The more unused resources the higher the score is.
func leastRequestedScore(requested, capacity int64) int64 {if capacity == 0 {return 0}if requested > capacity {return 0}return ((capacity - requested) * int64(framework.MaxNodeScore)) / capacity
}
我们来看核心算法
capacity表示还可以剩余资源数量
requested表示该pod需求资源数量
比如:capacity=10,requested=3,framework.MaxNodeScorel默认是100
那么得分就是 (10-3)*100/10=70
但是我们看到 leastResourceScorer中并没有引用存储的部分,所以我们可以手动添加上
if includeVolumes && allocatableVolumes - requestedVolumes > 0 && allocatableVolumes > 0 {nodeScore += int64(((allocatableVolumes - requestedVolumes) * int(framework.MaxNodeScore)) / allocatableVolumes)weightSum += 1}
至此,kube-scheduler存储部分代码解读。
往期推荐
从 40% 跌至 4%,“糊”了的 Firefox 还能重回巅峰吗?
Gartner 发布 2022 年汽车行业五大技术趋势
使用这个库,让你的服务操作 Redis 速度飞起
漫画:什么是“低代码”开发平台?
点分享
点收藏
点点赞
点在看