深度思考 Spring Cloud + Alibaba Sentinel 源码原理

随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。

作者 | 向寒 / 孙玄

来源 | 架构之美

头图 | 下载于视觉中国

关于 Sentinel 

1、理论篇

以下是经过多年分布式经验总结的两个理论基础:

(1)微服务与治理的关系

(2)爬坡理论

我们今天的主题分为以下两个主要部分:

  • Sentinel设计原理

  • Sentinel运行流程源码剖析

Sentinel 设计原理

1、特性

丰富的应用场景:阿里 10 年双十一积累场景,含秒杀、双十一零点持续洪峰、热点商品探测、预热、消息队列削峰填谷、集群流量控制、实时熔断下游不可用应用等多样化的场景。

广泛的开源生态:提供开箱即用的与其它开源框架/库的整合模块,如Dubbo、Spring Cloud、gRPC、Zuul、Reactor 等。

完善的 SPI 扩展点:提供简单易用、完善的 SPI 扩展接口;可通过实现扩展接口来快速地定制逻辑。

完备的实时监控:提供实时的监控功能,可看到接入应用的单台机器秒级数据,及500 台以下规模的集群汇总运行情况。

2、核心关键点

(1)资源:限流的对象

如下代码/user/select即为一个资源:

 1@GetMapping("/user/select")23@SentinelResource(value = "select", blockHandler = "exceptionHandler")45public TUser select(@RequestParam Integer userId) {67    log.info("post /user/select userid=" + userId);89    return userService.select(userId);
10
11}

即被SentinelResource注解修饰的API:

 1@Target({ElementType.METHOD, ElementType.TYPE})23@Retention(RetentionPolicy.RUNTIME)45@Inherited67public @interface SentinelResource {89    String value() default "";
10
11
12
13    EntryType entryType() default EntryType.OUT;
14
15
16
17    int resourceType() default 0;
18
19
20
21    String blockHandler() default "";
22
23
24
25    Class<?>[] blockHandlerClass() default {};
26
27
28
29    String fallback() default "";
30
31......
32
33}

(2)入口:sentinel为每个资源创建一个Entry。

(3)槽链:每个Entry都会有一条用于记录限流以及各种控制的信息Slot chain,以此来实现下图中绿色部分的功能。

Sentinel 运行流程源码剖析

此图为官网全局流程图,接下来我们通过源码,分解该过程:

1、入口处

1SphU.entry("methodA", EntryType.IN);//入口
2
3}

核心代码

1SphU#lookProcessChain(ResourceWrapper resourceWrapper)

2、入口逻辑

 1private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)23    throws BlockException {45    // 从threadLocal中获取当前线程对应的context实例。67    Context context = ContextUtil.getContext();89    if (context instanceof NullContext) {
10
11        // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
12
13        // so here init the entry only. No rule checking will be done.
14
15        // 如果context是nullContext的实例,表示当前context的总数已经达到阈值,所以这里直接创建entry实例,并返回,不进行规则的检查。
16
17        return new CtEntry(resourceWrapper, null, context);
18
19    }
20
21
22
23    if (context == null) {
24
25        // Using default context.
26
27        //如果context为空,则使用默认的名字创建一个,就是外部在调用SphU.entry(..)方法前如果没有调用ContextUtil.enter(..),则这里会调用该方法进行内部初始化context
28
29        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
30
31    }
32
33
34
35    // Global switch is close, no rule checking will do.
36
37    // 总开关
38
39    if (!Constants.ON) {
40
41        return new CtEntry(resourceWrapper, null, context);
42
43    }
44
45
46
47    // 构造链路(核心实现) go in
48
49    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
50
51
52
53    /*
54
55     * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
56
57     * so no rule checking will be done.
58
59     * 当链的大小达到阈值Constants.MAX_SLOT_CHAIN_SIZE时,不会校验任何规则,直接返回。
60
61     */
62
63    if (chain == null) {
64
65        return new CtEntry(resourceWrapper, null, context);
66
67    }
68
69
70
71    Entry e = new CtEntry(resourceWrapper, chain, context);
72
73    try {
74
75        // 开始进行链路调用。
76
77        chain.entry(context, resourceWrapper, null, count, prioritized, args);
78
79    } catch (BlockException e1) {
80
81        e.exit(count, args);
82
83        throw e1;
84
85    } catch (Throwable e1) {
86
87        // This should not happen, unless there are errors existing in Sentinel internal.
88
89        RecordLog.info("Sentinel unexpected exception", e1);
90
91    }
92
93    return e;
94
95}

3、上下文信息

Context

Context是当前线程所持有的Sentinel上下文。

进入Sentinel的逻辑时,会首先获取当前线程的Context,如果没有则新建。当任务执行完毕后,会清除当前线程的context。Context 代表调用链路上下文,贯穿一次调用链路中的所有 Entry。

Context 维持着入口节点(entranceNode)、本次调用链路的 当前节点(curNode)、调用来源(origin)等信息。Context 名称即为调用链路入口名称。

Node

Node是对一个@SentinelResource标记的资源的统计包装。

Context中记录本当前线程资源调用的入口节点。

我们可以通过入口节点的childList,可以追溯资源的调用情况。而每个节点都对应一个@SentinelResource标记的资源及其统计数据,例如:passQps,blockQps,rt等数据。

Entry

