开源限流组件分析(三):golang-time/rate

文章目录

  • 本系列
  • 前言
  • 提供获取令牌的API
  • 数据结构
  • 基础方法
    • tokensFromDuration
    • durationFromTokens
    • advance
  • 获取令牌方法
    • reverseN
    • 其他系列API
  • 令人费解的CancelAt
    • 是bug吗
  • 取消后无法唤醒其他请求

本系列

  • 开源限流组件分析(一):juju/ratelimit
  • 开源限流组件分析(二):uber-go/ratelimit
  • 开源限流组件分析(三):golang-time/rate (本文)

前言

根据开源限流组件分析(一):juju/ratelimit的分析,令牌桶限流的实现没必要真的准备一个桶,定时往里塞令牌,然后每次获取令牌时从桶中弹出,但这样做对内存不友好,需要开辟桶最大容量大小的空间

最佳做法是利用 token 数可以和时间跨度相互转化的原理,只用维护一个桶中当前令牌数的变量,每次消费前才根据时间差计算并更新 Token 数目

本文将分析go官方库提供的限流工具的使用及实现细节,也是令牌桶算法。并对下面这个问题给出合理的解释:在CancelAt方法中,为啥归还令牌时不是归还所有令牌,而是要扣减一个值

本文基于源码:https://github.com/golang/time.git,版本:v0.7.0

提供获取令牌的API

  • Wait/WaitN:当没有足够的Token时,将在内部阻塞等待直到Token足够,或超时取消

    • 工程中推荐使用该方法,参数中有ctx,和ctx配合能优雅实现超时控制
  • Allow/AllowN:当前时刻没有足够的Token时,返回false。否则获取到令牌,返回成功

  • Reverse/ReverseN:当没有足够的Token时,返回 Reservation 对象,里面包含是否获取成功,以及要等多久才能放行请求

    • 可控性最好,可以选择等待到规定的时间然后放心请求,也可以调Reservation.Cancel() 取消等待,归还令牌

数据结构

type Limit float64type Limiter struct {mu sync.Mutex// 每秒产生多少个tokenlimit Limit// 桶大小burst int// 桶中剩余的token数,可以为负数,表示被某些请求预定了一些未来的令牌tokens float64// 最近一次推进令牌桶的时间last time.Time// 令牌可以满足最近一次请求的时间lastEvent time.Time
}

特别说明这两个字段:

  • last:表示最近一次推进令牌桶的时间,也就是每次调获取令牌api的时间,因此每次获取令牌都会推进令牌桶的时间

    • 这样每次获取令牌时,都会看从last到now这段时间之间产生了多少新的令牌
  • lastEvent:表示令牌桶中最近一次请求能放行的时间。如果当前时间调获取令牌api时就能满足,就是当前时间,否则是未来的某个时间

    • 只用于CancelAt计算归还多少个令牌,可以到后面再理解

基础方法

tokensFromDuration

计算在时长d内,可以生成多少个令牌

也就是 速度 * 时间 = 总量

func (limit Limit) tokensFromDuration(d time.Duration) float64 {if limit <= 0 {return 0}return d.Seconds() * float64(limit)
}

durationFromTokens

计算生成tokens个令牌,需要多少时间

也就是 总数 / 速度 = 时间

func (limit Limit) durationFromTokens(tokens float64) time.Duration {if limit <= 0 {return InfDuration}seconds := tokens / float64(limit)return time.Duration(float64(time.Second) * seconds)
}

advance

计算如果时间推进到t后,桶中可用的令牌个数

注意:该方法只计算,不会更新桶中的令牌数

func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {last := lim.lastif t.Before(last) {last = t}// 过去了多长时间elapsed := t.Sub(last)// 计算过去这段时间,一共产生多少tokendelta := lim.limit.tokensFromDuration(elapsed)tokens := lim.tokens + delta// tokens不能超过最大容量if burst := float64(lim.burst); tokens > burst {tokens = burst}return t, tokens
}

