限流算法——时间滑动窗口
背景:
在当今的微服务架构中,会存在流量剧增的情况,需要适当的限流才能保证我们服务不会被打崩,因此一些限流组件就随之诞生,主流的接口限流组件,如 spring cloud alibaba sentinel等,开源限流工具包,如 guava 等。
本篇文章,主要通过手撕代码对时间滑动窗口限流算法实现。
常见限流算法:
限流算法是一种用于控制数据流速率的方法,以防止系统过载或保证服务质量。常见的限流算法包括固定窗口算法、滑动窗口算法、漏桶算法、漏桶算法和令牌桶算法
- 固定窗口算法。这是一种简单的计数器算法,它在固定的时间窗口内累加访问次数。当访问次数达到设定的阈值时,触发限流策略。这种方法在每个新的时间窗口开始时进行计数器的清零。
- 滑动窗口算法。滑动窗口算法是对固定窗口算法的改进,它将时间窗口分为多个小周期,每个小周期都有自己的计数器。随着时间的滑动,过期的小周期数据被删除,这样可以更精确地控制流量。
- 漏桶算法。漏桶算法则是一种更加平滑的限流方式,它以固定的速率处理请求,就像漏桶以一定的速率释放水滴一样。如果请求速率超过漏桶的释放速率,则超出部分的请求会被丢弃。
- 令牌桶算法。令牌桶算法中,系统以恒定的速率向桶中添加令牌,每个请求在处理前都需要从桶中获取一个令牌。如果桶中没有足够的令牌,则请求会被限流。
这些算法各有优缺点,适用于不同的场景和需求。例如,固定窗口和滑动窗口算法适合于QPS限流和统计总访问量,而漏桶和令牌桶算法则更适合于保证请求处理的平滑性和速率限制。
业务场景:
需求:需要针对单个设备,限制设备在固定时间内上报消息的频率,所以需要在设备发送消息前判断当前设备是否超出了限流标准,如果超过了标准就需要限制发送。
时间滑动窗口算法:
例如需求是十秒钟限制请求数100,
格子(时间):
我们可以将时间转为秒,看作一个个格子,将一秒钟看作一个格子,当同一秒钟新来的请求就将当前格子中的计数加一,随着时间的推移,到下一秒钟就创建出新的格子,这样我们就可以得到一个长链表,里面是随着时间推移,一个个记录请求数的格子。
窗口:
我们需求是计算十秒钟的请求是否超限,最新的十个格子,也就是10秒钟就是我们当前的窗口大小。我们只需要计算当前窗口内的请求总数是否超过了限制即可。
滑动窗口:
随之时间推移,会不断的创建新的格式,我们需要计算的窗口也会随着时间不断向后滑动,只包含当前最新的十个格子。
如下图图示,会建立一个长链表,其中包括当前的时间窗口以及过期的时间格子。判断是否超限只需计算当前滑动窗口内的所有请求数之和即可。
缺陷:
随着时间推移,会不断创建新的格子,此时链表就会越来越长,并且过期的时间格子永远也用不到了,所以需要将过期的时间格子进行清除,并且最新的格子也需要一直创建,此时就需要一个异步任务一直来创建和清除格子,无论是在时间上还是空间上都是比较消耗性能。
有没有优化方法?
答案是有的,环形数组。
我们可以根据当前的窗口来创建一个环形的数组,随着时间的推移,新的格子会将原来个格子覆盖掉,也不需要开启异步线程专门用来创建和清除格子。
环形数组
所谓环形数组,我们可以使用数组 + 取余 的方式来实现。
可以通过对当前时间取余的方式,确定当前时间格子的下标。
此时我们还需要维护一个变量,上次请求时间lastTime,需要与当前时间now进行比较,来决定我们计数的策略。
会有以下三种情况:
(1)情况一:lastTime 和 now 在同一秒钟,该情况,只需要继续沿用上次请求创建的格子,格子中请求数+1即可。
(2)情况二:当前时间和上次请求记录时间不在同一格子,但是在同一个周期中,没有超过一个周期。
该情况,从当前时间now计算时间窗口,保留在同一个周期内的格子, (now -> lastTime] 之间的格子,即绿色的格子,里面存储的数据是同一个周期内的;而从 (lastTime -> now] 之间的格子已经是上一个周期,已经淘汰,需要将其重置为0;并且将now 当前的格子数置为1,重新开始计数当前格子。
(3)情况三:当前时间和上次请求记录时间已经超过了一个周期中,已经超过了环形数组中的一圈。
此时就需要将当前窗口内的所有格子进行清空,重新开始计数。
(4)情况四:异常情况,时钟回拨,可能是服务器时间被人修改,往前修改了,导致时间出错,lastTime > now,需要重新统计。
java代码实现:
package com.company.limit;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;/*** 时间滑动窗口算法* 限流算法*/
public class TimeSlidingWindow {/*** 限流上线次数*/private Integer limitMaxNum;/*** 滑动窗口格子数* 根据时间来,60秒分成60格*/private Integer windowNum;/*** 上次统计时间,单位秒*/private ConcurrentHashMap<String, AtomicLong> lastTimeMap;/*** 记录key值与时间窗口映射*/private ConcurrentHashMap<String, AtomicIntegerArray> timeWindowsMap;/*** 记录key值与窗口内请求总数映射*/private ConcurrentHashMap<String, AtomicInteger> timeCountMap;public TimeSlidingWindow(int limitMaxNum, int windowNum) {this.timeWindowsMap = new ConcurrentHashMap<>();this.timeCountMap = new ConcurrentHashMap<>();this.windowNum = windowNum;this.limitMaxNum = limitMaxNum;this.lastTimeMap = new ConcurrentHashMap<>();}/*** 限流方法入口* @param key* @return*/public Boolean limit(String key) {// 获取当前窗口AtomicIntegerArray windows = this.timeWindowsMap.computeIfAbsent(key, k -> new AtomicIntegerArray(this.windowNum));// 获取当前窗口请求总和AtomicInteger count = timeCountMap.computeIfAbsent(key, k -> new AtomicInteger(0));AtomicLong lastTime = lastTimeMap.computeIfAbsent(key, k -> new AtomicLong(System.currentTimeMillis() / 1000));// 计算当前时间所处格子Long now = System.currentTimeMillis() / 1000;int temp = (int) (now % this.windowNum);// 计算当前时间与上次请求时间差,用于刷新窗口Long diffTime = now - lastTime.get();// System.out.println("now:" + now);
// System.out.println("lastTime:" + lastTime.get());// 将锁的粒度缩小单个value节点synchronized (windows) {if (diffTime >= 0 && now.equals(lastTime.get())) {/*(1)当前时间所属格子与上次请求记录在同一个格子中该情况,只需要继续沿用上次请求创建的格子,格子中请求数++*/windows.getAndAdd(temp, 1);count.addAndGet(1);} else if (diffTime >= 0 && diffTime < windowNum) {/*(2)当前时间和上次请求记录时间在同一个周期中,环形数组的同一个周期中,没有超过一个周期。该情况意味着,从当前时间now计算时间窗口内请求数,只需要保留并计算 (now -> last) 之间的格子;而从(last -> now) 之间的格子已经淘汰,需要将其重置为0;并且将now 当前的格子数置为1,重新开始计数当前格子。*/count = clearExpireWindows(windows, (int) (lastTime.get() % this.windowNum), temp, count);windows.set(temp, 1);count.addAndGet(1);} else if (diffTime >= 0 && diffTime >= this.windowNum) {/*(3)当前时间和上次请求记录时间不在同一个周期中,已经超过了环形数组中的一圈。意味着,之前统计的*/windows = new AtomicIntegerArray(this.windowNum);windows.set(temp, 1);count.set(1);} else {/*(4)异常情况,时钟回拨,可能是服务器时间被人修改,往前修改了,导致时间出错,需要重新统计*/System.out.println("时钟回拨,时间异常,重新开启限流统计");this.timeWindowsMap = new ConcurrentHashMap<>();this.timeCountMap = new ConcurrentHashMap<>();return true;}lastTime.set(now);// 如果限流了,这次计数需要回退if (count.get() > this.limitMaxNum) {windows.getAndAdd(temp, -1);count.addAndGet(-1);return false;}}return true;}/*** 清除过期数据* @param windows 需要清除的窗口* @param from 开始位置* @param to 结束位置* @param count 当前周期计算总和* @return*/private AtomicInteger clearExpireWindows(AtomicIntegerArray windows, int from, int to, AtomicInteger count) {if (to == from) {count.addAndGet(1);return count;}// 调整下标值,若结束位置小于开始位置,则说明当前格子位于下一个周期中。if (to < from) {to = this.windowNum + to;}while (++from <= to) {int window = windows.get(from % this.windowNum);count.addAndGet(-window);windows.set(from % this.windowNum, 0);}return count;}public static void main(String[] args) {TimeSlidingWindow timeSlidingWindow = new TimeSlidingWindow(5, 10);new Thread(() -> {int i = 0;while (i < 600) {Boolean limit = timeSlidingWindow.limit("/hello");System.out.println("/hello" + i + ":" + limit + ", 时间:" + (i * 300.0) / 1000.0);try {Thread.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}i++;}}).start();new Thread(() -> {int i = 0;while (i < 600) {Boolean limit1 = timeSlidingWindow.limit("/world");System.out.println("/world" + i + ":" + limit1 + ", 时间:" + (i * 500.0) / 1000.0);try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}i++;}}).start();}
}