M功能-分布式锁-支付平台(五)

target:离开柬埔寨倒计时-218day

在这里插入图片描述

珍藏的图片又拿出来了

前言

M系统中的撮合引擎是最最核心的功能,第一版的撮合引擎不是我写的,也没有做交易对的动态分配这样的功能,都是基于抢锁方式来决定谁拥有该交易对的撮合权限,所以锁就至关重要了,本来最简单的方法就是只起一个java进程,然后用jdk的锁就不用担心这些问题了,但是当交易对多的时候,一个进程就不一定能及时的处理这些订单,所以还是需要多台机器同步进行处理,所以还是需要分布式锁。

我接触到的最初版本

我初次接触这个系统是在2019年初

记得是在那年4月还是5月的时候,发生了一个异常,同一个订单撮合了两次,本来那个订单在第一次撮合后就已经全部成交了,所以紧跟着就来了第二笔撮合,那时的负责人让我协助排查这个问题,我就一脸懵的开始了排查之路

  • 首先我快速熟悉这套交易流程,让负责人给我讲解;
  • 根据交易流程,发现问题出现的原因一定在撮合引擎上面;
  • 查看撮合引擎的日志

当时撮合引擎的线程名称是撮合引擎前缀+交易对+编号,排查日志很容易发现其中有两个线程名称和相似,只有编号不一样,交易对是一样的,这就意味着同一个交易对有两个线程在进行撮合,因为这两个线程处于不同的jvm进程内,所以就没办法共享订单簿内存,这样就会出现撮合多次的情况了。

看到这里我不禁心想,这不是锁住了吗,怎么会还出现同一个交易对被两个线程都撮合的情况呢,除非这个锁没有锁住,我先是去查看了加锁的逻辑,加锁使用的是redisson,加锁的key是交易对,所以从逻辑上看是没什么问题的;然后我就继续排查日志,我看到第二台服务器的那个线程产生撮合日志的时间就在几个小时前,属于我就着重去找了那段时间的日志;

从里面的日志我看到了一条很有嫌疑的日志,不能更改锁的过期时间,这时候我隐约知道问题出现的原因了

先来看一段redisson锁里面的一段关键代码片段

类:org.redisson.RedissonLock

