根据源码梳理Redisson的可重入、锁重试以及看门狗机制原理

Redisson可重入的原理

在上篇文章中我们已经知道了除了需要存储线程标识外,会额外存储一个锁重入次数。那么接下来我们查看使用Redisson时,Redisson的加锁与释放锁流程图。

当开始获取锁时,会先判断锁是否存在,如果存在再进行判断锁标识是否是当前线程,如果是那么value值 +1 代表锁重入次数加 1 并重新设置过期时间,如果不存在,那么直接获取锁并存储在Redis中,设置超时时间。如果需要释放锁,仍然是先获取锁标识是否和当前线程一致,如果不一致那么说明锁已经超时释放,如果一致则对value值-1后再判断value值是否为0,如果不是说明进行了锁重入,那么重置锁的超时时间即可。如果已经是0了,那么直接释放锁即可。

以上复杂的业务使用Java已经不能满足了,因此这部分业务在Redisson中使用了Lua脚本实现。

获取锁的Lua脚本

释放锁的Lua脚本

接下来我们可以进行源码跟踪查看可重入锁的实现

@Test
public void test01() throws Exception {//获取锁,指定锁名称,可重入RLock lock = redissonClient.getLock("lock");//三个参数分别是,最大获取锁等待时间(期间会重试),锁自动释放时间,时间单位boolean flag = lock.tryLock(1, 10, TimeUnit.SECONDS);if (flag){try{System.out.println("获取锁成功");}finally {lock.unlock();}}
}

查看最基本的tryLock()方法

接下来我会分批给出tryLock()方法的源码

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {//将最大获取锁等待时间转化为毫秒long time = unit.toMillis(waitTime);//获取当前时间的毫秒值long current = System.currentTimeMillis();//获取线程标识long threadId = Thread.currentThread().getId();//尝试获取锁Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquired// ...
}

接下来查看tryAcquire()方法,查看如何获取到锁

这里做了leaseTime值判断,实际上是判断调用者是否传递了锁释放时间,如果没有传入锁施放时间那么默认值为-1,这里我们查看任意一个tryLockInnerAsync()方法都可以。

可以看到tryLockInnerAsync()方法就是通过lua脚本来实现获取锁并进行重入的,有一点是和我们上面图片中的不同,那就是获取锁失败后会返回当前锁的过期时间。

接下来查看unlock()方法的源码

public void unlock() {try {get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException) e.getCause();} else {throw e;}}
}

查看unlockAsync()方法,追踪到最底层代码如下

至此,我们可以总结出,Redisson的可重入原理核心就是利用哈希结构去记录获取锁的线程与重入次数。

Redisson锁重试机制

理解锁重试机制之前,我们先查看释放锁的Lua脚本中有这么一行代码,释放锁的同时还会发布一条释放锁信息,方便其他线程开始获取锁。待会追踪源码时我们会需要用到该信息。

那么接下来依然是追踪源码,上文中获取锁失败的源码如下

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"if ((redis.call('exists', KEYS[1]) == 0) " +"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

这里会返回一个当前锁的施放时间,那么回退上一层查看我们得到过期释放时间后要去做什么

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {//返回的值赋值给该变量,该变量是一个Future,因为是异步执行lua脚本,因此无法立刻拿到返回值RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}//暂时先不用理解下述代码,只需要知道这里返回了一个剩余过期时间即可。CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquiredif (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);
}

接着回退查看上一层代码接着做了什么事情

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

这里get()方法获取到了返回的有效期值。接着回退上一层代码就到达了tryLock()方法,具体代码注释如下

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {//将最大获取锁等待时间转化为毫秒long time = unit.toMillis(waitTime);//获取当前时间的毫秒值long current = System.currentTimeMillis();//获取线程标识long threadId = Thread.currentThread().getId();//尝试获取锁,如果获取锁失败ttl应该是一个具体值,而不是nullLong ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {//获取到锁,直接返回truereturn true;}//计算最大等待时间减去第一次尝试获取锁的时间,得到剩余等待时间time -= System.currentTimeMillis() - current;if (time <= 0) {//如果不存在剩余时间acquireFailed(waitTime, unit, threadId);return false;}//如果还存在剩余时间,接着获取当前时间current = System.currentTimeMillis();//订阅释放锁的信号(就是开头说的释放锁时会发布的那条信息),这里也是异步执行,因此返回类型为FutureCompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);try {//获取等待结果,如果超过了剩余最大等待时间会抛出异常,执行TimeOutException中的catch代码subscribeFuture.get(time, TimeUnit.MILLISECONDS);} catch (TimeoutException e) {if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. " +"Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {subscribeFuture.whenComplete((res, ex) -> {if (ex == null) {unsubscribe(res, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;} catch (ExecutionException e) {acquireFailed(waitTime, unit, threadId);return false;}//走到这一步说明还存在剩余时间并获取到了锁释放信息try {//更新剩余时间time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();//再次尝试获取锁,如果失败获取到ttl存活时间ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}//执行到这说明又没有获取到锁,更新剩余时间time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {//如果锁剩余时间小于当前线程剩余等待时间,再次获取锁,最大等待时间为锁的释放时间commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {//如果锁的剩余时间大于当前线程剩余等待时间,再次获取锁,最大等待时间为当前线程的剩余时间commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}//如果还没有获取到锁,while循环执行上述代码time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);}
}

