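1. Scenario
Project A listens on an MQ for deployment messages from other projects (each carrying push_seq, status, environment, timestamp, and so on) and syncs them to the database, recording which push_seq (the version of project X) is deployed in which environment. The problem: suppose the MQ holds two deployment messages, dm1 and dm2, where dm2's push_seq is greater than dm1's. In a distributed setup, dm1 and dm2 may be consumed by different instances in parallel, so dm1 may be written to the database after dm2, leaving the stored deployment record pointing at the older version.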
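To make the failure concrete, here is a minimal sketch that replays the two messages against an in-memory map standing in for the database table. The `DeployMsg` class, the `replay` helper, and the "prod" environment name are hypothetical and exist only for this illustration; the real consumer and schema are not shown in this post.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a deployment message.
class DeployMsg {
    final String environment;
    final long pushSeq;

    DeployMsg(String environment, long pushSeq) {
        this.environment = environment;
        this.pushSeq = pushSeq;
    }
}

class OutOfOrderDemo {

    // Applies the messages in the order they reach the database.
    // The map stands in for the table: environment -> deployed push_seq.
    static Map<String, Long> replay(List<DeployMsg> writeOrder) {
        Map<String, Long> table = new HashMap<>();
        for (DeployMsg dm : writeOrder) {
            table.put(dm.environment, dm.pushSeq); // last write wins
        }
        return table;
    }

    public static void main(String[] args) {
        DeployMsg dm1 = new DeployMsg("prod", 1);
        DeployMsg dm2 = new DeployMsg("prod", 2);

        // Writes land in message order: the table ends on the newer version.
        System.out.println(replay(Arrays.asList(dm1, dm2)).get("prod")); // prints 2

        // dm1 is written after dm2: the table ends on the stale version.
        System.out.println(replay(Arrays.asList(dm2, dm1)).get("prod")); // prints 1
    }
}
```

The second call is the situation parallel consumers can produce: nothing fails, but the last write happens to be the older message.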
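2. Approach
Turn parallel consumption of MQ messages into serial consumption, using a Redis distributed lock. Every instance of the service triggers the handler method when it receives an MQ message. Before the method runs (via a Spring AOP around advice), the instance must first obtain the Redis distributed lock. Only once the lock is held do the business logic and the database write run, and the lock is released at the end.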
3. Main code
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author: hujunzheng
 * @create: 17/9/29 2:49 PM
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface RedisLock {

    /**
     * The Redis key used for the lock.
     */
    String value();

    /**
     * How long the lock is held, in milliseconds. Defaults to one minute.
     */
    long keepMills() default 60000;

    /**
     * What to do when acquiring the lock fails.
     */
    LockFailAction action() default LockFailAction.GIVEUP;

    public enum LockFailAction {
        /** Give up. */
        GIVEUP,
        /** Keep retrying. */
        CONTINUE;
    }

    /**
     * Sleep time between retries, in milliseconds. Ignored when the action is GIVEUP.
     */
    long sleepMills() default 500;
}
import java.lang.reflect.Method;
import java.util.concurrent.locks.Lock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.stereotype.Component;

import me.ele.api.portal.service.mq.dto.BuildMessage;

/**
 * @author: hujunzheng
 * @create: 17/9/29 2:49 PM
 */
@Component
@Aspect
public class RedisLockAspect {

    private static final Log log = LogFactory.getLog(RedisLockAspect.class);

    @Autowired
    private RedisCacheTemplate.RedisLockOperation redisLockOperation;

    // Used by buildMessageAround below.
    @Autowired
    private RedisLockRegistry redisLockRegistry;

    @Pointcut("execution(* com.hjzgg..StargateDeployMessageConsumer.consumeStargateDeployMessage(..))"
            + " && @annotation(me.ele.api.portal.service.redis.RedisLock)")
    private void lockPoint() {
    }

    @Around("lockPoint()")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
        Method method = methodSignature.getMethod();
        RedisLock lockInfo = method.getAnnotation(RedisLock.class);
        /*
        String lockKey = lockInfo.value();
        if (method.getParameters().length == 1 && pjp.getArgs()[0] instanceof DeployMessage) {
            DeployMessage deployMessage = (DeployMessage) pjp.getArgs()[0];
            lockKey += deployMessage.getEnv();
            System.out.println(lockKey);
        }
        */
        boolean lock = false;
        Object obj = null;
        while (!lock) {
            long timestamp = System.currentTimeMillis() + lockInfo.keepMills();
            lock = setNX(lockInfo.value(), timestamp);
            // We hold the lock if setNX succeeded, or if the stored timestamp has expired
            // and the value returned by getSet is also expired (the getSet check guards
            // against another instance taking over the expired lock at the same moment).
            long now = System.currentTimeMillis();
            if (lock || (now > getLock(lockInfo.value()) && now > getSet(lockInfo.value(), timestamp))) {
                // Mark the lock as held so that the loop ends after one execution,
                // including when the lock was taken over from an expired holder.
                lock = true;
                log.info("Acquired the Redis distributed lock...");
                try {
                    obj = pjp.proceed();
                } finally {
                    // Release even if the method throws. With GIVEUP the key is not
                    // deleted and simply expires after keepMills.
                    if (lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)) {
                        releaseLock(lockInfo.value());
                    }
                }
            } else {
                if (lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)) {
                    log.info("Retrying the Redis distributed lock shortly...");
                    Thread.sleep(lockInfo.sleepMills());
                } else {
                    log.info("Giving up on the Redis distributed lock...");
                    break;
                }
            }
        }
        return obj;
    }

    private boolean setNX(String key, Long value) {
        return redisLockOperation.setNX(key, value);
    }

    private long getLock(String key) {
        // The key may have been deleted since setNX failed; treat that as "not expired"
        // so that the caller retries instead of failing on a null value.
        Long value = redisLockOperation.get(key);
        return value == null ? Long.MAX_VALUE : value;
    }

    private long getSet(String key, Long value) {
        // No previous value means nobody else holds the lock any more.
        Long old = redisLockOperation.getSet(key, value);
        return old == null ? 0L : old;
    }

    private void releaseLock(String key) {
        redisLockOperation.delete(key);
    }

    @Pointcut(value = "execution(* me.ele..StargateBuildMessageConsumer.consumeStargateBuildMessage(me.ele.api.portal.service.mq.dto.BuildMessage)) && args(buildMessage)"
            + " && @annotation(me.ele.api.portal.service.redis.RedisLock)", argNames = "buildMessage")
    private void buildMessageLockPoint(BuildMessage buildMessage) {
    }

    @Around(value = "buildMessageLockPoint(buildMessage)", argNames = "pjp,buildMessage")
    public Object buildMessageAround(ProceedingJoinPoint pjp, BuildMessage buildMessage) throws Throwable {
        final String lockKey = buildMessage.getAppId() + buildMessage.getPushSequence();
        Lock lock = redisLockRegistry.obtain(lockKey);
        // Acquire outside the try block so that unlock only runs when the lock is held.
        lock.lock();
        try {
            return pjp.proceed();
        } finally {
            try {
                lock.unlock();
            } catch (Exception e) {
                log.error("Unlock failed: buildMessage=" + buildMessage + ", lock=" + lock, e);
            }
        }
    }
}
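The timestamp-based acquisition in the first advice can be hard to follow inside the aspect, so here is a minimal sketch of the same idea in isolation. `FakeRedis` is a hypothetical in-memory stand-in for the three Redis operations involved (it is not the `RedisLockOperation` class used above), and the current time is passed in explicitly so that the behaviour is deterministic.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the Redis commands SETNX, GET, GETSET and DEL.
class FakeRedis {
    private final ConcurrentHashMap<String, Long> store = new ConcurrentHashMap<>();

    boolean setNX(String key, long value) {
        return store.putIfAbsent(key, value) == null;
    }

    Long get(String key) {
        return store.get(key);
    }

    Long getSet(String key, long value) {
        return store.put(key, value); // returns the previous value, or null
    }

    void delete(String key) {
        store.remove(key);
    }
}

class TimestampLock {
    private final FakeRedis redis;

    TimestampLock(FakeRedis redis) {
        this.redis = redis;
    }

    // The stored value is the time at which the lock expires.
    boolean tryAcquire(String key, long now, long keepMillis) {
        long expiresAt = now + keepMillis;
        if (redis.setNX(key, expiresAt)) {
            return true; // nobody held the lock
        }
        Long current = redis.get(key);
        if (current != null && now > current) {
            // The holder's lease has expired: try to take over. Only the caller
            // that gets an expired value back from getSet wins the takeover.
            Long previous = redis.getSet(key, expiresAt);
            return previous == null || now > previous;
        }
        return false; // still held by someone else
    }

    void release(String key) {
        redis.delete(key);
    }
}

class TimestampLockDemo {
    public static void main(String[] args) {
        TimestampLock lock = new TimestampLock(new FakeRedis());
        System.out.println(lock.tryAcquire("deploy", 1000L, 60000L));  // prints true
        System.out.println(lock.tryAcquire("deploy", 2000L, 60000L));  // prints false
        System.out.println(lock.tryAcquire("deploy", 62000L, 60000L)); // prints true
    }
}
```

Two properties of this scheme are worth keeping in mind: a caller that loses the takeover race has still overwritten the stored expiry with its own value, and `release` deletes the key without checking who owns it, so a holder that outlives its lease can delete a lock that now belongs to someone else.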
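4. Problems encountered
At first I put the lock on the deploy method, but the AOP advice never took effect; moving it to the consumeStargateDeployMessage method fixed it. My first guess was that @Transactional was responsible. Take note of this.
I found the cause in an article: "SpringBoot CGLIB AOP解决Spring事务,对象调用自己方法事务失效" (using CGLIB AOP in Spring Boot to fix Spring transactions not applying when an object calls its own method).
Spring AOP annotations only take effect on calls that go through a Spring-managed proxy. Any object or call that bypasses the container's proxy gives Spring AOP nothing to intercept. In other words, the deploy method was most likely being invoked on the target object itself (a call from within the same class) rather than on its Spring proxy, so the advice was never applied.
Reference: "关于proxy模式下,@Transactional标签在创建代理对象时的应用" (how the @Transactional annotation is applied when proxy objects are created in proxy mode)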
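The self-invocation effect can be reproduced without Spring. The sketch below uses a plain JDK dynamic proxy as a stand-in for the proxy Spring AOP creates; the `MessageConsumer` interface and its method names are hypothetical and only mirror the shape of the consumer described above. It is an illustration of the mechanism, not of Spring's internals.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical consumer interface, for illustration only.
interface MessageConsumer {
    void consume();

    void deploy();
}

class PlainConsumer implements MessageConsumer {
    @Override
    public void consume() {
        deploy(); // this.deploy(): a call on the target itself, not on the proxy
    }

    @Override
    public void deploy() {
        // business logic would go here
    }
}

class SelfInvocationDemo {

    // Returns the names of the methods the proxy intercepted.
    static List<String> interceptedCalls(boolean callDeployDirectly) {
        List<String> intercepted = new ArrayList<>();
        MessageConsumer target = new PlainConsumer();
        // The handler plays the role of the advice: record the call, then delegate.
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName());
            return method.invoke(target, args);
        };
        MessageConsumer proxied = (MessageConsumer) Proxy.newProxyInstance(
                MessageConsumer.class.getClassLoader(),
                new Class<?>[] {MessageConsumer.class},
                handler);
        if (callDeployDirectly) {
            proxied.deploy();
        } else {
            proxied.consume();
        }
        return intercepted;
    }

    public static void main(String[] args) {
        // Only consume() is intercepted; the inner deploy() call bypasses the proxy.
        System.out.println(interceptedCalls(false)); // prints [consume]
        // Calling deploy() through the proxy is intercepted as expected.
        System.out.println(interceptedCalls(true)); // prints [deploy]
    }
}
```

This matches what happened here: an annotation on a method that is only reached through an internal call is never seen by the advice, while the same annotation on the method the container invokes through the proxy works.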
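5. Using RedisLockRegistry from spring-integration-redis
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.integration.redis.util.RedisLockRegistry;

@Bean
public RedisLockRegistry redisLockRegistry(@Value("${xxx.xxxx.registry}") String redisRegistryKey,
                                           RedisTemplate redisTemplate) {
    // Locks in this registry expire after 200000 ms.
    return new RedisLockRegistry(redisTemplate.getConnectionFactory(), redisRegistryKey, 200000);
}

Lock lock = redisLockRegistry.obtain(appId);
// tryLock returns false if the lock could not be obtained within 180 seconds,
// so only run the logic (and unlock) when it returns true.
if (lock.tryLock(180, TimeUnit.SECONDS)) {
    try {
        // ....
    } finally {
        lock.unlock();
    }
}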
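Because `RedisLockRegistry.obtain` returns a standard `java.util.concurrent.locks.Lock`, the tryLock pattern can be tried out locally without Redis. The sketch below uses a `ReentrantLock` as a stand-in; the `runWithLock` and `attemptWhileHeld` helpers are hypothetical names introduced for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {

    // Runs the task only if the lock is obtained within the timeout.
    // Returns true if the task ran, false if the wait timed out.
    static boolean runWithLock(Lock lock, long timeout, TimeUnit unit, Runnable task)
            throws InterruptedException {
        if (!lock.tryLock(timeout, unit)) {
            return false; // never unlock a lock that was not obtained
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Tries to run a task while another thread is holding the lock.
    static boolean attemptWhileHeld(long timeoutMillis) throws InterruptedException {
        Lock lock = new ReentrantLock(); // local stand-in for redisLockRegistry.obtain(key)
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();
                finished.await(); // keep holding until the attempt is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        held.await();
        try {
            return runWithLock(lock, timeoutMillis, TimeUnit.MILLISECONDS, () -> { });
        } finally {
            finished.countDown();
            holder.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Uncontended: the lock is obtained and the task runs.
        System.out.println(runWithLock(new ReentrantLock(), 100, TimeUnit.MILLISECONDS, () -> { })); // prints true
        // Contended: the wait times out and the task is skipped.
        System.out.println(attemptWhileHeld(50)); // prints false
    }
}
```

What the caller does when `runWithLock` returns false (retry, requeue the message, or drop it) is a design decision this sketch leaves open.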
6. References
For the other utility classes, see here.
7. Spring Boot LockRegistry
Distributed locks: RedisLockRegistry source code analysis [repost]
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;

import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;

import redis.clients.jedis.JedisShardInfo;

@Ignore
public class RedisLockTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockTest.class);

    private static final String LOCK = "xxx.xxx";

    private RedisLockRegistry redisLockRegistry;

    @Before
    public void setUp() {
        JedisShardInfo shardInfo = new JedisShardInfo("127.0.0.1");
        JedisConnectionFactory factory = new JedisConnectionFactory(shardInfo);
        // Locks expire after 50 ms, far less than the 5 seconds TimeoutTask holds one.
        redisLockRegistry = new RedisLockRegistry(factory, "test", 50L);
    }

    // Starts one second late, then holds the lock for only 10 ms.
    private class TaskA implements Runnable {

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Lock lock = redisLockRegistry.obtain(LOCK);
            try {
                lock.lock();
                LOGGER.info("Lock {} is obtained", lock);
                Thread.sleep(10);
                lock.unlock();
                LOGGER.info("Lock {} is unlocked", lock);
            } catch (Exception ex) {
                LOGGER.error("Lock {} unlock failed", lock, ex);
            }
        }
    }

    // Holds the lock for 5 seconds, well past the 50 ms expiry.
    private class TimeoutTask implements Runnable {

        @Override
        public void run() {
            Lock lock = redisLockRegistry.obtain(LOCK);
            try {
                lock.lock();
                LOGGER.info("Lock {} is obtained", lock);
                Thread.sleep(5000);
                lock.unlock();
                LOGGER.info("Lock {} is unlocked", lock);
            } catch (Exception ex) {
                LOGGER.error("Lock {} unlock failed", lock, ex);
            }
        }
    }

    @Test
    public void test() throws InterruptedException, TimeoutException {
        ExecutorService service = Executors.newFixedThreadPool(2);
        service.execute(new TimeoutTask());
        service.execute(new TaskA());
        service.shutdown();
        if (!service.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new TimeoutException();
        }
    }
}