项目地址
https://gitee.com/dromara/Akali
项目介绍
Akali(阿卡丽)是一个轻量级本地化热点检测/降级框架,适用于大流量场景,可轻松解决业务中超高流量的并发查询等场景。并且接入和使用极其简单,10秒钟即可接入使用!
Akali框架的理念就是小巧,实用,来无影去无踪,丝血团战,满血退场,所到之处,皆为虚无。
核心功能
对于核心方法,发现该方法高频使用,要么使用原有的数据进行返回(@AkaliHot
),要么使用指定的方法进行降级(@AkaliFallback
)。
源码拆解
系统启动
AkaliScanner
实现了InstantiationAwareBeanPostProcessor
,如果注册到Spring容器中的Bean存在AkaliFallback
和AkaliHot
注解标注的方法,创建代理类AkaliProxy
。
public class AkaliScanner implements InstantiationAwareBeanPostProcessor {@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {Class<?> clazz = bean.getClass();if (AkaliStrategy.class.isAssignableFrom(clazz)){AkaliStrategyManager.addStrategy((AkaliStrategy) bean);return bean;}AtomicBoolean needProxy = new AtomicBoolean(false);List<Method> fallbackMethodList = new ArrayList<>();List<Method> hotspotMethodList = new ArrayList<>();Arrays.stream(clazz.getDeclaredMethods()).forEach(method -> {AkaliFallback akaliFallback = searchAnnotation(method, AkaliFallback.class);if (ObjectUtil.isNotNull(akaliFallback)){fallbackMethodList.add(method);S.addMethodStr(MethodUtil.resolveMethodName(method), new Tuple2<>(AkaliStrategyEnum.FALLBACK, akaliFallback));needProxy.set(true);}AkaliHot akaliHot = searchAnnotation(method, AkaliHot.class);if (ObjectUtil.isNotNull(akaliHot)){hotspotMethodList.add(method);AkaliMethodManager.addMethodStr(MethodUtil.resolveMethodName(method), new Tuple2<>(AkaliStrategyEnum.HOT_METHOD, akaliHot));needProxy.set(true);}});if (needProxy.get()){try{AkaliProxy akaliProxy = new AkaliProxy(bean, fallbackMethodList, hotspotMethodList);Object obj = akaliProxy.proxy();return obj;}catch (Exception e){throw new BeanInitializationException(e.getMessage());}}else{return bean;}}
}
系统拦截
AopInvocationHandler
,拦截器的核心方法。注册对应的FlowRule,执行SphEngine.process
public class AopInvocationHandler implements InvocationHandler {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {String methodStr = MethodUtil.resolveMethodName(method);if (AkaliMethodManager.contain(methodStr)){AkaliStrategyEnum akaliStrategyEnum = AkaliMethodManager.getAnnoInfo(methodStr).r1;Annotation anno = AkaliMethodManager.getAnnoInfo(methodStr).r2;if (anno instanceof AkaliFallback){AkaliRuleManager.registerFallbackRule((AkaliFallback) anno, method);}else if (anno instanceof AkaliHot){AkaliRuleManager.registerHotRule((AkaliHot) anno, method);}else{throw new RuntimeException("annotation type error");}return SphEngine.process(bean, method, args, methodStr, akaliStrategyEnum);}else {return method.invoke(bean, args);}}}
AkaliRuleManager#registerHotRule
,利用Sentinel框架注册流控规则。
public static void registerHotRule(AkaliHot akaliHot, Method method){String resourceKey = MethodUtil.resolveMethodName(method);if (!ParamFlowRuleManager.hasRules(resourceKey)){ParamFlowRule rule = new ParamFlowRule();rule.setResource(MethodUtil.resolveMethodName(method));rule.setGrade(akaliHot.grade().getGrade());rule.setCount(akaliHot.count());rule.setDurationInSec(akaliHot.duration());rule.setParamIdx(0);ParamFlowRuleManager.loadRules(ListUtil.toList(rule));log.info("[AKALI] Add Hot Rule [{}]", rule.getResource());}}
SphEngine
的处理逻辑就是如果流控是允许的,执行核心方法,如果流控不允许,执行对应的策略。
public class SphEngine {private static final Logger log = LoggerFactory.getLogger(SphEngine.class);public static Object process(Object bean, Method method, Object[] args, String methodStr, AkaliStrategyEnum akaliStrategyEnum) throws Exception{switch (akaliStrategyEnum){case FALLBACK:if (SphO.entry(methodStr)){try{return method.invoke(bean, args);}finally {SphO.exit();}}else{log.info("[AKALI]Trigger fallback strategy for [{}]", methodStr);return AkaliStrategyManager.getStrategy(akaliStrategyEnum).process(bean, method, args);}case HOT_METHOD:String convertParam = DigestUtil.md5Hex(JSON.toJSONString(args));Entry entry = null;try{entry = SphU.entry(methodStr, EntryType.IN, 1, convertParam);return method.invoke(bean, args);}catch (BlockException e){log.info("[AKALI]Trigger hotspot strategy for [{}]", methodStr);return AkaliStrategyManager.getStrategy(akaliStrategyEnum).process(bean, method, args);}finally {if (entry != null){entry.exit(1, convertParam);}}default:throw new Exception("[AKALI] Strategy error!");}}
}
FallbackStrategy
回退策略是获取方法名称是指定方法+Fallback
的方法,进行方法调用。
public class FallbackStrategy implements AkaliStrategy{private final Map<String, Method> fallBackMethodMap = new ConcurrentHashMap<>();@Overridepublic AkaliStrategyEnum getStrategy() {return AkaliStrategyEnum.FALLBACK;}@Overridepublic Object process(Object bean, Method method, Object[] args) throws Exception{String fallbackMethodName = StrUtil.format("{}Fallback", method.getName());Method fallbackMethod;if (fallBackMethodMap.containsKey(fallbackMethodName)){fallbackMethod = fallBackMethodMap.get(fallbackMethodName);}else{fallbackMethod = ReflectUtil.getMethod(bean.getClass(), fallbackMethodName, method.getParameterTypes());fallBackMethodMap.put(fallbackMethodName, fallbackMethod);}if (ObjectUtil.isNull(fallbackMethod)){throw new RuntimeException(StrUtil.format("[AKALI] Can't find fallback method [{}] in bean [{}]", fallbackMethodName, bean.getClass().getName()));}return fallbackMethod.invoke(bean, args);}
}
MethodHotspotStrategy
使用缓存,缓存中有数据就返回,没数据就调用方法。该地方使用的是hutool的缓存类。
public class MethodHotspotStrategy implements AkaliStrategy{private TimedCache<String, Object> timedCache;public MethodHotspotStrategy() {timedCache = CacheUtil.newTimedCache(1000 * 60);timedCache.schedulePrune(1000);}@Overridepublic AkaliStrategyEnum getStrategy() {return AkaliStrategyEnum.HOT_METHOD;}@Overridepublic Object process(Object bean, Method method, Object[] args) throws Exception{String hotKey = StrUtil.format("{}-{}", MethodUtil.resolveMethodName(method), DigestUtil.md5Hex(JSON.toJSONString(args)));if (timedCache.containsKey(hotKey)){return timedCache.get(hotKey);}else{Object result = method.invoke(bean, args);timedCache.put(hotKey, result);return result;}}
}