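Integrating DTM for Cross-Language Distributed Transactions V1.0
Introduction
DTM is an open-source distributed transaction manager. It keeps data consistent when a single update spans multiple databases, services, and language stacks.
Put simply, DTM gives you transactions across services: a group of service calls either all succeed or are all rolled back, so you never end up with only part of the data updated.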
https://dtm.pub/
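Installing and running DTM
The installation below can also be packaged in a Dockerfile and deployed together with the DMC service. (I could not get the Docker installation method described on the official site to work.)
Install the Go environment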
# Download and install Go. Use version 1.20 or later; installation failed with other versions
wget -c https://dl.google.com/go/go1.20.1.linux-amd64.tar.gz -O - | sudo tar -xz -C /usr/local
cd /usr/local/
mkdir gopath
vi /etc/profile
# Add the Go environment configuration to /etc/profile:
export GOROOT=/usr/local/go
export GOPATH=/usr/local/gopath
export PATH=$PATH:$GOROOT/bin
export GO111MODULE="on" # enable Go modules
export GOPROXY=https://goproxy.cn,direct # module proxy for installing Go modules from within China
# Reload the profile, then check the version
source /etc/profile
go version
Install the DTM service
git clone https://github.com/dtm-labs/dtm && cd dtm
go build
# Run ./dtm; output like the following means it started successfully:
[root@paratera128 dtm]# ./dtm
invalid log level: , switching to default: INFO
{"level":"info","ts":"2023-07-25T18:54:17.816-0700","caller":"entry/main.go:46","msg":"dtm version is: v0.0.0-dev"}
{"level":"info","ts":"2023-07-25T18:54:17.818-0700","caller":"config/config.go:121","msg":"config file: loaded config is: \n{\n \"Store\": {\n \"Driver\": \"boltdb\",\n \"Host\": \"\",\n \"Port\": 0,\n \"User\": \"\",\n \"Password\": \"\",\n \"Db\": \"dtm\",\n \"Schema\": \"public\",\n \"MaxOpenConns\": 500,\n \"MaxIdleConns\": 500,\n \"ConnMaxLifeTime\": 5,\n \"DataExpire\": 604800,\n \"FinishedDataExpire\": 86400,\n \"RedisPrefix\": \"{a}\"\n },\n \"TransCronInterval\": 3,\n \"TimeoutToFail\": 35,\n \"RetryInterval\": 10,\n \"RequestTimeout\": 3,\n \"HTTPPort\": 36789,\n \"GrpcPort\": 36790,\n \"JSONRPCPort\": 36791,\n \"MicroService\": {\n \"Driver\": \"default\",\n \"Target\": \"\",\n \"EndPoint\": \"\"\n },\n \"HTTPMicroService\": {\n \"Driver\": \"default\",\n \"RegistryType\": \"\",\n \"RegistryAddress\": \"\",\n \"RegistryOptions\": \"{}\",\n \"Target\": \"\",\n \"EndPoint\": \"\"\n },\n \"UpdateBranchSync\": 0,\n \"UpdateBranchAsyncGoroutineNum\": 1,\n \"LogLevel\": \"info\",\n \"Log\": {\n \"Outputs\": \"stderr\",\n \"RotationEnable\": 0,\n \"RotationConfigJSON\": \"{}\"\n },\n \"TimeZoneOffset\": \"\",\n \"ConfigUpdateInterval\": 3,\n \"AlertRetryLimit\": 3,\n \"AlertWebHook\": \"\",\n \"AdminBasePath\": \"\"\n}"}
{"level":"info","ts":"2023-07-25T18:54:17.820-0700","caller":"maxprocs/maxprocs.go:47","msg":"maxprocs: Leaving GOMAXPROCS=1: CPU quota undefined"}
{"level":"info","ts":"2023-07-25T18:54:17.827-0700","caller":"dtmsvr/svr.go:32","msg":"start dtmsvr"}
{"level":"info","ts":"2023-07-25T18:54:17.828-0700","caller":"dtmsvr/svr.go:51","msg":"dtmsvr http listen at: 36789"}
{"level":"info","ts":"2023-07-25T18:54:17.830-0700","caller":"dtmsvr/svr.go:65","msg":"grpc listening at [::]:36790"}
{"level":"info","ts":"2023-07-25T18:54:17.931-0700","caller":"dtmsvr/svr.go:80","msg":"RegisterService: default"}
{"level":"info","ts":"2023-07-25T18:54:17.932-0700","caller":"dtm/main.go:96","msg":"admin is proxied to admin.dtm.pub"}
{"level":"info","ts":"2023-07-25T18:54:17.933-0700","caller":"dtm/main.go:98","msg":"admin is running at: http://localhost:36789"}
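Open http://localhost:36789 to see the status and progress data of every transaction.
Data storage
After deployment, DTM stores its data in a local file by default (dtm.bolt, using the boltdb driver shown in the startup log above). You can switch to a relational database, Redis, or another store by editing conf.yml.
The MySQL scripts below create the required tables; with the data in MySQL you can build custom monitoring as needed. DTM's client SDKs also provide an API (the sub-transaction barrier) that participating services use to write to the RM table.
RM table (resource manager)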
CREATE TABLE dtm.barrier (
  `id` bigint(22) NOT NULL AUTO_INCREMENT,
  `trans_type` varchar(45) DEFAULT '' COMMENT 'transaction mode',
  `gid` varchar(128) DEFAULT '' COMMENT 'global transaction id',
  `branch_id` varchar(128) DEFAULT '' COMMENT 'branch transaction id',
  `op` varchar(45) DEFAULT '' COMMENT 'transaction operation',
  `barrier_id` varchar(45) DEFAULT '' COMMENT 'barrier sequence within the branch; incremented when a branch contains more than one sub-transaction',
  `reason` varchar(45) DEFAULT '' COMMENT 'branch operation that inserted this row: the same as op normally, the opposite operation if inserted during rollback',
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `gid` (`gid`,`branch_id`,`op`,`barrier_id`),
  KEY `create_time` (`create_time`),
  KEY `update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=44 DEFAULT CHARSET=utf8;
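The unique key (`gid`, `branch_id`, `op`, `barrier_id`) is what lets DTM's sub-transaction barrier filter out duplicate requests, empty compensations (a compensation arriving for an action that never ran), and hanging actions (an action arriving after its compensation). The class below is a minimal in-memory sketch of that idea, not the dtmcli-java implementation: a `HashSet` stands in for the unique key, the hypothetical `insertIgnore` mimics an `INSERT IGNORE` that affects 0 rows on a duplicate, and the op values `action`/`compensate` (Saga) and `try`/`confirm`/`cancel` (TCC) are taken from the `op` comment in `trans_branch_op` below. In a real barrier these inserts run in the same local database transaction as the business SQL, which this sketch does not model.

```java
import java.util.HashSet;
import java.util.Set;

// In-memory stand-in for the barrier table (sketch only, not DTM's code).
class BarrierSketch {
    // Plays the role of UNIQUE KEY (gid, branch_id, op, barrier_id).
    private final Set<String> rows = new HashSet<>();

    // Like INSERT IGNORE: true if a new row was written, false if it already existed.
    private boolean insertIgnore(String gid, String branchId, String op, String barrierId) {
        return rows.add(gid + "|" + branchId + "|" + op + "|" + barrierId);
    }

    /** Returns true if the business logic for this call should actually run. */
    boolean shouldRun(String gid, String branchId, String op, String barrierId) {
        String originOp = op.equals("cancel") ? "try" : op.equals("compensate") ? "action" : "";
        boolean originInserted = false;
        if (!originOp.isEmpty()) {
            // Claim the forward operation's slot so a late forward call is blocked (anti-hanging).
            originInserted = insertIgnore(gid, branchId, originOp, barrierId);
        }
        boolean currentInserted = insertIgnore(gid, branchId, op, barrierId);
        if (originInserted) {
            return false; // empty compensation: the forward operation never ran
        }
        return currentInserted; // false = duplicate, or a forward call arriving after rollback
    }

    public static void main(String[] args) {
        BarrierSketch barrier = new BarrierSketch();
        System.out.println(barrier.shouldRun("g1", "01", "action", "01"));     // true: normal call
        System.out.println(barrier.shouldRun("g1", "01", "action", "01"));     // false: duplicate retry
        System.out.println(barrier.shouldRun("g1", "01", "compensate", "01")); // true: undo the action
        System.out.println(barrier.shouldRun("g2", "01", "cancel", "01"));     // false: empty cancel
        System.out.println(barrier.shouldRun("g2", "01", "try", "01"));        // false: hanging try blocked
    }
}
```

The `main` method walks through a normal call, a duplicate retry, a real compensation, an empty cancel, and a late `try` that the barrier blocks.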
TM tables (transaction manager)
CREATE TABLE IF NOT EXISTS dtm.trans_global (
  `id` bigint(22) NOT NULL AUTO_INCREMENT,
  `gid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `trans_type` varchar(45) not null COMMENT 'transaction type: saga | xa | tcc | msg',
  `status` varchar(12) NOT NULL COMMENT 'transaction status: prepared | submitted | aborting | succeed | failed',
  `query_prepared` varchar(1024) NOT NULL COMMENT 'url to check for msg|workflow',
  `protocol` varchar(45) not null comment 'protocol: http | grpc | json-rpc',
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `finish_time` datetime DEFAULT NULL,
  `rollback_time` datetime DEFAULT NULL,
  `options` varchar(1024) DEFAULT '' COMMENT 'options for transaction like: TimeoutToFail, RequestTimeout',
  `custom_data` varchar(1024) DEFAULT '' COMMENT 'custom data for transaction',
  `next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job',
  `next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job',
  `owner` varchar(128) not null default '' comment 'who is locking this trans',
  `ext_data` TEXT comment 'extra data for this trans. currently used in workflow pattern',
  `result` varchar(1024) DEFAULT '' COMMENT 'result for transaction',
  `rollback_reason` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
  PRIMARY KEY (`id`),
  UNIQUE KEY `gid` (`gid`),
  key `owner`(`owner`),
  key `status_next_cron_time` (`status`, `next_cron_time`) comment 'cron job will use this index to query trans'
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS dtm.trans_branch_op (
  `id` bigint(22) NOT NULL AUTO_INCREMENT,
  `gid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `url` varchar(1024) NOT NULL COMMENT 'the url of this op',
  `data` TEXT COMMENT 'request body, deprecated',
  `bin_data` BLOB COMMENT 'request body',
  `branch_id` VARCHAR(128) NOT NULL COMMENT 'transaction branch ID',
  `op` varchar(45) NOT NULL COMMENT 'transaction operation type like: action | compensate | try | confirm | cancel',
  `status` varchar(45) NOT NULL COMMENT 'transaction op status: prepared | succeed | failed',
  `finish_time` datetime DEFAULT NULL,
  `rollback_time` datetime DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS dtm.kv (
  `id` bigint(22) NOT NULL AUTO_INCREMENT,
  `cat` varchar(45) NOT NULL COMMENT 'the category of this data',
  `k` varchar(128) NOT NULL,
  `v` TEXT,
  `version` bigint(22) default 1 COMMENT 'version of the value',
  create_time datetime default NULL,
  update_time datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE key `uniq_k`(`cat`, `k`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
Getting started with DTM
Maven dependency
<dependency>
    <groupId>io.github.dtm-labs</groupId>
    <artifactId>dtmcli-java</artifactId>
    <version>2.1.4</version>
</dependency>
application.yml
server:
  port: 8081
# DTM server address
dtm:
  ipport: '192.168.137.128:36789'
spring:
  datasource:
    url: jdbc:mysql://182.92.67.160:3316/dtm?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
    username: root
    password: paratera
    driver-class-name: com.mysql.cj.jdbc.Driver
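Saga mode
A long transaction is split into several branch transactions coordinated by the Saga transaction coordinator. If every branch commits successfully, the global transaction completes normally; if any step fails, the compensating operations are called one by one in reverse order.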
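The coordination rule described above can be sketched in plain Java. This is a conceptual, single-process model of textbook Saga orchestration, not how DTM schedules branches: DTM calls the branches over HTTP, persists their progress, and retries compensations until they succeed, none of which is modeled here. `SagaSketch` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

// Conceptual, single-process model of Saga orchestration (not DTM's scheduler).
class SagaSketch {
    // One branch: a forward action that reports success, plus its compensation.
    record Step(String name, BooleanSupplier action, Runnable compensate) {}

    private final List<Step> steps = new ArrayList<>();
    // Names of the operations in the order they were invoked.
    final List<String> log = new ArrayList<>();

    SagaSketch add(String name, BooleanSupplier action, Runnable compensate) {
        steps.add(new Step(name, action, compensate));
        return this;
    }

    /** Runs actions in order; on the first failure, compensates completed steps in reverse order. */
    boolean submit() {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            log.add(step.name());
            if (!step.action().getAsBoolean()) {
                // Stack order: the most recently completed step is compensated first.
                while (!completed.isEmpty()) {
                    Step done = completed.pop();
                    log.add(done.name() + "Compensate");
                    done.compensate().run();
                }
                return false;
            }
            completed.push(step);
        }
        return true;
    }
}
```

For example, with three steps where `Step3` fails, `log` ends up as `[Step1, Step2, Step3, Step2Compensate, Step1Compensate]`: only completed steps are compensated, most recent first.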
Sequence diagrams
Successful completion
Failure and rollback
Simulated forward operations and compensations
@RequestMapping("TransOut")
public Object TransOut() {
    try {
        logger.info("TransOut");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransOutCompensate")
public Object TransOutCompensate() {
    try {
        logger.info("TransOutCompensate");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransIn")
public Object TransIn() {
    try {
        logger.info("TransIn");
        // int i = 1 / 0; // uncomment to simulate an exception and trigger a rollback
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransInCompensate")
public Object TransInCompensate() {
    try {
        logger.info("TransInCompensate");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}
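Each handler above turns an exception into `FAILURE_RESULT` instead of letting it escape, and that matters. As I understand DTM's HTTP branch protocol, a branch response is classified roughly like this: HTTP 425 or a body containing `ONGOING` means "not finished, check again later"; HTTP 409 or a body containing `FAILURE` means the branch failed and the global transaction rolls back; any other 2xx is success; anything else (for example a 500 from an uncaught exception) is treated as a transient error and retried. The sketch below encodes that reading; `DtmResultSketch` and `classify` are hypothetical names, and the exact rules should be checked against the DTM documentation for your version.

```java
// Hypothetical classifier mirroring the response rules described above (not DTM's code).
class DtmResultSketch {
    enum Outcome { SUCCESS, FAILURE, ONGOING, RETRY }

    static Outcome classify(int httpStatus, String body) {
        String text = body == null ? "" : body;
        if (httpStatus == 425 || text.contains("ONGOING")) {
            return Outcome.ONGOING; // not finished yet: checked again later
        }
        if (httpStatus == 409 || text.contains("FAILURE")) {
            return Outcome.FAILURE; // business failure: roll back / compensate
        }
        if (httpStatus >= 200 && httpStatus < 300) {
            return Outcome.SUCCESS;
        }
        return Outcome.RETRY; // technical error such as HTTP 500: retried
    }
}
```

Under this reading, uncommenting `int i = 1 / 0;` in `TransIn` produces a `FAILURE` body and a rollback, whereas removing the try/catch would produce a 500 and `TransIn` would be retried instead.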
Execute the global transaction
// DTM calls these URLs back. If the DTM server runs on a different machine from this project,
// use an IP address here (not localhost) and make sure the DTM machine can reach it.
private static final String svc = "http://192.168.110.174:8081/api";

@Value("${dtm.ipport}")
private String ipPort;

@RequestMapping("testSaga")
public String testSaga() {
    DtmClient dtmClient = new DtmClient(ipPort);
    try {
        // Global transaction id; it only needs to be unique
        String customGid = UUID.randomUUID().toString();
        Saga saga = dtmClient.newSaga(customGid)
                // Add each forward operation and its compensation in business order; the coordinator runs them.
                // The third argument is the request body sent to the branch (the same for the action and its compensation).
                .add(svc + "/TransOut", svc + "/TransOutCompensate", null)
                .add(svc + "/TransIn", svc + "/TransInCompensate", null)
                .enableWaitResult();
        saga.submit();
    } catch (Exception e) {
        log.error("saga submit error", e);
        return "fail";
    }
    return "success";
}
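Successful completion:
Failure and rollback:
TCC mode
TCC stands for Try, Confirm, and Cancel.
- Try phase: attempt the operation, perform all business checks (consistency), and reserve the required business resources (quasi-isolation).
- Confirm phase: if Try succeeded on every branch, the transaction moves to Confirm. Confirm performs the actual business operation without any further business checks, using only the resources reserved during Try.
- Cancel phase: if Try failed on any branch, the transaction moves to Cancel, which releases the resources reserved during Try.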
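The decision rule in the list above (confirm everything only if every Try succeeded, otherwise cancel) can be sketched in plain Java. This is a conceptual in-memory model, not DTM's implementation; `TccSketch`, `Branch`, and `recording` are hypothetical names. One detail mirrors how I understand DTM to behave: a branch is registered before its Try is called, so a branch whose Try failed is cancelled as well, since that Try may have reserved part of its resources before failing. Cancel therefore has to cope with nothing having been reserved.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual model of the TCC decision rule (not DTM's implementation).
class TccSketch {
    interface Branch {
        boolean tryReserve(); // Try: check and reserve resources; false = failed
        void confirm();       // Confirm: use the reserved resources, no checks
        void cancel();        // Cancel: release whatever Try reserved (possibly nothing)
    }

    /** Returns true if all branches were confirmed, false if they were cancelled. */
    static boolean run(List<Branch> branches) {
        List<Branch> registered = new ArrayList<>();
        for (Branch branch : branches) {
            registered.add(branch); // registered before Try, so a failed Try is cancelled too
            if (!branch.tryReserve()) {
                for (int i = registered.size() - 1; i >= 0; i--) {
                    registered.get(i).cancel();
                }
                return false;
            }
        }
        for (Branch branch : registered) {
            branch.confirm();
        }
        return true;
    }

    /** Helper for experiments: a branch that appends each phase it runs to log. */
    static Branch recording(String name, boolean tryOk, List<String> log) {
        return new Branch() {
            @Override public boolean tryReserve() { log.add(name + "Try"); return tryOk; }
            @Override public void confirm() { log.add(name + "Confirm"); }
            @Override public void cancel() { log.add(name + "Cancel"); }
        };
    }
}
```

With `TransOut` succeeding and `TransIn` failing its Try, the recorded phases are `TransOutTry, TransInTry, TransInCancel, TransOutCancel`. Cancelling in reverse order is a choice of this sketch.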
Sequence diagrams
Successful completion
Failure and rollback
Simulating the three phases
@RequestMapping("TransOutTry")
public Object TransOutTry() {
    try {
        logger.info("TransOutTry");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransOutConfirm")
public Object TransOutConfirm() {
    try {
        logger.info("TransOutConfirm");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransOutCancel")
public Object TransOutCancel() {
    try {
        logger.info("TransOutCancel");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransInTry")
public Object TransInTry() {
    try {
        logger.info("TransInTry");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransInConfirm")
public Object TransInConfirm() {
    try {
        logger.info("TransInConfirm");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransInCancel")
public Object TransInCancel() {
    try {
        logger.info("TransInCancel");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}
Register and execute the transaction branches
private static final String svc = "http://192.168.110.174:8081/api";

@Value("${dtm.ipport}")
private String ipport;

@RequestMapping("testTcc")
public String testTcc() {
    // Create the DTM client
    DtmClient dtmClient = new DtmClient(ipport);
    // Create the TCC transaction
    String customGid = UUID.randomUUID().toString();
    try {
        dtmClient.tccGlobalTransaction(customGid, tcc -> {
            // The first argument is the request body; all three phases receive the same body
            tcc.callBranch(null, svc + "/TransOutTry", svc + "/TransOutConfirm", svc + "/TransOutCancel");
            tcc.callBranch(null, svc + "/TransInTry", svc + "/TransInConfirm", svc + "/TransInCancel");
        });
    } catch (Exception e) {
        log.error("tccGlobalTransaction error", e);
        return "fail";
    }
    return "success";
}
Successful completion:
Failure and rollback:
Thoughts on operation failures
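What happens if a Compensate, Confirm, or Cancel operation fails? From a business point of view these operations must eventually succeed; when they fail, the cause is a transient fault or a program bug. DTM therefore keeps retrying a failed Compensate/Confirm/Cancel with exponential backoff (intervals of 10 × 2^n seconds, starting from the default RetryInterval of 10) until it succeeds. A failure caused by unmet business conditions rather than a technical problem should never end up in this retry loop, and the code has to be written with that in mind.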
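The retry schedule described above (a base interval that doubles after every failed attempt) can be computed in a few lines of Java. This only illustrates the arithmetic; DTM computes its next retry time internally, and `RetryBackoffSketch.schedule` is a hypothetical helper. It also shows why a compensation that can never succeed should not be left to retry: the waits grow quickly.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrates the backoff arithmetic only; not DTM's scheduler.
class RetryBackoffSketch {
    /** Returns the wait in seconds before each of the first n retries: base * 2^i. */
    static List<Long> schedule(long baseSeconds, int n) {
        List<Long> waits = new ArrayList<>();
        long wait = baseSeconds;
        for (int i = 0; i < n; i++) {
            waits.add(wait);
            wait *= 2; // double the interval after every failed attempt
        }
        return waits;
    }
}
```

`schedule(10, 5)` returns `[10, 20, 40, 80, 160]`, i.e. 310 seconds of waiting across the first five retries.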
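To keep a program bug from leaving a compensation retrying forever, you can monitor transactions in the DTM admin console and have operators handle them manually, for example by force-stopping them. This feature is still fairly rough, but it is usable.
If the business has no special requirements, you can also simply set a timeout (in seconds; 1800 is 30 minutes) after which the transaction is rolled back: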
saga.setTimeoutToFail(1800);
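Saga vs. TCC
There are other modes as well, such as two-phase and XA. Because they rely on concurrency locks, their performance is relatively low, and deadlocks in particular require a great deal of manual intervention.
Aspect | Saga | TCC
---|---|---
Difficulty | Simple: only a forward and a compensating branch operation | Complex: three branch operations (Try, Confirm, Cancel)
Consistency | Lower: no resource reservation, so dirty reads can occur under high concurrency | Higher: all resources are reserved up front (Try)
Concurrent execution | Supports both concurrent and sequential execution | Not supported
Nested transactions | Not supported | Supported
Starting with the easier option, our subsequent development will be based on Saga mode first.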
Integrating distributed transactions into the DMC platform V1.0
Database design
DROP TABLE IF EXISTS trans_setting;
CREATE TABLE IF NOT EXISTS trans_setting (
  id bigint(22) NOT NULL AUTO_INCREMENT,
  call_url varchar(45) NOT NULL COMMENT 'global transaction identifier; it only needs to be unique. Recommended: project.module.method, e.g. console.user.getUser',
  create_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at datetime ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `call_url` (`call_url`)
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 'global transaction settings' ROW_FORMAT = Compact;

DROP TABLE IF EXISTS trans_setting_detail;
CREATE TABLE IF NOT EXISTS trans_setting_detail (
  id bigint(22) NOT NULL AUTO_INCREMENT,
  fid bigint(22) NOT NULL COMMENT 'primary key of the parent trans_setting row',
  branch_url varchar(100) NOT NULL COMMENT 'branch URL, full address: IP, port, and RESTful path',
  branch_rb_url varchar(100) NOT NULL COMMENT 'branch rollback (compensation) URL, full address: IP, port, and RESTful path',
  create_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at datetime ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `fid_branch_url` (`fid`,`branch_url`)
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 'branch transaction URLs' ROW_FORMAT = Compact;
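The two tables form a one-to-many mapping: one `call_url` owns an ordered list of (`branch_url`, `branch_rb_url`) pairs, and that order is the Saga execution order, with compensations running in reverse. Neither table has an explicit sequence column, so the sketch below assumes the detail row's `id` (insertion order) defines the order; a query filtered only by `fid` may otherwise return rows in whatever order the chosen index yields, such as the `fid_branch_url` unique key. It is an in-memory model with the hypothetical record names `Setting` and `Detail`, not the project's entity classes.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// In-memory model of trans_setting / trans_setting_detail (hypothetical names).
class TransSettingModel {
    record Setting(long id, String callUrl) {}
    record Detail(long id, long fid, String branchUrl, String branchRbUrl) {}

    /** Resolves the (action, compensation) pairs configured for callUrl, in id order. */
    static Optional<List<Detail>> branchesFor(String callUrl, List<Setting> settings, List<Detail> details) {
        return settings.stream()
                .filter(s -> s.callUrl().equals(callUrl))
                .findFirst()
                .map(s -> details.stream()
                        .filter(d -> d.fid() == s.id())
                        .sorted(Comparator.comparingLong(Detail::id)) // configuration order
                        .toList());
    }
}
```

An unknown `callUrl` yields `Optional.empty()`, which the caller should treat as "no such global transaction" rather than dereferencing a missing row.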
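CRUD
Omitted…
DTM server address configuration
dtm:
  url: 192.168.137.128:36789
Global transaction execution endpoint
Exposing the scheduler as a RESTful endpoint lets global transactions work across languages, services, and databases.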
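/**
 * Unified scheduling endpoint for distributed transactions.
 *
 * @param callUrl  identifier of the configured global transaction (trans_setting.call_url)
 * @param postData request body passed to every branch endpoint
 * @return "success" or "fail"
 */
@PostMapping("/s-api/globalTransaction")
public String execGlobalTransaction(@RequestParam("callUrl") String callUrl,
                                    @RequestBody(required = false) Object postData) {
    // Without @RequestBody, Spring would bind postData as an empty model attribute and the body would be lost
    return transactionService.execGlobalTransaction(callUrl, postData);
}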
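@Value("${dtm.url}")
private String dtmUrl;

public String execGlobalTransaction(String callUrl, Object postData) {
    // Look up the global transaction configuration by its identifier
    QueryWrapper<TransSetting> settingWrapper = new QueryWrapper<>();
    settingWrapper.eq("call_url", callUrl);
    TransSetting transSetting = settingDao.selectOne(settingWrapper);
    if (transSetting == null) {
        log.error("no global transaction configured for callUrl {}", callUrl);
        return "fail";
    }
    // Load its branches; order by id so they run in the order they were configured
    QueryWrapper<TransSettingDetail> detailWrapper = new QueryWrapper<>();
    detailWrapper.eq("fid", transSetting.getId()).orderByAsc("id");
    List<TransSettingDetail> list = settingDetailDao.selectList(detailWrapper);
    DtmClient dtmClient = new DtmClient(dtmUrl);
    try {
        // Use callUrl + timestamp + a four-digit random number as the gid, which makes problems easier to trace
        String time = DateUtil.date2String(Calendar.getInstance().getTime(), DateUtil.DATE_TIME);
        int number = new Random().nextInt(9000) + 1000;
        String customGid = callUrl + ":" + time + ":" + number;
        Saga saga = dtmClient.newSaga(customGid);
        // Add each branch URL together with its compensation URL
        list.forEach(detail -> saga.add(detail.getBranchUrl(), detail.getBranchRbUrl(), postData));
        saga.enableWaitResult();
        saga.submit();
    } catch (Exception e) {
        log.error("saga submit error", e);
        return "fail";
    }
    return "success";
}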
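The GID scheme used above (callUrl, a timestamp, and a four-digit random number) can be isolated into a small helper. This is a sketch: the `yyyyMMddHHmmss` pattern is my assumption (the format behind the project's `DateUtil.DATE_TIME` is not shown), chosen to keep spaces out of the GID, and `GidSketch.gid` is a hypothetical name. The scheme is readable but not guaranteed unique: two calls for the same `callUrl` within the same second collide with probability 1/9000, while DTM needs every global transaction to have a unique GID. The `UUID` used in the earlier examples avoids this.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical GID helper following the callUrl:timestamp:NNNN scheme.
class GidSketch {
    // Compact timestamp without spaces (assumed format).
    private static final DateTimeFormatter TIMESTAMP = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    /** Builds "callUrl:timestamp:NNNN" with NNNN drawn uniformly from 1000..9999. */
    static String gid(String callUrl, LocalDateTime now) {
        int number = ThreadLocalRandom.current().nextInt(1000, 10000); // upper bound is exclusive
        return callUrl + ":" + now.format(TIMESTAMP) + ":" + number;
    }
}
```

For example, `GidSketch.gid("console.user.getUser", LocalDateTime.now())` yields a value of the form `console.user.getUser:20230725185417:4821`.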
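Scheduling test
The DTM console shows that the transaction succeeded and no rollback was triggered:
Next, we deliberately throw an exception in the transfer-in (TransIn) branch and test again:
The transaction fails and is rolled back: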