序
本文主要研究一下PowerJob的UseCacheLock
UseCacheLock
tech/powerjob/server/core/lock/UseCacheLock.java
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UseCacheLock {String type();String key();int concurrencyLevel();
}
UseCacheLock注解定义了type、key、concurrencyLevel属性
UseCacheLockAspect
tech/powerjob/server/core/lock/UseCacheLockAspect.java
@Slf4j
@Aspect
@Component
@Order(1)
@RequiredArgsConstructor
public class UseCacheLockAspect {private final MonitorService monitorService;private final Map<String, Cache<String, ReentrantLock>> lockContainer = Maps.newConcurrentMap();private static final long SLOW_THRESHOLD = 100;@Around(value = "@annotation(useCacheLock))")public Object execute(ProceedingJoinPoint point, UseCacheLock useCacheLock) throws Throwable {Cache<String, ReentrantLock> lockCache = lockContainer.computeIfAbsent(useCacheLock.type(), ignore -> {int concurrencyLevel = useCacheLock.concurrencyLevel();log.info("[UseSegmentLockAspect] create Lock Cache for [{}] with concurrencyLevel: {}", useCacheLock.type(), concurrencyLevel);return CacheBuilder.newBuilder().initialCapacity(300000).maximumSize(500000).concurrencyLevel(concurrencyLevel).expireAfterWrite(30, TimeUnit.MINUTES).build();});final Method method = AOPUtils.parseMethod(point);Long key = AOPUtils.parseSpEl(method, point.getArgs(), useCacheLock.key(), Long.class, 1L);final ReentrantLock reentrantLock = lockCache.get(String.valueOf(key), ReentrantLock::new);long start = System.currentTimeMillis();reentrantLock.lockInterruptibly();try {long timeCost = System.currentTimeMillis() - start;if (timeCost > SLOW_THRESHOLD) {final SlowLockEvent slowLockEvent = new SlowLockEvent().setType(SlowLockEvent.Type.LOCAL).setLockType(useCacheLock.type()).setLockKey(String.valueOf(key)).setCallerService(method.getDeclaringClass().getSimpleName()).setCallerMethod(method.getName()).setCost(timeCost);monitorService.monitor(slowLockEvent);log.warn("[UseSegmentLockAspect] wait lock for method({}#{}) cost {} ms! key = '{}', args = {}, ", method.getDeclaringClass().getSimpleName(), method.getName(), timeCost,key,JSON.toJSONString(point.getArgs()));}return point.proceed();} finally {reentrantLock.unlock();}}
}
UseCacheLockAspect拦截@UseCacheLock注解,它定义了lockContainer维护了useCacheLock.type()与Cache的关系;Cache采用的guava的Cache,其initialCapacity为300000,maximumSize为500000,expireAfterWrite为30分钟;Cache的key为lock key,value为ReentrantLock;其execute方法主要是先执行reentrantLock.lockInterruptibly(),然后执行point.proceed(),最后reentrantLock.unlock();执行point.proceed()之前还判断了一下加锁耗时,若超过SLOW_THRESHOLD(
100ms
)则通过monitorService.monitor上报SlowLockEvent
示例
@UseCacheLock(type = "processJobInstance", key = "#instanceId", concurrencyLevel = 1024)public void redispatchAsync(Long instanceId, int originStatus) {// 将状态重置为等待派发instanceInfoRepository.updateStatusAndGmtModifiedByInstanceIdAndOriginStatus(instanceId, originStatus, InstanceStatus.WAITING_DISPATCH.getV(), new Date());}
key支持SpEl
小结
PowerJob的UseCacheLock注解定义了type、key、concurrencyLevel属性;UseCacheLockAspect拦截@UseCacheLock注解,它定义了lockContainer维护了useCacheLock.type()与Cache的关系;而Cache的key为lock key,value为ReentrantLock,最后是通过reentrantLock.lockInterruptibly()加锁。