作者 | tech-bus.七十一
来源 | 程序员巴士
说到锁,在平时的工作中,主要是使用synchronized关键字,或者相关的一些类库来实现同步,但这都是基于单机应用而言的,当我们的应用多实例部署时,这时候就需要用到分布式锁了,常用的分布式锁主要是基于redis的分布式锁和基于zookeeper的分布式锁及基数据库的分布式锁,前俩个主要基于中间件的特性来实现,今天介绍一下基于数据库的分布式锁的实现,在一些并发不高的场景下比较适用。
首先需要在数据库中建立好数据表,相关的字段如下所示:
CREATE TABLE IF NOT EXISTS `lock_tbl`(`lock_id` INT NOT NULL, -- 主键且主要字段不可少`des_one` VARCHAR(20), -- 可有可无`des_two` VARCHAR(20), -- 可有可无PRIMARY KEY ( `lock_id` )
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
接着我们使用单实例应用,编写一个接口,去买一个表里的商品,大致思路就是:读取库存,库存减一,回写数据库,返回成功,其核心代码如下:
public class StockServiceImpl implements StockService{@AutowiredStockMapper stockMapper;@Overridepublic Stock selectByPrimaryKey(Integer goodsId) {return stockMapper.selectByPrimaryKey(goodsId);}// 加锁也只能保证单个实例线程安全性public synchronized void byGoods() throws InterruptedException {// 这里写死,数据库里就一条记录且ID为1,拿到数据Stock stock = selectByPrimaryKey(1);// 获取到商品的库存Long goodsStock = stock.getGoodsStock();// 减库存goodsStock -= 1;stock.setGoodsStock(goodsStock);// 为了将问题放大这里睡上几秒 拉长查库存和更新库存的之间的时间间隔Thread.sleep(3000);// 更新updateByPrimaryKeySelective(stock);// 输出System.out.println("更新后库存为:" + goodsStock);}@Overridepublic int updateByPrimaryKeySelective(Stock record) {return stockMapper.updateByPrimaryKeySelective(record);}
}
在单个实例里面加个synchronized后完全正常的减库存,然后我们启动两个实例后使用postman对接口进行压测,出现如下情况:
通过截图可知上述程序已经出现超卖现象,接下来进行改造,使用数据库层面的锁,我们知道向一张表中插入俩条相同主键的数据,只可能成功一条,因为主键具有约束性,所以利用这个特点,当我们向数据库插入成功时,即代表获取到锁,从而去运行我们的业务代码,当我们的业务代码运行完时,我们把数据库的该条记录进行删除,即代表释放锁,从而其他线程即有机会获取到锁,再去跑业务代码,这样即使运行的是俩个实例,同一时间也只能一个线程去运行业务代码,也就不会出现超卖这种情况了。下面给出加锁和解锁的代码:
// 上锁。由于上锁失败的话会直接返回失败,并不会再次获取
// 是非阻塞的,这里利用循环实现阻塞。@Overridepublic boolean tryLock() {// 这里的Lock就是简单的一个POJO对象映射到数据库中一张表的字段Lock lock = new Lock();lock.setLockId(1);// 通过while循环来实现阻塞while (true) {try {// 首先查询一下主键为1的数据是否存在,如果存在则说明锁已经被占用了if (lockMapper.selectByPrimaryKey(1) == null) {// 不存在则尝试加锁即向数据库中插入数据int i = lockMapper.insert(lock);if (i == 1) {return true;}}Thread.sleep(1000);} catch (InterruptedException e) {}}}// 解锁代码@Overridepublic void unLock() {deleteByPrimaryKey(1);}
对service层的购买商品的代码就进行加锁
// 买商品public void byWithLock() throws InterruptedException {// 上锁lockService.tryLock();// 业务代码byGoods();// 释放锁并跳出循环lockService.unLock();}
对于controller层的代码
@RestController
public class LoadBalance {@AutowiredStockServiceImpl stockService;@RequestMapping("/balance")public String balance() {try {stockService.byWithLock();} catch (InterruptedException e) {e.printStackTrace();}return "success";}
}
再次将程序启动,使用postman简单做下压测,发现已经正常进行减库存了。结果如下图所示
存在的问题
如果有一台实例拿到锁后宕机了,锁未能及时释放,那么其他实例将永远无法获取到锁。
不可重入,一台实例拿到锁后,想再次获取该锁时会失败
如何解决
对于存在实例宕机导致锁无法释放的问题,可以在插入数据的时候将当前的一个时间戳也插入数据库中,然后启一个定时任务,定期去扫表,同时设定一个锁的超时时间(该超时时间一定要大于正常的接口调用时间),将超时的记录进行删除。
对于不可重入,可以在表中插入数据的时候增加实例和线程相关的信息,当获取锁时进行判断,如果相符则直接获取锁。
悲观锁
悲观锁简单理解就是在任何情况下都是悲观的认为请求临界资源的时候都会与其他线程发生冲突,因此每次都是加悲观锁,这种锁具有强烈的侵占性和排他性。上述的例子中所加的锁就是悲观锁即先取锁再访问,MySql自带的悲观锁是For Update,使用For Update可以显示的增加行锁,但悲观锁会让数据库额外的开销,同时增加死锁的风险。
乐观锁
乐观锁简单理解就是每次线程请求临界资源时都认为不会有其他线程与其竞争,只有在数据进行提交的时候才进行竞争,在检测数据冲突时并不依赖数据库本身的锁机制,不影响请求的性能。上述例子我们可以在数据库表中增加一个Version版本号,对于要进行修改的数据,先从数据库中将改Version的版本号查出来,然后修改的时候带上该版本号一起修改
SELECT VERSION FROM TABLE_A -- 假设这里查出来version的值是OldVersion
UPDATE TABLE_A SET COUNT = COUNT -1, VERSION = VERSION + 1 WHERE VERSION = OldVersion
总结
并发不是特别高的情况下可以考虑使用基于数据库的分布式锁,尽量采用乐观锁的方式以提高应用的吞吐量。
往期推荐
为什么大家都在抵制用定时任务实现「关闭超时订单」功能?
Gartner 发布 2022 年汽车行业五大技术趋势
别再用 Redis List 实现消息队列了,Stream 专为队列而生
OpenStack 如何跨版本升级
点分享
点收藏
点点赞
点在看