Redis分布式锁进阶源码分析

Redis分布式锁进阶源码分析

    • 1、如何写一个商品秒杀代码?
    • 2、加上Java锁
    • 3、使用redis setnx命令获取锁
    • 4、增加try和finally
    • 5、给锁设置过期时间
    • 6、增长过期时间,并setnx增加唯一value
    • 7、使用redisson
    • 8、源码分析
      • a、RedissonLock.tryLockInnerAsync
      • b、RedissonLock.tryAcquireAsync
    • 9、Redisson分布式锁的源码分析总结

根据秒杀场景演示

1、如何写一个商品秒杀代码?

@Autowired
StringRedisTemplate redisTemplate;public String stock() {String key = "stock_01";int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));if (stockNum > 0) {redisTemplate.opsForValue().set(key, (stockNum - 1) + "");}else {return "fail";}return "success";
}

上面的写法会造成并发问题,多个客户端同时请求此方法,查询到的库存一致,同时扣减,导致超卖。

2、加上Java锁

public synchronized String stock() {String key = "stock_01";int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));if (stockNum > 0) {redisTemplate.opsForValue().set(key, (stockNum - 1) + "");}else {return "fail";}return "success";
}

加上Java锁,会避免此问题,但是,如果是分布式项目,一个节点会部署到多个容器或者在多个Tomcat中运行,Java锁无法解决这种问题

3、使用redis setnx命令获取锁

每次执行扣减库存前,先用setnx命令插入一个标志,标记此线程方法获取到锁,获取成功方能扣减,不成功就返回。执行完扣减后删除标志。

注意:命令setnx key value,将 key 的值设为value,当且仅当key不存在;若给定的key已经存在,则不做任何动作。设置成功,返回1;设置失败,返回0。

public String stock() {String key = "stock_01";Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(key, "stock");	//setnxif(!ifAbsent){return "fail";}int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));if (stockNum > 0) {redisTemplate.opsForValue().set(key, (stockNum - 1) + "");} else {return "fail";}redisTemplate.delete(key);	//执行完扣减后删除keyreturn "success";
}

上面的代码如果执行完setnx命令后,程序异常报错,锁得不到释放,其他线程无法扣减库存,这时候就有人说了,可以加上try和finally,在finally中删除key这样就可以解决。

4、增加try和finally

 public String stock() {String key = "stock_01";Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(key, "stock");try {if(!ifAbsent){return "fail";}int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));if (stockNum > 0) {redisTemplate.opsForValue().set(key, (stockNum - 1) + "");} else {return "fail";}}finally {redisTemplate.delete(key);	//执行完扣减后删除key}return "success";
}

如果执行到try中的代码服务器刚好宕机,没有执行finally中的删除key,还是不会释放锁,如何解决?

5、给锁设置过期时间

public String stock() {String key = "stock_01";Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(key, "stock",10, TimeUnit.SECONDS);//执行setnx,并给key设置过期时间10秒try {if(!ifAbsent){return "fail";}int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));if (stockNum > 0) {redisTemplate.opsForValue().set(key, (stockNum - 1) + "");} else {return "fail";}}finally {redisTemplate.delete(key);  //执行完扣减后删除key}return "success";
}

上面代码还是会有问题,如果扣减代码执行时间大于我们设置的过期时间,redis已经删除了key,其他线程可以获取到锁,并正常执行,但是第一次获取到锁的线程扣减完库存之后,执行了删除key的操作,导致下一个线程丢失锁。可以给这个setnx命令的value设置一个唯一值来区分哪个线程获取到锁

6、增长过期时间,并setnx增加唯一value

public String stock() {String key = "stock_01";String id = UUID.randomUUID().toString();//增加唯一id,Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(key, id, 30, TimeUnit.SECONDS);//把id存入到value中try {if (!ifAbsent) {return "fail";}int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));if (stockNum > 0) {redisTemplate.opsForValue().set(key, (stockNum - 1) + "");} else {return "fail";}} finally {if (id.equals(redisTemplate.opsForValue().get(key))) {//对比id是否一致,一致才可删除锁,避免锁误删redisTemplate.delete(key);  //执行完扣减后删除key}}return "success";
}

这时候已经能解决大部分秒杀场景了,虽然已经考虑的足够多的情况了,但是很不幸,上面代码还是会出现问题
a、增长过期时间其实治标不治本,出问题的概率会变小,但是不代表不会出问题,代码执行时间还是会超过过期时间,导致锁丢失
b、执行到finally中的对比id已经执行,而删除key没有执行,过期时间到了,此时第二个线程获取到锁,但是第一个线程又执行了删除,极端情况还是会出现误删锁导致超卖