获取令牌方法

reverseN

Wait系列,Allow系列,Reserve系列的方法,底层都调了reverseN,其流程如下:

  1. 计算如果时间推进到now,桶中可用的 token 数

  2. 计算本次消费后,桶还能剩能下多少token

  3. 如果 token < 0, 说明目前的token不够,需要等待一段时间

  4. 只有同时满足下面两个条件时,才获取令牌成功

    1. n <= lim.burst:申请的token数量没有超过了桶的容量大小
    2. waitDuration <= maxFutureReserve: 需要等待的时间 <=用户期望的时间

注意如果桶中令牌数不够,需要等待直到桶中令牌产生为止

此时需要将桶中令牌数置为负值,表示有请求欠了账,占了位

如果接下来有别的请求来获取令牌,会在之前欠账的基础上继续欠账,让令牌数的负值更大,需要等待更长的时间

通过时间流逝来还这些债,这一点和另一个令牌桶开源限流组件分析(一):juju/ratelimit很类似

// 当前时t时刻,想获取n个令牌,最多等maxFutureReserve时间
func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {lim.mu.Lock()defer lim.mu.Unlock()// 可以忽略这个分支,因为用了限流器就肯定要有速率限制if lim.limit == Inf {return Reservation{ok:        true,lim:       lim,tokens:    n,timeToAct: t,}}// 如果时间推进到now,可用的 token 数t, tokens := lim.advance(t)// 本次消费后,桶还能剩能下多少tokentokens -= float64(n)// 如果 token < 0, 说明目前的token不够,需要等待一段时间var waitDuration time.Durationif tokens < 0 {waitDuration = lim.limit.durationFromTokens(-tokens)}// n <= lim.burst:申请的 token 没有超过了桶的大小// waitDuration <= maxFutureReserve: 需要等待的时间 <=用户期望的时间ok := n <= lim.burst && waitDuration <= maxFutureReserver := Reservation{ok:    ok,lim:   lim,limit: lim.limit,}// 能获取到令牌if ok {// 本次获取了多少tokenr.tokens = n// 表示需要等待到这个时刻才能获得期望数量的token(当然 waitDuration 有可能为 0,就是立即满足,timeToAct就是now)r.timeToAct = t.Add(waitDuration)// 更新推进令牌桶的时间lim.last = t// 就扣减令牌,更新桶中token的数量lim.tokens = tokens// 桶中可以满足最近一次请求的时间lim.lastEvent = r.timeToAct}return r
}

其他系列API

allowN:调reserveN时传maxFutureReserve=0,表示只有不等待就能获得时,才获取令牌

func (lim *Limiter) AllowN(t time.Time, n int) bool {return lim.reserveN(t, n, 0).ok
}

ReserveN:直接调reserveN

func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation {r := lim.reserveN(t, n, InfDuration)return &r
}

WaitN:根据ctx计算要获取令牌最多等待多久,并以这个时间最为最大等待时间调reverseN。然后在内部sleep

  1. 先检查ctx是否已取消,如果已取消直接返回

  2. 根据ctx还剩多久过期,来决定在reverseN中最多等待多久

    1. 换句话说,如果判断直到ctx过期都无法获取令牌,就不等了
  3. 获取到Reservation后,看 1)根据Reservation计算还有多久能获取令牌的时间delay,和2) ctx被取消,这两件事谁先发生:

    1. 如果delay先过完:获取令牌成功,可以执行请求
    2. 如果ctx先被取消:本次获取令牌失败,需要尽可能归还桶中的令牌

正常来说,只要不是外部主动取消,都是delay先过完

