资源数据采集
之前的NodeSelectorSlot
和ClusterBuilderSlot
已经完成了对资源调用树的构建, 现在则是要对资源进行收集, 核心点就是这些资源数据是如何统计
LogSlot
作用: 记录异常请求日志, 用于故障排查
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)throws Throwable {try {// 啥也没干, 直接调用下一个SlotfireEntry(context, resourceWrapper, obj, count, prioritized, args);} catch (BlockException e) {// 被流控或者熔断降级后直接打印logEagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),context.getOrigin(), e.getRule().getId(), count);throw e;} catch (Throwable e) {RecordLog.warn("Unexpected entry exception", e);}}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {try {// 啥也没干,直接调用下一个 SlotfireExit(context, resourceWrapper, count, args);} catch (Throwable e) {RecordLog.warn("Unexpected entry exit exception", e);}}
}
LogSlot只做了一件事, 当出现BlockException
异常时, 记录log日志(EagleEyeLogUtil.log
会将日志写到 sentinel-block.log
文件中)
StatisticSlot
初始StatisticSlot
如果要设计一个 StatisticSlot,首先需要明确其需要实现的功能,即收集各种指标数据,如请求总数、请求成功数、请求失败数、响应时间等。
目前先把核心结构先列出来, 后续填充其他功能
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {// 调用责任链下一个 SlotfireEntry(context, resourceWrapper, node, count, prioritized, args);} catch (Throwable e) {throw e;}}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {DefaultNode node = (DefaultNode)context.getCurNode();// 调用责任链下一个 SlotfireExit(context, resourceWrapper, count);}
}
错误信息和异常数统计
fireEntry()
调用的是真正验证用于的Slot, 比如FlowSlot, DegradeSlot等, 如果后续验证不通过的话, 那么会抛出BlockException
, 那么此时就可以使用try-catch捕获, 捕获后记录异常错误信息以及异常数
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {// 调用下一个Slot, 如果验证不通过, 那就捕获异常fireEntry(context, resourceWrapper, node, count, prioritized, args);} catch (BlockException e) {// 捕获 BlockExceptionthrow e;} catch (Throwable e) {// .....throw e;}
}
QPS和线程数统计
QPS和线程数的统计应该在什么时候统计?
可以fireEntry()之后
进行统计, 调用fireEntry()
- 如果没有报
BlockException
, 则表示没有被流控
或熔断降级
- 将当前资源占用的线程数 + 1以及当前请求QPS + 1
- 如果报了
BlockException
, 则表示被拦截了, 即请求失败- 将请求拒绝的QPS + 1
对于总的QPS则可以通过公式计算 总QPS = 成功QPS + 失败QPS
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {// 规则验证fireEntry(context, resourceWrapper, node, count, prioritized, args);// 如果能走到这里,则将当前资源占用的线程数 + 1 以及当前资源请求成功的 QPS 数 + 1node.increaseThreadNum();node.addPassRequest();} catch (BlockException e) { // 捕获 BlockException// 如果规则验证失败,则将 BlockQps + 1node.increaseBlockQps();throw e;} catch (Throwable e) {// .....throw e;}
}
响应时间统计
entry()是入口方法
,相当于 AOP的before() 方法,那我们肯定会对应一个after() 方法,exit()是出口方法
, 也就说可以在exit()中记录响应时间
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {Node node = context.getCurNode();if (context.getCurEntry().getBlockError() == null) {// 获取系统当前时间long completeStatTime = TimeUtil.currentTimeMillis();context.getCurEntry().setCompleteTimestamp(completeStatTime);// 得到响应时间,这个时间是哪里来的呢?是我们最初最开始为资源创建Entry对象时记录的。long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();// 记录响应时间等信息recordCompleteFor(node, count, rt, error);}fireExit(context, resourceWrapper, count, args);
}
结束时间是在 StatisticsSlot 里的exit方法记录的,那开始时间是在哪记录的呢?在entry方法里记录可以吗?显然不妥,因为StatisticsSlot不是第一个Slot,不能作为请求的起始时间
,起始时间应该放到初始化Entry
资源管理对象,也就是只要资源诞生就意味着此次请求开始了,而且我们在设计 Entry 类的时候也将开始时间和结束时间两个字段设计进去了,因此我们开始时间我们可以直接通过 context.getCurEntry().getCreateTimestamp()
获取
流程图如下
DefaltNode, EntranceNode和ClusterNode的指标如何统计
- DefaltNode:用于统计某个 Context 下某个资源的指标信息,维度是 Context + 资源
- EntranceNode:用于统计某个 Context 下全部资源的指标信息,维度是 Context
- ClusterNode:用于统计某个资源在全部 Context 下的指标信息,维度是资源,与 Context 无关
收集指标信息也就是每次请求就记录一下, 问题就是在哪里出发记录的动作?
即下述三个问题
- 如何统计某个资源在某个Context下的指标?
- 如何统计某个Context下所有资源的指标?
- 如何统计某个资源在全部Context中的指标?
如何统计某个资源在某个Context下的指标?
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {fireEntry(context, resourceWrapper, node, count, prioritized, args);// 数据统计node.increaseThreadNum();node.addPassRequest(count);}
}
可以发现 increaseThreadNum()
和 addPassRequest()
方法都是node调用的,那node是什么呢?
node是DefaultNode类型
的方法参数,我们还知道 entry()
方法是通过上一个责任链:ClusterSlot调用的,也就是说node这个参数是前面Slot传过来的,其实,我们回溯回去,会发现这个node就是DefaultNode本身,并不是它的子类EntranceNode。因此,我们得出一个结论:StatisticSlot直接调用DefaultNode里的方法进行指标收集,我们又知道DefaultNode的维度是Context + 资源
public class DefaultNode extends StatisticNode {// 和资源绑定private ResourceWrapper id;private ClusterNode clusterNode;// 增加线程数@Overridepublic void increaseThreadNum() {super.increaseThreadNum();this.clusterNode.increaseThreadNum();}// 增加请求成功数@Overridepublic void addPassRequest(int count) {super.addPassRequest(count);this.clusterNode.addPassRequest(count);}
}
DefaultNode核心源码
public class DefaultNode extends StatisticNode {// 和资源绑定private ResourceWrapper id;private ClusterNode clusterNode;// 增加线程数@Overridepublic void increaseThreadNum() {super.increaseThreadNum();this.clusterNode.increaseThreadNum();}// 增加请求成功数@Overridepublic void addPassRequest(int count) {super.addPassRequest(count);this.clusterNode.addPassRequest(count);}
}
DefaultNode 的维度是 Context + 资源,DefaultNode源码里只看到了资源 ResourceWrapper,没有看到Context呢?在NodeSelectorSlot的entry()
方法里我们会初始化DefaultNode 且与Context进行绑定(Key-Value形式),核心代码
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {// Context#name与DefaultNode 进行绑定private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);public void entry(...) {DefaultNode node = new DefaultNode(resourceWrapper, null);map.put(context.getName(), node);}
}
要想获取某个资源在某 Context 下的指标时
- 从map中获取DefaultNode
- 从DefaultNode获取资源Id
流程图如下
如何统计某个Context下所有资源的指标?
也就是不细分资源,直接统计Context
如何找到当前Context下的全部资源呢?
- 一个资源肯定对应一个DefaultNode
- EntranceNode相当于树干,它有很多树枝 DefaultNode 挂到其下面
public class EntranceNode extends DefaultNode {// 树枝private volatile Set<Node> childList = new HashSet<>();
}
有了这个 childList 事情就变得简单了,直接 for 循环遍历即可,获取到的是每个 DefaultNode,然后调用每个 DefaultNode 的统计方法进行求和即可,如下所示:
public class EntranceNode extends DefaultNode {@Overridepublic int curThreadNum() {int r = 0;// 遍历 DefaultNode 子集for (Node node : getChildList()) {// += 操作求和r += node.curThreadNum();}return r;}@Overridepublic double passQps() {double r = 0;for (Node node : getChildList()) {r += node.passQps();}return r;}
}
如何统计某个资源在全部Context中的指标?
我们知道 ClusterNode 是在 DefaultNode 下的,一个资源至少对应一个 DefaultNode 以及会对应唯一一个 ClusterNode (因为 ClusterNode 的维度是资源,所以不管资源在哪几个 Context 下,都只会对应唯一一个 ClusterNode)
上边的DefaultNode 的时候不管是 increaseThreadNum()
还是 addPassRequest()
都会调用一个方法叫:this.clusterNode.increaseXxx()
,其实这就是用于统计某个资源在所有 Context 下的指标信息的
public void increaseThreadNum() {super.increaseThreadNum();// clusterNode.xxxthis.clusterNode.increaseThreadNum();
}
public void addPassRequest(int count) {super.addPassRequest(count);// clusterNode.xxxthis.clusterNode.addPassRequest(count);
}
总结
StatisticSlot只负责指标统计, 调用相关的统计方法进行实现, Sentinel底层采用滑动窗口, 令牌桶, 漏桶三个算法
参考资料
通关 Sentinel 流量治理框架 - 编程界的小學生