ASP.NET Core Web API下事件驱动型架构的实现(三):基于RabbitMQ的事件总线

在上文中,我们讨论了事件处理器中对象生命周期的问题,在进入新的讨论之前,首先让我们总结一下,我们已经实现了哪些内容。下面的类图描述了我们已经实现的组件及其之间的关系,貌似系统已经变得越来越复杂了。

其中绿色的部分就是上文中新实现的部分,包括一个简单的Event Store,一个事件处理器执行上下文的接口,以及一个基于ASP.NET Core依赖注入框架的执行上下文的实现。接下来,我们打算淘汰PassThroughEventBus,然后基于RabbitMQ实现一套新的事件总线。

事件总线的重构

根据前面的结论,事件总线的执行需要依赖于事件处理器执行上下文,也就是上面类图中PassThroughEventBus对于IEventHandlerExecutionContext的引用。更具体些,是在事件总线订阅某种类型的事件时,需要将事件处理器注册到IEventHandlerExecutionContext中。那么在实现RabbitMQ时,也会有着类似的设计需求,即RabbitMQEventBus也需要依赖IEventHandlerExecutionContext接口,以保证事件处理器生命周期的合理性。

为此,我们新建一个基类:BaseEventBus,并将这部分公共的代码提取出来,需要注意以下几点:

  1. 通过BaseEventBus的构造函数传入IEventHandlerExecutionContext实例,也就限定了所有子类的实现中,必须在构造函数中传入IEventHandlerExecutionContext实例,这对于框架的设计非常有利:在实现新的事件总线时,框架的使用者无需查看API文档,即可知道事件总线与IEventHandlerExecutionContext之间的关系,这符合SOLID原则中的Open/Closed Principle

  2. BaseEventBus的实现应该放在EdaSample.Common程序集中,更确切地说,它应该放在EdaSample.Common.Events命名空间下,因为它是属于框架级别的组件,并且不会依赖任何基础结构层的组件

BaseEventBus的代码如下:


public abstract class BaseEventBus : IEventBus
{
    protected readonly IEventHandlerExecutionContext eventHandlerExecutionContext;
    protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext)
    {
        this.eventHandlerExecutionContext = eventHandlerExecutionContext;
    }
    public abstract Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent;
    public abstract void Subscribe<TEvent, TEventHandler>()
        where TEvent : IEvent
        where TEventHandler : IEventHandler<TEvent>;
     
    // Disposable接口实现代码省略
}

在上面的代码中,PublishAsync和Subscribe方法是抽象方法,以便子类根据不同的需要来实现。

接下来就是调整PassThroughEventBus,使其继承于BaseEventBus:


public sealed class PassThroughEventBus : BaseEventBus
{
    private readonly EventQueue eventQueue = new EventQueue();
    private readonly ILogger logger;
    public PassThroughEventBus(IEventHandlerExecutionContext context,
        ILogger<PassThroughEventBus> logger)
        : base(context)
    {
        this.logger = logger;
        logger.LogInformation($"PassThroughEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}.");
        eventQueue.EventPushed += EventQueue_EventPushed;
    }
    private async void EventQueue_EventPushed(object sender, EventProcessedEventArgs e)
        => await this.eventHandlerExecutionContext.HandleEventAsync(e.Event);
    public override Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
    {
        return Task.Factory.StartNew(() => eventQueue.Push(@event));
    }
    public override void Subscribe<TEvent, TEventHandler>()
    {
        if (!this.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>())
        {
            this.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>();
        }
    }
     
    // Disposable接口实现代码省略
}

代码都很简单,也就不多做说明了,接下来,我们开始实现RabbitMQEventBus。

RabbitMQEventBus的实现

首先需要新建一个.NET Standard 2.0的项目,使用.NET Standard 2.0的项目模板所创建的项目,可以同时被.NET Framework 4.6.1或者.NET Core 2.0的应用程序所引用。创建新的类库项目的目的,是因为RabbitMQEventBus的实现需要依赖RabbitMQ C#开发库这个外部引用。因此,为了保证框架核心的纯净和稳定,需要在新的类库项目中实现RabbitMQEventBus。

