系统负载自适应流控
规则配置
规则创建
public class SystemRule extends AbstractRule {private double highestSystemLoad = -1;private double highestCpuUsage = -1;private double qps = -1;private long avgRt = -1;private long maxThread = -1;
}
SystemRule
类包含了以下几个指标。
highestSystemLoad
:对应 Dashboard 上的 LOAD 菜单,代表系统最高负载值,默认为 -1,只有大于等于 0.0 才生效。avgRt
:对应 Dashboard 上的 RT菜单,代表系统平均响应时间,默认为 -1,只有大于0才生效。maxThread
:对应 Dashboard 上的线程数菜单,代表系统允许的最大线程数,默认为 -1,只有大于 0 才生效。qps
:对应 Dashboard 上的入口 QPS 菜单,代表限流的阈值,默认为 -1,只有大于 0 才生效。highestCpuUsage
:对应 Dashboard 上的 CPU 使用率菜单,代表最高CPU 使用率,取值是 [0,1] 之间,默认为 -1,只有大于等于0.0才生效
监听器实例化和管理
这部分和之前的黑白名单差不多
系统负载自适应规则的核心类是 SystemRuleManager
,它负责管理系统负载自适应规则的加载、更新和监听。当系统负载自适应规则发生变化时,SystemRuleManager
通过观察者模式通知相应的 RulePropertyListener
进行更新
创建监听器的代码位置
public final class SystemRuleManager {// 省略其它代码...private static AtomicBoolean checkSystemStatus = new AtomicBoolean(false);private static SystemStatusListener statusListener = null;private final static SystemPropertyListener listener = new SystemPropertyListener();private static SentinelProperty<List<SystemRule>> currentProperty = new DynamicSentinelProperty<List<SystemRule>>();// 创建单核线程池private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,new NamedThreadFactory("sentinel-system-status-record-task", true));static {checkSystemStatus.set(false);// 初始化系统状态监听器statusListener = new SystemStatusListener();// 任务调度, 一秒执行一次statusListener的任务, 即监听系统的负载状态scheduler.scheduleAtFixedRate(statusListener, 0, 1, TimeUnit.SECONDS);// 初始化SystemRule监听器currentProperty.addListener(listener);}// 省略其它代码...
}
规则初始化
当调用SystemRuleManager
的loadRules()
时
public static void loadRules(List<SystemRule> rules) {currentProperty.updateValue(rules);
}@Override
public boolean updateValue(T newValue) {if (isEqual(value, newValue)) {return false;}RecordLog.info("[DynamicSentinelProperty] Config will be updated to: {}", newValue);// 注意看这里, 和之前的黑白名单规则一样, 也是初始化了value = newValue;for (PropertyListener<T> listener : listeners) {// 遍历通知监听器listener.configUpdate(newValue);}return true;
}@Override
public synchronized void configUpdate(List<SystemRule> rules) {// 为了恢复这些系统设置到初始状态,以便重新进行监控和设置restoreSetting();// systemRules = rules;// 如果配置SystemRule, 那么遍历规则, 并加载规则if (rules != null && rules.size() >= 1) {for (SystemRule rule : rules) {// 加载系统配置,根据传入的SystemRule对象中的参数设置系统最高负载、CPU使用率、平均响应时间、最大线程数和QPSloadSystemConf(rule);}} else { // 如果没有配置SystemRule, 那么关闭系统自适应检查checkSystemStatus.set(false);}// 省略其它代码...
}
核心loadSystemConf()
此方法会判断是否配置了 LOAD、RT、THREAD、QPS、CPU,如果配置这些规则中的某一个,那么就将 checkSystemStatus
置为 true,也就是打开系统自适应功能
也就是说, 系统自适应功能是否开启就看这个方法
public static void loadSystemConf(SystemRule rule) {boolean checkStatus = false;// Check if it's valid.// highestSystemLoad参数大于等于0且小于当前最高系统负载,则更新最高系统负载,并标记为已设置if (rule.getHighestSystemLoad() >= 0) {highestSystemLoad = Math.min(highestSystemLoad, rule.getHighestSystemLoad());highestSystemLoadIsSet = true;checkStatus = true;}// 如果highestCpuUsage参数大于0且小于等于1,则更新CPU使用率的最高限制,并标记为已设置,如果大于1则记录警告日志if (rule.getHighestCpuUsage() >= 0) {if (rule.getHighestCpuUsage() > 1) {RecordLog.warn(String.format("[SystemRuleManager] Ignoring invalid SystemRule: "+ "highestCpuUsage %.3f > 1", rule.getHighestCpuUsage()));} else {highestCpuUsage = Math.min(highestCpuUsage, rule.getHighestCpuUsage());highestCpuUsageIsSet = true;checkStatus = true;}}// 如果果avgRt参数大于等于0,则更新平均响应时间的最高限制,并标记为已设置if (rule.getAvgRt() >= 0) {maxRt = Math.min(maxRt, rule.getAvgRt());maxRtIsSet = true;checkStatus = true;}// 如果maxThread参数大于等于0,则更新最大线程数的最高限制,并标记为已设置if (rule.getMaxThread() >= 0) {maxThread = Math.min(maxThread, rule.getMaxThread());maxThreadIsSet = true;checkStatus = true;}// 如果qps参数大于等于0,则更新QPS的最高限制,并标记为已设置if (rule.getQps() >= 0) {qps = Math.min(qps, rule.getQps());qpsIsSet = true;checkStatus = true;}// 根据上述值决定是否开启系统自适应检查checkSystemStatus.set(checkStatus);}
流程图如下
规则验证
SystemSlot是第六个
执行的slot
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {// 检查系统规则SystemRuleManager.checkSystem(resourceWrapper, count);// 如果检查通过,继续执行后续的处理链fireEntry(context, resourceWrapper, node, count, prioritized, args);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);}
}
核心方法就是checkSystem()
public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {// 参数验证,资源为空直接放行if (resourceWrapper == null) {return;}// 判断系统自适应功能是否开启,如果没开启则直接放行。if (!checkSystemStatus.get()) {return;}// 判断资源的流量是否为入口流量,如果不是IN,则直接放行,也就是说Sentinel系统自适应限流只对入口流量生效,如果类型为OUT则直接放行if (resourceWrapper.getEntryType() != EntryType.IN) {return;}// 获取当前qps,如果当前qps大于SystemRule规则配置的阈值,则直接抛BlockException异常double currentQps = Constants.ENTRY_NODE.passQps();if (currentQps + count > qps) {throw new SystemBlockException(resourceWrapper.getName(), "qps");}// 获取当前线程,如果当前线程大于SystemRule规则配置的阈值,则直接抛BlockException 异常int currentThread = Constants.ENTRY_NODE.curThreadNum();if (currentThread > maxThread) {throw new SystemBlockException(resourceWrapper.getName(), "thread");}// 获取当前平均响应时间指标,如果当前平均响应时间大于SystemRule规则配置的阈值,则直接抛BlockException异常double rt = Constants.ENTRY_NODE.avgRt();if (rt > maxRt) {throw new SystemBlockException(resourceWrapper.getName(), "rt");}// 如果当前系统负载大于规则配置的系统负载,则采取bbr算法验证if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {// bbr算法if (!checkBbr(currentThread)) {throw new SystemBlockException(resourceWrapper.getName(), "load");}}// 判断当前CPU使用率是否大于SystemRule规则配置的阈值,如果大于,则抛出BlockException异常if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {throw new SystemBlockException(resourceWrapper.getName(), "cpu");}
}// 使用BBR对负载进行校验
private static boolean checkBbr(int currentThread) {if (currentThread > 1 &¤tThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {return false;}return true;
}
上述有几个点需要说明
- BBR是什么?负载怎么获取的?
- Constants.ENTRY_NODE中的指标是什么存储进去的?
- CPU又是怎么获取的
BBR算法
BBR (Bottleneck Bandwidth and Round-trip propagation time) 是 Google 开发的一种拥塞控制
算法,主要用于解决网络拥塞问题
。下面我们将上面的代码进行拆解下:
- 首先检查当前线程数是否大于 1,如果不是,则直接返回
true
,表示通过 BBR 检查。 - 如果当前线程数大于 1,那么检查当前线程数是否大于
(Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000)
maxSuccessQps()
是每秒最大成功请求数minRt()
是最小响应时间- 如果当前线程数大于这个计算值,那么返回
false
,表示未通过 BBR 检查。否则,返回true
。
用通俗的语言解释:检查当前线程数是否大于(每秒最大成功请求数 * 最小响应时间 / 1000),如果大于这个值,说明系统可能出现拥塞,返回 false
,否则返回 true
。
举个例子,假设 currentThread
为 5,maxSuccessQps()
为 10,minRt()
为 200。那么计算值为 (10 * 200) / 1000 = 2
。因为 currentThread
大于计算值,所以返回 false
,表示未通过 BBR 检查。
checkBbr
方法的目的是在系统负载较高的情况下,通过限制并行线程数来防止系统过载
Constants.ENTRY_NODE相关说明
其实Constants.ENTRY_NODE
的指标其实就是在ClusterNode
中统计的, 这个ClusterNode
专门用户统计某资源在全部Context
内的指标
public final static ClusterNode ENTRY_NODE = new ClusterNode(TOTAL_IN_RESOURCE_NAME, ResourceTypeConstants.COMMON);
ClusterNode
最终也是通过StatisticSlot
统计QPS、Thread、avgRt 这三个指标, 可以看到下边类图的继承关系
观察一下StatisticSlot
是怎么收集这个几个资源的, 下边展示核心代码, 非核心代码省略
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {try {// 其它代码...if (resourceWrapper.getEntryType() == EntryType.IN) {// 通过线程数Constants.ENTRY_NODE.increaseThreadNum();// QPS通过数Constants.ENTRY_NODE.addPassRequest(count);}} catch (PriorityWaitException ex) {// 其它代码...if (resourceWrapper.getEntryType() == EntryType.IN) {// 拒绝线程数Constants.ENTRY_NODE.increaseThreadNum();}if (resourceWrapper.getEntryType() == EntryType.IN) {// 拒绝QPS数Constants.ENTRY_NODE.increaseBlockQps(count);}}}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {// // 获取当前时间作为响应时间long completeStatTime = TimeUtil.currentTimeMillis();// rt(此次请求所耗费 的时间)= 响应时间 - 开始时间long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();// 如果是请求类型是 INif (resourceWrapper.getEntryType() == EntryType.IN) {// 则记录 rt 到 ClusterNoderecordCompleteFor(Constants.ENTRY_NODE, count, rt, error);}}
}
可以看到上边代码判断流量类型为 EntryType.IN, 才调用 Constants.ENTRY_NODE
相关的方法统计QPS、Thread、avgRt
补充说明, 记录的开始时间并不是在
StatisticSlot
的入口方法entry()
, 而是初始化资源的时因为StatisticSlot已经是责任链的第三个 Slot 了,前面已经经过一些Slot和其他逻辑
public Entry(ResourceWrapper resourceWrapper, int count, Object[] args) {this.resourceWrapper = resourceWrapper;// 记录开始时间this.createTimestamp = TimeUtil.currentTimeMillis();this.count = count;this.args = args; }
CPU相关指标
获取
Java提供了与之对应的API供我们获取CPU指标, sentinel直接在这个基础上进行了封装, 代码位于com.alibaba.csp.sentinel.slots.system.SystemStatusListener#run
, 这个工具类可以改造为我们所用
public class SystemStatusListener implements Runnable {volatile double currentLoad = -1;volatile double currentCpuUsage = -1;volatile long processCpuTime = 0;volatile long processUpTime = 0;/*通过JMX获取操作系统的系统负载、CPU使用率等指标信息,并计算当前进程的CPU使用率。如果系统负载超过预设阈值,则记录系统状态日志*/@Overridepublic void run() {try {// 获取操作系统的MXBean实例OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);// 获取系统平均负载值currentLoad = osBean.getSystemLoadAverage();// 获取系统CPU使用率, 0.0代表所有CPU完全空闲,1.0代表所有CPU一直在满负荷运行double systemCpuUsage = osBean.getSystemCpuLoad();RuntimeMXBean runtimeBean = ManagementFactory.getPlatformMXBean(RuntimeMXBean.class);// 获取当前进程的CPU时间(以纳秒为单位)long newProcessCpuTime = osBean.getProcessCpuTime();// 获取当前Java虚拟机的运行时间(以毫秒为单位)long newProcessUpTime = runtimeBean.getUptime();// 获取可用的CPU核心数量int cpuCores = osBean.getAvailableProcessors();// 计算前后两次采集之间进程CPU时间的差值,并转换成毫秒long processCpuTimeDiffInMs = TimeUnit.NANOSECONDS.toMillis(newProcessCpuTime - processCpuTime);// 计算运行时间的差值long processUpTimeDiffInMs = newProcessUpTime - processUpTime;// 将CPU时间差除以运行时间差,然后除以可用CPU核心数。这样得到的结果是每个CPU核心上的平均进程CPU使用率double processCpuUsage = (double) processCpuTimeDiffInMs / processUpTimeDiffInMs / cpuCores;// 更新全局变量存储最新的进程CPU时间和运行时间,以便下一次循环计算时使用processCpuTime = newProcessCpuTime;processUpTime = newProcessUpTime;// 将计算得到的进程CPU使用率与系统CPU使用率进行比较,取较大者作为当前CPU使用率currentCpuUsage = Math.max(processCpuUsage, systemCpuUsage);// 如果当前系统负载(currentLoad)大于预先设定的阈值(SystemRuleManagerif (currentLoad > SystemRuleManager.getSystemLoadThreshold()) {// 调用writeSystemStatusLog()方法,将系统过载信息写入日志中writeSystemStatusLog();}} catch (Throwable e) {RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e);}}
}
获取频率
public final class SystemRuleManager {// 这种线程池的创建方式值的学习,因为使用了NamedThreadFactory,将线程池里的线程做到见名知意private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("sentinel-system-status-record-task", true));static {// 1s 执行一次scheduler.scheduleAtFixedRate(new SystemStatusListener(), 0, 1, TimeUnit.SECONDS);}
}
参考资料
通关 Sentinel 流量治理框架 - 编程界的小學生