Entry是Sentinel中用来表示是否通过限流的一个凭证,如果能正常返回,则说明你可以访问被Sentinel保护的后方服务,否则Sentinel会抛出一个BlockException。

另外,它保存了本次执行entry()方法的一些基本信息,包括资源的Context、Node、对应的责任链等信息,后续完成资源调用后,还需要更具获得的这个Entry去执行一些善后操作,包括退出Entry对应的责任链,完成节点的一些统计信息更新,清除当前线程的Context信息等。

在构建Context时已经完成下图部分:

4、核心流程

这里有两个需要注意的点:

  • ProcessorSlot chain = lookProcessChain(resourceWrapper); 构建链路。

  • chain.entry(context, resourceWrapper, null, count, prioritized, args); 进行链路调用首先来看链路是如何构建的。

5、获取槽链

  • 已有直接获取;

  • 没有去创建。

 1       //在上下文中每一个资源都有各自的处理槽23        ProcessorSlotChain chain = chainMap.get(resourceWrapper);45        // 双重检查锁保证线程安全67        if (chain == null) {89            synchronized (LOCK) {
10
11                chain = ch ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {ainMap.get(resourceWrapper);
12
13                if (chain == null) {
14
15                    // Entry size limit.
16
17                    // 当链的长度达到阈值时,直接返回null,不进行规则的检查。
18
19                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
20
21                        return null;
22
23                    }
24
25                    // 构建链路 go in
26
27                    chain = SlotChainProvider.newSlotChain();
28
29                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
30
31                        chainMap.size() + 1);
32
33                    newMap.putAll(chainMap);
34
35                    newMap.put(resourceWrapper, chain);
36
37                    chainMap = newMap;
38
39                }
40
41            }
42
43        }
44
45        return chain;
46
47    }

6、创建槽链

SlotChainProvider.newSlotChain();

1   // 基于spi扩展点机制来扩展,默认为DefaultSlotChainBuilder
2
3slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);

7、SPI加载ProcessorSlot

这里采用了spi的机制来扩展SlotChainBuilder,默认是采用DefaultSlotChainBuilder来实现的,可以看到sentinel源码的sentinel-core包下,META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder文件下,默认属性是:

1  slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);

所以默认采用DefaultSlotChainBuilder来构建链路,因此找到DefaultSlotChainBuilder.build()方法。

8、DefaultSlotChainBuilder

 1public ProcessorSlotChain build() {23        // 定义链路起点45        ProcessorSlotChain chain = new DefaultProcessorSlotChain();6789        // Note: the instances of ProcessorSlot should be different, since they are not stateless.
10
11        // 基于spi扩展机制,加载ProcessorSlot的实现类,从META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot文件下获取,并且按指定顺序排序
12
13        List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
14
15        // 遍历构建链路
16
17        for (ProcessorSlot slot : sortedSlotList) {
18
19            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
20
21                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
22
23                continue;
24
25            }
26
27            // 将slot节点加入链,因为已经排好序了,只需要加到最后即可
28
29            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
30
31        }
32
33
34
35        return chain;
36
37    }

9、遍历ProcessorSlots

这里也是通过spi的机制,读取文件META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot:

 1# Sentinel default ProcessorSlots23com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot45com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot67com.alibaba.csp.sentinel.slots.logger.LogSlot89com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
10
11com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
12
13com.alibaba.csp.sentinel.slots.system.SystemSlot
14
15com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
16
17com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

从这里看出,链路由这些节点组成,而slot之间的顺序是根据每个slot节点的@SpiOrder注解的值来确定的。

NodeSelectorSlot -> ClusterBuilderSlot -> LogSlot -> StatisticSlot -> AuthoritySlot -> SystemSlot -> FlowSlot -> DegradeSlot

链路调用 

chain.entry(…)

上面已经构建好了链路,下面就要开始进行链路的调用了。

回到CtSph#entryWithPriority

1、NodeSelectorSlot

NodeSelectorSlot(@SpiOrder(-10000))

直接进入NodeSelectorSlot类的entry方法。

根据官方文档,NodeSelectorSlot类的作用为:

负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级。

 1@Override23public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)45    throws Throwable {6789    // 双重检查锁+缓存 机制
10
11    DefaultNode node = map.get(context.getName());
12
13    if (node == null) {
14
15        synchronized (this) {
16
17            node = map.get(context.getName());
18
19            if (node == null) {
20
21                node = new DefaultNode(resourceWrapper, null);
22
23                HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
24
25                cacheMap.putAll(map);
26
27                cacheMap.put(context.getName(), node);
28
29                map = cacheMap;
30
31                // Build invocation tree
32
33                // 构建调用链的树形结构
34
35                ((DefaultNode) context.getLastNode()).addChild(node);
36
37            }
38
39
40
41        }
42
43    }
44
45
46
47    context.setCurNode(node);
48
49    // 进入下一个链
50
51    fireEntry(context, resourceWrapper, node, count, prioritized, args);
52
53}

2、ClusterBuilderSlot

ClusterBuilderSlot(@SpiOrder(-9000))

根据官方文档,ClusterBuilderSlot的作用为:

此插槽用于构建资源的 ClusterNode 以及调用来源节点。ClusterNode 保持某个资源运行统计信息(响应时间、QPS、block 数目、线程数、异常数等)以及调用来源统计信息列表。调用来源的名称由 ContextUtil.enter(contextName,origin) 中的 origin 标记。

