Redis分布式锁—Redisson+RLock可重入锁实现篇

前言

平时的工作中,由于生产环境中的项目是需要部署在多台服务器中的,所以经常会面临解决分布式场景下数据一致性的问题,那么就需要引入分布式锁来解决这一问题。

针对分布式锁的实现,目前比较常用的就如下几种方案:

  1. 基于数据库实现分布式锁
  2. 基于 Redis 实现分布式锁 【本文】
  3. 基于 Zookeeper 实现分布式锁

接下来这个系列文章会跟大家一块探讨这三种方案,本篇为 Redis 实现分布式锁篇。

Redis分布式环境搭建推荐:基于Docker的Redis集群搭建

Redis分布式锁一览

说到 Redis 锁,能搜到的,或者说常用的无非就下面这两个:

  • setNX + Lua脚本
  • Redisson + RLock可重入锁 【本文】

接下来我们一一探索这两个的实现,本文为 Redisson + RLock可重入锁 实现篇。

1、setNX+Lua实现方式

跳转链接:https://www.cnblogs.com/niceyoo/p/13711149.html

2、Redisson介绍

Redisson 是 java 的 Redis 客户端之一,是 Redis 官网推荐的 java 语言实现分布式锁的项目。

Redisson 提供了一些 api 方便操作 Redis。因为本文主要以锁为主,所以接下来我们主要关注锁相关的类,以下是 Redisson 中提供的多样化的锁:

  • 可重入锁(Reentrant Lock)
  • 公平锁(Fair Lock)
  • 联锁(MultiLock)
  • 红锁(RedLock)
  • 读写锁(ReadWriteLock)
  • 信号量(Semaphore) 等等

总之,管你了解不了解,反正 Redisson 就是提供了一堆锁… 也是目前大部分公司使用 Redis 分布式锁最常用的一种方式。

本文中 Redisson 分布式锁的实现是基于 RLock 接口,而 RLock 锁接口实现源码主要是 RedissonLock 这个类,而源码中加锁、释放锁等操作都是使用 Lua 脚本来完成的,并且封装的非常完善,开箱即用。

接下来主要以 Redisson 实现 RLock 可重入锁为主。

代码中实现过程

一起来看看在代码中 Redisson 怎么实现分布式锁的,然后再对具体的方法进行解释。

源码地址:https://github.com/niceyoo/redis-redlock

篇幅限制,文中代码不全,请以上方源码链接为主。

代码大致逻辑:首先会涉及数据库 2 个表,order2(订单表)、stock(库存表),controller层会提供一个创建订单的接口,创建订单之前,先获取 RedLock 分布式锁,获取锁成功后,在一个事务下减库存,创建订单;最后通过创建大于库存的并发数模拟是否出现超卖的情况。

代码环境:SpringBoot2.2.2.RELEASE + Spring Data JPA + Redisson

1)Maven 依赖 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.2.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>redis-redlock</artifactId><version>0.0.1-SNAPSHOT</version><name>redis-redlock</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.10</version></dependency><!-- redisson --><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.11.1</version></dependency><!-- Gson --><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.6</version></dependency><!-- JPA --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><!-- Mysql Connector --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.48</version></dependency><!-- 数据库连接池 --><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.20</version></dependency><!-- Hutool工具包 --><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>4.6.8</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

redisson、MySQL 等相关依赖。

2)application.yml 配置文件
server:port: 6666servlet:context-path: /spring:# 数据源datasource:url: jdbc:mysql://127.0.0.1:3306/redis_demo?useUnicode=true&characterEncoding=utf-8&useSSL=falseusername: rootpassword: 123456type: com.alibaba.druid.pool.DruidDataSourcedriverClassName: com.mysql.jdbc.DriverlogSlowSql: truejpa:# 显示sqlshow-sql: false# 自动生成表结构generate-ddl: truehibernate:ddl-auto: updateredis:redis:cluster:nodes: 10.211.55.4:6379, 10.211.55.4:6380, 10.211.55.4:6381lettuce:pool:min-idle: 0max-idle: 8max-active: 20# 日志
logging:# 输出级别level:root: infofile:# 指定路径path: redis-logs# 最大保存天数max-history: 7# 每个文件最大大小max-size: 5MB

配置redis,指定数据库地址。