Note:对于RabbitMQ及其C#库的介绍,本文就不再涉及了,网上有很多资料和文档,博客园有很多朋友在这方面都有使用经验分享,RabbitMQ官方文档也写得非常详细,当然是英文版的,如果英语比较好的话,建议参考官方文档。

以下就是在EdaSample案例中,RabbitMQEventBus的实现,我们先读一读代码,再对这部分代码做些分析。


public class RabbitMQEventBus : BaseEventBus
{
    private readonly IConnectionFactory connectionFactory;
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string exchangeName;
    private readonly string exchangeType;
    private readonly string queueName;
    private readonly bool autoAck;
    private readonly ILogger logger;
    private bool disposed;
    public RabbitMQEventBus(IConnectionFactory connectionFactory,
        ILogger<RabbitMQEventBus> logger,
        IEventHandlerExecutionContext context,
        string exchangeName,
        string exchangeType = ExchangeType.Fanout,
        string queueName = null,
        bool autoAck = false)
        : base(context)
    {
        this.connectionFactory = connectionFactory;
        this.logger = logger;
        this.connection = this.connectionFactory.CreateConnection();
        this.channel = this.connection.CreateModel();
        this.exchangeType = exchangeType;
        this.exchangeName = exchangeName;
        this.autoAck = autoAck;
        this.channel.ExchangeDeclare(this.exchangeName, this.exchangeType);
        this.queueName = this.InitializeEventConsumer(queueName);
        logger.LogInformation($"RabbitMQEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}.");
    }
    public override Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default(CancellationToken))
    {
        var json = JsonConvert.SerializeObject(@event, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
        var eventBody = Encoding.UTF8.GetBytes(json);
        channel.BasicPublish(this.exchangeName,
            @event.GetType().FullName,
            null,
            eventBody);
        return Task.CompletedTask;
    }
    public override void Subscribe<TEvent, TEventHandler>()
    {
        if (!this.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>())
        {
            this.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>();
            this.channel.QueueBind(this.queueName, this.exchangeName, typeof(TEvent).FullName);
        }
    }
    protected override void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                this.channel.Dispose();
                this.connection.Dispose();
                logger.LogInformation($"RabbitMQEventBus已经被Dispose。Hash Code:{this.GetHashCode()}.");
            }
            disposed = true;
            base.Dispose(disposing);
        }
    }
    private string InitializeEventConsumer(string queue)
    {
        var localQueueName = queue;
        if (string.IsNullOrEmpty(localQueueName))
        {
            localQueueName = this.channel.QueueDeclare().QueueName;
        }
        else
        {
            this.channel.QueueDeclare(localQueueName, true, false, false, null);
        }
        var consumer = new EventingBasicConsumer(this.channel);
        consumer.Received += async (model, eventArgument) =>
        {
            var eventBody = eventArgument.Body;
            var json = Encoding.UTF8.GetString(eventBody);
            var @event = (IEvent)JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
            await this.eventHandlerExecutionContext.HandleEventAsync(@event);
            if (!autoAck)
            {
                channel.BasicAck(eventArgument.DeliveryTag, false);
            }
        };
        this.channel.BasicConsume(localQueueName, autoAck: this.autoAck, consumer: consumer);
        return localQueueName;
    }
}