3、LogSlot

LogSlot(@SpiOrder(-8000))

该类对链路的传递不做处理,只有在抛出BlockException的时候,向上层层传递的过程中,会通过该类来输入一些日志信息:

 1@Override23public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)45    throws Throwable {67    try {89        fireEntry(context, resourceWrapper, obj, count, prioritized, args);
10
11    } catch (BlockException e) {
12
13        // 当抛出BlockException异常时,这里会输入日志信息
14
15        EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
16
17            context.getOrigin(), count);
18
19        throw e;
20
21    } catch (Throwable e) {
22
23        RecordLog.warn("Unexpected entry exception", e);
24
25    }
26
27}

4、StatisticSlot

StatisticSlot(@SpiOrder(-7000))

官方文档:

StatisticSlot用于记录、统计不同纬度的 runtime 指标监控信息。

  1@Override23public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,45                  boolean prioritized, Object... args) throws Throwable {67    try {89        // Do some checking.1011        // 先将调用链继续下去,等到后续链调用结束了,再执行下面的步骤1213        fireEntry(context, resourceWrapper, node, count, prioritized, args);14151617        // Request passed, add thread count and pass count.1819        node.increaseThreadNum();2021        node.addPassRequest(count);22232425        if (context.getCurEntry().getOriginNode() != null) {2627            // Add count for origin node.2829            context.getCurEntry().getOriginNode().increaseThreadNum();3031            context.getCurEntry().getOriginNode().addPassRequest(count);3233        }34353637        if (resourceWrapper.getEntryType() == EntryType.IN) {3839            // Add count for global inbound entry node for global statistics.4041            Constants.ENTRY_NODE.increaseThreadNum();4243            Constants.ENTRY_NODE.addPassRequest(count);4445        }46474849        // Handle pass event with registered entry callback handlers.5051        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {5253            handler.onPass(context, resourceWrapper, node, count, args);5455        }5657    } catch (PriorityWaitException ex) {5859        node.increaseThreadNum();6061        if (context.getCurEntry().getOriginNode() != null) {6263            // Add count for origin node.6465            context.getCurEntry().getOriginNode().increaseThreadNum();6667        }68697071        if (resourceWrapper.getEntryType() == EntryType.IN) {7273            // Add count for global inbound entry node for global statistics.7475            Constants.ENTRY_NODE.increaseThreadNum();7677        }7879        // Handle pass event with registered entry callback handlers.8081        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {8283            handler.onPass(context, resourceWrapper, node, count, args);8485        }8687    } catch (BlockException e) {8889        // Blocked, set block exception to current entry.9091        context.getCurEntry().setBlockError(e);92939495        // Add block count.9697        node.increaseBlockQps(count);9899        if (context.getCurEntry().getOriginNode() != null) {
100
101            context.getCurEntry().getOriginNode().increaseBlockQps(count);
102
103        }
104
105
106
107        if (resourceWrapper.getEntryType() == EntryType.IN) {
108
109            // Add count for global inbound entry node for global statistics.
110
111            Constants.ENTRY_NODE.increaseBlockQps(count);
112
113        }
114
115
116
117        // Handle block event with registered entry callback handlers.
118
119        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
120
121            handler.onBlocked(e, context, resourceWrapper, node, count, args);
122
123        }
124
125
126
127        throw e;
128
129    } catch (Throwable e) {
130
131        // Unexpected internal error, set error to current entry.
132
133        context.getCurEntry().setError(e);
134
135
136
137        throw e;
138
139    }
140
141}

StatisticSlot 会先将链往下执行,等到后面的节点全部执行完毕,再进行数据统计。

5、AuthoritySlot

@SpiOrder(-6000)

AuthoritySlot

官方文档:

AuthoritySlot:根据配置的黑白名单和调用来源信息,来做黑白名单控制

 1@Override23public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)45    throws Throwable {67    // 黑白名单权限控制89    checkBlackWhiteAuthority(resourceWrapper, context);
10
11    fireEntry(context, resourceWrapper, node, count, prioritized, args);
12
13}
14
15
16
17void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
18
19    Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
20
21
22
23    if (authorityRules == null) {
24
25        return;
26
27    }
28
29
30
31    Set<AuthorityRule> rules = authorityRules.get(resource.getName());
32
33    if (rules == null) {
34
35        return;
36
37    }
38
39
40
41    for (AuthorityRule rule : rules) {
42
43        if (!AuthorityRuleChecker.passCheck(rule, context)) {
44
45            throw new AuthorityException(context.getOrigin(), rule);
46
47        }
48
49    }
50
51}

6、SystemSlot

@SpiOrder(-5000)

SystemSlot

官方文档:

SystemSlot:这个 slot 会根据对于当前系统的整体情况,对入口资源的调用进行动态调配。其原理是让入口的流量和当前系统的预计容量达到一个动态平衡。

1@Override
2public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
3                 boolean prioritized, Object... args) throws Throwable {
4   // 系统规则校验
5   SystemRuleManager.checkSystem(resourceWrapper);
6   fireEntry(context, resourceWrapper, node, count, prioritized, args);
7}

7、FlowSlot 限流规则引擎

@SpiOrder(-2000)

FlowSlot

官方文档:

这个 slot 主要根据预设的资源的统计信息,按照固定的次序,依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止:

  • 指定应用生效的规则,即针对调用方限流的;

  • 调用方为 other 的规则;

  • 调用方为 default 的规则。


