Knative 驾驭篇:带你 '纵横驰骋' Knative 自动扩缩容实现

Knative 中提供了自动扩缩容灵活的实现机制,本文从 三横两纵 的维度带你深入了解 KPA 自动扩缩容的实现机制。让你轻松驾驭 Knative 自动扩缩容。
注:本文基于最新 Knative v0.11.0 版本代码解读

KPA 实现流程图

在 Knative 中,创建一个 Revision 会相应的创建 PodAutoScaler 资源。在KPA中通过操作 PodAutoScaler 资源,对当前的 Revision 中的 POD 进行扩缩容。
针对上面的流程实现,我们从三横两纵的维度进行剖析其实现机制。

三横

  • KPA 控制器
  • 根据指标定时计算 POD 数
  • 指标采集

KPA 控制器

通过Revision 创建PodAutoScaler, 在 KPA 控制器中主要包括两个资源(Decider 和 Metric)和一个操作(Scale)。主要代码如下


func (c *Reconciler) reconcile(ctx context.Context, pa *pav1alpha1.PodAutoscaler) error {......decider, err := c.reconcileDecider(ctx, pa, pa.Status.MetricsServiceName)if err != nil {return fmt.Errorf("error reconciling Decider: %w", err)}if err := c.ReconcileMetric(ctx, pa, pa.Status.MetricsServiceName); err != nil {return fmt.Errorf("error reconciling Metric: %w", err)}// Metrics services are no longer needed as we use the private services now.if err := c.DeleteMetricsServices(ctx, pa); err != nil {return err}// Get the appropriate current scale from the metric, and right size// the scaleTargetRef based on it.want, err := c.scaler.Scale(ctx, pa, sks, decider.Status.DesiredScale)if err != nil {return fmt.Errorf("error scaling target: %w", err)}
......
}

这里先介绍一下两个资源:

  • Decider : 扩缩容决策的资源,通过Decider获取扩缩容POD数: DesiredScale。
  • Metric:采集指标的资源,通过Metric会采集当前Revision下的POD指标。

再看一下Scale操作,在Scale方法中,根据扩缩容POD数、最小实例数和最大实例数确定最终需要扩容的POD实例数,然后修改deployment的Replicas值,最终实现POD的扩缩容, 代码实现如下:


// Scale attempts to scale the given PA's target reference to the desired scale.
func (ks *scaler) Scale(ctx context.Context, pa *pav1alpha1.PodAutoscaler, sks *nv1a1.ServerlessService, desiredScale int32) (int32, error) {
......min, max := pa.ScaleBounds()if newScale := applyBounds(min, max, desiredScale); newScale != desiredScale {logger.Debugf("Adjusting desiredScale to meet the min and max bounds before applying: %d -> %d", desiredScale, newScale)desiredScale = newScale}desiredScale, shouldApplyScale := ks.handleScaleToZero(ctx, pa, sks, desiredScale)if !shouldApplyScale {return desiredScale, nil}ps, err := resources.GetScaleResource(pa.Namespace, pa.Spec.ScaleTargetRef, ks.psInformerFactory)if err != nil {return desiredScale, fmt.Errorf("failed to get scale target %v: %w", pa.Spec.ScaleTargetRef, err)}currentScale := int32(1)if ps.Spec.Replicas != nil {currentScale = *ps.Spec.Replicas}if desiredScale == currentScale {return desiredScale, nil}logger.Infof("Scaling from %d to %d", currentScale, desiredScale)return ks.applyScale(ctx, pa, desiredScale, ps)
}

根据指标定时计算 POD 数

这是一个关于Decider的故事。Decider创建之后会同时创建出来一个定时器,该定时器默认每隔 2 秒(可以通过TickInterval 参数配置)会调用Scale方法,该Scale方法实现如下:

func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount int32, excessBC int32, validScale bool) {......metricName := spec.ScalingMetricvar observedStableValue, observedPanicValue float64switch spec.ScalingMetric {case autoscaling.RPS:observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicRPS(metricKey, now)a.reporter.ReportStableRPS(observedStableValue)a.reporter.ReportPanicRPS(observedPanicValue)a.reporter.ReportTargetRPS(spec.TargetValue)default:metricName = autoscaling.Concurrency // concurrency is used by defaultobservedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicConcurrency(metricKey, now)a.reporter.ReportStableRequestConcurrency(observedStableValue)a.reporter.ReportPanicRequestConcurrency(observedPanicValue)a.reporter.ReportTargetRequestConcurrency(spec.TargetValue)}// Put the scaling metric to logs.logger = logger.With(zap.String("metric", metricName))if err != nil {if err == ErrNoData {logger.Debug("No data to scale on yet")} else {logger.Errorw("Failed to obtain metrics", zap.Error(err))}return 0, 0, false}// Make sure we don't get stuck with the same number of pods, if the scale up rate// is too conservative and MaxScaleUp*RPC==RPC, so this permits us to grow at least by a single// pod if we need to scale up.// E.g. MSUR=1.1, OCC=3, RPC=2, TV=1 => OCC/TV=3, MSU=2.2 => DSPC=2, while we definitely, need// 3 pods. See the unit test for this scenario in action.maxScaleUp := math.Ceil(spec.MaxScaleUpRate * readyPodsCount)// Same logic, opposite math applies here.maxScaleDown := math.Floor(readyPodsCount / spec.MaxScaleDownRate)dspc := math.Ceil(observedStableValue / spec.TargetValue)dppc := math.Ceil(observedPanicValue / spec.TargetValue)logger.Debugf("DesiredStablePodCount = %0.3f, DesiredPanicPodCount = %0.3f, MaxScaleUp = %0.3f, MaxScaleDown = %0.3f",dspc, dppc, maxScaleUp, maxScaleDown)// We want to keep desired pod count in the  [maxScaleDown, maxScaleUp] range.desiredStablePodCount := int32(math.Min(math.Max(dspc, maxScaleDown), maxScaleUp))desiredPanicPodCount := int32(math.Min(math.Max(dppc, maxScaleDown), maxScaleUp))
......return desiredPodCount, excessBC, true
}

该方法主要是从 MetricCollector 中获取指标信息,根据指标信息计算出需要扩缩的POD数。然后设置在 Decider 中。另外当 Decider 中 POD 期望值发生变化时会触发 PodAutoscaler 重新调和的操作,关键代码如下:

......
if runner.updateLatestScale(desiredScale, excessBC) {m.Inform(metricKey)}
......    

在KPA controller中设置调和Watch操作:

......// Have the Deciders enqueue the PAs whose decisions have changed.deciders.Watch(impl.EnqueueKey)
......    

指标采集

通过两种方式收集POD指标:

  • PUSH 收集指标:通过暴露指标接口,外部服务(如Activitor)可以调用该接口推送 metric 信息
  • PULL 收集指标:通过调用 Queue Proxy 服务接口收集指标。

PUSH 收集指标实现比较简单,在main.go中 暴露服务,将接收到的 metric 推送到 MetricCollector 中:

// Set up a statserver.statsServer := statserver.New(statsServerAddr, statsCh, logger)
....
go func() {for sm := range statsCh {collector.Record(sm.Key, sm.Stat)multiScaler.Poke(sm.Key, sm.Stat)}}()

PULL 收集指标是如何收集的呢? 还记得上面提到的Metric资源吧,这里接收到Metric资源又会创建出一个定时器,这个定时器每隔 1 秒会访问 queue-proxy 9090 端口采集指标信息。关键代码如下:

// newCollection creates a new collection, which uses the given scraper to
// collect stats every scrapeTickInterval.
func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, logger *zap.SugaredLogger) *collection {c := &collection{metric:             metric,concurrencyBuckets: aggregation.NewTimedFloat64Buckets(BucketSize),rpsBuckets:         aggregation.NewTimedFloat64Buckets(BucketSize),scraper:            scraper,stopCh: make(chan struct{}),}logger = logger.Named("collector").With(zap.String(logkey.Key, fmt.Sprintf("%s/%s", metric.Namespace, metric.Name)))c.grp.Add(1)go func() {defer c.grp.Done()scrapeTicker := time.NewTicker(scrapeTickInterval)for {select {case <-c.stopCh:scrapeTicker.Stop()returncase <-scrapeTicker.C:stat, err := c.getScraper().Scrape()if err != nil {copy := metric.DeepCopy()switch {case err == ErrFailedGetEndpoints:copy.Status.MarkMetricNotReady("NoEndpoints", ErrFailedGetEndpoints.Error())case err == ErrDidNotReceiveStat:copy.Status.MarkMetricFailed("DidNotReceiveStat", ErrDidNotReceiveStat.Error())default:copy.Status.MarkMetricNotReady("CreateOrUpdateFailed", "Collector has failed.")}logger.Errorw("Failed to scrape metrics", zap.Error(err))c.updateMetric(copy)}if stat != emptyStat {c.record(stat)}}}}()return c
}

