基于CAP组件实现补偿事务与幂等性保障

【.NET Core】| 总结/Edison Zhou

1补偿事务和幂等性

在微服务架构下,我们会采用异步通信来对各个微服务进行解耦,从而我们会用到消息中间件来传递各个消息。 

d6de945864750c23b37fa5432e5ce48e.png

补偿事务

某些情况下,消费者需要返回值以告诉发布者执行结果,以便于发布者实施一些动作,通常情况下这属于补偿范围

例如,在一个电商程序中,订单初始状态为 pending,当商品数量成功扣除时将状态标记为 succeeded ,否则为 failed。

那么,这样看来实现逻辑应该是:当订单微服务提交订单,并发布了一个已下单的消息至下游微服务比如库存微服务,当库存微服务扣减库存后,无论扣减成功与否,都发送一个回调给订单微服务告知扣减状态。

如果我们自己来实现,可能需要较多的工作量,我们可以借助CAP组件来实现,它提供的callback功能可以很方便的做到这一点

幂等性

所谓幂等性,就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。

在采用了消息中间件的分布式系统中,存在3中可能:

  • Exactly Once(*) (仅有一次)

  • At Most Once (最多一次)

  • At Least Once (最少一次)

带 * 号的也就是Exactly Once在实际场景中,很难达到

我们都知道,在CAP组件中,采用了数据库表(准确来说是临时存储),也许可以做到At Most Once,但是并没有提供严格保证消息不丢失的相关功能或配置。因此,CAP采用的交付保证是At Least Once,它并没有实现幂等。

其实,目前业界大多数基于事件驱动的框架都是要求用户自己来保证幂等性的,比如ENode,RocketMQ等。

综述,CAP组件可以帮助实现一些比较不严格的幂等,但是严格的幂等无法做到。这就需要我们自己来处理,通常有两种方式:

(1)以自然的方式处理幂等消息

比如数据库提供的 INSERT ON DUPLICATE KEY UPDATE 或者是才去类型的程序判断行为。

(2)显示处理幂等消息

这种方式更为常见,在消息传递过程中传递ID,然后由单独的消息跟踪器来处理。比如,我们可以借助Redis来实现这个消息跟踪器,下面的示例就是基于Redis来显示处理幂等的。

2基于CAP组件的Sample

这里我们以刚刚提到的电商服务为例,订单服务负责下单,库存服务负责扣减库存,二者通过Kafka进行消息传递,通过MongoDB进行持久化数据,CAP作为事件总线。

案例结构图

订单下单时会将将初始化状态为Pending的订单数据存入MongoDB,然后发送一个订单已下达的消息至事件总线,下游系统库存服务订阅这个消息并消费,也就是扣减库存。库存扣减成功后,订单服务根据扣减状态将订单状态改为Succeeded或Failed。

00ff74e626e150949573e3fa6112c0e0.jpeg

编写订单服务

创建一个ASP.NET 5/6 WebAPI项目,引入以下Package:

PM>Install-Package AutoMapper
PM>Install-Package AutoMapper.Extensions.Microsoft.DependencyInjection
PM>Install-Package DotNetCore.CAP
PM>Install-Package DotNetCore.CAP.Kafka
PM>Install-Package DotNetCore.CAP.MongoDB

编写一个Controller用于接收下单请求:

[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{private readonly IOrderRepository _orderRepository;private readonly IMapper _mapper;private readonly ICapPublisher _eventPublisher;public OrdersController(IOrderRepository orderRepository, IMapper mapper, ICapPublisher eventPublisher){_orderRepository = orderRepository;_mapper = mapper;_eventPublisher = eventPublisher;}[HttpGet]public async Task<ActionResult<IList<OrderVO>>> GetAllOrders(){var orders = await _orderRepository.GetAllOrders();return Ok(_mapper.Map<IList<OrderVO>>(orders));}[HttpGet("id")]public async Task<ActionResult<OrderVO>> GetOrder(string id){var order = await _orderRepository.GetOrder(id);if (order == null)return NotFound();return Ok(_mapper.Map<OrderVO>(order));}[HttpPost]public async Task<ActionResult<OrderVO>> CreateOrder(OrderDTO orderDTO){var order = _mapper.Map<Order>(orderDTO);// 01.生成订单初始数据order.OrderId = SnowflakeGenerator.Instance().GetId().ToString();order.CreatedDate = DateTime.Now;order.Status = OrderStatus.Pending;// 02.订单数据存入MongoDBawait _orderRepository.CreateOrder(order);// 03.发布订单已生成事件消息await _eventPublisher.PublishAsync(name: EventNameConstants.TOPIC_ORDER_SUBMITTED,contentObj: new EventData<NewOrderSubmittedEvent>(new NewOrderSubmittedEvent(order.OrderId, order.ProductId, order.Quantity)),callbackName: EventNameConstants.TOPIC_STOCK_DEDUCTED);return CreatedAtAction(nameof(GetOrder), new { id = order.OrderId }, _mapper.Map<OrderVO>(order));}
}

这里使用了CAP提供的callback机制实现订单状态的修改。其原理就是新建了一个Consumer用于接收库存微服务的新Topic订阅消费。其中,Topic名字定义在了一个常量中。

public class ProductStockDeductedEventService : IProductStockDeductedEventService, ICapSubscribe
{private readonly IOrderRepository _orderRepository;public ProductStockDeductedEventService(IOrderRepository orderRepository){_orderRepository = orderRepository;}[CapSubscribe(name: EventNameConstants.TOPIC_STOCK_DEDUCTED, Group = EventNameConstants.GROUP_STOCK_DEDUCTED)]public async Task MarkOrderStatus(EventData<ProductStockDeductedEvent> eventData){if (eventData == null || eventData.MessageBody == null)return;var order = await _orderRepository.GetOrder(eventData.MessageBody.OrderId);if (order == null)return;if (eventData.MessageBody.IsSuccess){order.Status = OrderStatus.Succeed;// Todo: 一些额外的逻辑}else{order.Status = OrderStatus.Failed;// Todo: 一些额外的逻辑}await _orderRepository.UpdateOrder(order);}
}

这里回调的消费逻辑很简单,就是根据库存扣减的结果更新订单的状态。

编写库存服务

创建一个ASP.NET 5/6 WebAPI项目,引入以下Package:

PM>Install-Package AutoMapper
PM>Install-Package AutoMapper.Extensions.Microsoft.DependencyInjection
PM>Install-Package DotNetCore.CAP
PM>Install-Package DotNetCore.CAP.Kafka
PM>Install-Package DotNetCore.CAP.MongoDB

编写一个Controller用于接收库存查询请求:

public class StocksController : ControllerBase
{private readonly IStockRepository _stockRepository;private readonly IMapper _mapper;private readonly ICapPublisher _eventPublisher;public StocksController(IStockRepository stockRepository, IMapper mapper, ICapPublisher eventPublisher){_stockRepository = stockRepository;_mapper = mapper;_eventPublisher = eventPublisher;}[HttpGet]public async Task<ActionResult<IList<StockVO>>> GetAllStocks(){var stocks = await _stockRepository.GetAllStocks();return Ok(_mapper.Map<IList<StockVO>>(stocks));}[HttpGet("id")]public async Task<ActionResult<StockVO>> GetStock(string id){var stock = await _stockRepository.GetStock(id);if (stock == null)return NotFound();return Ok(_mapper.Map<StockVO>(stock));}[HttpPost]public async Task<ActionResult<StockVO>> CreateStock(StockDTO stockDTO){var stock = _mapper.Map<Stock>(stockDTO);stock.CreatedDate = DateTime.Now;stock.UpdatedDate = stock.CreatedDate;await _stockRepository.CreateStock(stock);return CreatedAtAction(nameof(GetStock), new { id = stock.ProductId }, _mapper.Map<StockVO>(stock));}
}

编写一个Consumer用于消费订单下达事件的消息:

public class NewOrderSubmittedEventService : INewOrderSubmittedEventService, ICapSubscribe
{private readonly IStockRepository _stockRepository;private readonly IMsgTracker _msgTracker;public NewOrderSubmittedEventService(IStockRepository stockRepository, IMsgTracker msgTracker){_stockRepository = stockRepository;_msgTracker = msgTracker;}[CapSubscribe(name: EventNameConstants.TOPIC_ORDER_SUBMITTED, Group = EventNameConstants.GROUP_ORDER_SUBMITTED)]public async Task<EventData<ProductStockDeductedEvent>> DeductProductStock(EventData<NewOrderSubmittedEvent> eventData){// 幂等性保障if(await _msgTracker.HasProcessed(eventData.Id))return null;// 产品Id合法性校验var productStock = await _stockRepository.GetStock(eventData.MessageBody.ProductId);if (productStock == null)return null;// 核心扣减逻辑EventData<ProductStockDeductedEvent> result;if (productStock.StockQuantity - eventData.MessageBody.Quantity >= 0){// 扣减产品实际库存productStock.StockQuantity -= eventData.MessageBody.Quantity;// 提交至数据库await _stockRepository.UpdateStock(productStock);result = new EventData<ProductStockDeductedEvent>(new ProductStockDeductedEvent(eventData.MessageBody.OrderId, true));}else{// Todo: 一些额外的逻辑result = new EventData<ProductStockDeductedEvent>(new ProductStockDeductedEvent(eventData.MessageBody.OrderId, false, "扣减库存失败"));}// 幂等性保障await _msgTracker.MarkAsProcessed(eventData.Id);return result;}
}

在消费逻辑中,会经历幂等性校验、合法性校验、扣减逻辑 和 添加消费记录。最终,会再次发送一个订单扣减完成事件,供订单服务将其作为回调进行消费,也就是更新订单状态。

自定义MsgTracker

在上面的示例代码中,我们自定义了一个MsgTracker消息跟踪器,它是基于Redis实现的,示例代码如下:

public class RedisMsgTracker : IMsgTracker
{private const string KEY_PREFIX = "msgtracker:"; // 默认Key前缀private const int DEFAULT_CACHE_TIME = 60 * 60 * 24 * 3; // 默认缓存时间为3天,单位为秒private readonly IRedisCacheClient _redisCacheClient;public RedisMsgTracker(IRedisCacheClient redisCacheClient){_redisCacheClient = redisCacheClient ?? throw new ArgumentNullException("RedisClient未初始化");}public async Task<bool> HasProcessed(string msgId){var msgRecord = await _redisCacheClient.GetAsync<MsgTrackLog>($"{KEY_PREFIX}{msgId}");if (msgRecord == null)return false;return true;}public async Task MarkAsProcessed(string msgId){var msgRecord = new MsgTrackLog(msgId);await _redisCacheClient.SetAsync($"{KEY_PREFIX}{msgId}", msgRecord, DEFAULT_CACHE_TIME);}
}

在示例代码中,约定了所有服务发送的消息都是EventData类,它接受一个泛型,定义如下:

public class EventData<T> where T : class
{public string Id { get; set; }public T MessageBody { get; set; }public DateTime CreatedDate { get; set; }public EventData(T messageBody)
{MessageBody = messageBody;CreatedDate = DateTime.Now;Id = SnowflakeGenerator.Instance().GetId().ToString();}
}

其中,它自带了一个由雪花算法生成的消息Id用于传递过程中的唯一性,这个Id也被MsgTracker用于幂等性校验。

测试验证

首先,在库存服务里面先查一下各个商品的库存:

83673435e8629fc8fb98d43780606662.png

可以看到商品Id为1003的库存有5个。

其次,在订单服务里面新建一个订单请求,买5个Id为1003的商品:

{"userId": "1002","productId": "1003","quantity": 5
}

提交成功后,查看库存状态:

79f5035950c92af9e174d8b94c577c4d.png

然后再查看订单状态:

b65fe2d2d74672835d1e20ed74c3ce03.png

如果这时再下单Id=1003的商品,订单状态变为-1即Failed:

e919944d3c151c1fbad24973209c3a0d.png

CAP与本地事务集成

在上面的示例代码中,如果订单提交MongoDB成功,但是在发布消息的时候失败了,那么下单逻辑就应该是失败的。这时,我们希望这两个操作可以在一个事务里边进行原子性保障,CAP提供了与本地事务的集成机制,在本地消息表与业务逻辑数据存储为同一个存储类型介质下(如本文例子的MongoDB)可以做到事务的集成。

例如,我们将数据持久化和消息发布/消费重构在一个Service类中进行封装,Controller只需调用即可。

(1)封装OrderService

public class OrderService : IOrderService
{private readonly ICapPublisher _eventPublisher;private readonly IMongoCollection<Order> _orders;private readonly IMongoClient _client;public OrderService(IOrderDatabaseSettings settings, ICapPublisher eventPublisher){_client = new MongoClient(settings.ConnectionString);_orders = _client.GetDatabase(settings.DatabaseName).GetCollection<Order>(settings.OrderCollectionName);_eventPublisher = eventPublisher;}public async Task<IList<Order>> GetAllOrders(){return await _orders.Find(o => true).ToListAsync();}public async Task<Order> GetOrder(string orderId){return await _orders.Find(o => o.OrderId == orderId).FirstOrDefaultAsync();}public async Task CreateOrder(Order order){// 本地事务集成示例using (var session = _client.StartTransaction(_eventPublisher)){// 01.订单数据存入MongoDB_orders.InsertOne(order);// 02.发布订单已生成事件消息_eventPublisher.Publish(name: EventNameConstants.TOPIC_ORDER_SUBMITTED,contentObj: new EventData<NewOrderSubmittedEvent>(new NewOrderSubmittedEvent(order.OrderId, order.ProductId, order.Quantity)),callbackName: EventNameConstants.TOPIC_STOCK_DEDUCTED);// 03.提交事务await session.CommitTransactionAsync();}}public async Task UpdateOrder(Order order){await _orders.ReplaceOneAsync(o => o.OrderId == order.OrderId, order);}
}

(2)Controller修改调用方式

[HttpPost]
public async Task<ActionResult<OrderVO>> CreateOrder(OrderDTO orderDTO)
{var order = _mapper.Map<Order>(orderDTO);// 01.生成订单初始数据order.OrderId = SnowflakeGenerator.Instance().GetId().ToString();order.CreatedDate = DateTime.Now;order.Status = OrderStatus.Pending;// 02.订单数据提交await _orderService.CreateOrder(order);return CreatedAtAction(nameof(GetOrder), new { id = order.OrderId }, _mapper.Map<OrderVO>(order));
}

同理,我们也可以将Consumer端的消费逻辑重构为CAP与本地事务集成,这里不再赘述。

本文示例代码细节:https://github.com/EdisonChou/EDT.EventBus.Sample

End总结

本文介绍了事务补偿与幂等性的基本概念,并基于CAP组件给了一个事务补偿和幂等性保障的DEMO示例,在实际使用中可能还会借助CAP提供的事务能力将数据持久化和发布消息作为一个事务实现原子性,即CAP与本地事务的集成。

希望本文能够对你有所帮助!

参考资料

CAP官方文档,https://cap.dotnetcore.xyz/user-guide/zh/cap

e873cee0bd5a81cd15e3a9b9acd3831c.gif

年终总结:Edison的2021年终总结

数字化转型:我在传统企业做数字化转型

C#刷题:C#刷剑指Offer算法题系列文章目录

.NET面试:.NET开发面试知识体系

.NET大会:2020年中国.NET开发者大会PDF资料

05695dc2260ec3afedf76afcba41eca7.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/283929.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker与k8s

前言 随着k8s 作为容器编排解决方案变得越来越流行&#xff0c;有些人开始拿 Docker 和 k8s进行对比&#xff0c;不禁问道&#xff1a;Docker 不香吗&#xff1f; k8s 是kubernets的缩写&#xff0c;’8‘代表中间的八个字符。 其实 Docker 和 k8s 并非直接的竞争对手&#xff…

Linux下启动tomcat报java.lang.OutOfMemoryError: PermGen space

2019独角兽企业重金招聘Python工程师标准>>> 一、错误信息 java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.jav…

Redis安装[Windows]

一. redis下载地址: https://github.com/ServiceStack/redis-windows/tree/master/downloads 根据需要的下载对应版本*.zip即可.(我这里是win7x64) 二.使用 1. 下载之后解压到你相应的目录下: 1 文件介绍&#xff1a; 2 redis-benchmark.exe #基准测试 3 redis-check-aof.e…

简练软考知识点整理-项目启动过程组

启动过程组包含定义一个新项目或现有项目的一个新阶段&#xff0c;授权开始该项目或阶段的一组过程。在启动过程中&#xff0c;定义初步范围和落实初步财务资源&#xff0c;识别那些将相互作用并影响项目总体结果的内外部干系人&#xff0c;选定项目经理&#xff08;如果尚未安…

在 ASP.NET Core 中上传文件

简介文件上传是指将媒体文件&#xff08;本地文件或网络文件&#xff09;从客户端上传至服务器存储。ASP.NET Core 支持使用缓冲的模型绑定&#xff08;针对较小文件&#xff09;和无缓冲的流式传输&#xff08;针对较大文件&#xff09;上传一个或多个文件。缓冲和流式传输是上…

Paxos算法详解

Paxos、Raft分布式一致性算法应用场景一文讲述了分布式一致性问题与分布式一致性算法的典型应用场景。作为分布式一致性代名词的Paxos算法号称是最难理解的算法。本文试图用通俗易懂的语言讲述Paxos算法。 一、Paxos算法背景 Paxos算法是Lamport宗师提出的一种基于消息传递的分…

LeetCode 322. Coin Change

原题 You are given coins of different denominations and a total amount of money amount. Write a function to compute the fewest number of coins that you need to make up that amount. If that amount of money cannot be made up by any combination of the coins, …

Teiid:数据虚拟化Data Virtualization平台

2019独角兽企业重金招聘Python工程师标准>>> Teiid介绍 http://teiid.jboss.org/ 数据虚拟化的定义 https://en.wikipedia.org/wiki/Data_virtualization http://www.denodo.com/en/data-virtualization/overview 数据虚拟化的文章 Sick of ETL? Database virtuali…

如何仿造一个websocket请求?

之前两次singnalr、 websocket实时推送相关&#xff1a;• .NET WebSockets 核心原理初体验[1]• SignalR 从开发到生产部署避坑指南[2]tag&#xff1a;浏览器--->nginx--> server其中提到nginx默认不会为客户端转发Upgrade、Connection标头&#xff0c; 因为为了让被代理…

【转】为什么自动车完全不可以犯错误

为什么自动车完全不可以犯错误 有人跟我讲&#xff0c;我对Google的自动车要求太苛刻了。人无完人&#xff0c;所以Google的产品也不需要是完美的&#xff0c;只要“够好用”就有市场。世界上有那么多糟糕的司机&#xff0c;酒后驾车的&#xff0c;开车时发短信的&#xff0c;打…

从“互联网+教育”到“教育+互联网”——互联网文化基因视域下的审思

作者信息 朱敬/广西师范大学教育学部教授&#xff0c;教育学博士&#xff0c;博士生导师&#xff1b; 蔡建东/河南大学教育学部教授&#xff0c;教育学博士。 本文摘要 近年来国务院与教育部文件逐渐使用“教育互联网”一词&#xff0c;从“互联网教育”到“教育互联网”&a…

Node.js Stream - 基础篇

背景 在构建较复杂的系统时&#xff0c;通常将其拆解为功能独立的若干部分。这些部分的接口遵循一定的规范&#xff0c;通过某种方式相连&#xff0c;以共同完成较复杂的任务。譬如&#xff0c;shell通过管道|连接各部分&#xff0c;其输入输出的规范是文本流。 在Node.js中&am…

Axure RP使用攻略--动态面板的用途(8)

写了几个Axure教程之后发现&#xff0c;可能教程的起点有些高了&#xff0c;过分的去讲效果的实现&#xff0c;而忽略了axure功能以及基础元件的使用&#xff0c;那么从这个教程开始&#xff0c;把这些逐渐的展开讲解。 关于动态面板 动态面板是axure原型制作中使用非常频繁的一…

ABP 6.0.0-rc.1的新特性

2022-07-26官方发布ABP 6.0.0-rc.1版本&#xff0c;本文挑选了几个新特性进行了介绍&#xff0c;主要包括LeptonX Lite默认主题、OpenIddict模块&#xff0c;以及如何将Identity Server迁移到OpenIddict。据ABP官方公众号介绍&#xff0c;ABP 6.0.0稳定版的计划发布日期为2022-…

Java并发包--线程池框架

转载请注明出处&#xff1a;http://www.cnblogs.com/skywang12345/p/3509903.html 线程池架构图 线程池的架构图如下&#xff1a; 1. Executor 它是"执行者"接口&#xff0c;它是来执行任务的。准确的说&#xff0c;Executor提供了execute()接口来执行已提交的 Runna…

c 试水解码jpeg图片比特流(已成功解码)

找到一张采用霍夫曼通用DC,AC编码表的图片&#xff0c;提取出此图片的比特流准备对它解码&#xff0c;再反推怎样编码。 下图是此图片比特流前100个字节。解码是每次读一字节&#xff0c;对这8比特解码&#xff0c;如8比特不能解码&#xff0c;再读入一字节。因为霍夫曼表最多…

Raft算法详解

Raft算法属于Multi-Paxos算法&#xff0c;它是在Multi-Paxos思想的基础上&#xff0c;做了一些简化和限制&#xff0c;比如增加了日志必须是连续的&#xff0c;只支持领导者、跟随者和候选人三种状态&#xff0c;在理解和算法实现上都相对容易许多 从本质上说&#xff0c;Raft算…

淘宝弹性布局方案lib-flexible研究

1. lib-flexible不能与响应式布局兼容 先说说响应式布局的一些基本认识&#xff1a; 响应式布局的表现是&#xff1a;网页通过css媒介查询判断可视区域的宽度&#xff0c;在不同的范围应用不同的样式&#xff0c;以便在不同尺寸的设备上呈现最佳的界面效果。典型的例子是&#…

[No0000DB]C# FtpClientHelper Ftp客户端上传下载重命名 类封装

using System; using System.Diagnostics; using System.IO; using System.Text; using Shared;namespace Helpers {public static class FileHelper{#region Methods/// <summary>/// 向文本文件的尾部追加内容/// </summary>/// <param name"filePa…

WPF效果第一百九十四篇之伸缩面板

前面一篇玩耍了一下登录实现效果;今天在原来的基础上来玩耍一下伸缩面板的效果;闲话不多扯直接看效果:1、关于前台简单布局:2、左侧面板伸缩动画&#xff1a;<Storyboard x:Key"ShowConfigSb"><ThicknessAnimationUsingKeyFrames Storyboard.TargetProperty…