3)Redisson配置类 RedissonConfig.java
/*** redisson配置类*/
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient() {Config config = new Config();config.useClusterServers().setScanInterval(2000).addNodeAddress("redis://10.211.55.4:6379", "redis://redis://10.211.55.4:6380").addNodeAddress("redis://redis://10.211.55.4:6381");RedissonClient redisson = Redisson.create(config);return redisson;}}
4)StockServerImpl 库存实现类,其他参考源码
import com.example.redisredlock.bean.Stock;
import com.example.redisredlock.dao.StockDao;
import com.example.redisredlock.server.StockService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Slf4j
@Service
@Transactional
public class StockServerImpl implements StockService {@Autowiredprivate StockDao stockDao;@Overridepublic StockDao getRepository() {return stockDao;}/*** 减库存** @param productId* @return*/@Overridepublic boolean decrease(String productId) {Stock one = stockDao.getOne(productId);int stockNum = one.getStockNum() - 1;one.setStockNum(stockNum);stockDao.saveAndFlush(one);return true;}
}

库存实现类,就一个接口,完成对库存的-1操作。

5)OrderServerImpl 订单实现类(核心代码)
package com.example.redisredlock.server.impl;import com.example.redisredlock.bean.Order;
import com.example.redisredlock.dao.OrderDao;
import com.example.redisredlock.server.OrderServer;
import com.example.redisredlock.server.StockService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;@Slf4j
@Service
@Transactional
public class OrderServerImpl implements OrderServer {/*** 库存service*/@Resourceprivate StockService stockService;/*** 订单order dao*/@Resourceprivate OrderDao orderDao;@Overridepublic OrderDao getRepository() {return orderDao;}@Resourceprivate RedissonClient redissonClient;@Override@Transactional(rollbackFor = Exception.class)public boolean createOrder(String userId, String productId) {//  如果不加锁,必然超卖RLock lock = redissonClient.getLock("stock:" + productId);try {lock.lock(10, TimeUnit.SECONDS);int stock = stockService.get(productId).getStockNum();log.info("剩余库存:{}", stock);if (stock <= 0) {return false;}String orderNo = UUID.randomUUID().toString().replace("-", "").toUpperCase();if (stockService.decrease(productId)) {Order order = new Order();order.setUserId(userId);order.setProductId(productId);order.setOrderNo(orderNo);Date now = new Date();order.setCreateTime(now);order.setUpdateTime(now);orderDao.save(order);return true;}} catch (Exception ex) {log.error("下单失败", ex);} finally {lock.unlock();}return false;}
}
6)Order 订单实体类
@Data
@Entity
@Table(name = "order2")
public class Order extends BaseEntity {private static final long serialVersionUID = 1L;/*** 订单编号*/private String orderNo;/*** 下单用户id*/private String userId;/*** 产品id*/private String productId;}
7)Stock 库存实体类
@Data
@Entity
@Table(name = "stock")
public class Stock extends BaseEntity {private static final long serialVersionUID = 1L;/*** 用产品id,设置为库存id*//*** 库存数量*/private Integer stockNum;}
8)OrderController 订单接口
package com.example.redisredlock.controller;import com.example.redisredlock.bean.Order;
import com.example.redisredlock.server.OrderServer;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;/*** @author niceyoo*/
@RestController
@RequestMapping("/order")
public class OrderController {@Resourceprivate OrderServer orderServer;@PostMapping("/createOrder")public boolean createOrder(Order order) {return orderServer.createOrder(order.getUserId(), order.getProductId());}}

表结构说明及接口测试部分

因为项目中使用 Spring Data JPA,所以会自动创建数据库表结构,大致为:

stock(库存表)

id(商品id)stock_num(库存数量)create_time(创建时间)update_time(更新时间)
1234100xxxxxxxx

order2(订单表)

id(订单id)order_no(订单号)user_id(用户id)product_id(商品id)
xxxxxxxxxxxx1234

