目录
引言:
1. TCC事务模式
2. TCC组成
3. TCC执行流程
3.1 TCC正常执行流程
3.2 TCC失败回滚
4. Confirm/Cancel操作异常
5. TCC 设计原则
5.1 TCC如何做到更好的一致性
5.2 为什么只适合短事务
6. 嵌套的TCC
7. .NET CORE结合DTM实现TCC分布式事务
7.1 轮子小卖部(Nuget)引入Dtmcli
7.2 生成转账数据库(EF_CORE)
7.3 数据库持久化
7.4 appsettings.json
7.5 Program.cs
7.6 主程序事务API控制器
7.7 用户1转账事务API控制器
7.8 用户2转账事务API控制器
小结
引言:
紧接上一期.NET CORE 分布式事务(一) DTM实现二阶段提交(.NET CORE 分布式事务(一) DTM实现二阶段提交-CSDN博客)
1. TCC事务模式
什么是TCC,TCC是Try、Confirm、Cancel三个词语的缩写,最早是由 Pat Helland 于 2007 年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出。
2. TCC组成
TCC分为3个阶段
- Try 阶段:尝试执行,完成所有业务检查(一致性), 预留必须业务资源(准隔离性)
- Confirm 阶段:如果所有分支的Try都成功了,则走到Confirm阶段。Confirm真正执行业务,不作任何业务检查,只使用 Try 阶段预留的业务资源
- Cancel 阶段:如果所有分支的Try有一个失败了,则走到Cancel阶段。Cancel释放 Try 阶段预留的业务资源。
3. TCC执行流程
3.1 TCC正常执行流程
一般情况下,时序图中的9个步骤会正常完成,整个业务按照预期进行。主程序注册全局事务,以及Try尝试事务Api地址、Confirm提交事务Api地址、Cancel回滚事务Api地址。并开始执行Try尝试事务,Try中的事务要对资源进行预算以及锁定,也就是尝试执行,判断资源是否支持提交执行事务。然后进行提交事务。最终完成全局事务。
3.2 TCC失败回滚
当Try尝试事务异常时与上面的正常流程的区别是,现在不会调用Confirm提交事务,而是调用Cancel回滚事务,对Try尝试事务进行的资源锁定进行解锁释放等操作。回退到全局事务开始前。
4. Confirm/Cancel操作异常
假如Confirm/Cancel操作遇见失败会怎么样?按照Tcc模式的协议,Confirm/Cancel操作是要求最终成功的,遇见失败的情况,都是由于临时故障或者程序bug。dtm在Confirm/Cancel操作遇见失败时,会不断进行重试,直到成功。
为了避免程序bug导致补偿操作一直无法成功,建议开发者对全局事务表进行监控,发现重试超过3次的事务,发出报警,由运维人员找开发手动处理。进行人工干预。
5. TCC 设计原则
在设计上,TCC主要用于处理一致性要求较高、需要较多灵活性的短事务。
5.1 TCC如何做到更好的一致性
对于我们的 A 跨行转账给 B 的场景,如果采用SAGA,在正向操作中调余额,在补偿操作中,反向调整余额,那么会出现这种情况:如果A扣款成功,金额转入B失败,最后回滚,把A的余额调整为初始值。整个过程中如果A发现自己的余额被扣减了,但是收款方B迟迟没有收到资金,那么会对A造成非常大的困扰。
上述需求在SAGA中无法解决,但是可以通过TCC来解决,设计技巧如下:
- 在账户中的 balance 字段之外,再引入一个 trading_balance 字段
- Try 阶段检查账户是否被冻结,检查账户余额是否充足,没问题后,调整 trading_balance (即业务上的冻结资金)
- Confirm 阶段,调整 balance ,调整 trading_balance (即业务上的解冻资金)
- Cancel 阶段,调整 trading_balance (即业务上的解冻资金)
这种情况下,终端用户 A 就不会看到自己的余额扣减了,但是 B 又迟迟收不到资金的情况。
5.2 为什么只适合短事务
TCC 的事务编排放在了应用端上,就是事务一共包含多少个分支,每个分支的顺序什么样,这些信息不会像 SAGA 那样,都发送给dtm服务器之后,再去调用实际的事务分支。当应用出现 crash 或退出,编排信息丢失,那么整个全局事务,就没有办法往前重试,只能够进行回滚。如果全局事务持续时间很长,例如一分钟以上,那么当应用进行正常的发布升级时,也会导致全局事务回滚,影响业务。因此 TCC 会更适合短事务。
那么是否可以把TCC的事务编排都保存到服务器,保证应用重启也不受到影响呢?理论上这种做法是可以解决这个问题的,但是存储到服务器会比在应用端更不灵活,无法获取到每个分支的中间结果,无法做嵌套等等。
考虑到一致性要求较高和短事务是高度相关的(一个中间不一致状态持续很长时间的事务,自然不能算一致性较好),这两者跟“应用灵活编排”,也是有较高相关度,所以将 TCC 实现为应用端编排,而 SAGA 实现为服务端编排。
6. 嵌套的TCC
dtm的Tcc事务模式,支持子事务嵌套,流程图如下:
在这个流程图中,Order这个微服务,管理了订单相关的数据修改,同时还管理了一个嵌套的子事务,因此他即扮演了RM的角色,也扮演了AP的角色。
7. .NET CORE结合DTM实现TCC分布式事务
还是以跨行转账作为例子,给大家详解这种架构。业务场景介绍如下:
我们需要跨行从A转给B 30元,我们先进行可能失败的转出操作TccUserTry,即进行A扣减30元。如果A因余额不足扣减失败,那么转账直接失败,返回错误;如果扣减成功,那么进行下一步转入操作,因为转入操作没有余额不足的问题,可以假定转入操作一定会成功。
7.1 轮子小卖部(Nuget)引入Dtmcli
<ItemGroup><PackageReference Include="Dtmcli" Version="1.4.0" /></ItemGroup>
7.2 生成转账数据库(EF_CORE)
数据库模型
//模型
public partial class UserMoney
{public int id { get; set; }public int money { get; set; }public int trading_balance { get; set; }public int balance { get; set; }public int trymoney { get; set; }public string guid { get; set; }
}
DbContext
public class DtmDbContext : DbContext{public DtmDbContext() { }public DtmDbContext(DbContextOptions<DtmDbContext> options) : base(options) { }public virtual DbSet<UserMoney> UserMoney { get; set; }protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder){optionsBuilder.UseMySql("server=localhost;port=3307;user id=root;password=123;database=DTM_Test", ServerVersion.Parse("8.0.23-mysql")).UseLoggerFactory(LoggerFactory.Create(option =>{option.AddConsole();}));}protected override void OnModelCreating(ModelBuilder modelBuilder){modelBuilder.UseCollation("utf8_general_ci").HasCharSet("utf8");modelBuilder.Entity<UserMoney>(entity =>{entity.ToTable("UserMoney");});}}
7.3 数据库持久化
CREATE TABLE
IFNOT EXISTS DTM_Test.barrier (id BIGINT ( 22 ) PRIMARY KEY AUTO_INCREMENT,trans_type VARCHAR ( 45 ) DEFAULT '',gid VARCHAR ( 128 ) DEFAULT '',branch_id VARCHAR ( 128 ) DEFAULT '',op VARCHAR ( 45 ) DEFAULT '',barrier_id VARCHAR ( 45 ) DEFAULT '',reason VARCHAR ( 45 ) DEFAULT '' COMMENT 'the branch type who insert this record',create_time datetime DEFAULT now( ),update_time datetime DEFAULT now( ),KEY ( create_time ),KEY ( update_time ),UNIQUE KEY ( gid, branch_id, op, barrier_id ) ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4;
数据库最终生成:
7.4 appsettings.json
{"AllowedHosts": "*","ConnectionString": "server=localhost;port=3307;user id=root;password=123;database=test","DtmUrl": "http://localhost:36789","TransactionUrl": "http://localhost:5016","Logging": {"LogLevel": {"Default": "Information","Microsoft.AspNetCore": "Warning"}}
}
7.5 Program.cs
// 注册DbContextbuilder.Services.AddDbContext<DtmDbContext>(options =>{options.UseMySql(builder.Configuration.GetValue<string>("ConnectionString"), ServerVersion.Parse("8.0.23-mysql"));});// 注册dtmbuilder.Services.AddDtmcli(dtm =>{dtm.DtmUrl = builder.Configuration.GetValue<string>("DtmUrl");dtm.DBType = "mysql";dtm.BarrierTableName = "dtm_test.barrier";});
7.6 主程序事务API控制器
using DTM_EF.Model;
using Dtmcli;
using Microsoft.AspNetCore.Mvc;
using System.Threading;namespace Dtm_TCC.Controllers
{[ApiController][Route("[controller]")]public class DtmTccController : ControllerBase{private readonly ILogger<DtmTccController> _logger;private readonly TccGlobalTransaction _globalTransaction;private readonly IDtmClient _dtmClient;private readonly IConfiguration _configuration;public DtmTccController(ILogger<DtmTccController> logger,TccGlobalTransaction globalTransaction,IDtmClient dtmClient,IConfiguration configuration){_logger = logger;_globalTransaction = globalTransaction;_dtmClient = dtmClient;_configuration = configuration;}[HttpPost(Name = "DtmTcc")]public async Task<IActionResult> DtmTcc(){var transactionurl = _configuration.GetValue<string>("TransactionUrl");// 创建CancellationToken用于取消事务CancellationToken cancellationToken = new CancellationToken();// 生成全局事务IDvar gid = await _dtmClient.GenGid(cancellationToken);UserMoney body = new UserMoney() { id = 1, trymoney = -30, guid = string.Empty };UserMoney body2 = new UserMoney() { id = 2, trymoney = 30, guid = string.Empty };await _globalTransaction.Excecute(/*gid,*/ async (tcc) =>{// 用户1 转出30元 第一个参数是try检测及冻结阶段,第二个是提交,第三个是回滚var res1 = await tcc.CallBranch(body,transactionurl + "/TccUserTry",transactionurl + "/TccUserConfirm",transactionurl + "/TccUserCancel", cancellationToken);// 用户2 转入30元var res2 = await tcc.CallBranch(body2,transactionurl + "/TccUser2Try",transactionurl + "/TccUser2Confirm",transactionurl + "/TccUser2Cancel", cancellationToken);}, cancellationToken);return Ok(TransResponse.BuildSucceedResponse());}}
}
7.7 用户1转账事务API控制器
using DTM_EF;
using DTM_EF.Model;
using Dtmcli;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using MySqlConnector;namespace Dtm_TCC.Controllers
{[Route("api/[controller]")][ApiController]public class UserController : ControllerBase{private readonly IBranchBarrierFactory _barrierFactory;private readonly ILogger<UserController> _Logger;private readonly DtmDbContext _dtmDbContext;public UserController(IBranchBarrierFactory barrierFactory,ILogger<UserController> Logger,DtmDbContext dtmDbContext){_barrierFactory = barrierFactory;_Logger = Logger;_dtmDbContext = dtmDbContext;}[HttpPost][Route("/TccUserTry")]public async Task<IActionResult> TccUserTry([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);var obj = TransResponse.BuildFailureResponse();using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test")){await branchBarrier.Call(conn, async (tx) =>{//获取用户账户信息var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id ).FirstOrDefault();//判断预算--不够回滚if (UserMoney == null || UserMoney!.money + body.trymoney < 0) obj = TransResponse.BuildFailureResponse();else{//修改信息准备提交UserMoney!.balance = 1;UserMoney.trading_balance = 1;UserMoney.trymoney = body.trymoney;UserMoney.guid = gid;_dtmDbContext.SaveChanges();obj = TransResponse.BuildSucceedResponse();}await Task.CompletedTask;});}_Logger.LogInformation($"{gid}--Try成功");return Ok(obj);}[HttpPost][Route("/TccUserConfirm")]public async Task<IActionResult> TccUserConfirm([FromQuery] string gid, [FromQuery] string trans_type,[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);var obj = TransResponse.BuildFailureResponse();using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test")){await branchBarrier.Call(conn, async (tx) =>{//获取用户账户信息var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();if (UserMoney != null){UserMoney!.balance = 0;UserMoney!.trading_balance = 0;UserMoney!.money += UserMoney!.trymoney;UserMoney!.trymoney = 0;UserMoney!.guid = string.Empty;_dtmDbContext.SaveChanges();obj = TransResponse.BuildSucceedResponse();}//修改信息准备提交 await Task.CompletedTask;});}_Logger.LogInformation($"{gid}--Confirm成功");return Ok(obj);}[HttpPost][Route("/TccUserCancel")]public async Task<IActionResult> TccUserCancel([FromQuery] string gid, [FromQuery] string trans_type,[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);var obj = TransResponse.BuildFailureResponse();using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test")){//操作回滚并解锁await branchBarrier.Call(conn, async (tx) =>{//获取用户账户信息var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id ).FirstOrDefault();if (UserMoney != null){UserMoney!.balance = 0;UserMoney!.trading_balance = 0;UserMoney!.trymoney = 0;UserMoney!.guid = string.Empty;_dtmDbContext.SaveChanges();obj = TransResponse.BuildSucceedResponse();}await Task.CompletedTask;});}_Logger.LogInformation($"{gid}--Cancel成功");return Ok(obj);}}
}
7.8 用户2转账事务API控制器
using DTM_EF;
using DTM_EF.Model;
using Dtmcli;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using MySqlConnector;namespace Dtm_TCC.Controllers
{[Route("api/[controller]")][ApiController]public class User2Controller : ControllerBase{private readonly IBranchBarrierFactory _barrierFactory;private readonly ILogger<User2Controller> _Logger;private readonly DtmDbContext _dtmDbContext;public User2Controller(IBranchBarrierFactory barrierFactory,ILogger<User2Controller> Logger,DtmDbContext dtmDbContext){_barrierFactory = barrierFactory;_Logger = Logger;_dtmDbContext = dtmDbContext;}[HttpPost][Route("/TccUser2Try")]public async Task<IActionResult> TccUserTry([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);var obj = TransResponse.BuildFailureResponse();using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test")){await branchBarrier.Call(conn, async (tx) =>{//获取用户账户信息var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();//判断预算--不够回滚if (UserMoney == null || UserMoney!.money + body.trymoney < 0) obj = TransResponse.BuildFailureResponse();else{//修改信息准备提交UserMoney!.balance = 1;UserMoney.trading_balance = 1;UserMoney.trymoney = body.trymoney;UserMoney.guid = gid;_dtmDbContext.SaveChanges();obj = TransResponse.BuildSucceedResponse();}await Task.CompletedTask;});}obj = TransResponse.BuildFailureResponse();_Logger.LogInformation($"{gid}--Try成功");return Ok(obj);}[HttpPost][Route("/TccUser2Confirm")]public async Task<IActionResult> TccUserConfirm([FromQuery] string gid, [FromQuery] string trans_type,[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);var obj = TransResponse.BuildFailureResponse();using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test")){await branchBarrier.Call(conn, async (tx) =>{//获取用户账户信息var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();if (UserMoney != null){UserMoney!.balance = 0;UserMoney!.trading_balance = 0;UserMoney!.money += UserMoney!.trymoney;UserMoney!.trymoney = 0;UserMoney!.guid = string.Empty;_dtmDbContext.SaveChanges();obj = TransResponse.BuildSucceedResponse();}//修改信息准备提交 await Task.CompletedTask;});}_Logger.LogInformation($"{gid}--Confirm成功");return Ok(obj);}[HttpPost][Route("/TccUser2Cancel")]public async Task<IActionResult> TccUserCancel([FromQuery] string gid, [FromQuery] string trans_type,[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);var obj = TransResponse.BuildFailureResponse();using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test")){//操作回滚并解锁await branchBarrier.Call(conn, async (tx) =>{//获取用户账户信息var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id ).FirstOrDefault();if (UserMoney != null){UserMoney!.balance = 0;UserMoney!.trading_balance = 0;UserMoney!.trymoney = 0;UserMoney!.guid = string.Empty;_dtmDbContext.SaveChanges();obj = TransResponse.BuildSucceedResponse();}await Task.CompletedTask;});}_Logger.LogInformation($"{gid}--Cancel成功");return Ok(obj);}}
}
小结
本文给出了一个完整的 TCC 事务方案,是一个可以实际运行的 TCC,您只需要在这个示例的基础上进行简单修改,就能够用于解决您的真实问题