DTM分布式事务

DTM分布式事务

从内网看到了关于事务在业务中的讨论,评论区大佬有提及DTM开源项目[https://dtm.pub/],开学开学

基础理论

一、Why DTM

​ 项目产生于实际生产中的问题,涉及订单支付的服务会将所有业务相关逻辑放到一个大的本地事务,导致大量耦合,复杂度大幅提升

​ java成熟的分布式事务解决方案,使用代价过高:大量业务用java重写

​ DTM,Distributed Transaction Manager, 其是一个分布式事务管理器,解决跨数据库、跨服务、跨语言更新数据的一致性问题。

​ DTM提供了Saga、TCC、XA和二阶段消息模式以满足不同应用场景的需求,同时首创的子事务屏障技术有效解决幂等悬挂空补偿等异常问题。

​ DTM的优点:

  • 提供简单易用的接口,拆分具体业务接入分布式事务
  • 支持多语言栈
  • 核心技术子事务屏蔽,降低处理子事务乱的难度

二、 快速上手

1、Demo

​ 了解DTM的发展和特点,quick start一下8⃣️

// 运行dtm
git clone https://github.com/dtm-labs/dtm && cd dtm
go run main.go
// 运行一个saga示例
go run qs/main.go

​ 上述Saga示例实现一个类似跨行转账的功能,包括两个事务分支:资金转出(TransOut)、资金转入(TransIn)。DTM保证TransIn和TransOut要么全成功,要么全回滚,保证最终金额的正确性。

  // 具体业务微服务地址const qsBusi = "http://localhost:8081/api/busi_saga"req := &gin.H{"amount": 30} // 微服务的载荷// DtmServer为DTM服务的地址,是一个urlDtmServer := "http://localhost:36789/api/dtmsvr"saga := dtmcli.NewSaga(DtmServer, shortuuid.New()).// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 补偿操作为url: qsBusi+"/TransOutCompensate"Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req).// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransIn", 补偿操作为url: qsBusi+"/TransInCompensate"Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req)// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务err := saga.Submit()

在这里插入图片描述

2、时序图

在这里插入图片描述

​ 从以上时序图可以看出,DTM整个全局事务分为如下几步:

  1. 用户定义好全局事务所有的事务分支(全局事务的组成部分称为事务分支),然后提交给DTM,DTM持久化全局事务信息后,立即返回
  2. DTM取出第一个事务分支,这里是TransOut,调用该服务并成功返回
  3. DTM取出第二个事务分支,这里是TransIn,调用该服务并成功返回
  4. DTM已完成所有的事务分支,将全局事务的状态修改为已完成

失败情况:

​ 在实际业务中,子事务可以出现失败,例如转入的子账号被冻结导致转账失败。因此可以对业务代码进行修改来模拟TransIn正向操作失败

func qsAddRoute(app *gin.Engine) {app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {log.Printf("TransIn")c.JSON(200, "")// c.JSON(409, "") // Status 409 for Failure. Won't be retried})app.POST(qsBusiAPI+"/TransInCompensate", func(c *gin.Context) {log.Printf("TransInCompensate")c.JSON(200, "")})app.POST(qsBusiAPI+"/TransOut", func(c *gin.Context) {log.Printf("TransOut")c.JSON(200, "")})app.POST(qsBusiAPI+"/TransOutCompensate", func(c *gin.Context) {log.Printf("TransOutCompensate")c.JSON(200, "")})
}

​ 再次运行,整个事务最终失败,时序图:

在这里插入图片描述

在转入操作失败的情况下,TransIn和TransOut的补偿操作被执行,保证了最终的余额和转账前是一样的

在这里插入图片描述

三、二阶段消息Demo

业务场景:

​ 跨行转账是典型的分布式事务场景,在这里A需要跨行转账给B

假设需求场景:

​ 只有转出A可能失败,转入B是能够最终成功的

​ 二阶段消息是DTM首创的事务模式,用于替换本地事务表和事务消息这两种现有的方案

​ 二阶段消息能够保证本地事务的提交和全局事务的提交是原子性的,适合解决不需要回滚的分布式事务场景

1、核心代码
// SagaAdjustBalance 1
func SagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error {if strings.Contains(result, dtmcli.ResultFailure) {return dtmcli.ErrFailure}_, err := dtmimp.DBExec(BusiConf.Driver, db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)return err
}

调整用户的账号余额

app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler(func(c *gin.Context) interface{} {barrier := MustBarrierFromGin(c)return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)})}))