如下是详细表结构+数据:

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;-- ----------------------------
-- Table structure for order2
-- ----------------------------
DROP TABLE IF EXISTS `order2`;
CREATE TABLE `order2` (`id` varchar(64) NOT NULL,`create_time` datetime(6) DEFAULT NULL,`update_time` datetime(6) DEFAULT NULL,`order_no` varchar(255) DEFAULT NULL,`user_id` varchar(64) DEFAULT NULL,`product_id` varchar(64) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- ----------------------------
-- Table structure for stock
-- ----------------------------
DROP TABLE IF EXISTS `stock`;
CREATE TABLE `stock` (`id` varchar(255) NOT NULL,`create_time` datetime(6) DEFAULT NULL,`update_time` datetime(6) DEFAULT NULL,`stock_num` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- ----------------------------
-- Records of stock
-- ----------------------------
BEGIN;
INSERT INTO `stock` VALUES ('1234', '2020-09-21 21:38:09.000000', '2020-09-22 08:32:17.883000', 0);
COMMIT;SET FOREIGN_KEY_CHECKS = 1;

创建订单的过程就是消耗库存表 stock_num 的过程,如果没有分布式锁的情况下,在高并发下很容易出现商品超卖的情况,所以引入了分布式锁的概念,如下是在库存100,并发1000的情况下,测试超卖情况:

JMeter 模拟进程截图

JMeter 调用接口截图

stock 库存表截图

订单表截图

加了锁之后并没有出现超卖情况。

核心代码说明

整个 demo 核心代码在创建订单 createOrder() 加锁的过程,如下:

@Override
@Transactional(rollbackFor = Exception.class)
public boolean createOrder(String userId, String productId) {//  如果不加锁,必然超卖RLock lock = redissonClient.getLock("stock:" + productId);try {lock.lock(10, TimeUnit.SECONDS);int stock = stockService.get(productId).getStockNum();log.info("剩余库存:{}", stock);if (stock <= 0) {return false;}String orderNo = UUID.randomUUID().toString().replace("-", "").toUpperCase();if (stockService.decrease(productId)) {Order order = new Order();order.setUserId(userId);order.setProductId(productId);order.setOrderNo(orderNo);Date now = new Date();order.setCreateTime(now);order.setUpdateTime(now);orderDao.save(order);return true;}} catch (Exception ex) {log.error("下单失败", ex);} finally {lock.unlock();}return false;
}

去除业务逻辑,加锁框架结构为:

RLock lock = redissonClient.getLock("xxx");lock.lock();try {...
} finally {lock.unlock();
}

关于 RedLock 中的方法

因为 RLock 本身继承自 Lock 接口,如下分为两部分展示:

public interface RLock extends Lock, RLockAsync {//----------------------Lock接口方法-----------------------/*** 加锁 锁的有效期默认30秒*/void lock();/*** tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false .*/boolean tryLock();/*** tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,* 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。** @param time 等待时间* @param unit 时间单位 小时、分、秒、毫秒等*/boolean tryLock(long time, TimeUnit unit) throws InterruptedException;/*** 解锁*/void unlock();/*** 中断锁 表示该锁可以被中断 假如A和B同时调这个方法,A获取锁,B为获取锁,那么B线程可以通过* Thread.currentThread().interrupt(); 方法真正中断该线程*/void lockInterruptibly();//----------------------RLock接口方法-----------------------/*** 加锁 上面是默认30秒这里可以手动设置锁的有效时间** @param leaseTime 锁有效时间* @param unit      时间单位 小时、分、秒、毫秒等*/void lock(long leaseTime, TimeUnit unit);/*** 这里比上面多一个参数,多添加一个锁的有效时间** @param waitTime  等待时间* @param leaseTime 锁有效时间* @param unit      时间单位 小时、分、秒、毫秒等*/boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;/*** 检验该锁是否被线程使用,如果被使用返回True*/boolean isLocked();/*** 检查当前线程是否获得此锁(这个和上面的区别就是该方法可以判断是否当前线程获得此锁,而不是此锁是否被线程占有)* 这个比上面那个实用*/boolean isHeldByCurrentThread();/*** 中断锁 和上面中断锁差不多,只是这里如果获得锁成功,添加锁的有效时间* @param leaseTime  锁有效时间* @param unit       时间单位 小时、分、秒、毫秒等*/void lockInterruptibly(long leaseTime, TimeUnit unit);  
}

1、加锁

首先重点在 getLock() 方法,到底是怎么拿到分布式锁的,我们点进该方法:

public RLock getLock(String name) {return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}

调用 getLock() 方法后实际返回一个 RedissonLock 对象,此时就有点呼应了,文章前面提到的 Redisson 普通的锁实现源码主要是 RedissonLock 这个类,而源码中加锁、释放锁等操作都是使用 Lua 脚本来完成的,封装的非常完善,开箱即用。

在 RedissonLock 对象中,主要实现 lock() 方法,而 lock() 方法主要调用 tryAcquire() 方法:

tryAcquire() 方法又继续调用 tryAcquireAsync() 方法:

到这,由于 leaseTime == -1,于是又调用 tryLockInnerAsync()方法,感觉有点无限套娃那种感觉了…

咳咳,不过这个方法是最关键的了:

这个方法就有点意思了,看到了一些熟悉的东西,还记得上一篇里的 Lua 脚本吗?

我们来分析一下这个部分的 Lua 脚本:

commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

脚本里,一共是有两个参数 KEYS[1]、通过后面的参数可以得知: KEYS[1] 为 getName(),ARGV[2] 为 getLockName(threadId)。

假设传递加锁参数时传入的 name 值为 “niceyoo”,

假设线程调用的 ID 为 thread-1,

假设 RedissonLock 类的成员变量 UUID 类型的 id 值为 32063ed-98522fc-80287ap,

结合 getLockName(threadId)) 方法:

protected String getLockName(long threadId) {return this.id + ":" + threadId;
}

即,KEYS[1] = niceyoo,ARGV[2] = 32063ed-98522fc-80287ap:thread-1

然后将假设值带入语句中:

  1. 判断是否存在名为 “niceyoo” 的 key;
  2. 如果没有,则在其下设置一个字段为 “32063ed-98522fc-80287ap:thread-1”,值为 1 的键值对 ,并设置它的过期时间,也就是第一个 if 语句体;
  3. 如果存在,则进一步判断 “32063ed-98522fc-80287ap:thread-1” 是否存在,若存在,则其值加 1,并重新设置过期时间,这个过程可以看做锁重入;
  4. 返回 “niceyoo” 的生存时间(毫秒);

如果放在锁这个场景下就是,key 表示当前线程名称,argv 为当前获得锁的线程,所有竞争这把锁的线程都要判断这个 key 下有没有自己的,也就是上边那些 if 判断,如果没有就不能获得锁,如果有,则进入重入锁,字段值+1。

2、解锁

解锁调用的是 unlockInnerAsync() 方法:

该方法同样还是调用的 Lua 脚本实现的。

同样还是假设 name=niceyoo,假设线程 ID 是 thread-1

同理,我们可以得到:

KEYS[1] 是 getName(),即 KEYS[1]=niceyoo,

KEYS[2] 是 getChannelName(),即 KEYS[2]=redisson_lock__channel:{niceyoo},

ARGV[1] 是 LockPubSub.unlockMessage,即ARGV[1]=0,

ARGV[2] 是生存时间,

ARGV[3] 是 getLockName(threadId),即 ARGV[3]=32063ed-98522fc-80287ap:thread-1

因此,上面脚本的意思是:

  1. 判断是否存在 name 为 “niceyoo” 的key;

  2. 如果不存在,向 Channel 中广播一条消息,广播的内容是0,并返回1

  3. 如果存在,进一步判断字段 32063ed-98522fc-80287ap:thread-1 是否存在

  4. 若字段不存在,返回空,若字段存在,则字段值减1

  5. 若减完以后,字段值仍大于0,则返回0

  6. 减完后,若字段值小于或等于0,则广播一条消息,广播内容是0,并返回1;

可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了。

3、加锁解锁小结


4、其他补充

4.1 lock() 方法

通常在获得 RLock 时,需要调用 lock() 方法,那么设置过期时间跟不设置有啥区别:

RLock lock = redissonClient.getLock("xxx");/*最常见的使用方法*/
lock.lock();

如果没有设置过期时间,默认还是会有一个30秒的过期时间,等价于:

RLock lock = redissonClient.getLock("xxx");/*支持过期解锁,30秒之后自动释放锁,无须调用unlock方法手动解锁*/
lock.lock(30, TimeUnit.SECONDS);
4.1 tryLock() 方法

有的小伙在在获取分布式锁时,使用的是 tryLock() 方法,跟 lock() 方法有啥区别:

RLock lock = redissonClient.getLock("xxx");/*尝试加锁,最多等待10秒,上锁以后10秒自动解锁,返回true表示加锁成功*/
if(lock.tryLock(10,10, TimeUnit.SECONDS)){xxx
}

首先我们来看一下 tryLock() 方法源码:

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(leaseTime, unit, threadId);//1、 获取锁同时获取成功的情况下,和lock(...)方法是一样的 直接返回True,获取锁False再往下走if (ttl == null) {return true;}//2、如果超过了尝试获取锁的等待时间,当然返回false 了。time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(threadId);return false;}// 3、订阅监听redis消息,并且创建RedissonLockEntry,其中RedissonLockEntry中比较关键的是一个 Semaphore属性对象,用来控制本地的锁请求的信号量同步,返回的是netty框架的Future实现。final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);//  阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。//  只有await返回true,才进入循环尝试获取锁if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {@Overridepublic void operationComplete(Future<RedissonLockEntry> future) throws Exception {if (subscribeFuture.isSuccess()) {unsubscribe(subscribeFuture, threadId);}}});}acquireFailed(threadId);return false;}//4、如果没有超过尝试获取锁的等待时间,那么通过While一直获取锁。最终只会有两种结果//1)、在等待时间内获取锁成功 返回true。2)等待时间结束了还没有获取到锁那么返回false。while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(leaseTime, unit, threadId);// 获取锁成功if (ttl == null) {return true;}//   获取锁失败time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(threadId);return false;}}
}

