目录
代码示例
接口
代理
接口实现
限流工厂
限流处理器接口
直接交换处理器
限流处理器
限流配置
滑动窗口限流
通过代理模式+滑动窗口,限流请求第三方平台,避免出现第三方平台抛出限流异常,影响正常业务流程,从出口出发进行限流请求。
代码示例
接口
/*** 第三方请求*/
public interface ThirdApi {/*** 发送消息** @param userId 用户id* @param message 消息* @return 发送是否成功*/boolean sendMessage(String userId, String message);
}
代理
/*** 第三方请求代理*/
@Component
public class ProxyThirdApi implements ThirdApi {@Resourceprivate ThirdApiServiceClient thirdApiServiceClient;@Resourceprivate LimitProcessorFactory limitProcessorFactory;@Resourceprivate YmlConstant ymlConstant;private ThirdApi thirdApi;@PostConstructpublic void initThirdApi() {thirdApi = new ThirdApiImpl(thirdApiServiceClient, ymlConstant);}@Override@SneakyThrowspublic boolean sendMessage(String userId, String message) {// 限流String bizLimit = "MSG_SEND_LIMIT";Object result = limitProcessorFactory.getProcessor(bizLimit).process(() -> thirdApi.sendMessage(userId, message));if (result instanceof Boolean) {return (Boolean) result;} else {return false;}}
}
接口实现
/*** 第三方请求实现**/
@Slf4j
@AllArgsConstructor
public class ThirdApiImpl implements ThirdApi {private final ThirdApiServiceClient thirdApiServiceClient;private final YmlConstant ymlConstant;@Overridepublic boolean sendMessage(String userId, String message) {MessageReq messageReq = new MessageReq();messageReq.setContent(message);messageReq.setReceiveId(userId);log.info("[ThirdApiImpl][sendMessage] {}", JSON.toJSONString(messageReq));HttpResponse<SendMessagesResp> sendResp = thirdApiServiceClient.sendMessage(messageReq);if (sendResp.isOk()) {return true;} else {log.error("[ThirdApiImpl][sendMessage] 消息发送失败,返回信息:{}", JSON.toJSONString(sendResp));return false;}}
}
限流工厂
/*** 限流工厂**/
@Component
public class LimitProcessorFactory {@Resourceprivate LimitProperties properties;@Getterprivate Map<String, LimitProperties.LimitData> propertiesMap;private final Map<String, LimiterProcessor> processorMap = new ConcurrentHashMap<>(10);@PostConstructpublic void initPropertiesMap() {List<LimitProperties.LimitData> props = properties.getProps();if (CollectionUtils.isEmpty(props)) {propertiesMap = Collections.emptyMap();} else {propertiesMap = props.stream().collect(Collectors.toMap(LimitProperties.LimitData::getName, Function.identity()));}}/*** 获取限流处理器** @param name 业务名称* @return 限流处理器*/public LimiterProcessor getProcessor(String name) {LimitProperties.LimitData props = propertiesMap.get(name);if (Objects.isNull(props)) {throw new BusinessException(String.format("无法找到[%s]的处理器配置", name));}if (props.getEnabled()) {return processorMap.computeIfAbsent(props.getName(), name -> {TimeUnit timeUnit = props.getTimeUnit();// 使用窗口滑动算法进行限流RateLimiter limiter = new SlidingWindowRateLimiter(props.getInterval(), props.getLimit(), timeUnit);return new LimiterProcessor(name, timeUnit.toMillis(props.getWaitTime()), limiter);});} else {return new SynchronousProcessor();}}
}
限流处理器接口
/*** 限流处理器接口*/
public interface LimiterProcessor {/*** 限流** @param callback 回调* @return 执行结果* @throws Throwable Throwable*/Object process(LimiterCallback callback) throws Throwable;
}
直接交换处理器
/*** 直接交换处理器** @author zhimajiang*/
@Slf4j
public class SynchronousProcessor implements LimiterProcessor {@Overridepublic Object process(LimiterCallback callback) throws Throwable {return callback.process();}
}
限流处理器
/*** 限流处理器**/
@Slf4j
@AllArgsConstructor
public class Processor implements LimiterProcessor {private final String name;private final long waitTime;private final RateLimiter rateLimiter;@Overridepublic Object process(LimiterCallback callback) throws Throwable {while (true) {if (rateLimiter.tryAcquire()) {// 未被限流,则尝试唤醒其他被限流的任务Object proceed = callback.process();synchronized (this) {this.notifyAll();}return proceed;} else {// 已被限流则进入阻塞log.info("LimiterProcessor][process] {}-限流", name);synchronized (this) {try {this.wait(waitTime);} catch (InterruptedException ignored) {}}}}}
}
限流配置
/*** 限流配置**/
@Data
@Configuration
@ConfigurationProperties("limit")
public class LimitProperties {/*** 限流配置*/private List<LimitProperties.LimitData> props;@Datapublic static class LimitData {/*** 名称*/private String name;/*** 是否启用*/private Boolean enabled = false;/*** 时间间隔*/private int interval;/*** 限制阈值*/private int limit;/*** 阻塞等待时间*/private int waitTime = 1000;/*** 时间单位*/private TimeUnit timeUnit = TimeUnit.MILLISECONDS;}
}
滑动窗口限流
/*** 滑动窗口限流**/
public class SlidingWindowRateLimiter implements RateLimiter {/*** 子窗口数量*/private final int slotNum;/*** 子窗口大小*/private final long slotSize;/*** 限流阈值*/private final int limit;/*** 上一次的窗口结束时间*/private long lastTime;/*** 子窗口流量计数*/private final AtomicInteger[] counters;/*** 滑动窗口限流** @param windowSize 时间窗口大小* @param slotNum 子窗口数量* @param limit 限流阈值* @param timeUnit 时间单位*/public SlidingWindowRateLimiter(int windowSize, int slotNum, int limit, TimeUnit timeUnit) {long windowSizeMills = timeUnit.toMillis(windowSize);this.slotNum = slotNum;this.slotSize = windowSizeMills / slotNum;this.limit = limit;this.lastTime = System.currentTimeMillis();this.counters = new AtomicInteger[slotNum];resetCounters();}/*** 滑动窗口限流** @param windowSize 时间窗口大小* @param limit 限流阈值* @param timeUnit 时间单位*/public SlidingWindowRateLimiter(int windowSize, int limit, TimeUnit timeUnit) {this(windowSize, 5, limit, timeUnit);}/*** 滑动窗口限流** @param windowSize 时间窗口大小(毫秒)* @param limit 限流阈值*/public SlidingWindowRateLimiter(int windowSize, int limit) {this(windowSize, 5, limit, TimeUnit.MILLISECONDS);}/*** 重置子窗口流量计数*/private void resetCounters() {for (int i = 0; i < this.slotNum; i++) {this.counters[i] = new AtomicInteger(0);}}/*** 限流请求** @return true-允许执行 false-触发限流*/@Overridepublic synchronized boolean tryAcquire() {long currentTime = System.currentTimeMillis();// 小窗口移动格数int slideNum = (int) Math.floor((double) (currentTime - this.lastTime) / this.slotSize);slideWindow(slideNum);// 窗口时间内的请求总数int sum = Arrays.stream(this.counters).mapToInt(AtomicInteger::get).sum();this.lastTime = this.lastTime + slideNum * slotSize;if (sum >= limit) {return false;} else {this.counters[this.slotNum - 1].incrementAndGet();return true;}}/*** 将计数器内全部元素向左移动num个位置** @param num 移动位置个数*/private void slideWindow(int num) {if (num == 0) {return;}if (num >= this.slotNum) {// 如果移动步数大于子窗口个数,则计数全部清零resetCounters();return;}// 对于a[0]~a[num-1]来说,移动元素则代表删除元素,所以直接从a[num]开始移动for (int index = num; index < this.slotNum; index++) {// 移动元素int newIndex = index - num;this.counters[newIndex] = this.counters[index];this.counters[index].getAndSet(0);}}
}