一、为什么要使用分布式锁?
因为在集群下,相当于多个JVM,就相当于多个锁,集群之间锁是没有关联的,会照成锁失效从而导致线程安全问题
分布式锁可以分别通过MySQL、Redis、Zookeeper来进行实现
二、redis分布式锁的实现(基于setnx实现的分布式锁)
- 创建ILock接口
package com.hmdp.utils;public interface ILock {/*** 尝试获取锁* @param timeoutSec* @return*/boolean tryLock(long timeoutSec);/*** 释放锁*/void unLock();
}
- 创建SimpleRedisLock实现类
package com.hmdp.utils;import cn.hutool.core.lang.UUID;
import org.springframework.data.redis.core.StringRedisTemplate;import java.util.concurrent.TimeUnit;public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock";@Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标示String threadId = Thread.currentThread().getId();// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unLock() {// 释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}
}
三、以上Redis分布式锁存在的一些问题
1、锁的误删问题
问题:线程1拿到锁产生了业务阻塞,这个时候锁已经超时释放导致线程2可以拿到锁,这时线程1业务执行完会将线程2的锁进行释放
解决方案:在释放锁的时候进行判断,是否是自己的锁
SimpleRedisLock实现类代码优化:
package com.hmdp.utils;import cn.hutool.core.lang.UUID;
import org.springframework.data.redis.core.StringRedisTemplate;import java.util.concurrent.TimeUnit;public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";@Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标示String threadId =ID_PREFIX + Thread.currentThread().getId();// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unLock() {// 获取线程标示String threadId =ID_PREFIX + Thread.currentThread().getId();// 获取锁中的标示String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 判断标示是否一致if (threadId.equals(id)) {// 释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}}
}
2、原子性问题
问题:线程1获取锁,执行业务结束,判断锁是否是自己的,判断成功,可能在Jvm垃圾回收的时候阻塞时间过长(这是在判断成功和释放锁之间执行的动作)导致锁超时释放,这个时候线程2可以成功获取到锁,当线程1阻塞结束,因为判断锁是否是自己的已经成功,所以线程1直接删除锁,从而导致误删了线程2的锁
解决思路:保证判断和释放的原子性
解决方法:
- 创建lua文件并编写lua脚本(IDEA需要下载插件EmmyLua)
**
**
- lua脚本内容
-- 比较线程标示与锁中的标示是否一致
if(redis.call('get',KEYS[1]) == ARGY[1]) thenreturn redis.call('del',KEYS[1])
end
return 0
- 调用lua脚本
SimpleRedisLock实现类代码修改
package com.hmdp.utils;import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.Collections;
import java.util.concurrent.TimeUnit;public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}@Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标示String threadId =ID_PREFIX + Thread.currentThread().getId();// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unLock() {// 调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());}
}
3、还存在一些问题
- 不可重入:同一个线程无法多次获取同一把锁
- 不可重试:获取锁只尝试一次就返回false,没有重试机制
- 超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
- 主从一致性:如果Redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从并同步主中的锁数据,则会出现锁实现
以上自己设计Redis分布式锁是为了让大家了解分布式锁的基本原理,在企业中直接通过Redisson来实现就可以
四、Redisson
1.什么是Redisson?
Redisson是一个在Redis的基础上实现的Java驻内存数据网络。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现
2.使用方法
1.引入依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>
2.配置Redisson
package com.hmdp.config;import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){// 配置Config config = new Config();// 配置自己的虚拟机地址和密码 配置密码是setPassword(),我虚拟机没有密码,所以省略config.useSingleServer().setAddress("redis://192.168.198.138:6379");// 创建RedissonClient对象return Redisson.create(config);}}
3.使用Redisson的分布式锁
@Resourceprivate RedissonClient redissonClient;@Testvoid testRedisson() throws InterruptedException {// 获取锁(可重入),指定锁的名称RLock lock = redissonClient.getLock("anyLock");// 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);// 判断释放获取成功if(isLock) {try {System.out.println("执行业务");} finally {// 释放锁lock.unlock();}}}
3.Redisson可重入锁原理
4.Redisson分布式锁原理
-
可重入:利用hash结构记录线程id和重入次数
-
可重试:利用信号量和PubSub功能实现等待、唤醒、获取锁失败的重试机制
-
超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间
-
主从一致性:利用Redisson的multiLock。原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功