在分布式系统中,如果使用JVM中的同步锁在高并发的场景下仍然会产生线程安全问题。首先我们来查看在多个服务器时为什么会产生线程安全问题,有这样一个案例,有一件商品购买规则为一个用户只能购买一次,如果使用同步锁锁住用户id,只能保证在一个服务器中一个用户只能购买一次,在集群模式下,就可能产生并发问题。
为了避免这个问题,我们应该采取一个新的锁监视器,当需要加锁时,所有服务器都需要从外部的锁监视器中查看是否有线程加锁,如果没有则获取互斥锁,如果已经有线程获取到互斥锁,那么就阻塞等待。模型图如下
什么是分布式锁
满足分布式系统或集群模式下多进程可见并互斥的锁。
分布式锁的实现
分布式锁的核心是实现多进程之间的互斥,常见的实现方式有三种
MySQL | Redis | Zookeeper | |
互斥 | 利用MySQL本身的互斥锁机制 | 利用setnx这样的互斥命令 | 利用节点的唯一性和有序性实现互斥 |
高可用 | 好 | 好 | 好 |
高性能 | 一般 | 好 | 一般 |
安全性 | 断开连接,自动释放锁 | 利用锁超时时间,到期释放 | 临时节点,断开连接自动释放 |
这里我们介绍Redis的实现方式,首先是需要实现的两个最基本的方法
获取锁,通过setnx命令,并expire命令设置超时时间。
释放锁,通过del命令,或是宕机后通过超时时间释放。
但是在获取锁时可能会存在一个问题,那就是在setnx时执行成功但是在expire时宕机,没设置到超时时间,为了避免这种情况,我们需要保证两个命令的原子性,可以采用lua脚本又或是采用set方法,指定ex与nx参数,采用set语法如下
set lock 1 nx ex 10
在Java中实现代码如下
public class SimpleRedisLock{private String name;private StringRedisTemplate stringRedisTemplate;private static final String key_Prefix="lock:";public SimpleRedisLock(String name,StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean getLock(String key,long timeOut) {String id = Thread.currentThread().getId();//因为这里Redis会返回一个Boolean类型,但是结果要boolean要进行拆箱,如果没查到的话会返回一个null,直接返回结果容易造成空指针异常Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key_Prefix + key, id + "", timeOut, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void rmLock(String key) {stringRedisTemplate.delete(key_Prefix+key);}
}
但是这样又存在一个问题,那就是如果一个获取到锁的线程因为某些原因阻塞,导致已经超过了锁的超时时间还没有执行完毕,此时如果新的线程来获取锁,因为Redis已经将锁删除了,因此可以顺利获取到锁。在第二个线程正在执行业务时,第一个线程执行完毕,开始执行删除锁操作,按照我们所实现的代码,会将第二个线程的锁删除,此时第三个线程它也可以开始获取到新的锁,然后在执行期间锁被第二个线程释放。从而造成并行错误。具体模型图如下
为了解决这个问题,我们需要修改初始代码如下,正确的模型图如下
public class SimpleRedisLock{private StringRedisTemplate stringRedisTemplate;private static final String key_Prefix="lock:";//修改线程标识为UUID,toString方法中的true是为了将UUID中的-去除,我们需要自己在后面拼接一个-private static final String ID_Prefix= UUID.randomUUID().toString(true)+"-";public SimpleRedisLock(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean getLock(String key,long timeOut) {String id = ID_Prefix+Thread.currentThread().getId();//因为这里Redis如果没查到的话会返回一个null类型是Boolean,但是结果要boolean要进行拆箱,boolean只又true以及false,会报空指针异常Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key_Prefix + key, id + "", timeOut, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void rmLock(String key) {//获取锁标识。如果相同在释放,不同则什么也不做String id = ID_Prefix+Thread.currentThread().getId();String value = stringRedisTemplate.opsForValue().get(key_Prefix + key);if (id.equals(value)) stringRedisTemplate.delete(key_Prefix+key);}
}
但是,这样也无法完全解决分布式锁可能产生的问题,接下来我们查看另一种模型图
线程一在判断过Redis中锁标识一样后就在开始执行释放锁时,比如说开始gc垃圾回收或是其他原因导致的暂时阻塞,而在阻塞期间线程一的锁标识超时释放,这时,线程二进行获取锁。线程一阻塞结束,由于要执行删除操作,再次把线程二的锁误删。
这样我们如果通过修改Java代码来解决该问题就过于复杂,需要依赖Redis中的事务机制以及乐观锁实现,因此更推荐使用Lua脚本,来保证获取锁与删除锁的原子性。
简单介绍一下Redis中在Lua语言中提供调用的方法
redis.call('命令名称','key','其他参数')-- 比如说要执行set name 张三
redis.call('set','name','张三')-- 如果不想写死需要执行key,value,那么可以通过参数传递
-- key 类型会放在KEYS数组当中。value会放在ARGV数组当中
-- 需要注意的时,Lua语言中数组下标以1开始
redis.call('set','KEYS[1]','ARGV[1]')
RedisTemplate调用lua脚本的API如下
redistemplate.excute(script,keys,args);
再次修改Java代码如下
public class SimpleRedisLock{private StringRedisTemplate stringRedisTemplate;private static final String key_Prefix="lock:";private static final String ID_Prefix= UUID.randomUUID().toString(true)+"-";private static final DefaultRedisScript<Long> rmLock_script;static {rmLock_script=new DefaultRedisScript<>();//设置脚本位置rmLock_script.setLocation(new ClassPathResource("rmlock.lua"));//设置返回类型rmLock_script.setResultType(Long.class);}public SimpleRedisLock(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean getLock(String key,long timeOut) {String id = ID_Prefix+Thread.currentThread().getId();//因为这里Redis如果没查到的话会返回一个null类型是Boolean,但是结果要boolean要进行拆箱,boolean只又true以及false,会报空指针异常Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key_Prefix + key, id + "", timeOut, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void rmLock(String key) {//调用lua脚本stringRedisTemplate.execute(rmLock_script, Collections.singletonList(key_Prefix + key),ID_Prefix+Thread.currentThread().getId());}}
具体的执行脚本代码如下
if(redis.call('get',KEYS[1]) == ARGV[1]) thenreturn redis.call('del',KEYS[1])
end
return 0
这样的实现方式已经可以避免普通的并发问题,但是仍然存在一定问题,比如说存在一个业务需要方法A调用方法B而在这两个方法中需要获取同一把锁,那么就是产生死锁问题,因此我们还需要实现锁可重入。其次我们的实现方式中,如果没有获取到锁会立即返回,但是通常我们需要进行重试,我们还需要实现重试机制。还有主从不一致问题,这些问题让我们实际开发中实际并不现实,因此我们可以选择Redisson来解决以上问题。
Redisson的简单使用
在pom文件引入依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.19.0</version>
</dependency>
配置Redis,我们可以选择yaml文件配置,也可以选择Java配置类
@Configuration
public class RedisConfig {@Beanpublic RedissonClient redissonClient(){//配置类Config config=new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379");return Redisson.create(config);}
}
Redisson的简单使用
@SpringBootTest
class RedissonApplicationTests {@Resourceprivate RedissonClient redissonClient;@Testpublic void test01() throws Exception {//获取锁,指定锁名称,可重入RLock lock = redissonClient.getLock("lock");//三个参数分别是,最大获取锁等待时间(期间会重试),锁自动释放时间,时间单位boolean flag = lock.tryLock(1, 10, TimeUnit.SECONDS);if (flag){try{System.out.println("获取锁成功");}finally {lock.unlock();}}}
}
锁在Redis中存储结构如下,其中value代表的是锁重入次数