本博客为个人学习笔记,学习网站与详细见:黑马程序员Redis入门到实战 P56 - P63
目录
分布式锁介绍
基于Redis的分布式锁
Redis锁代码实现
修改业务代码
分布式锁误删问题
分布式锁原子性问题
Lua脚本
编写脚本
代码优化
总结
分布式锁介绍
在上一篇文章 Redis实战—优惠卷秒杀 中,我们通过使用锁、事务和代理对象实现了“一人一单”的优惠券秒杀功能。但我们使用的锁是基于JVM内部的锁,这导致锁的范围只能限制单个JVM的线程操作,因此在集群情况下,依然会出现超卖问题。所以我们需要设置一个锁,使其能够同时限制集群中的多个JVM线程操作,而这个锁就是分布式锁,由此引出本文。
集群情况下JVM锁的使用情况如下图。
集群情况下分布式锁的使用情况如下图。
分布式锁的实现
基于Redis的分布式锁
我们利用Redis的SET lock thread1 NX操作来模拟获取锁,即如果当前不存在lock键,则添加lock键成功,如果当前存在lock键,则添加lock键失败。我们将添加lock键的操作视为获取锁的操作,将lock键是否存在视为当前锁是否已被其他线程获取。执行语句后,通过Redis返回OK或者nil,我们可以判断是否获取锁成功。为防止宕机时无法对锁进行销毁,我们在进行SET操作时还需通过EX为键设置一个合理的时间。
Redis锁代码实现
// 接口类
public interface ILock {/** 尝试获取锁* timeoutSec 锁持有的超时时间,过期后自动释放* 返回值 true代表获取锁成功;false代表获取锁失败* */boolean tryLock(long timeoutSec);//释放锁void unlock();}// 接口实现类
public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";@Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标识long threadId = Thread.currentThread().getId();// 获取锁,并添加时间Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + " ", timeoutSec, TimeUnit.SECONDS);//避免拆箱导致空指针,使用Boolean.TRUE.equals方法返回结果return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {// 释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}
}
修改业务代码
public Result seckillVoucher(Long voucherId) {//判断是否满足抢购条件...Long userId = UserHolder.getUser().getId();// 创建锁对象,根据用户ID加锁SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);// 获取锁boolean isLock = lock.tryLock(1200);// 若获取锁失败if (!isLock)return Result.fail("不允许重复下单");// 若获取锁成功try {// 获取当前代理对象(事务)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {lock.unlock();}}
分布式锁误删问题
如上图所示,持有锁的线程1在锁的内部出现了业务阻塞,导致它的锁被超时释放。这时线程2尝试获得锁成功,然而在线程2持有锁执行过程中,线程1的业务反应过来,继续执行,而线程1业务执行完成后,进行了删除锁逻辑,此时就会把本应属于线程2的锁进行删除,这就是误删其它线程锁的情况。
解决方案:当线程创建锁时,同时为该锁添加当前线程标识,该标识由UUID随机数为前缀与线程id组合而成(为避免出现集群下两个线程的id相同的情况,因此添加UUID前缀)。当一个线程删除锁时,需要判断当前线程标识与锁标识是否一致,若一致,说明该锁由当前线程创建,可进行删除;若不一致,说明该锁由其它线程创建,不可进行删除。
对simpleRedisLock类代码优化如下。
package com.hmdp.utils;import cn.hutool.core.lang.UUID;
import org.springframework.data.redis.core.StringRedisTemplate;import java.util.concurrent.TimeUnit;public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";@Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁,并设置标识、添加时间Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);//避免拆箱导致空指针,使用Boolean.TRUE.equals方法返回结果return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {// 获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁标识String lockID = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 判断标识是否一致if(threadId.equals(lockID))stringRedisTemplate.delete(KEY_PREFIX + name);}
}
分布式锁原子性问题
如上图所示,线程1执行业务结束后,进行释放锁的操作,在对锁的标识进行判断后,开始释放锁。但是,线程1在"判断结束"到"释放锁"的期间,受到了阻塞(遇到JVM垃圾回收机制时会暂停程序,导致阻塞),这时线程2获取锁。当线程1恢复后,继续进行释放锁的操作,将会误删线程2的锁。我们前面设置了锁标识,并且要求在释放锁之前需要做一个判断,但在判断可以释放锁后,如果遇到了阻塞,将可能导致上图所示的误删操作。
解决方法:我们需要实现"判断"和"释放锁"这两条命令的原子性问题。
Lua脚本
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,能够确保多条命令执行时的原子性。Lua是一种编程语言,其基本语法可以参考网站:Lua 教程 | 菜鸟教程。这里重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,以保证多条redis命令的原子性,这样就可以实现拿锁、判断、删锁多条命令的原子性动作了,作为一名Java程序员这一块并不需要大家过于精通,只需要知道它有什么作用即可。
编写脚本
我们需要在resources文件中新建.lua文件(如果没有该新建项,需要下载EmmyLua插件),并在其中添加下图中的脚本内容。
代码优化
优化后的代码如下。
public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;//初始化UNLOCK_SCRIPTstatic {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));//初始化返回值UNLOCK_SCRIPT.setResultType(Long.class);}@Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁,并设置锁标识、添加时间Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);//避免拆箱导致空指针,使用Boolean.TRUE.equals方法返回结果return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {// 调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,//要求传入KEYS集合,使用Collections单元素集合工具Collections.singletonList(KEY_PREFIX + name),//线程标识ID_PREFIX + Thread.currentThread().getId());}/* @Overridepublic void unlock() {// 获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁标识String lockID = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 判断标识是否一致if(threadId.equals(lockID))stringRedisTemplate.delete(KEY_PREFIX + name);}*/
}
总结
基于Redis的分布式锁实现思路
· 利用set nxex获取锁,并设置过期时间,保存线程标识
· 释放锁时先判断线程标识是否与锁标识一致,若一致则删除锁
特性
· 利用set nx满足互斥性
· 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
· 利用redis集群保证高可用和高并发特性(本文未涉及)