Redisson实现分布式锁
一、分布式锁使用场景
随着互联网技术的不断发展,数据量的不断增加,业务逻辑日趋复杂,在这种背景下,传统的集中式系统已经无法满足我们的业务需求,分布式系统被应用在更多的场景,而在分布式系统中访问共享资源就需要一种互斥机制,来防止彼此之间的互相干扰,以保证一致性,在这种情况下,我们就需要用到分布式锁
首先我们先来看一个小例子:
假设某商城有一个商品库存剩10个,用户A想要买6个,用户B想要买5个,在理想状态下,用户A先买走了6了,库存减少6个还剩4个,此时用户B应该无法购买5个,给出数量不足的提示;而在真实情况下,用户A和B同时获取到商品剩10个,A买走6个,在A更新库存之前,B又买走了5个,此时B更新库存,商品还剩5个,这就是典型的电商“秒杀”活动。
- 案例:
@RestController
public class HelloController {//数据库中的库存private Integer num=10;@GetMapping("/")public Integer secondsKill(int count){synchronized (num){//是否有库存if(num>0){//减库存num=num-count;}}return num;}
}
思考:将该项目启动两个实例,8080实例购买6个,8081实例购买5个此时会有什么问题
从上述例子不难看出,在高并发情况下,如果不做处理将会出现各种不可预知的后果。那么在这种高并发多线程的情况下,解决问题最有效最普遍的方法就是给共享资源或对共享资源的操作加一把锁,来保证对资源的访问互斥。在Java JDK已经为我们提供了这样的锁,利用ReentrantLock或者synchronized,即可达到资源互斥访问的目的。但是在分布式系统中,由于分布式系统的分布性,即多线程和多进程并且分布在不同机器中,也就是说一个服务可以同时启动多个实例,不同用户访问不同的实例,那么这两种锁将失去原有锁的效果,需要我们自己实现分布式锁——分布式锁。
一般我们使用分布式锁有两个场景:
- 效率:使用分布式锁可以避免不同节点重复相同的工作,这些工作会浪费资源。比如用户付了钱之后有可能不同节点会发出多封短信。
- 正确性:加分布式锁同样可以避免破坏正确性的发生,如果两个节点在同一条数据上面操作,比如多个节点机器对同一个订单操作不同的流程有可能会导致该笔订单最后状态出现错误,造成损失。
Redis 因为其性能好,实现起来分布式锁简单,所以让很多人都对基于 Redis 实现的分布式锁十分青睐。
提示
除了能使用 Redis 实现分布式锁之外,Zookeeper 也能实现分布式锁。但是项目中不可能仅仅为了实现分布式锁而专门引入 Zookeeper ,所以,除非你的项目体系中本来就有 Zookeeper(来实现其它功能),否则不会单独因为分布式锁而引入它
二、使用Redis的Setnx命令实现分布式锁
-
步骤分析
1、每次用户请求下单时,就在redis中设置一个键值对,如果设置成功,就执行下单操作流程 2、如果下单失败,就让你等待,等到前面的人下单完成后将该键删除,你再下单
1、SETNX 命令
早期,SETNX 是独立于 SET 命令之外的另一条命令。它的意思是 SET if Not eXists,即,在键值对不存在的时候才能设值成功。
注意:SETNX 命令的价值在于:它将 判断
和 设值
两个操作合二为一,从而避免了 查查改改
的情况的出现。
后来,在 Redis 2013 年推出的 2.6.12 版本中,Redis 为 SET 命令官方提供了 NX 选项,使得 SET 命令也能实现 SETNX 命令的功能。其语法如下:
SET <key> <value> [EX seconds] [PX milliseconds] [NX | XX]
EX 值的是 key
的存活时间,单位为秒。PX 与 EX 作用一样,唯一的不同就是后者的单位是微秒(使用较少)。
NX 和 XX 作用是相反的。NX 表示只有当 key『不存在时』才会设置其值;XX 表示当 key
存在时才设置 key
的值。
在 “升级” 了 SET 命令之后,Redis 官方说:“由于 SET 命令选项可以替换 SETNX,SETEX,因此在 Redis 的将来版本中,这二个命令可能会被弃用并最终删除”。
所以,现在我们口头所说的 SETNX 命令,并非单指 SETNX 命令,而是包括带 NX 选项的 SET 命令(甚至以后就没有 SETNX 命令了)
2、SETNX 的使用
在使用 SETNX
操作实现分布式锁功能时,需要注意以下几点:
- 这里的『锁』指的是 Redis 中的一个约定的键值对。谁能创建这个键值对,就意味着谁拥有这整个『锁』。
- 使用
SETNX
命令获取『锁』时,如果操作返回结果是 0(表示 key 已存在,设值失败),则意味着获取『锁』失败(该锁被其它线程先获取),反之,则设值成功,表示获取『锁』成功。- 如果这个 key 不存在,SETNX 才会设置该 key 的值。此时 Redis 返回 1 。
- 如果这个 key 存在,SETNX 则不会设置该 key 的值。此时 Redis 返回 0 。
- 为了防止其它线程获得『锁』之后,有意或无意,长期持有『锁』而不释放(导致其它线程无法获得该『锁』)。因此,需要为 key 设置一个合理的过期时间。
- 当成功获得『锁』并成功完成响应操作之后,需要释放『锁』(可以执行 DEL 命令将『锁』删除)。
在代码层面,与 Setnx 命令对应的接口是 ValueOperations 的 setIfAbsent 方法
- 示例代码一:
@RestController
public class HelloController {@Autowiredprivate StringRedisTemplate redisTemplate;@SneakyThrows@GetMapping("/add")public String addGoods(String goodsname) {String message="";ValueOperations<String, String> operations = redisTemplate.opsForValue();//在redis中设置一个键,表示有人正在操作库存boolean flag = operations.setIfAbsent(goodsname, "y",30, TimeUnit.SECONDS);if (flag) {Thread.sleep(10000);try {message= "修改了商品信息";} catch (Exception e) {//删除redis中的键redisTemplate.delete(goodsname);}}else{message= "其他人正在使用,请稍后重试";}return message;}
}
- 示例代码二:
@SneakyThrows@GetMapping("/add2")public String addGoods2(String goodsname) {String message = "";ValueOperations<String, String> operations = redisTemplate.opsForValue();//在redis中设置一个键,表示有人正在操作库存,返回false表示正在有人操作此时会失败//如果失败了隔两秒重试一次while (!operations.setIfAbsent(goodsname, "y", 30, TimeUnit.SECONDS)) {//睡100 毫秒,继续取set 看看是否成功System.out.println(Thread.currentThread().getName() + ":获取锁失败");Thread.sleep(1000);}try {message= "修改了商品信息"} catch (Exception e) {//删除redis中的键redisTemplate.delete(goodsname);}return message;}
开启两个不同的浏览器发请求测试
3、SETNX命令的问题
a、死锁问题
假设线程1通过SETNX获取到锁并且正常执行然后释放锁那么一切ok,其它线程也能获取到锁。但是线程1现在"耍脾气"了,线程1抱怨说"工作太久有点累需要休息一下,你们想要获取锁等着吧,等我把活干完你们再来获取锁"。此时其它线程就无法向下继续执行,因为锁在线程1手中。这种长期不释放锁情况就有可能造成死锁。
为了防止像线程1这种"耍脾气"的现象发生,我们可以设置key的过期时间来解决。设置过期时间过后其它线程可不会惯着线程1,其它线程表示你要休息可以,休息了指定时间把锁让出来然后拍拍屁股走人,没人惯着你。
上锁时,设置的超时自动删除时长(3 秒),设置多长合适?万一设置短了怎么办?如果设置短了,在业务逻辑执行完之前时间到期,那么 Redis 自动就把键值对给删除了,即,把锁给释放了,这不符合逻辑。
b、SETNX误删情况
-
情况一
设置过期时间线程1被治得服服帖帖,此时线程1又开始不当人了。线程1想既然你抢我得锁,等你获得锁后我就将锁删除毕竟我还要有备用钥匙,让你也锁不住,让其它线程也执行。
线程1休息的时间超过了过期时间,此时锁会自动释放。线程2现在脱颖而出抢到了锁然后开心的继续执行。但是现在线程1醒了,发现线程2抢走了锁。线程1表示小子胆挺肥啊,敢抢我的锁,等我执行完了就将你锁删除,让其它"哥们"也进来。此时就会发生蝴蝶效应,线程1删除了线程2的锁,线程2删除了线程3的锁,直到最后一个"哥们:wc,我锁了?"。当然线程是无感知,其实线程1乃至其它线程都不知道删除的是别人的锁,全部线程都以为删除的是自己的锁。直到最后一个线程无锁可删。
这种误删锁的情况让锁的存在荡然无存,本来应该串行执行的线程,在一定程度上都开始并发执行了。
那么误删情况该如何解决了?我们可以给锁加上线程标识,只有锁是当前线程的才能删除,否则不能删除。在添加key的时候,key的value存储当前线程的标识,这个标识只要保证唯一即可。可以使用UUID或者一个自增数据。在删除锁的时候,将线程标识取出来进行判断,如果相同就表示锁是自己的能够删除,否则不能删除。
获取锁
//获取线程前缀,同时也是线程表示。通过UUID唯一性 private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-"; //与线程id组合 public boolean tryLock(long timeOut) {//获取线程idString id =ID_PREFIX+ Thread.currentThread().getId();//获取锁Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, id , timeOut, TimeUnit.SECONDS);return Boolean.TRUE.equals(absent);}
释放锁:
public void unLock() {//获取存储的线程标识String value = stringRedisTemplate.opsForValue().get(key);//当前线程的线程标识String id =ID_PREFIX+ Thread.currentThread().getId();//线程标识相同则删除否,则不删除if (id.equals(value)){redisTemplate.delete(key);}}
-
情况二
加入线程标识后,线程一不能随便删除其它线程的锁,但是线程1又开始不当人了。线程1表示判断线程标识和释放锁的操作我可以分开执行,这又不是一个原子性的操作,线程1干完活以后就准备去释放锁,当线程1判断锁是自己的后表示开锁太累了,休息一会在开。此时其它线程就想无所谓,反正过期时间一到锁就会自动释放。但是线程1已经判断了锁是自己的以后就不会执行判断锁的操作(线程1已经执行了if判断,只是没有执行方法体),当线程2获得锁后,线程1仍然能删除线程2的锁。
解锁时,`查 - 删` 操作是 2 个操作,由两个命令完成,非原子性。 redis底层执行这个setnx不是一个原子操作,而是有两步操作完成的,首先set hello world,然后第二步设置key的过期时间: expire hello 3,那么如果执行完第一步刚好redis宕机了,此时key一直保存到redis。永远也无法删除了。
三、Redisson实现分布式锁【日常使用】
1、Redisson 如何解决上述问题
-
Redisson 解决 “过期自动删除时长” 问题的思路和方案
Redisson 中客户端一旦加锁成功,就会启动一个后台线程(惯例称之为 watch dog 看门狗)。watch dog 线程默认会每隔 10 秒检查一下,如果锁 key 还存在,那么它会不断的延长锁 key 的生存时间,直到你的代码中去删除锁 key 。
-
Redisson 解决setnx和 解锁的非原子性 问题的思路和方案
Redisson 的上锁和解锁操作都是通过 Lua 脚本实现的。Redis 中 执行 Lua 脚本能保证原子性,整段 Lua 脚本的执行是原子性的,在其执行期间 Redis 不会再去执行其它命令
2、Redisson 的使用
-
添加依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.15.6</version> </dependency>
-
配置 RedissonConfig
/*** redission配置类*/ @Configuration public class RedissonConfig {@Value("${spring.redis.host}")private String host;@Value("${spring.redis.port}")private String port;@Value("${spring.redis.database}")private int database;@Beanpublic RedissonClient getRedissonClient(){String address="redis://"+host+":"+port; //拼接redis地址Config config = new Config();config.useSingleServer().setAddress(address).setDatabase(database).setKeepAlive(true);return Redisson.create(config);} }
a、基本用法
@RestController
public class HelloController {@Autowiredprivate RedissonClient redissonClient;@SneakyThrows@GetMapping("/add")public String addGoods(String goodsname) {String message = "";//获取一把锁对象,一般将需要上锁的类名+方法名做为锁的键RLock rLock=redissonClient.getLock("HelloController.addGoods");try{rLock.lock(); //加锁,其实就是设置一个key-value 默认加的锁都是30s}catch (Exception e){e.printStackTrace();return "上锁失败,请稍后重试";}try{Thread.sleep(5000);//睡五秒message = "执行了添加操作";}catch (Exception e){e.printStackTrace();return "上锁失败,请稍后重试";}finally {//判断是否有锁对象,以及是否是同一个锁if (rLock.isLocked() && rLock.isHeldByCurrentThread()){rLock.unlock(); //解锁}}return message;}
}
b、批量加锁
RedissonMultiLock 对象可以将多个 RLock 对象关联为一个联锁,每个 RLock 对象实例可以来自于不同的 Redisson 实例
@RestController
public class HelloController {@Autowiredprivate RedissonClient redissonClient;@SneakyThrows@GetMapping("/add")public String addGoods(String goodsname) {String message = "";//批量锁对象RLock multiLock=null;try{//保存批量锁对象List<RLock> rLockList=new ArrayList<RLock>();for(String skid :skuList){RLock rLock=redissonClient.getLock("lock:"+skid);rLockList.add(rLock);}//将锁集合转为数组RLock[] arrayLock = rLockList.stream().toArray(RLock[]::new);//将多个RLock整合为一个大锁对象multiLock=redissonClient.getMultiLock(arrayLock);multiLock.lock(); //上锁}catch (Exception e){e.printStackTrace();return "上锁失败,请稍后重试";}try{message="执行了批量操作"}catch (Exception e){return "批量操作失败"}finally{//判断是否有锁对象,以及是否是同一个锁if (multiLock.isLocked() && multiLock.isHeldByCurrentThread()){multiLock.unlock(); //解锁}}return message;}
}
3、Redisson分析
-
你通过 RedissonClient 拿到的锁都是 “可重入锁”
这里的 “可重入” 的意思是:持有锁的线程可以反复上锁,而不会失败,或阻塞等待;锁的非持有者上锁时,则会失败,或需要等待。当然,如果你对一个锁反复上锁,那么逻辑上,你应该对它执行同样多次的解锁操作
@Autowired private RedissonClient redissonClient; @Test void contextLoads() {RLock rLock = redissonClient.getLock("hello");rLock.lock(); System.out.println("lock success!");rLock.lock(); System.out.println("lock success!");rLock.lock(); System.out.println("lock success!");rLock.unlock();rLock.unlock();rLock.unlock(); }
使用 lock( )上锁时由于你没有指定过期删除时间,所以,逻辑上只有当你调用 unlock( )之后,Redis 中代表这个锁的键值对才会被删除。当然你也可以在 lock 时指定超时自动解锁时间:
rLock.lock(3,TimeUnit.SECONDS); //3秒钟 自动解锁
这种情况下,如果你有意或无意没有调用 unlock 进行解锁,那么 3秒后,Redis 也会自动删除代表这个锁的键值对
-
当两个不同的线程对同一个锁进行 lock 时,第二个线程的上锁操作会失败
而上锁失败的默认行为是阻塞等待,直到前一个线程释放掉锁。这种情况下,如果你不愿意等待,那么你可以调用
tryLock()
方法上锁。tryLock 上锁会立刻(或最多等一段时间)返回,而不会一直等(直到所得持有线程释放)。// 拿不到就立刻返回 rLock.tryLock(); // 拿不到最多等 1 秒。1 秒内始终拿不到,就返回 rLock.tryLock(1, TimeUnit.SECONDS); // 尝试在1s内去拿锁,拿不到就返回false,拿到了10s自动释放这个锁 rLock.tryLock(1, 10, TimeUnit.SECONDS);
-
Redisson 在上锁时,向 Redis 中添加的键值对时,通过hset设置k-v的
其中键就是hello,当然你也可以是其它的值,那么这个值里面的键是redisson内部帮我们生成的UUID +“:” +thread-id 拼接而成的字符串;值是这个锁的上锁次数,默认是1
Redisson 如何保证线程间的互斥以及锁的重入(反复上锁)?
因为代表着锁的键值对的键中含有线程 ID ,因此,当你执行上锁操作时,Redisson 会判断你是否是锁的持有者,即,当前线程的 ID 是否和键值对中的线程 ID 一样。
如果当前执行 lock 的线程 ID 和之前执行 lock 成功的线程的 ID 不一致,则意味着是 “第二个人在申请锁” ,那么就 lock 失败;如果 ID 是一样的,那么就是 “同一个” 在反复 lock,那么就累加锁的上锁次数,即实现了重入。
4、watch dog 自动延期机制
如果在使用 lock/tryLock 方法时,你指定了超时自动删除时间,如:hello.tryLock(10, TimeUnit.SECONDS);Redis 会自动10s后将当前线程锁的键值对给删除掉,不会自动续期,而且如果你的业务执行时间过长,超过了key的过期时间, 而你在执行完业务之后也去删除这个key,就会报错,提示错误为:当前线程不能删除这个key,因为你删的key不是你之前的key,而是另外一个线程给redis重新设置的key。所以设置带过期时间的hello.tryLock(10, TimeUnit.SECONDS)键值对时,时长一定要超过业务执行的时长
如果,你在使用 lock/tryLock 方法时,没有指定超时自动删除时间,那么,就完全依靠你的手动删除( unlock 方法 ),那么,这种情况下你会遇到一个问题:如果你有意或无意中忘记了 unlock 释放锁,那么锁背后的键值对将会在 Redis 中长期存在!
一定要注意Redisson 看门狗(watch dog)在指定了加锁时间时,是不会对锁时间自动续租的。
在 watch dog 机制中,有一个被 “隐瞒” 的细节:表面上看,你的 lock 方法没有指定锁定时长,但是 Redisson 去 Redis 中添加代表锁的键值对时,它还是添加了自动删除时间。默认 30 秒(可配置)。这意味着,如果,你没有主动 unlock 进行解锁,那么这个代表锁的键值对也会在 30 秒之后被 Redis 自动删除,但是实际上,并没有。这正是因为 Redisson 利用 watch dog 机制对它进行了续期( 使用 Redis 的 expire 命令重新指定新的过期时间)。也就是内部有一个定时任务,每隔10s会会自动启动定时任务,该任务重新给key续期30s。
5、使用aop统一实现分布式锁
-
添加aop依赖
<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId> </dependency>
-
修改yml
spring:aop:auto: true #开启aop
-
创建自定义注解
/*** 分布式锁自定义注解,表示哪些方法需要实现分布式锁*/ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface ReCommit {}
-
创建apo类
/*** 防止重复提交aop*/ @Component @Aspect public class ReCommitAop {@Resourceprivate RedissonClient redissonClient;//配置节入点@Pointcut("execution(* com.woniu.controller.*.*(..))")public void pott(){ }/*** 环绕通知* @param point* @return*/@Around("pott()")public Object aroundPrintLog(ProceedingJoinPoint point){Object result=null;RLock rLock=null;try{//获得方法信息MethodSignature methodSignature = (MethodSignature) point.getSignature();//获得了要执行的方法的字节码对象Method method = methodSignature.getMethod();//获得方法中所有的形参Object[] args = point.getArgs();//判断方法上是否有注解ReCommit reCommit=method.getDeclaredAnnotation(ReCommit.class);if(reCommit!=null) { //存在,需要进行上锁String key=point.getSignature().toShortString();rLock= redissonClient.getLock(key);//加锁if (!rLock.tryLock()) {return new ResponseResult<Object>(6002,"数据使用中,请稍后再试。。。");}}//执行方法result = point.proceed(args);}catch (Throwable throwable) {throwable.printStackTrace();}finally{// 判断是否有锁对象,以及是否是同一个锁if (rLock!=null && rLock.isLocked() && rLock.isHeldByCurrentThread()){rLock.unlock(); //解锁}}return result;} }