dtm分布式事务框架之SAGA 实战

一.dtm分布式事务框架之SAGA

1.1DTM介绍

DTM是一款开源的分布式事务管理器,解决跨数据库、跨服务、跨语言栈更新数据的一致性问题。

通俗一点说,DTM提供跨服务事务能力,一组服务要么全部成功,要么全部回滚,避免只更新了一部分数据产生的一致性问题。

您可以在为什么选DTM中了解更多DTM的设计初衷。

1.2SAGA介绍

10分钟说透Saga分布式事务

Saga是这一篇数据库论文SAGAS提到的一个分布式事务方案。其核心思想是将长事务拆分为多个本地短事务,由Saga事务协调器协调,如果各个本地事务成功完成那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。

与tcc(try,commit,cancel)不同,saga取消了commit阶段.可以出现中间状态.例如saga分布式事务(saga是dtm框架一部分):
1.从前往后执行事务,执行出错,向前补偿(回滚)
2.没有configm阶段,B可以看见中间状态

img

1.3.各种分布式事务应用场景

img

1.4DTM安装

这里采用的是源码编译安装

git clone https://github.com/dtm-labs/dtm && cd dtm
go build

启动后的界面如下

在这里插入图片描述

1.5HTTP-SAGA转账

这里参考的是DTM的SAGA例子