阅读上面的代码,需要注意以下几点:

  1. 正如上面所述,构造函数需要接受IEventHandlerExecutionContext对象,并通过构造函数的base调用,将该对象传递给基类

  2. 构造函数中,queueName参数是可选参数,也就是说:

    1. 如果通过RabbitMQEventBus发送事件消息,则无需指定queueName参数,仅需指定exchangeName即可,因为在RabbitMQ中,消息的发布方无需知道消息是发送到哪个队列中

    2. 如果通过RabbitMQEventBus接收事件消息,那么也分两种情况:

      1. 如果两个进程在使用RabbitMQEventBus时,同时指定了queueName参数,并且queueName的值相同,那么这两个进程将会轮流处理路由至queueName队列的消息

      2. 如果两个进程在使用RabbitMQEventBus时,同时指定了queueName参数,但queueName的值不相同,或者都没有指定queueName参数,那么这两个进程将会同时处理路由至queueName队列的消息

    3. 有关Exchange和Queue的概念,请参考RabbitMQ的官方文档

  3. 在Subscribe方法中,除了将事件处理器注册到事件处理器执行上下文之外,还通过QueueBind方法,将指定的队列绑定到Exchange上

  4. 事件数据都通过Newtonsoft.Json进行序列化和反序列化,使用TypeNameHandling.All这一设定,使得序列化的JSON字符串中带有类型名称信息。在此处这样做既是合理的,又是必须的,因为如果没有带上类型名称的信息,JsonConvert.DeserializeObject反序列化时,将无法判定得到的对象是否可以转换为IEvent对象,这样就会出现异常。但如果是实现一个更为通用的消息系统,应用程序派发出去的事件消息可能还会被由Python或者Java所实现的应用程序所使用,那么对于这些应用,它们并不知道Newtonsoft.Json是什么,也无法通过Newtonsoft.Json加入的类型名称来获知事件消息的初衷(Intent),Newtonsoft.Json所带的类型信息又会显得冗余。因此,简单地使用Newtonsoft.Json作为事件消息的序列化、反序列化工具,其实是欠妥的。更好的做法是,实现自定义的消息序列化、反序列化器,在进行序列化的时候,将.NET相关的诸如类型信息等,作为Metadata(元数据)附着在序列化的内容上。理论上说,在序列化的数据中加上一些元数据信息是合理的,只不过我们对这些元数据做一些标注,表明它是由.NET框架产生的,第三方系统如果不关心这些信息,可以对元数据不做任何处理

  5. 在Dispose方法中,注意将RabbitMQ所使用的资源dispose掉

使用RabbitMQEventBus

在Customer服务中,使用RabbitMQEventBus就非常简单了,只需要引用RabbitMQEventBus的程序集,然后在Startup.cs文件的ConfigureServices方法中,替换PassThroughEventBus的使用即可:


public void ConfigureServices(IServiceCollection services)
{
    this.logger.LogInformation("正在对服务进行配置...");
    services.AddMvc();
    services.AddTransient<IEventStore>(serviceProvider =>
        new DapperEventStore(Configuration["mssql:connectionString"],
            serviceProvider.GetRequiredService<ILogger<DapperEventStore>>()));
    var eventHandlerExecutionContext = new EventHandlerExecutionContext(services,
        sc => sc.BuildServiceProvider());
    services.AddSingleton<IEventHandlerExecutionContext>(eventHandlerExecutionContext);
    // services.AddSingleton<IEventBus, PassThroughEventBus>();
    var connectionFactory = new ConnectionFactory { HostName = "localhost" };
    services.AddSingleton<IEventBus>(sp => new RabbitMQEventBus(connectionFactory,
        sp.GetRequiredService<ILogger<RabbitMQEventBus>>(),
        sp.GetRequiredService<IEventHandlerExecutionContext>(),
        RMQ_EXCHANGE,
        queueName: RMQ_QUEUE));
    this.logger.LogInformation("服务配置完成,已注册到IoC容器!");
}

Note:一种更好的做法是通过配置文件来配置IoC容器,在曾经的Microsoft Patterns and Practices Enterprise Library Unity Container中,使用配置文件是很方便的。这样只需要Customer服务能够通过配置文件来配置IoC容器,同时只需要让Customer服务依赖(注意,不是程序集引用)于不同的事件总线的实现即可,无需对Customer服务重新编译。

下面来验证一下效果。首先确保RabbitMQ已经配置并启动妥当,我是安装在本地机器上,使用默认安装。首先启动ASP.NET Core Web API,然后通过Powershell发起两次创建Customer的请求:

查看一下数据库是否更新正常:

并检查一下日志信息:

RabbitMQ中Exchange的信息:

总结

本文提供了一种RabbitMQEventBus的实现,目前来说是够用的,而且这种实现是可以使用在实际项目当中的。在实际使用中,或许也会碰到一些与RabbitMQ本身有关的问题,这就需要具体问题具体分析了。此外,本文没有涉及事件消息丢失、重发然后保证最终一致性的问题,这些内容会在后面讨论。从下文开始,我们着手逐步实现CQRS架构的领域事件和事件存储部分。

源代码的使用

本系列文章的源代码在https://github.com/daxnet/edasample这个Github Repo里,通过不同的release tag来区分针对不同章节的源代码。本文的源代码请参考chapter_3这个tag,如下:

相关文章: 

原文地址:https://www.cnblogs.com/daxnet/p/aspnetcore-eda-part3.html


.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/322178.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

P5024-保卫王国【动态dp,最小覆盖集】

正题 题目链接:https://www.luogu.org/problem/P5024 题目大意 一棵树&#xff0c;每次有要求 axby:a\ x\ b\ y:a x b y:表示aaa点是否必选和bbb点是否必选 然后每次求最小覆盖集。 解题思路 最小覆盖集全集-最大独立集 所以我们每次必选不选就用infinfinf或−inf-inf−inf…

Dubbo(三)之Spring zookeeper集成

转载自 Dubbo快速开始 Dubbo 采用全 Spring 配置方式&#xff0c;透明化接入应用&#xff0c;对应用没有任何 API 侵入&#xff0c;只需用 Spring 加载 Dubbo 的配置即可&#xff0c;Dubbo 基于 Spring 的 Schema 扩展 进行加载。 如果不想使用 Spring 配置&#xff0c;可以…

浅谈开发模式及架构发展

一、传统开发模式传统的开发模式基本一般是重服务端的开发方式&#xff0c;大部分工作都在服务端执行&#xff0c;然后返回到客户端&#xff08;通常是HTML&#xff09;。以Asp.net MVC为例&#xff0c;如下图&#xff1a;#1 根据请求的路由定位到对应的Controller的对应的Acti…

P1613-跑路【Floyd,倍增】

正题 题目链接:https://www.luogu.org/problem/P1613 题目大意 询问111到nnn的路径&#xff0c;每次可以走2n2^n2n条边&#xff0c;求最少次数(可以重复)。 解题思路 定义geti,j,tget_{i,j,t}geti,j,t​表示iii到jjj是否有2t2^t2t的路径。 然后geti,j,tgeti,k,t−1&amp;…

Dubbo(四)之xml配置方式

转载自 Dubbo xml配置方式 以 XML 配置的方式来配置你的 Dubbo 应用 有关 XML 的详细配置项&#xff0c;请参见&#xff1a;配置参考手册。如果不想使用 Spring 配置&#xff0c;而希望通过 API 的方式进行调用&#xff0c;请参见&#xff1a;API配置。想知道如何使用配置&a…

Actor-ES框架:Ray-Handler之CoreHandler编写

如图右上角所示&#xff0c;Ray中有两类Handler&#xff08;SubHandler和PartSubHandler&#xff09;,在使用中&#xff0c;SubHandler派生Actor的CoreHandler&#xff0c;PartSubHandler派生SQLToReadHandler&#xff0c;SQLToReadHandler派生Actor的ToReadHandler&#xff0c…

Dubbo(五)之动态配置中心

转载自 Dubbo动态配置中心 Dubbo 2.7 中的动态配置中心 配置中心&#xff08;v2.7.0&#xff09;在 Dubbo 中承担两个职责&#xff1a; 外部化配置。启动配置的集中式存储 &#xff08;简单理解为 dubbo.properties 的外部化存储&#xff09;。服务治理。服务治理规则的存储…

P4503-[CTSC2014]企鹅QQ【字符串hash】

题目大意 给出nnn个长度为lll且互不相同的串&#xff0c;若两个串只有一个字符不相同那么这两个串相似。 求有多少对相似的串。 解题思路 我们可以枚举不相似的位&#xff0c;然后我们考虑字符串hashhashhash 然后我们可以将删掉了一位的字符串拆分为由[1..k−1][1..k-1][1…

nssl1335-蛋糕切割【数论,GCD】

正题 题目大意 n∗mn*mn∗m的矩阵&#xff0c;求对角线经过多少个格子(经过格子内部才算)。 解题思路 FromZYCdalaoFrom\ ZYCdalaoFrom ZYCdalao的思路:::对于若(n,m)1(n,m)1(n,m)1(互质)则会经过nm−1nm-1nm−1个格子&#xff0c;所以我们可以将n∗mn*mn∗m拆分成gcd(n,m)gcd(…

使用Mono将C#编译运行至WebAssembly平台

