在多线程高并发场景下,为了保证共享资源的正确性,通常会采用加锁的方式。关于加锁以及一些相关的问题,这里根据个人学习了解的做个汇总。
加锁方式:
- 1、JVM锁
- 1.1 多例模式
- 1.2 事务
- 1.3 集群
- 2、MySQL悲观锁乐观锁
- 2.1 悲观锁
- 2.2 乐观锁
- 3、使用Redis监听命令
- 4、分布式锁
- 4.1 基于redis实现
1、JVM锁
JVM锁是指Java中的一些锁实现:synchronized、ReentrantLock等。这些加锁方式在特定场景下会存在失效问题。
1.1 多例模式
在Spring中默认为单例模式,可以使用@Scope(value=“prototype”)注解变成多例模式。代码示例如下:
@Scope(value="prototype", proxyMode=ScopeProxyMode.TARGET_CLASS)
public class TestService{public synchronized void testMethod(){// 业务操作 如更新用户账户金额}
}
因为synchronized在修饰普通方法时是将当前对象作为锁对象,单例模式下锁有效,但在多例模式下都是不同的对象,锁对象也是不同的,那么也就导致锁失效了。
1.2 事务
在使用@Transcation注解的方法里加锁,也可能会有失效的问题。比如对mysql数据库中账户金额执行更新操作,代码示例如下:
public class TestService{private ReentrantLock lock = new ReentrantLock();@Transcationpublic void testMethod(){lock.lock();try{// 业务操作 如更新用户账户金额}catch(Exception e){....}finally{lock.unlock();}}
}
因为@Transcation是基于动态代理实现的事务管理,原理流程大致分为:
a、开启事务
b、执行方法逻辑(对应上面testMethod方法里面的逻辑,包括加锁、更新账户金额、解锁)
c、无异常提交事务,或者发生异常回滚事务
可以看到在高并发场景下可能存在以下场景:
线程1执行完了b步骤,将账户金额减10,从100改为90,锁也释放了,但是还没来得及执行c步骤提交事务,
此时线程2对同一条数据执行更新操作,因为mysql默认是可重复读隔离级别,对于线程1还没提交的数据线程2是看不到的,那么线程2这时候更新就是基于100去加减,比如也是减10,那么对于线程2就是从100改为90,最后两个线程提交事务,账户金额最终只减了10,而不是20。
1.3 集群
集群下部署的多个应用实例,每个应用实例里面的对象都是自己的,和其他实例都是不同的,类似多例模式,也就无法加锁生效。
可以通过一个sql语句来解决锁失效的问题,sql如下:
update user_account set account=account-10 where id=1 and account>10;
这种方式需要注意锁的范围,查询或更新的条件必须是索引字段,否则会导致锁表,影响其他操作性能
2、MySQL悲观锁乐观锁
2.1 悲观锁
使用select … from … for update语句实现加锁。但是这种方式会存在一些问题:
a、存在性能问题
b、多个线程对多条数据加锁,顺序不一致会出现死锁问题
还需要注意,悲观锁中要使用行级锁查询或更新,那么条件要使用索引字段,要使用具体的值,不能是like、 != 这些操作,否则会导致锁表。
2.2 乐观锁
使用时间戳或者版本号,基于CSA机制实现。
CAS是Compare and Swap的缩写,比较并交换。原理就是使用3个基本操作数:内存地址V,旧的预期值V1,要修改的新值V2,更新一个变量的时候,只有当变量的旧的预期值V1和内存地址V中的值相同的时候,才会将内存地址V中的值更新为新值V2,否则放弃更新做自旋操作。CAS机制参考文章
这种方式存在的一些问题:
a、存在性能问题
b、由于CAS机制会存在ABA问题。
c、读写分离情况下因为主从同步存在延迟,从从库读取的数据可能和主库版本步一致导致更新失败。
总的来看,如果并发不是很高的情况下可以使用乐观锁,但是如果并发量很高的情况下,容易出现冲突导致不断重试,最好选择悲观锁。
3、使用Redis监听命令
watch:监听一个或者多个key
multi:开启事务
exec:执行事务
整个流程就是在开启事务之前监听某个key,然后开启事务,更新数据,最后执行事务。
如果在执行事务之前被监听的这个key被其他线程更改了,那么这里执行事务就会失败,也就是更新失败。
public class TestService{@Autowiredprivate StringRedisTemplate redisTemplate;public void testMethod(){redisTemplate.execute(new SessionCallback<Object>() {@Overridepublic Object execute(RedisOperations operations) throws DataAccessException{//监听keyoperations.watch("testKey");//获取key的值String testValueStr = operations.opsForValue().get("testKey").toString();if(null != testValueStr && testValueStr != ''){Integer testValue = Integer.valueOf(testValueStr);if(testValue > 0){//开启事务operations.multi();//设置key的新值operations.opsForValue().set("testKey",String.valueOf(--testValue));//执行事务List list = operations.exec();if(null == list || list.size() == 0){testMethod();}return list;}}return null;}});}
}
这种方式也存在性能。
4、分布式锁
跨服务、跨进程、跨线程实现共享资源的排他独占使用。可以基于redis实现、基于zookeeper实现、基于mysql实现。
4.1 基于redis实现
使用setnx命令加锁,使用del命令解锁,代码中引入redis包,使用redisTemplate.setIfAbsent()、redisTemplate.delete()方法
public class TestService {@Autowiredprivate StringRedisTemplate redisTemplate;public void testMethod() {// 获取redis锁,获取成功设置过期时间,获取失败再重试while (!redisTemplate.opsForValue().setIfAbsent("testKey", "1", 30, TimeUnit.MILLISECONDS){try {Thread.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}}try {//获取key的值String testValueStr = redisTemplate.opsForValue().get("testKey").toString();if(null != testValueStr && testValueStr != ""){Integer testValue = Integer.valueOf(testValueStr);if(testValue > 0){//设置key的新值redisTemplate.opsForValue().set("testKey",String.valueOf(--testValue));}}}finally {redisTemplate.delete("testKey");}}
}
这种写法需要注意过期时间的设置是否合理,可能存在第一个请求获取到锁业务逻辑还未执行完毕,锁过期了,第二个请求获取到了锁,等第一个请求执行完业务逻辑删除锁的时候将第二个请求的锁给删了,这种误删或导致后续一系列的加锁有问题。
本着解铃还须系铃人的思想解决这个问题,给每个请求的锁一个唯一标识,删除的时候先判断是自己的锁才能删。修改如下:
String uuid = UUID.randomUUID().toString();
// 获取redis锁,如果获取失败再重试
while (!redisTemplate.opsForValue().setIfAbsent("testKey", uuid, 30, TimeUnit.MILLISECONDS)) {....
}
try{....
} finally {if (uuid.equals(redisTemplate.opsForValue().get("testKey"))){ //步骤1redisTemplate.delete("testKey");// 步骤2}
}
- 修改后似乎能确保删的是自己的锁,但是仔细分析会发现finally里面的步骤1和步骤2不是原子操作,如果第一个请求在步骤1判断通过,步骤2还未执行时锁失效了,这时另一个请求获取了锁,那么第一个请求就会把第二个请求的锁给误删了。
- 解决判断和删除操作的原子性问题可以使用Lua脚本,因为Lua脚本可以一次性发送多个指令,而redis又是单线程的,保证这多个执行中间没有其他操作从而保证原子性。Lua脚本相关语法自行搜索,这里修改代码如下:
finally{String script = "if redis.call('get',KEYS[1] == ARGV[1]) " +" then " +" return redis.call('del',KEYS[1]) " +" else " +" return 0 " +" end";redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList("testKey"),uuid);
}
到这里以为锁的问题都解决了,但其实还是可能有问题。方法嵌套调用,比如这个场景:
public void testMethodA() {// 加锁while(!redisTemplate.opsForValue().setIfAbsent("testKey", uuid, 30, TimeUnit.MILLISECONDS)){....}// 业务处理 doSomthing();//调用方法BtestMethodB();// 释放锁redisTemplate.delete("testKey");
}public void testMethodB() {// 加锁while(!redisTemplate.opsForValue().setIfAbsent("testKey", uuid, 30, TimeUnit.MILLISECONDS)){....}// 业务处理 doSomthing();// 释放锁redisTemplate.delete("testKey");
}
由于两个方法一开始都执行加锁操作且是同一个key,那么当testMethodA调用testMethodB的时候,testMethodB等待testMethodA释放锁,testMethodA方法一直未执行完没有释放锁,可能导致死锁问题。这个场景下就需要解决testMethodB的锁可重入性问题。
- 这里可以借助ReentrantLock可重入锁的实现原理来编写Lua脚本实现Redis的可重入锁。
编写一个工具类,参考实现java.util.concurrent.locks.Lock,这里只展示部分代码:
/*** Redis分布式锁工具** @author lyc* @since 2023/8/27 17:05*/
public class MyRedisLock implements Lock {private StringRedisTemplate redisTemplate;// 锁名称private String lockName;// 锁的值,对应每个锁的唯一标识private String uuid;// 过期时间默认30sprivate long expire = 30L;public MyRedisLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuid) {this.redisTemplate = stringRedisTemplate;this.lockName = lockName;this.uuid = uuid + ":" + Thread.currentThread().getId();}/*** 加锁*/@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time != -1) {this.expire = unit.toSeconds(time);}// 加锁Lua脚本:判断锁不存在或者锁存在而且和自己的锁唯一标识一样则将次数+1,返回1表示加锁成功,否则返回0表示加锁失败String script = "if redis.cal('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 " +"then " +" redis.call('hincrby',KEYS[1],ARGV[1],1) " +" redis.call('expire',KEYS[1],ARGV[2]) " +" return 1 " +"else " +" return 0 " +"end";while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {//获取锁失败了等待50ms再重试Thread.sleep(50);}//获取锁成功return true;}/*** 解锁*/@Overridepublic void unlock() {// 解锁Lua脚本:判断锁不存在或锁的唯一标识和自己不一致则解锁失败,否则判断次数是否减为0了,一个加锁对应一个解锁,如果减为0了则表示加锁的已经全部释放了可以删除锁了,否则还不能删除锁。String script = "if redis.call('hexists',KEYS[1],ARGV[1]) == 0 " +"then " +" return nil " +"elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0 " +"then " +" return redis.call('del',KEYS[1]) " +"else " +" return 0 " +"end";// nil对应Long类型的null,对应Boolean类型的false,和返回0是一样的无法区分开,所以这里用Long.classLong flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuid, String.valueOf(expire));if (null == flag) {throw new IllegalStateException("无法释放不属于自己的锁");}}
}
/*** 对外提供可操作分布式锁的Client* @author lyc* @since 2023/8/27 19:57*/
@Component
public class DistributedLockClient {@Autowiredprivate StringRedisTemplate redisTemplate;private String uuid;public DistributedLockClient() {this.uuid = UUID.randomUUID().toString();}public MyRedisLock getMyRedisLock(String lockName){return new MyRedisLock(redisTemplate,lockName,uuid);}
}
@Service
public class TestService {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate DistributedLockClient distributedLockClient;public void testMethod() {// 获取Redis分布式锁MyRedisLock lock = distributedLockClient.getMyRedisLock("testKey");lock.lock();try {//获取key的值String testValueStr = redisTemplate.opsForValue().get("testKey");if (null != testValueStr && testValueStr != "") {Integer testValue = Integer.valueOf(testValueStr);//调用方法BtestMethodB();if (testValue > 0) {//设置key的新值redisTemplate.opsForValue().set("testKey", String.valueOf(--testValue));}}}finally {lock.unlock();}}public void testMethodB(){LycRedisLock lock = distributedLockClient.getLycRedisLock("testKey");lock.lock();System.out.println("测试Redis可重入锁...");lock.unlock();}
自此,redis分布式锁是否已经完全没问题呢?非也,还有一个点需要解决就是过期时间问题,上面默认30s并不是很合理,因为有些业务方法嵌套调用30s可能并不够,就需要实现分布式锁的过期时间自动续期。
想法是利用定时任务不断的检测时间然后重新设置过期时间,这里借助Java中的Timer来实现,加锁成功后调用方法开启自动续期。部分代码如下:
/*** 过期时间自动续期*/
private void renewExpire() {String script = "if redis.call('hexists',KEYS[1],ARGV[1]) == 1 " +"then " +" return redis.call('expire',KEYS[1],ARGV[2]) " +"else " +" return 0 " +"end";// 开启新的子线程检测锁的过期时间new Timer().schedule(new TimerTask() {@Overridepublic void run() {if (redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {renewExpire();}}}, expire * 1000 / 3);
}/*** 加锁*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {......while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {......}// 加锁成功后开启自动续期定时任务this.renewExpire();//获取锁成功return true;
}
通过一步步的改进优化,到这里Redis分布式锁才算基本完成,可以满足并发场景下分布式加锁,可重入,自动续期功能。
后续还有基于Zookeep、Mysql实现的分布式锁,再看看该如何实现又会有哪些存在的问题及解决方案。