barrier.Call主要用于处理幂等,保证重复调用不会多次调整余额

开启事务,进行分支调用

	gid := dtmimp.GetFuncName()req := busi.GenReqHTTP(30, false, false)msg := dtmcli.NewMsg(DtmServer, gid).Add(busi.Busi+"/SagaBTransIn", req)err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error {return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS")})

该代码保证了DoAndSubmitDB的业务提交和全局事务提交是原子性的,保证TransOut和TransIn同时成功or失败。 其中DoAndSubmitDB第一个参数为回查URL

	app.GET(BusiAPI+"/QueryPrepared", dtmutil.WrapHandler(func(c *gin.Context) interface{} {logger.Debugf("%s QueryPrepared", c.Query("gid"))return string2DtmError(dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess))}))app.GET(BusiAPI+"/QueryPreparedB", dtmutil.WrapHandler(func(c *gin.Context) interface{} {logger.Debugf("%s QueryPreparedB", c.Query("gid"))bb := MustBarrierFromGin(c)db := dbGet().ToSQLDB()return bb.QueryPrepared(db)}))
2、原子性

在这里插入图片描述

四、SAGA型事务Demo

业务场景:

​ A需要跨行转账给B

假设场景:

​ 转出A和转入B都可能成功和失败,需要最终转入转出都成功or失败

核心思想:

​ 将长事务拆分为多个本地短事务,有Saga事务协调器来协调,如果各个本地事务成功完成则正常完成,如果某个步骤失败,则根据相反顺序依次调用补偿操作。

1、核心代码

​ 调整用户的账户余额

func SagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error {if strings.Contains(result, dtmcli.ResultFailure) {return dtmcli.ErrFailure}_, err := dtmimp.DBExec(BusiConf.Driver, db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)return err
}

​ 正向操作/补偿操作的处理函数(源码新增的Demo并未在此作展示

		app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler(func(c *gin.Context) interface{} {barrier := MustBarrierFromGin(c)return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)})}))app.POST(BusiAPI+"/SagaBTransInCom", dtmutil.WrapHandler(func(c *gin.Context) interface{} {barrier := MustBarrierFromGin(c)return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {return SagaAdjustBalance(tx, TransInUID, -reqFrom(c).Amount, "")})}))app.POST(BusiAPI+"/SagaBTransOut", dtmutil.WrapHandler(func(c *gin.Context) interface{} {barrier := MustBarrierFromGin(c)return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {return SagaAdjustBalance(tx, TransOutUID, -reqFrom(c).Amount, reqFrom(c).TransOutResult)})}))app.POST(BusiAPI+"/SagaBTransOutCom", dtmutil.WrapHandler(func(c *gin.Context) interface{} {barrier := MustBarrierFromGin(c)return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {return SagaAdjustBalance(tx, TransOutUID, reqFrom(c).Amount, "")})}))

开启事务,进行分支调用

		req := &busi.ReqHTTP{Amount: 30}// DtmServer为DTM服务的地址saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, shortuuid.New()).// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"Add(busi.Busi+"/SagaBTransOut", busi.Busi+"/SagaBTransOutCom", req).// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCom"Add(busi.Busi+"/SagaBTransIn", busi.Busi+"/SagaBTransInCom", req)// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务logger.Debugf("busi trans submit")err := saga.Submit()
2、时序图

​ 与快速上手demo一致

在这里插入图片描述

3、处理网络异常

​ 假设提交给dtm的事务中, 调用转入操作时,出现短暂的故障——>dtm会重试未完成的操作,此时要求全局事务中的各个子事务时幂等的

​ 子事务屏障技术,提供了BranchBarrier工具类,提供Call函数来保证函数内部的业务最多被调用一次。

4、处理回滚

​ 事务失败交互的时序图:

在这里插入图片描述

  • TransIn的正向操作发生在提交之前,则补偿为空操作
  • TransIn的操作如果发生在提交后,则补偿操作会将数据提交一次

五、TCC型事务Demo

业务场景:

​ A需要跨行转账给B

假设场景:

​ 转出A和转入B都可能成功和失败,需要最终转入转出都成功or失败

​ 还有一个要求,假如发生回滚,SAGA 模式下会发生A发现自己的余额被扣减了,但是收款方B迟迟没有收到余额,那么会对A造成很大的困扰。业务上面希望不要出现这种情况

TCC分为3个阶段

  • Try 阶段:尝试执行,完成所有业务检查(一致性), 预留必须业务资源(准隔离性)
  • Confirm 阶段:如果所有分支的Try都成功了,则走到Confirm阶段。Confirm真正执行业务,不作任何业务检查,只使用 Try 阶段预留的业务资源
  • Cancel 阶段:如果所有分支的Try有一个失败了,则走到Cancel阶段。Cancel释放 Try 阶段预留的业务资源。

在这里插入图片描述

1、核心代码

​ 冻结/解冻资金操作,会检查约束balance+trading_balance >= 0,如果约束不成立,执行失败(trading_balance 表示被冻结的金额

func tccAdjustTrading(db dtmcli.DB, uid int, amount int) error {affected, err := dtmimp.DBExec(BusiConf.Driver, db, `update dtm_busi.user_accountset trading_balance=trading_balance+?where user_id=? and trading_balance + ? + balance >= 0`, amount, uid, amount)if err == nil && affected == 0 {return fmt.Errorf("update error, maybe balance not enough")}return err
}func tccAdjustBalance(db dtmcli.DB, uid int, amount int) error {affected, err := dtmimp.DBExec(BusiConf.Driver, db, `update dtm_busi.user_accountset trading_balance=trading_balance-?,balance=balance+? where user_id=?`, amount, amount, uid)if err == nil && affected == 0 {return fmt.Errorf("update user_account 0 rows")}return err
}

​ Try/Confirm/Cancel处理函数:

		app.POST(BusiAPI+"/TccBTransOutTry", dtmutil.WrapHandler(func(c *gin.Context) interface{} {req := reqFrom(c)if req.TransOutResult != "" {return string2DtmError(req.TransOutResult)}bb := MustBarrierFromGin(c)if req.Store == Redis {return bb.RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), req.Amount, 7*86400)} else if req.Store == Mongo {return bb.MongoCall(MongoGet(), func(sc mongo.SessionContext) error {return SagaMongoAdjustBalance(sc, sc.Client(), TransOutUID, -req.Amount, "")})}return bb.CallWithDB(pdbGet(), func(tx *sql.Tx) error {return tccAdjustTrading(tx, TransOutUID, -req.Amount)})}))app.POST(BusiAPI+"/TccBTransOutConfirm", dtmutil.WrapHandler(func(c *gin.Context) interface{} {if reqFrom(c).Store == Redis || reqFrom(c).Store == Mongo {return nil}return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {return tccAdjustBalance(tx, TransOutUID, -reqFrom(c).Amount)})}))app.POST(BusiAPI+"/TccBTransOutCancel", dtmutil.WrapHandler(TccBarrierTransOutCancel))app.POST(BusiAPI+"/TccBTransInTry", dtmutil.WrapHandler(func(c *gin.Context) interface{} {req := reqFrom(c)if req.TransInResult != "" {return string2DtmError(req.TransInResult)}return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {return tccAdjustTrading(tx, TransInUID, req.Amount)})}))app.POST(BusiAPI+"/TccBTransInConfirm", dtmutil.WrapHandler(func(c *gin.Context) interface{} {return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {return tccAdjustBalance(tx, TransInUID, reqFrom(c).Amount)})}))app.POST(BusiAPI+"/TccBTransInCancel", dtmutil.WrapHandler(func(c *gin.Context) interface{} {return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {return tccAdjustTrading(tx, TransInUID, -reqFrom(c).Amount)})}))

