Java基于数据库的分布式可重入锁(带等待时间和过期时间)

文章目录

  • 技术背景介绍
  • 代码实现
    • 数据库表结构
    • 尝试获取锁
    • 续约
    • 阻塞式获取锁
    • 解锁
    • 检查锁是否过期或者释放
  • 使用示例
  • 优化方案

项目代码

技术背景介绍

一般分布式锁使用最方便的就是使用redis实现,因为他自带超时过期机制、发布订阅模式、高吞吐高性能的优势,但是有些项目里只有mysql数据库,很多数据库都是没有数据超时过期机制和发布订阅模式的,当然也不是所有的,这里我只针对mysql数据库作为基础组件。

代码实现

数据库表结构

DROP TABLE IF EXISTS `distributed_lock`;
CREATE TABLE `distributed_lock` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',`lock_name` varchar(255) NOT NULL COMMENT '锁名',`machine_id` varchar(255) DEFAULT NULL COMMENT '服务器id',`expire_time` datetime DEFAULT NULL COMMENT '过期时间,服务里会有一个看门狗续期,如果过期了就说明服务挂了,解锁会设置为空',`is_locked` tinyint(4) NOT NULL DEFAULT '0' COMMENT '当前是否锁定状态',`state` int(11) NOT NULL DEFAULT '0' COMMENT '锁标记位 类似次数',`thread_id` varchar(255) DEFAULT NULL COMMENT '当前获得锁的线程id',`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`gmt_modified` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',`is_deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否删除',PRIMARY KEY (`id`) USING BTREE,UNIQUE KEY `idx_lock_name` (`lock_name`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

尝试获取锁

使用乐观锁模式更新锁记录。如果获取失败,则加入订阅列表中,等待被唤醒或者到达超时时间自动唤醒,待获取到锁后再从订阅列表中移除。他的具体等待时间取决于用户输入的等待时间和锁超时过期的时间,这里使用JUC的Semaphore来实现等待功能。

public boolean tryLock(String lockName, Long waitTime, Long leaseTime, TimeUnit timeUnit) {long startTime = System.currentTimeMillis();String threadId = getCurrentThreadId();Long ttl = tryAcquire(lockName, leaseTime, timeUnit);// lock acquiredif (ttl == null) {return true;}long time = timeUnit.toMillis(waitTime);if (waitTime != -1 && System.currentTimeMillis() - startTime < time) {//没有获取到锁,也没到等待时长,执行订阅释放锁的任务LockEntry lockEntry = subscribe(lockName, threadId, () -> {});try {while (true) {ttl = tryAcquire(lockName, leaseTime, timeUnit);// lock acquiredif (ttl == null) {return true;}long remainTtl = time - System.currentTimeMillis() + startTime;if (remainTtl < 0) {return false;}// waiting for messagelockEntry.getLatch().tryAcquire(ttl >= 0 && ttl < remainTtl ? ttl : remainTtl, TimeUnit.MILLISECONDS);}} catch (InterruptedException e) {log.error("thread interrupted", e);throw new RuntimeException(e);} finally {unsubscribe(lockEntry, lockName);}} else {return false;}}private Long tryAcquire(String lockName, long leaseTime, TimeUnit unit) {String currentThreadId = getCurrentThreadId();//设定了自动释放锁的时间if (leaseTime != -1) {return tryLockInner(leaseTime, unit, lockName, currentThreadId);}//没有设置自动过期时间,就需要在获取到之后使用看门狗续期Long remainTtl = tryLockInner(internalLockLeaseTime, TimeUnit.MILLISECONDS, lockName, currentThreadId);// lock acquiredif (remainTtl == null) {scheduleExpirationRenewal(lockName, currentThreadId);}return remainTtl;}/*** 加锁成功返回null,否则返回锁的过期时间** @param leaseTime* @param unit* @param lockName* @param threadId* @return*/private Long tryLockInner(long leaseTime, TimeUnit unit, String lockName, String threadId) {long internalLockLeaseTime = unit.toMillis(leaseTime);//查询是否存在锁LockObject existLock = lockRepository.queryLock(lockName);LockObject lockObject = new LockObject();lockObject.setLockName(lockName);lockObject.setThreadId(threadId);lockObject.setMachineId(machineId);lockObject.setIsLocked(true);lockObject.setExpireTime(new Date(System.currentTimeMillis() + internalLockLeaseTime));if (existLock == null) {//保存锁lockObject.setState(1);try {lockRepository.save(lockObject);} catch (Exception e) {//抛出数据重复异常,说明被其他线程锁定了//返回需要等待的时间log.error("lock other thread occupy", e);return reCheckTtl(leaseTime, unit, lockName, threadId);}} else {//存在的锁会判断是否是当前线程的,如果是也允许加锁成功,支持可重入//如果正好其他锁释放了,那也会抢锁,具体是否公平由各数据库的内部锁决定int updateNum = lockRepository.reentrantLock(lockObject);if (updateNum == 0) {//返回需要等待的时间return reCheckTtl(leaseTime, unit, lockName, threadId);}}//加锁成功return null;}private Long reCheckTtl(long leaseTime, TimeUnit unit, String lockName, String threadId) {Long ttl = queryLockTtl(lockName);if (ttl == null) {//如果返回null,那就是获取锁的时候失败了,但是执行查询锁的过期时间的时候释放了//就需要重新执行上锁逻辑return tryLockInner(leaseTime, unit, lockName, threadId);} else {return ttl;}}/*** 获取锁的释放时间,单位毫秒,* 如果锁不存在 或者 未上锁 或者 已过期 则返回null** @param lockName* @return*/private Long queryLockTtl(String lockName) {LockObject lockObject = lockRepository.queryLock(lockName);if (lockObject != null && lockObject.getExpireTime() != null) {long intervalTime = lockObject.getExpireTime().getTime() - System.currentTimeMillis();if (intervalTime > 0) {return intervalTime;}}return null;}
<update id="updateReentrantLock">update distributed_lock<set>is_locked   = true,machine_id   = #{machineId,jdbcType=VARCHAR},thread_id   = #{threadId,jdbcType=VARCHAR},state       = if(expire_time &lt; NOW(), 1, state + 1),expire_time = #{expireTime,jdbcType=TIMESTAMP}</set>where is_deleted = 0and lock_name = #{lockName,jdbcType=VARCHAR}and (expire_time &lt; NOW()or is_locked = falseor (machine_id = #{machineId,jdbcType=VARCHAR}and thread_id = #{threadId,jdbcType=VARCHAR}))</update>

续约

如果锁没有设置过期时间,那么就需要设置自动续期,使用过期和续期的目的也是为了防止服务宕机导致锁无法释放的问题。如果续期失败说明锁已经释放了,那么会自动停止锁的续约任务。

private void scheduleExpirationRenewal(String lockName, String threadId) {ExpirationEntry entry = new ExpirationEntry(lockName, threadId);ExpirationEntry oldEntry = expirationRenewalMap.putIfAbsent(expirationRenewalKey(lockName, threadId), entry);if (oldEntry != null) {oldEntry.addCount();} else {//只对第一次获取锁的线程续约,后面的属于重入renewExpiration(lockName, threadId);}}private void renewExpiration(String lockName, String threadId) {String keyName = expirationRenewalKey(lockName, threadId);ExpirationEntry ee = expirationRenewalMap.get(keyName);if (ee == null) {return;}//获取到锁后过1/3时间开启续约任务scheduledExecutor.schedule(() -> {ExpirationEntry ent = expirationRenewalMap.get(keyName);if (ent == null) {return;}boolean renewResult = renewExpirationLock(lockName, ent.getThreadId());if (!renewResult) {//更新失败说明锁被释放了log.error("Can't update lock " + lockName + " expiration");expirationRenewalMap.remove(keyName);return;}// reschedule itselfrenewExpiration(lockName, threadId);}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);}private void cancelExpirationRenewal(String lockName, String threadId) {String keyName = expirationRenewalKey(lockName, threadId);ExpirationEntry task = expirationRenewalMap.get(keyName);if (task == null) {return;}Integer count = task.reduceCount();if (count == 0) {expirationRenewalMap.remove(keyName);}}private String expirationRenewalKey(String lockName, String threadId) {return lockName + "_" + threadId;}/*** 续期** @param lockName* @param threadId*/private boolean renewExpirationLock(String lockName, String threadId) {LockObject lockObject = new LockObject();lockObject.setLockName(lockName);lockObject.setThreadId(threadId);lockObject.setMachineId(machineId);lockObject.setExpireTime(new Date(System.currentTimeMillis() + internalLockLeaseTime));int updateNum = lockRepository.renewExpirationLock(lockObject);return updateNum != 0;}
<update id="updateRenewExpirationLock">update distributed_lockset expire_time = #{expireTime,jdbcType=TIMESTAMP}where is_deleted = 0and is_locked = trueand lock_name = #{lockName,jdbcType=VARCHAR}and machine_id   = #{machineId,jdbcType=VARCHAR}and thread_id   = #{threadId,jdbcType=VARCHAR}and expire_time &gt; NOW()</update>

阻塞式获取锁

阻塞式获取锁和非阻塞的区别就是等待锁释放的过程,没有获取到锁的线程会一直等待下去。

public void lock(String lockName, long leaseTime, TimeUnit unit) {LockEntry lockEntry = null;try {while (true) {// 尝试获取锁Long ttl = tryAcquire(lockName, leaseTime, unit);if (ttl == null) {// 成功获取到锁,直接退出break;}// 未获取到锁,订阅锁释放通知(如果还没订阅)if (lockEntry == null) {lockEntry = subscribe(lockName, getCurrentThreadId(), () -> {});}// 等待锁释放通知,直到TTL时间结束try {lockEntry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {// 恢复线程的中断状态Thread.currentThread().interrupt();throw new RuntimeException("Thread was interrupted while waiting for the lock", e);}}} finally {// 确保在退出时释放锁并取消订阅if (lockEntry != null) {unsubscribe(lockEntry, getCurrentThreadId());}}}

解锁

获取锁的线程释放锁的时候,state会减1,直到减到0,锁才会真正的释放。这里需要移除锁续约的任务,并且唤醒等待当前锁的线程

public void unlock(String lockName) {if (releaseLock(lockName)) {//释放锁成功后去除看门狗的续期//如果解锁失败,比如自己获取到锁过期了,然后又去释放锁,因为他没有续约任务所以不需要移除cancelExpirationRenewal(lockName, getCurrentThreadId());//发送锁释放的通知// 这里只处理本机维护的等待锁的线程,其他的机器数据库没法主动发出通知,需要轮训或者由获取锁的线程下次获取锁时自行处理LockEntry lockEntry = subscribeMap.get(lockName);//要判空,因为如果没有阻塞中的线程,那么lockEntry会为空if (lockEntry != null) {Semaphore semaphore = lockEntry.getLatch();if (semaphore.hasQueuedThreads()) {semaphore.release();}}}}
<update id="updateReleaseLock">update distributed_lock<set>state       = state - 1,expire_time = if(state=0, null, expire_time),is_locked   = if(state=0, false, true),machine_id   = if(state=0, null, machine_id),thread_id   = if(state=0, null, thread_id),</set>where is_deleted = 0and lock_name = #{lockName,jdbcType=VARCHAR}and machine_id   = #{machineId,jdbcType=VARCHAR}and thread_id   = #{threadId,jdbcType=VARCHAR}and expire_time &gt; NOW()and is_locked = true</update>

检查锁是否过期或者释放

因为mysql数据库没有发布订阅的功能,所以这里采用了定时查询的模式检查锁的状态。如果检测到锁释放了,会发起唤醒等待锁线程的通知,让等待的线程重新尝试获取锁。

public void process() {scheduledExecutor.scheduleAtFixedRate(() -> {//执行本机订阅这把锁的检查任务List<String> needCheckLockNameList = subscribeMap.entrySet().stream().filter(entry -> entry.getValue().getCounter().get() != 0).map(entry -> entry.getKey()).collect(Collectors.toList());//查询已经过期或者释放的锁List<String> lockNameList = lockRepository.queryAllowObtainLockList(needCheckLockNameList);//执行对应锁的唤醒操作lockNameList.forEach(lockName -> {LockEntry lockEntry = subscribeMap.get(lockName);if (lockEntry != null) {//这里最多多唤醒一次,无非就是让等待线程多抢占一次,没什么关系,这种场景发生在tryAcquire正好过期,定时任务正好运行//多一次判断可以大幅度减少冲突时多释放的信号Semaphore semaphore = lockEntry.getLatch();if (semaphore.hasQueuedThreads()) {semaphore.release();log.info("定时任务发起唤醒等待锁的通知");}}});}, 0, 1, TimeUnit.SECONDS);}
<select id="queryAllowObtainLockList" resultType="java.lang.String">select lock_namefrom distributed_lockwhere is_deleted = 0and lock_name in<foreach collection="list" item="lockName" open="(" close=")" separator=",">#{lockName,jdbcType=VARCHAR}</foreach>and (is_locked = falseor expire_time &lt; NOW())</select>

使用示例

public static void main(String[] args) {// 第一个Spring容器,加载配置类 Config1ApplicationContext context1 = new AnnotationConfigApplicationContext(MybatisPlusConfig.class);// 第二个Spring容器,加载配置类 Config2ApplicationContext context2 = new AnnotationConfigApplicationContext(MybatisPlusConfig.class);DatabaseDistributedLock server1 = context1.getBean(DatabaseDistributedLock.class);DatabaseDistributedLock server2 = context2.getBean(DatabaseDistributedLock.class);server1.lock("test");new Thread(() -> {ThreadUtil.sleep(1, TimeUnit.SECONDS);if (server2.tryLock("test", 17L, TimeUnit.SECONDS)) {System.out.println("我执行了1");ThreadUtil.sleep(5, TimeUnit.SECONDS);server2.unlock("test");}}).start();new Thread(() -> {ThreadUtil.sleep(2, TimeUnit.SECONDS);if (server1.tryLock("test", 17L, TimeUnit.SECONDS)) {System.out.println("我执行了2");ThreadUtil.sleep(5, TimeUnit.SECONDS);server1.unlock("test");}}).start();System.out.println("我获取到了锁");ThreadUtil.sleep(15, TimeUnit.SECONDS);server1.unlock("test");ThreadUtil.sleep(100, TimeUnit.SECONDS);}

优化方案

订阅通知如果有消息队列的话,可以借助用来实现发布订阅锁通知

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/883092.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring MVC(下)

博主主页: 码农派大星. 数据结构专栏:Java数据结构 数据库专栏:MySQL数据库 JavaEE专栏:JavaEE 关注博主带你了解更多JavaEE知识 目录 1.响应 1.1 返回静态页面 1.2 返回数据ResponseBody 1.3 返回HTML代码⽚段 1.4 返回JSON 1.5 设置状态码 1.6 设置Header 2 . …

【文献及模型、制图分享】基于国际湿地城市视角的常德市湿地保护修复成效与归因分析及其政策启示

文献介绍 《湿地公约》提出的“国际湿地城市”认证是促进湿地保护修复的新举措。以国际湿地城市常德市为例&#xff0c;基于2000—2022年15 m空间分辨率湿地分类数据&#xff0c;监测常德市湿地保护修复逐年动态变化&#xff0c;定量分析湿地保护修复驱动因素的重要性和贡献率…

K8s中TSL证书如何续期

TSL是什么 K8s中的作用是什么&#xff1f; 在 Kubernetes&#xff08;K8s&#xff09;中&#xff0c;TSL 指的是 Transport Layer Security&#xff0c;也就是传输层安全协议。它是用来保护在网络上传输的数据的安全性和隐私性。 TSL 在 Kubernetes 中的作用包括&#xff1a;…

第1讲(ASP.NET Core 6 Web Api 开发入门):第一个Web Api项目

一、运行模板项目 二、验证模板项目的api 法1&#xff1a;直接在网页上进行验证api 法2&#xff1a;通过命令行验证api 复制下图的Curl语句&#xff0c;打开命令行进行粘贴。&#xff08;对于windows系统&#xff0c;需要把换成"&#xff0c;再去掉所有的/&#xff0c;最…

一文了解AOSP是什么?

一文了解AOSP是什么&#xff1f; AOSP基本信息 基本定义 AOSP是Android Open Source Project的缩写&#xff0c;这是一个由Google维护的完全免费和开放的操作系统开发项目。它是Android系统的核心基础&#xff0c;提供了构建移动操作系统所需的基本组件。 主要特点 完全开源…

【景观生态学实验】实验一 ArcGIS地理数据处理及制图基础

实验目的 1.掌握ArcGIS软件基本操作&#xff1a;通过实验操作与学习&#xff0c;熟练掌握ArcGIS软件相关的基本操作&#xff0c;包括界面熟悉、工具栏使用、数据的加载和保存、基本数据处理操作等; 2.掌握如何使用ArcGIS进行影像拼接及裁剪&#xff1a;通过实验操作与学习&am…

传知代码-ChatGPT多模态命名实体识别

代码以及视频讲解 本文所涉及所有资源均在传知代码平台可获取 ChatGPT辅助细化知识增强&#xff01; 多模态命名实体识别&#xff08;MNER&#xff09;最近引起了广泛关注。 用户在社交媒体上生成大量非结构化内容&#xff0c;主要由图像和文本组成。这些帖子具有与社交媒体相…

GISBox vs CesiumLab:哪款GIS工具更适合你的项目?

在地理信息系统&#xff08;GIS&#xff09;领域&#xff0c;越来越多的用户开始关注GIS工具箱的选择&#xff0c;其中GISBox和CesiumLab是两款备受推崇的产品。那么&#xff0c;哪一款更适合你的需求呢&#xff1f;本文将从功能、使用体验和应用场景等方面&#xff0c;对GISBo…

产品如何实现3D展示?具体步骤如下

产品实现3D展示主要依赖于先进的3D建模与展示技术。以下是产品实现3D展示的具体步骤和方法&#xff1a; 一、3D建模 使用专业的3D建模软件&#xff0c;如Blender、Maya、3ds Max等&#xff0c;这些软件提供了丰富的建模工具和材质编辑器&#xff0c;能够创建出高精度的3D模型…

Python基于amazon/chronos-t5-base的预训练模型离线对时间系列数据的未来进行预测

Python基于预训练模型对时间系列数据的未来进行预测 导入库 %matplotlib inline import matplotlib.pyplot as plt import numpy as np import pandas as pd import torch from chronos import ChronosPipeline from tqdm.auto import tqdm from autogluon.timeseries import…

电脑定期运行某个程序

1、右键计算机-管理&#xff0c;点击任务计划程序&#xff0c;再点击创建基本任务&#xff1b; 2、写名称&#xff0c;下一步 3、选择任务开始计划&#xff0c;下一步 4、选择触发时间&#xff0c;下一步 5、选择启动程序&#xff0c;下一步 6、选择运行的程序&#xff0c;下一…

模型拆解(一):DBINet、GCPANet、CPD、ACCoNet、FPS-U2Net

文章目录 一、DBINet1.1编码器模块&#xff1a;ResNet50PVT双分支结构1.2解码器模块&#xff1a;自细化模块SR的应用1.3DFM&#xff1a;双分支融合模块1.4转换器模块&#xff1a;调整编码器输出至解码器中1.5深度监督损失函数 二、GCPANet2.1编码器模块&#xff1a;ResNet50主干…

uniapp移动端优惠券! 附源码!!!!

本文为常见的移动端uniapp优惠券&#xff0c;共有6种优惠券样式&#xff08;参考了常见的优惠券&#xff09;&#xff0c;文本内容仅为示例&#xff0c;您可在此基础上调整为你想要的文本 预览效果 通过模拟数据&#xff0c;实现点击使用优惠券让其变为灰色的效果&#xff08;模…

来自骨关节炎计划的膝关节MR图像的自动异常感知3D骨骼和软骨分割|文献速递-基于生成模型的数据增强与疾病监测应用

Title 题目 Automated anomaly-aware 3D segmentation of bones and cartilages in kneeMR images from the Osteoarthritis Initiative 来自骨关节炎计划的膝关节MR图像的自动异常感知3D骨骼和软骨分割 Background 背景 近年来&#xff0c;多个机器学习算法被提出用于图像…

windows|常见的文件伪装方法

几种常见的文件伪装方法&#xff1a; 扩展名伪装unicode字符伪装压缩包伪装隐写术 方法仅限于学习目的&#xff0c;不用于任何恶意或非法用途。 ———— 一、扩展名伪装&#xff1a;假装是另一种类型的文件 修改文件的扩展名&#xff0c;使得文件看起来像其他类型的文件&a…

python常用设计模式,单例模式和工厂设计模式

python常用设计模式&#xff0c;单例和工厂设计模式Demo 单例模式 单例设计模式是一种创建型设计模式&#xff0c;它确保一个类只有一个实例&#xff0c;并提供一个全局访问点来获取该实例。 应用场景&#xff1a;日志记录、线程池、缓存等 优点&#xff1a; 全局访问&…

洛谷题解 - P1162 填涂颜色

目录 填涂颜色题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 提示题解思路代码 填涂颜色 题目描述 由数字 0 0 0 组成的方阵中&#xff0c;有一任意形状的由数字 1 1 1 构成的闭合圈。现要求把闭合圈内的所有空间都填写成 2 2 2。例如&#xff1a; 6 6 6\times…

python的散列类型与字符编码

文章目录 一、 初识散列类型(无序序列)二、认识集合集合的方法增删 二、认识字典字典方法增删改查 声明空变量 三、字符编码元组名() #声明一个空元组 一、 初识散列类型(无序序列) 数据类型分为3种: python的序列类型有好几种&#xff0c;之前的博文讲到了两种类型 1.数值类型…

Appium中的api(一)

目录 1.基础python代码准备 1--参数的一些说明 2--python内所要编写的代码 解释 2.如何获取包名和界面名 1-api 2-完整代码 代码解释 3.如何关闭驱动连接 4.安装卸载app 1--卸载 2--安装 5.判断app是否安装 6.将应用放到后台在切换为前台的时间 7.UIAutomatorViewer的使用 1--找…

Oracle CONNECT BY、PRIOR和START WITH关键字详解

Oracle CONNECT BY、PRIOR和START WITH关键字详解 1. 基本概念2. 数据示例3. SQL示例3.1. 查询所有员工及其上级3.2. 显示层次结构3.3. 查询特定员工的子级 4. 结论 在Oracle数据库中&#xff0c;CONNECT BY、PRIOR和START WITH关键字主要用于处理层次结构数据&#xff0c;例如…