Go-zero中分布式事务的实现(DTM分布式事务管理器,在一个APi中如何调用两个不同服务的rpc层,并保证两个不同服务之间的业务逻辑同时成功)

涉及到的相关技术

     1.DTM分布式事务管理器,解决跨数据库、跨服务、跨语言栈更新数据的一致性问题。

      2.SAGA事务模式,SAGA事务模式是DTM中常用的一种模式,简单易上手.(当然还有其它更多的事务模式,这里采用的SAGA只不过是其中一种较为简单的方法)

      3.Go-zero框架,ETCD服务注册...

更多内容移步至:go-zero 缩短从需求到上线的距离 和 介绍 | DTM开源项目文档

业务场景

        如果是在单体架构的业务当中,是不需要用到分布式事务的.单体架构中,涉及到需要保证多个事务同时成功的场景,只需要创建一个全局的事务对象 如:tx := db.Begin(),然后统一用这一个tx去管理接下来的业务逻辑即可.

        不清楚在一个api中如何调用其它服务rpc的可以看看我的另一篇博客中的一种解决办法:

go-zero标准的项目结构,以及如何使用docker-compose部署道linux服务器上-CSDN博客

        但是在go-zero框架的这种微服务中,比如说:我在一个用户服务的api中调用了用户服务rpc中注册的业务,并且同时还调用了标签服务的rpc层中的选择标签的业务. 那么,此时我就需要保证用户的注册和标签的选择这两个在不同服务下执行的业务逻辑同时成功.(总不能用户账号密码插入到的表中,但是突然断网了,导致标签没有选择上去吧,这个是不符合我的业务的).

DTM 环境搭建(Windows本地搭建)

        !!!!!!!!!!!!!! 这个环境请注意,是需要在你本地去搭建的,至于为什么,我会在后面解释,最重要的先把环境搭建起来吧! 我采用的是docker-compose去搭建.(如果不了解windows电脑如何配置docker环境,可以移步:)

Windows11电脑是如何搭建docker环境的-CSDN博客

        废话不多说,首先从搭建环境讲起.(我这里采用的是docker-compose搭建我需要的环境)

上图就是项目的结构

在dtm和etcd的目录下面各自新建一个Dockerfile文件,Dockerfile都不需要过多的配置,只需要用到最基础的镜像即可.在dtm的目录下还需要新建一个config.yml文件.

DTM下Dokcerfile以及config.yml的编写

FROM yedf/dtm:latestLABEL maintainer="zyf021026 <shichuxin6@163.com>"
# 指定要存储trans状态的存储驱动
# Store:### 默认存储驱动
#   Driver: 'boltdb'### redis 存储驱动
#   Driver: 'redis'
#   Host: 'localhost'
#   User: ''
#   Password: ''
#   Port: 6379### mysql 存储驱动
#   Driver: 'mysql'
#   Host: 'mysql'
#   User: 'root'
#   Password: '123456'
#   Port: 3306### postgres 存储驱动
#   Driver: 'postgres'
#   Host: 'localhost'
#   User: 'postgres'
#   Password: 'mysecretpassword'
#   Port: '5432'### 以下配置仅适用于 postgres/mysql 驱动
#   MaxOpenConns: 500
#   MaxIdleConns: 500
#   ConnMaxLifeTime: 5
#   TransGlobalTable: 'dtm.trans_global'
#   TransBranchOpTable: 'dtm.trans_branch_op'### 以下配置仅适用于 redis/boltdb 驱动
#   DataExpire: 604800 # Trans 过期时间
#   RedisPrefix: '{}'  # Redis 存储前缀MicroService:Driver: 'dtm-driver-gozero'           # 要处理注册/发现的驱动程序的名称Target: 'etcd://your-ip:2379/dtmservice' # 注册 dtm 服务的 etcd 地址EndPoint: 'your-ip:36790'# 以下配置的单位为'秒'
# TransCronInterval: 3
# TimeoutToFail: 35
# RetryInterval: 10# 日志等级
# LogLevel: 'info'

ETCD的Dockerfile文件编写

FROM bitnami/etcd:latestLABEL maintainer="zyf021026 <shichuxin6@163.com>"

使用docker-compose 构建镜像,启动容器