因为所有的主流网页浏览器都支持WebAssembly&#xff0c;开发者们现在可以寻找一个新的平台来部署他们的应用程序。由WebAssembly团队提供的标准工具链仅能将C、C编译成为WebAssembly&#xff0c;然而这对使用其他编程语言的开发者们并没有什么帮助。C#开发者就幸运的多了&…

Dubbo(六)之属性配置

转载自 Dubbo属性配置 属性配置 以属性配置的方式来配置你的 Dubbo 应用 如果你的应用足够简单&#xff0c;例如&#xff0c;不需要多注册中心或多协议&#xff0c;并且需要在spring容器中共享配置&#xff0c;那么&#xff0c;我们可以直接使用 dubbo.properties 作为默认…

nssl1336-膜拜神牛【LIS】

正题 题目大意 序列AAA和序列BBB。一个子集SSS使得不存在 Ax≥Ay&amp;Bx≤By(x,y∈S)A_x\geq A_y\ \&amp;\ B_x\leq B_y(x,y\in S)Ax​≥Ay​ & Bx​≤By​(x,y∈S) 求子集最大大小 解题思路 很显然我们可以先排序然后变成LISLISLIS问题。 先按照AAA为第一关键字…

使用Docker分分钟启动常用应用

前言Docker是目前比较火的一个概念&#xff0c;同时也是微服务中比较关键的一个容器化技术。但是&#xff0c;单从理论上好难看出Docker的优势&#xff0c;因此&#xff0c;我希望在这篇文章中提供一些Docker的使用示例&#xff0c;希望从实际应用上帮助大家理解Docker的优势&a…

Dubbo(七)之自动加载环境变量

转载自 自动加载环境变量 在 Dubbo 中自动加载环境变量 从 2.7.3 版本开始&#xff0c;Dubbo 会自动从约定 key 中读取配置&#xff0c;并将配置以 Key-Value 的形式写入到URL中。 支持的 key 有以下两个&#xff1a; dubbo.labels&#xff0c;指定一些列配置到 URL 中的键…

nssl1337-矩形统计【单调栈】

正题 题目大意 一个n∗nn*nn∗n的矩阵&#xff0c;然后有些位置破损。求可以剪出多少个不破损的矩形。 解题思路 预处理upi,jup_{i,j}upi,j​表示从(i,j)(i,j)(i,j)向上多少格子都是非破损格子。 然后我们枚举下界LowLowLow&#xff0c;将图像变成一个下部平整的条形图&…

TypeScript 2.7 版本发布

TypeScript 2.7版本已经发布了&#xff0c;新增了几个主要功能特性并进行了一些bug的修正。其中一些亮点包括对类属性的赋值检查、固定长度的元组和改进对象文字的类型推断。总的来说&#xff0c;这个版本对类型系统、ES2015特性和总体的TypeScript开发者体验都进行了改进优化。…

Dubbo(八)之API 配置

转载自 DubboAPI 配置 以API 配置的方式来配置你的 Dubbo 应用 API 属性与配置项一对一&#xff0c;各属性含义&#xff0c;请参见&#xff1a;配置参考手册&#xff0c;比如&#xff1a;ApplicationConfig.setName("xxx") 对应 <dubbo:application name"…

nssl1338-逃亡路径【最短路计数,bfs】

正题 题目大意 n∗mn*mn∗m的格子&#xff0c;一个走"日"字形的马&#xff0c;从(1,1)(1,1)(1,1)走到(n,m)(n,m)(n,m)的最短路条数。 解题思路 模板最短路计数改一下即可 当然因为边权都为1所以这里改成了bfsbfsbfs codecodecode #include<cstdio> #include&…

Azure Functions + Azure Batch实现MP3音频转码方案

客户需求客户的环境是一个网络音乐播放系统&#xff0c;根据网络情况提供给手机用户收听各种码率的MP3歌曲&#xff0c;在客户没购买歌曲的情况下提供一个三十秒内的试听版本。这样一个系统非常明确地一个需求就是会定期需要将一批从音乐版商手中获取到的高比特率音乐文件转换成…

Springboot Mybatis多数据源配置MybatisProperties坑

一、场景复现 配置了两个数据源&#xff0c;查询Dao却报错表不存在。 &#xff08;1&#xff09;maven <dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId> </dependency> …