tryLock() 方法是申请锁并返回锁有效期还剩的时间,如果为空说明锁未被其他线程申请,那么就直接获取锁并返回,如果获取到时间,则进入等待竞争逻辑。

tryLock() 方法一般用于特定满足需求的场合,但不建议作为一般需求的分布式锁,一般分布式锁建议用 lock(long leaseTime, TimeUnit unit) 方法。因为从性能上考虑,在高并发情况下后者效率是前者的好几倍。

Redis分布式锁的缺点

在上一节中我们提到了 「setNX+Lua脚本」实现分布式锁在集群模式下的缺陷,

我们再来回顾一下,通常我们为了实现 Redis 的高可用,一般都会搭建 Redis 的集群模式,比如给 Redis 节点挂载一个或多个 slave 从节点,然后采用哨兵模式进行主从切换。但由于 Redis 的主从模式是异步的,所以可能会在数据同步过程中,master 主节点宕机,slave 从节点来不及数据同步就被选举为 master 主节点,从而导致数据丢失,大致过程如下:

  1. 用户在 Redis 的 master 主节点上获取了锁;
  2. master 主节点宕机了,存储锁的 key 还没有来得及同步到 slave 从节点上;
  3. slave 从节点升级为 master 主节点;
  4. 用户从新的 master 主节点获取到了对应同一个资源的锁,同把锁获取两次。

