随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
作者 | 向寒 / 孙玄
来源 | 架构之美
头图 | 下载于视觉中国
关于 Sentinel
1、理论篇
以下是经过多年分布式经验总结的两个理论基础:
(1)微服务与治理的关系
(2)爬坡理论
我们今天的主题分为以下两个主要部分:
Sentinel设计原理
Sentinel运行流程源码剖析
Sentinel 设计原理
1、特性
丰富的应用场景:阿里 10 年双十一积累场景,含秒杀、双十一零点持续洪峰、热点商品探测、预热、消息队列削峰填谷、集群流量控制、实时熔断下游不可用应用等多样化的场景。
广泛的开源生态:提供开箱即用的与其它开源框架/库的整合模块,如Dubbo、Spring Cloud、gRPC、Zuul、Reactor 等。
完善的 SPI 扩展点:提供简单易用、完善的 SPI 扩展接口;可通过实现扩展接口来快速地定制逻辑。
完备的实时监控:提供实时的监控功能,可看到接入应用的单台机器秒级数据,及500 台以下规模的集群汇总运行情况。
2、核心关键点
(1)资源:限流的对象
如下代码/user/select即为一个资源:
1@GetMapping("/user/select")23@SentinelResource(value = "select", blockHandler = "exceptionHandler")45public TUser select(@RequestParam Integer userId) {67 log.info("post /user/select userid=" + userId);89 return userService.select(userId);
10
11}
即被SentinelResource注解修饰的API:
1@Target({ElementType.METHOD, ElementType.TYPE})23@Retention(RetentionPolicy.RUNTIME)45@Inherited67public @interface SentinelResource {89 String value() default "";
10
11
12
13 EntryType entryType() default EntryType.OUT;
14
15
16
17 int resourceType() default 0;
18
19
20
21 String blockHandler() default "";
22
23
24
25 Class<?>[] blockHandlerClass() default {};
26
27
28
29 String fallback() default "";
30
31......
32
33}
(2)入口:sentinel为每个资源创建一个Entry。
(3)槽链:每个Entry都会有一条用于记录限流以及各种控制的信息Slot chain,以此来实现下图中绿色部分的功能。
Sentinel 运行流程源码剖析
此图为官网全局流程图,接下来我们通过源码,分解该过程:
1、入口处
1SphU.entry("methodA", EntryType.IN);//入口
2
3}
核心代码
1SphU#lookProcessChain(ResourceWrapper resourceWrapper)
2、入口逻辑
1private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)23 throws BlockException {45 // 从threadLocal中获取当前线程对应的context实例。67 Context context = ContextUtil.getContext();89 if (context instanceof NullContext) {
10
11 // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
12
13 // so here init the entry only. No rule checking will be done.
14
15 // 如果context是nullContext的实例,表示当前context的总数已经达到阈值,所以这里直接创建entry实例,并返回,不进行规则的检查。
16
17 return new CtEntry(resourceWrapper, null, context);
18
19 }
20
21
22
23 if (context == null) {
24
25 // Using default context.
26
27 //如果context为空,则使用默认的名字创建一个,就是外部在调用SphU.entry(..)方法前如果没有调用ContextUtil.enter(..),则这里会调用该方法进行内部初始化context
28
29 context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
30
31 }
32
33
34
35 // Global switch is close, no rule checking will do.
36
37 // 总开关
38
39 if (!Constants.ON) {
40
41 return new CtEntry(resourceWrapper, null, context);
42
43 }
44
45
46
47 // 构造链路(核心实现) go in
48
49 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
50
51
52
53 /*
54
55 * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
56
57 * so no rule checking will be done.
58
59 * 当链的大小达到阈值Constants.MAX_SLOT_CHAIN_SIZE时,不会校验任何规则,直接返回。
60
61 */
62
63 if (chain == null) {
64
65 return new CtEntry(resourceWrapper, null, context);
66
67 }
68
69
70
71 Entry e = new CtEntry(resourceWrapper, chain, context);
72
73 try {
74
75 // 开始进行链路调用。
76
77 chain.entry(context, resourceWrapper, null, count, prioritized, args);
78
79 } catch (BlockException e1) {
80
81 e.exit(count, args);
82
83 throw e1;
84
85 } catch (Throwable e1) {
86
87 // This should not happen, unless there are errors existing in Sentinel internal.
88
89 RecordLog.info("Sentinel unexpected exception", e1);
90
91 }
92
93 return e;
94
95}
3、上下文信息
Context
Context是当前线程所持有的Sentinel上下文。
进入Sentinel的逻辑时,会首先获取当前线程的Context,如果没有则新建。当任务执行完毕后,会清除当前线程的context。Context 代表调用链路上下文,贯穿一次调用链路中的所有 Entry。
Context 维持着入口节点(entranceNode)、本次调用链路的 当前节点(curNode)、调用来源(origin)等信息。Context 名称即为调用链路入口名称。
Node
Node是对一个@SentinelResource标记的资源的统计包装。
Context中记录本当前线程资源调用的入口节点。
我们可以通过入口节点的childList,可以追溯资源的调用情况。而每个节点都对应一个@SentinelResource标记的资源及其统计数据,例如:passQps,blockQps,rt等数据。
Entry
Entry是Sentinel中用来表示是否通过限流的一个凭证,如果能正常返回,则说明你可以访问被Sentinel保护的后方服务,否则Sentinel会抛出一个BlockException。
另外,它保存了本次执行entry()方法的一些基本信息,包括资源的Context、Node、对应的责任链等信息,后续完成资源调用后,还需要更具获得的这个Entry去执行一些善后操作,包括退出Entry对应的责任链,完成节点的一些统计信息更新,清除当前线程的Context信息等。
在构建Context时已经完成下图部分:
4、核心流程
这里有两个需要注意的点:
ProcessorSlot chain = lookProcessChain(resourceWrapper); 构建链路。
chain.entry(context, resourceWrapper, null, count, prioritized, args); 进行链路调用首先来看链路是如何构建的。
5、获取槽链
已有直接获取;
没有去创建。
1 //在上下文中每一个资源都有各自的处理槽23 ProcessorSlotChain chain = chainMap.get(resourceWrapper);45 // 双重检查锁保证线程安全67 if (chain == null) {89 synchronized (LOCK) {
10
11 chain = ch ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {ainMap.get(resourceWrapper);
12
13 if (chain == null) {
14
15 // Entry size limit.
16
17 // 当链的长度达到阈值时,直接返回null,不进行规则的检查。
18
19 if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
20
21 return null;
22
23 }
24
25 // 构建链路 go in
26
27 chain = SlotChainProvider.newSlotChain();
28
29 Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
30
31 chainMap.size() + 1);
32
33 newMap.putAll(chainMap);
34
35 newMap.put(resourceWrapper, chain);
36
37 chainMap = newMap;
38
39 }
40
41 }
42
43 }
44
45 return chain;
46
47 }
6、创建槽链
SlotChainProvider.newSlotChain();
1 // 基于spi扩展点机制来扩展,默认为DefaultSlotChainBuilder
2
3slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
7、SPI加载ProcessorSlot
这里采用了spi的机制来扩展SlotChainBuilder,默认是采用DefaultSlotChainBuilder来实现的,可以看到sentinel源码的sentinel-core包下,META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder文件下,默认属性是:
1 slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
所以默认采用DefaultSlotChainBuilder来构建链路,因此找到DefaultSlotChainBuilder.build()方法。
8、DefaultSlotChainBuilder
1public ProcessorSlotChain build() {23 // 定义链路起点45 ProcessorSlotChain chain = new DefaultProcessorSlotChain();6789 // Note: the instances of ProcessorSlot should be different, since they are not stateless.
10
11 // 基于spi扩展机制,加载ProcessorSlot的实现类,从META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot文件下获取,并且按指定顺序排序
12
13 List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
14
15 // 遍历构建链路
16
17 for (ProcessorSlot slot : sortedSlotList) {
18
19 if (!(slot instanceof AbstractLinkedProcessorSlot)) {
20
21 RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
22
23 continue;
24
25 }
26
27 // 将slot节点加入链,因为已经排好序了,只需要加到最后即可
28
29 chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
30
31 }
32
33
34
35 return chain;
36
37 }
9、遍历ProcessorSlots
这里也是通过spi的机制,读取文件META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot:
1# Sentinel default ProcessorSlots23com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot45com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot67com.alibaba.csp.sentinel.slots.logger.LogSlot89com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
10
11com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
12
13com.alibaba.csp.sentinel.slots.system.SystemSlot
14
15com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
16
17com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
从这里看出,链路由这些节点组成,而slot之间的顺序是根据每个slot节点的@SpiOrder注解的值来确定的。
NodeSelectorSlot -> ClusterBuilderSlot -> LogSlot -> StatisticSlot -> AuthoritySlot -> SystemSlot -> FlowSlot -> DegradeSlot
链路调用
chain.entry(…)
上面已经构建好了链路,下面就要开始进行链路的调用了。
回到CtSph#entryWithPriority
1、NodeSelectorSlot
NodeSelectorSlot(@SpiOrder(-10000))
直接进入NodeSelectorSlot类的entry方法。
根据官方文档,NodeSelectorSlot类的作用为:
负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级。
1@Override23public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)45 throws Throwable {6789 // 双重检查锁+缓存 机制
10
11 DefaultNode node = map.get(context.getName());
12
13 if (node == null) {
14
15 synchronized (this) {
16
17 node = map.get(context.getName());
18
19 if (node == null) {
20
21 node = new DefaultNode(resourceWrapper, null);
22
23 HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
24
25 cacheMap.putAll(map);
26
27 cacheMap.put(context.getName(), node);
28
29 map = cacheMap;
30
31 // Build invocation tree
32
33 // 构建调用链的树形结构
34
35 ((DefaultNode) context.getLastNode()).addChild(node);
36
37 }
38
39
40
41 }
42
43 }
44
45
46
47 context.setCurNode(node);
48
49 // 进入下一个链
50
51 fireEntry(context, resourceWrapper, node, count, prioritized, args);
52
53}
2、ClusterBuilderSlot
ClusterBuilderSlot(@SpiOrder(-9000))
根据官方文档,ClusterBuilderSlot的作用为:
此插槽用于构建资源的 ClusterNode 以及调用来源节点。ClusterNode 保持某个资源运行统计信息(响应时间、QPS、block 数目、线程数、异常数等)以及调用来源统计信息列表。调用来源的名称由 ContextUtil.enter(contextName,origin) 中的 origin 标记。
3、LogSlot
LogSlot(@SpiOrder(-8000))
该类对链路的传递不做处理,只有在抛出BlockException的时候,向上层层传递的过程中,会通过该类来输入一些日志信息:
1@Override23public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)45 throws Throwable {67 try {89 fireEntry(context, resourceWrapper, obj, count, prioritized, args);
10
11 } catch (BlockException e) {
12
13 // 当抛出BlockException异常时,这里会输入日志信息
14
15 EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
16
17 context.getOrigin(), count);
18
19 throw e;
20
21 } catch (Throwable e) {
22
23 RecordLog.warn("Unexpected entry exception", e);
24
25 }
26
27}
4、StatisticSlot
StatisticSlot(@SpiOrder(-7000))
官方文档:
StatisticSlot用于记录、统计不同纬度的 runtime 指标监控信息。
1@Override23public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,45 boolean prioritized, Object... args) throws Throwable {67 try {89 // Do some checking.1011 // 先将调用链继续下去,等到后续链调用结束了,再执行下面的步骤1213 fireEntry(context, resourceWrapper, node, count, prioritized, args);14151617 // Request passed, add thread count and pass count.1819 node.increaseThreadNum();2021 node.addPassRequest(count);22232425 if (context.getCurEntry().getOriginNode() != null) {2627 // Add count for origin node.2829 context.getCurEntry().getOriginNode().increaseThreadNum();3031 context.getCurEntry().getOriginNode().addPassRequest(count);3233 }34353637 if (resourceWrapper.getEntryType() == EntryType.IN) {3839 // Add count for global inbound entry node for global statistics.4041 Constants.ENTRY_NODE.increaseThreadNum();4243 Constants.ENTRY_NODE.addPassRequest(count);4445 }46474849 // Handle pass event with registered entry callback handlers.5051 for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {5253 handler.onPass(context, resourceWrapper, node, count, args);5455 }5657 } catch (PriorityWaitException ex) {5859 node.increaseThreadNum();6061 if (context.getCurEntry().getOriginNode() != null) {6263 // Add count for origin node.6465 context.getCurEntry().getOriginNode().increaseThreadNum();6667 }68697071 if (resourceWrapper.getEntryType() == EntryType.IN) {7273 // Add count for global inbound entry node for global statistics.7475 Constants.ENTRY_NODE.increaseThreadNum();7677 }7879 // Handle pass event with registered entry callback handlers.8081 for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {8283 handler.onPass(context, resourceWrapper, node, count, args);8485 }8687 } catch (BlockException e) {8889 // Blocked, set block exception to current entry.9091 context.getCurEntry().setBlockError(e);92939495 // Add block count.9697 node.increaseBlockQps(count);9899 if (context.getCurEntry().getOriginNode() != null) {
100
101 context.getCurEntry().getOriginNode().increaseBlockQps(count);
102
103 }
104
105
106
107 if (resourceWrapper.getEntryType() == EntryType.IN) {
108
109 // Add count for global inbound entry node for global statistics.
110
111 Constants.ENTRY_NODE.increaseBlockQps(count);
112
113 }
114
115
116
117 // Handle block event with registered entry callback handlers.
118
119 for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
120
121 handler.onBlocked(e, context, resourceWrapper, node, count, args);
122
123 }
124
125
126
127 throw e;
128
129 } catch (Throwable e) {
130
131 // Unexpected internal error, set error to current entry.
132
133 context.getCurEntry().setError(e);
134
135
136
137 throw e;
138
139 }
140
141}
StatisticSlot 会先将链往下执行,等到后面的节点全部执行完毕,再进行数据统计。
5、AuthoritySlot
@SpiOrder(-6000)
AuthoritySlot
官方文档:
AuthoritySlot:根据配置的黑白名单和调用来源信息,来做黑白名单控制
1@Override23public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)45 throws Throwable {67 // 黑白名单权限控制89 checkBlackWhiteAuthority(resourceWrapper, context);
10
11 fireEntry(context, resourceWrapper, node, count, prioritized, args);
12
13}
14
15
16
17void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
18
19 Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
20
21
22
23 if (authorityRules == null) {
24
25 return;
26
27 }
28
29
30
31 Set<AuthorityRule> rules = authorityRules.get(resource.getName());
32
33 if (rules == null) {
34
35 return;
36
37 }
38
39
40
41 for (AuthorityRule rule : rules) {
42
43 if (!AuthorityRuleChecker.passCheck(rule, context)) {
44
45 throw new AuthorityException(context.getOrigin(), rule);
46
47 }
48
49 }
50
51}
6、SystemSlot
@SpiOrder(-5000)
SystemSlot
官方文档:
SystemSlot:这个 slot 会根据对于当前系统的整体情况,对入口资源的调用进行动态调配。其原理是让入口的流量和当前系统的预计容量达到一个动态平衡。
1@Override
2public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
3 boolean prioritized, Object... args) throws Throwable {
4 // 系统规则校验
5 SystemRuleManager.checkSystem(resourceWrapper);
6 fireEntry(context, resourceWrapper, node, count, prioritized, args);
7}
7、FlowSlot 限流规则引擎
@SpiOrder(-2000)
FlowSlot
官方文档:
这个 slot 主要根据预设的资源的统计信息,按照固定的次序,依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止:
指定应用生效的规则,即针对调用方限流的;
调用方为 other 的规则;
调用方为 default 的规则。
入口
1@Override2public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,3 boolean prioritized, Object... args) throws Throwable {4 // 检查限流规则5 checkFlow(resourceWrapper, context, node, count, prioritized);67 fireEntry(context, resourceWrapper, node, count, prioritized, args);8}9
10void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
11 throws BlockException {
12 checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
13}
1、所有规则检查
调用了FlowRuleChecker.checkFlow(…)方法。
1public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,2 Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {3 if (ruleProvider == null || resource == null) {4 return;5 }6 // 根据资源名称找到对应的7 Collection<FlowRule> rules = ruleProvider.apply(resource.getName());8 if (rules != null) {9 // 遍历规则,依次判断是否通过
10 for (FlowRule rule : rules) {
11 if (!canPassCheck(rule, context, node, count, prioritized)) {
12 throw new FlowException(rule.getLimitApp(), rule);
13 }
14 }
15 }
16}
2、单个规则检查
1public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,2 boolean prioritized) {3 String limitApp = rule.getLimitApp();4 if (limitApp == null) {5 return true;6 }7 // 集群限流的判断8 if (rule.isClusterMode()) {9 return passClusterCheck(rule, context, node, acquireCount, prioritized);
10 }
11 // 本地节点的判断
12 return passLocalCheck(rule, context, node, acquireCount, prioritized);
13}
3、非集群模式的限流判断
1private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,2 boolean prioritized) {3 // 根据请求的信息及策略,选择不同的node节点4 Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);5 if (selectedNode == null) {6 return true;7 }8 // 根据当前规则,获取规则控制器,调用canPass方法进行判断9// rule.getRater()放回的是TrafficShapingController接口的实现类,使用了策略模式,根据使用的控制措施来选择使用哪种实现。
10 return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
11}
这里是先根据请求和当前规则的策略,找到该规则下存储统计信息的节点,然后根据当前规则获取相应控制器,通过控制器的canPass(…)方法进行判断。
4、获取节点
1static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {2 // The limit app should not be empty.3 String limitApp = rule.getLimitApp();4 int strategy = rule.getStrategy();5 String origin = context.getOrigin();67 // 判断调用来源,这种情况下origin不能为default或other8 if (limitApp.equals(origin) && filterOrigin(origin)) {9 // 如果调用关系策略为STRATEGY_DIRECT,表示仅判断自己,则返回origin statistic node.
10 if (strategy == RuleConstant.STRATEGY_DIRECT) {
11 // Matches limit origin, return origin statistic node.
12 return context.getOriginNode();
13 }
14
15 // 采用调用来源进行判断的策略
16 return selectReferenceNode(rule, context, node);
17 } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) { // 如果调用来源为default默认的
18 if (strategy == RuleConstant.STRATEGY_DIRECT) { // 如果调用关系策略为STRATEGY_DIRECT,则返回clusterNode
19 // Return the cluster node.
20 return node.getClusterNode();
21 }
22
23 return selectReferenceNode(rule, context, node);
24 } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
25 && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { // 如果调用来源为other,且调用来源不在限制规则内,为其他来源
26 if (strategy == RuleConstant.STRATEGY_DIRECT) {
27 return context.getOriginNode();
28 }
29 return selectReferenceNode(rule, context, node);
30 }
31 return null;
32}
5、流量整形控制器
rule.getRater()方法会返回一个控制器,接口为TrafficShapingController,该接口的实现类图如下:
从类图可以看出,是很明显的策略模式,分别针对不同的限流控制策略。
1、默认策略
DefaultController该策略是sentinel的默认策略,如果请求超出阈值,则直接拒绝请求。
1@Override2public boolean canPass(Node node, int acquireCount, boolean prioritized) {3 // 当前已经统计的数4 int curCount = avgUsedTokens(node);5 if (curCount + acquireCount > count) {6 // 如果是高优先级的,且是基于qps的限流方式,则可以尝试从下个未来的滑动窗口中预支7 if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {8 long currentTime;9 long waitInMs;
10 currentTime = TimeUtil.currentTimeMillis();
11 // 从下个滑动窗口中提前透支
12 waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
13 if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
14 node.addWaitingRequest(currentTime + waitInMs, acquireCount);
15 node.addOccupiedPass(acquireCount);
16 sleep(waitInMs);
17
18 // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
19 throw new PriorityWaitException(waitInMs);
20 }
21 }
22 return false;
23 }
24 return true;
25}
26
27private int avgUsedTokens(Node node) {
28 if (node == null) {
29 return DEFAULT_AVG_USED_TOKENS;
30 }
31 // 如果当前是线程数限流,则返回node.curThreadNum()当前线程数
32 // 如果是QPS限流,则返回node.passQps()当前已经通过的qps数据
33 return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
34}
35
36private void sleep(long timeMillis) {
37 try {
38 Thread.sleep(timeMillis);
39 } catch (InterruptedException e) {
40 // Ignore.
41 }
42}
2、匀速排队策略
RateLimiterController
1@Override2public boolean canPass(Node node, int acquireCount, boolean prioritized) {3 // Pass when acquire count is less or equal than 0.4 if (acquireCount <= 0) {5 return true;6 }7 // Reject when count is less or equal than 0.8 // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.9 if (count <= 0) {
10 return false;
11 }
12
13 long currentTime = TimeUtil.currentTimeMillis();
14 // Calculate the interval between every two requests.
15 // 计算两个请求之间的时间间隔
16 long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
17
18 // Expected pass time of this request. 该请求的预计通过时间 = 上一次通过的时间 + 时间间隔
19 long expectedTime = costTime + latestPassedTime.get();
20
21 // 如果预计时间比当前时间小,表示可以请求完全可以通过
22 if (expectedTime <= currentTime) {
23 // Contention may exist here, but it's okay.
24 // 这里可能存在竞争,但是不影响。
25 latestPassedTime.set(currentTime);
26 return true;
27 } else {
28 // Calculate the time to wait.
29 // 计算等待时间
30 long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
31 // 如果等待时间超出了等待队列的最大时间,则无法放入等待队列,直接拒绝
32 if (waitTime > maxQueueingTimeMs) {
33 return false;
34 } else {
35 long oldTime = latestPassedTime.addAndGet(costTime);
36 try {
37 // 重新计算等待时间
38 waitTime = oldTime - TimeUtil.currentTimeMillis();
39 // 判断等待时间是否超过等待队列的最大时间,如果超过了,拒绝,并且将latestPassedTime最后一次请求时间重新设置为原值
40 if (waitTime > maxQueueingTimeMs) {
41 latestPassedTime.addAndGet(-costTime);
42 return false;
43 }
44 // in race condition waitTime may <= 0
45 // 线程等待
46 if (waitTime > 0) {
47 Thread.sleep(waitTime);
48 }
49 return true;
50 } catch (InterruptedException e) {
51 }
52 }
53 }
54 return false;
55}
从代码可以看出,匀速排队策略是使用了虚拟队列的方法,通过控制阈值来计算出请求的时间间隔,然后将上一次请求的时间加上时间间隔,表示下一次请求的时间,如果当前时间比这个值大,说明已经超出时间间隔了,当然可以请求,反之,表示需要等待,那么等待的时长就应该是要等到当前时间达到预期时间才能请求,这里就有个虚拟的等待队列,而等待其实是通过线程的等待来实现的。而这里所说的虚拟队列实际上是由一系列的处于sleep状态的线程组成的,但是实际的数据结构上并没有构成队列。
3、预热/冷启动策略
WarmUpController
首先看WarmUpController的属性和构造方法:
1// 阈值2protected double count;3/**4* 冷启动的因子 ,默认为3 {@link SentinelConfig#coldFactor()}5*/6private int coldFactor;7// 转折点的令牌数8protected int warningToken = 0;9// 最大令牌数
10private int maxToken;
11// 折线初始斜率,标志流量的变化程度
12protected double slope;
13
14// 累积的令牌数 ,累积的令牌数越多,说明系统利用率越低,说明当前流量低,是冷状态
15protected AtomicLong storedTokens = new AtomicLong(0);
16// 最后更新令牌的时间
17protected AtomicLong lastFilledTime = new AtomicLong(0);
18
19public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
20 construct(count, warmUpPeriodInSec, coldFactor);
21}
22
23public WarmUpController(double count, int warmUpPeriodInSec) {
24 construct(count, warmUpPeriodInSec, 3);
25}
26
27/**
28* @param count 用户设定的阈值(这里假设设定为100)
29* @param warmUpPeriodInSec 默认为10
30* @param coldFactor 默认为3
31*/
32private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
33
34 if (coldFactor <= 1) {
35 throw new IllegalArgumentException("Cold factor should be larger than 1");
36 }
37
38 this.count = count;
39
40 this.coldFactor = coldFactor;
41
42 // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
43 // warningToken = 100;
44
45 // 按默认的warmUpPeriodInSec = 10,表示1秒钟10个请求,则每个请求为间隔stableInterval = 100ms,那么coldInterval=stableInterval * coldFactor = 100 * 3 = 300ms
46 // warningToken = 10 * 100 / (3 - 1) = 500
47 // thresholdPermits = warningToken = 0.5 * warmupPeriod / stableInterval = 0.5 * warmupPeriod / 100ms = 500 ==>> warmupPeriod = 100000ms
48 warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
49
50 // / maxPermits = thresholdPermits + 2 * warmupPeriod /
51 // (stableInterval + coldInterval)
52 // maxToken = 200
53
54 // maxPermits = 500 + 2 * 100000ms / (100ms + 300ms) = 1000
55 // maxToken = 500 + (2 * 10 * 100 / (1.0 + 3)) = 1000
56 // maxPermits = maxToken
57 maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
58
59 // slope
60 // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
61 // - thresholdPermits);
62
63 // slope = (3 - 1.0) / 100 / (600 - 500) = 0.0002
64 slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
65
66}
属性说明:
count: 用户设定的qps阈值。
coldFactor: 冷启动的因子,初始默认为3,通过SentinelConfig类的coldFactor()方法获取,这里会有个判断,如果启动因子小于等于1,则会设置为默认值3,因为如果是小于等于1,是没有意义的,就不是预热启动了。
warningToken:转折点的令牌数,当令牌数开始小于该值得时候,就要开启预热了。
maxToken:最大令牌数。
slope:折线的斜率。
storedTokens:当前存储的令牌数。
lastFilledTime:上一次更新令牌的时间。
总体思路:当系统存储的令牌为最大值时,说明系统访问流量较低,处于冷状态,这时候当有正常请求过来时,会让请求通过,并且会补充消耗的令牌数。当瞬时流量来临时,一旦剩余的令牌数小于警戒令牌数(restToken <= warningToken),则表示有大流量过来,需要开启预热过程,开始逐渐增大允许的qps。当qps达到用户设定的阈值后,系统已经预热完毕,这时候就进入了正常的请求阶段。
源码分析如下:
1@Override2public boolean canPass(Node node, int acquireCount, boolean prioritized) {3 // 当前已经通过的qps4 long passQps = (long) node.passQps();56 // 上一个滑动窗口的qps7 long previousQps = (long) node.previousPassQps();8 // 同步令牌,如果是出于冷启动或预热完毕状态,则考虑要添加令牌9 syncToken(previousQps);
10
11 // 开始计算它的斜率
12 // 如果进入了警戒线,开始调整他的qps
13 long restToken = storedTokens.get();
14 if (restToken >= warningToken) { // 说明一瞬间有大流量过来,消耗了大量的存储令牌,造成剩余令牌数第一警戒值,则要开启预热默认,逐渐增加qps
15 // 计算当前离警戒线的距离
16 long aboveToken = restToken - warningToken;
17 // 消耗的速度要比warning快,但是要比慢
18 // current interval = restToken*slope+1/count
19 // restToken越小,interval就越小,表示系统越热
20 // 随着aboveToken的减小,warningQps会逐渐增大
21 double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
22 if (passQps + acquireCount <= warningQps) { // 随着warningQps的增大,acquireCount = 1,那么passQps允许的范围就变大,相应的流量就越大,系统越热
23 return true;
24 }
25 } else {
26 if (passQps + acquireCount <= count) {
27 return true;
28 }
29 }
30
31 return false;
32}
33
34/**
35* 同步令牌
36* @param passQps
37*/
38protected void syncToken(long passQps) {
39 long currentTime = TimeUtil.currentTimeMillis();
40 // 把当前时间的后三位置为0 e.g. 1601456312835 = 1601456312835 - 1601456312835 % 1000 = 1601456312000
41 currentTime = currentTime - currentTime % 1000;
42 // 获取上一次更新令牌的时间
43 long oldLastFillTime = lastFilledTime.get();
44 if (currentTime <= oldLastFillTime) {
45 return;
46 }
47
48 // 获得目前的令牌数
49 long oldValue = storedTokens.get();
50 // 获取新的令牌数
51 long newValue = coolDownTokens(currentTime, passQps);
52
53 // 更新累积令牌数
54 if (storedTokens.compareAndSet(oldValue, newValue)) {
55 // 去除上一次的qps,设置剩下的令牌数
56 long currentValue = storedTokens.addAndGet(0 - passQps);
57 if (currentValue < 0) {
58 // 如果剩下的令牌数小于0,则置为0。
59 storedTokens.set(0L);
60 }
61 // 设置令牌更新时间
62 lastFilledTime.set(currentTime);
63 }
64}
65
66private long coolDownTokens(long currentTime, long passQps) {
67 // 当前拥有的令牌数
68 long oldValue = storedTokens.get();
69 long newValue = oldValue;
70
71 // 添加令牌的判断前提条件:
72 // 当令牌的消耗程度远远低于警戒线的时候
73 if (oldValue < warningToken) { // 这种情况表示已经预热结束,可以开始生成令牌了
74 // 这里按照count = 100来计算的话,表示旧值oldValue + 距离上次更新的秒数时间差 * count ,表示每秒增加count个令牌
75 // 这里的currentTime 和 lastFilledTime.get() 都是已经去掉毫秒数的
76 newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
77 } else if (oldValue > warningToken) { // 进入这里表示当前是冷状态或正处于预热状态
78 if (passQps < (int)count / coldFactor) { // 如果是冷状态,则补充令牌数,避免令牌数为0
79 newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
80 }
81 // 预热阶段则不添加令牌数,从而限制流量的急剧攀升
82 }
83 // 限制令牌数不能超过最大令牌数maxToken
84 return Math.min(newValue, maxToken);
85}
4、预热的匀速排队策略
WarmUpRateLimiterController
这种是匀速排队模式和预热模式的结合,这里不深入了。搞懂了上面两种,再看这种也比较清晰了。
5、DegradeSlot
官方文档说明:
这个 slot 主要针对资源的平均响应时间(RT)以及异常比率,来决定资源是否在接下来的时间被自动熔断掉。
源码解析:
1@Override2public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,3 boolean prioritized, Object... args) throws Throwable {4 //降级判断5 performChecking(context, resourceWrapper);67 // 如果有自定义的slot,还会继续进行8 fireEntry(context, resourceWrapper, node, count, prioritized, args);9}
10
11void performChecking(Context context, ResourceWrapper r) throws BlockException {
12 // 使用DegradeRuleManager获得当前资源的熔断器
13 List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
14 if (circuitBreakers == null || circuitBreakers.isEmpty()) {
15 return;
16 }
17 // 遍历熔断器,只要有任何一个满足熔断条件,就抛出DegradeException异常。
18 for (CircuitBreaker cb : circuitBreakers) {
19 if (!cb.tryPass(context)) {
20 throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
21 }
22 }
23}
这里有个关键类,DegradeRuleManager,该类中会保存所有的熔断规则,使用Map<String, List>的格式进行保存。当需要使用的时候,就直接根据资源名称,从该map中获取对应的熔断器列表。
那么规则是如何加载的呢?我们看到DegradeRuleManager这个类,在加载时候,有个静态代码块:
1private static final RulePropertyListener LISTENER = new RulePropertyListener();
2private static SentinelProperty<List<DegradeRule>> currentProperty
3 = new DynamicSentinelProperty<>();
4
5static {
6 currentProperty.addListener(LISTENER);
7}
currentProperty.addListener(LISTENER);继续分析该段代码,找到DynamicSentinelProperty的addListener(…)方法:
1@Override
2public void addListener(PropertyListener<T> listener) {
3 listeners.add(listener);
4 listener.configLoad(value);
5}
612345
发现会调用监听器的configLoad(…)方法,最终会调用RulePropertyListener这个类的reloadFrom(…)方法。具体怎么解析的其实就是将规则根据资源名称进行归类,并保存为map格式。
FlowSlot 限流规则引擎之限流算法原理
1、滑动窗口实现原理
每个时间窗口最大流量为100QPS;
20和80表示当时的真实QPS数量;
一个时间窗口分为两个半限,上半限和下半限;
如果时间窗口1的下半限和时间窗口2的上半限的峰值超过100QPS,那么就丢失一部分流量。
但是这样并不是我们想要的,那么我们来看看计数器滑动窗口。
2、计数器滑动窗口原理
在滑动窗口算法上优化;
相邻的两个半限总和>总阈值,才丢弃流量。
3、令牌桶算法
令牌漏斗桶存着所有的Token;
按期发放Token;
如果桶满了,就会熔断;
达到Token的Request可以获取资源;
得不到的就抛弃。
图文总结
1、整体流程
(1)请求发送到web容器;
(2)Sentinel Aop拦截所有Sentinel Resouce;
(3)如果资源的规则通过则执行正常流程;
(4)不通过则返回流控异常提示。
2、Sentinel AOP切面运行流程
更多阅读推荐
都在说云原生,它的技术图谱你真的了解吗?
SRE 是如何保障稳定性的
如何写出让 CPU 跑得更快的代码?
Serverless 在 SaaS 领域的最佳实践
云原生人物志 | Pulsar翟佳:社区的信任最重要
一目了然的 Docker 环境配置指南