入口

 1@Override2public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,3                 boolean prioritized, Object... args) throws Throwable {4   // 检查限流规则5   checkFlow(resourceWrapper, context, node, count, prioritized);67   fireEntry(context, resourceWrapper, node, count, prioritized, args);8}9
10void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
11   throws BlockException {
12   checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
13}

1、所有规则检查

调用了FlowRuleChecker.checkFlow(…)方法。

 1public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,2                     Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {3   if (ruleProvider == null || resource == null) {4       return;5  }6   // 根据资源名称找到对应的7   Collection<FlowRule> rules = ruleProvider.apply(resource.getName());8   if (rules != null) {9       // 遍历规则,依次判断是否通过
10       for (FlowRule rule : rules) {
11           if (!canPassCheck(rule, context, node, count, prioritized)) {
12               throw new FlowException(rule.getLimitApp(), rule);
13          }
14      }
15  }
16}

2、单个规则检查

 1public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,2                                               boolean prioritized) {3   String limitApp = rule.getLimitApp();4   if (limitApp == null) {5       return true;6  }7   // 集群限流的判断8   if (rule.isClusterMode()) {9       return passClusterCheck(rule, context, node, acquireCount, prioritized);
10  }
11   // 本地节点的判断
12   return passLocalCheck(rule, context, node, acquireCount, prioritized);
13}

3、非集群模式的限流判断

 1private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,2                                     boolean prioritized) {3   // 根据请求的信息及策略,选择不同的node节点4   Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);5   if (selectedNode == null) {6       return true;7  }8   // 根据当前规则,获取规则控制器,调用canPass方法进行判断9//       rule.getRater()放回的是TrafficShapingController接口的实现类,使用了策略模式,根据使用的控制措施来选择使用哪种实现。
10   return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
11}

这里是先根据请求和当前规则的策略,找到该规则下存储统计信息的节点,然后根据当前规则获取相应控制器,通过控制器的canPass(…)方法进行判断。

4、获取节点
 1static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {2   // The limit app should not be empty.3   String limitApp = rule.getLimitApp();4   int strategy = rule.getStrategy();5   String origin = context.getOrigin();67   // 判断调用来源,这种情况下origin不能为default或other8   if (limitApp.equals(origin) && filterOrigin(origin)) {9       // 如果调用关系策略为STRATEGY_DIRECT,表示仅判断自己,则返回origin statistic node.
10       if (strategy == RuleConstant.STRATEGY_DIRECT) {
11           // Matches limit origin, return origin statistic node.
12           return context.getOriginNode();
13      }
14
15       // 采用调用来源进行判断的策略
16       return selectReferenceNode(rule, context, node);
17  } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) { // 如果调用来源为default默认的
18       if (strategy == RuleConstant.STRATEGY_DIRECT) { // 如果调用关系策略为STRATEGY_DIRECT,则返回clusterNode
19           // Return the cluster node.
20           return node.getClusterNode();
21      }
22
23       return selectReferenceNode(rule, context, node);
24  } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
25       && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { // 如果调用来源为other,且调用来源不在限制规则内,为其他来源
26       if (strategy == RuleConstant.STRATEGY_DIRECT) {
27           return context.getOriginNode();
28      }
29       return selectReferenceNode(rule, context, node);
30  }
31   return null;
32}
5、流量整形控制器

rule.getRater()方法会返回一个控制器,接口为TrafficShapingController,该接口的实现类图如下:

从类图可以看出,是很明显的策略模式,分别针对不同的限流控制策略。


1、默认策略

DefaultController该策略是sentinel的默认策略,如果请求超出阈值,则直接拒绝请求。

 1@Override2public boolean canPass(Node node, int acquireCount, boolean prioritized) {3   // 当前已经统计的数4   int curCount = avgUsedTokens(node);5   if (curCount + acquireCount > count) {6       // 如果是高优先级的,且是基于qps的限流方式,则可以尝试从下个未来的滑动窗口中预支7       if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {8           long currentTime;9           long waitInMs;
10           currentTime = TimeUtil.currentTimeMillis();
11           // 从下个滑动窗口中提前透支
12           waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
13           if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
14               node.addWaitingRequest(currentTime + waitInMs, acquireCount);
15               node.addOccupiedPass(acquireCount);
16               sleep(waitInMs);
17
18               // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
19               throw new PriorityWaitException(waitInMs);
20          }
21      }
22       return false;
23  }
24   return true;
25}
26
27private int avgUsedTokens(Node node) {
28   if (node == null) {
29       return DEFAULT_AVG_USED_TOKENS;
30  }
31   // 如果当前是线程数限流,则返回node.curThreadNum()当前线程数
32   // 如果是QPS限流,则返回node.passQps()当前已经通过的qps数据
33   return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
34}
35
36private void sleep(long timeMillis) {
37   try {
38       Thread.sleep(timeMillis);
39  } catch (InterruptedException e) {
40       // Ignore.
41  }
42}
2、匀速排队策略

