前面对于分布式事务也讲了好几篇了(可靠消息最终一致性分布式事务 - TCC分布式事务 - 2PC、3PC
https://github.com/kklldog/AgileDT 开源不易,大家多多 ✨✨✨
回顾
前面一篇文章(可靠消息最终一致性 )我们详细介绍了基于可靠消息的分布式事务。为了更好的理解 AgileDT 的代码,我们还是有必要简单的来回顾下。
该方案总体流程上可分为以下步骤:
主动方在真正的业务开始前先向可靠消息服务发送一个“待确认”的消息
可靠消息服务收到待确认消息后持久化消息到数据库
如果以上操作成功则主动方开始真正的业务,如果失败则直接放弃执行业务
如果业务执行成功则发送“确认”消息给可靠消息服务,如果执行失败则发送“取消”给可靠消息服务。
如果可靠消息服务收到“确认”消息则更新数据库里的消息记录的状态为“待发送”,如果收到的消息为“取消”则更新消息状态为“已取消”
如果上一步更新的数据库为“待发送”,那么会开始往MQ投递消息,并且更改数据库里的消息记录的状态为“已发送”
上一步往MQ投递消息成功后,MQ会给被动方推送消息。
被动方收到消息后开始处理业务
如果业务处理成功,则被动方对MQ进行ACK回复,则这条消息会从MQ内移除掉
如果业务处理成功,则发送“已完成”消息给可靠消息服务
可靠消息服务收到“已完成”消息后更新数据库消息记录未“已完成”
废话不多说了,下面让我们演示下如何使用 AgileDT 来快速实现一个基于可靠消息的分布式事务。
以下我们还是以经典的订单下单完成给会员赠送积分的场景来演示。
使用 AgileDT
依赖组件
mysql
rabbitmq
目前支持 mysql 数据库,但是数据访问组件使用的是 freesql 所以后续要实现支持别的数据库也很简单。目前框架使用的可靠消息服务为 rabbitmq 。
运行服务端
在服务新建一个数据库并且新建一张表
// crate event_message table on mysql
create table if not exists event_message
(event_id varchar(36) not nullprimary key,biz_msg varchar(4000) null,status enum('Prepare', 'Done', 'WaitSend', 'Sent', 'Finish', 'Cancel') not null,create_time datetime(3) null,event_name varchar(255) null
);
使用docker-compose运行服务端
version: "3" # optional since v1.27.0
services:agile_dt:image: "kklldog/agile_dt"ports:- "5000:5000"environment:- db:provider=mysql- db:conn= Database=agile_dt;Data Source=192.168.0.115;User Id=root;Password=mdsd;port=3306- mq:userName=admin- mq:password=123456- mq:host=192.168.0.115- mq:port=5672
安装客户端
在主动方跟被动方都需要安装AgileDT的客户端库
Install-Package AgileDT.Client
主动方使用方法
在业务数据库添加事务消息表
// crate event_message table on mysql
create table if not exists event_message
(event_id varchar(36) not nullprimary key,biz_msg varchar(4000) null,status enum('Prepare', 'Done', 'WaitSend', 'Sent', 'Finish', 'Cancel') not null,create_time datetime(3) null,event_name varchar(255) null
);
修改配置文件
在appsettings.json文件添加以下节点:"agiledt": {"server": "http://localhost:5000","db": {"provider": "mysql","conn": "Database=agile_order;Data Source=192.168.0.125;User Id=dev;Password=dev@123f;port=13306"//"conn": "Database=agile_order;Data Source=192.168.0.115;User Id=root;Password=mdsd;port=3306"},"mq": {"host": "192.168.0.125",//"host": "192.168.0.115","userName": "admin","password": "123456","port": 5672}}
注入 AgileDT 客户端服务
public void ConfigureServices(IServiceCollection services){services.AddAgileDT();...}
实现IEventService方法
处理主动方业务逻辑的类需要实现IEventService接口,并且标记那个方法是真正的业务方法。AgileDT在启动的时候会扫描这些类型,并且使用AOP技术生成代理类,在业务方法前后插入对应的逻辑来跟可靠消息服务通讯。这里要注意的几个地方:
实现IEventService接口
使用DtEventBizMethod注解标记业务入口方法
使用DtEventName注解来标记事务的方法名称,如果不标记则使用类名
注意:业务方法最终一定要使用事务来同步修改消息表的status字段为done状态,这个操作框架没办法帮你实现
注意:业务方法如果失败请抛出Exception,如果不抛异常框架一律认为执行成功
public interface IAddOrderService:IEventService{bool AddOrder(Order order);}[DtEventName("orderservice.order_added")]public class AddOrderService : IAddOrderService{private readonly ILogger<AddOrderService> _logger;public AddOrderService(ILogger<AddOrderService> logger){_logger = logger;}public string EventId { get;set;}[DtEventBizMethod]public virtual bool AddOrder(Order order){var ret = false;//3. 写 Order 跟 修改 event 的状态必选写在同一个事务内FreeSQL.Instance.Ado.Transaction(() =>{order.EventId = EventId;//在订单表新增一个eventid字段,使order跟event_message表关联起来var ret0 = FreeSQL.Instance.Insert(order).ExecuteAffrows();var ret1 = FreeSQL.Instance.Update<OrderService.Data.entities.EventMessage>().Set(x => x.Status, MessageStatus.Done).Where(x => x.EventId == EventId).ExecuteAffrows();ret = ret0 > 0 && ret1 > 0;});return ret;}/// <summary>/// 构造后续业务处理需要的消息内容/// </summary>/// <returns></returns>public string GetBizMsg(){//这里可以构造传递到MQ的业务消息的内容,比如传递订单编号啊 ,以便后续的被动方处理业务时候使用var order = FreeSQL.Instance.Select<Order>().Where(x => x.EventId == EventId).First();return order?.Id;}}
在实现好 IAddOrderService 接口后,你可以像平常一样使用 IAddOrderService 来注入实现类。比如在 Controller 的构造函数注入进去。因为 AgileDT 在启动的时候会自动帮你注册。
注意:IAddOrderService 跟实现类的生命周期是 Scoped 。
被动方使用方法
在业务方数据库建表或者在业务表上加字段
对于被动方来说这里不是必须要建一个表。但是至少要有个地方来存储event_id的信息,最简单的是直接在业务主表上加event_id字段。修改配置文件
在appsettings.json文件添加以下节点:"agiledt": {"server": "http://localhost:5000","db": {"provider": "mysql","conn": "Database=agile_order;Data Source=192.168.0.125;User Id=dev;Password=dev@123f;port=13306"//"conn": "Database=agile_order;Data Source=192.168.0.115;User Id=root;Password=mdsd;port=3306"},"mq": {"host": "192.168.0.125",//"host": "192.168.0.115","userName": "admin","password": "123456","port": 5672}}
注入AgileDT服务
public void ConfigureServices(IServiceCollection services){services.AddAgileDT();...}
实现IEventMessageHandler接口
被动方需要接收MQ投递过来的消息,这些处理类需要实现IEventMessageHandler接口。AgileDT启动的时候会去扫描这些类,然后跟MQ建立绑定关系。
这里必须使用DtEventName注解标记需要处理的事件名称
Reveive 方法必须是幂等的
public interface IOrderAddedMessageHandler: IEventMessageHandler{}[DtEventName("orderservice.order_added")]public class OrderAddedMessageHandler: IOrderAddedMessageHandler{static object _lock = new object();public bool Receive(EventMessage message){var bizMsg = message.BizMsg;var eventId = message.EventId;string orderId = bizMsg;lock (_lock){var entity = FreeSQL.Instance.Select<PointHistory>().Where(x => x.EventId == eventId).First();if (entity == null){var ret = FreeSQL.Instance.Insert(new PointHistory{Id = Guid.NewGuid().ToString(),EventId = message.EventId,OrderId = orderId,Points = 20,CreateTime = DateTime.Now}).ExecuteAffrows();return ret > 0;}else{return true;}}}}
总结
通过以上演示,我们快速的实现了一个订单下单会员赠送积分的服务。可以看到使用 AgileDT 可以很快速的实现一个分布式事务。特别是在实现过一个分布式事务后,后面实现起来就特别简单,只要实现几个接口就可以了。AgileDT 才刚刚起步,希望大家多多支持,多多✨✨✨ ,多多 PR 。
https://github.com/kklldog/AgileDT
.Net Core with 微服务 - 可靠消息最终一致性分布式事务
.Net Core with 微服务 - 分布式事务 - TCC
.Net Core with 微服务 - 分布式事务 - 2PC、3PC