来源:https://www.cnblogs.com/zhandouBlog/p/11743660.html
前言
RateLimiter是基于令牌桶算法实现的一个多线程限流器,它可以将请求均匀的进行处理,当然他并不是一个分布式限流器,只是对单机进行限流。它可以应用在定时拉取接口数据,
预防单机过大流量使用。
原理
首先先讲一下令牌桶的原理,每隔一段时间生产一个令牌放入桶里,请求在执行时需要拿到令牌才可以执行,如果拿不到令牌将等待令牌产生,一个生产者,多个消费者。
但是这样的令牌桶有一个问题,如果CPU负载过高,生产令牌的线程没有获取到时间片生产令牌,那么限制的流量将会比设定值更低。
可能是出于这个原因,guava并没有这样做,而是一个惰性生产令牌,每次请求令牌时,通过当前时间和下次产生令牌时间的差值计算出现在有多少个令牌,如果当前时间比发放时间大,会获得令牌,并且会生成令牌存储。如果令牌不够,则让线程sleep,并且将下次令牌产生时间更新成当前时间+sleep时间
sleep,并且将下次发放令牌的时间,设置成当前时间+线程sleep的时间。这样说,可能不是很清楚,看图。
这样做的好处是什么,如果获取令牌的线程抢不到cpu,只是这个线程的执行时间会晚,其他线程不会受到影响。
源码阅读
public static void main(String[] args) {RateLimiter rateLimiter = RateLimiter.create(10);while (true) {long start = System.currentTimeMillis();rateLimiter.acquire();System.out.println(System.currentTimeMillis() - start);}} |
运行可以发现,上面的代码除了第一次输出的是0或者1,其他都接近100。下面先看一下RateLimiter.create做了哪些事情
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {//创建对象,并且赋值,permitsPerSecond这个是我们设置的qps,stopwatch这个相当于一个计时器,记录相对时间,类似于我上面图中的10ms,100ms等,下面传入的1.0就是一秒的意思,设置上速率就是一秒多少次RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0);rateLimiter.setRate(permitsPerSecond);return rateLimiter;} 看一下setRate public final void setRate(double permitsPerSecond) {checkArgument(permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");//从名字就可以看出这是一个互斥锁,这个互斥锁采用了double check懒汉单例模式生成,synchronized (mutex()) {doSetRate(permitsPerSecond, stopwatch.readMicros());}} |
下面看一下doSetRate,真正开始设置速率了
final void doSetRate(double permitsPerSecond, long nowMicros) {//这个方法非常重要里面是nextFreeTicketMicros和storedPermits的设置,在生成对象的时候没有用,获取令牌时再讲resync(nowMicros);//这个就是令牌生成间隔,微秒表示double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;this.stableIntervalMicros = stableIntervalMicros;这里又有一个doSetRatedoSetRate(permitsPerSecond, stableIntervalMicros);}void doSetRate(double permitsPerSecond, double stableIntervalMicros) {double oldMaxPermits = this.maxPermits;//设置最大令牌maxPermits = maxBurstSeconds * permitsPerSecond;//设置存储令牌if (oldMaxPermits == Double.POSITIVE_INFINITY) {storedPermits = maxPermits;} else {storedPermits =(oldMaxPermits == 0.0)? 0.0 // initial state: storedPermits * maxPermits / oldMaxPermits;} |
到了这里整个创建过程就结束了,基本上就是一些设置,创建了锁,设置了生成令牌的间隔时间等等,下面看一下获取令牌的方法。
//获取一个令牌 public double acquire() {return acquire(1);}public double acquire(int permits) {//reserve返回等待时间,内部进行了令牌的获取long microsToWait = reserve(permits);stopwatch.sleepMicrosUninterruptibly(microsToWait);return 1.0 * microsToWait / SECONDS.toMicros(1L);}final long reserve(int permits) {checkPermits(permits);//创建对象时生成的锁synchronized (mutex()) {//stopwatch.readMicros()拿到当前的时间,预订return reserveAndGetWaitLength(permits, stopwatch.readMicros());}}final long reserveAndGetWaitLength(int permits, long nowMicros) {//将数据透传,拿到最早可预订的时间,如果预订时间在未来时间,返回一个大于0的值为等待时间long momentAvailable = reserveEarliestAvailable(permits, nowMicros);return max(momentAvailable - nowMicros, 0);} |
下面的代码就是我图中的实现
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {//这个方法很重要先看下面这个方法的讲解,在从这里往下看resync(nowMicros);//返回下次发放令牌时间,如果这个时间大于当前时间,在调用的上层会sleeplong returnValue = nextFreeTicketMicros;//拿到此次花费的令牌double storedPermitsToSpend = min(requiredPermits, this.storedPermits);// 如果令牌不够,这里就会大于0,下面就会得出一个等待时间double freshPermits = requiredPermits - storedPermitsToSpend;long waitMicros =storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)+ (long) (freshPermits * stableIntervalMicros);//将下次发放令牌的时间加上等待时间this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);this.storedPermits -= storedPermitsToSpend;return returnValue;}void resync(long nowMicros) {if (nowMicros > nextFreeTicketMicros) {//当前时间大于下次令牌发放时间,新的令牌为当前时间减去下次发放令牌时间除以生成令牌的时间间隔double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();//不能超过最大令牌数storedPermits = min(maxPermits, storedPermits + newPermits);//更新下次发放令牌时间为当前时间nextFreeTicketMicros = nowMicros;}} |
总结
RateLimiter的原理用语言描述,很容易把人绕晕,上面的图其实是最好的总结,懂得原理才能更好的使用,在多种限流器中选择合适的限流器。了解源码,能更进一步的掌握原理,并且从源码中可以学到设计思路和
一些设计模式的应用。