分布式锁
分布式锁的基本概念
在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下。但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,JVM之间已经无法通过多线程的锁解决同步问题。那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁。
分布式锁在分布式的部署环境下,通过锁机制来让多客户端互斥的对共享资源进行访问。
如上图所示:假如redis中只有1个库存,此时有两个请求同时过来,其中一个请求走到第3步时,redis中的库存还是1(还没走到第4步),那另一个请求检查库存的时候也是1,所以就会导致超卖的现象。
解决方案:用锁把2、3、4步锁了,等他们执行完后,其他线程才可以进来。
如果再增加一台机器:
假设此时两个用户的请求同时到来,但是落在了不同的机器上,那么这两个请求是可以同时执行了,还是会出现库存超卖的问题。为什么呢?因为上图中的两个A系统,运行在两个不同的JVM里面,他们加的锁只对属于自己JVM里面的线程有效,对于其他JVM的线程是无效的。因此,这里的问题是Java提供的原生锁机制在多机部署场景下失效了这是因为两台机器加的锁不是同一个锁(两个锁在不同的JVM里面)。
那么,我们只要保证两台机器加的锁是同一个锁,问题不就解决了吗?此时,就该分布式锁登场了,分布式锁的思路是:在整个系统提供一个全局、唯一的获取锁的“东西”,然后每个系统在需要加锁时,都去问这个“东西”拿到一把锁,这样不同的系统拿到的就可以认为是同一把锁。至于这个“东西”,可以是Redis、Zookeeper,也可以是数据库
基于缓存(Redis、lua脚本等)实现分布式锁
实现步骤:
1、添加Redisson配置类
@Data
@Configuration
@ConfigurationProperties("spring.redis")
public class RedissonConfig {private String host;private String addresses;private String password;private String port;private int timeout = 3000;private int connectionPoolSize = 64;private int connectionMinimumIdleSize=10;private int pingConnectionInterval = 60000;private static String ADDRESS_PREFIX = "redis://";/*** 自动装配**/@BeanRedissonClient redissonSingle() {Config config = new Config();// 判断redis 的host是否为空if(StringUtils.isEmpty(host)){throw new RuntimeException("host is empty");}// 配置host,port等参数SingleServerConfig serverConfig = config.useSingleServer()//redis://127.0.0.1:7181.setAddress(ADDRESS_PREFIX + this.host + ":" + port).setTimeout(this.timeout).setPingConnectionInterval(pingConnectionInterval).setConnectionPoolSize(this.connectionPoolSize).setConnectionMinimumIdleSize(this.connectionMinimumIdleSize);// 判断进入redis 是否密码if(!StringUtils.isEmpty(this.password)) {serverConfig.setPassword(this.password);}// RedissonClient redisson = Redisson.create(config);return Redisson.create(config);}
}
2、业务逻辑层
@Override
public Long submitOrder(OrderSubmitVo orderSubmitVo) {//添加当前用户orderSubmitVo.setUserId(AuthContextHolder.getUserId());// 1.防重:redisString orderNo = orderSubmitVo.getOrderNo();if (StringUtils.isEmpty(orderNo)){throw new GmallException(ResultCodeEnum.ILLEGAL_REQUEST);}//判断锁、释放锁lua脚本String script = "if(redis.call('get', KEYS[1]) == ARGV[1]) then return redis.call('del', KEYS[1]) else return 0 end";Boolean flag = (Boolean)redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(RedisConst.ORDER_REPEAT + orderNo), orderNo);// if (!flag){// throw new GmallException(ResultCodeEnum.REPEAT_SUBMIT);// }// 2.验库存并锁定库存//2.1普通商品// List<Long> skuIdList = orderSubmitVo.getSkuIdList();List<CartInfo> cartInfoList = cartFeignClient.getCartCheckedList(AuthContextHolder.getUserId());List<CartInfo> commonSkuList = cartInfoList.stream().filter(cartInfo -> cartInfo.getSkuType() == SkuType.COMMON.getCode()).collect(Collectors.toList());if(!CollectionUtils.isEmpty(commonSkuList)) {List<SkuStockLockVo> commonStockLockVoList = commonSkuList.stream().map(item -> {SkuStockLockVo skuStockLockVo = new SkuStockLockVo();skuStockLockVo.setSkuId(item.getSkuId());skuStockLockVo.setSkuNum(item.getSkuNum());return skuStockLockVo;}).collect(Collectors.toList());//是否锁定Boolean isLockCommon = productFeignClient.checkAndLock(commonStockLockVoList, orderSubmitVo.getOrderNo());if (!isLockCommon){throw new GmallException(ResultCodeEnum.ORDER_STOCK_FALL);}}//2.2秒杀商品List<CartInfo> seckillSkuList = cartInfoList.stream().filter(cartInfo -> cartInfo.getSkuType() == SkuType.SECKILL.getCode()).collect(Collectors.toList());if(!CollectionUtils.isEmpty(seckillSkuList)) {List<SkuStockLockVo> seckillStockLockVoList = seckillSkuList.stream().map(item -> {SkuStockLockVo skuStockLockVo = new SkuStockLockVo();skuStockLockVo.setSkuId(item.getSkuId());skuStockLockVo.setSkuNum(item.getSkuNum());return skuStockLockVo;}).collect(Collectors.toList());//是否锁定Boolean isLockSeckill = seckillFeignClient.checkAndMinusStock(seckillStockLockVoList, orderSubmitVo.getOrderNo());if (!isLockSeckill){throw new GmallException(ResultCodeEnum.ORDER_STOCK_FALL);}}// 3.下单Long orderId = null;try {orderId = this.saveOrder(orderSubmitVo, cartInfoList);// 订单正常创建成功的情况下,发送消息定时关单int normalOrderOvertime = orderSetService.getNormalOrderOvertime();//rabbitService.sendDelayMessage(MqConst.EXCHANGE_ORDER_DIRECT, MqConst.ROUTING_CANCEL_ORDER, orderSubmitVo.getOrderNo(), normalOrderOvertime);} catch (Exception e) {e.printStackTrace();// 出现异常立马解锁库存 标记订单时无效订单//rabbitService.sendMessage(MqConst.EXCHANGE_ORDER_DIRECT, MqConst.ROUTING_ROLLBACK_STOCK, orderSubmitVo.getOrderNo());throw new GmallException(ResultCodeEnum.CREATE_ORDER_FAIL);}// 5.异步删除购物车中对应的记录。不应该影响下单的整体流程//rabbitService.sendMessage(MqConst.EXCHANGE_ORDER_DIRECT, MqConst.ROUTING_DELETE_CART, orderSubmitVo.getUserId());//说明:商品价格在此不做校验,我们在购物车里面已经校验,商品价格只会在停售时间更改return orderId;
}
以上那段lua脚本代码的意思:
如果redis有相同orderNo,表示正常提交订单,把redis的orderNo删除;如果redis没有相同的orderNo,表示重复提交了,不能再往后进行。
基于数据库实现分布式锁
核心思想:在数据库中创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入数据,成功插入则获取锁,执行完成后删除对应的行数据释放锁。
实现步骤:
1、创建一个表
DROP TABLE IF EXISTS `method_lock`;
CREATE TABLE `method_lock` (`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',`method_name` varchar(64) NOT NULL COMMENT '锁定的方法名',`desc` varchar(255) NOT NULL COMMENT '备注信息',`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
2、想要执行某个方法就往这个表中插入一条数据
INSERT INTO method_lock (method_name, desc) VALUES ('methodName', '测试的methodName');
因为我们对method_name做了唯一约束,这里如果有多个请求同时提交到数据库的画,数据库会保证只有一个请求会成功。那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
成功插入则获取锁,执行完成后删除对应的行数据释放锁:
delete from method_lock where method_name ='methodName';
使用基于数据库的这种实现方式很简单,但是对于分布式锁应该具备的条件来说,它有一些问题需要解决及优化:
1、数据库的可用性和性能直接会影响分布式锁的可用性和性能。所以数据库需要双机部署,数据同步,主备切换很麻烦;
2、不具备可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据,所以,需要在表中新增一列,用于记录当前获取到锁的机器和线程信息,在再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁;
3、没有锁失效机制,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁,所以,需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据;
4、不具备阻塞锁特性,获取不到锁直接返回失败,所以需要优化获取逻辑,循环多次去获取。
5、在实施的过程中会遇到各种不同的问题,为了解决这些问题,实现方式将会越来越复杂;依赖数据库需要一定的资源开销,性能问题需要考虑。
基于ZooKeeper的实现方式
1、zookeeper中规定,在同一时刻,不能有多个客户端创建同一个节点,我们可以利用这个特性实现分布式锁。zookeeper临时节点只在session生命周期存在,session一结束会自动销毁。
2、watcher机制,在代表锁资源的节点被删除,即可以触发watcher解除阻塞重新去获取锁,这也是zookeeper分布式锁较其他分布式锁方案的一大优势。
1、排它锁
排他锁,又称写锁或独占锁。如果事务T1对数据对象O1加上了排他锁,那么在整个加锁期间,只允许事务T1对O1进行读取或更新操作,其他任务事务都不能对这个数据对象进行任何操作,直到T1释放了排他锁。
排他锁核心是保证当前有且仅有一个事务获得锁,并且锁释放之后,所有正在等待获取锁的事务都能够被通知到。
Zookeeper 的强一致性特性,能够很好地保证在分布式高并发情况下节点的创建一定能够保证全局唯一性,即Zookeeper将会保证客户端无法重复创建一个已经存在的数据节点。可以利用Zookeeper这个特性,实现排他锁。
1️⃣定义锁:通过Zookeeper上的数据节点来表示一个锁
2️⃣获取锁:客户端通过调用 create 方法创建表示锁的临时节点,可以认为创建成功的客户端获得了锁,同时可以让没有获得锁的节点在该节点上注册Watcher监听,以便实时监听到lock节点的变更情况
3️⃣释放锁:以下两种情况都可以让锁释放
1、当前获得锁的客户端发生宕机或异常,那么Zookeeper上这个临时节点就会被删除
2、正常执行完业务逻辑,客户端主动删除自己创建的临时节点
常见分布式锁方案对比
2、共享锁
共享锁,又称读锁。如果事务T1对数据对象O1加上了共享锁,那么当前事务只能对O1进行读取操作,其他事务也只能对这个数据对象加共享锁,直到该数据对象上的所有共享锁都被释放。
共享锁与排他锁的区别在于,加了排他锁之后,数据对象只对当前事务可见,而加了共享锁之后,数据对象对所有事务都可见。
1️⃣定义锁:通过Zookeeper上的数据节点来表示一个锁,是一个类似于 /lockpath/[hostname]-请求类型-序号 的临时顺序节点
2️⃣获取锁:客户端通过调用 create 方法创建表示锁的临时顺序节点,如果是读请求,则创建 /lockpath/[hostname]-R-序号 节点,如果是写请求则创建 /lockpath/[hostname]-W-序号节点
3️⃣判断读写顺序:大概分为4个步骤
1)创建完节点后,获取 /lockpath 节点下的所有子节点,并对该节点注册子节点变更的Watcher监听
2)确定自己的节点序号在所有子节点中的顺序
3.1)对于读请求:1. 如果没有比自己序号更小的子节点,或者比自己序号小的子节点都是读请求,那么表明自己已经成功获取到了共享锁,同时开始执行读取逻辑 2. 如果有比自己序号小的子节点有写请求,那么等待
3.2)对于写请求,如果自己不是序号最小的节点,那么等待
4)接收到Watcher通知后,重复步骤1)
4️⃣释放锁:与排他锁逻辑一致
基于Zookeeper实现共享锁流程: