33 | 集成事件:使用RabbitMQ来实现EventBus
为了演示我们的发布和订阅的话,我们在这里的代码做一些稍微的调整
namespace GeekTime.API.Application.DomainEventHandlers
{public class OrderCreatedDomainEventHandler : IDomainEventHandler<OrderCreatedDomainEvent>{ICapPublisher _capPublisher;public OrderCreatedDomainEventHandler(ICapPublisher capPublisher){_capPublisher = capPublisher;}public async Task Handle(OrderCreatedDomainEvent notification, CancellationToken cancellationToken){await _capPublisher.PublishAsync("OrderCreated", new OrderCreatedIntegrationEvent(notification.Order.Id));}}
}
这里我们发布了一个 OrderCreated 的集成事件,然后订阅一个 OrderCreated
namespace GeekTime.API.Application.IntegrationEvents
{public class SubscriberService : ISubscriberService, ICapSubscribe{IMediator _mediator;public SubscriberService(IMediator mediator){_mediator = mediator;}[CapSubscribe("OrderPaymentSucceeded")]public void OrderPaymentSucceeded(OrderPaymentSucceededIntegrationEvent @event){//Do SomeThing}[CapSubscribe("OrderCreated")]public void OrderCreated(OrderCreatedIntegrationEvent @event){//Do SomeThing}}
}
通过标注属性,我们就可以完成订阅
也就是说我们创建一个订单的时,我们会触发订单创建的领域事件,订单创建的领域事件又发送了一个订单创建的集成事件,然后我们在订阅服务里面订阅了订单创建的集成事件
在发布和订阅的地方分别打上一个断点,启动程序,可以看到整个流程
我们再梳理一下整个流程,首先我们创建了一个订单,这个订单触发了我们的 OrderCreated 的领域事件,OrderCreated 的领域事件的处理器像我们的 EventBus 发布了一个 OrderCreated 的集成事件,我们在订阅服务的地方订阅了这个事件,所以我们可以接收到并且做出相应的处理
我们观察一下数据库的表,一共有四张表,cap.publish 和 cap.received 这两张表分别对应发送事件表和接收事件表,order 和 user 这两张表是我们的领域模型表
整个 CAP 的框架,它的实现原理其实有两个关键点,一个是事件表,一个就是事务控制,也就是说将事件的存储嵌入到我们的业务逻辑的事务里面去,这样子我们就可以保证我们的业务与事件是要么都能存储成功,要么都失败
整个 CAP 框架它的应用性是非常强的,非常建议在处理集成事件的时候使用这个框架