文章目录
- 本系列
- 前言
- 数据结构
- 对外提供接口
- 初始化令牌桶
- 获取令牌
- 核心方法
- adjustavailableTokens
- currentTick
- take
- TakeAvailable
- Wait系列
本系列
- 开源限流组件分析(一):juju/ratelimit(本文)
- 开源限流组件分析(二):uber-go/ratelimit
- 开源限流组件分析(三):golang-time/rate
前言
这篇文章分析下go开源限流组件juju-ratelimit的使用方式和源码实现细节
源码地址:https://github.com/juju/ratelimit 版本:v1.0.2
其提供了一种高效的令牌桶限流实现
令牌桶相比于其他限流算法(如漏桶算法)的一个显著优势,就是在突发流量到来时,可以短时间内提供更多的处理能力,以应对这些额外的请求
直观上来说,令牌桶算法可以实现为:桶用channel实现
- 在后台每隔一段固定的时间向桶中发放令牌
- 要获取令牌时,从channel取数据
后台定时填充令牌:
func Start(bucket chan struct{}, interval time.Duration) {go func() {ticker := time.NewTicker(interval)defer ticker.Stop()// 每隔interval时间往channel中塞一个令牌for range ticker.C {select {// 放令牌case bucket <- struct{}{} :// 桶满了,丢弃default:}}}()}
业务请求要获取获取令牌时(非阻塞式):
func GetToken(bucket chan struct{}) bool {select {case <- bucket:return truedefault:return false}
}
但这样有多少容量就要开多少空间,对内存不友好
更好的方式是只用一个int变量availableTokens
维护桶中有多少token:
- 每次有请求要获取令牌时,先根据
当前时间now
减去上次获取令牌的时间last
,计算因为这段时间流逝,应该给桶中补充多少令牌,并加到availableTokens
中 - 如果
availableTokens
< 本次请求的令牌数request
,说明令牌不够。否则令牌够,放行请求,并根据本次需求的令牌数在桶中扣减 :availableTokens -= request
数据结构
type Bucket struct {clock Clock// bucket创建时间,仅初始化一次,用于计算时间相对偏移量startTime time.Time// 令牌桶最大容量capacity int64// 每次放入多少个令牌quantum int64// 每次放入令牌桶的间隔fillInterval time.Durationmu sync.Mutex// 当前桶中有多少token,注意:可能为负值availableTokens int64// 上一次访问令牌桶的ticklatestTick int64
}
注意:桶中剩余令牌availableTokens
可能为负值,至于为啥在下文分析扣减令牌流程时说明
对外提供接口
初始化令牌桶
其提供了几种根据不同需求初始化令牌桶的方法:
每fillInterval
时间放1个令牌,桶容量为capacity
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket
每fillInterval
时间放入quantum
个令牌
func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket
这两个方法都是简单设置Bucket的字段,比较简单
每秒放入rate
个令牌:
func NewBucketWithRate(rate float64, capacity int64) *Bucket
其构造方法如下:主要是需要根据参数rate
推算出tb.fillInterval
以及tb.quantum
:
func NewBucketWithRateAndClock(rate float64, capacity int64, clock Clock) *Bucket {tb := NewBucketWithQuantumAndClock(1, capacity, 1, clock)// 每次放入令牌数quantum从1开始,每轮 * 1.1for quantum := int64(1); quantum < 1<<50; quantum = nextQuantum(quantum) {// 假设quantum=1, rate=10,即每次放1个,每秒放10个 => 则放令牌的间隔是0.1sfillInterval := time.Duration(1e9 * float64(quantum) / rate)if fillInterval <= 0 {continue}tb.fillInterval = fillIntervaltb.quantum = quantum// 误差小于0.01就返回tbif diff := math.Abs(tb.Rate() - rate); diff/rate <= rateMargin {return tb}}}
虽然这里用了for循环寻找最合适的fillInterval和quantum,但只要rate小于10亿,都会在执行第一轮循环后退出
所以可以近似看做quantum=1
,fillInterval=1e9 / rate
例如:quantum=1,当要求 rate=10,即每次放1个,每秒放10个时,计算出来放令牌的时间间隔时是 0.1s
nextQuantum方法:
// 每次增大1.1倍
func nextQuantum(q int64) int64 {q1 := q * 11 / 10if q1 == q {q1++}return q1
}
Rate方法:根据quantum
和fillInterval
,计算每秒放入多少个令牌
func (tb *Bucket) Rate() float64 {// 举个例子:假设每次放入2个,每500ms放一个 => 每秒就放4个return 1e9 * float64(tb.quantum) / float64(tb.fillInterval)
}
获取令牌
非阻塞获取count
个令牌,返回的Duration
表示调用者需要等待的时间才能获得令牌
func (tb *Bucket) Take(count int64) time.Duration
如果在maxWait
时间内能获取到count
个令牌就获取,否则不获取
返回获取成功时需要等待的时间,是否获取成功
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
非阻塞获取最多count
个令牌,桶中还有多少获取多少,返回实际获得的令牌数(可能小于count)
func (tb *Bucket) TakeAvailable(count int64) int64
获取count
个令牌,并在方法内部等待直到获取到令牌
func (tb *Bucket) Wait(count int64)
只有当最多阻塞等待maxWait
时间,能获取到count
个令牌时才等待,并在内部等待。如果不能返回false
func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool
这5个方法中,业务上实用的是WaitMaxDuration
,毕竟被限流的请求不能无限等下去
核心方法
adjustavailableTokens
将桶中的令牌数调整为tick时间的令牌数,相当于给桶补充token
-
计算在
tick
与上一次lastTick
之间能够生产多少个令牌- 一般是计算从上次请求到本次请求中产生的令牌数
-
追加令牌到桶中
func (tb *Bucket) adjustavailableTokens(tick int64) {lastTick := tb.latestTicktb.latestTick = tickif tb.availableTokens >= tb.capacity {return}// 补充令牌tb.availableTokens += (tick - lastTick) * tb.quantumif tb.availableTokens > tb.capacity {tb.availableTokens = tb.capacity}return
}
其参数tick如何计算的?调currentTick方法
currentTick
计算当前时间与开始时间(桶的初始化时间)之间,需要放入多少次令牌
func (tb *Bucket) currentTick(now time.Time) int64 {return int64(now.Sub(tb.startTime) / tb.fillInterval)
}
用于计算当前到哪个tick了
take
Take系列的方法,底层会调到take方法:
Take:
func (tb *Bucket) Take(count int64) time.Duration {tb.mu.Lock()defer tb.mu.Unlock()// 可以等待无限大的时间d, _ := tb.take(tb.clock.Now(), count, infinityDuration)return d
}
TakeMaxDuration:
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {tb.mu.Lock()defer tb.mu.Unlock()return tb.take(tb.clock.Now(), count, maxWait)
}
差别在于Take会等待无限大的时间,直到拿到token。TakeMaxDuration最多只会等待maxWait时间
take是获取令牌的核心方法,其流程如下:
- 计算出当前时刻可用的令牌数,并补充到令牌桶中
- 如果当前令牌桶存量够,在桶中扣减令牌
- 如果当前令牌桶存量不够,在桶中预扣减令牌,并返回需要等待的时间
waitTime
/**
当前时间是now
要获取count个令牌
最多等待maxWait时间
*/
func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {if count <= 0 {return 0, true}tick := tb.currentTick(now)// 给桶补充tokentb.adjustavailableTokens(tick)avail := tb.availableTokens - count// 当前桶中的令牌就能满足需求,直接返回if avail >= 0 {tb.availableTokens = availreturn 0, true}// 到这avail是负数,-avail就是还需要产生多少个令牌,// 要产生avail个令牌,还需要多少个tick(向上取整)// 再加上当前tick,就是能满足需求的tickendTick := tick + (-avail+tb.quantum-1)/tb.quantum// 需要等到这个时间才有令牌endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)// 需要等待的时间waitTime := endTime.Sub(now)// 如果需要等待的时间超过了最大等待时间,则不等待,也不扣减,直接返回if waitTime > maxWait {return 0, false}// 这里将availableTokens更新为负值tb.availableTokens = avail// 返回要等待的时间,且已经在桶里预扣减了令牌return waitTime, true
}
怎么理解桶中的availableTokens
变为负值?表示有请求已经提前预支了令牌,相当于欠账
-
之后有请求要获取令牌时,需要先等时间流逝,把这些账还了,才能获取成功
- TakeAvailable方法
-
当然也可以在之前已欠账的基础上继续欠账,这样要等待更久的时间才能获取令牌成功
- Take,TakeMaxDuration方法
例如:令牌桶每秒产生1个令牌,假设在**t1时刻**桶中已经没有令牌了
- 请求A在t1时刻调Take()获取5个令牌,将
availableTokens
更新为-5,并返回5s,表示需要等待5s
才能让请求放行 - 时间过去
1s
,此时时刻t2 = t1 + 1s
- 请求B在t2时刻调Take()获取5个令牌,首先因为时间流逝,将availableTokens更新为-4。再将availableTokens更新为
-4 - 5 = -9
,也就是继续欠账。返回9s,表示要等待9s
才能让请求放行
TakeAvailable
非阻塞获取最多count个令牌,桶中还有多少获取多少,返回实际获得的令牌数(可能小于count)
func (tb *Bucket) TakeAvailable(count int64) int64 {tb.mu.Lock()defer tb.mu.Unlock()return tb.takeAvailable(tb.clock.Now(), count)
}// 返回可用的令牌数(可能小于count),如果没可用的令牌,将返回0,不会阻塞
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {if count <= 0 {return 0}// 基于当前时间,给桶补充令牌tb.adjustavailableTokens(tb.currentTick(now))if tb.availableTokens <= 0 {return 0}// 获取max(count, availableTokens)个令牌,也就是有多少就获取多少if count > tb.availableTokens {count = tb.availableTokens}tb.availableTokens -= countreturn count
}
Wait系列
Wait系列的两个方法Wait和WaitMaxDuration就是对Take的封装,也就是如果需要等待,在Wait方法内部等待
func (tb *Bucket) Wait(count int64) {if d := tb.Take(count); d > 0 {// 在内部等待tb.clock.Sleep(d)}
}func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool {d, ok := tb.TakeMaxDuration(count, maxWait)if d > 0 {tb.clock.Sleep(d)}return ok
}