ok,然后为了解决这个问题,Redis 作者提出了 RedLock 算法,步骤如下(五步):

在下面的示例中,我们假设有 5 个完全独立的 Redis Master 节点,他们分别运行在 5 台服务器中,可以保证他们不会同时宕机。

  1. 获取当前 Unix 时间,以毫秒为单位。
  2. 依次尝试从 N 个实例,使用相同的 key 和随机值获取锁。在步骤 2,当向 Redis 设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。这样可以避免服务器端 Redis 已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个 Redis 实例。
  3. 客户端使用当前时间减去开始获取锁时间(步骤 1 记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是 3 个节点)的 Redis 节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
  4. 如果取到了锁,key 的真正有效时间等于有效时间减去获取锁所使用的时间(步骤 3 计算的结果)。
  5. 如果因为某些原因,获取锁失败(没有在至少 N/2+1 个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些 Redis 实例根本就没有加锁成功)。

到这,基本看出来,只要是大多数的 Redis 节点可以正常工作,就可以保证 Redlock 的正常工作。这样就可以解决前面单点 Redis 的情况下我们讨论的节点挂掉,由于异步通信,导致锁失效的问题。

但是细想后, Redlock 还是存在如下问题:

假设一共有5个Redis节点:A, B, C, D, E。设想发生了如下的事件序列:

  1. 客户端1成功锁住了A, B, C,获取锁成功(但D和E没有锁住)。
  2. 节点C崩溃重启了,但客户端1在C上加的锁没有持久化下来,丢失了。
  3. 节点C重启后,客户端2锁住了C, D, E,获取锁成功。
  4. 这样,客户端1和客户端2同时获得了锁(针对同一资源)。

哎,还是不能解决故障重启后带来的锁的安全性问题…