RateLimiterController

 1@Override2public boolean canPass(Node node, int acquireCount, boolean prioritized) {3   // Pass when acquire count is less or equal than 0.4   if (acquireCount <= 0) {5       return true;6  }7   // Reject when count is less or equal than 0.8   // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.9   if (count <= 0) {
10       return false;
11  }
12
13   long currentTime = TimeUtil.currentTimeMillis();
14   // Calculate the interval between every two requests.
15   // 计算两个请求之间的时间间隔
16   long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
17
18   // Expected pass time of this request. 该请求的预计通过时间 = 上一次通过的时间 + 时间间隔
19   long expectedTime = costTime + latestPassedTime.get();
20
21   // 如果预计时间比当前时间小,表示可以请求完全可以通过
22   if (expectedTime <= currentTime) {
23       // Contention may exist here, but it's okay.
24       // 这里可能存在竞争,但是不影响。
25       latestPassedTime.set(currentTime);
26       return true;
27  } else {
28       // Calculate the time to wait.
29       // 计算等待时间
30       long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
31       // 如果等待时间超出了等待队列的最大时间,则无法放入等待队列,直接拒绝
32       if (waitTime > maxQueueingTimeMs) {
33           return false;
34      } else {
35           long oldTime = latestPassedTime.addAndGet(costTime);
36           try {
37               // 重新计算等待时间
38               waitTime = oldTime - TimeUtil.currentTimeMillis();
39               // 判断等待时间是否超过等待队列的最大时间,如果超过了,拒绝,并且将latestPassedTime最后一次请求时间重新设置为原值
40               if (waitTime > maxQueueingTimeMs) {
41                   latestPassedTime.addAndGet(-costTime);
42                   return false;
43              }
44               // in race condition waitTime may <= 0
45               // 线程等待
46               if (waitTime > 0) {
47                   Thread.sleep(waitTime);
48              }
49               return true;
50          } catch (InterruptedException e) {
51          }
52      }
53  }
54   return false;
55}

从代码可以看出,匀速排队策略是使用了虚拟队列的方法,通过控制阈值来计算出请求的时间间隔,然后将上一次请求的时间加上时间间隔,表示下一次请求的时间,如果当前时间比这个值大,说明已经超出时间间隔了,当然可以请求,反之,表示需要等待,那么等待的时长就应该是要等到当前时间达到预期时间才能请求,这里就有个虚拟的等待队列,而等待其实是通过线程的等待来实现的。而这里所说的虚拟队列实际上是由一系列的处于sleep状态的线程组成的,但是实际的数据结构上并没有构成队列。

3、预热/冷启动策略

WarmUpController

首先看WarmUpController的属性和构造方法:

 1// 阈值2protected double count;3/**4* 冷启动的因子 ,默认为3 {@link SentinelConfig#coldFactor()}5*/6private int coldFactor;7// 转折点的令牌数8protected int warningToken = 0;9// 最大令牌数
10private int maxToken;
11// 折线初始斜率,标志流量的变化程度
12protected double slope;
13
14// 累积的令牌数 ,累积的令牌数越多,说明系统利用率越低,说明当前流量低,是冷状态
15protected AtomicLong storedTokens = new AtomicLong(0);
16// 最后更新令牌的时间
17protected AtomicLong lastFilledTime = new AtomicLong(0);
18
19public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
20   construct(count, warmUpPeriodInSec, coldFactor);
21}
22
23public WarmUpController(double count, int warmUpPeriodInSec) {
24   construct(count, warmUpPeriodInSec, 3);
25}
26
27/**
28* @param count             用户设定的阈值(这里假设设定为100)
29* @param warmUpPeriodInSec 默认为10
30* @param coldFactor       默认为3
31*/
32private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
33
34   if (coldFactor <= 1) {
35       throw new IllegalArgumentException("Cold factor should be larger than 1");
36  }
37
38   this.count = count;
39
40   this.coldFactor = coldFactor;
41
42   // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
43   // warningToken = 100;
44
45   // 按默认的warmUpPeriodInSec = 10,表示1秒钟10个请求,则每个请求为间隔stableInterval = 100ms,那么coldInterval=stableInterval * coldFactor = 100 * 3 = 300ms
46   // warningToken = 10 * 100 / (3 - 1) = 500
47   // thresholdPermits = warningToken = 0.5 * warmupPeriod / stableInterval = 0.5 * warmupPeriod / 100ms = 500 ==>> warmupPeriod = 100000ms
48   warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
49
50   // / maxPermits = thresholdPermits + 2 * warmupPeriod /
51   // (stableInterval + coldInterval)
52   // maxToken = 200
53
54   // maxPermits = 500 + 2 * 100000ms / (100ms + 300ms) = 1000
55   // maxToken = 500 + (2 * 10 * 100 / (1.0 + 3)) = 1000
56   // maxPermits = maxToken
57   maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
58
59   // slope
60   // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
61   // - thresholdPermits);
62
63   // slope = (3 - 1.0) / 100 / (600 - 500) = 0.0002
64   slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
65
66}

属性说明:

  • count: 用户设定的qps阈值。

  • coldFactor: 冷启动的因子,初始默认为3,通过SentinelConfig类的coldFactor()方法获取,这里会有个判断,如果启动因子小于等于1,则会设置为默认值3,因为如果是小于等于1,是没有意义的,就不是预热启动了。

  • warningToken:转折点的令牌数,当令牌数开始小于该值得时候,就要开启预热了。

  • maxToken:最大令牌数。

  • slope:折线的斜率。

  • storedTokens:当前存储的令牌数。

  • lastFilledTime:上一次更新令牌的时间。