开启事务,进行分支调用

	req := busi.GenReqHTTP(30, false, false)gid := dtmimp.GetFuncName()// TccGlobalTransaction 会开启一个全局事务err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {// CallBranch 会将事务分支的Confirm/Cancel注册到全局事务上,然后直接调用Try_, err := tcc.CallBranch(req, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")assert.Nil(t, err)return tcc.CallBranch(req, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")})
2、处理网络异常

​ 同SAGA部分

3、处理回滚

​ 事务失败交互的时序图:

在这里插入图片描述

​ 跟成功的TCC差别就在于,当某个子事务返回失败后,后续就回滚全局事务,调用各个子事务的Cancel操作,保证全局事务全部回滚

  • TransIn的正向操作发生在提交之前,则补偿为空操作
  • TransIn的操作如果发生在提交后,则补偿操作会将数据提交一次

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/606090.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

卷积神经网络|迁移学习-猫狗分类完整代码实现

还记得这篇文章吗?迁移学习|代码实现 在这篇文章中,我们知道了在构建模型时,可以借助一些非常有名的模型,这些模型在ImageNet数据集上早已经得到了检验。 同时torchvision模块也提供了预训练好的模型。我们只需稍作修改&#xf…

qtday1(2024/1/8)

#include "mywidget.h"MyWidget::MyWidget(QWidget *parent): QMainWindow(parent) {//设置界面固定大小this->resize(1728,972);this->setFixedSize(1728,972);this->setWindowIcon(QIcon("C:\\Users\\78507\\Desktop\\pic\\qq1.png"));this->…

高级RAG(五):TruLens 评估-扩大和加速LLM应用程序评估

之前我们介绍了,RAGAs评估,今天我们再来介绍另外一款RAG的评估工具:TruLens , trulens是TruEra公司的一款开源软件工具,它可帮助您使用反馈功函数客观地评估基于 LLM 的应用程序的质量和有效性。反馈函数有助于以编程方式评估输入、输出和中间…

vue3 内置组件

文章目录 前言一、过渡效果相关的组件1、Transition2、TransitionGroup 二、状态缓存组件(KeepAlive)三、传送组件(Teleport )四、异步依赖处理组件(Suspense) 前言 在vue3中 其提供了5个内置组件 Transiti…

antv/x6_2.0学习使用(四、边)

一、添加边 节点和边都有共同的基类 Cell,除了从 Cell 继承属性外,还支持以下选项。 属性名类型默认值描述sourceTerminalData-源节点或起始点targetTerminalData-目标节点或目标点verticesPoint.PointLike[]-路径点routerRouterData-路由connectorCon…

猫咪吃哪种猫粮好?主食冻干猫粮哪种性价比高

由于猫咪是肉食动物,对蛋白质的需求很高,如果摄入的蛋白质不足,就会影响猫咪的成长。而冻干猫粮本身因为制作工艺的原因,能保留原有的营养成分和营养元素,所以冻干猫粮蛋白含量比较高,营养又高,…

第二十七周:文献阅读笔记

第二十七周:文献阅读笔记 摘要AbstractDenseNet 网络1. 文献摘要2. 引言3. ResNets4. Dense Block5. Pooling layers6. Implementation Details7. Experiments8. Feature Reuse9. 代码实现 总结 摘要 DenseNet(密集连接网络)是一种深度学习神…

工智能基础知识总结--词嵌入之FastText

什么是FastText FastText是Facebook于2016年开源的一个词向量计算和文本分类工具,它提出了子词嵌入的方法,试图在词嵌入向量中引入构词信息。一般情况下,使用fastText进行文本分类的同时也会产生词的embedding,即embedding是fastText分类的产物。 FastText流程 FastText的架…

计算机组成原理简答题

目录 1、指令和数据在计算机内部以几进制存储,又是如何区分的呢? 2、计算机内部为什么要使用二进制? 3、简单描述计算机系统的层次结构 4、DRAM为什么要进行刷新,如何刷新的? 5、简述不同操作码的指令格式&#xf…

FileStream文件管理

