一、SpringBoot结合 Redis实现分布式锁
1.1、什么是分布式锁
分布式锁,是在分布式的环境下,才会使用到的一种同步访问机制,在传统的单体环境里面,不存在分布式锁的概念,只有在分布式环境里面,才有分布式锁的概念,那到底什么是分布式锁呢?
现在我们通过一个案例来看下,分布式锁的作用。
- 假设,有一个应用程序,它是分布式部署的,总共有两个应用实例部署在两台机器上面。
- 注意:这两个应用程序是一模一样的,只不过部署的机器不同。
- 现在,假设应用程序中有一个定时任务,是专门用于操作数据库的数据,那么,当定时任务执行的时候,两台机器上面的程序都会同时执行,也就是说,此时,存在两个任务同时操作同一个数据库的数据,如果不加锁,那么数据库的数据就会被操作两次。
- 这显然,可能会出现问题啦,例如:如果是新增数据,那就会插入两条数据,但是实际情况下,我们希望是只有一条数据。
- 要实现这个功能,就需要添加分布式锁,只有获取到锁的那台机器,才能够操作数据库,其他的机器就不能操作。
分布式锁如下图所示:
1.2、如何实现分布式锁
使用Redis数据库实现分布式锁,核心思想就是:
- 第一步:每一个线程访问共享资源之前,都需要向redis里面设置一个分布式锁的key。
- 第二步:如果分布式锁的key已经存在,则说明其他线程已经拿到锁了,那么当前线程就获取锁失败,不用执行。
- 第三步:如果当前线程获取锁成功,则执行具体的业务逻辑代码。
- 第四步:当业务逻辑代码执行完成之后,此时,需要主动将分布式锁key给删除掉,这样,下次访问的时候,其他线程可以有机会获取到锁。
- 注意:删除锁的时候,一定只能够删除当前线程设置的锁,其他线程不能够删除非自身线程的锁。
1.3、分布式锁实现代码
这里是采用SpringBoot结合Redis实现分布式锁,所以,我们需要搭建SpringBoot环境,以及集成Redis数据库。
(1)创建RedisLock工具类
这里我们创建一个RedisLock类,这个类是专门用于加锁、解锁操作的。
- 为了保证加锁、解锁操作的原子性,一般实际开发中,都会采用LUA脚本执行redis命令。
- 注意:这里删除锁的时候,必须只能删除当前线程持有的锁。
- 为什么呢???
- 假设:A线程获取到了锁,开始执行业务代码,由于执行业务代码时间很长,此时,锁已经过期了,redis已经删除了过期的key;
- 如果这个时候,B线程来获取锁,那是可以获取成功的,即:B获取锁成功,此时redis中的锁是B持有的。
- 但是当A线程的业务代码执行完成之后,最后要删除锁,假设,删除锁之前,没有判断这个锁是不是A线程的,就直接删除了,那么此时A线程删除的锁是B线程持有的,从而导致错误。
- 所以,每次删除锁的时候,都需要判断一下,当前的锁是不是自己持有的。
package com.spring.boot.demo.utils;import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.commands.JedisCommands;
import redis.clients.jedis.params.SetParams;import java.util.ArrayList;
import java.util.List;
import java.util.Objects;/*** @author ZhuYouBin* @version 1.0.0* @Date: 2022/11/8 20:41* @Description Redis分布式锁工具类*/
@Component
public class RedisLock {// 定义 RedisTemplate 对象private static RedisTemplate<String, Object> redisTemplate;// 通过构造方法的方式,注入 RedisTemplate 对象public RedisLock(RedisTemplate<String, Object> redisTemplate) {RedisLock.redisTemplate = redisTemplate;}// 创建 LUA 脚本【用于解锁的脚本语句】private static final StringBuilder unlock_lua = new StringBuilder();static {// TODO 这里的 KEYS[1] 表示分布式锁对应的 key 值// TODO 这里的 ARGV[1] 表示当前线程的唯一标识,要和分布式锁对应的 value 值比较是否相等// 查询当前 key 对应的 value 值,是否为当前线程拥有的unlock_lua.append("if redis.call(\"get\", KEYS[1]) == ARGV[1] ");unlock_lua.append("then "); // 当前锁是当前线程拥有的,可以删除分布式锁unlock_lua.append(" return redis.call(\"del\", KEYS[1]) ");unlock_lua.append("else "); // 不是当前线程拥有的锁,删除失败unlock_lua.append(" return 0 ");unlock_lua.append("end ");}/*** 获取分布式锁的操作* @param key 锁对应的 key 值* @param requestId 当前请求线程唯一标识, 作为 value 值【例如:UUID】* @param expire 锁过期时间,单位ms* @return 加锁成功,则返回 true*/public static boolean acquireLock(final String key, final String requestId, final long expire) {try {RedisCallback<String> callback = new RedisCallback<String>() {@Overridepublic String doInRedis(RedisConnection redisConnection) throws DataAccessException {// 获取 redis 命令对象JedisCommands commands = (JedisCommands) redisConnection.getNativeConnection();SetParams params = new SetParams();params.nx(); // 只有键不存在,才能够设置params.px(expire); // 设置过期时间,单位ms// 执行命令【】return commands.set(key, requestId, params);}};// 执行加锁命令Object result = redisTemplate.execute(callback);if (!Objects.isNull(result)) {// 执行结果不等于null,则说明执行成功return true;}} catch (Exception e) {e.printStackTrace();}return false;}/*** 释放当前线程持有的分布式锁* @param key 锁对应的 key 值* @param requestId 当前请求线程唯一标识, 作为 value 值* @return 释放成功,则返回true*/public static boolean releaseLock(final String key, final String requestId) {try {final List<String> keys = new ArrayList<>();keys.add(key);final List<String> args = new ArrayList<>();args.add(requestId);RedisCallback<Long> callback = new RedisCallback<Long>() {@Overridepublic Long doInRedis(RedisConnection redisConnection) throws DataAccessException {Object nativeConnection = redisConnection.getNativeConnection();Jedis jedis = (Jedis) nativeConnection;// 执行解锁脚本语句Object eval = jedis.eval(unlock_lua.toString(), keys, args);return (Long) eval;}};Long ret = (Long) redisTemplate.execute(callback);if (ret != null && ret > 0L) {// 删除成功,解锁成功return true;}} catch (Exception e) {e.printStackTrace();}return false;}}
1.4、编写测试案例
为了演示分布式锁的功能,这里可以编写两个应用程序,写两个定时任务,分别用于模拟同时访问数据库的情况,案例代码如下所示:
(1)案例代码
package com.spring.boot.demo.controller;import com.spring.boot.demo.utils.RedisLock;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;/*** @author ZhuYouBin* @version 1.0.0* @Date: 2022/11/9 22:18* @Description*/
@Component
public class TaskDemo01 {@Scheduled(cron = "0/10 * * * * ?")public void demo01() throws InterruptedException {final String key = "distribute_lock_key";final String requestId = UUID.randomUUID().toString();final long expire = 10000; // 10s过期boolean acquireLock = RedisLock.acquireLock(key, requestId, expire);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String format = sdf.format(new Date());if (acquireLock) {System.out.println(format + ": [TaskDemo01]模拟操作数据库......");// TODO 暂停2秒,释放锁Thread.sleep(2000);RedisLock.releaseLock(key, requestId);} else {System.out.println(format + ": [TaskDemo01]获取锁失败,不执行......");}}}
(2)运行测试
将上面的工程分别启动两个实例,然后查看控制台的输出日志。
到此,Redis实现分布式锁就成功啦。
1.5、Redis分布式锁存在的问题
虽然这种Redis实现分布式锁可以解决大部分的问题,但是仍然可能存在问题:
- 分布式锁过期问题。
- 当某个线程执行业务逻辑代码的时间,超过分布式锁的有效时间,此时其他线程会获取到锁。
- 解决办法:使用redission提供的看门狗进行key续期操作。
- 多个Redis实例情况。
- 当项目中存在多个Redis实例时候,此时这种分布式锁就失效了。
- 解决办法:使用redission实现分布式锁。
这种分布式锁可以说,在大部分情况下足够使用了,但是,当项目中部署了多个redis实例时候,这种分布式锁就失效了,多个redis实例时候,可以使用redis官方推出的RedLock。