总体思路:当系统存储的令牌为最大值时,说明系统访问流量较低,处于冷状态,这时候当有正常请求过来时,会让请求通过,并且会补充消耗的令牌数。当瞬时流量来临时,一旦剩余的令牌数小于警戒令牌数(restToken <= warningToken),则表示有大流量过来,需要开启预热过程,开始逐渐增大允许的qps。当qps达到用户设定的阈值后,系统已经预热完毕,这时候就进入了正常的请求阶段。

源码分析如下:

 1@Override2public boolean canPass(Node node, int acquireCount, boolean prioritized) {3   // 当前已经通过的qps4   long passQps = (long) node.passQps();56   // 上一个滑动窗口的qps7   long previousQps = (long) node.previousPassQps();8   // 同步令牌,如果是出于冷启动或预热完毕状态,则考虑要添加令牌9   syncToken(previousQps);
10
11   // 开始计算它的斜率
12   // 如果进入了警戒线,开始调整他的qps
13   long restToken = storedTokens.get();
14   if (restToken >= warningToken) { // 说明一瞬间有大流量过来,消耗了大量的存储令牌,造成剩余令牌数第一警戒值,则要开启预热默认,逐渐增加qps
15       // 计算当前离警戒线的距离
16       long aboveToken = restToken - warningToken;
17       // 消耗的速度要比warning快,但是要比慢
18       // current interval = restToken*slope+1/count
19       // restToken越小,interval就越小,表示系统越热
20       // 随着aboveToken的减小,warningQps会逐渐增大
21       double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
22       if (passQps + acquireCount <= warningQps) { // 随着warningQps的增大,acquireCount = 1,那么passQps允许的范围就变大,相应的流量就越大,系统越热
23           return true;
24      }
25  } else {
26       if (passQps + acquireCount <= count) {
27           return true;
28      }
29  }
30
31   return false;
32}
33
34/**
35* 同步令牌
36* @param passQps
37*/
38protected void syncToken(long passQps) {
39   long currentTime = TimeUtil.currentTimeMillis();
40   // 把当前时间的后三位置为0 e.g. 1601456312835 = 1601456312835 - 1601456312835 % 1000 = 1601456312000
41   currentTime = currentTime - currentTime % 1000;
42   // 获取上一次更新令牌的时间
43   long oldLastFillTime = lastFilledTime.get();
44   if (currentTime <= oldLastFillTime) {
45       return;
46  }
47
48   // 获得目前的令牌数
49   long oldValue = storedTokens.get();
50   // 获取新的令牌数
51   long newValue = coolDownTokens(currentTime, passQps);
52
53   // 更新累积令牌数
54   if (storedTokens.compareAndSet(oldValue, newValue)) {
55       // 去除上一次的qps,设置剩下的令牌数
56       long currentValue = storedTokens.addAndGet(0 - passQps);
57       if (currentValue < 0) {
58           // 如果剩下的令牌数小于0,则置为0。
59           storedTokens.set(0L);
60      }
61       // 设置令牌更新时间
62       lastFilledTime.set(currentTime);
63  }
64}
65
66private long coolDownTokens(long currentTime, long passQps) {
67   // 当前拥有的令牌数
68   long oldValue = storedTokens.get();
69   long newValue = oldValue;
70
71   // 添加令牌的判断前提条件:
72   // 当令牌的消耗程度远远低于警戒线的时候
73   if (oldValue < warningToken) { // 这种情况表示已经预热结束,可以开始生成令牌了
74       // 这里按照count = 100来计算的话,表示旧值oldValue + 距离上次更新的秒数时间差 * count ,表示每秒增加count个令牌
75       // 这里的currentTime 和 lastFilledTime.get() 都是已经去掉毫秒数的
76       newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
77  } else if (oldValue > warningToken) { // 进入这里表示当前是冷状态或正处于预热状态
78       if (passQps < (int)count / coldFactor) { // 如果是冷状态,则补充令牌数,避免令牌数为0
79           newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
80      }
81       // 预热阶段则不添加令牌数,从而限制流量的急剧攀升
82  }
83   // 限制令牌数不能超过最大令牌数maxToken
84   return Math.min(newValue, maxToken);
85}
4、预热的匀速排队策略

WarmUpRateLimiterController

这种是匀速排队模式和预热模式的结合,这里不深入了。搞懂了上面两种,再看这种也比较清晰了。

5、DegradeSlot

官方文档说明:

这个 slot 主要针对资源的平均响应时间(RT)以及异常比率,来决定资源是否在接下来的时间被自动熔断掉。

