归档
- GitHub: Ali-Sentinel-集群流控
测试
-
参考:热点流控-测试
-
新建
ClusterDemoApplication2
public class ClusterDemoApplication2 {public static void main(String[] args) {System.setProperty("csp.sentinel.dashboard.server", "127.0.0.1:8080");System.setProperty("project.name", "My-Cluster-8866");System.setProperty("server.port", "10012");SpringApplication.run(ClusterDemoApplication.class, args);}
}
-
启动 2 应用后
- 访问:http://localhost:10010/hello/test1
- 访问:http://localhost:10012/hello/test1
-
在控制台设置
- 点击 集群流控:http://127.0.0.1:8080/#/dashboard/cluster/server/My-Cluster-8866
- 点击 + 新增 Token Server:
- 机器类型 使用默认:
应用内机器
- 选择机器 选第一个即可
- Server 端口 使用默认:
18730
- 最大允许 QPS 设置为:
2
- 请从中选取 client:
- 选第一个,再点击 →
- 机器类型 使用默认:
- 点击 保存
// POST http://127.0.0.1:8080//cluster/assign/single_server/My-Cluster-8866 {"clusterMap": {"machineId": "10.32.51.130@8720","ip": "10.32.51.130","port": 18730,"clientSet": ["10.32.51.130@8721"],"belongToApp": true,"maxAllowedQps": 2},"remainingList": [] }
- 点击 簇点链路:http://127.0.0.1:8080/#/dashboard/identity/My-Cluster-8866
- 在
sayHello
列,点击 + 流控:- 是否集群 打勾
- 集群阈值模式 选
总体阈值
- 集群阈值 设置为:
2
- 其他使用默认值
- 点击 新增
// POST http://127.0.0.1:8080//v1/flow/rule {"enable": false,"strategy": 0,"grade": 1,"controlBehavior": 0,"resource": "sayHello","limitApp": "default","clusterMode": true,"clusterConfig": {"thresholdType": "1"},"app": "My-Cluster-8866","ip": "10.32.51.130","port": "8720","count": 2 }
- 在
-
再测试
- 连续刷 3 次:http://localhost:10010/hello/test1
- 出现:
Oops, [test1] blocked by Sentinel
原理
改模式
- Path:
/cluster/assign/single_server/{app}
控制台
com.alibaba.csp.sentinel.dashboard.controller.cluster.ClusterAssignController
@RestController
@RequestMapping("/cluster/assign")
public class ClusterAssignController {// 全路径为: /cluster/assign/single_server/{app}@PostMapping("/single_server/{app}")public Result<ClusterAppAssignResultVO> apiAssignSingleClusterServersOfApp(@PathVariable String app,@RequestBody ClusterAppSingleServerAssignRequest assignRequest) {... // 参数校验try {return Result.ofSuccess(clusterAssignService.applyAssignToApp( // ref: sign_m_100app, Collections.singletonList(assignRequest.getClusterMap()),assignRequest.getRemainingList()));} ... // catch}
}
com.alibaba.csp.sentinel.dashboard.service.ClusterAssignServiceImpl
@Service
public class ClusterAssignServiceImpl implements ClusterAssignService {// sign_m_100@Overridepublic ClusterAppAssignResultVO applyAssignToApp(String app, List<ClusterAppAssignMap> clusterMap,Set<String> remainingSet) {... // 参数校验// 更改集群模式--为服务端 & 推送配置clusterMap.stream().filter(Objects::nonNull).filter(ClusterAppAssignMap::getBelongToApp).map(e -> {String ip = e.getIp(); // 传过来的值为: 10.32.51.130 (相当于连接到控制台的 Sentinel 使用者的机器 IP)int commandPort = parsePort(e); // 从 "machineId": "10.32.51.130@8720" 里取,取的值为: 8720CompletableFuture<Void> f = modifyMode(ip, commandPort, ClusterStateManager.CLUSTER_SERVER) // 改集群模式, ref: sign_m_101.thenCompose(v -> applyServerConfigChange(app, ip, commandPort, e)); // 推送配置, ref: sign_m_102 return Tuple2.of(e.getMachineId(), f);}).forEach(t -> handleFutureSync(t, failedServerSet)); // 失败或超时 (10s) 则记录到集合里// 更改集群模式--为客户端 & 推送配置clusterMap.parallelStream().filter(Objects::nonNull).forEach(e -> applyAllClientConfigChange(app, e, failedClientSet)); // 改集群模式为客户端并推送配置, ref: sign_m_110// 解绑剩余 (未分配的) 机器applyAllRemainingMachineSet(app, remainingSet, failedClientSet);return new ClusterAppAssignResultVO() ... // 组装失败的 Client/Server 集合并返回}// sign_m_101 改集群模式private CompletableFuture<Void> modifyMode(String ip, int port, int mode) {return sentinelApiClient.modifyClusterMode(ip, port, mode); // 命令为: setClusterMode, 处理者 ref: sign_c_110}// sign_m_102 推送配置给服务端private CompletableFuture<Void> applyServerConfigChange(String app, String ip, int commandPort,ClusterAppAssignMap assignMap) {ServerTransportConfig transportConfig = new ServerTransportConfig().setPort(assignMap.getPort()) // 传过来的值为: 18730.setIdleSeconds(600);/*** 推送服务端配置 (服务端口) * 命令为: cluster/server/modifyTransportConfig*/return sentinelApiClient.modifyClusterServerTransportConfig(app, ip, commandPort, transportConfig).thenCompose(v -> applyServerFlowConfigChange(app, ip, commandPort, assignMap)) // 推送集群整体流控 QPS, ref: sign_m_103.thenCompose(v -> applyServerNamespaceSetConfig(app, ip, commandPort, assignMap)); // 传过来的 namespaceSet 为空,不会处理,略...}// sign_m_103 推送集群整体流控 QPSprivate CompletableFuture<Void> applyServerFlowConfigChange(String app, String ip, int commandPort,ClusterAppAssignMap assignMap) {Double maxAllowedQps = assignMap.getMaxAllowedQps();... // maxAllowedQps 校验。注: QPS 大于 20w 则不推送/*** 推送集群流控配置* 命令为: cluster/server/modifyFlowConfig*/return sentinelApiClient.modifyClusterServerFlowConfig(app, ip, commandPort,new ServerFlowConfig().setMaxAllowedQps(maxAllowedQps));}// sign_m_110 改集群模式为客户端并推送配置private void applyAllClientConfigChange(String app, ClusterAppAssignMap assignMap,Set<String> failedSet) {Set<String> clientSet = assignMap.getClientSet();... // 空校验final String serverIp = assignMap.getIp(); // 集群服务端 IP: 10.32.51.130final int serverPort = assignMap.getPort(); // 集群服务端端口: 18730clientSet.stream().map(MachineUtils::parseCommandIpAndPort).filter(Optional::isPresent).map(Optional::get).map(ipPort -> { // 将客户端机器 ID 用 @ 符截取成 [ip, port] 元组CompletableFuture<Void> f = sentinelApiClient/*** 改集群模式为客户端,命令为: setClusterMode* 处理者 ref: sign_c_110*/.modifyClusterMode(ipPort.r1, ipPort.r2, ClusterStateManager.CLUSTER_CLIENT)/*** 推送客户端配置 (服务端的 IP 和端口) * 命令为: cluster/client/modifyConfig*/.thenCompose(v -> sentinelApiClient.modifyClusterClientConfig(app, ipPort.r1, ipPort.r2,new ClusterClientConfig().setRequestTimeout(20).setServerHost(serverIp).setServerPort(serverPort)));return Tuple2.of(ipPort.r1 + '@' + ipPort.r2, f);}).forEach(t -> handleFutureSync(t, failedSet)); // 失败或超时 (10s) 则记录到集合里}}
应用机器
com.alibaba.csp.sentinel.command.handler.cluster.ModifyClusterModeCommandHandler
// sign_c_110 改集群模式命令处理者
@CommandMapping(name = "setClusterMode", desc = "set cluster mode...")
public class ModifyClusterModeCommandHandler implements CommandHandler<String> {@Overridepublic CommandResponse<String> handle(CommandRequest request) {try {int mode = Integer.valueOf(request.getParam("mode"));... // 对模式对应的 SPI 的校验ClusterStateManager.applyState(mode); // 改模式, ref: sign_m_200return CommandResponse.ofSuccess("success");} ... // catch}
}
com.alibaba.csp.sentinel.cluster.ClusterStateManager
DynamicSentinelProperty
参考:入口控制-设置规则 sign_c_100
public final class ClusterStateManager {private static volatile SentinelProperty<Integer> stateProperty = new DynamicSentinelProperty<Integer>();private static final PropertyListener<Integer> PROPERTY_LISTENER = new ClusterStatePropertyListener();static {InitExecutor.doInit();stateProperty.addListener(PROPERTY_LISTENER);}// sign_m_200 改模式public static void applyState(Integer state) {stateProperty.updateValue(state); // 最终触发内部处理, ref: sign_m_210}private static class ClusterStatePropertyListener implements PropertyListener<Integer> {... // configLoad 实现@Overridepublic synchronized void configUpdate(Integer value) {applyStateInternal(value);}}// sign_m_210 状态更改处理private static boolean applyStateInternal(Integer state) {... // state 校验try {switch (state) {case CLUSTER_CLIENT:return setToClient(); // 设置为客户端模式, ref: sign_m_211case CLUSTER_SERVER:return setToServer(); // 设置为服务端模式, ref: sign_m_215case CLUSTER_NOT_STARTED:setStop();return true;... // default 处理}} ... // catch}// sign_m_211 设置为客户端模式public static boolean setToClient() {if (mode == CLUSTER_CLIENT) {return true;}mode = CLUSTER_CLIENT;sleepIfNeeded(); // 转换完之后,至少要等 5s 才能换模式lastModified = TimeUtil.currentTimeMillis();return startClient(); // ref: sign_m_212}// sign_m_212 启动客户端并关闭服务端private static boolean startClient() {try {EmbeddedClusterTokenServer server = EmbeddedClusterTokenServerProvider.getServer();if (server != null) {server.stop();}ClusterTokenClient tokenClient = TokenClientProvider.getClient();if (tokenClient != null) { // 默认实现为 DefaultClusterTokenClient, ref: sign_c_230tokenClient.start(); // ref: sign_m_230RecordLog.info("[ClusterStateManager] Changing cluster mode to client");return true;} ... // else} ... // catch}// sign_m_215 设置为服务端模式public static boolean setToServer() {... // 同样要等 5s 才能换模式return startServer(); // ref: sign_m_216}// sign_m_216 启动服务端并关闭客户端private static boolean startServer() {try {ClusterTokenClient tokenClient = TokenClientProvider.getClient();if (tokenClient != null) {tokenClient.stop();}EmbeddedClusterTokenServer server = EmbeddedClusterTokenServerProvider.getServer();if (server != null) { // 默认实现为 DefaultEmbeddedTokenServer, ref: sign_c_220server.start(); // ref: sign_m_218RecordLog.info("[ClusterStateManager] Changing cluster mode to server");return true;} ... // else} ... // catch}
}
启用服务端
com.alibaba.csp.sentinel.cluster.server.DefaultEmbeddedTokenServer
/** sign_c_220 默认内嵌 Token 服务器 */
public class DefaultEmbeddedTokenServer implements EmbeddedClusterTokenServer {// 相当于对此进行一层封装private final ClusterTokenServer server = new SentinelDefaultTokenServer(true); // ref: sign_cm_221// sign_m_218@Overridepublic void start() throws Exception {server.start(); // ref: sign_m_220}
}
com.alibaba.csp.sentinel.cluster.server.SentinelDefaultTokenServer
public class SentinelDefaultTokenServer implements ClusterTokenServer {// sign_cm_221public SentinelDefaultTokenServer(boolean embedded) {this.embedded = embedded;ClusterServerConfigManager.addTransportConfigChangeObserver(new ServerTransportConfigObserver() {@Overridepublic void onTransportConfigChange(ServerTransportConfig config) {changeServerConfig(config); // ref: sign_m_221}});initNewServer(); // 只是创建,并未启动...}// sign_m_220@Overridepublic void start() throws Exception {if (shouldStart.compareAndSet(false, true)) {startServerIfScheduled(); // ref: sign_m_222}}// sign_m_221private synchronized void changeServerConfig(ServerTransportConfig config) {...int newPort = config.getPort();... // 新端口与旧端口相同,则返回。(相当于界面用非默认端口 `18730`,则会启动 2 次 Netty 服务)try {if (server != null) {stopServer();}this.server = new NettyTransportServer(newPort);this.port = newPort;startServerIfScheduled(); } ... // catch}// sign_m_222private void startServerIfScheduled() throws Exception {if (shouldStart.get()) {if (server != null) {server.start(); // 启动 Netty 服务, ref: sign_m_223ClusterStateManager.markToServer();if (embedded) {handleEmbeddedStart();}}}}
}
Netty-服务
com.alibaba.csp.sentinel.cluster.server.NettyTransportServer
public class NettyTransportServer implements ClusterTokenServer {private final int port;public NettyTransportServer(int port) {this.port = port;}// sign_m_223 启动 Netty 服务@Overridepublic void start() {if (!currentState.compareAndSet(SERVER_STATUS_OFF, SERVER_STATUS_STARTING)) {return;}ServerBootstrap b = new ServerBootstrap();this.bossGroup = new NioEventLoopGroup(1);this.workerGroup = new NioEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS);b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));p.addLast(new NettyRequestDecoder());p.addLast(new LengthFieldPrepender(2));p.addLast(new NettyResponseEncoder());p.addLast(new TokenServerHandler(connectionPool));}})... // childOptionb.bind(port).addListener(new GenericFutureListener<ChannelFuture>() {@Overridepublic void operationComplete(ChannelFuture future) {if (future.cause() != null) {...try {Thread.sleep(failCount * RETRY_SLEEP_MS); // 等待 n * 2sstart(); // 有异常,则尝试 3 次启动, ref: sign_m_223} ... // catch} else {// 没有异常,则改状态为启动完成currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_STARTED);}}});}
}
启用客户端
com.alibaba.csp.sentinel.cluster.client.DefaultClusterTokenClient
/** sign_c_230 默认客户端 */
public class DefaultClusterTokenClient implements ClusterTokenClient {// SPI 加载时会调用public DefaultClusterTokenClient() {ClusterClientConfigManager.addServerChangeObserver(new ServerChangeObserver() {@Overridepublic void onRemoteServerChange(ClusterClientAssignConfig assignConfig) {changeServer(assignConfig); // ref: sign_m_231}});initNewConnection(); // host 为空,不会创建连接...}// sign_m_230@Overridepublic void start() throws Exception {if (shouldStart.compareAndSet(false, true)) {startClientIfScheduled(); // ref: sign_m_232}}// sign_m_231private void changeServer(ClusterClientAssignConfig config) {... // 服务端的 IP 和端口没改,则返回不处理try {if (transportClient != null) {transportClient.stop();}this.transportClient = new NettyTransportClient(config.getServerHost(), config.getServerPort());... // 记录配置startClientIfScheduled(); // ref: sign_m_232} ... // catch}// sign_m_232private void startClientIfScheduled() throws Exception {if (shouldStart.get()) {if (transportClient != null) {transportClient.start(); // 启动 Netty 客户端连接, ref: sign_m_233} ... // else}}
}
Netty-客户端连接
com.alibaba.csp.sentinel.cluster.client.NettyTransportClient
public class NettyTransportClient implements ClusterTransportClient {private final String host;private final int port;public NettyTransportClient(String host, int port) {... // 校验参数this.host = host;this.port = port;}// sign_m_233 启动 Netty 客户端连接@Overridepublic void start() throws Exception {shouldRetry.set(true);startInternal(); // ref: sign_m_234}// sign_m_234private void startInternal() {connect(initClientBootstrap()); // sign_m_235 || sign_m_236}// sign_m_235private Bootstrap initClientBootstrap() {Bootstrap b = new Bootstrap();eventLoopGroup = new NioEventLoopGroup();b.group(eventLoopGroup).channel(NioSocketChannel.class)... // option.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {clientHandler = new TokenClientHandler(currentState, disconnectCallback);ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));pipeline.addLast(new NettyResponseDecoder());pipeline.addLast(new LengthFieldPrepender(2));pipeline.addLast(new NettyRequestEncoder());pipeline.addLast(clientHandler);}});return b;}// sign_m_236private void connect(Bootstrap b) {if (currentState.compareAndSet(CLIENT_STATUS_OFF, CLIENT_STATUS_PENDING)) {b.connect(host, port).addListener(new GenericFutureListener<ChannelFuture>() {@Overridepublic void operationComplete(ChannelFuture future) {if (future.cause() != null) { // 有异常,连接失败... // logfailConnectedTime.incrementAndGet();channel = null;} else { // 无异常,连接成功failConnectedTime.set(0);channel = future.channel();... // log}}});}}
}
更新规则
- 参考:WebUI-更新规则-应用端 sign_c_200
- 服务端并没有将规则同步到客户端
- 更新规则的入口在
ModifyClusterFlowRulesCommandHandler ( cluster/server/modifyFlowRules )
ClusterFlowRuleManager #loadRules
ClusterFlowRuleManager #FLOW_RULES
- 集群规则现在还没有同步,需要自己改造控制台
- ref: https://github.com/alibaba/Sentinel/issues/1670
- 可用 V2,也可用配置源
- 然后在
ModifyRulesCommandHandler
用 SPI 做下扩展
- 然后在
- 最主要的是要保持
ruleId
一致
流控原理
-
参考:
- 链路控制-ClusterBuilderSlot
- 链路控制-FlowSlot sign_m_721
-
com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker
public class FlowRuleChecker {// sign_m_411 集群流控 (调用者, ref: sign_m_721)private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {try {// 客户端模式下,SPI 提供的实现者为: DefaultClusterTokenClient, ref: sign_c_421TokenService clusterService = pickClusterService();... // clusterService 为空则回退到本地单机校验long flowId = rule.getClusterConfig().getFlowId();// 根据规则 id 请求 token 服务器,以获取流控结果. ref: sign_m_421 TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);return applyTokenResult(result, rule, context, node, acquireCount, prioritized); // ref: sign_m_412} ... // catch// 回退到本地单机校验return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);}// sign_m_412 根据流控结果进行处理private static boolean applyTokenResult(TokenResult result, FlowRule rule, Context context,DefaultNode node,int acquireCount, boolean prioritized) {switch (result.getStatus()) {case TokenResultStatus.OK:return true; // 放行case TokenResultStatus.SHOULD_WAIT:try {Thread.sleep(result.getWaitInMs());} catch (InterruptedException e) {e.printStackTrace();}return true; // 等待指定时间后放行case TokenResultStatus.NO_RULE_EXISTS:case TokenResultStatus.BAD_REQUEST:case TokenResultStatus.FAIL:case TokenResultStatus.TOO_MANY_REQUEST:// 回退到本地单机校验return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);case TokenResultStatus.BLOCKED:default:return false;}}}
com.alibaba.csp.sentinel.cluster.client.DefaultClusterTokenClient
// sign_c_421
public class DefaultClusterTokenClient implements ClusterTokenClient {// sign_m_421 请求 token 服务器,获取流控结果@Overridepublic TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {...FlowRequestData data = new FlowRequestData().setCount(acquireCount).setFlowId(flowId).setPriority(prioritized);// 此请求的处理者为 FlowRequestProcessor, ref: sign_c_510 | sign_m_510ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);try {TokenResult result = sendTokenRequest(request); // ref: sign_m_422...return result;} ... // catch}// sign_m_422private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {...// 通过 Netty 客户端 (NettyTransportClient) 发送ClusterResponse response = transportClient.sendRequest(request);TokenResult result = new TokenResult(response.getStatus());if (response.getData() != null) {FlowTokenResponseData responseData = (FlowTokenResponseData) response.getData();result.setRemaining(responseData.getRemainingCount()).setWaitInMs(responseData.getWaitInMs());}return result;}
}
服务端流控
com.alibaba.csp.sentinel.cluster.server.processor.FlowRequestProcessor
// sign_c_510 流控请求处理器
@RequestType(ClusterConstants.MSG_TYPE_FLOW)
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {// sign_m_510@Overridepublic ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {// 服务端模式下,SPI 提供的实现者为 DefaultTokenService, ref: sign_c_521TokenService tokenService = TokenServiceProvider.getService();long flowId = request.getData().getFlowId();int count = request.getData().getCount();boolean prioritized = request.getData().isPriority();TokenResult result = tokenService.requestToken(flowId, count, prioritized); // ref: sign_m_521return toResponse(result, request); // ref: sign_m_511}// sign_m_511private ClusterResponse<FlowTokenResponseData> toResponse(TokenResult result, ClusterRequest request) {return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(),new FlowTokenResponseData().setRemainingCount(result.getRemaining()).setWaitInMs(result.getWaitInMs()));}}
com.alibaba.csp.sentinel.cluster.flow.DefaultTokenService
// sign_c_521
@Spi(isDefault = true)
public class DefaultTokenService implements TokenService {// sign_m_521@Overridepublic TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {...FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);if (rule == null) { // 测试的时候,为 null (因为没同步)return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);}// 集群流控校验 (逻辑较简单。集群指标在 ClusterFlowRuleManager #applyClusterFlowRule 里初始化)return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);}}
总结
- 集群流控时,每次请求都会转发到服务端(由服务端去控制)