现在让你写一个Redis分布式锁
大概率你会先写一个框架
public Boolean setIfAbsent(String key, Object value,Long timeout) {try {return Boolean.TRUE.equals(objectRedisTemplate.opsForValue().setIfAbsent(key, value,timeout,TimeUnit.SECONDS));} catch (Exception e) {log.error("", e);return false;}}
private void assessInstance(){InitThreadPoolUtil.execute(() -> {while (true) {try {Boolean isMaster = setIfAbsent(RedisKeyConstant.ASSESS_INSTANCE_ONE, "admin", 2 * 60L);logger.error("执行插入===" + isMaster);if (isMaster) {// ...业务代码略if (redisService.hasKey(RedisKeyConstant.ASSESS_INSTANCE_ONE)) {redisService.del(RedisKeyConstant.ASSESS_INSTANCE_ONE);}}} catch (Exception e) {logger.error("评估规划发送通知失败:", e);}try {Thread.sleep(1000 * 60 * 1);} catch (InterruptedException e) {logger.error("线程休眠异常,异常信息为:", e);}}});}
但是这样就完了吗?
我们来评审一下此代码健壮性:
可以看到这是从线程池中取一个线程去执行该业务代码。那么我给你的场景是处理订单业务,那么你就会面对高并发情况,若某一刻发起了10个订单请求,那么就会有10个线程进入while循环。但是有且仅有一个线程会获取锁,并执行业务代码。其他9个线程会一直等待,一旦有锁释放,这9个线程会立刻抢锁。
我们给redis的锁定义了一个超时时间,某线程获取锁后最多使用 10s,然后必须释放锁。
此外你还知道执行该业务代码最多需要10s。等于你上网时间刚清零你本局游戏刚结束。
这样其他9个线程最多需要10s就可以获取到锁。
所以会出现一种现象,A线程获取到了锁后,开始执行业务代码。其他9个线程会一直重试尝试获取锁,累计10s。为了避免频繁尝试获取锁消耗资源,我们暂时设置线程第一次未获取锁后,需要休眠2s才能重新请求获取锁。这样就降低了这9个线程重试请求锁的频率。
对于用户而言,一个用户的订单正在处理,其他9个用户的订单需要等待10s,推算下来,最后一个用户的订单被处理时,已经等待了90s。如果我是用户,我可不希望等待这么长的时间且无法进行任何操作。
我更希望等待更少的时间,比如20s没反应,我可以继续提交订单。像不像抢演唱会票的过程:进入订单界面,提交的时候一直转圈圈,等待5s后显示订单提交失败,然后你会重新提交订单。
此外,上述代码还有个局限性:提交了10个订单,将会有1个线程执行业务代码,9个线程一直在等待。
执行业务代码的线程生命周期如下:尝试获取锁—>获取锁---->执行业务代码----->等待被自动回收
等待的线程生命周期如下: 休眠—>尝试获取锁—>休眠---->尝试获取锁—>…
可以发现等待的线程是始终无法被自动回收,除非执行完业务代码,操作系统才能判断:该线程已经没有被使用了,可以自动归还到线程池。(线程池自动管理线程的生命周期)
对于用户而言,他等待时间太久。对于系统而言,大量资源被此处占用、消耗。
所以我们必须优化。如何优化呢?
A线程会占用锁10s,其余9个线程会一直等待。现在我要求,一旦发现6s后,锁还没被释放,等待的线程就退出等待。而用户就可以重新提交订单了。
我们来捋一捋:A线程抢到了锁后,(超时时间也就是等待时间未超过6s)B线程先睡眠2s,再重新获取锁失败,(超时时间也就是等待时间未超过6s)再睡眠2s,重新获取锁失败,(超时时间也就是等待时间未超过6s)再睡眠2s,重新获取锁失败,(超时时间也就是等待时间超过6s),不再尝试获取锁,返回信息:订单提交失败。
推理下来,一个用户最多等待10s,变成了最多等待6s。那么10个订单同时提交而最后一个用户只需等待50s。想要再缩短等待时间,可以将超时时间从6s缩短到2s,这样10个订单同时提交而最后一个用户只需等待18s。
当然你也可以将业务处理时间优化,这里不讨论。
代码如下
private void assessInstance(){// 初始时间long startTime = System.currentTimeMillis();InitThreadPoolUtil.execute(() -> {while (true) {try {Boolean isMaster = setIfAbsent(RedisKeyConstant.ASSESS_INSTANCE_ONE, "admin", 2 * 60L);logger.error("执行插入===" + isMaster);if (isMaster) {// ...业务代码略// 尝试超过了设定值之后直接跳出循环,避免上新锁时间过长// 例如A线程上新锁,花费了10s,这10s内B线程无法获取锁,就会一直在循环里重试,设置超时时间为2s,// 一旦B线程重试超过2s就退出循环且生命周期结束。if (System.currentTimeMillis() - startTime > timeout) {return false;}if (redisService.hasKey(RedisKeyConstant.ASSESS_INSTANCE_ONE)) {redisService.del(RedisKeyConstant.ASSESS_INSTANCE_ONE);}}} catch (Exception e) {logger.error("评估规划发送通知失败:", e);}try {Thread.sleep(1000 * 60 * 1);} catch (InterruptedException e) {logger.error("线程休眠异常,异常信息为:", e);}}});}
这是针对高并发场景下以上代码实现Redis锁的问题。有些场景下使用上述代码完全没问题。
例如服务启动后,需要初始化一些数据。单机环境只会执行一次初始化数据,什么都不需要考虑。
若是集群模式,有三个机子。当然只能一台leader机子执行一次初始化数据,其余2个机子不需要执行初始化数据,所以必须上分布式锁,且不存在高并发场景。
上述的代码直接使用了redis的一些原生api,我们尝试将其封装一层供自己使用
/*** 全局锁,包括锁的名称*/
public class Lock {private String name;private String value;public Lock(String name, String value) {this.name = name;this.value = value;}public String getName() {return name;}public String getValue() {return value;}}
搞一个redis分布式锁的工具类
import com.sun.org.slf4j.internal.Logger;
import com.sun.org.slf4j.internal.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import java.util.concurrent.TimeUnit;/*** 分布式锁*/@Component
public class DistributedLockHandler {private static final Logger logger = LoggerFactory.getLogger(DistributedLockHandler.class);/*** 单个业务持有锁的时间30s,防止死锁*/private final static long LOCK_EXPIRE = 30 * 1000L;/*** 默认30ms尝试一次*/private final static long LOCK_TRY_INTERVAL = 30L;/*** 默认尝试20s*/private final static long LOCK_TRY_TIMEOUT = 20 * 1000L;@Autowiredprivate StringRedisTemplate template;/*** 尝试获取全局锁** @param lock 锁的名称* @return true 获取成功,false获取失败*/public boolean tryLock(Lock lock){return getLock(lock, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, LOCK_EXPIRE);}/*** 尝试获取全局锁** @param lock 锁的名称* @param timeout 获取超时时间 单位ms* @return true 获取成功,false获取失败*/public boolean tryLock(Lock lock, long timeout) {return getLock(lock, timeout, LOCK_TRY_INTERVAL, LOCK_EXPIRE);}/*** 尝试获取全局锁** @param lock 锁的名称* @param timeout 获取锁的超时时间* @param tryInterval 多少毫秒尝试获取一次* @param lockExpireTime 锁的过期* @return true 获取成功,false获取失败*/public boolean tryLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) {return getLock(lock, timeout, tryInterval, lockExpireTime);}/*** 操作redis获取全局锁** @param lock 锁的名称* @param timeout 获取的超时时间* @param tryInterval 多少ms尝试一次* @param lockExpireTime 获取成功后锁的过期时间* @return true 获取成功,false获取失败*/public boolean getLock(Lock lock, long timeout, long tryInterval, long lockExpireTime){// 1. 锁名不为空if (StringUtils.isEmpty(lock.getName()) || StringUtils.isEmpty(lock.getValue())) {return false;}// 2. 系统时间long startTime = System.currentTimeMillis();try{do{// 不存在锁,上新锁if (!template.hasKey(lock.getName())) {ValueOperations<String, String> ops = template.opsForValue();ops.setIfAbsent(lock.getName(), lock.getValue(), lockExpireTime, TimeUnit.MILLISECONDS);return true;} else {//已存在锁logger.error("lock is exist!!!");}// 尝试超过了设定值之后直接跳出循环,避免上新锁时间过长// 例如A线程上新锁,花费了10s,这10s内B线程无法获取锁,就会一直在循环里重试,设置超时时间为3s,一旦B线程重试超过3s就退出循环且生命周期结束。if (System.currentTimeMillis() - startTime > timeout) {return false;}// A线程刚获取了锁,B线程等待A线程释放锁Thread.sleep(tryInterval);}while(template.hasKey(lock.getName())); // 3. redis中是否存在锁}catch (Exception e){logger.error(e.getMessage());return false;}return false;}/*** 释放锁*/public void releaseLock(Lock lock){if (!StringUtils.isEmpty(lock.getName())) {template.delete(lock.getName());}}}
测试代码,可以看到这是我们自己封装的最终效果
@RestController
public class testDemo {@Autowiredprivate DistributedLockHandler distributedLockHandler;@RequestMapping("/index")public void index(){Lock lock=new Lock("lynn","min");if (distributedLockHandler.tryLock(lock)) {// 1. 成功获取锁try {//为了演示锁的效果,这里睡眠5000毫秒System.out.println("执行方法");Thread.sleep(5000);}catch (Exception e){e.printStackTrace();}// 2. 释放锁distributedLockHandler.releaseLock(lock);}}}
以上结合业务场景探讨了实现Redis分布式锁时,为何使用线程休眠,超时时间,以及针对超时时间的一些优化方案。
接下来引入一个新的问题:
若定义锁的过期时间是10s,此时A线程获取了锁然后执行业务代码,但是业务代码消耗时间花费了15s。这就会导致A线程还没有执行完业务代码,A线程却释放了锁(因为10s到了),第11s B线程发现锁已经释放,重新获取锁也开始执行业务代码。
此时多个线程同时执行业务代码,我们使用锁就是为了保证仅有一个线程执行这一块业务代码,说明这个锁是失效的!
如何处理这个情况,涉及到了锁延期操作,下一篇文章指出!