两纵

  • 0-1 扩容
  • 1-N 扩缩容

上面从KPA实现的 3个横向角度进行了分析,KPA 实现了0-1扩容以及1-N 扩缩容,下面我们从这两个纵向的角度进一步分析。
我们知道,在 Knative 中,流量通过两种模式到达POD: Serve 模式和 Proxy 模式。
Proxy 模式: POD数为 0 时(另外针对突发流量的场景也会切换到 Proxy 模式,这里先不做详细解读),切换到 Proxy 模式。
Serve 模式:POD数不为 0 时,切换成 Serve 模式。
那么在什么时候进行模式的切换呢?在KPA中的代码实现如下:

mode := nv1alpha1.SKSOperationModeServe// We put activator in the serving path in the following cases:// 1. The revision is scaled to 0://   a. want == 0//   b. want == -1 && PA is inactive (Autoscaler has no previous knowledge of//            this revision, e.g. after a restart) but PA status is inactive (it was//            already scaled to 0).// 2. The excess burst capacity is negative.if want == 0 || decider.Status.ExcessBurstCapacity < 0 || want == -1 && pa.Status.IsInactive() {logger.Infof("SKS should be in proxy mode: want = %d, ebc = %d, PA Inactive? = %v",want, decider.Status.ExcessBurstCapacity, pa.Status.IsInactive())mode = nv1alpha1.SKSOperationModeProxy}

0-1 扩容

第一步:指标采集
在POD数为0时,流量请求模式为Proxy 模式。这时候流量是通过 Activitor 接管的,在 Activitor 中,会根据请求数的指标信息,通过WebSockt调用 KPA中提供的指标接口,将指标信息发送给 KPA 中的 MetricCollector。
在 Activitor 中 main 函数中,访问 KPA 服务 代码实现如下

    // Open a WebSocket connection to the autoscaler.autoscalerEndpoint := fmt.Sprintf("ws://%s.%s.svc.%s%s", "autoscaler", system.Namespace(), pkgnet.GetClusterDomainName(), autoscalerPort)logger.Info("Connecting to Autoscaler at ", autoscalerEndpoint)statSink := websocket.NewDurableSendingConnection(autoscalerEndpoint, logger)go statReporter(statSink, ctx.Done(), statCh, logger)

通过 WebSockt 发送请求指标代码实现:

func statReporter(statSink *websocket.ManagedConnection, stopCh <-chan struct{},statChan <-chan []autoscaler.StatMessage, logger *zap.SugaredLogger) {for {select {case sm := <-statChan:go func() {for _, msg := range sm {if err := statSink.Send(msg); err != nil {logger.Errorw("Error while sending stat", zap.Error(err))}}}()case <-stopCh:// It's a sending connection, so no drainage required.statSink.Shutdown()return}}
}

第二步:根据指标计算 POD 数
在 Scale 方法中,根据 PUSH 获取的指标信息,计算出期望的POD数。修改 Decider 期望 POD 值,触发 PodAutoScaler 重新调和。
第三步:扩容
在KPA controller中,重新执行 reconcile 方法,执行 scaler 对当前Revision进行扩容操作。然后将流量模式切换成 Server 模式。最终实现 0-1 的扩容操作。

1-N 扩缩容

第一步:指标采集
在 POD 数不为0时,流量请求模式为 Server 模式。这时候会通过PULL 的方式访问当前 revision 中所有 POD queue proxy 9090 端口,拉取业务指标信息, 访问服务 URL 代码实现如下:

...
func urlFromTarget(t, ns string) string {return fmt.Sprintf("http://%s.%s:%d/metrics",t, ns, networking.AutoscalingQueueMetricsPort)
}

第二步:根据指标计算 POD 数
在 Scale 方法中,根据 PULL 获取的指标信息,计算出期望的POD数。修改 Decider 期望 POD 值,触发 PodAutoScaler 重新调和。
第三步: 扩缩容
在 KPA controller中,重新执行 reconcile 方法,执行 scaler 对当前Revision进行扩缩容操作。如果缩容为 0 或者触发突发流量场景,则将流量模式切换成 Proxy 模式。最终实现 1-N 扩缩容操作。

总结

相信通过上面的介绍,对Knative KPA的实现有了更深入的理解,了解了其实现原理不仅有助于我们排查相关的问题,更在于我们可以基于这样的扩缩容机制实现自定义的扩缩容组件,这也正是 Knative 自动扩缩容可扩展性灵魂所在。


原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/517043.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MongoDB 计划从“Data Sprawl”中逃脱

原文作者 | Adrian Bridgwater译者 |天道酬勤&#xff0c;责编 |晋兆雨头图 | CSDN 付费下载自视觉中国提供特定技术子集的软件供应商&#xff0c;喜欢用尽可能广泛的标签来提升自己&#xff0c;这是一种传达平台宽度和能力的方式。我们知道MongoDB以开源根数据库而闻名&#x…

FastMock

文章目录官网文档官网 官网&#xff1a;https://www.fastmock.site/#/ 文档 https://marvengong.gitee.io/fastmock/#/

关于在nw里使用require('printer')和nw.require('printer')报错的问题

公司项目为了兼容xp所以使用nw.js&#xff08;0.14.7-sdk&#xff09;&#xff0c;用到了printer模块&#xff08;第三方的c打印模块&#xff09;&#xff0c;在引入该模块的时候&#xff0c;使用了require导致一直报cannot find modul “.”&#xff0c;后来改用nw.require&am…

小蜜团队万字长文 | 讲透对话管理模型最新研究进展

对话管理模型背景 从人工智能研究的初期开始&#xff0c;人们就致力于开发高度智能化的人机对话系统。艾伦图灵&#xff08;Alan Turing&#xff09;在1950年提出图灵测试[1]&#xff0c;认为如果人类无法区分和他对话交谈的是机器还是人类&#xff0c;那么就可以说机器通过了…

mockjs

文章目录官网文档地址示例官网 &#xff1a;http://mockjs.com/ 文档地址 https://github.com/nuysoft/Mock/wiki/Getting-Started 示例 http://mockjs.com/examples.html

炸裂!谷歌这波操作,预警了什么?

我们都知道谷歌爸爸收购了Cask Data一家公司。长期以来&#xff0c;谷歌致力于推动围绕 GoogleCloud 的企业业务&#xff0c;但在这方面一直被亚马逊和微软吊打&#xff0c;这次的收购正是为了弥补自身的短板。被收购的 Cask Data 是一家专门提供基于Hadoop的大型数据分析服务解…

美团点评基于 Flink 的实时数仓平台实践

一、美团点评实时计算演进 美团点评实时计算演进历程 在 2016 年&#xff0c;美团点评就已经基于 Storm 实时计算引擎实现了初步的平台化。2017 年初&#xff0c;我们引入了 Spark Streaming 用于特定场景的支持&#xff0c;主要是在数据同步场景方面的尝试。在 2017 年底&am…

koa-generator 快速生成 koa2 服务的脚手架工具

文章目录1. 全局安装脚手架工具2. 执行生成3. 安装依赖4. 启动服务5. 默认的访问地址通常我们可以借助于脚手架&#xff0c;快速创建一个Koa2项目&#xff0c;当然也可以自己从头搭建&#xff1b;脚手架会帮我们提前搭好基本的架子 1. 全局安装脚手架工具 cnpm install -g koa…

轻松搭建基于 SpringBoot + Vue 的 Web 商城应用

首先介绍下在本文出现的几个比较重要的概念&#xff1a; 函数计算&#xff08;Function Compute&#xff09;: 函数计算是一个事件驱动的服务&#xff0c;通过函数计算&#xff0c;用户无需管理服务器等运行情况&#xff0c;只需编写代码并上传。函数计算准备计算资源&#xff…

股市中的Santa Claus Rally (圣诞节行情)

圣诞节行情 Santa Claus Rally Santa Claus Rally 是指 12 月 25 日圣诞节前后股市的持续上涨这样一个现象。《股票交易员年鉴》的创始人 Yale Hirsch 于 1972 年创造了这个定义&#xff0c;他将当年最后五个交易日和次年前两个交易日的时间范围定义为反弹日期。 根据 CFRA Re…

没想到!!Unicode 字符还能这样玩?

来源 | 程序通事责编 |晋兆雨头图 | CSDN 付费下载自视觉中国上周的时候&#xff0c;朋友圈的直升飞机不知道为什么就火了&#xff0c;很多朋友开着各种花式飞机带着起飞。图片来自网络还没来得及了解咋回事来着&#xff0c;这个直升飞机就????到的微博热搜。图片来自网络后…

为什么 APP 纷纷开发“暗黑模式”?优酷最佳实践总结

一、缘起 随着iOS 13和Android 10的正式发布&#xff0c;一个名词"暗黑模式(Dark Mode)"逐渐走入了大家的视野。各大APP都将暗黑模式的适配列入了开发日程&#xff0c;舆情上用户们对暗黑模式支持的呼声也非常的高。优酷主客也顺应时势&#xff0c;启动了相应的技术…

Mongo 安装、配置、启动 Windows

文章目录一、Mongo 安装1. Mongo 下载2. 安装3. 配置环境变量4. 验证5. 连接二、Mongo 配置2.1. 编辑mongod.cfg2.2. 修改dbPath2.3. systemLog路径2.4. 启动Mongod服务2.5. 安装MongoDB服务2.6. 验证一、Mongo 安装 1. Mongo 下载 下载Mongo数据库并安装 https://www.mongod…

混合云存储阵列与云存储网关的协同解决方案

前言 混合云存储阵列&#xff08;CSA&#xff09;于2017年云栖大会发布&#xff0c;上市2年多&#xff0c;已经被基因测序&#xff0c;医疗PACS&#xff0c;影视制作&#xff0c;非编&#xff0c;广电&#xff0c;视频监控等行业和场景的客户广泛采用。混合云存储阵列承载了用户…

炸裂!Google这波操作,预警了什么?

我们都知道谷歌爸爸收购了Cask Data一家公司。长期以来&#xff0c;谷歌致力于推动围绕 GoogleCloud 的企业业务&#xff0c;但在这方面一直被亚马逊和微软吊打&#xff0c;这次的收购正是为了弥补自身的短板。被收购的 Cask Data 是一家专门提供基于Hadoop的大型数据分析服务解…

Robo 3T 安装连接 MongoDB

文章目录1. 官网2.安装3. 连接 MongoDB1. 官网 https://robomongo.org 2.安装 3. 连接 MongoDB

何为真正的 FaaS ?阿里舜天平台做了四大创新

导读&#xff1a;数据中心和云计算的超高增速&#xff0c;AI、视频、基因测序等应用对于算力的无尽渴求和摩尔定律发展事实上已经停滞的现实&#xff0c;均给异构加速带来了巨大的应用潜力和商机。但 Faas 解决方案仍有较高的门槛&#xff0c;今天&#xff0c;我们一起了解 Faa…

SOA为什么不“香”了? | 大咖说中台

作者 | 耿立超责编 | 晋兆雨来源 | 《大数据平台架构与原型实现&#xff1a;数据中台建设实战》SOA 所有的理念都是基于现有应用系统展开的&#xff0c;不管是对服务的梳理还是服务之间的交互&#xff0c;都是以现有应用系统为载体的&#xff0c;中台不同于SOA 的地方在于&…

看到抖音上Python工程师晒得工资条,我沉默了......

我是个抖音中毒者闲来无事就喜欢刷抖音最近刷到了一个Python工程师的工资条然后我默默的打开看了然后就默默的关闭了如今Python技术由于大数据、人工智能的兴起Python也越来越火大家都纷纷学Python我不能跟你确保说学完Python你就能拿高工资但是你学完Python肯定有饭吃说不定还…

不用 H5,闲鱼 Flutter 如何玩转小游戏?

阿里妹导读&#xff1a;最近APP游戏化成为了一个新的风口&#xff0c;把在游戏中一些好玩的、能吸引用户的娱乐方式或场景应用在应用当中&#xff0c;以达到增加用户粘性&#xff0c;提升DAU的效果&#xff0c;成本较低。同时在一些需要对用户有引导性的场景&#xff0c;游戏化…