33 | 集成事件:使用RabbitMQ来实现EventBus
这一节我们来讲解如何通过 CAP 组件和 RabbitMQ 来实现 EventBus
要实现 EventBus,我们这里借助了 RabbitMQ,它的整个安装和使用的体验是非常人性化的,如果是在 Windows 下开发的话,它可以有 Windows 的 installer,也可以在其它的操作系统下安装和使用,当然它也支持 Docker 的模式,我们可以在以下的地址获取到安装包和安装方法的说明
https://www.rabbitmq.com/download.html
另一个就是在 .NET Core 社区比较知名的 CAP 框架,这个框架是由我们国人开发的,它实现了开箱即用的 EventBus 的实现,我们可以通过简单的配置,就能把 RabbitMQ 集成进来,并且实现我们的集成事件的处理
https://github.com/dotnetcore/CAP
我们来看一下 CAP 框架的实现架构
它实际上实现了一个叫 OutBox 的设计模式,就是在我们的每个微服务,比如说微服务 A 的数据库 A,在这个数据库内部它建立了两张表,一张叫 publish 事件表和一张叫 receiver 事件表,这两张事件表用来记录微服务 A 发出的和微服务 A 收到的事件
当我们要发出事件时,我们会把事件的存储逻辑与我们的业务逻辑的事务合并,在同一个事务里提交,也就意味着当我们的业务逻辑提交成功时,我们的事件表里面的事件是一定存在的,它是与我们的业务逻辑的事务是强绑定的
如果说我们的业务逻辑失败了,事务回滚了,这条事件是不会出现在我们的事件表里的,这样子就可以做到说我们要发送的事件一定是与业务逻辑是一致的
接下来由我们组件来负责将事件表里的事件全部都发送到 EventBus,比如说 RabbitMQ 消息队列里面去,由接收方订阅
对于订阅的事件的话,设计的模式也是同理,当我们的应用程序在消息队列获取到信息的时候,它就会将这些消息持久化到我们的数据库的 Receive 事件表里,这样我们就可以在本地进行事件的处理,失败重试等操作,这些都是由 CAP 框架完成的,我们仅需要去做简单的配置,关注发布和订阅的业务逻辑即可
我们看一下代码,刚才有提到 CAP 的架构,关键的一点是需要事件的存储与我们的业务逻辑在同一个事务里,所以说我们在处理事务的逻辑部分的话,需要嵌入 CAP 的一部分代码,我们看一下 EFContext 的定义
public EFContext(DbContextOptions options, IMediator mediator, ICapPublisher capBus) : base(options)
{_mediator = mediator;_capBus = capBus;
}
之前有关注到有一个叫 ICapPublisher 这个入参,关键的是这一行代码我们需要关注一下
_currentTransaction = Database.BeginTransaction(_capBus, autoCommit: false);
这一行代码的作用是创建事务,我们可以看到创建事务的过程中,我们把 ICapPublisher 也传入给了这个方法的构造函数,实际上这个方法是由 CAP 的组件提供的,它的核心作用就是将我们要发送的事件与我们的业务的存储都放在同一个事务内部,这样子我们就可以使得事务提交时或者回滚时,我们的事件与业务逻辑的存取都是一致的
然后我们再来看一下配置的部分,写在 ServiceCollectionExtensions 下面
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{services.AddTransient<ISubscriberService, SubscriberService>();services.AddCap(options =>{options.UseEntityFramework<DomainContext>();options.UseRabbitMQ(options =>{configuration.GetSection("RabbitMQ").Bind(options);});//options.UseDashboard();});return services;
}
我们这里定义了一个 AddEventBus,可以看到将我们之前演示的代码订阅服务注入进来,然后 Services 最重点的代码是 AddCap,我们需要告诉 CAP 框架我们是针对 DomainContext 来实现我们的 EventBus,EventBus 与 DomainContext 共享我们的数据库连接,下面一行代码是指我们要用 RabbitMQ 来作为我们 EventBus 的消息队列的存储,这里可以看到使用了一个 Bind 的方法将我们的配置绑定到 RabbitMQ 的 options 上面去
我们可以看一下我们的配置
"RabbitMQ": {"HostName": "localhost","UserName": "admin","Password": "123456","VirtualHost": "geektime","ExchangeName": "geek_queue"}
这里可以看到我们定义了一个 RabbitMQ 的配置,然后这里面会有我们的 host,因为是本地安装的,所以访问地址就是 localhost,VirtualHost 是 RabbitMQ 一个比较特殊的设置,它的作用是将 RabbitMQ 的空间区分为不同的空间,你可以认为这是一个租户,相同的 VirtualHost,大家都可以认为是一个 RabbitMQ 的集群,最下面的 ExchangeName 就是队列需要订阅的 Exchange 的名称,消息的发布和订阅都是通过这个 Exchange 来的
然后我们在 Startup 这里添加一行
services.AddEventBus(Configuration);
这样我们就配置完成了