.NET CORE 分布式事务(三) DTM实现Saga及高并发下的解决方案

目录(结尾附加项目代码资源地址)

引言:

1. SAGA事务模式

2. 拆分为子事务

3. 失败回滚

4. 如何做补偿

4.1 失败的分支是否需要补偿

5. 异常

6. 异常与子事务屏障

6.1 NPC的挑战

6.2 现有方案的问题

6.3 子事务屏障

6.4 原理

7. 更多高级场景

7.1 部分第三方操作无法回滚(go语言写了一点,不要在意这些细节,下面开始 .NET)

7.2 超时回滚

8.0 .NET CORE结合DTM实现Saga(C#启动)

8.1 准备工作(和前两期的注册环节、数据库差不多的操作,看过前两期的小伙伴可跳过8.1阶段)

8.1.1 Nuget引入Dtmcli

8.1.2 生成转账数据库(EF_CORE)

8.1.3 DbContext

8.1.4 数据库持久化

8.1.5 数据库最终生成

8.1.6 appsettings.json

8.1.7 Program.cs

8.2 主程序事务API控制器

8.3 用户1转账事务API控制器

8.4 用户2转账事务API控制器

9. 开始运行

9.1 先给A和B两位用户各1000块钱。

9.2 执行转账

10. 并发下执行Saga分布式事务

10.1 Program.cs代码修改

10.2 用户1转账事务API控制器代码修改

10.3 用户2转账事务API控制器代码修改

10.4 Redis启动

小结


引言:

紧接前两期   .NET CORE 分布式事务(一) DTM实现二阶段提交(.NET CORE 分布式事务(一) DTM实现二阶段提交-CSDN博客)  .NET CORE 分布式事务(二) DTM实现TCC(.NET CORE 分布式事务(二) DTM实现TCC-CSDN博客)  本期讲解Saga分布式事务,并探讨如何在高并发下使用Saga分布式事务。

1. SAGA事务模式

SAGA事务模式是DTM中最常用的模式,主要是因为SAGA模式简单易用,工作量少,并且能够解决绝大部分业务的需求。SAGA最初出现在1987年Hector Garcaa-Molrna & Kenneth Salem发表的论文SAGAS里。其核心思想是将长事务拆分为多个短事务,由Saga事务协调器协调,如果每个短事务都成功提交完成,那么全局事务就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。

2. 拆分为子事务

例如我们要进行一个类似于银行跨行转账的业务,将A中的30元转给B,根据Saga事务的原理,我们将整个全局事务,切分为以下服务:

  • 转出(TransOut)服务,这里转出将会进行操作A-30
  • 转出补偿(TransOutCompensate)服务,回滚上面的转出操作,即A+30
  • 转入(TransIn)服务,转入将会进行B+30
  • 转入补偿(TransInCompensate)服务,回滚上面的转入操作,即B-30

整个SAGA事务的逻辑是:

执行转出成功=>执行转入成功=>全局事务完成

如果在中间发生错误,例如转入B发生错误,则会调用已执行分支的补偿操作,即:

执行转出成功=>执行转入失败=>执行转入补偿成功=>执行转出补偿成功=>全局事务回滚完成

下面我们看一个成功完成的SAGA事务典型的时序图:

在这个图中,我们的全局事务发起人,将整个全局事务的编排信息,包括每个步骤的正向操作和反向补偿操作定义好之后,提交给服务器,服务器就会按步骤执行前面SAGA的逻辑。

3. 失败回滚

如果有正向操作失败,例如账户余额不足或者账户被冻结,那么dtm会调用各分支的补偿操作,进行回滚,最后事务成功回滚。失败的时序图如下:

补偿执行顺序:

dtm的SAGA事务在1.10.0及之前,补偿操作是并发执行的,1.10.1之后,是根据用户指定的分支顺序,进行回滚的。

如果是普通SAGA,没有打开并发选项,那么SAGA事务的补偿分支是完全按照正向分支的反向顺序进行补偿的。

如果是并发SAGA,补偿分支也会并发执行,补偿分支的执行顺序与指定的正向分支顺序相反。假如并发SAGA指定A分支之后才能执行B,那么进行并发补偿时,DTM保证A的补偿操作在B的补偿操作之后执行

4. 如何做补偿

当SAGA对分支A进行失败补偿时,A的正向操作可能1. 已执行;2. 未执行;3. 甚至有可能处于执行中,最终执行成功或者失败是未知的。那么对A进行补偿时,要妥善处理好这三种情况,难度很大。

dtm提供了子事务屏障技术,自动处理上述三种情况,开发人员只需要编写好针对1的补偿操作情况即可,相关工作大幅简化,详细原理,参见下面的异常章节。

4.1 失败的分支是否需要补偿

dtm 常被问到的一个问题是,TransIn返回失败,那么这个时候是否还需要调用TransIn的补偿操作?DTM 的做法是,统一进行一次调用,这种的设计考虑点如下:

  • XA, TCC 等事务模式是必须要的,SAGA 为了保持简单和统一,设计为总是调用补偿
  • DTM 支持单服务多数据源,可能出现数据源1成功,数据源2失败,这种情况下,需要确保补偿被调用,数据源1的补偿被执行
  • DTM 提供的子事务屏障,自动处理了补偿操作中的各种情况,用户只需要执行与正向操作完全相反的补偿即可

5. 异常

在事务领域,异常是需要重点考虑的问题,例如宕机失败,进程crash都有可能导致不一致。当我们面对分布式事务,那么分布式中的异常出现更加频繁,对于异常的设计和处理,更是重中之重。

我们将异常分为以下几类:

  • 偶发失败: 在微服务领域,由于网络抖动、机器宕机、进程Crash会导致微小比例的请求失败。这类问题的解决方案是重试,第二次进行重试,就能够成功,因此微服务框架或者网关类的产品,都会支持重试,例如配置重试3次,每次间隔2s。DTM的设计对重试非常友好,应当支持幂等的各个接口都已支持幂等,不会发生因为重试导致事务bug的情况
  • 故障宕机: 大量公司内部都有复杂的多项业务,这些业务中偶尔有一两个非核心业务故障也是常态。DTM也考虑了这样的情况,在重试方面做了指数退避算法,如果遇见了故障宕机情况,那么指数退避可以避免大量请求不断发往故障应用,避免雪崩。
  • 网络乱序: 分布式系统中,网络延时是难以避免的,所以会发生一些乱序的情况,例如转账的例子中,可能发生服务器先收到撤销转账的请求,再收到转账请求。这类的问题是分布式事务中的一个重点难点问题。

业务上的失败与异常是需要做严格区分的,例如前面的余额不足,是业务上的失败,必须回滚,重试毫无意义。分布式事务中,有很多模式的某些阶段,要求最终成功。例如dtm的补偿操作,是要求最终成功的,只要还没成功,就会不断进行重试,直到成功。

6. 异常与子事务屏障

分布式事务之所以难,主要是因为分布式系统中的各个节点都可能发生各种非预期的情况。本文先介绍分布式系统中的异常问题,然后介绍这些问题带给分布式事务的挑战,接下来指出现有各种常见用法的问题,最后给出正确的方案。

6.1 NPC的挑战

分布式系统最大的敌人可能就是NPC了,在这里它是Network Delay, Process Pause, Clock Drift的首字母缩写。我们先看看具体的NPC问题是什么:

  • Network Delay,网络延迟。虽然网络在多数情况下工作的还可以,虽然TCP保证传输顺序和不会丢失,但它无法消除网络延迟问题。
  • Process Pause,进程暂停。有很多种原因可以导致进程暂停:比如编程语言中的GC(垃圾回收机制)会暂停所有正在运行的线程;再比如,我们有时会暂停云服务器,从而可以在不重启的情况下将云服务器从一台主机迁移到另一台主机。我们无法确定性预测进程暂停的时长,你以为持续几百毫秒已经很长了,但实际上持续数分钟之久进程暂停并不罕见。
  • Clock Drift,时钟漂移。现实生活中我们通常认为时间是平稳流逝,单调递增的,但在计算机中不是。计算机使用时钟硬件计时,通常是石英钟,计时精度有限,同时受机器温度影响。为了在一定程度上同步网络上多个机器之间的时间,通常使用NTP协议将本地设备的时间与专门的时间服务器对齐,这样做的一个直接结果是设备的本地时间可能会突然向前或向后跳跃。

分布式事务既然是分布式的系统,自然也有NPC问题。因为没有涉及时间戳,带来的困扰主要是NP。

6.2 现有方案的问题

我们看到开源项目dtm之外,包括各云厂商,各开源项目,他们给出的业务实现建议大多类似如下(这也是大多数用户最容易想到的方案):

  • 空补偿: “针对该问题,在服务设计时,需要允许空补偿,即在没有找到要补偿的业务主键时,返回补偿成功,并将原业务主键记录下来,标记该业务流水已补偿成功。”
  • 防悬挂: “需要检查当前业务主键是否已经在空补偿记录下来的业务主键中存在,如果存在则要拒绝执行该笔服务,以免造成数据不一致。”

事实上,NPC里的P和C,以及P和C的组合,有很多种的场景,都可以导致上述竞态情况,就不一一赘述了。

虽然这种情况发生的概率不高,但是在金融领域,一旦涉及金钱账目,那么带来的影响可能是巨大的。

PS:幂等控制如果也采用“先查再改”,也是一样很容易出现类似的问题。解决这一类问题的关键点是要利用唯一索引,“以改代查”来避免竞态条件。

6.3 子事务屏障

 在dtm中,首创了子事务屏障技术,使用该技术,能够非常便捷的解决异常问题,极大的降低了分布式事务的使用门槛。

子事务屏障能够达到下面这个效果,看示意图:

所有这些请求,到了子事务屏障后:不正常的请求,会被过滤;正常请求,通过屏障。开发者使用子事务屏障之后,前面所说的各种异常全部被妥善处理,业务开发人员只需要关注实际的业务逻辑,负担大大降低。 子事务屏障提供了方法BranchBarrier.Call,业务开发人员,在busiCall里面编写自己的相关逻辑,调用 BranchBarrier.Call 。 BranchBarrier.Call保证,在空回滚、悬挂等场景下,busiCall不会被调用;在业务被重复调用时,有幂等控制,保证只被提交一次。子事务屏障会管理TCC、SAGA、事务消息等,也可以扩展到其他领域

6.4 原理

子事务屏障技术的原理是,在本地数据库,建立分支操作状态表dtm_barrier,唯一键为全局事务id-分支id-分支操作(try|confirm|cancel)

  1. 开启本地事务
  2. 对于当前操作op(try|confirm|cancel),insert ignore一条数据gid-branchid-op,如果插入不成功,提交事务返回成功(常见的幂等控制方法)
  3. 如果当前操作是cancel,那么在insert ignore一条数据gid-branchid-try,如果插入成功(注意是成功),则提交事务返回成功
  4. 调用屏障内的业务逻辑,如果业务返回成功,则提交事务返回成功;如果业务返回失败,则回滚事务返回失败

在此机制下,解决了乱序相关的问题

  • 空补偿控制--如果Try没有执行,直接执行了Cancel,那么3中Cancel插入gid-branchid-try会成功,不走屏障内的逻辑,保证了空补偿控制
  • 幂等控制--2中任何一个操作都无法重复插入唯一键,保证了不会重复执行
  • 防悬挂控制--Try在Cancel之后执行,那么Cancel会在3中插入gid-branchid-try,导致Try在2中不成功,就不执行屏障内的逻辑,保证了防悬挂控制

 对于SAGA、二阶段消息,也是类似的机制。

7. 更多高级场景

在实际应用中,还遇见过一些业务场景,需要一些额外的技巧进行处理

7.1 部分第三方操作无法回滚(go语言写了一点,不要在意这些细节,下面开始 .NET)

例如一个订单中的发货,一旦给出了发货指令,那么涉及线下相关操作,那么很难直接回滚。对于涉及这类情况的saga如何处理呢?

我们把一个事务中的操作分为可回滚的操作,以及不可回滚的操作。那么把可回滚的操作放到前面,把不可回滚的操作放在后面执行,那么就可以解决这类问题

	saga := dtmcli.NewSaga(DtmServer, shortuuid.New()).Add(Busi+"/CanRollback1", Busi+"/CanRollback1Revert", req).Add(Busi+"/CanRollback2", Busi+"/CanRollback2Revert", req).Add(Busi+"/UnRollback1", "", req).Add(Busi+"/UnRollback2", "", req).EnableConcurrent().AddBranchOrder(2, []int{0, 1}). // 指定step 2,需要在0,1完成后执行AddBranchOrder(3, []int{0, 1}) // 指定step 3,需要在0,1完成后执行

示例中的代码,指定Step 2,3 中的 UnRollback 操作,必须在Step 0,1 完成后执行。

对于不可回滚的操作,DTM的设计建议是,不可回滚的操作在业务上也不允许返回失败。可以这么思考,如果发货的操作返回了失败,那么这个失败的含义是不够清晰的,调用方不知道这个失败是修改了部分数据的失败,还是修改数据前的业务校验失败,因为这个操作不可回滚,所以调用方收到这个失败,是不知道如何正确处理这个错误的。

另外当你的一个全局事务中,如果出现了两个既不可回滚的又可能返回失败的操作,那么到了实际运行中,一个执行成功,一个执行失败,此时执行成功的那个事务无法回滚,那么这个事务的一致性就不可能保证了。

对于发货操作,如果可能在校验数据上可能发生失败,那么将发货操作拆分为发货校验、发货两个服务则会清晰很多,发货校验可回滚,发货不可回滚同时也不会失败。

7.2 超时回滚

saga属于长事务,因此持续的时间跨度很大,可能是100ms到1天,因此saga没有默认的超时时间。dtm支持saga事务单独指定超时时间,到了超时时间,全局事务就会回滚。

saga.TimeoutToFail = 1800

在saga事务中,设置超时时间一定要注意,这类事务里不能够包含无法回滚的事务分支,因为超时回滚时,已执行的无法回滚的分支,数据就是错的。

8.0 .NET CORE结合DTM实现Saga(C#启动)

8.1 准备工作(和前两期的注册环节、数据库差不多的操作,看过前两期的小伙伴可跳过8.1阶段)

8.1.1 Nuget引入Dtmcli

  <ItemGroup><PackageReference Include="Dtmcli" Version="1.4.0" /></ItemGroup>

8.1.2 生成转账数据库(EF_CORE)

//模型
public partial class UserMoney
{public int id { get; set; }public int money { get; set; }public int trading_balance { get; set; }public int balance { get; set; }public int trymoney { get; set; }public string guid { get; set; }
}

8.1.3 DbContext

 public class DtmDbContext : DbContext{public DtmDbContext() { }public DtmDbContext(DbContextOptions<DtmDbContext> options) : base(options) { }public virtual DbSet<UserMoney> UserMoney { get; set; }protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder){optionsBuilder.UseMySql("server=localhost;port=3307;user id=root;password=123;database=DTM_Test", ServerVersion.Parse("8.0.23-mysql")).UseLoggerFactory(LoggerFactory.Create(option =>{option.AddConsole();}));}protected override void OnModelCreating(ModelBuilder modelBuilder){modelBuilder.UseCollation("utf8_general_ci").HasCharSet("utf8");modelBuilder.Entity<UserMoney>(entity =>{entity.ToTable("UserMoney");});}}

8.1.4 数据库持久化

CREATE TABLE
IFNOT EXISTS DTM_Test.barrier (id BIGINT ( 22 ) PRIMARY KEY AUTO_INCREMENT,trans_type VARCHAR ( 45 ) DEFAULT '',gid VARCHAR ( 128 ) DEFAULT '',branch_id VARCHAR ( 128 ) DEFAULT '',op VARCHAR ( 45 ) DEFAULT '',barrier_id VARCHAR ( 45 ) DEFAULT '',reason VARCHAR ( 45 ) DEFAULT '' COMMENT 'the branch type who insert this record',create_time datetime DEFAULT now( ),update_time datetime DEFAULT now( ),KEY ( create_time ),KEY ( update_time ),UNIQUE KEY ( gid, branch_id, op, barrier_id ) ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4;

8.1.5 数据库最终生成

8.1.6 appsettings.json

{"Logging": {"LogLevel": {"Default": "Information","Microsoft.AspNetCore": "Warning"}},"AllowedHosts": "*","ConnectionString": "server=localhost;port=3307;user id=root;password=123;database=DTM_Test","DtmSettings": {"TransactionUrl": "http://localhost:5271","CompensateUrl": "http://localhost:5271"}
}

8.1.7 Program.cs

  // 注册DbContextbuilder.Services.AddDbContext<DtmDbContext>(options =>{options.UseMySql(builder.Configuration.GetValue<string>("ConnectionString"), ServerVersion.Parse("8.0.23-mysql"));});builder.Services.Configure<DtmSettings>(builder.Configuration.GetSection("DtmSettings"));builder.Services.AddDtmcli(dtm =>{dtm.DtmUrl = "http://localhost:36789";dtm.SqlDbType = "mysql";dtm.BarrierSqlTableName = "dtm_test.barrier";});

8.2 主程序事务API控制器

using DTM_EF;
using Dtmcli;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using System.Data.Common;
using MySqlConnector;
using DTM_EF.Model;
using Dtm_Saga;
using DtmCommon;
using Microsoft.CodeAnalysis.Operations;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using Newtonsoft.Json;namespace Dtm_Saga.Controllers
{[ApiController][Route("[controller]")]public class DtmSagaController : ControllerBase{private readonly ILogger<DtmSagaController> _logger;private readonly IDtmClient _dtmClient;private readonly IDtmTransFactory _transFactory;private readonly DtmSettings _settings;private readonly IBranchBarrierFactory _factory;private readonly DtmDbContext _dtmDbContext;public DtmSagaController(ILogger<DtmSagaController> logger,IDtmClient dtmClient,IDtmTransFactory transFactory,IOptions<DtmSettings> settings,IBranchBarrierFactory factory,DtmDbContext dtmDbContext){_logger = logger;_dtmClient = dtmClient;_transFactory = transFactory;_settings = settings.Value;_factory = factory;_dtmDbContext = dtmDbContext;}[HttpPost("dtm-Saga")]public async Task<IActionResult> Get(int Money, CancellationToken cancellationToken){var obj = TransResponse.BuildFailureResponse();try{//1. 创建gid。var gid = await _dtmClient.GenGid(cancellationToken);//2. 用户模型。UserMoney bodyA = new UserMoney() { id = 1, trymoney = -Money, guid = string.Empty };UserMoney bodyB = new UserMoney() { id = 2, trymoney = Money, guid = string.Empty };//3. 设置分支事务和补偿事务。var saga = _transFactory.NewSaga(gid).Add(_settings.TransactionUrl + "/Saga/UserATransactionUrl", _settings.CompensateUrl + "/Saga/UserACompensateUrl", bodyA).Add(_settings.TransactionUrl + "/Saga/UserBTransactionUrl", _settings.CompensateUrl + "/Saga/UserBCompensateUrl", bodyB).EnableWaitResult();//开启了`EnableWaitResult()`,则可通过捕获异常的方式,捕获事务失败的结果。//4. 执行submitawait saga.Submit();Console.ForegroundColor = ConsoleColor.Red;Console.WriteLine("result gid is {0}", gid);Console.ResetColor();obj = TransResponse.BuildSucceedResponse();}catch (DtmException ex){obj = TransResponse.BuildFailureResponse();}return Ok(obj);}}
}

8.3 用户1转账事务API控制器

using DTM_EF;
using DTM_EF.Model;
using Dtm_Saga;
using Dtmcli;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using Microsoft.Extensions.Caching.Distributed;
using MySqlConnector;
using Newtonsoft.Json;
using ServiceStack.Redis;
using System.Threading;namespace Dtm_Saga.Controllers
{[Route("api/[controller]")][ApiController]public class SagaUserAController : ControllerBase{private readonly IBranchBarrierFactory _barrierFactory;private readonly ILogger<SagaUserAController> _Logger;private readonly DtmDbContext _dtmDbContext;private readonly IRedisClient _redisClient;private readonly RedisService _redisService;public SagaUserAController(IBranchBarrierFactory barrierFactory,ILogger<SagaUserAController> Logger,DtmDbContext dtmDbContext,IRedisClient redisClient,RedisService redisService){_barrierFactory = barrierFactory;_Logger = Logger;_dtmDbContext = dtmDbContext;_redisClient = redisClient;_redisService = redisService;}[HttpPost][Route("/Saga/UserATransactionUrl")]public async Task<IActionResult> UserATransactionUrl([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);var obj = TransResponse.BuildSucceedResponse();using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test")){try{await branchBarrier.Call(conn, async (tx) =>{//获取用户账户信息var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();if (UserMoney is null){obj = TransResponse.BuildFailureResponse();throw new Exception($"用户{body.id}--不存在");}if (UserMoney.money + body.trymoney < 0){obj = TransResponse.BuildFailureResponse();throw new Exception($"用户{body.id}--金额不足");}//前序判断都通过,修改信息准备提交   UserMoney!.money += body.trymoney;_dtmDbContext.SaveChanges();await Task.CompletedTask;});}catch (Exception ex){_Logger.LogError(ex.Message);}}return Ok(obj);}[HttpPost][Route("/Saga/UserACompensateUrl")]public async Task<IActionResult> UserACompensateUrl([FromQuery] string gid, [FromQuery] string trans_type,[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){//var branchBarrier = _barrierFactory.CreateBranchBarrier(trans_type, gid, branch_id, op);var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);var obj = TransResponse.BuildSucceedResponse();using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test")){try{await branchBarrier.Call(conn, async (tx) =>{//获取用户账户信息var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();if (UserMoney is null){obj = TransResponse.BuildFailureResponse();throw new Exception($"用户{body.id}--不存在");}//前序判断都通过,修改信息准备提交 UserMoney!.money -= body.trymoney;_dtmDbContext.SaveChanges();await Task.CompletedTask;});}catch (Exception ex){_Logger.LogError(ex.Message);}}return Ok(obj);}}
}

8.4 用户2转账事务API控制器

using DTM_EF;
using DTM_EF.Model;
using Dtm_Saga;
using Dtmcli;
using ServiceStack.Redis;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Caching.Distributed;
using MySqlConnector;
using Newtonsoft.Json;namespace Dtm_Saga.Controllers
{[Route("api/[controller]")][ApiController]public class SagaUserBController : ControllerBase{private readonly IBranchBarrierFactory _barrierFactory;private readonly ILogger<SagaUserBController> _Logger;private readonly DtmDbContext _dtmDbContext;private readonly IRedisClient _redisClient;private readonly RedisService _redisService;public SagaUserBController(IBranchBarrierFactory barrierFactory,ILogger<SagaUserBController> Logger,DtmDbContext dtmDbContext,IRedisClient redisClient,RedisService redisService){_barrierFactory = barrierFactory;_Logger = Logger;_dtmDbContext = dtmDbContext;_redisClient = redisClient;_redisService = redisService;}[HttpPost][Route("/Saga/UserBTransactionUrl")]public async Task<IActionResult> UserBTransactionUrl([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);var obj = TransResponse.BuildSucceedResponse();using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test")){try{await branchBarrier.Call(conn, async (tx) =>{//获取用户账户信息var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();if (UserMoney is null){obj = TransResponse.BuildFailureResponse();throw new Exception($"用户{body.id}--不存在");}if (UserMoney.money + body.trymoney < 0){obj = TransResponse.BuildFailureResponse();throw new Exception($"用户{body.id}--金额不足");}//前序判断都通过,修改信息准备提交   UserMoney!.money += body.trymoney;_dtmDbContext.SaveChanges();await Task.CompletedTask;});}catch (Exception ex){_Logger.LogError(ex.Message);}}return Ok(obj);}[HttpPost][Route("/Saga/UserBCompensateUrl")]public async Task<IActionResult> UserBCompensateUrl([FromQuery] string gid, [FromQuery] string trans_type,[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);var obj = TransResponse.BuildSucceedResponse();using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test")){try{await branchBarrier.Call(conn, async (tx) =>{//获取用户账户信息var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();if (UserMoney == null){obj = TransResponse.BuildFailureResponse();throw new Exception($"用户{body.id}--不存在");}//修改信息准备提交       UserMoney!.money -= body.trymoney;_dtmDbContext.SaveChanges();await Task.CompletedTask;});}catch (Exception ex){_Logger.LogError(ex.Message);}}return Ok(obj);}}
}

9. 开始运行

9.1 先给A和B两位用户各1000块钱。

9.2 执行转账

转100。

转-200。 

此时我们可以看到Saga分布式事务已经正常执行,并完成了转100和-200的操作。

10. 并发下执行Saga分布式事务

我们思考一个问题,每次只有一个请求的时候Saga分布式事务完美运行,但是在高并发下也能正常运行吗?我们测试一下。

打开apipost,输入请求地址,选择一键压测。

每次请求转账10元,10个并发。难道真的能转账成功吗?(执行之前恢复数据,A:1000元;B:1000元)。我们开始运行。

执行之后可以看到A:960元,B:1080元。按照常理来说每次转账10块钱,并发10次。10*10=100元,应该是A900元,B1100元。但是为什么会出现这个情况呢?

原因就是上一个并发还没执行完,当前并发也会访问数据库资源。数据库执行冲突,导致金额转账失败。如果你的程序写成这样基本就可以卷铺盖走人了。

那我们应该如何解决呢?有的小伙伴会说可以使用RabbitMQ消息队列(.NET CORE消息队列RabbitMQ-CSDN博客)。这确实是一个解决方案,把数据交给RabbitMQ。然后订阅一个一个数据的执行,一个一个用户进行扣款或转账。确实可以完美解决这个并发的问题。解决并发有很多解决方案。今天呢就不用RabbitMQ了,换一个。用Redis缓存数据库的分布式锁来解决这个并发的问题。(针对于Redis缓存数据库的分布式锁的非阻塞锁、阻塞锁、红锁以及锁的续命.NET CORE使用Redis分布式锁续命(续期)问题-CSDN博客,缓存数据类型,缓存api,Lua脚本,主从模式,读写分离,哨兵模式,集群模式等........之后陆续会推出文章,现在不做过多赘述,先解决这个并发问题。)

10.1 Nuget引入StackExchange.Redis

  <ItemGroup><PackageReference Include="StackExchange.Redis" Version="2.7.4" /></ItemGroup>

ServiceStack.Redis自3.9版本以后开始收费,我们坚持一贯的作风,能不花钱的就不花钱。但是ServiceStack.Redis提供的分布式锁api写的是真的很好,StackExchange.Redis里没有阻塞锁。我们需要自己手写。

using Microsoft.AspNetCore.DataProtection.KeyManagement;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using StackExchange.Redis;
using System.Diagnostics;
using System.Net.Sockets;
using System.Threading;namespace Dtm_Saga
{public class RedisService{private readonly ConnectionMultiplexer _redis;private readonly IDatabase _database;/// <summary>/// 初始化 <see cref="RedisService"/> 类的新实例。/// </summary>/// <param name="connectionMultiplexer">连接多路复用器。</param>public RedisService(string connectionString){_redis = ConnectionMultiplexer.Connect(connectionString);_database = _redis.GetDatabase();}#region 分布式锁...#region 阻塞锁public bool RedisLock(string key, int expireMilliSeconds, int timeout){var script = @"local isNX = redis.call('SETNX', KEYS[1], ARGV[1])if isNX == 1 thenredis.call('PEXPIRE', KEYS[1], ARGV[2])return 1endreturn 0";RedisKey[] scriptkey = { key };RedisValue[] scriptvalues = { key, expireMilliSeconds * 1000 };var stopwatch = Stopwatch.StartNew();while (stopwatch.Elapsed.TotalSeconds < timeout){if (_database.ScriptEvaluate(script, scriptkey, scriptvalues).ToString() == "1"){stopwatch.Stop();return true;}}Console.WriteLine($"[{DateTime.Now}]{key}--阻塞锁超时");stopwatch.Stop();return false;}public bool RedisUnLock(string key){var script = @"local getLock = redis.call('GET', KEYS[1])if getLock == ARGV[1] thenredis.call('DEL', KEYS[1])return 1endreturn 0";RedisKey[] scriptkey = { key };RedisValue[] scriptvalues = { key };return _database.ScriptEvaluate(script, scriptkey, scriptvalues).ToString() == "1";}#endregion#endregion}
}

 用Redis的Lua脚本来实现阻塞锁的机制,Lua脚本在Redis中被认为是原子性的。在执行Lua脚本期间,Redis不会并行处理其他客户端的命令,而是将它们排队等待。因此,Lua脚本中的所有指令都会连续无中断地执行,不会与其他任何命令交错。

10.2 Program.cs代码修改

Program中注册Redis,这里连接超时加了500000秒,是要高并发时程序一直在请求Redis,实际生产环境需要多次测试,选择一个最佳的超时时间。

            builder.Services.AddSingleton<RedisService>(provider =>{//你的Redis连接字符串string redisConnectionString = "127.0.0.1:6379,abortConnect=false,syncTimeout=500000";return new RedisService(redisConnectionString);});

10.3 用户1转账事务API控制器代码修改

using DTM_EF;
using DTM_EF.Model;
using Dtm_Saga;
using Dtmcli;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using Microsoft.Extensions.Caching.Distributed;
using MySqlConnector;
using Newtonsoft.Json;
using ServiceStack.Redis;
using System.Threading;namespace Dtm_Saga.Controllers
{[Route("api/[controller]")][ApiController]public class SagaUserAController : ControllerBase{private readonly IBranchBarrierFactory _barrierFactory;private readonly ILogger<SagaUserAController> _Logger;private readonly DtmDbContext _dtmDbContext;private readonly IRedisClient _redisClient;private readonly RedisService _redisService;public SagaUserAController(IBranchBarrierFactory barrierFactory,ILogger<SagaUserAController> Logger,DtmDbContext dtmDbContext,IRedisClient redisClient,RedisService redisService){_barrierFactory = barrierFactory;_Logger = Logger;_dtmDbContext = dtmDbContext;_redisClient = redisClient;_redisService = redisService;}[HttpPost][Route("/Saga/UserATransactionUrl")]public async Task<IActionResult> UserATransactionUrl([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);var obj = TransResponse.BuildSucceedResponse();using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test")){try{await branchBarrier.Call(conn, async (tx) =>{//Redis分布式锁,锁定if (_redisService.RedisLock("DataLock:UserATransactionUrl", 2000, 2000)){//获取用户账户信息var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();if (UserMoney is null){obj = TransResponse.BuildFailureResponse();throw new Exception($"用户{body.id}--不存在");}if (UserMoney.money + body.trymoney < 0){obj = TransResponse.BuildFailureResponse();throw new Exception($"用户{body.id}--金额不足");}//前序判断都通过,修改信息准备提交   UserMoney!.money += body.trymoney;_dtmDbContext.SaveChanges();}await Task.CompletedTask;});}catch (Exception ex){_Logger.LogError(ex.Message);}finally{//Redis分布式锁,释放锁_redisService.RedisUnLock("DataLock:UserATransactionUrl");}}return Ok(obj);}[HttpPost][Route("/Saga/UserACompensateUrl")]public async Task<IActionResult> UserACompensateUrl([FromQuery] string gid, [FromQuery] string trans_type,[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){//var branchBarrier = _barrierFactory.CreateBranchBarrier(trans_type, gid, branch_id, op);var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);var obj = TransResponse.BuildSucceedResponse();using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test")){try{await branchBarrier.Call(conn, async (tx) =>{//Redis分布式锁,锁定if (_redisService.RedisLock("DataLock:UserACompensateUrl", 2000, 2000)){//获取用户账户信息var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();if (UserMoney is null){obj = TransResponse.BuildFailureResponse();throw new Exception($"用户{body.id}--不存在");}//前序判断都通过,修改信息准备提交 UserMoney!.money -= body.trymoney;_dtmDbContext.SaveChanges();}await Task.CompletedTask;});}catch (Exception ex) { _Logger.LogError(ex.Message); }finally{//Redis分布式锁,释放锁_redisService.RedisUnLock("DataLock:UserACompensateUrl");}}return Ok(obj);}}
}

10.4 用户2转账事务API控制器代码修改

using DTM_EF;
using DTM_EF.Model;
using Dtm_Saga;
using Dtmcli;
using ServiceStack.Redis;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Caching.Distributed;
using MySqlConnector;
using Newtonsoft.Json;namespace Dtm_Saga.Controllers
{[Route("api/[controller]")][ApiController]public class SagaUserBController : ControllerBase{private readonly IBranchBarrierFactory _barrierFactory;private readonly ILogger<SagaUserBController> _Logger;private readonly DtmDbContext _dtmDbContext;private readonly IRedisClient _redisClient;private readonly RedisService _redisService;public SagaUserBController(IBranchBarrierFactory barrierFactory,ILogger<SagaUserBController> Logger,DtmDbContext dtmDbContext,IRedisClient redisClient,RedisService redisService){_barrierFactory = barrierFactory;_Logger = Logger;_dtmDbContext = dtmDbContext;_redisClient = redisClient;_redisService = redisService;}[HttpPost][Route("/Saga/UserBTransactionUrl")]public async Task<IActionResult> UserBTransactionUrl([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);var obj = TransResponse.BuildSucceedResponse();using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test")){try{await branchBarrier.Call(conn, async (tx) =>{//Redis分布式锁,锁定if (_redisService.RedisLock("DataLock:UserBTransactionUrl", 2000, 2000)){//获取用户账户信息var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();if (UserMoney is null){obj = TransResponse.BuildFailureResponse();throw new Exception($"用户{body.id}--不存在");}if (UserMoney.money + body.trymoney < 0){obj = TransResponse.BuildFailureResponse();throw new Exception($"用户{body.id}--金额不足");}//前序判断都通过,修改信息准备提交   UserMoney!.money += body.trymoney;_dtmDbContext.SaveChanges();}await Task.CompletedTask;});}catch (Exception ex){_Logger.LogError(ex.Message);}finally{//Redis分布式锁,释放锁_redisService.RedisUnLock("DataLock:UserBTransactionUrl");}}return Ok(obj);}[HttpPost][Route("/Saga/UserBCompensateUrl")]public async Task<IActionResult> UserBCompensateUrl([FromQuery] string gid, [FromQuery] string trans_type,[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);var obj = TransResponse.BuildSucceedResponse();using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test")){try{await branchBarrier.Call(conn, async (tx) =>{//Redis分布式锁,锁定if (_redisService.RedisLock("DataLock:UserBCompensateUrl", 2000, 2000)){//获取用户账户信息var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();if (UserMoney == null){obj = TransResponse.BuildFailureResponse();throw new Exception($"用户{body.id}--不存在");}//修改信息准备提交       UserMoney!.money -= body.trymoney;_dtmDbContext.SaveChanges();}await Task.CompletedTask;});}catch (Exception ex) { _Logger.LogError(ex.Message); }finally{//Redis分布式锁,释放锁_redisService.RedisUnLock("DataLock:UserACompensateUrl");}}return Ok(obj);}}
}

!!!谨记!!!-----释放锁的时候,每个锁的释放代码只能在finally出现一次。不能在try里也写上释放锁。虽然当前并发在try里释放和finally里释放运行并没有问题。但是下一个并发在执行的时候上一个执行到try释放锁之后,立即抢锁,抢锁成功。结果在执行的时候上一个并发在执行finally的时候给释放了,这样是不对的。

10.5 Redis启动

修改好代码之后继续运行。(先启动一个win版本的Redis,linux docker启动等操作之后推出文章。数据进行恢复,A:1000元,B:1000元)。每次转账1元,直接上100,200,500并发。

转账1元,100并发:

转账1元,200并发:(让程序跑一会,喝杯水)

转账1元,500并发:(可以离开工位出去透透气)

为什么A:1627元,B:373元 呢?单次并发有点多导致HTTP请求超时。这时候当前的服务就要部署多个实例,可参考之前文章微服务架构Nacos(.NET CORE微服务之Nacos_nacos .net core-CSDN博客;.NET CORE微服务之Ocelot(连接Nacos)_net ocelot + noces-CSDN博客;.NET CORE微服务之Polly_polly .net core-CSDN博客),或用Nginx反向代理实现负载均衡。

虽然出现问题,也无法避免人工介入。但是我们最起码保证了用户资产并未出现超减或超加现象,这也是电商中的秒杀,防止商品超卖解决方案。当然应对高并发还有非常多的解决方案。

小结

本文给出了一个完整的 SAGA 事务方案,是一个可以实际运行的 SAGA,并解决高并发的使用场景,您只需要在这个示例的基础上进行简单修改,就能够用于解决您的真实问题

本文项目代码资源地址:【免费】.NETCORE分布式事务(三)DTM实现Saga及高并发下的解决方案资源-CSDN文库

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/780684.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue3+threejs新手从零开发卡牌游戏(二十二):添加己方游戏流程(先后手、抽牌、主要阶段、战斗阶段、结束阶段)

首先在utils/common.ts里定义一些流程相关的变量&#xff1a; const flow ref([ // 游戏流程{name: "抽卡阶段"},{name: "主要阶段"},{name: "战斗阶段"},{name: "结束阶段"}])const flowIndex ref(0) // 当前流程const currentPla…

[C++初阶] 爱上C++ : 与C++的第一次约会

&#x1f525;个人主页&#xff1a;guoguoqiang &#x1f525;专栏&#xff1a;我与C的爱恋 本篇内容带大家浅浅的了解一下C中的命名空间。 在c中&#xff0c;名称&#xff08;name&#xff09;可以是符号常量、变量、函数、结构、枚举、类和对象等等。工程越大&#xff0c;名称…

什么是gif? 如何把视频格式转成gif动图格式?展现动图的魅力

一&#xff0c;什么是gif格式 gif是一种位图图形文件格式&#xff0c;主要用于显示索引彩色图像。gif格式在1987年由CompuServe公司开发&#xff0c;它采用LZW&#xff08;Lempel-Ziv-Welch&#xff09;无损压缩算法&#xff0c;这种算法可以有效地减少图像文件在网络上传…

在.Net6中用gdal实现第一个功能

目录 一、创建.NET6的控制台应用程序 二、加载Gdal插件 三、编写程序 一、创建.NET6的控制台应用程序 二、加载Gdal插件 Gdal的资源可以经过NuGet包引入。右键单击项目名称&#xff0c;然后选择 "Manage NuGet Packages"&#xff08;管理 NuGet 包&#xff09;。N…

【C++】 vector 数组/向量

文章目录 【 1. vector 的声明与初始化 】1.1 vector 的声明1.2 vector 的初始化1.2.1 构造一个空的 vector1.2.2 指定数量初值的方式初始化 vector1.2.3 迭代器的方式初始化1.2.4 构造一个相同的 vector 【 2. vector 的相关操作 】2.1 插入元素2.1.1 在vector的末尾插入新元素…

蚂蚁新村3.30答案:“秀女拈针锦线长,纤纤玉指领馨香”说的是哪一项非遗技艺

蚂蚁新村是一个虚拟社区。在这个虚拟社区中&#xff0c;用户可以参与各种活动&#xff0c;比如生产能量豆、做慈善捐赠等。同时&#xff0c;蚂蚁新村也提供了一些知识问答环节&#xff0c;用户在参与的过程中可以增进知识。这些问答内容往往涉及广泛的主题&#xff0c;如文化、…

iOS - Runtime - Class-方法缓存(cache_t)

文章目录 iOS - Runtime - Class-方法缓存(cache_t)1. 散列表的存取值 iOS - Runtime - Class-方法缓存(cache_t) Class内部结构中有个方法缓存&#xff08;cache_t&#xff09;&#xff0c;用散列表&#xff08;哈希表&#xff09;来缓存曾经调用过的方法&#xff0c;可以提高…

Python3:ModuleNotFoundError: No module named ‘elftools‘

问题背景 问题 ModuleNotFoundError: No module named ‘elftools’ 解决方法 pip3 install pyelftools 成功&#xff01;&#xff01;&#xff01;

YPay源支付V7开源版

YPay_V7版本即将停止维护更新&#xff0c;同时我们将开放最新版开源代码供学习和参考。虽然首批阶段的【function_8.1.php文件是加密的】&#xff0c;但授权已经除去&#xff0c;该代码将在新版YPay上线时开放给大家。我们也会定期进行迭代更新&#xff0c;随后将创建对应仓库&…

【QT学习】1.qt初识,创建qt工程,使用按钮,第一个交互按钮

1.初识qt--》qt是个框架&#xff0c;不是语言 1.学习路径 一 QT简介 &#xff0c;QTCreator &#xff0c;QT工程 &#xff0c;QT的第一个程序&#xff0c;类&#xff0c;组件 二 信号与槽 三 对话框 四 QT Desiner 控件 布局 样式 五 事件 六 GUI绘图 七 文件 八 …

解决 linux 服务器 java 命令不生效问题

在Linux系统中&#xff0c;当你安装Java并设置了JAVA_HOME环境变量后&#xff0c;你可能需要使用source /etc/profile命令来使Java命令生效。这是因为/etc/profile是一个系统级的配置文件&#xff0c;它包含了系统的全局环境变量设置。 但是需要注意的是&#xff0c;source /e…

使用C语言实现Linux下的并发Http服务器

使用C语言实现Linux下的并发Http服务器 文章目录 使用C语言实现Linux下的并发Http服务器先备知识Http协议请求格式&#xff1a;客户端请求服务端响应 Demo 实现Mini的Http服务器流程接收Http请求实现按行读取请求头部请求头部的结束 解析请求响应请求读取文件&#xff08;http需…

品质领航,流量赋能,2024喜尔康浙江省经销商培训会在喜尔康总部成功举行

3月29日&#xff0c;以“新零售、新流量、新风口”为主题的2024喜尔康浙江省经销商培训会在喜尔康总部正式开始举办。活动旨在智能新时代赋能经销商伙伴&#xff0c;通过抓住行业智能化风口&#xff0c;实现喜尔康与经销商的共赢&#xff0c;决胜未来新零售商机。 喜尔康始终致…

Charles for Mac 强大的网络调试工具

Charles for Mac是一款功能强大的网络调试工具&#xff0c;可以帮助开发人员和测试人员更轻松地进行网络通信测试和调试。以下是一些Charles for Mac的主要特点&#xff1a; 软件下载&#xff1a;Charles for Mac 4.6.6注册激活版 流量截获&#xff1a;Charles可以截获和分析通…

nuxt学习

一、遇到的问题 1、nuxt初始化失败问题解决方案 使用npm和pnpm初始化都失败 原因&#xff1a;主机连不上DNS服务器 解决方案 Step1: 打开文件夹 Windows:路径&#xff1a;C:\Windows\System32\drivers\etc Mac: 路径&#xff1a;/etc/hosts Step2: 使用记事本方式打开 …

44 el-dialog 的 appendToBody 属性, 导致 vue 响应式失效

前言 我们经常会碰到 一些 模型和视图 不同步的问题 通常意义上 主要的问题为 列表的某响应式数据更新着更新着 后面就变成非响应式对象了, 然后 就造成了 数据一直在更新, 但是 视图的渲染后面就未渲染了, 这是一个由于 模型上的问题 导致的数据的不在响应式更新 又或者 是…

【倪琴神品品鉴】全新倪诗韵神品级古琴

倪琴朱砂神品仲尼&#xff0c;仅此放漏一张&#xff1b;龙池侧签海门倪诗韵制&#xff0c;雁足上方刻“雷音琴坊”方章&#xff0c;凤沼下方有随形章“神品”二字&#xff1b;老木材纹理竖直&#xff0c;共振良好&#xff0c;是难得的佳器&#xff1b;附带倪老师亲笔签名收藏证…

图扑数字孪生智慧城市,综合治理一屏统览

现代城市作为一个复杂系统&#xff0c;牵一发而动全身&#xff0c;城市化进程中产生新的矛盾和社会问题都会影响整个城市系统的正常运转。智慧城市是应对这些问题的策略之一。领导曾在中央城市工作会议上指出&#xff0c;城市工作要树立系统思维&#xff0c;从构成城市诸多要素…

Hyper-V 虚拟机设置静态 IP 和外网访问

文章目录 环境说明1 问题简介2 解决过程 环境说明 宿主机操作系统&#xff1a;Windows 11 专业版漏洞复现操作系&#xff1a;debian-live-12.5.0-amd64-standard 1 问题简介 在 Windows 上用自带的 Hyper-V 虚拟机管理应用创建了一个 Debian 12 虚拟机&#xff0c;配置静态 IP…

windows安装Chocolatey

其实官网就有介绍&#xff0c;贴上原址&#xff1a; Chocolatey Software | Installing Chocolatey 安装步骤&#xff1a; 1、winX选择Windows Powershell(管理员) 2、复制以下指令 Set-ExecutionPolicy Bypass -Scope Process -Force; [System.Net.ServicePointManager]:…