针对节点重后引发的锁失效问题,Redis 作者又提出了 延迟重启 的概念,大致就是说,一个节点崩溃后,不要立刻重启他,而是等到一定的时间后再重启,等待的时间应该大于锁的过期时间,采用这种方式,就可以保证这个节点在重启前所参与的锁都过期,听上去感觉 延迟重启 解决了这个问题…

但是,还是有个问题,节点重启后,在等待的时间内,这个节点对外是不工作的。那么如果大多数节点都挂了,进入了等待,就会导致系统的不可用,因为系统在过期时间内任何锁都无法加锁成功…

巴拉巴拉那么多,关于 Redis 分布式锁的缺点显然进入了一个无解的步骤,包括后来的 神仙打架事件(Redis 作者 antirez 和 分布式领域专家 Martin Kleppmann)…

总之,首先我们要明确使用分布式锁的目的是什么?

无外乎就是保证同一时间内只有一个客户端可以对共享资源进行操作,也就是共享资源的原子性操作。

总之,在 Redis 分布式锁的实现上还有很多问题等待解决,我们需要认识到这些问题并清楚如何正确实现一个 Redis 分布式锁,然后在工作中合理的选择和正确的使用分布式锁。

目前我们项目中也有在用分布式锁,也有用到 Redis 实现分布式锁的场景,然后有的小伙伴就可能问,啊,你们就不怕出现上边提到的那种问题吗~

其实实现分布式锁,从中间件上来选,也有 Zookeeper 可选,并且 Zookeeper 可靠性比 Redis 强太多,但是效率是低了点,如果并发量不是特别大,追求可靠性,那么肯定首选 Zookeeper。

如果是为了效率,就首选 Redis 实现。

好了,之后一起探索 Zookeeper 实现分布式锁。

最近换工作了,一切都是重新开始,新的系统、新的人、新的业务… 未来路很长,大家一起加油。

我的博客园:https://www.cnblogs.com/niceyoo

本文首发于博客园:https://www.cnblogs.com/niceyoo/p/13736140.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/413904.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

angular安装记录

1. 安装node.js&#xff0c;下载地址&#xff1a;https://nodejs.org/en/download/&#xff0c;详细的安装教程参考这里&#xff1a;https://blog.csdn.net/u010255310/article/details/52205132 直接一路next就可以。安装好node后&#xff0c;会自动在path中配置了node的安装路…

Docker (1) 基本概念和安装

Docker简介 什么是容器&#xff1f; 一种虚拟化的方案&#xff0c;操作系统级别的虚拟化。容器是一个轻量的、独立的、可执行的包&#xff0c;包含了执行它所需要的所有东西&#xff1a;代码、运行环境、系统工具、系统库、设置。很长一段时间中&#xff0c;容器是专门用于Linu…

vue获取浏览器地址栏参数(?及/)路由+非路由实现方式

1、? 参数 浏览器参数形式&#xff1a;http://javam4.com/m4detail?id1322914793170014208 1.1、路由取参方式 this.$route.query.id前端跳转方式&#xff1a; 一、onclick方式 <a title"测试数据"click"test(row.id)"target"_blank"&g…

python3.X 使用pip 离线安装whl包(转载)

转载https://blog.csdn.net/wangyaninglm/article/details/54177720 0. 绪论 断网的环境下配置python开发环境非常讨厌&#xff0c;本文旨在优雅暴力的解决这一问题。 生产环境 &#xff1a; windows 7 windows10 python 3.5.2 pip 1.5.2 友情提示&#xff1a;出现问题时候&…

PHP文件下载

步骤 //流的方式发送给浏览器 header("Content-Type: application/octet-stream"); //按照字节的返回给浏览器 header("Accept-Ranges: bytes"); //告诉浏览器文件的大小 header("Accept-Length: ".filesize(文件地址)); //以附件的形式发送给浏…

面试官:了解二叉树吗,平衡二叉树,红黑树?

前言 面试过程中&#xff0c;多多少少会问一点数据结构&#xff08;二叉树&#xff09;的问题&#xff0c;今天我们来复习一下二叉树的相关问题&#xff0c;文末总结。 1. 二叉树的由来 在 jdk1.8 之前&#xff0c;HashMap 的数据结构由「数组链表」组成&#xff0c;数组是 …

Jenkins: QQ/Wechat机器人群消息通知Job构建结果

