目录
- Sentinel规则持久化
- Sentinel与Spring Cloud Gateway整合
- 自定义扩展部分
- 遇到的问题
- 解决方案
- 控制面板改造
- 新增读取规则代码
- 重写SpringCloudGatewayApiDefinitionChangeObserver类,注意:类路径要完全一致
- 新增自定义规则读取类
- lua脚本(这里使用的是固定窗口,当然也可以改成滑动窗口,具体看业务场景要求)
前言: sentinel是一款阿里开源的限流项目,有完整的限流功能配置页面和限流方案,项目中只需要引入相关的依赖和添加对应的配置,即可进行限流。
开源版本的Sentinel也存在一些问题,比如:
- 限流规则是保存在内存当中的,服务启动会丢失相关的限流规则
- sentienl提供的集群限流不支持高可用(独立式)
本文主要讲的是sentinel与Gateway网关进行整合,以及扩展一些自定义的功能
Sentinel规则持久化
关于这点,网上有很多整合方案,具体可以参考下这篇文章:https://www.jianshu.com/p/48b0334590e5 整体改完规则能完成正常的持久化到nacos,以及客户端获取规则。
但是存在一个问题,规则的间隔时间没有同步到客户端,因此需要在这基础上做一些调整
1、Gateway流控规则的序列化与反序列化,这里采用了和原控制台推送的一样的序列化方式,主要是为了保证参数的一致
@Bean
public Converter<List<GatewayFlowRuleEntity>, String> gatewayFlowRuleEntityEncoder() {return rules -> JSON.toJSONString(rules.stream().map(GatewayFlowRuleEntity::toGatewayFlowRuleExtension).collect(Collectors.toList()));
}@Bean
public Converter<String, List<GatewayFlowRuleEntity>> gatewayFlowRuleEntityDecoder() {return s -> {List<GatewayFlowRuleExtension> gatewayFlowRules = JSON.parseArray(s, GatewayFlowRuleExtension.class);List<GatewayFlowRuleEntity> entities = gatewayFlowRules.stream().map(GatewayFlowRuleEntity::fromGatewayFlowRuleExtension).collect(Collectors.toList());return entities;};
}
** 2、新增GatewayFlowRuleExtension扩展类,在GatewayFlowRule的字段基础上新增了app(应用名称)、ip(客户端ip)、port(客户端端口号),为了反序列化时兼容原有控制台功能**
3、原有GatewayFlowRuleEntity类新增了自定义序列化与反序列方法
/*** 序列化*/public GatewayFlowRuleExtension toGatewayFlowRuleExtension() {GatewayFlowRuleExtension rule = new GatewayFlowRuleExtension();rule.setResource(resource);rule.setResourceMode(resourceMode);rule.setGrade(grade);rule.setCount(count);rule.setIntervalSec(calIntervalSec(interval, intervalUnit));rule.setControlBehavior(controlBehavior);if (burst != null) {rule.setBurst(burst);}if (maxQueueingTimeoutMs != null) {rule.setMaxQueueingTimeoutMs(maxQueueingTimeoutMs);}if (paramItem != null) {GatewayParamFlowItem ruleItem = new GatewayParamFlowItem();rule.setParamItem(ruleItem);ruleItem.setParseStrategy(paramItem.getParseStrategy());ruleItem.setFieldName(paramItem.getFieldName());ruleItem.setPattern(paramItem.getPattern());if (paramItem.getMatchStrategy() != null) {ruleItem.setMatchStrategy(paramItem.getMatchStrategy());}}rule.setApp(app);rule.setIp(ip);rule.setPort(port);return rule;
}/*** 自定义反序列化方式* @param rule* @return*/
public static GatewayFlowRuleEntity fromGatewayFlowRuleExtension(GatewayFlowRuleExtension rule) {GatewayFlowRuleEntity entity = new GatewayFlowRuleEntity();entity.setApp(rule.getApp());entity.setIp(rule.getIp());entity.setPort(rule.getPort());entity.setResource(rule.getResource());entity.setResourceMode(rule.getResourceMode());entity.setGrade(rule.getGrade());entity.setCount(rule.getCount());Object[] intervalSecResult = parseIntervalSec(rule.getIntervalSec());entity.setInterval((Long) intervalSecResult[0]);entity.setIntervalUnit((Integer) intervalSecResult[1]);entity.setControlBehavior(rule.getControlBehavior());entity.setBurst(rule.getBurst());entity.setMaxQueueingTimeoutMs(rule.getMaxQueueingTimeoutMs());GatewayParamFlowItem paramItem = rule.getParamItem();if (paramItem != null) {GatewayParamFlowItemEntity itemEntity = new GatewayParamFlowItemEntity();entity.setParamItem(itemEntity);itemEntity.setParseStrategy(paramItem.getParseStrategy());itemEntity.setFieldName(paramItem.getFieldName());itemEntity.setPattern(paramItem.getPattern());itemEntity.setMatchStrategy(paramItem.getMatchStrategy());}return entity;
}
按照这几个点改完,控制面板的规则就能完整地保存到nacos中,再同步给应用程序
Sentinel与Spring Cloud Gateway整合
这一点网上的资料比较多,就不具体地介绍了,主要就是读取nacos中规则,然后加载到内存中,通过注册监听器监听规则变化。
自定义扩展部分
遇到的问题
- 1、部分情况下,系统的访问量并非是正常用户请求过来的,可能是一些爬虫之类的机器访问的,针对这些流量,需要根据ip+具体的接口进行限流。针对这种情况,sentinel本身的限流规则局限性比较大(底层采用滑动窗口的算法,在内存中进行限流)
- 2、自定义限流部分的规则获取、更新
解决方案
问题1:为了解决这个问题, 这里采用gateway接入redis,使用redis+lua,将访问ip+访问接口作为一个完整的key,使用固定窗口算法进行限流
问题2:Sentinel本身的规则更新非常方便,这里使用Sentienl控制面板来配置自定义限流规则,再通过过滤的方式,从Sentinel原本规则中读取自定义规则
控制面板改造
api管理菜单中,新增api规则时,api名称和匹配串使用固定前缀custom_
使用自定义(custom_开头) API分组创建规则时,阈值类型只支持QPS,只能配置QPS阈值和间隔选项,其它选项配置都不支持
新增读取规则代码
重写SpringCloudGatewayApiDefinitionChangeObserver类,注意:类路径要完全一致
public class SpringCloudGatewayApiDefinitionChangeObserver implements ApiDefinitionChangeObserver {public SpringCloudGatewayApiDefinitionChangeObserver() {}public void onChange(Set<ApiDefinition> apiDefinitions) {GatewayApiMatcherManager.loadApiDefinitions(apiDefinitions);//自定义扩展类GatewayApiMatcherManagerExtension.loadApiDefinitions(apiDefinitions);}
}
新增自定义规则读取类
@Slf4j
public final class GatewayApiMatcherManagerExtension {private static final Map<String, WebExchangeApiMatcher> API_MATCHER_MAP = new ConcurrentHashMap();public static Map<String, WebExchangeApiMatcher> getApiMatcherMap() {return Collections.unmodifiableMap(API_MATCHER_MAP);}public static Optional<WebExchangeApiMatcher> getMatcher(String apiName) {return Optional.ofNullable(apiName).map((e) -> {return (WebExchangeApiMatcher)API_MATCHER_MAP.get(apiName);});}public static Set<ApiDefinition> getApiDefinitionSet() {return (Set)API_MATCHER_MAP.values().stream().map(AbstractApiMatcher::getApiDefinition).collect(Collectors.toSet());}public static synchronized void loadApiDefinitions(Set<ApiDefinition> definitions) {Set<ApiDefinition> apiDefinitionsNew = new HashSet<>();log.info("[sentinel规则更新] definitions:{}", JSON.toJSONString(definitions));try {apiDefinitionsNew = filterData(definitions);}catch (Exception e){log.error("[sentinel规则更新] 过滤自定义sentinel规则失败 set:{} exMsg:{}", JSON.toJSONString(definitions),e.getMessage(),e);}log.info("[sentinel规则更新] definitions:{} apiDefinitionsNew:{}",JSON.toJSONString(definitions),JSON.toJSONString(apiDefinitionsNew));if (apiDefinitionsNew != null && !apiDefinitionsNew.isEmpty()) {apiDefinitionsNew.forEach(GatewayApiMatcherManagerExtension::addApiDefinition);} else {API_MATCHER_MAP.clear();}}private static Set<ApiDefinition> filterData(Set<ApiDefinition> set){Set<ApiDefinition> apiDefinitionsSetNew = new HashSet<>();if (CollectionUtils.isEmpty(set)){return set;}for (ApiDefinition apiDefinition : set) {if (ObjectUtils.isEmpty(apiDefinition) || StringUtils.isEmpty(apiDefinition.getApiName()) || !apiDefinition.getApiName().startsWith("custom_")){continue;}String apiName = apiDefinition.getApiName();Set<ApiPredicateItem> predicateItems = apiDefinition.getPredicateItems();if (CollectionUtils.isEmpty(predicateItems)){continue;}Set<ApiPredicateItem> apiPredicateItems = new HashSet<>();for (ApiPredicateItem predicateItem : predicateItems) {if (!(predicateItem instanceof ApiPathPredicateItem)){continue;}ApiPathPredicateItem item = new ApiPathPredicateItem();String pattern = ((ApiPathPredicateItem) predicateItem).getPattern();if (StringUtils.isEmpty(pattern) || !pattern.startsWith("custom_")){continue;}String[] split = pattern.split("custom_");item.setPattern(split[1]);item.setMatchStrategy(((ApiPathPredicateItem) predicateItem).getMatchStrategy());apiPredicateItems.add(item);}if (CollectionUtils.isEmpty(apiPredicateItems)){continue;}ApiDefinition apiDefinitionNew = new ApiDefinition();apiDefinitionNew.setApiName(apiName);apiDefinitionNew.setPredicateItems(apiPredicateItems);apiDefinitionsSetNew.add(apiDefinitionNew);}return apiDefinitionsSetNew;}static void addApiDefinition(ApiDefinition definition) {API_MATCHER_MAP.put(definition.getApiName(), new WebExchangeApiMatcher(definition));}private GatewayApiMatcherManagerExtension() {}
}
这样自定义规则就可以从GatewayApiMatcherManagerExtension类中读取到了,然后再新增一个过滤器,在请求进来的时候,根据规则,使用lua脚本进行限流就可了
lua脚本(这里使用的是固定窗口,当然也可以改成滑动窗口,具体看业务场景要求)
local key = KEYS[1]
local rangTime = tonumber(ARGV[1])
local limitCount = tonumber(ARGV[2])
local current = tonumber(redis.call('get', key) or "0")
if current+1 > limitCount thenreturn -1;
elseredis.call("incr", key)if current == 0 thenredis.call("expire",key,rangTime)endreturn 1
end
上面就是关于Sentinel限流相关整合过程了,代码仅供参考学习,欢迎留言讨论。后续会将完整的项目代码更新到远程仓库,欢迎关注我的公众号,后续会在公众号提供获取地址。下一篇将介绍一下es分词插件配置的几种方案