面临这两个问题如何解决:
a、动态修改时间,即锁续命:开启一个线程执行一个定时任务,去判断获取锁的线程有没有结束,如果没结束就增加过期时间“续命”
b、判断有没有key和删除key的操作要有原子性:Java中没有提供这种操作,但是Lua脚本可以实现

7、使用redisson

a、引入pom:

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>2.7.0</version>
</dependency>

b、增加配置类:

@Configuration
@Slf4j
public class RedissonManager {    //集群环境使用-节点信息    @Value("${spring.redis.cluster.nodes:default}")private String clusterNodes;    //公共-密码    @Value("${spring.redis.password:default}")private String password;//单机环境使用@Value("${spring.redis.host:default}")private String host;//单机环境使用@Value("${spring.redis.port:6379}")private String port;//单机环境使用@Value("${spring.redis.database:0}")private int database;@Bean@ConditionalOnProperty(name = "spring.redis.mode", havingValue = "cluster")public RedissonClient redissonClient() {// 集群环境使用Config config = new Config();config.useClusterServers().addNodeAddress(clusterNodes.split(",")).setPassword(password);return Redisson.create(config);}@Bean@ConditionalOnProperty(name = "spring.redis.mode", havingValue = "singleton", matchIfMissing = true)public RedissonClient redissonSingletonClient() {// 单机打包使用Config config = new Config();config.useSingleServer().setAddress(host + ":" + port).setPassword(password).setDatabase(database);return Redisson.create(config);}
}

c、代码如下

@Autowired
StringRedisTemplate redisTemplate;
@Autowired
Redisson redisson;public String stock() {String key = "stock_01";RLock lock = redisson.getLock(key);lock.lock();try {int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));if (stockNum > 0) {redisTemplate.opsForValue().set(key, (stockNum - 1) + "");} else {return "fail";}} finally {lock.unlock();return "fail";}return "success";
}

8、源码分析

