1. time/rate限速器使用
- 令牌桶限流算法
- rate.NewLimiter(limit,burst)产生一个新的限速器
- limit表示每秒产生token数、burst表示最多存token数
- Allow判断当前是否可以取到token
- Wait阻塞等待直到取到token
- Reverse返回等待时间(预估的等待时间),再去取token
package mainimport ("context""golang.org/x/time/rate""log""testing""time"
)func Test_RateLimiter(t *testing.T) {l := rate.NewLimiter(1, 5)log.Println(l.Limit(), l.Burst())for i := 0; i < 10; i++ {//阻塞等待直到,取到一个tokenlog.Println("before Wait")c, _ := context.WithTimeout(context.Background(), time.Second*2)if err := l.Wait(c); err != nil {log.Println("limiter wait err:" + err.Error())}log.Println("after Wait")//返回需要等待多久才有新的token,这样就可以等待指定时间执行任务r := l.Reserve()log.Println("reserve Delay:", r.Delay())//判断当前是否可以取到tokena := l.Allow()log.Println("Allow:", a)log.Println("======================")}
}
2. time/rate源码原理
- 计算上次请求和当前请求时间差
- 计算时间差内生成的token数+旧token数
- 如果token为负,则计算等待时间
- token为正,则请求后token-1
type Limit float64type Limiter struct {limit Limit//每秒产生的token数burst int//桶的总大小mu sync.Mutex//锁tokens float64//token总数last time.Time//上一次更新token的时间lastEvent time.Time//最后一次限速的时间
}
Allow、Reverse、Wait三个方法底层调用的都是func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation
// reserveN 是 AllowN、ReserveN 和 WaitN 的辅助方法。
// maxFutureReserve 指定了允许的最大预订等待时间。
// reserveN 返回 Reservation(而不是 *Reservation),以避免在 AllowN 和 WaitN 中进行分配。
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {// 加锁,保护临界区lim.mu.Lock()// 如果每秒产生的token数为无限,则无需预订直接返回if lim.limit == Inf {lim.mu.Unlock()return Reservation{ok: true, // 预订成功lim: lim, // 当前限流器tokens: n, // 预订的令牌数timeToAct: now, // 立即生效}}// 更新当前时间、上次时间和现在可用令牌数now, last, tokens := lim.advance(now)// 计算请求n个tokens后的剩余令牌数tokens -= float64(n)// 计算等待时长var waitDuration time.Durationif tokens < 0 {// 如果令牌不够,需要等待的时间waitDuration = lim.limit.durationFromTokens(-tokens)}// 判断预订是否成功,请求的n是否小于等于桶的容量,且等待时间是否小于用户给的最大实践ok := n <= lim.burst && waitDuration <= maxFutureReserve// 准备预订结果r := Reservation{ok: ok, // 预订是否成功lim: lim, // 当前限流器limit: lim.limit, // 当前限流器的限制}if ok {r.tokens = n // 成功预订的令牌数r.timeToAct = now.Add(waitDuration) // 生效时间}// 更新限流器状态if ok {lim.last = now // 更新上次预订时间lim.tokens = tokens // 更新剩余令牌数lim.lastEvent = r.timeToAct // 更新上次事件时间} else {lim.last = last // 未成功则恢复上次时间}// 解锁lim.mu.Unlock()return r // 返回预订结果
}// advance 计算并返回基于时间推移的 lim 的更新状态。
// lim 自身的状态不会被改变。
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {last := lim.lastif now.Before(last) {// 如果 now 比 last 还早,则使用 now 作为 lastlast = now}// 避免 last 非常久远时导致 delta 溢出。// 计算多久后这个桶会自动填满maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)elapsed := now.Sub(last)if elapsed > maxElapsed {// 如果实际时间间隔超过最大允许间隔,调整为最大间隔,避免由于非常大的 elapsed 造成溢出或不合理的计算。elapsed = maxElapsed}// 计算由于时间推移增加的令牌数delta := lim.limit.tokensFromDuration(elapsed)tokens := lim.tokens + deltaif burst := float64(lim.burst); tokens > burst {// 如果计算得到的令牌数超过了 burst,则限制为 bursttokens = burst}// 返回更新后的时间 now, 上次时间 last 以及新的令牌数 tokensreturn now, last, tokens
}// tokensFromDuration 是一个单位转换函数,
// 用于将时间段转换为在该时间段内以每秒 limit 个令牌的速率
// 可积累的令牌数。
func (limit Limit) tokensFromDuration(d time.Duration) float64 {// 自行分离整数部分和小数部分,以尽量减少舍入误差。// 参考 golang.org/issues/34861。sec := float64(d / time.Second) * float64(limit) // 计算整秒内的令牌数nsec := float64(d % time.Second) * float64(limit) // 计算剩余纳秒内的令牌数return sec + nsec / 1e9 // 返回整秒和纳秒对应的令牌数之和
}
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation
- AllowN(now time.Time, n int) bool
lim.reserveN(now, n, 0).ok
- now表示现在
- n表示请求n个token
- 0表示等待时间
- ReserveN
lim.reserveN(now, n, InfDuration)
- now表示现在
- n表示请求n个token
- InfDuration表示无限等待
- WaitN
// WaitN 阻塞直到 lim 允许 n 个事件发生。
// 如果 n 超过了 Limiter 的 burst 大小,Context 被取消,
// 或者预期的等待时间超过了 Context 的截止时间,它会返回一个错误。
// 如果速率限制是无限的(Inf),则忽略 burst 限制。
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {// 加锁以安全地获取限流器的 burst 和 limit 值lim.mu.Lock()burst := lim.burstlimit := lim.limitlim.mu.Unlock()// 如果 n 超过了 burst 且 limit 不是 Inf,则返回错误if n > burst && limit != Inf {return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)}// 检查 Context 是否已取消select {case <-ctx.Done():return ctx.Err()default:}// 查看ctx是否设定了deadline,确定最大等待时间now := time.Now()waitLimit := InfDurationif deadline, ok := ctx.Deadline(); ok {// 计算距离截止时间的剩余时间waitLimit = deadline.Sub(now)}// 进行预订r := lim.reserveN(now, n, waitLimit)if !r.ok {// 如果预订失败且等待时间超过 Context 截止时间,返回错误return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)}// 计算需要等待的时间delay := r.DelayFrom(now)if delay == 0 {// 如果不需要等待,直接返回return nil}// 启动定时器进行等待t := time.NewTimer(delay)defer t.Stop()select {case <-t.C:// 拿到了令牌return nilcase <-ctx.Done():// 在等待时 Context 被取消,取消预订,允许其他事件提前进行r.Cancel()return ctx.Err()}
}
3. 小结
令牌桶算法广泛应用于控制 API 请求速率、限制资源访问频率、管理任务调度等场景。通过合理设置 limit 和 burst,可以有效平衡系统负载和服务质量。该算法并不会实时去维护令牌桶中的token的数量,而是通过last和lastEvent来巧妙的计算出该段时间内容桶内令牌的状态,同时通过锁来维护了对于令牌桶的访问一致性问题。