这里采用了Future来接收锁释放信息,对于CPU比较友好,不是持续不断的尝试获取锁,没有造成资源浪费。

Redisson看门狗机制

在刚刚锁重试机制中,在tryLock()方法中存在一个重要变量ttl,该变量记录了锁剩余存活时间。其他线程会根据ttl到期后开始尝试获取锁,那么这就存在一个问题,如果获取到锁的线程阻塞,导致ttl到期被删除,此时就会有两个线程同时获取到了锁。为了解决这个问题,Redisson存在一个看门狗机制。在叙述锁重试机制时,有一段代码我们没有进行解释,具体代码如下

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {//返回的值赋值给该变量,该变量是一个Future,因为是异步执行lua脚本,因此无法立刻拿到返回值RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}//回调函数,当拿到返回值ttlRemainingCompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquired//说明获取锁成功if (ttlRemaining == null) {//如果我们配置了锁的过期时间,那么将其转化为毫秒后覆盖掉默认的锁释放时间(同时也会取消看门狗机制)if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {//如果没有指定锁的过期施放时间,那么定时将锁的有效时间进行更新scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);
}

internalLockLeaseTime属性存在一个默认值,如果我们不指定锁的过期时间,那么就是使用Redisson中的默认值,具体源码如下

public class RedissonLock extends RedissonBaseLock {protected long internalLockLeaseTime;protected final LockPubSub pubSub;final CommandAsyncExecutor commandExecutor;public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;//获取默认的锁过期时间,又叫获取看门狗的过期时间,默认是30sthis.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}
}

接着我们查看定期更新锁的有效期方法scheduleExpirationRenewal(),具体源码如下

protected void scheduleExpirationRenewal(long threadId) {//该对象主要存储了两个属性,线程标识,Timeout对象(一个定时任务),我们可以理解为一个锁对象ExpirationEntry entry = new ExpirationEntry();//MAP对象存储的是不同业务中的不同锁对象。getEntryName()实际上获取到的是getLock(name)方法中的name//如果第一次获取,返回值为null,如果map中已经存在该业务类型的锁那么返回的是entry对象ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {//说明map中以及存在该业务类型的锁了,更新该业务锁的线程标识idoldEntry.addThreadId(threadId);} else {//第一次向map中存放该业务类型的锁,更新该业务锁的线程标识identry.addThreadId(threadId);try {//续约方法renewExpiration();} finally {if (Thread.currentThread().isInterrupted()) {cancelExpirationRenewal(threadId);}}}
}

接下来我们查看续约方法renewExpiration()

private void renewExpiration() {//获取业务锁对象ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}//这里有三个参数,第一个是定时任务需要执行的逻辑代码,第二个是延时执行时间,第三个延时执行时间单位//延时执行时间是锁的过期释放时间的三分之一Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {//获取锁对象ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}//获取锁对象中的线程标识Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}//刷新锁的有效时间CompletionStage<Boolean> future = renewExpirationAsync(threadId);//刷新锁的有效时间结束后,调用下面方法future.whenComplete((res, e) -> {//如果刷新锁的有效时间抛出异常,抛出日志并将锁对象从map中移除if (e != null) {log.error("Can't update lock {} expiration", getRawName(), e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}//如果刷新成功if (res) {// 递归调用本方法renewExpiration();} else {//如果返回值为null,那么就取消定时任务cancelExpirationRenewal(null);}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}

renewExpirationAsync()方法作用就是刷新锁的有效时间,具体源码如下

protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getRawName()),internalLockLeaseTime, getLockName(threadId));
}

以上就是在获取锁成功时,Redisson内部维护一个map集合存储当前系统中的锁对象,并向锁对象中设置锁的有效期更新的定时任务。在释放锁的时候,需要对该定时任务取消,接下来我们查看unlock()方法中取消定时任务的代码