func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {// 其实就是:time.NewTimer(d)newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {timer := time.NewTimer(d)return timer.C, timer.Stop, func() {}}return lim.wait(ctx, n, time.Now(), newTimer)
}func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {lim.mu.Lock()burst := lim.burstlimit := lim.limitlim.mu.Unlock()// 不能超过桶容量if n > burst && limit != Inf {return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)}// 先检查下ctx是否已取消select {case <-ctx.Done():return ctx.Err()default:}// 获取令牌的最大等待时间根据ctx计算waitLimit := InfDurationif deadline, ok := ctx.Deadline(); ok {waitLimit = deadline.Sub(t)}r := lim.reserveN(t, n, waitLimit)if !r.ok {return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)}// 该Reservation还需要等多久才能获得令牌delay := r.DelayFrom(t)if delay == 0 {return nil}ch, stop, _ := newTimer(delay)defer stop()select {case <-ch:// ctx超时或被取消之前,就获得令牌了return nilcase <-ctx.Done():// 在获得令牌之前,ctx就被取消了,需要归还令牌到桶中r.Cancel()return ctx.Err()}
}

令人费解的CancelAt

最后看看CancelAt:取消本次持有,尽可能归还持有的令牌

为啥需要归还令牌?因此本次实际上没有放行请求,而是被cancel掉了,可以把令牌放回桶中,供接下来的请求使用

// 参数t一般是当前时间
func (r *Reservation) CancelAt(t time.Time) {if !r.ok {return}r.lim.mu.Lock()defer r.lim.mu.Unlock()/**1.如果产生令牌的速率无限大,那没必要归还令牌2.自己没有拿到token,也不用归还3.预期获得时间比当前时间早,这里认为不用归还*/if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) {return}// 如果只有有更新的请求请求了令牌,要归还的令牌数减去// r.lim.lastEvent.Sub(r.timeToAct) 这期间产生的令牌restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))if restoreTokens <= 0 {return}// 推进桶的时间t, tokens := r.lim.advance(t)tokens += restoreTokensif burst := float64(r.lim.burst); tokens > burst {tokens = burst}r.lim.last = t// 归还tokenr.lim.tokens = tokens// 如果相等,说明后面没有新的token消费if r.timeToAct == r.lim.lastEvent {// 上一次的lastEventprevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))if !prevEvent.Before(t) {// 恢复lastEvent为上一次的lastEventr.lim.lastEvent = prevEvent}}
}

是bug吗

其中比较费解的是这一行:为啥要多减去这一部分

restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
  • r.TimeToAct:本次Reservation可以放行请求的时间,也是能使用令牌的时间
  • r.lim.lastEvent:令牌桶中,最近一次请求的TimeToAct

在归还令牌时,要减掉这段时间范围能产生的令牌数

对应于下面这个场景:

在这里插入图片描述

这行代码有注释:

// calculate tokens to restore
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
// after r was obtained. These tokens should not be restored.

但只是解释了在干啥:在r之后获取的令牌不应该被重新放回桶中

而没有解释为啥要这么干


有人在stackoverflow提问,归还所有持有的token是否应该改成restoreTokens = float64(r.tokens)

在这里插入图片描述

也有人在官方仓库提issue,举了这样一个例子:

func TestLimit(t *testing.T) {t0 := time.Now()t1 := time.Now().Add(100 * time.Millisecond)t2 := time.Now().Add(200 * time.Millisecond)t3 := time.Now().Add(300 * time.Millisecond)// 桶每秒产生10个令牌,容量=20l := NewLimiter(10, 20)l.ReserveN(t0, 15) // 桶里还剩 5 个 tokenfmt.Printf("%+v\n", l)// t1:时间过去100ms,桶里增加1个,变成6个// 消费10个,变成-4个r := l.ReserveN(t1, 10)// 此时timeToAct:500msfmt.Printf("%+v\n", l)// t2:时间过去100ms,桶里增加1个,变成-3个// 消费2个, 桶里变成-5个l.ReserveN(t2, 2)// 此时timeToAct:700msfmt.Printf("%+v\n", l)// t3: 时间过去100ms,桶里增加1个,变成-4个// r在t3取消,变成4(归还8个)个,按常理是变成6(归还10个)个r.CancelAt(t3)fmt.Printf("%+v\n", l)
}