文件管理 FileStream:是一个用于读写文件的一个类。它提供了基于流的方式操作文件,可以进行读取、写入、查找和关闭等操作。 第一个参数:path(路径) 相对路径:相对于当前项目的bin目录下的Debug和Realse来…

[嵌入式AI从0开始到入土]10_yolov5在昇腾上应用

[嵌入式AI从0开始到入土]嵌入式AI系列教程 注:等我摸完鱼再把链接补上 可以关注我的B站号工具人呵呵的个人空间,后期会考虑出视频教程,务必催更,以防我变身鸽王。 第一章 昇腾Altas 200 DK上手 第二章 下载昇腾案例并运行 第三章…

【AI视野·今日NLP 自然语言处理论文速览 第七十一期】Fri, 5 Jan 2024

AI视野今日CS.NLP 自然语言处理论文速览 Fri, 5 Jan 2024 Totally 28 papers 👉上期速览✈更多精彩请移步主页 Daily Computation and Language Papers LLaMA Pro: Progressive LLaMA with Block Expansion Authors Chengyue Wu, Yukang Gan, Yixiao Ge, Zeyu Lu, …

java导出word套打

这篇文档手把手教你完成导出word套打&#xff0c;有这个demo&#xff0c;其他word套打导出都通用。 1、主要依赖 <!--hutool--><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.3.0</ve…

IPv6路由协议---IPv6动态路由(RIPng)

IPv6动态路由协议 动态路由协议有自己的路由算法,能够自动适应网络拓扑的变化,适用于具有一定数量三层设备的网络。缺点是配置对用户要求比较高,对系统的要求高于静态路由,并将占用一定的网络资源和系统资源。 路由表和FIB表 路由器转发数据包的关键是路由表和FIB表,每…

CreateDIBSection失败的问题记录

错误记录 [ERROR] (:0, ): QPixmap::fromWinHICON(), failed to GetIconInfo() (操作成功完成。) [ERROR] (:0, ): QPixmap::fromWinHICON(), failed to GetIconInfo() (参数错误。) [ERROR] (:0, ): QPixmap::fromWinHICON(), failed to GetIconInfo() (参数错误。) [ERROR] …

升级 Vite 5 出现警告 The CJS build of Vite‘s Node API is deprecated.

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall &#x1f343; vue3-element-admin &#x1f343; youlai-boot &#x1f33a; 仓库主页&#xff1a; Gitee &#x1f4ab; Github &#x1f4ab; GitCode &#x1f496; 欢迎点赞…

数仓建设学习路线(一)

前言 数仓建设实践路线是语兴发布在B站的系列课程&#xff0c;搜索语兴呀即可学习完整的数仓建设理论。 大数据相关岗位 大数据常见的岗位主要包括实时开发、数据治理、数据安全、数据资产等。 其中&#xff1a; 实时开发组的主要任务是实时可视化制作(大屏/彩蛋/战报&…

前端结合MQTT实现连接 订阅发送信息等操作 VUE3

MQTT客户端下载 使用测试 在我之前文章中 MQTT下载基础使用 下面记录一下前端使用的话的操作 1.安装 npm i mqtt引入 import * as mqtt from "mqtt/dist/mqtt.min"; //VUE3 import mqtt from mqtt //VUE2 一、MQTT协议中的方法 Connect。等待与服务器建立连接…

[VUE]2-vue的基本使用

目录 vue基本使用方式 1、vue 组件 2、文本插值 3、属性绑定 4、事件绑定 5、双向绑定 6、条件渲染 7、axios 8、⭐跨域问题 &#x1f343;作者介绍&#xff1a;双非本科大三网络工程专业在读&#xff0c;阿里云专家博主&#xff0c;专注于Java领域学习&#xff0c;擅…

气膜建筑:舒适、智能、可持续

气膜建筑之所以能够拥有广阔的发展空间&#xff0c;源于其融合了诸多优势特点&#xff0c;使其成为未来建筑领域的前沿趋势。 气膜建筑注重环境可持续性和能源效率。在材料和设计上&#xff0c;它采用可回收材料、提高热保温效果&#xff0c;并积极利用太阳能等可再生能源&…