五、分布式锁-redission

源码仓库地址:git@gitee.com:chuangchuang-liu/hm-dingping.git

1、redission介绍

目前基于redis的setnx特性实现的自定义分布式锁仍存在的问题:

问题描述
重入问题同一个线程无法多次获取统一把锁。当方法A成功获取锁后,调用方法B,方法B也要获取锁,此时由于锁是不可重入的,也就是被方法A占用着,此时就产生了死锁的问题
不可重试自定义分布式锁无失败重试机制
超时释放锁的超时释放虽然可以避免死锁问题,但确实也可能存在业务执行时间比较长的情况,那这种情况下就仍存在安全隐患问题
主从一致性如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

什么是Redission?
Redission是一个用于Java的Redis客户端,它提供了丰富的特性,包括内存数据网格的功能。它支持同步/异步/RxJava/Reactive API,拥有超过50种基于Redis的Java对象和服务。Redission的使用非常简单,没有学习曲线,您不需要了解任何Redis命令就可以开始使用。(GitHub - redisson/redisson, Redisson官网)
Redission可以让Java应用更方便地访问和操作Redis数据存储,适合于需要高性能和高并发的应用场景。

2、快速开始

  1. 导入依赖
<!--redission-->
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>
  1. Redission配置客户端
@Configuration
public class RedisConfig {@Beanpublic RedissonClient redisClient(){Config config = new Config();// 可以用"rediss://"来启用SSL连接config.useSingleServer().setAddress("redis://192.168.224.128:6379").setPassword("123456");return Redisson.create(config);}
}
  1. 使用Redission分布式锁
@Resource
private RedissionClient redissonClient;@Test
void testRedisson() throws Exception{//获取锁(可重入),指定锁的名称RLock lock = redissonClient.getLock("anyLock");//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);//判断获取锁成功if(isLock){try{System.out.println("执行业务");          }finally{//释放锁lock.unlock();}}
}

3、redission可重入锁原理

3.1、原理介绍

在Lock锁中,他是借助于底层的一个voaltile的一个state变量来记录重入的状态的,比如当前没有人持有这把锁,那么state=0,假如有人持有这把锁,那么state=1,如果持有这把锁的人再次持有这把锁,那么state就会+1 ,如果是对于synchronized而言,他在c语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有。 、

而在redission中,也支持这种可重入锁原理,是通过redis的hash数据结构实现的。其中key表示这把锁是否存在,field判断锁是被哪个线程持有,value则记录锁被持有次数。
image.png

3.2、源码剖析

  • 获取锁

其中各参数解释:
KEYS[1]:锁的名称
ARGV[1]:锁过期时间
ARGV[2]:id + “:” + threadId

"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);"

判断锁是否存在
如果不存在,则设置当前线程标识,计数器+1;设置过期时间;
如果存在。做二次判断,判断锁的持有线程是不是自己?
如果是,计数器+1,重置锁的过期时间;
如果不是,获取锁失败,返回锁的剩余过期时间

  • 释放锁

其中各参数解释:
KEYS[1]:锁的名称
KEYS[2]:订阅频道
ARGV[1]:是要发布的消息内容
ARGV[2]:锁过期时间
ARGV[3]:id + “:” + threadId

"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;"

判断锁是不是当前线程?
不是==>直接返回
是==>计数器–
二次判断,判断计数器是否大于0
大于0==>重置锁过期时间
否则==>真正释放锁

4、redission锁重试和WatchDog机制

4.1、redission是如何解决不可重试的?

源码剖析:
用户调用tryLock方法时,指定waitTime最大等待时间

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);}//        return get(tryLockAsync(waitTime, leaseTime, unit));
}
  1. 计算等待时间和获取当前时间:将用户指定的等待时间转换为毫秒,并记录方法调用时的当前时间。
  2. 尝试获取锁。如果ttl为空,则获取锁成功;否则,返回的是其他线程占用锁的剩余有效时间
  3. 检查剩余等待时间。如果time小于等于0,调用acquireFailed方法返回false
  4. 订阅锁。通过subscribe方法订阅相关的锁。如果在剩余时间内未能订阅成功,处理取消订阅并调用acquireFailed方法,返回false。
  5. 循环等待锁释放消息。等待过程中会调用tryAcquire方法获取锁,如果获取成功返回true
  6. 处理锁的ttl。如果ttl大于0,返回锁被其他线程占用的剩余过期时间(ttl)。更新剩余等待时间(time)。以time和ttl中较小的值继续等待再次尝试。
  7. 再次检查等于剩余等待时间。如果小于0,调用acquireFailed方法返回false
  8. 循环结束后(要么获取锁成功,要么超过最大等待时间了),最终调用unsubscribe方法取消订阅