源码解析:

 1@Override2public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,3                 boolean prioritized, Object... args) throws Throwable {4   //降级判断5   performChecking(context, resourceWrapper);67   // 如果有自定义的slot,还会继续进行8   fireEntry(context, resourceWrapper, node, count, prioritized, args);9}
10
11void performChecking(Context context, ResourceWrapper r) throws BlockException {
12   // 使用DegradeRuleManager获得当前资源的熔断器
13   List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
14   if (circuitBreakers == null || circuitBreakers.isEmpty()) {
15       return;
16  }
17   // 遍历熔断器,只要有任何一个满足熔断条件,就抛出DegradeException异常。
18   for (CircuitBreaker cb : circuitBreakers) {
19       if (!cb.tryPass(context)) {
20           throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
21      }
22  }
23}

这里有个关键类,DegradeRuleManager,该类中会保存所有的熔断规则,使用Map<String, List>的格式进行保存。当需要使用的时候,就直接根据资源名称,从该map中获取对应的熔断器列表。

那么规则是如何加载的呢?我们看到DegradeRuleManager这个类,在加载时候,有个静态代码块:

1private static final RulePropertyListener LISTENER = new RulePropertyListener();
2private static SentinelProperty<List<DegradeRule>> currentProperty
3   = new DynamicSentinelProperty<>();
4
5static {
6   currentProperty.addListener(LISTENER);
7}

currentProperty.addListener(LISTENER);继续分析该段代码,找到DynamicSentinelProperty的addListener(…)方法:

1@Override
2public void addListener(PropertyListener<T> listener) {
3   listeners.add(listener);
4   listener.configLoad(value);
5}
612345

发现会调用监听器的configLoad(…)方法,最终会调用RulePropertyListener这个类的reloadFrom(…)方法。具体怎么解析的其实就是将规则根据资源名称进行归类,并保存为map格式。

FlowSlot 限流规则引擎之限流算法原理

1、滑动窗口实现原理

  • 每个时间窗口最大流量为100QPS;

  • 20和80表示当时的真实QPS数量;

  • 一个时间窗口分为两个半限,上半限和下半限;

  • 如果时间窗口1的下半限和时间窗口2的上半限的峰值超过100QPS,那么就丢失一部分流量。

但是这样并不是我们想要的,那么我们来看看计数器滑动窗口。

2、计数器滑动窗口原理

  • 在滑动窗口算法上优化;

  • 相邻的两个半限总和>总阈值,才丢弃流量。

3、令牌桶算法

  • 令牌漏斗桶存着所有的Token;

  • 按期发放Token;

  • 如果桶满了,就会熔断;

  • 达到Token的Request可以获取资源;

  • 得不到的就抛弃。

图文总结 

1、整体流程

(1)请求发送到web容器;

(2)Sentinel Aop拦截所有Sentinel Resouce;

(3)如果资源的规则通过则执行正常流程;

(4)不通过则返回流控异常提示。

2、Sentinel AOP切面运行流程

更多阅读推荐

  • 都在说云原生,它的技术图谱你真的了解吗?

  • SRE 是如何保障稳定性的

  • 如何写出让 CPU 跑得更快的代码?

  • Serverless 在 SaaS 领域的最佳实践

  • 云原生人物志 | Pulsar翟佳:社区的信任最重要

  • 一目了然的 Docker 环境配置指南

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/515526.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WORDPRESS付费会员插件Paid Memberships Pro v2.12.5 – Plugin + All Addons

WORDPRESS付费会员插件Paid Memberships Pro v2.12.5 – Plugin All Addons 简介&#xff1a; Paid Memberships Pro是一款功能强大的会员订阅和内容限制管理插件&#xff0c;适用于WordPress网站。它提供了丰富的特性和工具&#xff0c;帮助网站所有者轻松地创建和管理付费…

云计算与星辰大海的结合

云栖号资讯&#xff1a;【点击查看更多行业资讯】 在这里您可以找到不同行业的第一手的上云资讯&#xff0c;还在等什么&#xff0c;快来&#xff01; 今年在疫情的影响下&#xff0c;各国的经济发展都遇到了一些困难&#xff0c;甚至除中国以外的主要经济体都会进入了负增长的…

uniapp 打包安卓 Android 抖音app 后端篇~02

文章目录1. 中间件配置2. 云短信配置1. 中间件配置 2. 云短信配置

从开源自治到微服务云化,用这剂良药提升微服务幸福感

前言 微服务发展至今&#xff0c;因其高内聚、低耦合等特性&#xff0c;以及诸多开源方案带来的开放性&#xff0c;已成为提升架构效率的最佳实践之一。当一项技术或一个框架成为事实标准之后&#xff0c;我们会把更多的注意力聚焦在运维效率和应用可用性的持续提升上。相信下…

uniapp 打包安卓 Android 抖音app 前后端调试篇~03

文章目录1. 未登录首页浏览短视频2. 发布视频-云短信登录3. 发布选择视频4. 上传短视频到云存储5. 测试发布视频6. 个人中心查看发布视频7. 首页查看刚发布视频8. 个人中心1. 未登录首页浏览短视频 在未登录的情况下&#xff0c;首页可以看短视频 2. 发布视频-云短信登录 点…

俯瞰云原生,这便是供应层

来源 | K8sMeetup社区作者 | Catherine Paganini&#xff0c;Jason Morgan头图 | 下载于视觉中国在都在说云原生&#xff0c;它的技术图谱你真的了解吗&#xff1f;中&#xff0c;我们对 CNCF 的云原生技术生态做了整体的介绍。从本篇开始&#xff0c;将详细介绍云原生全景图的…

进击的Kubernetes调度系统(一):SchedulingFramework

作者&#xff1a;王庆璨 张凯 前言 Kubernetes已经成为目前事实标准上的容器集群管理平台。它为容器化应用提供了自动化部署、运维、资源调度等全生命周期管理功能。经过3年多的快速发展&#xff0c;Kubernetes在稳定性、扩展性和规模化方面都有了长足进步。 尤其是Kubernete…

HTTP系列学习(笔记三):HTTP的发展历程思维导图

HTTP&#xff08;HyperText Transfer Protocol&#xff09;是万维网&#xff08;World Wide Web&#xff09;的基础协议。 0.9版本&#xff1a; 1.0版本&#xff1a; 1.1版本&#xff1a; 2.0版本&#xff1a; 2.0进化版本&#xff1a; 为了便于浏览记忆&#xff0c;整理了一份…

为什么说Serverless是云的未来?

作者 | 不瞋 阿里云高级技术专家 每隔几年&#xff0c;IT 界就会出现新突破性的进展。回望整个计算机技术发展史&#xff0c;我们会发现“抽象、解耦、集成”的主题贯穿其中。产业每一次的抽象、解耦、集成&#xff0c;都将创新推向新的高度&#xff0c;也催生出庞大的市场和…

(企业级)HBuilder X 安装蓝叠安卓模拟器

文章目录1. 下载蓝叠模拟器2. 设置 adb链接和root3. 设置竖屏4. 设置uni-app adb 环境变量5. 配置 HBuilderX adb5. 运行6.效果图7.常见模拟器1. 下载蓝叠模拟器 https://www.bluestacks.cn/ 2. 设置 adb链接和root 3. 设置竖屏 4. 设置uni-app adb 环境变量 在 HBuilderX…

小困惑,关于 Serverless 函数计算的字体安装

来源 | Serverless作者 | 孙飞宇头图 | 下载于视觉中国前言首先介绍下在本文出现的几个比较重要的概念&#xff1a;函数计算&#xff08;Function Compute&#xff09;&#xff1a;函数计算是一个事件驱动的服务&#xff0c;通过函数计算&#xff0c;用户无需管理服务器等运行情…

一文带你了解MySQL中的各种锁机制!

云栖号资讯&#xff1a;【点击查看更多行业资讯】 在这里您可以找到不同行业的第一手的上云资讯&#xff0c;还在等什么&#xff0c;快来&#xff01; MySQL中的锁机制,按粒度分为行级锁,页级锁,表级锁&#xff0c;其中按用法还分为共享锁和排他锁. 行级锁 行级锁是Mysql中锁…

for循环中let,var 的经典面试题:for循环中 console.log(i)详解

同学们在刚准备面试时肯定见过一道经典面试题&#xff1a; for(var i 0; i < 10; i) {setTimeOut(function(){console.log(i)}) } // 输出 10 10 10 10 10 10 10 10 10 10for(let i 0; i < 10; i) {setTimeOut(function(){console.log(i)}) } // 输出 0 1 2 3 4 5 6 7…

后疫情时代,银行从数字化转型到智能化“迁徙”

云栖号资讯&#xff1a;【点击查看更多行业资讯】 在这里您可以找到不同行业的第一手的上云资讯&#xff0c;还在等什么&#xff0c;快来&#xff01; 全球数据智能趋势一览 笔者在搜索了众多机构发表的数据智能发展趋势报告&#xff0c;并做了筛选和甄别后&#xff0c;参考了公…

普通二本学校软件工程专业本科毕业的女生,没有考研,选择直接就业现如今过得怎样呐?

第一篇程序人生 在进入大学之前买的联想笔记本电脑被我之前放在窗户边&#xff0c;一个月之前去上班的时候忘记关窗户&#xff0c;下大雨给淋雨进水了&#xff0c;刚好开机密码的几个键盘失灵了&#xff0c;上周末在网上买了一个键盘&#xff0c;终于可以开机了&#xff0c;为…

阿里云交通数据中台解决方案打造“数字化生产力”

数字经济时代&#xff0c;计算、分析、处理等作为“关键生产要素”已成为行业和社会的共识。但是对于交通领域而言&#xff0c;以往端到端的方式进行平台搭建和应用开发已不能适应数字爆炸和产品快速迭代的要求。交通行业在计算分析方面面临着信息采集难、样式杂、变化快、价值…

一次讲清楚,七种分布式事务的解决方案

来源 | moon聊技术责编 | 寇雪芹头图 | 下载于视觉中国什么是分布式事务分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器「分别位于不同的分布式系统的不同节点之上」。一个大的操作由N多的小的操作共同完成。而这些小的操作又分布在不同的服务上。针…

SpringCloud应用在Kubernetes上的最佳实践—开发篇

作者 | 孤弋 阿里云高级技术专家&#xff0c;负责 EDAS 的开发和用户体验优化工作。 前言 近年来&#xff0c;云原生、Kubernetes、微服务、SpringCloud 这些名词在技术圈内不绝于耳&#xff0c;数据显示&#xff0c;使用 SpringCloud 作为微服务的框架&#xff0c;同时选择…

支持批任务的Coscheduling/Gang scheduling

作者&#xff1a;王庆璨 张凯 进击的Kubernetes调度系统&#xff08;一&#xff09;&#xff1a;Scheduling Framework 进击的Kubernetes调度系统&#xff08;二&#xff09;&#xff1a;支持批任务的Coscheduling/Gang scheduling 前言 首先我们来了解一下什么是Coscheduli…

ESLint is disabled since its execution has not been approved or denied yet

我的vs code有安装eslint插件&#xff0c;但是不这道为什么这两天很多代码校验都不起作用了 一顿操作猛如虎&#xff0c;最后发现代码开始的时候有一条黄线 爆出了一个错误 ESLint is disabled since its execution has not been approved or denied yet. Use the light bulb…