最后一行:

  • 按常理来说,应该把所有令牌都归还,也就是归还10个,这样桶中的令牌数变成6
  • 实际上只归还了8个,桶中的令牌数变成4。就是那行令人费解的代码造成的

有人在issue下面给出解释:

在这里插入图片描述

意思就是:如果把保留的令牌数全部还回去的话,会造成在某一时刻突破令牌桶的速率限制

拿上面例子说明:

如果在t3时刻取消r时,如果把所有的令牌都归还了,也就是归还10个,此时桶中的令牌数变成6

那么再过400ms,在t2时刻获取的Reservation r2就可以开始执行,同时现在桶中有10个令牌!

如果此时有请求来获取这10个令牌,算上刚刚开始执行的r2,那就相当于同时有加起来需要消耗12个令牌的请求在执行!超过了最大容量10的限制


而按照源码中的逻辑扣减,在t3时刻取消r时,只归还8个,此时桶中的令牌数变成4

那么再过400ms,在t2时刻获取的Reservation r2就可以开始执行,同时现在桶中有8个令牌

如果此时有请求来获取这8个令牌,算上刚刚开始执行的r2,那就会同时加起来需要消耗10个令牌的请求在执行,刚好不超过桶中的最大容量的限制

也就是说,如果r后面有更新的请求rnew,rnew.TimeToAct赋值给了r.limit.lastEvent

那么从r.TimeToActr.limit.lastEvent之间能产生的令牌是不能归还的,而是要等时间流逝自然填充

这样当rnew可以执行时,桶中的令牌数 + rnew获取的令牌数才不会超过桶的容量


取消后无法唤醒其他请求

这个库有个小问题:如果请求A在请求B之前发生,都需要等待一段时间。如果任务A提前cancel了,使得桶中的令牌满足请求B的需求,请求B也不会被唤醒,而是等待其预定的时间到了再执行

例如下面的例子:

func TestBug(t *testing.T) {// 每100ms产生一个令牌,最大容量=10l := NewLimiter(10, 10)t1 := time.Now()// 先把桶中的初始令牌用完l.ReserveN(t1, 10)var wg sync.WaitGroupctx, cancel := context.WithTimeout(context.TODO(), time.Hour)defer cancel()wg.Add(1)wg.Add(2)go func() {defer wg.Done()// 如果要等,这个要等 1s才能执行,但是我们的 ctx 200ms 就会取消l.WaitN(ctx, 10)fmt.Printf("[1] cost: %s\n", time.Since(t1))}()go func() {defer wg.Done()// 模拟出现问题, 200ms就取消了time.Sleep(200 * time.Millisecond)cancel()}()time.Sleep(100 * time.Millisecond)go func() {defer wg.Done()// 正常情况下,这个要等 1.2 s 才能执行,// 但是我们前面都取消了,按理说在400ms时,桶中令牌数就够了,这个可以执行// 但实际上没有唤醒,等到1.2才执行ctx, cancel := context.WithTimeout(context.Background(), time.Hour)defer cancel()l.WaitN(ctx, 2)fmt.Printf("[2] cost: %s\n", time.Since(t1))}()wg.Wait()
}
  1. 时刻0ms:请求A获取10个token,需要等1000ms,桶中tokens = -10
  2. 时刻100ms:请求B获取2个token,需要等1200ms,桶中tokens = -11
  3. 时刻200ms:请求A取消,往桶中归还8个token,桶中tokens = -2
  4. 时刻400ms:理想情况下,此时请求B可以执行,因为桶中容量够了
  5. 时刻1.2s:实际上,请求B此时才可以执行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/57152.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java之继承抽象类用法实例(三十一)

简介&#xff1a; CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布&#xff1a;《Android系统多媒体进阶实战》&#x1f680; 优质专栏&#xff1a; Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a; 多媒体系统工程师系列【…

