文章目录
- Nacos安全机制介绍
- Nacos代码实现
- Nacos限流机制
- Nacos限流的代码实现
Nacos安全机制介绍
一、Nacos安全控制机制
Nacos 提供了多种安全控制机制,以保证服务和配置的访问安全:
-
身份验证 (Authentication)
Nacos 支持用户身份验证来防止未授权的访问。其实现方式如下:
Basic Authentication:通过 HTTP 请求头传递用户的认证信息(如 Authorization),Nacos 会校验用户名和密码。
OAuth2 / Token 认证:除了基本的用户名密码认证外,Nacos 也可以配置 OAuth2 认证机制,允许使用第三方认证系统(例如 LDAP 或自定义身份验证服务)。 -
权限控制 (Authorization)
在身份验证的基础上,Nacos 提供了基于角色的权限控制:
资源授权:通过注解的方式对 API 接口进行权限控制,例如使用 @Secured 注解标记需要权限验证的接口。每个接口可以指定具体的权限(如读、写、删除等)。
角色分配:用户可以被分配不同的角色,角色对应不同的权限,例如,管理员拥有所有权限,普通用户仅能访问部分资源。
Nacos代码实现
权限的核心注解类
@Retention(RetentionPolicy.RUNTIME)
public @interface Secured {/*** READ,WRITE,DELETE,UPDATE;默认读权限*/ActionTypes action() default ActionTypes.READ;/**定义与请求相关的资源名称。资源名称用于标识请求操作的是哪一类资源(例如:配置、服务等)。默认为空字符串,但你可以指定具体的资源名称。*/String resource() default StringUtils.EMPTY;/**表示请求使用的签名类型。默认为 SignType.NAMING。SignType 可能是一个枚举,表示不同的签名机制,例如 NAMING、HASH、API_KEY 等。*/String signType() default SignType.NAMING;/**允许使用自定义的资源解析器类来解析资源。默认为 DefaultResourceParser.class,但你可以指定自定义的类来实现 ResourceParser,定制资源的解析方式。*/Class<? extends ResourceParser> parser() default DefaultResourceParser.class;/**允许为资源指定额外的标签,这些标签会作为键值对注入到 Resource 对象中,用于细化资源的授权控制。标签是以字符串形式传递的,可以用于更精细的权限控制。*/String[] tags() default {};
servlet过滤器,在我们的请求进来时,我们判断当前的请求是否还有@Secured
public class AuthFilter implements Filter {private final AuthConfigs authConfigs;private final ControllerMethodsCache methodsCache;private final HttpProtocolAuthService protocolAuthService;public AuthFilter(AuthConfigs authConfigs, ControllerMethodsCache methodsCache) {this.authConfigs = authConfigs;this.methodsCache = methodsCache;this.protocolAuthService = new HttpProtocolAuthService(authConfigs);this.protocolAuthService.initialize();}@Overridepublic void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)throws IOException, ServletException {//未启用权限验证if (!authConfigs.isAuthEnabled()) {chain.doFilter(request, response);return;}HttpServletRequest req = (HttpServletRequest) request;HttpServletResponse resp = (HttpServletResponse) response;//. 支持 user-agent 白名单跳过鉴权if (authConfigs.isEnableUserAgentAuthWhite()) {String userAgent = WebUtils.getUserAgent(req);if (StringUtils.startsWith(userAgent, Constants.NACOS_SERVER_HEADER)) {chain.doFilter(request, response);return;}//服务身份头鉴权(用于集群内部通信)} else if (StringUtils.isNotBlank(authConfigs.getServerIdentityKey()) && StringUtils.isNotBlank(authConfigs.getServerIdentityValue())) {String serverIdentity = req.getHeader(authConfigs.getServerIdentityKey());if (StringUtils.isNotBlank(serverIdentity)) {if (authConfigs.getServerIdentityValue().equals(serverIdentity)) {chain.doFilter(request, response);return;}Loggers.AUTH.warn("Invalid server identity value for {} from {}", authConfigs.getServerIdentityKey(),req.getRemoteHost());}} else {resp.sendError(HttpServletResponse.SC_FORBIDDEN,"Invalid server identity key or value, Please make sure set `nacos.core.auth.server.identity.key`"+ " and `nacos.core.auth.server.identity.value`, or open `nacos.core.auth.enable.userAgentAuthWhite`");return;}try {//获取请求的方法Method method = methodsCache.getMethod(req);//方法为空,直接结束,继续进行下面的校验if (method == null) {chain.doFilter(request, response);return;}// 判断当前注解是否开启if (method.isAnnotationPresent(Secured.class) && authConfigs.isAuthEnabled()) {if (Loggers.AUTH.isDebugEnabled()) {Loggers.AUTH.debug("auth start, request: {} {}", req.getMethod(), req.getRequestURI());}Secured secured = method.getAnnotation(Secured.class);if (!protocolAuthService.enableAuth(secured)) {chain.doFilter(request, response);return;}Resource resource = protocolAuthService.parseResource(req, secured);//解析资源IdentityContext identityContext = protocolAuthService.parseIdentity(req);//解析身份boolean result = protocolAuthService.validateIdentity(identityContext, resource);//校验身份和权限//校验失败抛出异常if (!result) {// TODO Get reason of failurethrow new AccessException("Validate Identity failed.");}injectIdentityId(req, identityContext);String action = secured.action().toString();result = protocolAuthService.validateAuthority(identityContext, new Permission(resource, action));if (!result) {// TODO Get reason of failurethrow new AccessException("Validate Authority failed.");}}chain.doFilter(request, response);} catch (AccessException e) {if (Loggers.AUTH.isDebugEnabled()) {Loggers.AUTH.debug("access denied, request: {} {}, reason: {}", req.getMethod(), req.getRequestURI(),e.getErrMsg());}resp.sendError(HttpServletResponse.SC_FORBIDDEN, e.getErrMsg());} catch (IllegalArgumentException e) {resp.sendError(HttpServletResponse.SC_BAD_REQUEST, ExceptionUtil.getAllExceptionMsg(e));} catch (Exception e) {Loggers.AUTH.warn("[AUTH-FILTER] Server failed: ", e);resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Server failed, " + e.getMessage());}}
注册我们的过滤器
@Configuration
public class AuthConfig {@Beanpublic FilterRegistrationBean<AuthFilter> authFilterRegistration(AuthFilter authFilter) {FilterRegistrationBean<AuthFilter> registration = new FilterRegistrationBean<>();registration.setFilter(authFilter);//注册权限过滤器registration.addUrlPatterns("/*");//任何的接口都允许registration.setName("authFilter");//设置名称registration.setOrder(6);//设置优先级return registration;}@Beanpublic AuthFilter authFilter(AuthConfigs authConfigs, ControllerMethodsCache methodsCache) {return new AuthFilter(authConfigs, methodsCache);}
}
Nacos限流机制
Nacos 在流量控制和限流方面,提供了以下机制来保护系统免受超载:
- 请求限流 (TpsControl)
TpsControl 注解:通过 @TpsControl 注解,可以对某个方法进行限流,控制每秒钟请求的数量(TPS)。这对于防止 Nacos 服务受到过多请求而导致性能下降非常重要。
@TpsControl(pointName = "nacosConfigRequest", limit = 100)
public Response handleConfigRequest(Request request) {// 限流请求,限制每秒最多100个请求
}
TpsControlManager:负责管理所有限流点的状态和配置,可以动态修改每个点的 TPS 限制。
通过使用 TpsControl,可以保证 Nacos 服务在流量高峰期间不会因为流量过大而导致崩溃或服务不可用。
2. 请求重试与超时控制
请求重试机制:Nacos 内部有一套请求重试机制,允许在网络出现故障时进行重试。请求会根据一定的规则进行重试,避免出现单点故障导致系统不可用。
请求超时控制:对于某些请求,Nacos 会设置超时时间,防止长时间没有响应的请求占用系统资源。
3. 长轮询控制
对于长轮询请求,Nacos 会控制每个客户端的最大并发请求数量,防止单个客户端占用过多资源而导致其他客户端的请求被延迟。
4. 动态限流(基于请求属性)
通过 请求属性(如请求路径、请求参数、客户端 IP 等)可以动态地进行流量限制。例如,可以针对某些 API 或某些高风险的操作(如配置更新、服务注册等)设置更严格的限流策略。
Nacos限流的代码实现
限流的注解实现
@Retention(RetentionPolicy.RUNTIME)
public @interface TpsControl {/*** 控制点的别名(可选)。* 主要是为了给控制点提供更人性化的名称,也可能用于日志或监控中展示。*/String name() default "";/*** 必填。真正用于限流控制的“控制点名称”。* 这是唯一标识某个接口或操作被限流管理的依据。*/String pointName();}
利用SpringMvc的拦截器拦哦判断限流规则
public class NacosHttpTpsControlInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {try {if (handler instanceof HandlerMethod) {Method method = ((HandlerMethod) handler).getMethod();if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) {TpsControl tpsControl = method.getAnnotation(TpsControl.class);String pointName = tpsControl.pointName();HttpTpsCheckRequestParser parser = HttpTpsCheckRequestParserRegistry.getParser(pointName);TpsCheckRequest httpTpsCheckRequest = null;if (parser != null) {httpTpsCheckRequest = parser.parse(request);}if (httpTpsCheckRequest == null) {httpTpsCheckRequest = new TpsCheckRequest();}httpTpsCheckRequest.setPointName(pointName);TpsCheckResponse checkResponse = ControlManagerCenter.getInstance().getTpsControlManager().check(httpTpsCheckRequest);if (!checkResponse.isSuccess()) {generate503Response(request, response, checkResponse.getMessage());return false;}}}} catch (Throwable throwable) {Loggers.TPS.error("Error to check tps control", throwable);}return true;}void generate503Response(HttpServletRequest request, HttpServletResponse response, String message) {try {// Disable cache.response.setHeader("Pragma", "no-cache");response.setDateHeader("Expires", 0);response.setHeader("Cache-Control", "no-cache,no-store");response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);response.getWriter().println(message);} catch (Exception ex) {Loggers.TPS.error("Error to generate tps 503 response", ex);}}
}
Tps限流的规则
public class TpsControlRequestFilter extends AbstractRequestFilter {TpsControlManager tpsControlManager = ControlManagerCenter.getInstance().getTpsControlManager();@Overrideprotected Response filter(Request request, RequestMeta meta, Class handlerClazz) {Method method = null;try {//获取当钱的方法method = getHandleMethod(handlerClazz);} catch (NacosException e) {return null;}//如果当前的方法被注解TpsControl修饰,并且限流开关已经开启if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) {try {//获取注解信息TpsControl tpsControl = method.getAnnotation(TpsControl.class);//获取自定义的限流名称String pointName = tpsControl.pointName();TpsCheckRequest tpsCheckRequest = null;//校验解析的名称String parseName = StringUtils.isBlank(tpsControl.name()) ? pointName : tpsControl.name();RemoteTpsCheckRequestParser parser = RemoteTpsCheckRequestParserRegistry.getParser(parseName);if (parser != null) {tpsCheckRequest = parser.parse(request, meta);}//判断tps检查请求是否为nullif (tpsCheckRequest == null) {tpsCheckRequest = new TpsCheckRequest();}tpsCheckRequest.setPointName(pointName);//核心点TpsCheckResponse check = tpsControlManager.check(tpsCheckRequest);//如果返回的结果为失败if (!check.isSuccess()) {Response response;try {response = super.getDefaultResponseInstance(handlerClazz);response.setErrorInfo(NacosException.OVER_THRESHOLD,"Tps Flow restricted:" + check.getMessage());return response;} catch (Exception e) {com.alibaba.nacos.plugin.control.Loggers.TPS.warn("Tps check fail , request: {},exception:{}", request.getClass().getSimpleName(),e);return null;}}} catch (Throwable throwable) {com.alibaba.nacos.plugin.control.Loggers.TPS.warn("Tps check exception , request: {},exception:{}", request.getClass().getSimpleName(),throwable);}}return null;}
}
检查当前的Tps
public TpsCheckResponse check(TpsCheckRequest tpsRequest) {//判断point中是否包含当前方法的名称if (points.containsKey(tpsRequest.getPointName())) {try {//包含就直接回去,返回成功return points.get(tpsRequest.getPointName()).applyTps(tpsRequest);} catch (Throwable throwable) {Loggers.TPS.warn("[{}]apply tps error,error={}", tpsRequest.getPointName(), throwable);}}//不包含也直接返回成功的结果return new TpsCheckResponse(true, TpsResultCode.CHECK_SKIP, "skip");}public TpsCheckResponse applyTps(BarrierCheckRequest barrierCheckRequest) {//添加当前的请求的数量rateCounter.add(barrierCheckRequest.getTimestamp(), barrierCheckRequest.getCount());//返回正确的请求响应return new TpsCheckResponse(true, TpsResultCode.PASS_BY_POINT, "success");}//请求的滑动窗口算法public TpsSlot createSlotIfAbsent(long timeStamp) {//startTime 是滑动窗口的起始时间戳。//distance 是当前时间戳距离 startTime 的偏移。long distance = timeStamp - startTime;//如果时间戳早于 startTime(即 distance < 0),为了避免负数下标,先加上整个滑动窗口的周期总时长(例如 10秒 * 1000ms)。// 然后除以每个时间槽的周期(一般为 1000ms)以得到偏移槽位数。long diff = (distance < 0 ? distance + getPeriod().toMillis(1) * DEFAULT_RECORD_SIZE : distance) / getPeriod().toMillis(1);//当前时间戳应该属于哪个时间窗口(对齐到周期的时间点)。long currentWindowTime = startTime + diff * getPeriod().toMillis(1);//用偏移量对槽位数量取模,确定当前时间窗口的槽位 index。int index = (int) diff % DEFAULT_RECORD_SIZE;TpsSlot tpsSlot = slotList.get(index);//如果槽的时间不是当前窗口时间(说明槽是旧的),重置它。if (tpsSlot.time != currentWindowTime) {tpsSlot.reset(currentWindowTime);}//返回这个属于当前窗口的槽。return slotList.get(index);}//重置public void reset(long second) {synchronized (this) {if (this.time != second) {this.time = second;countHolder.count.set(0L);countHolder.interceptedCount.set(0);}}}