1.5.1创建我们的用户表
CREATE TABLE `user_account` (`id` int(11) NOT NULL AUTO_INCREMENT,`user_id` int(11) NOT NULL,`balance` decimal(10,2) NOT NULL DEFAULT '0.00',`trading_balance` decimal(10,2) NOT NULL DEFAULT '0.00',`create_time` datetime DEFAULT CURRENT_TIMESTAMP,`update_time` datetime DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY `user_id` (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4
1.5.2编写核心业务代码

调整用户的账户余额

func SagaAdjustBalance(db *gorm.DB, uid int, amount float64) error {lock.Lock()defer lock.Unlock()if amount < 0 {var userAccount = UserAccount{}db.First(&userAccount, "user_id=?", uid)if userAccount.Balance < amount {return fmt.Errorf("余额不足")}}t := db.Exec("update user_account set balance = ? where user_id = ?", gorm.Expr("balance + ?", amount), uid)if t.Error != nil {return t.Error}return nil
}

再来编写具体的正向操作/补偿操作的处理函数

r.POST("/SagaBTransIn", func(c *gin.Context) {fmt.Printf("开始转入")userID := 1err = SagaAdjustBalance(db, userID, 100)if err != nil {fmt.Printf("转入失败:%s\r\n", err.Error())}fmt.Println("转入成功")})r.POST("/SagaBTransInCom", func(c *gin.Context) {fmt.Printf("转入失败,开始补偿")userID := 1err = SagaAdjustBalance(db, userID, -100)if err != nil {fmt.Printf("转入补偿失败:%s\r\n", err.Error())}fmt.Println("转入补偿成功")})r.POST("/SagaBTransOut", func(c *gin.Context) {fmt.Printf("开始转出")userID := 3err = SagaAdjustBalance(db, userID, -100)if err != nil {if err.Error() == "余额不足" {c.JSON(http.StatusConflict, gin.H{})return}fmt.Printf("转出失败:%s\r\n", err.Error())c.JSON(500, gin.H{"message": err.Error()})return}fmt.Println("转出成功")})r.POST("/SagaBTransOutCom", func(c *gin.Context) {fmt.Printf("转出补偿")userID := 3err = SagaAdjustBalance(db, userID, 100)if err != nil {fmt.Printf("转出补偿失败:%s\r\n", err.Error())}fmt.Println("转出补偿成功")})

到此各个子事务的处理函数已经OK了,然后是开启SAGA事务,进行分支调用

	r.GET("/start", func(c *gin.Context) {req := gin.H{}dmtServer := "http://127.0.0.1:36789/api/dtmsvr"qsBusi := "http://127.0.0.1:8089"saga := dtmcli.NewSaga(dmtServer, shortuuid.New()).// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"Add(qsBusi+"/SagaBTransOut", qsBusi+"/SagaBTransOutCom", req).// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCom"Add(qsBusi+"/SagaBTransIn", qsBusi+"/SagaBTransInCom", req)// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务saga.WaitResult = trueerr := saga.Submit()if err != nil {c.JSON(500, gin.H{"message": err.Error()})}c.JSON(200, gin.H{"message": "ok"})})

完整代码如下

package mainimport ("fmt""github.com/dtm-labs/client/dtmcli""github.com/gin-gonic/gin""github.com/lithammer/shortuuid/v3""gorm.io/driver/mysql""gorm.io/gorm"glog "gorm.io/gorm/logger""log""net/http""os""sync""time"
)type UserAccount struct {ID             int     `gorm:"column:id;primary_key"`UserId         int     `gorm:"user_id"`Balance        float64 `gorm:"balance"`TradingBalance float64 `gorm:"trading_balance"`
}func (UserAccount) TableName() string {return "user_account"
}var lock sync.Mutexfunc SagaAdjustBalance(db *gorm.DB, uid int, amount float64) error {lock.Lock()defer lock.Unlock()if amount < 0 {var userAccount = UserAccount{}db.First(&userAccount, "user_id=?", uid)if userAccount.Balance < amount {return fmt.Errorf("余额不足")}}t := db.Exec("update user_account set balance = ? where user_id = ?", gorm.Expr("balance + ?", amount), uid)if t.Error != nil {return t.Error}return nil
}var db *gorm.DBfunc InitDB() error {var err errordsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local","root","123456","127.0.0.1","3306","dtm")//希望大家自己可以去封装loggernewLogger := glog.New(log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer(日志输出的目标,前缀和日志包含的内容——译者注)glog.Config{SlowThreshold:             time.Second, // 慢 SQL 阈值LogLevel:                  glog.Info,   // 日志级别IgnoreRecordNotFoundError: true,        // 忽略ErrRecordNotFound(记录未找到)错误Colorful:                  false,       // 禁用彩色打印},)db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{Logger: newLogger,})if err != nil {return err}return nil
}func main() {err := InitDB()if err != nil {panic(err)}r := gin.Default()r.POST("/SagaBTransIn", func(c *gin.Context) {fmt.Printf("开始转入")userID := 1err = SagaAdjustBalance(db, userID, 100)if err != nil {fmt.Printf("转入失败:%s\r\n", err.Error())}fmt.Println("转入成功")})r.POST("/SagaBTransInCom", func(c *gin.Context) {fmt.Printf("转入失败,开始补偿")userID := 1err = SagaAdjustBalance(db, userID, -100)if err != nil {fmt.Printf("转入补偿失败:%s\r\n", err.Error())}fmt.Println("转入补偿成功")})r.POST("/SagaBTransOut", func(c *gin.Context) {fmt.Printf("开始转出")userID := 3err = SagaAdjustBalance(db, userID, -100)if err != nil {if err.Error() == "余额不足" {c.JSON(http.StatusConflict, gin.H{})return}fmt.Printf("转出失败:%s\r\n", err.Error())c.JSON(500, gin.H{"message": err.Error()})return}fmt.Println("转出成功")})r.POST("/SagaBTransOutCom", func(c *gin.Context) {fmt.Printf("转出补偿")userID := 3err = SagaAdjustBalance(db, userID, 100)if err != nil {fmt.Printf("转出补偿失败:%s\r\n", err.Error())}fmt.Println("转出补偿成功")})r.GET("/start", func(c *gin.Context) {req := gin.H{}dmtServer := "http://127.0.0.1:36789/api/dtmsvr"qsBusi := "http://127.0.0.1:8089"saga := dtmcli.NewSaga(dmtServer, shortuuid.New()).// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"Add(qsBusi+"/SagaBTransOut", qsBusi+"/SagaBTransOutCom", req).// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCom"Add(qsBusi+"/SagaBTransIn", qsBusi+"/SagaBTransInCom", req)// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务saga.WaitResult = trueerr := saga.Submit()if err != nil {c.JSON(500, gin.H{"message": err.Error()})}c.JSON(200, gin.H{"message": "ok"})})r.Run(":8089")
}
1.5.3测试

启动 main.go,在浏览上运行http://127.0.0.1:8089/start

可以看到如下的运行结果:

[GIN-debug] Listening and serving HTTP on :8089
开始转出
2023/12/07 10:16:06 E:/Linuxshare/GoStart/dtm/main.go:37
[64.070ms] [rows:1] SELECT * FROM `user_account` WHERE user_id=3 ORDER BY `user_account`.`id` LIMIT 1
&gid=3NujmbFwy6caKsX88fkApj&op=action&trans_type=saga"
开始转入
2023/12/07 10:16:06 E:/Linuxshare/GoStart/dtm/main.go:42
[5.984ms] [rows:1] update user_account set balance = balance + 100 where user_id = 1
转入成功
[GIN] 2023/12/07 - 10:16:06 | 200 |     51.7071ms |       127.0.0.1 | POST     "/SagaBTransIn?branch_id=02&
gid=3NujmbFwy6caKsX88fkApj&op=action&trans_type=saga"
[GIN] 2023/12/07 - 10:16:06 | 200 |    667.8659ms |       127.0.0.1 | GET      "/start"                    
[GIN] 2023/12/07 - 10:16:06 | 404 |            0s |       127.0.0.1 | GET      "/favicon.ico"   

1.6GRPC-SAGA库存服务

1.6.1复制一份conf.sample.yml 改名为conf.yaml

这里面采用的通信协议是kratos,代码如下

MicroService: # gRPC/HTTP based microservice configDriver: 'dtm-driver-kratos' # name of the driver to handle register/discoverTarget: 'consul://127.0.0.1:8500/dtmservice' # register dtm server to this urlEndPoint: 'grpc://127.0.0.1:36790'

修改完后,重新启动DTM,启动的时候要加参数,如下图所示

在这里插入图片描述

启动完后就可以看到已经注册到consul上去了

在这里插入图片描述

1.6.2编写具体的服务

SAGA库存服务的具体代码如下

package mainimport (proto "GoStart/api/inventory/v1""fmt""github.com/dtm-labs/client/dtmgrpc""github.com/gin-gonic/gin""github.com/google/uuid"
)func main() {r := gin.Default()r.GET("/start", func(c *gin.Context) {orderSn := uuid.NewString()req := &proto.SellInfo{GoodsInfo: []*proto.GoodsInvInfo{{GoodsId: 421,Num:     2,},},OrderSn: orderSn,}dmtServer := "127.0.0.1:36790"qsBusi := "discovery:///inventory-srv"uid := uuid.NewString()fmt.Println("uid", uid)saga := dtmgrpc.NewSagaGrpc(dmtServer, orderSn).// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"Add(qsBusi+"/Inventory/Sell", qsBusi+"/Inventory/Reback", req)// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务saga.WaitResult = trueerr := saga.Submit()if err != nil {c.JSON(500, gin.H{"message": err.Error()})}c.JSON(200, gin.H{"message": "ok"})})r.Run(":8089")
}

1.6.3启动服务进行测试

商品服务:

在这里插入图片描述

订单服务:

在这里插入图片描述

库存服务:

在这里插入图片描述

库存服务原先的数据如下

在这里插入图片描述

这时候运行SAGA库存服务的代码,然后在浏览器上访问http://127.0.0.1:8089/start,可以在库存服务看到如下运行情况

2023-12-07 10:41:27.639 INFO    v1/inventory.go:56      订单a0ee4972-407e-4625-a478-c8ccbe42c28d扣减库存2023/12/07 10:41:27 E:/Linuxshare/mxshop/app/inventory/srv/internal/data/v1/db/inventory.go:94
[4.898ms] [rows:1] SELECT * FROM `inventory` WHERE goods = 421 AND `inventory`.`deleted_at` IS NULL ORDER BY `inventory`.`id` LIMIT 12023/12/07 10:41:27 E:/Linuxshare/mxshop/app/inventory/srv/internal/data/v1/db/inventory.go:58
[5.495ms] [rows:1] UPDATE `inventory` SET `stocks`=stocks - 2 WHERE goods=421 AND stocks >= 2 AND `inventory`.`deleted_at` IS NULL 2023/12/07 10:41:27 E:/Linuxshare/mxshop/app/inventory/srv/internal/data/v1/db/inventory.go:76
[17.646ms] [rows:1] INSERT INTO `stockselldetail` (`order_sn`,`status`,`detail`) VALUES ('a0ee4972-407e-4625-a478-c8ccbe42c28d',1,'
[{"Goods":421,"Num":2}]')

再看数据库,库存的数据已发生变动,库存明细数据也行插入了

在这里插入图片描述

在这里插入图片描述

1.7事务屏障达到通过gin集成转入转出功能

1.7.1事务屏障介绍

异常与子事务屏障

分布式事务之所以难,主要是因为分布式系统中的各个节点都可能发生各种非预期的情况。本文先介绍分布式系统中的异常问题,然后介绍这些问题带给分布式事务的挑战,接下来指出现有各种常见用法的问题,最后给出正确的方案。

NPC的挑战

分布式系统最大的敌人可能就是NPC了,在这里它是Network Delay, Process Pause, Clock Drift的首字母缩写。我们先看看具体的NPC问题是什么:

  • Network Delay,网络延迟。虽然网络在多数情况下工作的还可以,虽然TCP保证传输顺序和不会丢失,但它无法消除网络延迟问题。
  • Process Pause,进程暂停。有很多种原因可以导致进程暂停:比如编程语言中的GC(垃圾回收机制)会暂停所有正在运行的线程;再比如,我们有时会暂停云服务器,从而可以在不重启的情况下将云服务器从一台主机迁移到另一台主机。我们无法确定性预测进程暂停的时长,你以为持续几百毫秒已经很长了,但实际上持续数分钟之久进程暂停并不罕见。
  • Clock Drift,时钟漂移。现实生活中我们通常认为时间是平稳流逝,单调递增的,但在计算机中不是。计算机使用时钟硬件计时,通常是石英钟,计时精度有限,同时受机器温度影响。为了在一定程度上同步网络上多个机器之间的时间,通常使用NTP协议将本地设备的时间与专门的时间服务器对齐,这样做的一个直接结果是设备的本地时间可能会突然向前或向后跳跃。

分布式事务既然是分布式的系统,自然也有NPC问题。因为没有涉及时间戳,带来的困扰主要是NP。

异常分类

我们以分布式事务中的TCC作为例子,看看NP带来的影响。

一般情况下,一个TCC回滚时的执行顺序是,先执行完Try,再执行Cancel,但是由于N,则有可能Try的网络延迟大,导致先执行Cancel,再执行Try。

这种情况就引入了分布式事务中的两个难题:

  • 空补偿: Cancel执行时,Try未执行,事务分支的Cancel操作需要判断出Try未执行,这时需要忽略Cancel中的业务数据更新,直接返回
  • 悬挂: Try执行时,Cancel已执行完成,事务分支的Try操作需要判断出Cancel已执行,这时需要忽略Try中的业务数据更新,直接返回

分布式事务还有一类需要处理的常见问题,就是重复请求

  • 幂等: 由于任何一个请求都可能出现网络异常,出现重复请求,所有的分布式事务分支操作,都需要保证幂等性

因为空补偿、悬挂、重复请求都跟NP有关,我们把他们统称为子事务乱序问题。在业务处理中,需要小心处理好这三种问题,否则会出现错误数据。

异常原因

下面看一个网络异常的时序图,更好的理解上述几种问题

exception

  • 业务处理请求4的时候,Cancel在Try之前执行,需要处理空回滚
  • 业务处理请求6的时候,Cancel重复执行,需要幂等
  • 业务处理请求8的时候,Try在Cancel后执行,需要处理悬挂

现有方案的问题

我们看到开源项目dtm之外,包括各云厂商,各开源项目,他们给出的业务实现建议大多类似如下(这也是大多数用户最容易想到的方案):

  • 空补偿: “针对该问题,在服务设计时,需要允许空补偿,即在没有找到要补偿的业务主键时,返回补偿成功,并将原业务主键记录下来,标记该业务流水已补偿成功。”
  • 防悬挂: “需要检查当前业务主键是否已经在空补偿记录下来的业务主键中存在,如果存在则要拒绝执行该笔服务,以免造成数据不一致。”

上述的这种实现,能够在大部分情况下正常运行,但是上述做法中的“先查后改”在并发情况下是容易掉坑里的,我们分析以下如下场景:

  • 正常执行顺序下,Try执行时,在查完没有空补偿记录的业务主键之后,事务提交之前,如果发生了进程暂停P,或者事务内部进行网络请求出现了拥塞,导致本地事务等待较久
  • 全局事务超时后,Cancel执行,因为没有查到要补偿的业务主键,因此判断是空补偿,返回
  • Try的进程暂停结束,最后提交本地事务
  • 全局事务回滚完成后,Try分支的业务操作没有被回滚,产生了悬挂

事实上,NPC里的P和C,以及P和C的组合,有很多种的场景,都可以导致上述竞态情况,就不一一赘述了。

虽然这种情况发生的概率不高,但是在金融领域,一旦涉及金钱账目,那么带来的影响可能是巨大的。

PS:幂等控制如果也采用“先查再改”,也是一样很容易出现类似的问题。解决这一类问题的关键点是要利用唯一索引,“以改代查”来避免竞态条件。

子事务屏障

我们在dtm中,首创了子事务屏障技术,使用该技术,能够非常便捷的解决异常问题,极大的降低了分布式事务的使用门槛。

子事务屏障能够达到下面这个效果,看示意图:

barrier

所有这些请求,到了子事务屏障后:不正常的请求,会被过滤;正常请求,通过屏障。开发者使用子事务屏障之后,前面所说的各种异常全部被妥善处理,业务开发人员只需要关注实际的业务逻辑,负担大大降低。 子事务屏障提供了方法BranchBarrier.CallWithDB ,方法的原型为:

func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BusiFunc) error

业务开发人员,在busiCall里面编写自己的相关逻辑,调用 BranchBarrier.CallWithDB 。 BranchBarrier.CallWithDB 保证,在空回滚、悬挂等场景下,busiCall不会被调用;在业务被重复调用时,有幂等控制,保证只被提交一次。

子事务屏障会管理TCC、SAGA、事务消息等,也可以扩展到其他领域

原理

子事务屏障技术的原理是,在本地数据库,建立分支操作状态表dtm_barrier,唯一键为全局事务id-分支id-分支操作(try|confirm|cancel)

  1. 开启本地事务
  2. 对于当前操作op(try|confirm|cancel),insert ignore一条数据gid-branchid-op,如果插入不成功,提交事务返回成功(常见的幂等控制方法)
  3. 如果当前操作是cancel,那么在insert ignore一条数据gid-branchid-try,如果插入成功(注意是成功),则提交事务返回成功
  4. 调用屏障内的业务逻辑,如果业务返回成功,则提交事务返回成功;如果业务返回失败,则回滚事务返回失败

在此机制下,解决了乱序相关的问题

  • 空补偿控制–如果Try没有执行,直接执行了Cancel,那么3中Cancel插入gid-branchid-try会成功,不走屏障内的逻辑,保证了空补偿控制
  • 幂等控制–2中任何一个操作都无法重复插入唯一键,保证了不会重复执行
  • 防悬挂控制–Try在Cancel之后执行,那么Cancel会在3中插入gid-branchid-try,导致Try在2中不成功,就不执行屏障内的逻辑,保证了防悬挂控制

对于SAGA、二阶段消息,也是类似的机制。

原理图解

下面我们以图的方式来详解子事务屏障,因为Confirm操作不涉及空补偿和悬挂,所以重点看Try与Cancel,Try对应图中的A,Cancel对应图中的C:

子事务屏障中对应的幂等处理部分:

barrier-idem

这部分就是常规的幂等处理部分,往数据库中插入一个唯一键,如果是重复请求,那么插入失败,直接失败返回。

子事务屏障技术就是在上述的幂等处理部分,添加一个步骤–补偿服务再插入一条A记录,正常流程下,会因为唯一键冲突导致插入失败,往下执行业务。

在这里插入图片描述

当发生乱序,假设C在A前面执行,那么会发生下面的时序图:

在这里插入图片描述

  • 对于C操作,他先于A执行,是一个空补偿;此时C操作插入A记录时,发现插入成功,直接返回
  • 对于A操作,他在C之后执行,是一个悬挂;此时A操作插入A记录时,发现插入失败,直接返回

这两种情况都会被子事务屏障拦截返回,而不执行内部的业务操作。可以看到子事务屏障非常巧妙的解决了幂等、空补偿和悬挂三个问题。

竞态分析

上面分析了Try和Cancel的执行时间没有重叠的情况下,能够解决空补偿和悬挂问题。如果出现了Try和Cancel执行时间重叠的情况,我们看看会发生什么。

假设Try和Cancel并发执行,Cancel和Try都会插入同一条记录gid-branchid-try,由于唯一索引冲突,那么两个操作中只有一个能够成功,而另一个则会等持有锁的事务完成后返回。

  • 情况1,Try插入gid-branchid-try失败,Cancel操作插入gid-branchid-try成功,此时就是典型的空补偿和悬挂场景,按照子事务屏障算法,Try和Cancel都会直接返回
  • 情况2,Try插入gid-branchid-try成功,Cancel操作插入gid-branchid-try失败,按照上述子事务屏障算法,会正常执行业务,而且业务执行的顺序是Try在Cancel前
  • 情况3,Try和Cancel的操作在重叠期间又遇见宕机等情况,那么至少Cancel会被dtm重试,那么最终会走到情况1或2。

综上各种情况的详细论述,子事务屏障能够在各种NP情况下,保证最终结果的正确性。

优点

事实上,子事务屏障有大量优点,包括:

  • 两个insert判断解决空补偿、防悬挂、幂等这三个问题,比其他方案的三种情况分别判断,逻辑复杂度大幅降低
  • dtm的子事务屏障是SDK层解决这三个问题,业务完全不需要关心
  • 性能高,对于正常完成的事务(一般失败的事务不超过1%),子事务屏障的额外开销是每个分支操作一个SQL,比其他方案代价更小。

支持的存储

目前子事务屏障已经支持了

  • 数据库:包括 Mysql, Postgres, 以及与Mysql,Postgres兼容的数据库
  • 缓存 Redis:采用 Lua 脚本事务支持
  • Mongo:采用 Mongo 的事务支持

在子事务屏障的支持下,您可以将Redis、Mongo和数据库的事务组合在一起,形成一个全局事务。相关用法,可以在dtm-examples里面找到

理论上支持事务的各种存储都可以轻松实现子事务屏障,例如 TiKV 等,如果较多用户有这样的需求,我们将会快速支持。

对接orm库

barrier提供了sql标准接口,但大家的应用通常都会引入更高级的orm库,而不是裸用sql接口,因此需要进行转化. 相关的对接参考对接ORM

1.7.2建表
create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(id bigint(22) PRIMARY KEY AUTO_INCREMENT,trans_type varchar(45) default '',gid varchar(128) default '',branch_id varchar(128) default '',op varchar(45) default '',barrier_id varchar(45) default '',reason varchar(45) default '' comment 'the branch type who insert this record',create_time datetime DEFAULT now(),update_time datetime DEFAULT now(),key(create_time),key(update_time),UNIQUE key(gid, branch_id, op, barrier_id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
17.3代码编写
package mainimport ("database/sql""fmt""log""net/http""os""sync""time""github.com/gin-gonic/gin""github.com/lithammer/shortuuid/v3""github.com/dtm-labs/client/dtmcli""gorm.io/driver/mysql""gorm.io/gorm"glog "gorm.io/gorm/logger"
)type UserAccount struct {ID             int     `gorm:"column:id;primary_key"`UserId         int     `gorm:"user_id"`Balance        float64 `gorm:"balance"`TradingBalance float64 `gorm:"trading_balance"`
}func (UserAccount) TableName() string {return "user_account"
}var lock sync.Mutex// 转入和转出的时候,都要加锁,否则会出现并发问题
func SagaAdjustBalance(db *sql.Tx, uid int, amount float64) error {lock.Lock()defer lock.Unlock()if amount < 0 {var balance float64db.QueryRow("select balance from dtm.user_account where user_id = ?", uid).Scan(&balance)if balance < -amount {return fmt.Errorf("余额不足")}}_, err := db.Exec("update dtm.user_account set balance = balance + ? where user_id = ?", amount, uid)if err != nil {return err}return nil
}var db *gorm.DBfunc initDB() error {dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local","root","root","192.168.2.13","3306","dtm")newLogger := glog.New(log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer(日志输出的目标,前缀和日志包含的内容——译者注)glog.Config{SlowThreshold:             time.Second, // 慢 SQL 阈值LogLevel:                  glog.Info,   // 日志级别IgnoreRecordNotFoundError: true,        // 忽略ErrRecordNotFound(记录未找到)错误Colorful:                  false,       // 禁用彩色打印},)var err errordb, err = gorm.Open(mysql.Open(dsn), &gorm.Config{Logger: newLogger,})if err != nil {return err}return nil
}// 获取屏障
// MustBarrierFromGin 1
func MustBarrierFromGin(c *gin.Context) *dtmcli.BranchBarrier {ti, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())fmt.Println(err)return ti
}// 服务发现, 库存服务有5个
func main() {err := initDB()if err != nil {panic(err)}r := gin.Default()r.POST("/SagaBTransIn", func(c *gin.Context) {barrier := MustBarrierFromGin(c) //1.生成一个屏障tx := db.Begin()                 //2.开启事务sourceTx := tx.Statement.ConnPool.(*sql.Tx)err := barrier.Call(sourceTx, func(tx1 *sql.Tx) error { //3.将业务逻辑翻到Call方法执行fmt.Println("开始转入")userID := 1err := SagaAdjustBalance(sourceTx, userID, 100) //4.修改gorm为 sql.Tx并使用原生sql查询(gorm支持不全)if err != nil {fmt.Printf("转入失败:%s\r\n", err.Error())return err}return nil})if err != nil {c.JSON(http.StatusOK, gin.H{"code": 1, "msg": err.Error()})return}return})r.POST("/SagaBTransInCom", func(c *gin.Context) {fmt.Println("转入失败, 开始补偿")//userID := 1//err := SagaAdjustBalance(db, userID, -100)//if err != nil {//	fmt.Printf("转入补偿失败:%s\r\n", err.Error())//	return//}fmt.Println("转入补偿成功")})r.POST("/SagaBTransOut", func(c *gin.Context) {barrier := MustBarrierFromGin(c)tx := db.Begin()sourceTx := tx.Statement.ConnPool.(*sql.Tx)err := barrier.Call(sourceTx, func(tx1 *sql.Tx) error {fmt.Println("开始转出")userID := 3err := SagaAdjustBalance(sourceTx, userID, -100)if err != nil {if err.Error() == "余额不足" {c.JSON(http.StatusConflict, gin.H{})}fmt.Printf("转出失败:%s\r\n", err.Error())c.JSON(500, gin.H{"msg": err.Error()})}fmt.Println("转出成功")return nil})if err != nil {c.JSON(http.StatusOK, gin.H{"code": 1, "msg": err.Error()})return}return})r.POST("/SagaBTransOutCom", func(c *gin.Context) {fmt.Println("转出失败, 开始补偿")//userID := 3//err := SagaAdjustBalance(db, userID, 100)//if err != nil {//	fmt.Printf("转出补偿失败:%s\r\n", err.Error())//	return//}fmt.Println("转出补偿成功")})r.GET("start", func(c *gin.Context) {req := gin.H{}dmtServer := "http://127.0.0.1:36789/api/dtmsvr"qsBusi := "http://127.0.0.1:8089"saga := dtmcli.NewSaga(dmtServer, shortuuid.New()).// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"Add(qsBusi+"/SagaBTransOut", qsBusi+"/SagaBTransOutCom", req).// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCom"Add(qsBusi+"/SagaBTransIn", qsBusi+"/SagaBTransInCom", req)// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务saga.WaitResult = trueerr := saga.Submit()if err != nil {c.JSON(500, gin.H{"message": err.Error()})}c.JSON(200, gin.H{"message": "ok"})})r.Run(":8089")
}

s\r\n", err.Error())
// return
//}
fmt.Println(“转出补偿成功”)
})

r.GET("start", func(c *gin.Context) {req := gin.H{}dmtServer := "http://127.0.0.1:36789/api/dtmsvr"qsBusi := "http://127.0.0.1:8089"saga := dtmcli.NewSaga(dmtServer, shortuuid.New()).// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCom"Add(qsBusi+"/SagaBTransOut", qsBusi+"/SagaBTransOutCom", req).// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCom"Add(qsBusi+"/SagaBTransIn", qsBusi+"/SagaBTransInCom", req)// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务saga.WaitResult = trueerr := saga.Submit()if err != nil {c.JSON(500, gin.H{"message": err.Error()})}c.JSON(200, gin.H{"message": "ok"})
})r.Run(":8089")

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/203881.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【天线了解】1.004天线的了解以及使用

一。004天线使用步骤 1.打开天线 &#xff08;1&#xff09;天线的各种版本 注意&#xff1a; 《1》天线包括单通道天线程序&#xff0c;双通道天线程序等。 《2》在没有连接天线时&#xff0c;有的天线程序打不开。 &#xff08;2&#xff09;打开软件前的配置工作 注意&…

接鸡冠^^

欢迎来到程序小院 接鸡冠 玩法&#xff1a;左右移动棒棒君(小海豹)接住鸡冠&#xff0c;避开炸弹&#xff0c;若不小心接住炸弹则游戏结束&#xff0c; 赶紧接鸡冠吧&#xff0c;看看你能够接住多少鸡冠哦^^。。开始游戏https://www.ormcc.com/play/gameStart/211 html <di…

【精选】设计模式——策略设计模式-两种举例说明,具体代码实现

Java策略设计模式 简介 策略设计模式是一种行为型设计模式&#xff0c;它允许在运行时选择算法的行为。 在软件开发中&#xff0c;我们常常需要根据不同情况采取不同的行为。通常的做法是使用大量的条件语句来实现这种灵活性&#xff0c;但这会导致代码变得复杂、难以维护和扩…

Unity打包EXE自定义(拖拽)窗口大小

代码 using System.Collections; using System.Collections.Generic; using UnityEngine; using System; using System.Runtime.InteropServices; public class MyWindow : MonoBehaviour {[DllImport("user32.dll")]private static extern IntPtr GetActiveWindow(…

CSS-自适应导航栏(flex | grid)

目标&#xff1a;实现左右各有按钮&#xff0c;中间是内容&#xff0c;自适应显示中间的内容导航栏&#xff0c;即 根据中间的宽度大小显示内容。 自适应导航栏 总结&#xff1a;推荐 flex布局 / grid布局 flex布局&#xff1a; 两侧 flex:1; ----->中间自适应 grid布局&…

uniapp(微信小程序)聊天实例,支持图片,语音,表情(附源码)

效果预览 安装教程 配置 请参考Dome 会话配置 {info:{// 用户关键字userKey:2666,// 用户手机userPhone:15252156614,// 用户昵称userName: 健健,// 头像headImg: http://d.hiphotos.baidu.com/image/h%3D300/sign0defb42225381f3081198ba999004c67/6159252dd42a2834a75bb01…

CRM客户关系管理系统的主要功能有哪些?

我们都知道&#xff0c;CRM系统可以帮助企业加快业务增长。如果一个企业能提高业务效率、跨团队协作、有效管理客户、轻松共享和同步数据&#xff0c;那么企业竞争力将极大地提高。基于此&#xff0c;我们说说CRM客户关系管理系统的主要功能分析。 完整的CRM是什么样的&#x…

红队专题-开源资产扫描系统-ARL资产灯塔系统

ARL资产灯塔系统 安装说明问题 &#xff1a; 安装说明 源码地址 https://github.com/TophantTechnology/ARL https://github.com/TophantTechnology/ARL/wiki/Docker-%E7%8E%AF%E5%A2%83%E5%AE%89%E8%A3%85-ARL 安装环境 uname -a Linux VM-24-12-centos 3.10.0-1160.49.1.e…

亚马逊云科技re:Invent,生成式AI正在彻底改变开发者的工作方式

去年此时&#xff0c;ChatGPT横空出世席卷全球&#xff0c;许多人称其意味着AI的iPhone时刻到来。CSDN创始人蒋涛对此曾预测&#xff1a;「下一步就是应用时刻&#xff0c;新应用时代将来临……大模型将推动更多的AI应用程序员诞生」。 在2023亚马逊云科技re:Invent全球大会第三…

Linux--环境变量

一.基本概念 * 环境变量 (environment variables) 一般是指在操作系统中用来指定操作系统运行环境的一些参数 * 如&#xff1a;我们在编写 C/C 代码的时候&#xff0c;在链接的时候&#xff0c;从来不知道我们的所链接的动态静态库在哪里&#xff0c;但 是照样可以链接成功&am…

使用jenkins插件Allure生成自动化测试报告

前言 以前做自动化测试的时候一直用的HTMLTestRunner来生成测试报告&#xff0c;后来也尝试过用Python的PyH模块自己构建测试报告&#xff0c;在后来看到了RobotFramework的测试报告&#xff0c;感觉之前用的测试报告都太简陋&#xff0c;它才是测试报告应该有的样子。也就是在…

微信小程序 -- ios 底部小黑条样式问题

问题&#xff1a; 如图&#xff0c;ios有的机型底部伪home键会显示在按钮之上&#xff0c;导致点击按钮的时候误触 解决&#xff1a; App.vue <script>export default {wx.getSystemInfo({success: res > {let bottomHeight res.screenHeight - res.safeArea.bott…

c语言五子棋

下面是一个简单的C语言五子棋实现示例&#xff1a; #include <stdio.h>#include <stdlib.h>#define BOARD_SIZE 15char board[BOARD_SIZE][BOARD_SIZE];void init_board() { int i, j; for (i 0; i < BOARD_SIZE; i) { for (j 0; j < BOARD_…

HarmonyOS4.0从零开始的开发教程09页签切换

HarmonyOS&#xff08;七&#xff09;页签切换 List组件和Grid组件的使用 Tabs组件的使用 概述 在我们常用的应用中&#xff0c;经常会有视图内容切换的场景&#xff0c;来展示更加丰富的内容。比如下面这个页面&#xff0c;点击底部的页签的选项&#xff0c;可以实现“首页…

CTF工控工业互联网(ISC)复现总结WP(超详细)

工业互联网复现 Modbus协议&#xff1a;MMS协议&#xff1a;ISC工业互联网比赛题目复现&#xff1a;Modbus协议分析&#xff1a;组态软件安全分析&#xff1a;工业协议分析1&#xff1a;工业协议分析2&#xff1a;特殊的工控流量&#xff1a; Modbus协议&#xff1a; Modbus 市…

JavaScript中的连续赋值问题a.x = a = {n:2}

输出以下代码的执行结果并解释为什么 var a {n: 1}; var b a; a.x a {n: 2}; console.log (a.x); console.log (b.x); 下面来分析下这段简单代码的工作步骤&#xff0c;从而进一步理解js引用类型“赋值”的工作方式。 首先第一行和第二行 var a {n:1}; var b a; …

JVM 命令行监控及诊断工具

面试题 你使用过Java虚拟机性能监控和故障处理工具吗&#xff1f;&#xff08;美图&#xff09; 怎么打出线程栈信息。&#xff08;字节跳动&#xff09; JVM诊断调优工具用过哪些&#xff1f; (京东) 怎么获取 Java 程序使用的内存&#xff1f;堆使用…

学生成绩管理系统(Java)

开发环境: Windows 11 IDEA 2021.3.3 需求: package com.it.neu;import java.util.ArrayList; import java.util.Scanner;import static java.time.Clock.system;class Student { //创建学生类private String Stu_name;private String Stu_id;public Student(String id, S…

圆通单号查询,圆通速递物流查询,对需要的单号进行颜色标记

批量查询圆通速递单号的物流信息&#xff0c;并对需要的单号进行颜色标记。 所需工具&#xff1a; 一个【快递批量查询高手】软件 圆通速递单号若干 操作步骤&#xff1a; 步骤1&#xff1a;运行【快递批量查询高手】软件&#xff0c;第一次使用的伙伴记得先注册&#xff0c…

【Java】实现顺序表基本的操作(数据结构)

文章目录 前言顺序表1、打印顺序表2、增加元素3、在任意位置增加元素4、判断是否包含某个元素5、查找某个元素对于的位置6、获取任意位置的元素7、将任意位置的元素设为value8、删除第一次出现的关键字9、获取顺序表长度10、清空顺序表总结 前言 在了解顺序表之前我们要先了解…