前言
对于java的单进程应用来说,存在资源竞争的场景可以使用synchronized关键字和Lock来对资源进行加锁,使整个操作具有原子性。但是对于多进程或者分布式的应用来说,上面提到的锁不共享,做不到互相通讯,所以就需要分布式锁来解决问题了。
废话不多说,直接进入正题,下面结合AQS和Redis来实现分布式锁。
代码中大部分都是参考ReentrantLock来实现的,所以读者可以先去了解一下ReentranLock和AQS
参阅:
http://www.importnew.com/27477.html
http://cmsblogs.com/?p=2210
加锁
@Overrideprotected boolean tryAcquire(int acquires) throws AcquireLockTimeoutException {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, 1)) { // 标注1setExclusiveOwnerThread(current);// 如果是线程被中断失败的话,返回false,如果超时失败的话,捕获异常return tryAcquireRedisLock(TimeUnit.MILLISECONDS.toNanos(redisLockTimeout));}//可重入} else if (current == getExclusiveOwnerThread()) { //标注2int nextc = c + acquires;if (nextc < 0) {throw new Error("Maximum lock count exceeded");}setState(nextc);return true;}return false;}
下面会把进程内的锁称为进程锁,如果有更专业的描述方法的话,欢迎指出。
对上面的步骤分析:
1. 首先看标注1,通过compareAndSetState获取到进程锁,只有获取到进程锁,才有资格去竞争redis锁, 这样的好处就是对于同一个进程里面的所有加锁请求,在某一个时刻只有一个请求能去请求获取redis锁,有效降低redis的压力,总的来说就是把部分竞争交给进程自己去解决了,也就是先竞争进程锁。
2. 再看标注2,能进行到这一步,首先能确保已经获取了进程锁,但是是否一定获取了redis锁了呢,不一定,所以在tryAcquireRedisLock的过程中如果抛出异常,一定要保证使用finally代码块把进程锁释放掉,避免误以为已经同时获取了进程锁和redis锁。
获取redis锁
private final boolean tryAcquireRedisLock(long nanosTimeout) {if (nanosTimeout <= 0L) {return false;}final long deadline = System.nanoTime() + nanosTimeout;int count = 0;boolean interrupted = false;Jedis jedis = null;try {jedis = redisHelper.getJedisInstance();while (true) {nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L) {throw new AcquireLockTimeoutException();}String value = String.format(valueFormat, Thread.currentThread().getId());//避免系统宕机锁不释放,设置过期时间String response = jedis.set(lockKey, value, NX, PX, redisLockTimeout);if (OK.equals(response)) {//如果线程被中断同时也是失败的return !interrupted;}// 超过尝试次数if (count > RETRY_TIMES && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD && parkAndCheckInterrupt()) {interrupted = true;}count++;}} finally {redisHelper.returnResouce(jedis);}}final boolean parkAndCheckInterrupt() {LockSupport.parkNanos(TimeUnit.NANOSECONDS.toNanos(PARK_TIME));return Thread.interrupted();
}
分析:
1. 为了避免获取redis锁的过程无休止的运行下去,使用超时策略,如果超时了,直接返回失败
2. 如果还在有效时间内,则通过自旋不断尝试获取锁,如果超过了尝试次数,暂时挂起,让出时间片,但是不可以挂起太长的时间,几个时间片内为好。
解锁
//RedisDistributedLock.java
@Override
public void unlock() {sync.unlock();
}//Sync.java
public void unlock() {release(1);
}@Override
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {Jedis jedis = null;try {jedis = redisHelper.getJedisInstance();String value = String.format(valueFormat, Thread.currentThread().getId());jedis.eval(UNLOCK_SCRIPT, Arrays.asList(lockKey), Arrays.asList(value));} finally {redisHelper.returnResouce(jedis);}free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}
分析:
1. 可以注意到value在加锁和解锁的过程都有,这个value是用来标识锁的唯一性的,避免别的进程误删了该锁。
private final UUID uuid = UUID.randomUUID();
private final String valueFormat = "%d:" + uuid.toString();
验证
@Overridepublic void run() {SqlSession session = MybatisHelper.instance.openSession(true);try {KeyGeneratorMapper generatorMapper = session.getMapper(KeyGeneratorMapper.class);KeyFetchRecordMapper recordMapper = session.getMapper(KeyFetchRecordMapper.class);while (true) {try {lock.lock();KeyGenerator keyGenerator = generatorMapper.select(1);if (keyGenerator.getKey() >= MAX_KEY) {System.exit(0);}recordMapper.insert(new KeyFetchRecord(keyGenerator.getKey(), server));generatorMapper.increase(1, 1);session.commit();} catch (RuntimeException e) {e.printStackTrace();continue;} finally {lock.unlock();}}} finally {session.close();}}
开启5个进程,每个进程5个线程,进行获取一个key值,获取到后加1,然后记录到数据库,这个过程不要是原子的,因为把没有原子性的过程变成有原子性的过程,才证明了这个锁的有效性。
结果如下
没有重复的key,成功!