一个靠谱的分布式锁应该有哪些特点?
1.独占性:任何时候有且仅有一个线程持有锁
2.放死锁:有超时控制机制或撤销操作,得有个释放锁的兜底方案
3.不乱抢:不能张冠李戴,不能unlock别人加的锁
4.可重入性:自己加的锁自己还可以再次获得
基于setnx命令实现分布式锁,setnx成功返回1,失败返回0
1.带过期时间的setnx加锁,finally再次手动解锁
public void sale1(){String key="redisLock";String uuidValue= IdUtil.simpleUUID()+":"+Thread.currentThread().getId();while(!redisTemplate.opsForValue().setIfAbsent(key,uuidValue,30L, TimeUnit.SECONDS)){try {Thread.sleep(20L);} catch (InterruptedException e) {throw new RuntimeException(e);}}try {String result = (String)redisTemplate.opsForValue().get("inventory001");int inventorNum =result==null?0:Integer.parseInt(result);if(inventorNum>0){redisTemplate.opsForValue().set("inventory001",String.valueOf(--inventorNum));log.info("卖出一个商品,剩余库存:{}",inventorNum);}else {log.info("商品买完了");}} finally {redisTemplate.delete(key);}}
通过setnx命令加锁,set成功执行业务逻辑,set失败,sleep20毫秒继续抢锁。所有代码执行完,再在finally中手动释放锁。
问题:如果业务执行时间过长,锁已经自己过期了,业务代码还在执行,其它线程就能拿到锁进来了,第一个线程执行完后执行finally释放锁,就把第二个线程加的锁释放掉了。
改进:
2.判断是自己加的锁后再删除,判断和删除操作用lua脚本实现原子性
public void sale2(){String key="redisLock";String uuidValue= IdUtil.simpleUUID()+":"+Thread.currentThread().getId();while(!redisTemplate.opsForValue().setIfAbsent(key,uuidValue,30L, TimeUnit.SECONDS)){try {Thread.sleep(20L);} catch (InterruptedException e) {throw new RuntimeException(e);}}try {String result = (String)redisTemplate.opsForValue().get("inventory001");int inventorNum =result==null?0:Integer.parseInt(result);if(inventorNum>0){redisTemplate.opsForValue().set("inventory001",String.valueOf(--inventorNum));log.info("卖出一个商品,剩余库存:{}",inventorNum);}else {log.info("商品买完了");}} finally {String luaScript="if(redis.call('get',KEYS[1])==ARGV[1]) then return redis.call('del',KEYS[1]) else return 0 end";redisTemplate.execute(new DefaultRedisScript(luaScript, Boolean.class), Arrays.asList(key),uuidValue);}}
finally删除锁时判断是自己的锁后再删除。判断删除操作通过lua脚本实现原子性。
lau脚本:
if(redis.call('get',KEYS[1])==ARGV[1]) then return redis.call('del',KEYS[1]) else return 0 end
解释:如果get key的值为自己的uuidValue才执行del key,否则返回0
还存在问题:不支持可重入性
改进:
3.使用redis的hash结构实现锁的可重入性
hash结构:k k v 依次是lockName uuid:ThreadId 重入次数
加锁lua逻辑:
if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 elsereturn 0end
先判断lockName这个分布式锁是否存在:
返回0,不存在,新建属于自己的锁 lockName uuId:ThreadId 1
返回1,存在,再判断是不是自己的锁(是否存在自己的uuId:ThreadId)
返回0,不存在,最终返回0,end
返回1,存在,自己的锁自增1
解锁lau脚本:
if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 thenreturn nilelseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 thenreturn redis.call('del',KEYS[1])elsereturn 0end
判断是否有分布式锁且是自己的锁
否,返回nil
是,再将自己的锁重入次数-1,-1后重入次数是否为0
是,删除整个分布式锁
否 ,返回0,end
问题:还是没有完全实现锁的独占性,当一个线程的执行时间过长,锁自动释放,另一个线程就能拿到锁进来了。
改进:
4.每次加锁成功后后台启动一个定时任务,用来锁续期
private void renewExpire(){String script ="if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then " +"return redis.call('expire',KEYS[1],ARGV[2]) " +"else " +"return 0 " +"end";String uuidValue=IdUtil.simpleUUID()+":"+Thread.currentThread().getId();new Timer().schedule(new TimerTask(){@Overridepublic void run(){if ((Boolean)redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) {renewExpire();}}},(this.expireTime * 1000)/3);}
假设expireTime是30秒,加锁成功后再调用这个方法,每10秒执行一次定时任务