git命令笔记(速查速查)

git命令功能总结 1.创建git的本地仓库2. 配置本地仓库(name和email地址)3. 工作区、版本库、暂存区、对象区3.1 add, commit3.2 打印提交日志3.2 修改文件 4.版本回退&#xff08;git reset&#xff09;5. 撤销修改&#xff08;在push之前撤销&#xff09;6.删除版本库中的文件…

SSM框架学习(七、MyBatis-Plus高级用法:最优化持久层开发)

目录 一、MyBatis-Plus快速入门 1.简介 2.快速入门 二、MyBatis-Plus核心功能 1.基于Mapper接口CRUD &#xff08;1&#xff09;Insert 方法 &#xff08;2&#xff09;Delete方法 &#xff08;3&#xff09;Update 方法 &#xff08;4&#xff09;Select方法 2.基于Serv…

Chrome DevTools 三: Performance 性能面板扩展—— 性能优化

Performance 性能 &#xff08;一&#xff09;性能指标 首次内容绘制 (First Contentful Paint&#xff0c;FCP)&#xff1a; 任意内容在页面上完成渲染的时间 最大内容绘制 (Largest Contentful Paint&#xff0c;LCP)&#xff1a; 最大内容在页面上完成渲染的时间 第一字节…

283.移动零

目录 题目解法解释&#xff1a; .reverse()怎么用的&#xff1f;Char 13: error: no matching function for call to reverse 什么是双指针&#xff1f;双指针的常见类型&#xff1a;总结&#xff1a; 题目 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末…

88.【C语言】文件操作(5)

目录 文件的随机读写 1.fseek函数 代码示例 运行结果 2.ftell函数 代码示例 运行结果 3.rewind函数 代码示例 运行结果 承接79.【C语言】文件操作(4)文章 文件的随机读写 1.fseek函数 声明:int fseek ( FILE * stream, long int offset, int origin ); 格式:fsee…

APM 3.0.0|二次元味很冲的B站音乐软件

APM是一款专为B站音频设计的第三方播放器&#xff0c;支持从B站获取音频内容&#xff0c;提供桌面小组件&#xff0c;多语言支持&#xff0c;以及针对Android系统的优化。下载安装APK后打开应用&#xff0c;登录B站账号&#xff0c;浏览并播放音频内容。 大小&#xff1a;73M …

13分+文章利用scRNA-Seq揭示地铁细颗粒物引起肺部炎症的分子机制

写在前面 人们乘坐地铁时&#xff0c;不可避免地在地铁站台上吸入细颗粒物&#xff08;PM2.5&#xff09;&#xff0c;但PM2.5对人体又有哪些危害呢&#xff0c;今天和大家分享一篇文章&#xff0c;题目为“单细胞转录组学揭示吸入地铁细颗粒物引起的肺部炎症”&#xff0c;作…

《AI生成式工具使用》之:AI文本生视频(二战!)

目录 背景说明及目标 尝试练手 1、豆包AI之图片生成 总结&#xff1a;豆包AI生成的图片&#xff0c;不太能看细节&#xff0c;涉及到中文的基本上不能细看都是类似乱码的东西&#xff0c;有明显的逻辑性问题&#xff08;比如不符合道路交规&#xff09;。需要根据生成的结果…

Java-继承与多态-上篇

关于类与对象&#xff0c;内容较多&#xff0c;我们分为两篇进行讲解&#xff1a; &#x1f4da; Java-继承与多态-上篇&#xff1a;———— <就是本篇> &#x1f4d5; 继承的概念与使用 &#x1f4d5; 父类成员访问 &#x1f4d5; super关键字 &#x1f4d5; supe…

laravel 查询数据库

数据库准备 插入 三行 不同的数据 自行搭建 laravel 工程 参考 工程创建点击此处 laravel 配置 数据库信息 DB_CONNECTIONmysql #连接什么数据库 DB_HOST127.0.0.1 # 连接 哪个电脑的 ip &#xff08;决定 电脑 本机&#xff09; DB_PORT3306 # 端口 DB_DATABASEyanyu…

