常用的限流算法有漏桶算法和令牌桶算法,guava的RateLimiter使用的是令牌桶算法,也就是以固定的频率向桶中放入令牌,例如一秒钟10枚令牌,实际业务在每次响应请求之前都从桶中获取令牌,只有取到令牌的请求才会被成功响应,获取的方式有两种:阻塞等待令牌或者取不到立即返回失败。
限流算法:令牌桶算法、漏斗桶算法、基于redis的滑动窗口计数法
令牌桶算法
我们用的是guava的RateLimiter,用在处理请求时候,从桶中申请令牌,申请到了就成功响应,申请不到时直接返回失败;
代码示例:
package common.guava;import com.google.common.util.concurrent.RateLimiter;
import org.junit.Test;public class RateLimitTest {@Testpublic void use1(){RateLimiter rateLimiter = RateLimiter.create(5.0);for (int i = 0 ; i < 20; i++){//尝试获取令牌if (rateLimiter.tryAcquire()){System.out.println("获取令牌成功");//模拟业务执行
// try {
// Thread.sleep(500);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }}else {System.out.println("获取令牌失败");}}}@Testpublic void test2(){RateLimiter rateLimiter = RateLimiter.create(5);long start = System.currentTimeMillis()/1000;for (int i = 0 ; i < 10; i++){System.out.println("----start----");//阻塞式放行rateLimiter.acquire();System.out.println("放行");System.out.println("-----end-----");}long end = System.currentTimeMillis() / 1000;System.out.println(String.format("耗时:%d s", (end - start)));}}
漏斗桶算法
漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
参考:https://www.cnblogs.com/xuwc/p/9123078.html
package flowLimit;import com.google.common.util.concurrent.RateLimiter;
import org.junit.Test;import java.util.concurrent.TimeUnit;/*** @author: weijie* @Date: 2020/9/23 19:15* @url:https://blog.csdn.net/cailianren1/article/details/85283044*/
public class LeakyBucketLimitTest {/*** @param qps 平均qps 控制接口的响应速率,响应速率越快处理请求越多* @param countOfReq 桶的大小,接受请求的最大值* @return*/public RateLimiter createLeakyBucket(int qps, int countOfReq){return RateLimiter.create(qps,countOfReq, TimeUnit.MILLISECONDS);}@Testpublic void run(){RateLimiter leakyBucket = createLeakyBucket(100, 1000);long start = System.currentTimeMillis()/1000;int countRequest = 200;for (int i = 0; i < countRequest; i++){
// System.out.println("请求过来");leakyBucket.acquire();
// System.out.println("业务处理");}long spend = System.currentTimeMillis()/1000 - start;System.out.println("处理的请求数量:" + countRequest +"," +""+"耗时:" + spend + "s " +",qps:" + leakyBucket.getRate()+",实际qps:"+Math.ceil(countRequest/(spend)));}
}
窗口计数法
优点:和令牌桶相比,这种算法不需要去等待令牌生成的时间,在新的时间窗口,可以立即处理大量的请求。
缺点:在一个窗口临界点的前后时间,比如时间窗口是1分钟,在59秒和1分01秒同时突发大量请求,极端情况下可能会带来 2 倍的流量,系统可能承受不了这么大的突发性流量
java实现的固定窗口计数法:
package common.flowLimit;import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/*** @author: weijie* @Date: 2020/9/23 18:02* @Description:* @url: https://blog.csdn.net/king0406/article/details/103129530?*/
public class WindowLimiter {Logger log = LoggerFactory.getLogger(WindowLimiter.class);//本地缓存,以时间戳为key,以原子类计数器为valueprivate LoadingCache<Long, AtomicLong> counter =CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build(new CacheLoader<Long, AtomicLong>() {@Overridepublic AtomicLong load(Long seconds) throws Exception {return new AtomicLong(0);}});private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);//设置限流阈值为15private long limit = 15;/*** 固定时间窗口* 每隔5s,计算时间窗口内的请求数量,判断是否超出限流阈值*/@Testpublic void run(){while (true){fixWindow();}}private void fixWindow() {scheduledExecutorService.scheduleWithFixedDelay(() -> {try {// time windows 5 slong time = System.currentTimeMillis() / 5000;//模拟每秒发送随机数量的请求int reqs = (int) (Math.random() * 5) + 1;long num = counter.get(time).addAndGet(reqs);log.info("time=" + time + ",num=" + num);if (num > limit) {log.info("限流了,num=" + num);}} catch (Exception e) {log.error("fixWindow error", e);} finally {}}, 0, 1000, TimeUnit.MILLISECONDS);}
}
基于redis分布式固定窗口计数法:
package flowLimit;import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;import java.util.Random;/*** @author: weijie* @Date: 2020/9/23 18:11* @url: https://blog.csdn.net/king0406/article/details/103130327*/
public class WindowLimiterByRedisTest {Logger logger = LoggerFactory.getLogger(WindowLimiterByRedisTest.class);JedisPool jedisPool;@Beforepublic void init(){String host = "39.96.204.209";int port = 6379;jedisPool = new JedisPool(host, port);}@Testpublic void run(){/*每次请求进来,查询一下当前的计数值,如果超出请求数阈值,则拒绝请求,返回系统繁忙提示*/Jedis redis = jedisPool.getResource();redis.auth("123456");long limit = 10;while (true){Random random = new Random();//模拟三个不同的请求String request = "flow:" + random.nextInt(3);long count = 0;try {count = limitFlow(redis, request);//超过限流if (count > limit){logger.error("当前访问过于频道,请稍后再试");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}else {logger.info("请求放行,执行业务处理");}}catch (Exception e){e.printStackTrace();}}}private long limitFlow(Jedis jedis, String key) {//Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。设置成功返回1,设置失败返回0Long lng = jedis.setnx(key, "1");if (lng == 1) {//设置时间窗口,redis-key时效为10秒jedis.expire(key, 10);return 1L;} else {//Redis Incrby 命令将 key 中储存的数字加上指定的增量值。相当于放在redis中的计数器,每次请求到来计数器自增1System.out.println("key: " + key);String va = jedis.get(key);System.out.println("value: " + va);long val = jedis.incr(key);System.out.println("result: " + val);return val;}}
}