public RFuture<Void> unlockAsync(long threadId) {RFuture<Boolean> future = unlockInnerAsync(threadId);//当取消锁成功时,执行该回调方法CompletionStage<Void> f = future.handle((opStatus, e) -> {//取消续约定时任务cancelExpirationRenewal(threadId);if (e != null) {throw new CompletionException(e);}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);
}

取消续约定时任务的具体代码如下

protected void cancelExpirationRenewal(Long threadId) {//根据key获取到锁对象ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());//如果不存在锁,那么直接返回if (task == null) {return;}if (threadId != null) {//移除锁对象中的线程标识task.removeThreadId(threadId);}if (threadId == null || task.hasNoThreads()) {Timeout timeout = task.getTimeout();if (timeout != null) {//如果锁对象中的定时任务不为空,那么就取消timeout.cancel();}//移除map中的锁对象EXPIRATION_RENEWAL_MAP.remove(getEntryName());}
}

对锁重试与看门狗机制进行一个流程总结 

总结 

可重入:利用hash结构记录线程id和重入次数。

可重试:利用信号量和PubSub功能实现等待、唤醒、获取锁失败的重试机制。

超时续约:利用watchDog,每隔一段时间,重置超时时间。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/197470.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法通关村第十六关-黄金挑战滑动窗口与堆的结合

大家好我是苏麟 , 今天带来一道小题 . 滑动窗口最大值 描述 : 给你一个整数数组 nums&#xff0c;有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。 返回 滑动窗口中的最大值 。 题目 : …

EM32DX-C4【C#】

1外观&#xff1a; J301 直流 24V 电源输入 CAN0 CAN0 总线接口 CAN1 CAN1 总线接口 J201 IO 接线段子 S301-1、S301-2 输出口初始电平拨码设置 S301-3~S301-6 模块 CAN ID 站号拨码开关 S301-7 模块波特率拨码设置 S301-8 终端电阻选择开关 2DI&#xff1a; 公共端是…

XUbuntu22.04之OBS30.0设置录制音频降噪(一百九十六)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

LLM之Agent(三):HuggingGPT根据用户需求自动调用Huggingface合适的模型

​ 浙大和微软亚洲研究院开源的HuggingGPT&#xff0c;又名JARVIS&#xff0c;它可以根据用户的自然语言描述的需求就可以自动分析需要哪些AI模型&#xff0c;然后去Huggingface上直接调用对应的模型&#xff0c;最终给出用户的解决方案。 一、HuggingGPT的工作流程 它的…

Linux驱动开发学习笔记3《新字符设备驱动实验》

目录 一、新字符设备驱动原理 1.分配和释放设备号 2.新的字符设备注册方法 &#xff08;1&#xff09; 字符设备结构 &#xff08;2&#xff09;cdev_init函数 &#xff08;3&#xff09; cdev_add函数 &#xff08;4&#xff09;cdev_del 函数 二、自动创建设备节点 …

网络安全(二)-- Linux 基本安全防护技术

4.1. 概述 安全防护基础主要是会用Linux系统&#xff0c; 熟悉Linux基本操作命令。 在这个章节中&#xff0c;我们主要探讨自主访问控制&#xff08;许可位、ACL&#xff09;、文件属性、 PAM技术、能力机制等。 4.1.1. 补充命令 本章节中&#xff0c;涉及一些新的命令&#…

【C++】STL简介(了解)【STL的概念,STL的历史缘由,STL六大组件、STL的重要性、以及如何学习STL、STL的缺陷的讲解】

这里写自定义目录标题 一、什么是STL二、STL的版本1. 原始版本2. P. J. 版本3. RW版本★ 4. SGI版本 三、STL的六大组件四、STL的重要性五、如何学习STL六、STL的缺陷 一、什么是STL STL ( standard template libaray - 标准模板库 )&#xff1a;是C标准库 的重要组成部分&…

红队攻防之隐匿真实IP

0x01 前言 安全态势日益严峻&#xff0c;各大组织普遍采用了综合的安全产品&#xff0c;如态势感知系统、WAF和硬件防火墙等&#xff0c;这些措施加大了渗透测试和攻防演练的难度。即使是一些基本的漏洞验证、端口扫描&#xff0c;也可能导致测试IP被限制&#xff0c;从而阻碍…

DFT新手教程:VASP中ISIF取值设置

新手初学VASP计算时首先接触到的就是结构优化的计算任务。 在结构优化中&#xff0c;INCAR中的关键参数包括 IBRION &#xff0c;NSW&#xff0c;ISIF&#xff0c;EDIFF和EDIFFG 各个参数均可在vaspwiki查到可设置的参数以及该参数所具有的设置的含义。 https://www.vasp.at/…

