Redis 分布式锁如何自动续期
何为分布式
- 分布式,从狭义上理解,也与集群差不多,但是它的组织比较松散,不像集群,有一定组织性,一台服务器宕了,其他的服务器可以顶上来。分布式的每一个节点,都完成不同的业务,一个节点宕了,这个业务就不可访问了。
- 分布式是指将一个业务拆分不同的子业务,分布在不同的机器上执行。
分布式锁
- 为了保证操作共享资源在高并发情况下的同一时间只能被同一个线程执行,在单体应用单机部署的情况下,可以使用
Java
并发处理相关的API
(如ReentrantLcok
或synchronized
)进行互斥控制,这是在JVM
层面的加锁方式。 - 单体单机部署的系统被演化成分布式系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,为了解决这个问题就需要一种跨
JVM
的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题
- 分布式锁是一种用于在分布式系统中实现互斥访问的机制。它可以确保在多个节点同时访问共享资源时,只有一个节点能够获取到锁并执行操作,其他节点需要等待。
分布式锁的实现方式
基于数据库
- 可以使用数据库的事务机制来实现分布式锁。通过在数据库中创建一个特定的表或记录来表示锁的状态,当节点需要获取锁时,尝试插入或更新这个表或记录,如果成功则获取到锁,否则等待。
基于缓存
- 可以使用分布式缓存如
Redis
或Memcached
来实现分布式锁。通过在缓存中设置一个特定的键值对来表示锁的状态,当节点需要获取锁时,尝试设置这个键值对,如果成功则获取到锁,否则等待。
基于ZooKeeper
ZooKeeper
是一个分布式协调服务,可以用于实现分布式锁。通过创建临时顺序节点来表示锁的状态,当节点需要获取锁时,尝试创建自己的临时顺序节点,并检查是否是最小的节点,如果是则获取到锁,否则监听前一个节点的删除事件,等待。
基于分布式算法
- 还有一些基于分布式算法的实现方式,如
Chubby
、Raft等。这些算法通过选举、协调等机制来实现分布式锁。
需要注意的是,分布式锁的实现需要考虑到并发性、可靠性和性能等方面的问题,选择合适的实现方式需要根据具体的需求和场景进行评估。
分布式锁的特点
- 互斥性:在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁:即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 容错性:只要大部分的
Redis
节点正常运行,客户端就可以加锁和解锁。 - 可重入性:加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
Redis实现分布式锁
Redis Setnx命令
Redis Setnx(SET if Not eXists)
命令在指定的key
不存在时,为key
设置指定的值。
redis 127.0.0.1:6379> SETNX KEY_NAME VALUE
- 设置成功,返回
1
。 设置失败,返回0
。
Set命令
-
setnx
不能同时完成expire
设置失效时长,不能保证setnx
和expire
的原子性。我们可以使用set
命令完成setnx
和expire
的操作,并且这种操作是原子操作。 -
例子:设置
lock=test
,失效时长3s
,不存在时设置set lock test ex 3 nx
。设置成功返回OK
,设置失败返回null
SpringBoot使用Redis分布式锁
基于RedisTemplate
- 假设业务代码块在
6s
之内处理完成,那么下面的代码就不会有业务代码执行超时,分布式锁没有问题 - 如果业务代码执行耗时较长,那么设置的键会自动过期,导致上个业务还没有执行结束,下个业务还能拿到锁,分布式锁失效
/*** Set 实现分布式锁子*/
@Override
public void setRedisLock() {// redis KeyString redisKey = "ID_1001";// value 身份标识String redisValue = UUID.randomUUID().toString();try {// 获取分布式锁,设置超时时间 6s 假设业务代码最长 6s 执行完毕ValueOperations valueOperations = redisTemplate.opsForValue();boolean lockFlag = !valueOperations.setIfAbsent(redisKey, redisValue, 6, TimeUnit.SECONDS).booleanValue();if (lockFlag) {throw new Exception("redis key:" + redisKey + " 值:" + redisValue + " 获取锁失败");} else {logger.info("redis key:{} 值:{} 获取锁成功", redisKey, redisValue);}// 实现业务代码:暂时假设业务代码执行时长在 6s 之内} catch (Exception e) {logger.error(e.getMessage(), e);throw new RuntimeException(e.getMessage());} finally {boolean deleteFlag;String currentValue = (String) redisTemplate.opsForValue().get(redisKey);if (redisValue.equals(currentValue)) {deleteFlag = redisTemplate.opsForValue().getOperations().delete(redisKey).booleanValue();if (deleteFlag) {logger.info("redis 锁:{} 释放成功", redisKey);} else {logger.error("redis 锁:{} 释放失败", redisKey);}} else {logger.error("redis 锁:{} 值:{} 身份校验失败无法释放", redisKey, redisValue);}}}
Redis分布式锁续期处理
- 在上面的例子中,当业务代码执行耗时超过
redis
设置的超时时间时,下一个任务获取锁的时候还是会获取成功,这样在业务上是又问题的。所以得要考虑处理锁续期。 - 实现思路,开启一个定时任务作为守护线程,如果业务代码没有执行完成主动进行续期操作
- 任务完整之后终止守护线程,释放获取的锁
@Override
public void setRedisLock1() {// redis KeyString redisKey = "ID_1001";TestTask testTask = new TestTask();CustomResponse response = execute(testTask, redisKey, 7, true);if (response.getCode() != 0) {logger.error("线程:" + Thread.currentThread().getId() + "执行结果:" + response.getMsg());}
}/*** 利用redis做分布式锁** @param runnable 执行的业务* @param lockKey 锁定key, 不同业务应该全局唯一* @param lockTime 锁定时间 (单位 ms)* @param autoRelock 是否自动续期*/
public CustomResponse execute(Runnable runnable, String lockKey, long lockTime, boolean autoRelock) {CustomResponse customResponse = new CustomResponse();execute(runnable, lockKey, lockTime, autoRelock, customResponse);return customResponse;
}/*** 利用redis做分布式锁** @param runnable 执行的业务* @param lockKey 锁定key, 不同业务应该全局唯一* @param lockTime 锁定时间 (单位 ms)* @param autoRelock 是否自动续期* @param customResponse 执行结果*/
public void execute(Runnable runnable, String lockKey, long lockTime, boolean autoRelock, CustomResponse customResponse) {if (customResponse == null) {throw new IllegalArgumentException("customResponse 参数不能为空");}if (lockTime <= 0) {throw new IllegalArgumentException("请设置正确的 redis key 超时时间");}boolean flag = true;boolean completedFlag = true;TimerTask timerTask = null;ScheduledFuture<?> scheduledFuture = null;try {// 失效时间,设置失败的key强制删除Long hasKeyExpire = redisTemplate.getExpire(lockKey);if (hasKeyExpire != null && hasKeyExpire.intValue() == -1) {redisTemplate.delete(lockKey);}ValueOperations<String, String> operations = redisTemplate.opsForValue();if (Boolean.TRUE.equals(operations.setIfAbsent(lockKey, "1", lockTime, TimeUnit.MILLISECONDS))) {// 开启续期,超时时间之后开始任务if (autoRelock) {timerTask = new TimerTask() {public void run() {logger.info("redis key:{} 自动续期任务执行...", lockKey);redisTemplate.opsForValue().setIfPresent(lockKey, "1", lockTime, TimeUnit.MILLISECONDS);}};try {scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(timerTask, lockTime / 2, lockTime, TimeUnit.SECONDS);} catch (Throwable e) {logger.debug(e.getMessage());}}customResponse.setMsg(0, "获取 redis 锁成功");// 执行业务逻辑try {runnable.run();// 处理标志位completedFlag = false;} catch (Throwable e) {logger.error("redis key:{} 执行业务代码出错:{}", lockKey, e.getMessage(), e);customResponse.setMsg(500, e.getMessage());}} else {flag = false;customResponse.setMsg(100, "获取锁失败");}} catch (Throwable e) {if (completedFlag) {logger.error(e.getMessage(), e);customResponse.setMsg(500, e.getMessage());}} finally {try {// 删除自己设置的锁if (flag) {redisTemplate.delete(lockKey);logger.info("执行完成删除自己的 key");}// 移除定时任务timerTask.cancel();if (Objects.nonNull(scheduledFuture)) {scheduledFuture.cancel(true);}} catch (Throwable e) {logger.debug(e.getMessage(), e);}}
}private class TestTask implements Runnable {@Overridepublic void run() {try {logger.info("任务开始执行...");Thread.sleep(10000);logger.info("任务执行结束...");} catch (InterruptedException e) {throw new RuntimeException(e);}}}
- 获取锁:使用
Redis
的SETNX
命令尝试获取锁。如果返回1
表示获取锁成功,返回0
表示锁已被其他进程持有。 - 设置锁的过期时间:如果成功获取到锁,可以使用
Redis
的EXPIRE
命令设置锁的过期时间,确保在一定时间后自动释放锁。 - 续期处理:在业务处理过程中,可以定期(比如锁过期时间的一半)使用
Redis
的EXPIRE
命令来延长锁的过期时间,防止锁过期后被其他进程获取。 - 释放锁:在业务处理完成后,使用
Redis
的DEL
命令释放锁。 - 需要注意的是,分布式锁的续期处理需要保证原子性,避免多个进程同时续期导致锁被误释放。可以使用
Redis
的Lua
脚本来保证续期操作的原子性。 另外,为了防止进程异常退出或崩溃导致锁无法释放,可以使用Redis
的SET
命令设置一个唯一的锁标识,并在获取锁和续期操作时进行比对,确保只有持有锁的进程才能释放锁。