version: '3'networks:backend:driver: bridge######## 项目依赖的环境,启动项目之前要先启动此环境 #######
services:etcd:build:context: etcdenvironment:- TZ=Asia/Shanghai- ALLOW_NONE_AUTHENTICATION=yesports: # 设置端口映射- "2379:2379"networks:- backendrestart: alwaysdtm:build:context: ./dtmenvironment:- TZ=Asia/Shanghaientrypoint:- "/app/dtm/dtm"- "-c=/app/dtm/configs/config.yaml"privileged: truevolumes:- ./dtm/config.yml:/app/dtm/configs/config.yaml # 将 dtm 配置文件挂载到容器里ports:- "36789:36789"- "36790:36790"networks:- backendrestart: alwaysdepends_on:- etcd

   在根目录下面执行docker-compose up -d 将需要的环境搭建起来

执行如下图中的命令:

        新建子事务屏障的数据库(库名和表名请不要修改) 可以作为独立的一个数据库使用,没有必要把自己的项目数据库名称改为dtm_barrier

/*Navicat Premium Data TransferSource Server         : LinkSource Server Type    : MySQLSource Server Version : 50743Source Host           : 39.101.77.206:3306Source Schema         : dtm_barrierTarget Server Type    : MySQLTarget Server Version : 50743File Encoding         : 65001Date: 03/03/2024 13:57:48
*/SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;-- ----------------------------
-- Table structure for barrier
-- ----------------------------
DROP TABLE IF EXISTS `barrier`;
CREATE TABLE `barrier`  (`id` bigint(22) NOT NULL AUTO_INCREMENT,`trans_type` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`gid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`branch_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`op` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`barrier_id` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`reason` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '' COMMENT 'the branch type who insert this record',`create_time` datetime NULL DEFAULT CURRENT_TIMESTAMP,`update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`) USING BTREE,UNIQUE INDEX `gid`(`gid`, `branch_id`, `op`, `barrier_id`) USING BTREE,INDEX `create_time`(`create_time`) USING BTREE,INDEX `update_time`(`update_time`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1482 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;SET FOREIGN_KEY_CHECKS = 1;

SAGA事务模式的使用

        简单说明一下,SAGA分布式事务模式,是没有办法携带返回值的,因此尽量此处要避免需要有返回值的业务场景.

        直接用代码来展示SAGA事务模式的使用方法吧!

    用户注册服务PRC的编写以及事务失败补偿机制的编写

这里不再演示proto文件是如何编写的

用户注册服务的rpc

func (l *UserCreateLogic) UserCreate(in *user.UserCreateRequest) (pd *user.UserCreateResponse, endErr error) {// 获取 RawDB// 注册db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()// 获取子事务屏障对象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)if err != nil {return nil, status.Error(500, err.Error())}// 开启子事务屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) error {// 加密密码pwd, _ := bcrypt.GetPwd(in.Password)// 插入用户数据_, err = tx.Exec("INSERT INTO users (id , created_at, updated_at, username, password, avatar, phone) VALUES (?,?, ?, ?, ?, ?, ?)", in.Id, time.Now(), time.Now(), in.Username, pwd, in.Avatar, in.Phone)//返回子事务执行失败if err != nil {return err}return nil})if err != nil {return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) //如果失败,不再重试,直接回滚}return &user.UserCreateResponse{}, endErr
}

用户注册服务rpc的失败补偿 (如果注册服务的rpc失败,就会执行相应的补偿方法)

func (l *UserCreateRevertLoginLogic) UserCreateRevertLogin(in *user.UserCreateRequest) (pd *user.UserCreateResponse, err error) {fmt.Println("用户标签回滚开始--->")// 获取 RawDBdb, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()// 获取子事务屏障对象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)if err != nil {return nil, status.Error(500, err.Error())}// 开启子事务屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) error {fmt.Println("注册事务走入了补偿")//删除插入的标签数据 和 用户数据_, err = tx.Exec("DELETE FROM tb_user_tag where user_id = ?", in.Id)_, err = tx.Exec("DELETE FROM users where id = ?", in.Id)//返回子事务执行失败if err != nil {return err}return nil})if err != nil {fmt.Println("failed---->", err)return nil, err}fmt.Println("删除成功")fmt.Println("用户标签回滚结束--->")return &user.UserCreateResponse{}, nil
}

        标签服务Rpc的编写以及事务失败补偿机制的编写

标签服务的rpc

func (l *SignUserChooseTagLogic) SignUserChooseTag(in *tag.UserChooseTagRequest) (*tag.UserChooseTagRequest, error) {// 获取 RawDB// 注册账号时,选择标签db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()if err != nil {return nil, status.Error(500, err.Error())}// 获取子事务屏障对象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)// 开启子事务屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) (err error) {// 用户注册时选择标签var exists boolerr = tx.QueryRow("SELECT EXISTS(SELECT 1 FROM tb_user_tag WHERE tag_id = ? and user_id = ?)", in.TagId, in.UserId).Scan(&exists)if err != nil {return err}if exists {return fmt.Errorf("标签重复选择")}fmt.Println("开始插入标签")_, err = tx.Exec("INSERT INTO tb_user_tag (tb_user_tag.created_at , tb_user_tag.updated_at , tag_id, user_id) VALUES (?,?,?, ?)", time.Now(), time.Now(), in.TagId, in.UserId)if err != nil {return fmt.Errorf("标签选择失败")}return nil})if err != nil {return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) //事务失败不再重试,直接回滚}return &tag.UserChooseTagRequest{}, nil
}

标签选择失败补偿的rpc

func (l *SignUserChooseTagRevertLogic) SignUserChooseTagRevert(in *tag.UserChooseTagRequest) (*tag.UserChooseTagRequest, error) {fmt.Println("用户标签SignUserChooseTagRevert--->开始")// 获取 RawDBdb, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()if err != nil {return nil, status.Error(500, err.Error())}// 获取子事务屏障对象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)// 开启子事务屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) (err error) {fmt.Println("注册时选择标签进入了补偿")logc.Info(l.ctx)//删除记录_, err = tx.Exec("DELETE FROM tb_user_tag where tag_id = ? and user_id = ?", in.TagId, in.UserId)return err})if err != nil {return nil, err}fmt.Println("用户标签SignUserChooseTagRevert--->结束")return &tag.UserChooseTagRequest{}, nil
}

至此rpc层的业务逻辑全部编写完毕,但请一定要注意每一个rpc的返回值,一定要按照 如&tag.UserChooseTagRequest{}返回,不能简单的返回一个nil值.否则会导致事务一直无法提交

        API层的编写


func (l *SignUpLogic) SignUp(req *types.UserCreateRequest) (resp *types.UserCreateResponse, err error) {//首先判断用户是否存在_, err = l.svcCtx.UserRpc.UserIsExists(l.ctx, &user.UserCreateRequest{Phone: req.Phone,})if err != nil {return nil, err}// 获取UserRpc 的BuildTargetuserRpcBuildServer, err := l.svcCtx.Config.UserRpc.BuildTarget()if err != nil {return nil, status.Error(100, "用户注册异常")}// 获取TagRpc 的BuildTargettagRpcBuildServer, err := l.svcCtx.Config.TagRpc.BuildTarget()if err != nil {return nil, status.Error(100, "标签选择异常")}empty := user.Empty{}//dtm服务的etcd注册地址var dtmServer = l.svcCtx.Config.Dtm//dtmServer := "etcd://etcd:2379/dtmservice"fmt.Println(dtmServer)// 创建一个gidgid := dtmgrpc.MustGenGid(dtmServer)//创建一个自增idif _, err := l.svcCtx.UserRpc.AddUserId(l.ctx, &empty); err != nil {return nil, fmt.Errorf("CREATE user id error:%v", err)}userID, _ := l.svcCtx.UserRpc.NextUserID(l.ctx, &empty)saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).Add(tagRpcBuildServer+"/tag.TagSign/SignUserChooseTag", tagRpcBuildServer+"/tag.TagSign/SignUserChooseTagRevert", &tag.UserChooseTagRequest{UserId: userID.NextUserId,TagId:  req.StartTagId,}).Add(userRpcBuildServer+"/user.UserService/UserCreate", userRpcBuildServer+"/user.UserService/UserCreateRevertLogin", &user.UserCreateRequest{Username: req.Username,Password: req.Password,Avatar:   req.Avatar,Phone:    req.Phone,Id:       userID.NextUserId,})//事务提交if err := saga.Submit(); err != nil {//自增主键减少1if _, err := l.svcCtx.UserRpc.DecUserID(l.ctx, &empty); err != nil {logx.Error(err)}logx.Error(err)return nil, fmt.Errorf("saga submit error:%v", err)}return &types.UserCreateResponse{}, nil
}

上面代码的逻辑,相信如果各位接触到微服务,一定是可以理解的,由于saga的事务模式没有返回值,所以我通过redis生成一个自增id来使用,而不再采用mysql的自增主键id.

上面代码中的地址可以在rpc生成的pb.go中找到

自己的理解

        经历了长度一周多对分布式事务的研究,写一点自己的简单理解吧!(比较浅显)

        saga事务模式需要自己写事务的补偿方法,子事务屏障内的事务执行失败之后,就会执行对应的事务补偿方法!即回滚事务.补偿方法内写的便是对这一次执行的插入,修改语句的相反操作.比如我增加某一条数据,补偿内就写上对删除的操作.

        感觉和MySQL的Undo log 回滚日志很相似啊!Undo log日志会记录更新前的数据到日志中,是在一个事务下执行过程中,在还没有提交之前,如果发生意外,就可以通过这个日志回滚到事务执行之前的数据了.

        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/717993.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows 2012 设置 nginx 开机自启动(适用于windows2012/10)

Windows 2012 设置 nginx 开机自启动&#xff08;适用于windows2012/10&#xff09;https://www.cnblogs.com/xuegqcto/articles/7521483.html 在windows server 2012上安装nginx&#xff0c;同时配置开机自启动服务&#xff08;推荐使用“Windows Service Wrapper”工具&…

【Linux】线程概念|线程理解|线程控制

文章目录 线程概念Linux中线程是否存在的讨论线程创建和线程控制线程的终止和等待&#xff08;三种终止方式 pthread_join()的void**retval&#xff09; 线程概念 线程就是进程内部的一个执行流&#xff0c;线程在进程内运行&#xff0c;线程在进程的地址空间内运行&#xff0…

LeetCode-第14题-最长公共前缀

1.题目描述 编写一个函数来查找字符串数组中的最长公共前缀。 如果不存在公共前缀&#xff0c;返回空字符串 ""。 2.样例描述 3.思路描述 按字符串数组每个数组的长度&#xff0c;将字符串数组从小到大排序&#xff1b;他们的公共前缀一定小于或等于最长元素长度…

2024年智能驾驶年度策略:自动驾驶开始由创造型行业转向工程型行业

感知模块技术路径已趋于收敛&#xff0c;自动驾驶从创造型行业迈向工程型行业。在特斯拉的引领下&#xff0c;国内主机厂2022年以来纷纷跟随特斯拉相继提出“重感知、轻地图”技术方案&#xff0c;全球自动驾驶行业感知模块技术路径从百花齐放开始走向收敛。我们认为主机厂智能…

2023.3.3周报

目录 摘要 一、文献阅读 1、题目 2、摘要 3、模型架构 4、文献解读 一、Introduction 二、实验 三、结论 二、PINN 一、PINN比传统数值方法有哪些优势 二、PINN方法 三、正问题与反问题 三、PINN实验 一、数学方程 二、模型搭建 总结 摘要 本周我阅读了一篇…

Postman上传文件的操作方法

前言 调用某个接口&#xff0c;测试上传文件功能。一时间不知如何上传文件&#xff0c;本文做个操作记录&#xff0c;期望与你有益。 步骤一、设置Headers key:Content-Type value:multipart/form-data 步骤二、设置Body 选择form-data key:file下拉框选择file类型value&…

STM32(8)NVIC编程

中断源由部分片上外设产生 在misc.h中找&#xff0c;杂项 配置NVIC GPIO和AFIO不能产生中断源&#xff0c;但能通过EXTI&#xff0c;由EXTI产生中断源 NVIC不需要开启时钟&#xff0c;因为NVIC模块位于内核内部&#xff0c;芯片一上电就能工作。 中断响应函数 中断向量表在启…

Java:JVM基础

文章目录 参考JVM内存区域程序计数器虚拟机栈本地方法栈堆方法区符号引用与直接引用运行时常量池字符串常量池直接内存 参考 JavaGuide JVM内存区域 程序计数器 程序计数器是一块较小的内存空间&#xff0c;可以看做是当前线程所执行的字节码的行号指示器&#xff0c;各线程…

Unity 常用的4种灯光、制作镜子、灯光的调用修改数值、

创建灯光时&#xff0c;一般用4种&#xff1a;定向光、点光源、聚光、区域光、 定向光&#xff1a;太阳 点光源&#xff1a;灯泡 聚光灯&#xff1a;手电筒 区域光&#xff1a;烘焙-贴图 灯光选择已烘焙 需要先选择被烘焙的物体&#xff0c;然后再选择Contribute GI 等待进…

java中的set

Set Set集合概述和特点 不可以存储重复元素 没有索引,不能使用普通for循环遍历 哈希值 哈希值简介 是JDK根据对象的地址或者字符串或者数字算出来的int类型的数值 如何获取哈希值 Object类中的public int hashCode()&#xff1a;返回对象的哈希码值。 哈希值的特点 同一个…

分布式ID生成算法|雪花算法 Snowflake | Go实现

写在前面 在分布式领域中&#xff0c;不可避免的需要生成一个全局唯一ID。而在近几年的发展中有许多分布式ID生成算法&#xff0c;比较经典的就是 Twitter 的雪花算法(Snowflake Algorithm)。当然国内也有美团的基于snowflake改进的Leaf算法。那么今天我们就来介绍一下雪花算法…

计算机视觉基础知识(二)---数字图像

像素 像素是分辨率的单位;构成位图图像的最基本单元;每个像素都有自己的颜色; 图像分辨率 单位英寸内的像素点数;单位为PPI(Pixels Per Inch),为像素每英寸;PPI表示每英寸对角线上所拥有的像素数目:,x:长度像素数目,y:宽度像素数目,Z:屏幕大小;屏幕尺寸(大小)指的是对角线长…

springer模板参考文献不显示

Spring期刊模板网站&#xff0c;我的问题是23年12月的版本 https://www.springernature.com/gp/authors/campaigns/latex-author-support/see-where-our-services-will-take-you/18782940 参考文献显示问好&#xff0c;在sn-article.tex文件中&#xff0c;这个sn-mathphys-num…

数据结构c版(3)——排序算法

本章我们来学习一下数据结构的排序算法&#xff01; 目录 1.排序的概念及其运用 1.1排序的概念 1.2 常见的排序算法 2.常见排序算法的实现 2.1 插入排序 2.1.1基本思想&#xff1a; 2.1.2直接插入排序&#xff1a; 2.1.3 希尔排序( 缩小增量排序 ) 2.2 选择排序 2.2…

rtt的io设备框架面向对象学习-io设备管理层

目录 1.设备基类2.rtt基类2.1 rtt基类定义2.2 对象容器定义2.3 rtt基类构造函数 3.io设备管理接口4.总结 这层我的理解就是rtt基类和设备基类所在&#xff0c;所以抽离出来好点&#xff0c;不然每个设备类都要重复它。 1.设备基类 /include/rtdef.h中定义了设备基类struct rt_…

STM32(11)按键产生中断

1.初始化IO引脚&#xff0c;设置模式&#xff0c;速度等 2.设置AFIO&#xff08;配置EXTI的引脚映射&#xff09;&#xff0c;记得开启时钟 3.配置EXTI的通道&#xff08;EXTI0和EXTI1&#xff09; 4.配置NVIC 4.1 中断优先级分组 4.2 配置中断 5.编写中断响应函数 在中断向量…

蓝桥ACM培训-实战1

前言&#xff1a; 今天老师没讲课&#xff0c;只让我们做了一下几道题目。 正文&#xff1a; Problem:A 小蓝与操作序列&#xff1a; #include<bits/stdc.h> using namespace std; stack<int> a; int main(){int n,flag1,ans;string cz;cin>>n;for(int i1;…

访问修饰符、Object(方法,使用、equals)、查看equals底层、final--学习JavaEE的day15

day15 一、访问修饰符 含义&#xff1a; 修饰类、方法、属性&#xff0c;定义使用的范围 理解&#xff1a;给类、方法、属性定义访问权限的关键字 注意&#xff1a; ​ 1.修饰类只能使用public和默认的访问权限 ​ 2.修饰方法和属性可以使用所有的访问权限 访问修饰符本类本包…

JetCache源码解析——API实现(持续更新中……)

在JetCache中不仅可以通过在类和接口的函数上使用注解Cached、CacheUpdate和CacheInvalidate等实现缓存加载、更新和删除操作&#xff0c;也支持通过调用API接口的形式来实现缓存的加载、更新和删除操作。 缓存接口 缓存接口的定义如下&#xff1a; /*** 缓存接口&#xff0…

【计算机网络】HTTPS 协议原理

https 一、HTTPS 是什么二、加密1. 加密概念2. 加密的原因3. 常见的加密方式&#xff08;1&#xff09;对称加密&#xff08;2&#xff09;非对称加密 三、数据摘要(数据指纹)四、HTTPS 的工作原理探究1. 只使用对称加密2. 只使用非对称加密3. 双方都使用非对称加密4. 非对称加…