a、RedissonLock.tryLockInnerAsync

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {//锁续命的执行周期,默认30秒,this.internalLockLeaseTime = java.util.concurrent.TimeUnit.SECONDS.toMillis(30L);this.internalLockLeaseTime = unit.toMillis(leaseTime);//执行Lua脚本return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,//redis.call,用于执行Redis脚本。这个命令会将脚本中的Redis命令调用转化为Lua数据类型,并执行这个脚本。//redis.call('exists', key),用于检查指定的键是否存在,如果键存在,则返回1;键不存在,则返回0。"if (redis.call('exists', KEYS[1]) == 0) then " +//判断key不存在// 保存到Hash(哈希表) 中// hset:指定要执行的Redis命令为hset,hset key field value:将哈希表key中的域field的值设为value// KEYS[1]:哈希表的键名,为this.getName()也就是代码中传过来的key// ARGV[2]:指定要设置的字段名,为this.getLockName(threadId),也就是value为当前线程id// 1:指定要将字段设置为的值"redis.call('hset', KEYS[1], ARGV[2], 1); " +// 设置过期时间// ARGV[1]为this.internalLockLeaseTime,默认30秒"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) " +//如果key存在,锁重入// hincrby:指定要执行的Redis命令为hincrby,hincrby key field increment:为哈希表key中的域field的值加上增量increment"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +// 重置过期时间为30秒"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//以毫秒为单位返回key的剩余时间"return redis.call('pttl', KEYS[1]);",Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}

b、RedissonLock.tryAcquireAsync

//此方法异步地尝试获取锁,它不会阻塞锁的线程
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {if (leaseTime != -1L) {//没有获取到锁,返回失败return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//获取到锁RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(30L, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);//注册一个回调方法,这个方法在异步方法执行完成执行ttlRemainingFuture.addListener(new FutureListener<Long>() {public void operationComplete(Future<Long> future) throws Exception {if (future.isSuccess()) {//执行完成获取结果Long ttlRemaining = (Long)future.getNow();if (ttlRemaining == null) {//scheduleExpirationRenewal会每隔10秒给锁刷新过期时间,默认置为30秒,直到这个锁获取不到RedissonLock.this.scheduleExpirationRenewal(threadId);}}}});return ttlRemainingFuture;}
}

9、Redisson分布式锁的源码分析总结

  • 锁标识:Redisson使用Hash数据结构来表示锁。在这个Hash中,key为锁的名字,field为当前竞争锁成功的线程的唯一标识,value为重入次数。
  • 队列:所有竞争锁失败的线程,会被放入一个队列中,等待锁的释放。这些线程会订阅当前锁的解锁事件,一旦锁被释放,就会唤醒队列中的一个线程来尝试获取锁。这个机制是通过Semaphore来实现的线程的挂起和唤醒。
  • 加锁:加锁的核心源码在tryLockInnerAsync方法中。这个方法首先会将锁的租约时间转换为毫秒,然后执行一个Lua脚本尝试获取锁。如果获取锁成功,就会设置一个定时任务来续期锁的租约时间,避免锁因为超时而被自动释放。如果获取锁失败,就会将当前线程放入等待队列中,等待锁的释放。
  • 解锁:解锁的核心源码在unlockInnerAsync方法中。这个方法会执行一个Lua脚本来释放锁。如果释放锁成功,就会唤醒等待队列中的一个线程来尝试获取锁。

Redisson分布式锁的实现原理主要基于Redis的单线程特性和Lua脚本的原子性。通过使用Lua脚本,可以保证加锁和解锁的操作是原子的,不会被其他操作打断。同时,通过定时任务来续期锁的租约时间,可以避免因为网络延迟等原因导致锁被提前释放。
总的来说,Redisson分布式锁的实现提供了一种高效、可靠的分布式锁解决方案,可以很好地满足分布式系统中的并发控制需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/580457.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

插入排序,选择排序,冒泡排序,顺序搜索,二分搜索,迭代,求最大公因数,最小公倍数等简单模板

目录 1.排序 1.插入排序模板 2.冒泡排序模板 3.选择排序模板 2.搜索 1.顺序搜索 2.二分搜索 3.迭代 1.基础迭代 ​编辑 4.求最大公因数&#xff0c;最小公倍数 1.最直接的方法 取巧一点 2.辗转相除法&#xff08;欧几里得法&#xff09; 1.排序 1.插入排序模板 插…

ES慢查询分析——性能提升6 倍

问题 生产环境频繁报警。查询跨度91天的数据&#xff0c;请求耗时已经来到了30s。报警的阈值为5s。 背景 查询关键词简单&#xff0c;为‘北京’ 单次仅检索两个字段 查询时间跨度为91天&#xff0c;覆盖数据为450亿数据 问题分析 使用profle分析&#xff0c;复现监控报警的…

Halo多博客备份,同时备份redis与mysql,将备份文件上传到百度云

代码&#xff1a;https://github.com/loks666/py_tools 写在前面 我的服务器运行了多个halo博客&#xff0c;都在同一个域名下&#xff0c;只是用前缀区分&#xff0c;所以代码中我也是使用前缀区分的&#xff0c;使用了list元祖中包含了多个halo站点信息&#xff0c;记得在代…

PSV新内存卡(或内存卡格式化后)如何安装VITASHELL文件管理器

本博文适合PSV破解固化后的系统&#xff0c;例如变革3.65破解固化后换新的内存卡&#xff0c;或者内存卡格式化后如何在内存卡上安装文件管理器&#xff08;没有文件管理器无法安装游戏&#xff09;。如果你的PSV还没破解&#xff0c;那本文不适合没破解的情况&#xff0c;按照…

数据库(多对多表关系及关联查询)

添加外键约束&#xff1a; alter table 表名 drop foreign_key fk(外键约束)_ 表名_列名_列名 添加约束规则&#xff1a; 1.主表中没有对应记录&#xff0c;不能将记录添加到从表 2.从表存在与主表对应的记录&#xff0c;不能从主表中删除该行 3.删除主标前&#xff0c;先…

问题:执行conda init 提示 No action taken,然后无法正确激活环境

执行完下面代码后&#xff0c; conda activate base 报错&#xff0c;提示先执行conda init,于是再执行下面代码 conda init发现还报错提示提示 No action taken。 解决方法&#xff1a; 打开一个新的终端窗口&#xff0c;您应该就可以正常使用conda命令。&#xff08;把其…

VIRTUALBOX VAGRANT虚拟机网速慢解决方案

查看时长 time curl -s http://www.baidu.com > /dev/null 1config.vm.provider :virtualbox do |vb| 2 vb.customize ["modifyvm", :id, "--natdnshostresolver1", "on"] 3 vb.customize ["modifyvm", :id, "--natdn…

ElasticSearch 的 mapping 参数 - fields

概要 在 es 中&#xff0c;一个字段可能运用于不同的场景&#xff0c;但是某个字段类型的使用场景是有局限的 下面&#xff0c;我们先来看一段 es 查询语句&#xff1a; $must ["bool" > ["should" > [["range" > ["user_id.r…

设计模式之-观察者模式,快速掌握观察者模式,通俗易懂的讲解观察者模式以及它的使用场景

文章目录 一、快速理解观察者模式二、观察者模式适用场景三、观察者模式优缺点观察者模式的优点包括&#xff1a;观察者模式的缺点包括&#xff1a; 四、代码示例五、我们来听一个故事&#xff0c;加深理解 一、快速理解观察者模式 当谈到设计模式中的观察者模式&#xff08;O…

视觉学习(7) —— 接收数据和发送数据以及全局变量和浮点数

1、前提 创建一个四个字节的地址 2、发送数据 &#xff08;1&#xff09;直接发送数据 再观察地址里的值 与我们想要值不一样 输入0&#xff0c;而实际值则为 结论&#xff1a;直接输入值到地址&#xff0c;值会发生变化 &#xff08;2&#xff09;走全局变量发送数据 添加全…

系列十(实战)、发送 接收批量消息(Java操作RocketMQ)

一、发送 & 接收批量消息 1.1、概述 批量消息是指RocketMQ可以把一组消息集合一次性发送&#xff0c;这一组消息会被当做一个消息供消费者消费。 1.2、Demo05MQTestApp /*** Author : 一叶浮萍归大海* Date: 2023/12/25 11:48* Description: 发送 & 接收批量消息*/ …

基于SSM实现的电动汽车充电网点管理系统

一、系统架构 前端&#xff1a;jsp | jquery | bootstrap | css 后端&#xff1a;spring | springmvc | jdbc 环境&#xff1a;jdk1.8 | mysql 二、代码及数据库 三、功能介绍 01. web端-首页 02. web端-登录 03. web端-注册 04. web端-我要充电 05. web端-个人中心-消…

搞定Apache Superset

踩雷了无数次终于解决了Superset的一系列问题 现在是北京时间2023年12月27日&#xff0c;亲测有效。 Superset概述 Apache Superset是一个现代的数据探索和可视化平台。它功能强大且十分易用&#xff0c;可对接各种数据源&#xff0c;包括很多现代的大数据分析引擎&#xff…

php5.6安装mongo扩展

需要依赖 可以参考 php5.6安装openssl扩展 https://pecl.php.net/package/mongo 安装mongo扩展 wget https://pecl.php.net/get/mongo-1.6.16.tgz/Users/hina/Applications/php/5.6.40/bin/phpize./configure --with-php-config/Users/hina/Applications/php/5.6.40/bin/ph…

JS深浅拷贝

区分 B复制了A的值&#xff0c;如果A被修改&#xff0c;B的值也被改变&#xff0c;那就是浅拷贝。 如果B的值没有跟着修改&#xff0c;那就是深拷贝 深浅拷贝的方式 1、遍历赋值 2、Object.create() 3、JSON.parse()和JSON.stringify() 浅拷贝-遍历 let a {name:"…

recognize-anything 识别万物

docker run --gpus all -itd --nametest -v /app:/app nvcr.io/nvidia/cuda:11.8.0-devel-ubuntu22.04 ###########安装过程######################## # 更新包索引 apt update # 安装 Python 3 apt install python3 -y # 安装 pip apt install python3-pip -y # 安装…

k8s面试之——简述网络模型

kubernetes网络模型是kubernetes集群中管理容器网络通信的一种机制&#xff0c;用于实现pod间、pod与外部网络间的通信和互联&#xff0c;并提供了多种网络插件和配置选项来满足不同应用场景下的需求。kubernetes网络模型可以分为一下几个部分&#xff1a; 1. pod网络模型 在…

详解结构体(包含结构体内存对齐,柔性数组,位段)【尊嘟很详细】

​ 结构体 结构体是一些值的集合&#xff0c;这些值称为成员变量&#xff0c;结构的成员可以是标量、数组、指针,甚至是其他结构体。 成员名可以与程序中其它变量同名&#xff0c;互不干扰。 结构体的定义 &#xff08;struct结构名{}&#xff09; struct books {int a;c…

饮用水除溴酸盐和硝酸盐中的应用与优势

随着人们对健康和生活质量的日益关注&#xff0c;饮用水安全问题成为了社会关注的焦点。在自然水体中&#xff0c;溴酸盐和硝酸盐的含量往往较高&#xff0c;而这些物质对人体健康存在一定的潜在风险。因此&#xff0c;饮用水处理中如何有效去除溴酸盐和硝酸盐&#xff0c;成为…

TypeScript下载安装,编译运行

TypeScript是拥有类型的JavaScript超集&#xff0c;它可以编译成普通、干净、完整的JavaScript代码。 简单理解&#xff1a;TypeScript就是加强版的JavaScript。 TypeScript最终会被编译成JavaScript代码&#xff0c;那么我们必然需要对应的编译环境 环境搭建前提&#xff1a…