结论1:redission不是获取锁失败后立即进行重试,而是等待“一定时间”后再进行重试,节省了一定的CPU资源,对服务器性能有一定提升;
结论2:一定要采取调用tryLock方法携带参数waitTime的重载方法,其他重载的tryLock方法底层是不具备重试机制的。

4.2、redission是如何解决锁超时释放的-看门狗机制?

自定义分布式锁仍存在的一个问题是:锁的超时释放虽然可以避免死锁问题,但确实也可能存在业务执行时间比较长的情况,那这种情况下业务还未执行完毕,锁就被释放了,存在一定的安全隐患。
源码剖析:

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);}RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining) {// 开启任务更新过期时间scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;
}
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {// reschedule itselfrenewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId));}
  1. 如果没有指定leaseTime,那么底层会默认传入commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()看门狗时间
  2. 在leasetime的1/3处时间,会创建一个任务renewExpirationAsync方法来异步地更新重置锁过期时间
  3. 递归地调用自身来更新锁过期时间,直到业务处理完毕。

至此redission解决了因业务阻塞而导致锁提前释放的问题

业务执行完毕,释放锁源码剖析:

public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise<Void>();// 释放锁RFuture<Boolean> future = unlockInnerAsync(threadId);future.onComplete((opStatus, e) -> {// 取消锁失效时间更新重置任务cancelExpirationRenewal(threadId);if (e != null) {result.tryFailure(e);return;}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}result.trySuccess(null);});return result;
}void cancelExpirationRenewal(Long threadId) {ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (task == null) {return;}if (threadId != null) {task.removeThreadId(threadId);}if (threadId == null || task.hasNoThreads()) {Timeout timeout = task.getTimeout();if (timeout != null) {timeout.cancel();}// 删除递归更新锁时间任务EXPIRATION_RENEWAL_MAP.remove(getEntryName());}
}

当业务执行完毕且锁正常释放后,删除递归更新锁时间任务,避免redission一直递归创建任务更新锁过期时间

5、redission锁的MultiLock原理

为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例
此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。
1653553998403.png
为了解决该问题,redission的方案是去掉redis集群主从关系,每一个节点都是平等的。加锁逻辑是需要写入到每一个节点上才算加锁成功。这样,当某一台机器宕机了,这台机器的slave节点变为master节点,此时另一个线程趁虚而入,虽然可以正常写入,但其它机器仍会写入失败,最终结果仍是获取锁失败,从而保证了获取锁的可靠性。
1653554055048.png
MulitLock源码剖析:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {//        try {//            return tryLockAsync(waitTime, leaseTime, unit).get();//        } catch (ExecutionException e) {//            throw new IllegalStateException(e);//        }long newLeaseTime = -1;if (leaseTime != -1) {if (waitTime == -1) {newLeaseTime = unit.toMillis(leaseTime);} else {newLeaseTime = unit.toMillis(waitTime)*2;}}long time = System.currentTimeMillis();long remainTime = -1;if (waitTime != -1) {remainTime = unit.toMillis(waitTime);}long lockWaitTime = calcLockWaitTime(remainTime);int failedLocksLimit = failedLocksLimit();List<RLock> acquiredLocks = new ArrayList<>(locks.size());for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {if (waitTime == -1 && leaseTime == -1) {lockAcquired = lock.tryLock();} else {long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {lockAcquired = false;}if (lockAcquired) {acquiredLocks.add(lock);} else {if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}if (failedLocksLimit == 0) {unlockInner(acquiredLocks);if (waitTime == -1) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// reset iteratorwhile (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}if (remainTime != -1) {remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}if (leaseTime != -1) {List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());for (RLock rLock : acquiredLocks) {RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);futures.add(future);}for (RFuture<Boolean> rFuture : futures) {rFuture.syncUninterruptibly();}}return true;}
  1. 遍历锁集合,调用lock.tryLock尝试获取锁。将获取结果传给变量lockAcquired
  2. 如果获取成功,将当前锁存放到acquiredLocks集合中
  3. 获取成功后,如果此时的剩余等待时间小于等于0,释放自己已获取的锁,返回false
  4. 如果获取失败,判断是否具备重试机制
    1. 没有重试,则直接返回false
    2. 有重试机制,将acquiredLocks集合清空,将iterator指针前移,重新遍历尝试。

6、结论

目前已接触的分布式锁有:

  • 可不重入锁/自定义分布式锁:

原理: 利用setnx特性、expire避免死锁、添加线程标识避免锁误删
缺点: 仍存在不可重入、失败不可重试、锁超时失效等问题

  • 可重入锁:

原理: 利用hash数据结构存储线程标识和重入次数、利用看门狗机制延续锁失效时间、利用信号量机制控制等待重试时间
缺点: 仍存在集群模式下redis宕机导致的锁失效问题

  • MulitLock

原理: 利用多个平等的redis节点,所有redis都写入才算获取锁成功
缺点: 维护成本高,实现相对复杂

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/768041.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

腾讯云服务器价格查询系统,2024年1年、3年和5年活动价格表

腾讯云服务器多少钱一年&#xff1f;61元一年起。2024年最新腾讯云服务器优惠价格表&#xff0c;腾讯云轻量2核2G3M服务器61元一年、2核2G4M服务器99元一年可买三年、2核4G5M服务器165元一年、3年756元、轻量4核8M12M服务器646元15个月、4核16G10M配置32元1个月、312元一年、8核…

Java生成动态图形验证码

dome /*** ClassName : VerifyCodeController* Description : 图片验证码* Author : llh* Date: 2024-03-22 10:48*/ Controller RequestMapping("/verifycode") public class VerifyCodeController {Resourceprivate StringRedisTemplate stringRedisTemplate;Get…

YOLOv8:Roboflow公开数据集训练模型

Roboflow公开数据集 Roboflow是一个提供计算机视觉数据集管理和处理工具的平台。虽然Roboflow本身并不创建或策划公开数据集&#xff0c;但它提供了一系列功能&#xff0c;帮助用户组织、预处理、增强和导出计算机视觉数据集。 官方网站&#xff1a;https://universe.roboflow…

求解完全背包问题

10.求解完全背包问题 - 蓝桥云课 (lanqiao.cn) import os import sys# 请在此输入您的代码 taotal_w,nmap(int,input().split()) w[] v[] dp[0]*(taotal_w1) #物品无限使用不用考虑 for i in range(n):wi,vimap(int,input().split())w.append(wi)v.append(vi)for i in range(n…

C++String类

1. 前言 String是C中操作字符串的类&#xff0c;它是在比较早的时候设计的STL模板&#xff0c;因此在某些地方设计的有些冗余 对于String类&#xff0c;不仅仅是学会使用它&#xff0c;更重要的是要从底层去理解它&#xff1b;本篇文章将从底层出发&#xff0c;模拟实现常用的S…

2024年阿里云服务器价格查询系统,最新报价

2024年腾讯云服务器优惠价格表&#xff0c;一张表整理阿里云服务器最新报价&#xff0c;阿里云服务器网整理云服务器ECS和轻量应用服务器详细CPU内存、公网带宽和系统盘详细配置报价单&#xff0c;大家也可以直接移步到阿里云CLUB中心查看 aliyun.club 当前最新的云服务器优惠券…

反激电源进阶及充电器基础认知

前言 反激开关电源核心工作原理&#xff0c;学会了这个原理&#xff0c;就代表着你的双脚已经全部跨入了开关电源世界的大门了。_哔哩哔哩_bilibili 最近不小心看了上面这个视频&#xff0c;有点感觉。 本文是 从开关电源&#xff08;BMS充电器&#xff09;入门硬件之——开…

代码随想录|Day26|贪心01|455.分发饼干、376.摆动序列、53.最大子数组和

455.分发饼干 大尺寸的饼干既可以满足胃口大的孩子也可以满足胃口小的孩子。 局部最优&#xff1a;尽量确保每块饼干被充分利用 全局最优&#xff1a;手上的饼干可以满足尽可能多的孩子 思路&#xff1a;大饼干 尽量分给 大胃口孩子 将小孩和饼干数组排序&#xff0c;我们从大到…

洛谷day3

B2053 求一元二次方程 - 洛谷 掌握printf用法&#xff1b; #include <iostream> #include <cmath> using namespace std; double a,b,c; double delta; double x1,x2;int main() {cin>>a>>b>>c;delta b*b-4*a*c;if(delta>0){x1 (-bsqrt…

ensp ppp验证实验(二)

实验拓扑&#xff1a; 1、R1和R2使用PPP链路直连&#xff0c;R2和R3把2条PPP链路捆绑为PPP MP直连 2、按照图示配置IP地址 3、R2对R1的PPP进行单向chap验证 4、R2和R3的PPP进行双向chap验证 实验内容&#xff1a; R1配置&#xff1a; #修改名称 <Huawei>sys Enter …

一些规律、现象

图文部分由COPILOT生成。 规律详情 墨菲定律 墨菲定律(Murphys Law) 一件事可能出错时就一定会出错。 图&#xff1a;AI生成 破窗效应 破窗效应(Broken windows theory&#xff09;是犯罪心理学理论。以一幢有少许破窗的建筑为例&#xff0c;如果那些窗没修理好&#xff0…

ShardingSphere水平分表——开发经验(2)

1. 什么场景下分表&#xff1f; 数据量过大或者数据库表对应的磁盘文件过大。 Q&#xff1a;多少数据分表&#xff1f; A&#xff1a;网上有人说1kw&#xff0c;2kw&#xff1f;不准确。 1、一般看字段的数量&#xff0c;有没有包含text类型的字段。我们的主表里面是不允许有t…

从零开始学HCIA之网络基础知识02

1、TCP/IP&#xff08;Transmission Control Protocol/Internet Protocol&#xff09;参考模型&#xff0c;它是当下实际的业界标准。 2、TCP/IP这个名字来自该协议簇中两个非常重要的协议&#xff0c;一个是IP&#xff08;Internet Protocol&#xff09;&#xff0c;另一个是T…

Go 限流器-漏桶 VS 令牌桶 常用包原理解析

本文主要介绍两个包Uber漏桶&#xff0c;time/rate令牌桶 可以了解到&#xff1a; 使用方法漏桶/令牌桶 两种限流思想 and 实现原理区别及适用场景应用Case 背景 我们为了保护系统资源&#xff0c;防止过载&#xff0c;常常会使用限流器。 使用场景&#xff1a; API速率限制…

带3090显卡的Linux服务器上部署SDWebui

背景 一直在研究文生图&#xff0c;之前一直是用原始模型和diffuser跑SD模型&#xff0c;近来看到不少比较博主在用 SDWebui&#xff0c;于是想着在Linux服务器上部署体验一下&#xff0c;谁知道并没有想象的那么顺利&#xff0c;还是踩了不少坑。记录一下过程&#xff0c;也许…

YOLO-MS 论文解读

paper&#xff1a;YOLO-MS: Rethinking Multi-Scale Representation Learning for Real-time Object Detection official implementation&#xff1a;https://github.com/fishandwasabi/yolo-ms 背景 尽管已经取得了很好的性能&#xff0c;但识别不同尺度的物体仍是实时目标…

【Mysql】硬盘性能压测(Sysbench工具)

1、IOPS和吞吐量介绍 IOPS&#xff08;每秒输入/输出操作数&#xff09;&#xff1a;是衡量存储设备每秒能够执行的输入/输出操作的数量。对于数据库等需要频繁读写的应用程序而言&#xff0c;IOPS 是一个关键的性能指标。更高的 IOPS 意味着存储设备能够处理更多的读写请求&am…

检索增强生成(RAG)技术:实现流程、作用及应用案例

一. RAG简介 在自然语言处理&#xff08;NLP&#xff09;领域中&#xff0c;检索增强生成&#xff08;Retrieval-Augmented Generation, RAG&#xff09;技术巧妙地结合了信息检索与神经网络生成模型的力量&#xff0c;通过在生成过程中引入相关的外部信息&#xff0c;实现了在…

【WEEK4】 【DAY5】AJAX - Part Two【English Version】

2024.3.22 Friday Following the previous article 【WEEK4】 【DAY4】AJAX - Part One【English Version】 Contents 8.4. Ajax Asynchronous Data Loading8.4.1. Create User.java8.4.2. Add lombok and jackson support in pom.xml8.4.3. Change Tomcat Settings8.4.4. Mo…

谷粒商城 - 前端基础

1.前端技术栈 2.ES6 2.1简介 2.2 let 与 const <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Doc…