佛罗里达大学利用神经网络,解密 GPCR-G 蛋白偶联选择性

内容一览&#xff1a;G 蛋白偶联受体 (GPCRs) 是一种将细胞膜外的刺激&#xff0c;传递到细胞膜内的跨膜蛋白&#xff0c;广泛参与到人体生理活动当中。近日&#xff0c;佛罗里达大学的研究者测定了 GPCRs 和 G 蛋白的结合选择性&#xff0c;并开发了预测二者选择性的算法&…

JVM简单了解内存溢出

JVM oracle官网文档&#xff1a;https://docs.oracle.com/en/java/javase/index.html 什么是JVM JVM(Java Virtual Machine)原名Java虚拟机&#xff0c;是一个可以执行Java字节码的虚拟计算机。它的作用是在不同平台上实现Java程序的跨平台运行&#xff0c;即使在不同的硬件…

MX6ULL学习笔记 (七) 中断实验

前言&#xff1a; 本章我们就来学习一 下如何在 Linux 下使用中断。在linux内核里面使用中断&#xff0c;不同于我们以往在别的裸机开发一样&#xff0c;需要进行各种寄存器的配置&#xff0c;中断使能之类的&#xff0c;而在Linux 内核中&#xff0c;提供了完善的中断框架…

AWS攻略——使用中转网关(Transit Gateway)连接同区域(Region)VPC

文章目录 环境准备创建VPC 配置中转网关给每个VPC创建Transit Gateway专属挂载子网创建中转网关创建中转网关挂载修改VPC的路由 验证创建业务Private子网创建可被外网访问的环境测试子网连通性Public子网到Private子网Private子网到Private子网 知识点参考资料 在《AWS攻略——…

java日历功能

java 日历功能 功能概述java代码打印结果 功能概述 输入年份和月份&#xff0c;打印该月份所有日期&#xff0c;头部信息为星期一至星期日 java代码 package com.java.core.demoTest; import java.util.Calendar; import java.util.Scanner;// 打印日历 public class Calend…

计算机网络之IP篇

目录 一、IP 的基本认识 二、DNS 三、ARP 四、DHCP 五、NAT 六、ICMP 七、IGMP 七、ping 的工作原理 ping-----查询报文的使用 traceroute —— 差错报文类型的使用 八、断网了还能 ping 通 127.0.0.1 吗&#xff1f; 8.1、什么是 127.0.0.1 &#xff1f; 8.2、为…

小程序1rpx边框不完美

问题展示 原因 rpx类似rem&#xff0c;渲染后实际转换成px之后可能存在小数&#xff0c;在不同的设备上多多少少会存在渲染的问题。而1rpx的问题就更加明显&#xff0c;因为不足1个物理像素的话&#xff0c;在IOS会进行四舍五入&#xff0c;而安卓好像统一向上取整&#xff0c…

人工智能|网络爬虫——用Python爬取电影数据并可视化分析

一、获取数据 1.技术工具 IDE编辑器&#xff1a;vscode 发送请求&#xff1a;requests 解析工具&#xff1a;xpath def Get_Detail(Details_Url):Detail_Url Base_Url Details_UrlOne_Detail requests.get(urlDetail_Url, headersHeaders)One_Detail_Html One_Detail.cont…

[MySQL--基础]多表查询

前言 ⭐Hello!这里是欧_aita的博客。 ⭐今日语录&#xff1a;生活中最大的挑战就是发现自己是谁。然后&#xff0c;坚定不移地成为那个人。 ⭐个人主页&#xff1a;欧_aita ψ(._. )>⭐个人专栏&#xff1a; 数据结构与算法 MySQL数据库 多表查询 前言多表关系概述&#x1f…

为什么 SQL 不适合图数据库

背景 “为什么你们的图形产品不支持 SQL 或类似 SQL 的查询语言&#xff1f;” 过去&#xff0c;我们的一些客户经常问这个问题&#xff0c;但随着时间的推移&#xff0c;这个问题变得越来越少。 尽管一度被忽视&#xff0c;但图数据库拥有无缝设计并适应其底层数据结构的查询…

四层LVS与七层Nginx负载均衡的区别

一、四层负载均衡与七层负载均衡&#xff1a; &#xff08;1&#xff09;四层负载均衡&#xff1a; 四层负载均衡工作在 OSI 七层模型的第四层&#xff08;传输层&#xff09;&#xff0c;指的是负载均衡设备通过报文中的目标IP地址、端口和负载均衡算法&#xff0c;选择到达的…