文章目录
- 滑动时间窗算法
- 基本知识
- 源码算法分析
- 漏桶算法
- 令牌桶算法
- 拦截器处理web请求
滑动时间窗算法
基本知识
限流算法最简单的实现就是使用一个计数器法。比如对于A接口来说,我要求一分钟之内访问量不能超过100,那么我们就可以这样来实现:
- 最开始的时候就设置一个count值,每当一个请求过来我就count++
- 如果count的值大于了100,并且与第一个请求的时间间隔小于1分钟,那么就表示请求数过多
- 如果该请求与第一个请求的间隔时间大于1分钟,且count的值还在限流范围内,那么就重置count。
/*** @Description: 限流算法之计数器法,伪代码的实现* @Author 胡尚* @Date: 2024/7/12 8:50*/
public class Count {// 当前时间private Long timeStamp = System.currentTimeMillis();// 请求数量private int count = 0;// 时间窗口的最大请求数private final int limit = 100;// 时间窗口msprivate final long interval = 1000 * 60;/*** 限流校验,校验是否成功* @return true表示允许请求,false表示校验未通过*/public boolean check(){long now = System.currentTimeMillis();// 还在一分钟 时间窗口之内if (now <= timeStamp + interval){count++;return count <= limit;} else {timeStamp = System.currentTimeMillis();// 超时后重置请求数count = 1;}return true;}
}
计数法的实现有一个缺点,那就是限流的时间精度不准确。比如第一个时间窗口的前半分钟只有十个请求,后半分钟有90个请求;第二个时间窗口的前半分钟有90个请求,后半分钟只有十个请求。这种情况下,限流器是不会出现限流的,但是我在在一个时间段中的请求却有180个了。
为了解决计算器法统计时间精度不够的问题,进而引入了滑动时间窗
滑动时间窗算法的基本原理是维护一个固定大小的时间窗口,窗口内的数据被认为是当前分析的有效数据。随着时间的推移,新的数据点进入窗口,而旧的数据点则被移出窗口,从而形成了一个滑动的时间窗口。
在上图中,一个红色窗格就是一个时间窗口,我们把一个时间窗口分为了6个小格子。就拿时间窗口为一分钟举例,上面每一个小格子就是10秒;每过10秒,我们的时间窗口就会往右移动一格。每一个格子都有自己的计数器count,比如当一个请求 在0:35秒的时候到达,那么0:30~0:39对应的counter就会加1。
在具体实现上,滑动时间窗算法可以通过多种数据结构来实现,例如使用环形数组、哈希表等。例如,可以使用一个环形数组来存储时间窗口内的数据点,数组的大小等于时间窗口的大小(以时间单位为单位)。每当有新的数据点进入时,旧的对应时间点的数据将被覆盖,从而实现滑动时间窗的效果。此外,还可以使用哈希表来记录每个时间点上的数据变化情况,其中键为时间点,值为该时间点的数据值或变化量。
/*** @Description: 限流算法之滑动时间窗算法的伪代码实现* 假设每秒的请求不能超过100,我们设置一个1s的时间窗口,时间窗口中共有10个小格子,* 每个格子记录100ms的请求数,每100毫秒移动一次,每次移动都需要记录当前服务请求数* 我这里就简单实现,只有一个计数器,最新的小窗口永远存储最新访问请求总数。当然也可以每个小窗口都有自己的计数器。* @Author 胡尚* @Date: 2024/7/12 9:23*/
public class SlidingTimeWindow {/*** 服务器访问次数,可以放在redis中实现分布式系统访问*/private Long count = 0L;/*** 滑动时间窗,使用linkedList来记录滑动窗口的10个格子*/private LinkedList<Long> slots = new LinkedList<>();public static void main(String[] args) throws InterruptedException {SlidingTimeWindow timeWindow = new SlidingTimeWindow();new Thread(new Runnable() {@Overridepublic void run() {try {timeWindow.onCheck();} catch (InterruptedException e) {e.printStackTrace();}}}).start();// 模拟一直都有请求数,随机休眠几毫秒while(true){// TODO 限流校验timeWindow.count++;Thread.sleep(new Random().nextInt(15));}}private void onCheck() throws InterruptedException {while (true){// 把当前访问总数存入时间小窗口中slots.add(count);// 时间窗口的剔除操作if (slots.size() > 10){slots.removeFirst();}// 最新的时间小窗口的数和最老的时间小窗口数进行比较,是否要限流if (slots.peekLast() - slots.peekFirst() > 100 ){// TODO 限流标识} else{// TODO 解除限流标识}Thread.sleep(100);}}}
源码算法分析
我们接下来看看Sentinel它的滑动时间窗是怎么实现的。我们从StatisticSlot
类的entry()
方法开始看,因为它每次都会记录请求通过/请求拒绝相关的计数
public void entry (...) throws Throwable {...// 请求通过,我们详细看看这个方法的处理逻辑node.addPassRequest(count);
}// 调用到StatisticNode类的addPassRequest()方法中
public void addPassRequest(int count) {// 一个记录秒级// 它的创建语句 :Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,IntervalProperty.INTERVAL);// 小窗口总数SampleCountProperty.SAMPLE_COUNT = 2 滑动时间窗总时间数IntervalProperty.INTERVAL=1000 这两个数在下面的方法中会用到rollingCounterInSecond.addPass(count);// 一个记录分钟级rollingCounterInMinute.addPass(count);
}// 进入到addPass()方法中
public void addPass(int count) {// 获取到当前时间小窗口WindowWrap<MetricBucket> wrap = data.currentWindow();// 往时间小窗口中进行相加操作// wrep.value()获取的是MetricBucket对象,它存储了一个LongAdder[] counters数组// 所以这里的添加操作应该是往LongAdder[]数组中对应的某一个小标进行相加操作wrap.value().addPass(count);
}
我们首先详细分析data.currentWindow();
这里是如何根据当前时间获取到对应的时间小窗口的
- 计算出当前请求时间对应的小窗口下标idx
- 计算出当前请求的小窗口起始时间
- 取出idx对应的老的小窗口对象
- 如果老的小窗口对象为null,那么就直接创建一个小窗口对象,并存入滑动窗口对应的小窗口位置
- 如果老的小窗口对象所代表的小窗口起始时间 = 当前请求的小窗口起始时间 ,那么就返回老的小窗口对象
- 如果老的小窗口对象所代表的小窗口起始时间 < 当前请求的小窗口起始时间,那么就会替换覆盖掉老的小窗口对象
- 时钟回拨的处理,创建一个新的无关紧要的小窗口对象返回
public WindowWrap<T> currentWindow(long timeMillis) {// 当前时间戳基本上不会小于0if (timeMillis < 0) {return null;}// 获取当前时间对应的小窗口的下标数: timeMillis / 小窗口总时间数 % 小窗口数// 比如当前限流时间是1秒,移动窗口的小窗口数是2,那么每个小窗口总时间数就是500ms。// 假如当前时间为700ms。那么就是 700 / 500 = 1 ,然后1%2 = 1 最终得到我当时间所在的小窗口是下标为1的小窗口int idx = calculateTimeIdx(timeMillis);// Calculate current bucket start time.// 计算当前小窗口的启动时间:timeMillis - timeMillis % 小窗口总时间数// 还是上面的案例: 700 - 700%500 = 500。 那么700ms这个时间对应的小窗口启动时间为500// 假如当前时间为1600ms:1600 - 1600%500 = 1500。 那么1600md对应小窗口的启动时间是1500long windowStart = calculateWindowStart(timeMillis);while (true) {// 取出老小窗口对象WindowWrap,根据上面得到的当前时间对应的小窗口的下标数WindowWrap<T> old = array.get(idx);// 如果滑动时间窗口中,该小窗口为nullif (old == null) {// 那么就创建一个小窗口对象WindowWrap,参数有小窗口的总时长、小窗口的起始时间、当前请求时间戳WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));// 再用CAS算法存入时间窗口中if (array.compareAndSet(idx, null, window)) {return window;} else {Thread.yield();}// 如果当前时间对应的小窗口起始时间 = 老小窗口的起始时间。那么就直接返回老的小窗口对象// 可能有一个700ms的请求先过来,创建了一个WindowWrap小窗口对象。然后来了一个750ms的请求,这时它们的小窗口的起始时间就是一样的} else if (windowStart == old.windowStart()) {return old;// 如果当前时间对应的小窗口起始时间 > 老的小窗口的起始时间。那么就把老的小窗口对象给覆盖掉// 可能有一个700ms的请求先过来,创建了一个WindowWrap小窗口对象。// 然后来了一个1600ms的请求,这时它们的小窗口的起始时间就是500 和 1500// 这里就会调用resetWindowTo(old, windowStart);方法进行滑动时间窗,剔除掉老的小窗口} else if (windowStart > old.windowStart()) {if (updateLock.tryLock()) {try {// 重置小窗口return resetWindowTo(old, windowStart);} finally {updateLock.unlock();}} else {Thread.yield();}// 时钟回拨的处理逻辑,直接返回一个新的小窗口WindowWrap对象,它和滑动窗口都没有关系} else if (windowStart < old.windowStart()) {// Should not go through here, as the provided time is already behind.return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));}}
}
我们再看看往时间小窗口中进行相加操作的具体实现wrap.value().addPass(count);
// 往时间小窗口中进行相加操作
// wrep.value()获取的是MetricBucket对象,它存储了一个LongAdder[] counters数组
// 所以这里的添加操作应该是往LongAdder[]数组中对应的某一个小标进行相加操作
wrap.value().addPass(count);// 方法调用
public void addPass(int n) {// 这里就传了一个代码pass通过的枚举对象add(MetricEvent.PASS, n);
}// 往代码Pass通过的数组下标对应位置进行相加操作
public MetricBucket add(MetricEvent event, long n) {// event.ordinal()获取的其实就是当前对象在枚举中的下标数// 官方解释:返回此枚举常量的序号(其在枚举声明中的位置,其中初始常量被赋值为0的序数)counters[event.ordinal()].add(n);return this;
}
通过上方的代码我们就已经知道了Sentinel在滑动窗口上的具体实现。我们现在再看看限流FlowSlot流程中,从时间窗口中取值的流程。
快速失败方式 --> DefaultController.canPass()
public boolean canPass(Node node, int acquireCount, boolean prioritized) {// 当前时间窗口中取统计指标数据int curCount = avgUsedTokens(node);// 当前qps > count阈值,那么就要返回falseif (curCount + acquireCount > count) {// 因为prioritized默认情况下都是false,所以下面if不需要花太多精力去看if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {...}return false;}return true;
}// 我们看看avgUsedTokens(node)方法是怎么从时间窗口中取值的。最终会调用至 ArrayMetric.pass()方法中
public long pass() {data.currentWindow();long pass = 0;// data.values()其实就是从滑动时间窗口中取所有的小窗口对象中存的MetricBucket对象List<MetricBucket> list = data.values();for (MetricBucket window : list) {// 求总和pass += window.pass();}return pass;
}
漏桶算法
漏桶算法,又称leaky bucket。
从图中我们可以看到,整个算法其实十分简单。首先,我们有一个固定容量的桶,有水流进来,也有水流出去。对于流进来的水来说,我们无法预计一共有多少水会流进来,也无法预计水流的速度。但是对于流出去的水来说,这个桶可以固定水流出的速率。而且,当桶满了之后,多余的水将会溢出。
我们将算法中的水换成实际应用中的请求,我们可以看到漏桶算法天生就限制了请求的速度。当使用了漏桶算法,我们可以保证接口会以一个常速速率来处理请求。所以漏桶算法天生不会出现临界问题。
/*** @Description: 限流算法之漏桶算法 伪代码实现* @Author 胡尚* @Date: 2024/7/12 11:57*/
public class LeakyBucket {/*** 当前时间*/private long timeStamp = System.currentTimeMillis();/*** 桶的容量*/private long capacity;/*** 水漏出的速度(每秒系统能处理的请求数)*/private long rate;/*** 当前水量(当前累积请求数)*/private long water;public boolean check() {long now = System.currentTimeMillis();// 当前水量water = Math.max(0, capacity - ((now - timeStamp) / 1000) * rate); // 更新timeStamptimeStamp = now;if (water + 1 > capacity){// 水满了,直接拒绝return false;}else{// 还能装水water++;return true;}}
}
我们接下来看看sentinel,它在流控规则中的排队等待是怎么实现漏桶算法的。
- 计算出两次请求应该的间隔时间costTime
- 计算出下一次请求期望的时间点expectedTime
- 如果currentTime > currentTime 就返回true,放行
- 否则计算当前请求需要等待时间waitTime
- 判断waitTime > 最大超时时间 就返回false
- 否则再又重新计算一次等待时间,重新判断一次是否超过最大等待时间,再去进入等待sleep()
// 排队等待 --> RateLimiterController.canPass() ---> 漏桶算法
public boolean canPass(Node node, int acquireCount, boolean prioritized) {if (acquireCount <= 0) {return true;}if (count <= 0) {return false;}// 获取当前请求的当前时间long currentTime = TimeUtil.currentTimeMillis();// acquireCount一般就是一次请求,也就是1 count就是我们在控制台界面中设置的阈值// costTime就是算出两次请求之间应该的间隔时间,比如我1s中只能允许10个请求。那么这里就是 1.0 / 10 * 1000 = 100ms。// 也就是说我两次请求的间隔时间应该是100毫秒long costTime = Math.round(1.0 * (acquireCount) / count * 1000);// expectedTime就是一个期望时间。 期望时间 = 两次请求间隔时间 + 上一次请求时间// 比如我上一个请求的时间为300ms,那么我期望下一个请求应该是400mslong expectedTime = costTime + latestPassedTime.get();// 假如我当前的请求时间 比期望时间还要大,那么就直接更新最后一次请求时间,并返回true放行当前请求// 比如我期望下一个请求的时间是400ms。但我现在请求的时间已经是450ms了,我比你期望的时间还要长,那你是不是肯定要给我放行了if (expectedTime <= currentTime) {latestPassedTime.set(currentTime);return true;} else {// 接下来就是我当前的请求时间,比期望的时间要小的处理逻辑// 重新算出来的期望时间 - 当前请求的时间 = 我当前这个请求需要等待的时间long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();// 如果我这个请求要等待的时间,超过了控制台界面设置的最大超时时间那么就直接返回falseif (waitTime > maxQueueingTimeMs) {return false;} else {// 接下来是我当前请求需要进行等待的处理逻辑// 修改上一次请求时间,上一次请求时间 + 间隔时间 = oldTime(其实就是重新算出来的期望时间)long oldTime = latestPassedTime.addAndGet(costTime);try {// 重新计算一次,期望时间 - 当前请求的时间 = 当前请求要等待的时间waitTime = oldTime - TimeUtil.currentTimeMillis();// 当前请求要等待的时间 是否大于了最大超时时间,如果大于了就把上一次请求时间改回来,在直接返回falseif (waitTime > maxQueueingTimeMs) {latestPassedTime.addAndGet(-costTime);return false;}// 当前线程进入等待if (waitTime > 0) {Thread.sleep(waitTime);}return true;} catch (InterruptedException e) {}}}return false;
}
令牌桶算法
令牌桶算法,又称token bucket。
有一个线程,会根据一定增长的预热速率往桶中放令牌,如果桶中的令牌满了那么就就直接将令牌丢弃掉。
当有请求过来时就会从桶中获取令牌,如果获取到了就放行,如果没有获取到就拒绝。
/*** 令牌桶限流算法 伪代码* 这里就没有预热、增长的速率往桶中放入令牌*/
public class TokenBucket {public long timeStamp = System.currentTimeMillis(); // 当前时间public long capacity; // 桶的容量public long rate; // 令牌放入速度public long tokens; // 当前令牌数量public boolean grant() {long now = System.currentTimeMillis();// 先添加令牌tokens = Math.min(capacity, tokens + (now - timeStamp) * rate);timeStamp = now;if (tokens < 1) {// 若不到1个令牌,则拒绝return false;} else {// 还有令牌,领取令牌tokens--;return true;}}
}
对应的sentinel中,Warm Up的实现就是基于令牌桶实现的,Warm Up --> WarmUpController.canPass() --> 令牌桶算法
这里就不展开介绍sentinel具体的实现算法了
public boolean canPass(Node node, int acquireCount, boolean prioritized) {long passQps = (long) node.passQps();long previousQps = (long) node.previousPassQps();syncToken(previousQps);// 开始计算它的斜率// 如果进入了警戒线,开始调整他的qpslong restToken = storedTokens.get();if (restToken >= warningToken) {long aboveToken = restToken - warningToken;// 消耗的速度要比warning快,但是要比慢// current interval = restToken*slope+1/countdouble warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));if (passQps + acquireCount <= warningQps) {return true;}} else {if (passQps + acquireCount <= count) {return true;}}return false;
}
限流算法小结
计数器 VS 滑动窗口:
计数器算法是最简单的算法,可以看成是滑动窗口的低精度实现。
滑动窗口由于需要存储多份的计数器(每一个格子存一份),所以滑动窗口在实现上需要更多的存储空间。
也就是说,如果滑动窗口的精度越高,需要的存储空间就越大。
漏桶算法 VS 令牌桶算法:
漏桶算法和令牌桶算法最明显的区别是令牌桶算法允许流量一定程度的突发。
因为默认的令牌桶算法,取走token是不需要耗费时间的,也就是说,假设桶内有100个token时,那么可以瞬间允许100个请求通过。
当然我们需要具体情况具体分析,只有最合适的算法,没有最优的算法。
拦截器处理web请求
我们引入了下面的依赖,也就是我们实际使用sentinel的场景下,我们会发现我们controller层编写的http请求接口都不需要我们自己额外的定义资源开启保护。
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
会自动的把我们的http请求接口定义成一个一个的资源。具体的实现如下:
入口是SentinelWebAutoConfiguration
自动配置类
SentinelWebAutoConfiguration
自动配置类它实现了WebMvcConfigurer
接口,所以在SpringBoot启动过程中就会调用这个类重写的addInterceptors
方法
public class SentinelWebAutoConfiguration implements WebMvcConfigurer {......// 此数组存的就是最下面创建的拦截器@Autowiredprivate Optional<SentinelWebInterceptor> sentinelWebInterceptorOptional;@Overridepublic void addInterceptors(InterceptorRegistry registry) {if (!sentinelWebInterceptorOptional.isPresent()) {return;}SentinelProperties.Filter filterConfig = properties.getFilter();// 最终下面创建的拦截器会被添加在这里来registry.addInterceptor(sentinelWebInterceptorOptional.get()).order(filterConfig.getOrder()).addPathPatterns(filterConfig.getUrlPatterns()); // 这里其实就是拦截所有的请求 /**log.info(...);}// 往Spring容器中存一个SentinelWebInterceptor拦截器@Bean@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",matchIfMissing = true)public SentinelWebInterceptor sentinelWebInterceptor(SentinelWebMvcConfig sentinelWebMvcConfig) {return new SentinelWebInterceptor(sentinelWebMvcConfig);}...
}
SentinelWebInterceptor extends AbstractSentinelInterceptor
这里是该类的继承关系,我们直接去看它父类中的preHandle()
方法
// 前置拦截器方法
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {try {String resourceName = this.getResourceName(request);if (StringUtil.isEmpty(resourceName)) {return true;} else if (this.increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {return true;} else {String origin = this.parseOrigin(request);String contextName = this.getContextName(request);ContextUtil.enter(contextName, origin);// 这里定义了资源,开启了保护Entry entry = SphU.entry(resourceName, 1, EntryType.IN);request.setAttribute(this.baseWebMvcConfig.getRequestAttributeName(), entry);return true;}} catch (BlockException var12) {// 如果出现了BlockException,则去相应的处理逻辑BlockException e = var12;try {this.handleBlockException(request, response, e);} finally {ContextUtil.exit();}return false;}
}