目录
- 1.简介
- 1.1.为什么需要限流?
- 1.2.限流和熔断有什么区别?
- 1.3.限流和削峰有什么区别?
- 1.4 缓存,降级,限流简介
- 2.应用级限流
- 2.1 控制并发数量
- 2.2 控制访问速率
- 2.2.1 令牌桶算法
- 2.2.2 漏桶算法
- 3.分布式限流
- 4.交流群
1.简介
接口限流是对某一时间窗口内的请求数进行限制,以保持系统的可用性和稳定性,防止因流量暴增而导致的系统运行缓慢或宕机。此外,接口限流也可以通过限制每个用户或每个接口调用的频率和并发数,来控制对服务资源的访问。
1.1.为什么需要限流?
大量正常用户高频访问导致服务器宕机
恶意用户高频访问导致服务器宕机
网页爬虫 ,对于这些情况我们需要对用户的访问进行限流访问
1.2.限流和熔断有什么区别?
限流发生在流量进来之前,超过的流量进行限制。
熔断是一种应对故障的机制,发生在流量进来之后,如果系统发生故障或者异常,熔断会自动切断请求,防止故障进一步扩展,导致服务雪崩。
1.3.限流和削峰有什么区别?
削峰是对流量的平滑处理,通过缓慢地增加请求的处理速率来避免系统瞬时过载。
削峰大概就是水库,把流量储存起来,慢慢流,限流大概就是闸口,拒绝超出的流量。
1.4 缓存,降级,限流简介
- 缓存 缓存的目的是提升系统访问速度和增大系统处理容量
- 降级 降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开
- 限流 限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理
2.应用级限流
2.1 控制并发数量
在Java中,可以使用信号量(Semaphore)机制来控制并发数量。信号量是一个计数器,用于限制对共享资源的访问。以下是一个使用信号量机制控制并发数量的示例:
import java.util.concurrent.Semaphore; public class ConcurrencyControlExample { private static final int MAX_CONCURRENT_THREADS = 5; // 最大并发线程数 private static Semaphore semaphore = new Semaphore(MAX_CONCURRENT_THREADS); public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(new WorkerThread("" + i)).start(); } } static class WorkerThread implements Runnable { private String command; public WorkerThread(String s) { this.command = s; } @Override public void run() { try { // 获取信号量,如果信号量为0,则当前线程需要等待 semaphore.acquire(); // 执行任务 System.out.println(Thread.currentThread().getName() + "开始处理:" + command); processCommand(); System.out.println(Thread.currentThread().getName() + "结束处理:" + command); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放信号量,允许其他线程获取信号量并执行任务 semaphore.release(); } } private void processCommand() { try { Thread.sleep(2000); // 模拟耗时任务 } catch (InterruptedException e) { e.printStackTrace(); } } }
}
2.2 控制访问速率
在工程实践中,常见的是使用令牌桶算法来实现这种模式,常用的限流算法有两种:漏桶算法和令牌桶算法。
2.2.1 令牌桶算法
如图所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务,令牌桶算法通过发放令牌,根据令牌的rate频率做请求频率限制,容量限制等。
在Wikipedia上,令牌桶算法是这么描述的:
1、每过1/r秒桶中增加一个令牌。
2、桶中最多存放b个令牌,如果桶满了,新放入的令牌会被丢弃。
3、当一个n字节的数据包到达时,消耗n个令牌,然后发送该数据包。
4、如果桶中可用令牌小于n,则该数据包将被缓存或丢弃。
令牌桶控制的是一个时间窗口内通过的数据量,在API层面我们常说的QPS、TPS,正好是一个时间窗口内的请求量或者事务量,只不过时间窗口限定在1s罢了。以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。令牌桶的另外一个好处是可以方便的改变速度,一旦需要提高速率,则按需提高放入桶中的令牌的速率。
在我们的工程实践中,通常使用Google开源工具包Guava提供的限流工具类RateLimiter来实现控制速率,该类基于令牌桶算法来完成限流,非常易于使用,而且非常高效。如我们不希望每秒的任务提交超过1个
public static void main(String[] args) {String start = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());RateLimiter limiter = RateLimiter.create(1.0); // 这里的1表示每秒允许处理的量为1个for (int i = 1; i <= 10; i++) {double waitTime = limiter.acquire(i);// 请求RateLimiter, 超过permits会被阻塞System.out.println("cutTime=" + System.currentTimeMillis() + " call execute:" + i + " waitTime:" + waitTime);}String end = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());System.out.println("start time:" + start);System.out.println("end time:" + end);}
首先通过RateLimiter.create(1.0);创建一个限流器,参数代表每秒生成的令牌数,通过limiter.acquire(i);来以阻塞的方式获取令牌,令牌桶算法允许一定程度的突发(允许消费未来的令牌),所以可以一次性消费i个令牌;当然也可以通过tryAcquire(int permits, long timeout, TimeUnit unit)来设置等待超时时间的方式获取令牌,如果超timeout为0,则代表非阻塞,获取不到立即返回,支持阻塞或可超时的令牌消费。
从输出来看,RateLimiter支持预消费,比如在acquire(5)时,等待时间是4秒,是上一个获取令牌时预消费了3个两排,固需要等待3*1秒,然后又预消费了5个令牌,以此类推。
ateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费),在使用过程中需要注意这一点,Guava有两种限流模式,一种为稳定模式(SmoothBursty:令牌生成速度恒定,平滑突发限流),一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值,平滑预热限流) 两种模式实现思路类似,主要区别在等待时间的计算上。
- SmoothBursty 模式:RateLimiter limiter = RateLimiter.create(5);
RateLimiter.create(5)表示桶容量为5且每秒新增5个令牌,即每隔200毫秒新增一个令牌;limiter.acquire()表示消费一个令牌,如果当前桶中有足够令牌则成功(返回值为0),如果桶中没有令牌则暂停一段时间,比如发令牌间隔是200毫秒,则等待200毫秒后再去消费令牌,这种实现将突发请求速率平均为了固定请求速率。 - SmoothWarmingUp模式:RateLimiter limiter = RateLimiter.create(5,1000, TimeUnit.MILLISECONDS);
创建方式:RateLimiter.create(doublepermitsPerSecond, long warmupPeriod, TimeUnit unit),permitsPerSecond表示每秒新增的令牌数,warmupPeriod表示在从冷启动速率过渡到平均速率的时间间隔。速率是梯形上升速率的,也就是说冷启动时会以一个比较大的速率慢慢到平均速率;然后趋于平均速率(梯形下降到平均速率)。可以通过调节warmupPeriod参数实现一开始就是平滑固定速率。
注:RateLimiter控制的是速率,Samephore控制的是并发量。RateLimiter的原理就是令牌桶,它主要由许可发出的速率来定义,如果没有额外的配置,许可证将按每秒许可证规定的固定速度分配,许可将被平滑地分发,若请求超过permitsPerSecond则RateLimiter按照每秒 1/permitsPerSecond 的速率释放许可。注意:RateLimiter适用于单体应用,且RateLimiter不保证公平性访问。
使用上述方式使用RateLimiter的方式不够优雅,自定义注解+AOP的方式实现(适用于单体应用),详细见下面代码:
自定义注解:
import java.lang.annotation.*;/*** 自定义注解可以不包含属性,成为一个标识注解*/
@Inherited
@Documented
@Target({ElementType.METHOD, ElementType.FIELD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimitAspect {}
自定义切面类
import com.google.common.util.concurrent.RateLimiter;
import com.test.cn.springbootdemo.util.ResultUtil;
import net.sf.json.JSONObject;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;@Component
@Scope
@Aspect
public class RateLimitAop {@Autowiredprivate HttpServletResponse response;private RateLimiter rateLimiter = RateLimiter.create(5.0); //比如说,我这里设置"并发数"为5@Pointcut("@annotation(com.test.cn.springbootdemo.aspect.RateLimitAspect)")public void serviceLimit() {}@Around("serviceLimit()")public Object around(ProceedingJoinPoint joinPoint) {Boolean flag = rateLimiter.tryAcquire();Object obj = null;try {if (flag) {obj = joinPoint.proceed();//这个方法用于执行原来的方法或继续原来的控制流程。}else{String result = JSONObject.fromObject(ResultUtil.success1(100, "failure")).toString();output(response, result);}} catch (Throwable e) {e.printStackTrace();}System.out.println("flag=" + flag + ",obj=" + obj);return obj;}public void output(HttpServletResponse response, String msg) throws IOException {response.setContentType("application/json;charset=UTF-8");ServletOutputStream outputStream = null;try {outputStream = response.getOutputStream(); //这行代码获取了与当前HTTP响应关联的输出流,并将其赋值给outputStream变量。outputStream.write(msg.getBytes("UTF-8"));//这部分将转换后的字节数组写入到之前获取的输出流中,这意味着数据将被发送到客户端。} catch (IOException e) {e.printStackTrace();} finally {outputStream.flush();outputStream.close();}}
}
测试controller
import com.test.cn.springbootdemo.aspect.RateLimitAspect;
import com.test.cn.springbootdemo.util.ResultUtil;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;@Controller
public class TestController {@ResponseBody@RateLimitAspect@RequestMapping("/test")public String test(){return ResultUtil.success1(1001, "success").toString();}
2.2.2 漏桶算法
漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。
算法实现:
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;// 漏桶 限流
@Slf4j
public class LeakBucketLimiter {// 计算的起始时间private static long lastOutTime = System.currentTimeMillis();// 流出速率 每秒 2 次private static int leakRate = 2;// 桶的容量private static int capacity = 2;//剩余的水量private static AtomicInteger water = new AtomicInteger(0);//返回值说明:// false 没有被限制到// true 被限流public static synchronized boolean isLimit(long taskId, int turn) {// 如果是空桶,就当前时间作为漏出的时间if (water.get() == 0) {lastOutTime = System.currentTimeMillis();water.addAndGet(1);return false;}// 执行漏水int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 1000)) * leakRate;// 计算剩余水量int waterLeft = water.get() - waterLeaked;water.set(Math.max(0, waterLeft));// 重新更新leakTimeStamplastOutTime = System.currentTimeMillis();// 尝试加水,并且水还未满 ,放行if ((water.get()) < capacity) {water.addAndGet(1);return false;} else {// 水满,拒绝加水, 限流return true;}}//线程池,用于多线程模拟测试private ExecutorService pool = Executors.newFixedThreadPool(10);@Testpublic void testLimit() {// 被限制的次数AtomicInteger limited = new AtomicInteger(0);// 线程数final int threads = 2;// 每条线程的执行轮数final int turns = 20;// 线程同步器CountDownLatch countDownLatch = new CountDownLatch(threads);long start = System.currentTimeMillis();for (int i = 0; i < threads; i++) {pool.submit(() -> {try {for (int j = 0; j < turns; j++) {long taskId = Thread.currentThread().getId();boolean intercepted = isLimit(taskId, j);if (intercepted) {// 被限制的次数累积limited.getAndIncrement();}Thread.sleep(200);}} catch (Exception e) {e.printStackTrace();}//等待所有线程结束countDownLatch.countDown();});}try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}float time = (System.currentTimeMillis() - start) / 1000F;//输出统计结果log.info("限制的次数为:" + limited.get() + ",通过的次数为:" + (threads * turns - limited.get()));log.info("限制的比例为:" + (float) limited.get() / (float) (threads * turns));log.info("运行的时长为:" + time);}
}
3.分布式限流
分布式限流
自定义注解+拦截器+Redis实现限流 (单体和分布式均适用,全局限流)
自定义注解:
@Inherited
@Documented
@Target({ElementType.FIELD,ElementType.TYPE,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface AccessLimit {int limit() default 5; int sec() default 5;
}
拦截器:
public class AccessLimitInterceptor implements HandlerInterceptor {@Autowiredprivate RedisTemplate<String, Integer> redisTemplate; //使用RedisTemplate操作redis@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {if (handler instanceof HandlerMethod) {HandlerMethod handlerMethod = (HandlerMethod) handler;Method method = handlerMethod.getMethod();if (!method.isAnnotationPresent(AccessLimit.class)) {return true;}AccessLimit accessLimit = method.getAnnotation(AccessLimit.class);if (accessLimit == null) {return true;}int limit = accessLimit.limit();int sec = accessLimit.sec();String key = IPUtil.getIpAddr(request) + request.getRequestURI();Integer maxLimit = redisTemplate.opsForValue().get(key);if (maxLimit == null) {redisTemplate.opsForValue().set(key, 1, sec, TimeUnit.SECONDS); //set时一定要加过期时间} else if (maxLimit < limit) {redisTemplate.opsForValue().set(key, maxLimit + 1, sec, TimeUnit.SECONDS);} else {output(response, "请求太频繁!");return false;}}return true;}public void output(HttpServletResponse response, String msg) throws IOException {response.setContentType("application/json;charset=UTF-8");ServletOutputStream outputStream = null;try {outputStream = response.getOutputStream();outputStream.write(msg.getBytes("UTF-8"));} catch (IOException e) {e.printStackTrace();} finally {outputStream.flush();outputStream.close();}}@Overridepublic void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {}
}
controller:
@Controller
@RequestMapping("/activity")
public class AopController {@ResponseBody@RequestMapping("/seckill")@AccessLimit(limit = 4,sec = 10) //加上自定义注解即可public String test (HttpServletRequest request,@RequestParam(value = "username",required = false) String userName){//TODO somethings……return "hello world !";}
}
配置文件:
/*springmvc的配置文件中加入自定义拦截器*/
<mvc:interceptors><mvc:interceptor><mvc:mapping path="/**"/><bean class="com.pptv.activityapi.controller.pointsmall.AccessLimitInterceptor"/></mvc:interceptor>
</mvc:interceptors>
4.交流群