【记录】VSCode|自用设置项

文章目录 1 基础配置1.1 自动保存1.2 编辑区自动换行1.3 选项卡换行1.4 空格代替制表符1.5 开启滚轮缩放 2 进阶设置2.1 选项卡不自我覆盖2.2 选项卡限制宽度2.3 选项卡组限制高度2.4 字体设置2.5 字体加粗2.6 侧边栏2.7 沉浸式代码模式 Zen Mode2.8 设置 Zen 模式的选项卡组 3…

filebeat接入nginx和mysql获取日志

下载nginx &#xff08;1&#xff09; 直接下载 yum install nginx -y&#xff08;2&#xff09;查看状态启动 systemctl start nginx systemctl status nginx&#xff08;3&#xff09;配置文件检查 nginx -t&#xff08;4&#xff09;端口检查 netstat -tulpn | grep :80&am…

Flutter项目打包ios, Xcode 发布报错 Module‘flutter barcode_scanner‘not found

报错图片 背景 flutter 开发的 apple app 需要发布新版本&#xff0c;但是最后一哆嗦碰到个报错&#xff0c;这个小问题卡住了我一天&#xff0c;之间的埪就不说了&#xff0c;直接说我是怎么解决的&#xff0c;满满干货 思路 这个报错 涉及到 flutter_barcode_scanner; 所…

携手并进,智驭教育!和鲸科技与智谱 AI 签署“101 数智领航计划”战略合作协议

近日&#xff0c;上海和今信息科技有限公司&#xff08;以下简称“和鲸科技”&#xff09;与北京智谱华章科技有限公司&#xff08;以下简称“智谱 AI”&#xff09;签署“101 数智领航计划”战略合作协议。双方将携手营造智能化学科教育与科研环境&#xff0c;提供多种大模型工…

前后端联调需要改ip联调多个后端,用nginx代理

前后端联调需要改ip联调多个后端 Nginx #user nobody; worker_processes 1;#error_log logs/error.log; #error_log logs/error.log notice; #error_log logs/error.log info;#pid logs/nginx.pid;events {worker_connections 1024; }http {include mime…

空洞卷积:Atrous/Dilated convolution - 语义分割中多用到

没办法&#xff0c;我还是很多基础的、底层的模块不通透&#xff0c;读论文难免会受到阻碍&#xff0c;而且这现在科研任务很急了&#xff0c;必须要马上动手实验&#xff0c;全给我弄明白、特别是算法&#xff01; 空洞卷积-可变形卷积-这一个个我都要。 空洞卷积据说在语义分…

MySQL企业常见架构与调优经验分享

文章目录 一、选择 PerconaServer、MariaDB 还是 MYSQL二、常用的 MYSQL 调优策略三、MYSOL 常见的应用架构分享四、MYSOL 经典应用架构 观看学习课程的笔记&#xff0c;分享于此~ 课程&#xff1a;MySQL企业常见架构与调优经验分享 mysql官方优化文档 一、选择 PerconaServer、…

机器学习与深度学习2:梯度下降算法和BP反向传播算法

梯度下降算法&#xff1a; 算法原理 上一章我们已知神经网络算法就是求解拟合函数&#xff0c;通过线性变换和非线性变换来得出损失函数最小的模型。那么是如何进行求解的呢&#xff0c;其中之一就是梯度下降算法。 如图&#xff0c;当我们需要求解拟合曲线时&#xff0c;如何…

Verilog基础:层次化标识符的使用

相关阅读 Verilog基础https://blog.csdn.net/weixin_45791458/category_12263729.html?spm1001.2014.3001.5482 一、前言 Verilog HDL中的标识符(identifier)是一个为了引用而给一个Verilog对象起的名字&#xff0c;分为两大类&#xff1a;普通标识符大类和层次化标识符大类。…