简介 Jenkins是持续化集成的一个核心部件&#xff0c;它上游从仓库&#xff08;gitlab&#xff09;等拉取代码&#xff0c;经编译构建&#xff0c;将应用发布至下游目标环境&#xff1b;构建结果通知的方式有很多&#xff0c;现成的插件有邮件和钉钉方式&#xff0c;但是就方便…

Java中的Set对象去重

前言部分 Set<T> 去重相信大家一定不陌生&#xff0c;尤其是在 Set<String>、Set<Integer> 等等&#xff0c;但是在使用 Set<实体> &#xff0c;在不重写 equals()、hashCode() 方法情况下&#xff0c;直接使用貌似并不能生效。 所以想要 Set<实体…

openfalcon架构及相关服务配置详解

一&#xff1a;openfalcon组件 1.falcon-agent 数据采集组件 agent内置了一个http接口&#xff0c;会自动采集预先定义的各种采集项&#xff0c;每隔60秒&#xff0c;push到transfer。 2.transfer agent与transfer建立长连接&#xff0c;将数据汇报给tarnsfer transfer默认监听…

DBeaver连接达梦|虚谷|人大金仓等国产数据库

前言 工作中有些项目可能会接触到「达梦、虚谷、人大金仓」等国产数据库&#xff0c;但通常这些数据库自带的连接工具使用并不方便&#xff0c;所以这篇文章记录一下 DBeaver 连接国产数据库的通用模版&#xff0c;下文以达梦为例&#xff08;其他国产数据库连接操作方式一样&…

Map<String,Object>接收参数,Long类型降级为Integer,报类型转换异常

前言 今天看群里小伙伴问了一个非常有意思的问题&#xff1a; 使用 Map<String,Object> 对象接收前端传递的参数&#xff0c;在后端取参时&#xff0c;因为接口文档中明确该字段类型为 Long &#xff0c;所以对接收的参数进行了强转&#xff0c;即 (Long)参数 &#xf…

adb无法连接安卓手机

确保已安装好手机驱动&#xff08;在设备管理器中能找到安卓的设备&#xff09;查看设备的VID信息找到你的模拟器存放的目录&#xff0c;<例如&#xff1a;C:\Users\Administrator\.android 下找到或新建一个adb_usb.ini文件。文档内容写入VID即可 在cmd上输入adb kill-serv…

SpringBoot中的Tomcat是如何启动的

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId> </dependency>添加如上 Web 的依赖&#xff0c;Spring Boot 就帮我们内置了 Servlet 容器&#xff0c;默认使用的是 Tomcat&a…

IDEA社区版(Community)和付费版(UItimate)的区别

比对类型Ultimate(终极版,付费)Community(社区版,免费)语言支持JavaJavaGroovyGroovyKotlinKotlinScala&#xff08;通过插件&#xff09;Scala&#xff08;通过插件&#xff09;Python 和 Jython&#xff08;通过插件&#xff09;Python 和 Jython&#xff08;通过插件&#x…

从使用传统Web框架到切换到Spring Boot后的总结

1、前言 其实我接触 Spring Boot 的时间并不长&#xff0c;所以还算一个初学者&#xff0c;这篇文章也算是我对 Spring Boot 学习以及使用过程中的复盘&#xff0c;如果文章出现描述错误或表达不清晰的地方&#xff0c;欢迎大家在评论区留言互动。 没想到 Spring Boot 这两年…

前端学习(2859):简单秒杀系统学习之前端界面布局

<html><head><meta charset"utf-8"><link rel"stylesheet" type"text/css" href"miao.min.css" charset"utf-8"><title>秒杀系统</title></head><body><div class"…

Spring Boot 项目瘦身指南,瘦到不可思议!129M->1.3M

之前在 从使用传统Web框架到切换到Spring Boot后的总结 中提到关于 Spring Boot 编译打包&#xff0c;Spring Boot 应用程序不用额外部署到外部容器中&#xff0c;可以直接通过 Maven 命令将项目编译成可执行的 jar 包&#xff0c;然后通过 java -jar 命令启动即可&#xff0c;…

基于Docker搭建RabbitMQ(多图)

1、一点废话&#xff08;可直接跳转至标题2&#xff09; 通常在拉取镜像之前&#xff0c;除了通过命令执行 docker search xxx 之外&#xff0c;我们还可以通过 Docker 镜像仓库查询指定的镜像。 如下是 rabbitmq 镜像的搜索结果&#xff1a; 一般拉取下载数 stars 数相对比…