28 | 工作单元模式(UnitOfWork):管理好你的事务
工作单元模式有如下几个特性:
1、使用同一上下文
2、跟踪实体的状态
3、保障事务一致性
我们对实体的操作,最终的状态都是应该如实保存到我们的存储中,进行持久化
接下来看一下代码
为了实现工作单元模式,这里定义了一个工作单元的接口
public interface IUnitOfWork : IDisposable
{Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default);
}
这两个方法的区别是:一个是返回的 int 是指我们影响的数据条数,另外一个返回 bool 表示我们保存是否成功,本质上这两个方法达到的效果是相同的
另外还定义了一个事务管理的接口
public interface ITransaction
{// 获取当前事务IDbContextTransaction GetCurrentTransaction();// 判断当前事务是否开启bool HasActiveTransaction { get; }// 开启事务Task<IDbContextTransaction> BeginTransactionAsync();// 提交事务Task CommitTransactionAsync(IDbContextTransaction transaction);// 事务回滚void RollbackTransaction();
}
在实现上我们是借助 EF 来实现工作单元模式的
看一下 EFContext 的定义
/// <summary>
/// DbContext 是 EF 的基类,然后实现了 UnitOfWork 的接口和事务的接口
/// </summary>
public class EFContext : DbContext, IUnitOfWork, ITransaction
{protected IMediator _mediator;ICapPublisher _capBus;// 后面的章节会详细讲到这两个参数public EFContext(DbContextOptions options, IMediator mediator, ICapPublisher capBus) : base(options){_mediator = mediator;_capBus = capBus;}#region IUnitOfWorkpublic async Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default){var result = await base.SaveChangesAsync(cancellationToken);//await _mediator.DispatchDomainEventsAsync(this);return true;}可以看到这个方法实际上与上面的方法是相同的,所以这个方法可以不实现//public override Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = default)//{// return base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);//}#endregion#region ITransactionprivate IDbContextTransaction _currentTransaction;// 把当前的事务用一个字段存储public IDbContextTransaction GetCurrentTransaction() => _currentTransaction;// 获取当前的事务就是返回存储的私有对象public bool HasActiveTransaction => _currentTransaction != null;// 事务是否开启是判断当前这个事务是否为空/// <summary>/// 开启事务/// </summary>/// <returns></returns>public Task<IDbContextTransaction> BeginTransactionAsync(){if (_currentTransaction != null) return null;_currentTransaction = Database.BeginTransaction(_capBus, autoCommit: false);return Task.FromResult(_currentTransaction);}/// <summary>/// 提交事务/// </summary>/// <param name="transaction">当前事务</param>/// <returns></returns>public async Task CommitTransactionAsync(IDbContextTransaction transaction){if (transaction == null) throw new ArgumentNullException(nameof(transaction));if (transaction != _currentTransaction) throw new InvalidOperationException($"Transaction {transaction.TransactionId} is not current");try{await SaveChangesAsync();// 将当前所有的变更都保存到数据库transaction.Commit();}catch{RollbackTransaction();throw;}finally{if (_currentTransaction != null){// 最终需要把当前事务进行释放,并且置为空// 这样就可以多次的开启事务和提交事务_currentTransaction.Dispose();_currentTransaction = null;}}}/// <summary>/// 回滚/// </summary>public void RollbackTransaction(){try{_currentTransaction?.Rollback();}finally{if (_currentTransaction != null){_currentTransaction.Dispose();_currentTransaction = null;}}}#endregion
}
另外一个我们还是需要关注的一点就是如何管理我们的事务
这里有一个类 TransactionBehavior,这个类是用来注入我们的事务的管理过程的,具体它是怎么工作的在后续的章节会讲到,这里先关注它的实现过程
public class TransactionBehavior<TDbContext, TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TDbContext : EFContext
{ILogger _logger;TDbContext _dbContext;ICapPublisher _capBus;public TransactionBehavior(TDbContext dbContext, ICapPublisher capBus, ILogger logger){_dbContext = dbContext ?? throw new ArgumentNullException(nameof(dbContext));_capBus = capBus ?? throw new ArgumentNullException(nameof(capBus));_logger = logger ?? throw new ArgumentNullException(nameof(logger));}public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next){var response = default(TResponse);var typeName = request.GetGenericTypeName();try{// 首先判断当前是否有开启事务if (_dbContext.HasActiveTransaction){return await next();}// 定义了一个数据库操作执行的策略,比如说可以在里面嵌入一些重试的逻辑,这里创建了一个默认的策略var strategy = _dbContext.Database.CreateExecutionStrategy();await strategy.ExecuteAsync(async () =>{Guid transactionId;using (var transaction = await _dbContext.BeginTransactionAsync())using (_logger.BeginScope("TransactionContext:{TransactionId}", transaction.TransactionId)){_logger.LogInformation("----- 开始事务 {TransactionId} ({@Command})", transaction.TransactionId, typeName, request);response = await next();// next 实际上是指我们的后续操作,这里的模式有点像之前讲的中间件模式_logger.LogInformation("----- 提交事务 {TransactionId} {CommandName}", transaction.TransactionId, typeName);await _dbContext.CommitTransactionAsync(transaction);transactionId = transaction.TransactionId;}});return response;}catch (Exception ex){_logger.LogError(ex, "处理事务出错 {CommandName} ({@Command})", typeName, request);throw;}}
}
回过头来看一下我们的 EFContext,EFContext 实现 IUnitOfWork,工作单元模式的核心,它实现了事务的管理和工作单元模式,我们就可以借助 EFContext 来实现我们的仓储层