// 这个其实就是给redisson锁保活的一个续命任务
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {// 就是这里,如果这里发生了异常,就不会执行下面的对自己的调用if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}// 其实当时key是存在的,只是发生了网络问题,所以没有到这个分支if (res) {// reschedule itself// 每次续命成功才会继续发起下一次的续命renewExpiration();}});}// 这里续命时间默认是锁超时时间的1/3,也就是说默认30s的话,会每10s发起一次续命}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);}

其实这个续命任务在多数场景下都是足以支持的了,像我遇到的这个场景是比较少见的,当然也可以增大锁的超时时间,但是多长的时间能满足呢,这些都是问题,所以基于这个场景我写了个基于mysql的锁来支持这个功能。

Mysql实现简单的分布式锁

首先是一个大的抽象类,实现lock接口

package com.littlehow.lock;import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;public abstract class DefaultLock implements Lock {private final String id;protected final String key;// 主要是此处存放锁定key和id使用protected DefaultLock(String key) {this.id = UUID.randomUUID().toString().replace("-", "");this.key = key;}// 获取锁idpublic String getId(long threadId) {return id + ":" + threadId;}public String getKey() {return key;}@Overridepublic boolean tryLock() {try {return tryLock(-1, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {Thread.currentThread().interrupt();}return false;}@Overridepublic void lock() {try {lockInterruptibly();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}@Overridepublic Condition newCondition() {return new Condition() {@Overridepublic void await() throws InterruptedException {}@Overridepublic void awaitUninterruptibly() {}@Overridepublic long awaitNanos(long nanosTimeout) throws InterruptedException {return 0;}@Overridepublic boolean await(long time, TimeUnit unit) throws InterruptedException {return false;}@Overridepublic boolean awaitUntil(Date deadline) throws InterruptedException {return false;}@Overridepublic void signal() {}@Overridepublic void signalAll() {}};}
}

真正实现逻辑的分布式锁实现类

package com.littlehow.lock;import lombok.extern.slf4j.Slf4j;import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;@Slf4j
public class DistributedLock extends DefaultLock {private final static Map<String, ScheduledFuture> scheduleFuture = new ConcurrentHashMap<>();private final static AtomicInteger threadId = new AtomicInteger(1);// 使用默认的拒绝策略AbortPolicy 新任务来了抛出拒绝异常即可private final ExecutorService pool = new ThreadPoolExecutor(8, 20, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100000),r -> new Thread(r, "littlehow-lock-" + threadId.getAndIncrement()));private final ScheduledExecutorService schedule = Executors.newSingleThreadScheduledExecutor((r) -> new Thread(r,"DistributedLock-thread"));private static final long defaultTimeout = 60000L;private final long expired;private ContinueLife continueLife;private final LockService lockService;public DistributedLock(long expired, LockService lockService, String key) {this(expired, null, lockService, key);}public DistributedLock(long expired, ContinueLife continueLife, LockService lockService, String key) {super(key);this.expired = expired;this.continueLife = continueLife;this.lockService = lockService;}@Overridepublic void lockInterruptibly() {tryLock(-1, TimeUnit.MILLISECONDS);}/*** 如果要实现重入,可以在这里获取锁成功后计数到ThreadLocal,不用考虑计数失败,因为在这里操作计数失败只能是发生了不可控的异常* 想要保证原子性的话,计数就可以放到底层,如mysql表这些来设置,此处因为没有重入的需求,所以就没有实现加锁去锁的计数*/@Overridepublic boolean tryLock(long time, TimeUnit unit) {final String id = getId(Thread.currentThread().getId());// 这里实际上使用的ip获取工具获取的,此处就写死String ip = "192.168.1.1";log.debug("get lock key={}, id={}, ip={}", key, id, ip);Future<Boolean> future = pool.submit(() ->  this.lockService.tryLock(this.key, id, System.currentTimeMillis() + expired, ip));try {boolean lock = future.get(time == -1L ? defaultTimeout : time, unit);if (lock && continueLife != null) {final String cacheKey = key + "-" + id;if (!scheduleFuture.containsKey(cacheKey)) {ScheduledFuture taskFuture = schedule.scheduleWithFixedDelay(() -> {boolean flag = this.continueLife.flushLife(key, id, System.currentTimeMillis() + expired) ;//如果续命返回false,则会清除续命任务if (!flag) {cancelContinueTask(cacheKey);}},expired / 3, expired / 3, TimeUnit.MILLISECONDS);scheduleFuture.put(cacheKey, taskFuture);}}return lock;} catch (Exception e) {log.debug("get lock fail key={} id={} message={}", key, getId(Thread.currentThread().getId()), e.getMessage());}return false;}@Overridepublic void unlock() {String id = getId(Thread.currentThread().getId());try {log.info("unlock key={}, id={}", key, id);this.lockService.unlock(this.key, id);} catch (Throwable t) {log.error("解锁异常", t);cancelContinueTask(key + "-" + id);}}private void cancelContinueTask(String cacheKey) {//停止相应的续命任务ScheduledFuture tf = scheduleFuture.get(cacheKey);if (tf == null) return;log.info("continue life fail key={}", key);tf.cancel(true);log.info("clear task key={}, result={}", key, tf.isCancelled());scheduleFuture.remove(cacheKey);}
}

下面是锁接口和续命接口

package com.littlehow.lock;public interface LockService {// 获取锁boolean tryLock(String key, String id, long expired, String ip);// 解锁void unlock(String key, String id);
}=====================================================================================package com.littlehow.lock;public interface ContinueLife {// 刷新过期时间boolean flushLife(String key, String id, long time);
}

然后是mysql实现的一套锁,基于上面的基础接口和类

package com.littlehow.lock.support.mysql;import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;/*** @author littlehow* @since 5/28/24 19:43*/
@Setter
@Getter
@Accessors(chain = true)
public class LockModel {/*** 锁的关键key*/private String key;/*** 锁的机器ip地址*/private String ip;/*** 锁的实际id*/private String lockId;/*** 锁的过期时间*/private Long expireTime;
}=====================================================================================
package com.littlehow.lock.support.mysql;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;@Component
@Slf4j
public class MysqlLockSupport {@Value("${lock.warn.time:30000}")private long warnTime;public boolean tryLock(String key, String id, long expired, String ip) {Assert.hasText(id, "lock id must be not null");LockModel lockModel = new LockModel().setLockId(id).setExpireTime(expired).setKey(key).setIp(ip);// mysql实际实现细节就不具体写出来了,下面就写个伪代码// 实际代码是去数据库拉取信息,然后根据数据库信息进行下面的判定LockModel dbLock = lockModel;if (dbLock == null) {// 进行保存,保存成功才返回true,否则返回false,对唯一约束异常也要做保存失败处理return true;} else if (id.equals(dbLock.getLockId())) {// 同一个线程获取两次锁,直接返回true// 重入逻辑可以在上层使用ThreadLocal实现,这里就不实现数据库的计数了return true;} else {// 这里就是其他线程在对此进行抢锁操作// 如果时间超过了配置的警告时间,则进行错误日志答应,报警处理if (System.currentTimeMillis() - warnTime > dbLock.getExpireTime()) {log.error("key {} deadlock for {}, ip address {}", key, dbLock.getLockId(), dbLock.getIp());}}return false;}public void unlock(String key, String id) {// 如果支持重入的锁,那么上层逻辑一定要减去对应的值,最终等于1才调用此处的逻辑// 此处的代码就相当于是更新三个值,一个锁的过期时间,一个是锁的lockId。一个是ip地址,都进行置空处理// 因为这个是为撮合引擎定制的锁,所以这个key才不进行删除,因为此处的key就相当于是交易对,这些交易对基本都是固定的,只会增加,基本不会出现减少的情况}public boolean updateLockExpired(String key, String id, long time) {log.info("start continue life key={}, id={}, time={}", key, id, time);// 这里是更新锁的续命时间, 如果更新续命时间成功,则返回true即可return true;}
}=====================================================================================
package com.littlehow.lock.support.mysql;import com.littlehow.lock.LockService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class MysqlLockService implements LockService {@Autowiredprivate MysqlLockSupport lockSupport;@Overridepublic boolean tryLock(String key, String id, long expired, String ip) {try {return lockSupport.tryLock(key, id, expired, ip);} catch (Throwable t) {log.error("获取锁异常", t);return false;}}@Overridepublic void unlock(String key, String id) {lockSupport.unlock(key, id);}}=====================================================================================
package com.littlehow.lock.support.mysql;import com.littlehow.lock.ContinueLife;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class MysqlContinueLife implements ContinueLife {@Autowiredprivate MysqlLockSupport lockSupport;@Overridepublic boolean flushLife(String key, String id, long time) {try {return lockSupport.updateLockExpired(key, id, time);} catch (Throwable t) {//出现异常返回true,下次续命任务会继续进行log.error("锁续命异常", t);}return true;}
}=====================================================================================
package com.littlehow.lock.support.mysql;import com.littlehow.lock.DistributedLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;@Component
public class MysqlLockFactory {@Value("${lock.expired:30000}")private long expired;@Autowiredprivate MysqlContinueLife continueLife;@Autowiredprivate MysqlLockService lockService;private static final Map<String, Lock> locks = new HashMap<>();/*** 获取锁信息* @param key* @return*/public Lock getLock(String key) {Lock lock = locks.get(key);if (lock == null) {synchronized (this) {lock = locks.get(key);if (lock == null) {lock = new DistributedLock(expired, continueLife, lockService, key);locks.put(key, lock);}}}return lock;}
}

然后就是调用了

package com.littlehow.lock;import com.littlehow.lock.support.mysql.MysqlLockFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;import java.util.concurrent.locks.Lock;/*** @author littlehow* @since 5/28/24 20:04*/
@Slf4j
public class TestLock {@Autowiredprivate MysqlLockFactory mysqlLockFactory;/*** 这里可以使用junit进行测试调用*/public void test() {Lock lock = mysqlLockFactory.getLock("USD/CNY");try {if (lock.tryLock()) {// 已经获取到锁,可以进行业务处理} else {log.info("获取锁失败");}} finally {lock.unlock();}}
}

所以整个锁的获取流程图如下

在这里插入图片描述

后记

这几天很忙很忙,差点就中断制定的日更博客了,做M功能时的苦难感情戏本来就要登场的,结果一直酝酿不出当时的情绪,感觉写不好,所以就先更新一些我在M项目里面做的一些事情,也算是解析了一点点分布式锁在超长事务里面使用的一些注意事项吧!

今天又看到别人在翻新自己的“沙滩排球”场地,有时候真的羡慕他们呀,没有那么卷的生活,每天都开开心心,还能忙里偷闲做自己喜欢做的事情!
在这里插入图片描述

加油吧littlehow
北京时间:2024-05-28 21:10

金边时间:2024-05-28 20:10

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/18082.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL--复合查询

之前学过了基本的查询&#xff0c;虽然已经够80%的使用场景了&#xff0c;但是依旧需要了解剩下的20%。 一、多表笛卡尔积&#xff08;多表查询&#xff09; 以前我们使用基本查询的时候&#xff0c;from后面就跟一张表名&#xff0c;在多表查询这里&#xff0c;from后面可以跟…

13 VUE学习:组件v-model

基本用法 v-model 可以在组件上使用以实现双向绑定。 从 Vue 3.4 开始&#xff0c;推荐的实现方式是使用 [defineModel()]宏&#xff1a; <!-- Child.vue --> <script setup> const model defineModel()function update() {model.value } </script><te…

GitLab的安装及基础操作

1. 项目目标 &#xff08;1&#xff09;熟练使用rpm包安装gitlab &#xff08;2&#xff09;熟练配置gitlab &#xff08;3&#xff09;熟练创建gitlab群组、成员、项目 &#xff08;4&#xff09;熟练使用gitlab推送和拉取代码 2. 项目准备 2.1. 规划节点 主机名 主机I…

Gb 2024-05-22开源项目日报Top10

根据Github Trendings的统计,今日(2024-05-22统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Python项目3非开发语言项目2Jupyter Notebook项目2Rust项目2JavaScript项目1Lua项目1编程面试大学:成为软件工程师的全面学习计划 创建周期:2…

查询DQL

016条件查询之等量关系 条件查询语法格式 select ... from... where过滤条件;等于 select empno, ename from emp where sal3000;select job, sal from emp where enameFORD;select grade, losal, hisal from salgrade where grade 1;不等于 <> 或 ! selectempno,en…

德比软件携手亚马逊云科技,用生成式AI赋能旅游行业降本增效

旅游行业是最早被数字化技术赋能的行业之一。比如&#xff0c;消费者早已习惯在携程、艺龙、Booking等OTA平台根据实时酒店信息预订酒店。 这种丝滑的消费者体验背后&#xff0c;离不开领先的管理软件支撑。实际上大型酒店集团与OTA平台之间的系统对接非常复杂&#xff0c;酒店…

Go GORM介绍

GORM 是一个功能强大的 Go 语言 ORM&#xff08;对象关系映射&#xff09;库&#xff0c;它提供了一种方便的方式来与 SQL 数据库进行交互&#xff0c;而不需要编写大量的 SQL 代码。 GORM的关键特性 全功能的ORM&#xff1a;支持几乎所有的ORM功能&#xff0c;包括模型定义、基…

在Ubuntu系统中使用Systemctl添加启动项的详细指南

在Ubuntu系统中使用Systemctl添加启动项的详细指南 在Ubuntu系统中&#xff0c;systemctl 是管理systemd服务的主要工具。通过它&#xff0c;你可以添加、启动、停止、重启、启用和禁用服务。 什么是Systemctl&#xff1f; systemctl 是一个用于管理systemd系统和服务管理器…

OpenHarmony迎来首个互联网技术统一标准,鸿蒙OS生态走向如何?

开源三年半&#xff0c;OpenHarmony(以下简称“开源鸿蒙”)迎来了新进展。在5月25日召开的「OpenHarmony开发者大会」上&#xff0c;鸿蒙官宣了开源鸿蒙设备统一互联技术标准。 一直以来&#xff0c;各行业品牌操作系统相互独立、难以协同,成为其互联互通的痛点。为进一步解决…

Unity SetParent第二个参数worldPositionStays的意义

初学Unity的小知识&#xff1a; 改变对象的父级有三种调用方式&#xff0c;如下&#xff1a; transMe.SetParent(transParent,true); transMe.SetParent(transParent,false); transMe.parent transParent;具体有什么区别呢&#xff0c;这里写一个测试例子来详细说明&#xff…

数据驱动的UI艺术:智能设计的视觉盛宴

数据驱动的UI艺术&#xff1a;智能设计的视觉盛宴 引言 在当今这个数据泛滥的时代&#xff0c;大数据不仅仅是一种技术手段&#xff0c;它更是一种艺术形式。当大数据遇上UI设计&#xff0c;两者的结合便催生了一种全新的艺术形式——数据驱动的UI艺术。本文将探讨如何将数据…

STM32建立工程问题汇总

老版本MDK&#xff0c;例如MDK4 工程内容如下&#xff1a; User文件夹中存放main.c文件&#xff0c;用户中断服务函数&#xff08;stm32f1xx.it.c&#xff09;&#xff0c;用户配置文件&#xff08;stm32f1xx_hal_conf.h&#xff09;等用户程序文件&#xff0c;或者mdk启动程序…

5,串口编程---实现简单的用串口发送接收数据

单片机通过串口向PC机发送数据 PC机通过串口接收单片机发过来的数据 1.UART和USART的区别&#xff1a; USART支持同步通信方式,可以通过外部时钟信号进行同步传输,而UART仅支持异步通信方式 本开发板STM32F103ZET6有5个串口&#xff0c;用串口1作调试串口&#xff0c;因为串…

关于我转生从零开始学C++这件事:升级Lv.25

❀❀❀ 文章由不准备秃的大伟原创 ❀❀❀ ♪♪♪ 若有转载&#xff0c;请联系博主哦~ ♪♪♪ ❤❤❤ 致力学好编程的宝藏博主&#xff0c;代码兴国&#xff01;❤❤❤ OK了老铁们&#xff0c;又是一个周末&#xff0c;大伟又来继续给大家更新我们的C的内容了。那么根据上一篇博…

php之web开发

目标 实现一款具有常用大部分功能的WEB应用&#xff0c;并初步了解WEB漏洞原理 登录功能&#xff1a; 1、基于前端的登录功能 <!DOCTYPE html> <html> <head> <title>简单登录功能</title> </head> <meta charset"UTF-8"…

【Python】 Python中的“命名元组”:简单而强大的数据结构

基本原理 在Python中&#xff0c;namedtuple是tuple的一个子类&#xff0c;它允许我们为元组的每个位置指定一个名字。这种数据结构非常适合用于需要固定字段和值的场景&#xff0c;例如数据库查询的结果或配置文件中的设置。 namedtuple提供了一种方便的方式来访问元组中的元…

海外盲盒系统APP开发,盲盒出海热潮下的蓝海

近几年&#xff0c;我国潮玩出海成为了一个的大风口。根据数据显示&#xff0c;今年全球潮玩规模将达到400多亿美元&#xff0c;市场发展空间巨大。海外庞大的市场对于我国盲盒出海是一个较大的优势。在当下互联网的快速发展下&#xff0c;海外盲盒APP商城成为了盲盒企业出海的…

开源VS闭源:谁将引领AI大模型的新时代?

一、引言 随着人工智能技术的飞速发展&#xff0c;AI大模型已成为推动这一浪潮的核心动力。在AI大模型的发展过程中&#xff0c;开源与闭源两种不同的发展路径一直备受关注。本文将深入探讨这两种路径的优劣势&#xff0c;分析它们对AI大模型发展的影响&#xff0c;并预测谁将…

ctfshow web入门 黑盒测试

web380 这里文章看的我好有感触 但是影响做题 扫描一下 访问flag.php啥也没有再访问page.php page.php?idflagweb381 扫出来page.php但是没啥用哇&#xff0c;查看源代码 这些文件挨个试发现啥也没&#xff0c;最后仔细对比发现其实都是layui&#xff0c;然后尝试着访问…