ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

在ASP.NET Core Web API下事件驱动型架构的实现(一):一个简单的实现中,我介绍了事件驱动型架构的一种简单的实现,并演示了一个完整的事件派发、订阅和处理的流程。这种实现太简单了,百十行代码就展示了一个基本工作原理。然而,要将这样的解决方案运用到实际生产环境,还有很长的路要走。今天,我们就研究一下在事件处理器中,对象生命周期的管理问题。

事实上,不仅仅是在事件处理器中,我们需要关心对象的生命周期,在整个ASP.NET Core Web API的应用程序里,我们需要理解并仔细推敲被注册到IoC容器中的服务,它们的生命周期应该是个怎样的情形,这也是服务端应用程序设计必须认真考虑的内容。因为如果生命周期管理不合理,程序申请的资源无法合理释放,最后便会带来内存泄漏、程序崩溃等各种问题,然而这样的问题对于服务端应用程序来说,是非常严重的。

记得在上一篇文章的结束部分,我给大家留下一个练习,就是让大家在CustomerCreatedEventHandler事件处理器的HandleAsync方法中,填入自己的代码,以便对获得的事件消息做进一步的处理。作为本文的引子,我们首先将这部分工作做完,然后再进一步分析生命周期的问题。

Event Store

Event Store是CQRS体系结构模式中最为重要的一个组成部分,它的主要职责就是保存发生于领域模型中的领域事件,并对事件数据进行归档。当仓储需要获取领域模型对象时,Event Store也会配合快照数据库一起,根据领域事件的发生顺序,逐步回放并重塑领域模型对象。事实上,Event Store的实现是非常复杂的,虽然从它的职责上来看并不算太复杂,然而它所需要解决的事件同步、快照、性能、消息派发等问题,使得CQRS体系结构的实现变得非常复杂。在实际应用中,已经有一些比较成熟的框架和工具集,能够帮助我们在CQRS中很方便地实现Event Store,比如GetEventStore就是一个很好的开源Event Store框架,它是基于.NET开发的,在微软官方的eShopOnContainers说明文档中,也提到了这个框架,推荐大家上他们的官网(https://eventstore.org/)了解一下。在这里我们就先不深入研究Event Store应该如何实现,我们先做一个简单的Event Store,以便展示我们需要讨论的问题。

延续着上一版的代码库(https://github.com/daxnet/edasample/tree/chapter_1),我们首先在EdaSample.Common.Events命名空间下,定义一个IEventStore的接口,这个接口非常简单,仅仅包含一个保存事件的方法,代码如下:


public interface IEventStore : IDisposable
{
    Task SaveEventAsync<TEvent>(TEvent @event)
        where TEvent : IEvent;
}

SaveEventAsync方法仅有一个参数:由泛型类型TEvent绑定的@event对象。泛型约束表示SaveEventAsync方法仅能接受IEvent接口及其实现类型的对象作为参数传入。接口定义好了,下一步就是实现这个接口,对传入的事件对象进行保存。为了实现过程的简单,我们使用Dapper,将事件数据保存到SQL Server数据库中,来模拟Event Store对事件的保存操作。

Note:为什么IEventStore接口的SaveEventAsync方法签名中,没有CancellationToken参数?严格来说,支持async/await异步编程模型的方法定义上,是需要带上CancellationToken参数的,以便调用方请求取消操作的时候,方法内部可以根据情况对操作进行取消。然而有些情况下取消操作并不是那么合理,或者方法内部所使用的API并没有提供更深层的取消支持,因此也就没有必要在方法定义上增加CancellationToken参数。在此处,为了保证接口的简单,没有引入CancellationToken的参数。

接下来,我们实现这个接口,并用Dapper将事件数据保存到SQL Server中。出于框架设计的考虑,我们新建一个Net Standard Class Library项目,在这个新的项目中实现IEventStore接口,这么做的原因已经在上文中介绍过了。代码如下:


public class DapperEventStore : IEventStore
{
    private readonly string connectionString;
    public DapperEventStore(string connectionString)
    {
        this.connectionString = connectionString;
    }
    public async Task SaveEventAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        const string sql = @"INSERT INTO [dbo].[Events]
([EventId], [EventPayload], [EventTimestamp])
VALUES
(@eventId, @eventPayload, @eventTimestamp)";
        using (var connection = new SqlConnection(this.connectionString))
        {
            await connection.ExecuteAsync(sql, new
            {
                eventId = @event.Id,
                eventPayload = JsonConvert.SerializeObject(@event),
                eventTimestamp = @event.Timestamp
            });
        }
    }
    #region IDisposable Support
    // 此处省略
    #endregion
}

IDisposable接口的实现部分暂且省略,可以看到,实现还是非常简单的:通过构造函数传入数据库的连接字符串,在SaveEventAsyc方法中,基于SqlConnection对象执行Dapper的扩展方法来完成事件数据的保存。

Note: 此处使用了JsonConvert.SerializeObject方法来序列化事件对象,也就意味着DapperEventStore程序集需要依赖Newtonsoft.Json程序集。虽然在我们此处的案例中不会有什么影响,但这样做会造成DapperEventStore对Newtonsoft.Json的强依赖,这样的依赖关系不仅让DapperEventStore变得不可测试,而且Newtonsoft.Json将来未知的变化,也会影响到DapperEventStore,带来一些不确定性和维护性问题。更好的做法是,引入一个IMessageSerializer接口,在另一个新的程序集中使用Newtonsoft.Json来实现这个接口,同时仅让DapperEventStore依赖IMessageSerializer,并在应用程序启动时,将Newtonsoft.Json的实现注册到IoC容器中。此时,IMessageSerializer可以被Mock,DapperEventStore就变得可测试了;另一方面,由于只有那个新的程序集会依赖Newtonsoft.Json,因此,Newtonsoft.Json的变化也仅仅会影响那个新的程序集,不会对框架主体的其它部分造成任何影响。

EventStore实现好了,接下来,我们将其用在CustomerCreatedEventHandler中,以便将订阅的CustomerCreatedEvent保存下来。

事件数据的保存

保存事件数据的第一步,就是在ASP.NET Core Web API的IoC容器中,将DapperEventStore注册进去。这一步是非常简单的,只需要在Startup.cs的ConfigureServices方法中完成即可。代码如下:


public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc();
    services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();
    services.AddTransient<IEventStore>(serviceProvider => new DapperEventStore(Configuration["mssql:connectionString"]));
    services.AddSingleton<IEventBus, PassThroughEventBus>();
}

注意我们使用的是services.AddTransient方法来注册DapperEventStore,我们希望应用程序在每次请求IEventStore实例时,都能获得一个新的DapperEventStore的实例。

接下来,打开CustomerCreatedEventHandler.cs文件,在构造函数中加入对IEventStore的依赖,然后修改HandleAsync方法,在该方法中使用IEventStore的实例来完成事件数据的保存。代码如下:



public class CustomerCreatedEventHandler : IEventHandler<CustomerCreatedEvent>
{
    private readonly IEventStore eventStore;
    public CustomerCreatedEventHandler(IEventStore eventStore)
    {
        this.eventStore = eventStore;
    }
    public bool CanHandle(IEvent @event)
        => @event.GetType().Equals(typeof(CustomerCreatedEvent));
    public async Task<bool> HandleAsync(CustomerCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        await this.eventStore.SaveEventAsync(@event);
        return true;
    }
    public Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default)
        => CanHandle(@event) ? HandleAsync((CustomerCreatedEvent)@event, cancellationToken) : Task.FromResult(false);
}

OK,代码修改完毕,测试一下。

看看数据库中客户信息是否已经创建:

看看数据库中事件数据是否已经保存成功:

OK,数据全部保存成功。

然而,事情真的就这么简单么?No。在追踪了IEventStore实例(也就是DapperEventStore)的生命周期后,你会发现,问题没有想象的那么简单。

追踪对象的生命周期

在使用services.AddTransient/AddScoped/AddSingleton/AddScoped这些方法对服务进行注册时,使用不同的方法也就意味着选择了不同的对象生命周期。在此我们也不再深入讨论每种方法之间的差异,微软官方有详细的文档和demo(抱歉我没有贴出中文链接,因为机器翻译的缘故,实在有点不堪入目),如果对ASP.NET Core的IoC容器不熟悉的话,建议先了解一下官网文章的内容。在上面我稍微提了一下,我们是用AddTransient方法来注册DapperEventStore的,因为我们希望在每次使用IEventStore的时候,都会有一个新的DapperEventStore被创建。现在,让我们来验证一下,看情况是否果真如此。

日志的使用

追踪程序执行的最有效的方式就是使用日志。在我们的场景中,使用基于文件的日志会更合适,因为这样我们可以更清楚地看到程序的执行过程以及对象的变化过程。同样,我不打算详细介绍如何在ASP.NET Core Web API中使用日志,微软官网同样有着非常详尽的文档来介绍这些内容。在这里,我简要地将相关代码列出来,以介绍如何启用基于文件的日志系统。

首先,在Web API服务的项目上,添加对Serilog.Extensions.Logging.File的nuget包,使用它能够非常方便地启用基于文件的日志。然后,打开Program.cs文件,添加ConfigureLogging的调用:


public static IWebHost BuildWebHost(string[] args) =>
    WebHost.CreateDefaultBuilder(args)
        .ConfigureLogging((context, lb) =>
        {
            lb.AddFile(LogFileName);
        })
        .UseStartup<Startup>()
        .Build();

此处LogFileName为本地文件系统中的日志文件文件名,为了避免权限问题,我将日志写入C:\Users\<user>\appdata\local目录下,因为我的Web API进程是由当前登录用户启动的,所以写在这个目录下不会有权限问题。如果今后我们把Web API host在IIS中,那么启动IIS服务的用户需要对日志所在的目录具有写入的权限,日志文件才能被正确写入,这一点是需要注意的。

好了,现在可以使用日志了,先试试看。在Startup类的构造函数中,加入ILoggerFactory参数,并在构造函数执行时获取ILogger实例,然后在ConfigureServices调用中输出一些内容:


public class Startup
{
    private readonly ILogger logger;
    public Startup(IConfiguration configuration, ILoggerFactory loggerFactory)
    {
        Configuration = configuration;
        this.logger = loggerFactory.CreateLogger<Startup>();
    }
    public IConfiguration Configuration { get; }
    public void ConfigureServices(IServiceCollection services)
    {
        this.logger.LogInformation("正在对服务进行配置...");
        services.AddMvc();
        services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();
        services.AddTransient<IEventStore>(serviceProvider =>
            new DapperEventStore(Configuration["mssql:connectionString"]));
        services.AddSingleton<IEventBus, PassThroughEventBus>();
        this.logger.LogInformation("服务配置完成,已注册到IoC容器!");
    }
    // 其它方法暂时省略
}

现在重新启动服务,然后查看日志文件,发现日志可以被正确输出:

接下来,使用类似的方式,向PassThroughEventBus的构造函数和Dispose方法中加入一些日志输出,在CustomersController的Create方法中、CustomerCreatedEventHandler的构造函数和HandleAsync方法中、DapperEventStore的构造函数和Dispose方法中也加入一些日志输出,以便能够观察当新的客户信息被创建时,Web API的执行过程。限于文章篇幅,就不在此一一贴出各方法中加入日志输出的代码了,大家可以根据本文最后所提供的源代码链接来获取源代码。简单地举个例子吧,比如对于DapperEventStore,我们通过构造函数注入ILogger的实例:


public class DapperEventStore : IEventStore
{
    private readonly string connectionString;
    private readonly ILogger logger;
    public DapperEventStore(string connectionString,
        ILogger<DapperEventStore> logger)
    {
        this.connectionString = connectionString;
        this.logger = logger;
        logger.LogInformation($"DapperEventStore构造函数调用完成。Hash Code:{this.GetHashCode()}.");
    }
    // 其它函数省略
}

这样一来,在DapperEventStore的其它方法中,就可以通过logger来输出日志了。

发现问题

同样,再次运行Web API,并通过Powershell发起一次创建客户信息的请求,然后打开日志文件,整个程序的执行过程基本上就一目了然了:

从上面的日志内容可以得知,当应用程序正常退出时,由IoC容器托管的PassThroughEventBus和DapperEventStore都能够被正常Dispose,目前看来没什么问题,因为资源可以正常释放。现在让我们重新启动Web API,连续发送两次创建客户信息的请求,再次查看日志,我们得到了下面的内容:

从上面的日志内容可以看到,在Web API的整个运行期间,CustomerCreatedEventHandler仅被构造了一次,而且在每次处理CustomerCreatedEvent事件的时候,都是使用同一个DapperEventStore实例来保存事件数据。也就是说,CustomerCreatedEventHandler和DapperEventStore在整个Web API服务的生命周期中,有且仅有一个实例,它们是Singleton的!然而,在进行系统架构的时候,我们应该尽量保证较短的对象生命周期,以免因为状态的不一致性导致不可回滚的错误出现,这也是架构设计中的一种最佳实践。虽然目前我们的DapperEventStore在程序正常退出的时候能够被Dispose掉,但如果DapperEventStore使用了非托管资源,并且非托管资源并没有很好地管理自己的内存呢?久而久之,DapperEventStore就产生了内存泄漏点,慢慢地,Web API就会出现内存泄漏,系统资源将被耗尽。假如Web API被部署在云中,应用程序监控装置(比如AWS的Cloud Watch)就会持续报警,并强制服务断线,整个系统的可用性就无法得到保障。所以,我们更期望DapperEventStore能够正确地实现C#的Dispose模式,在Dispose方法中合理地释放资源,并且仅在需要使用DapperEventStore时候才去构建它,用完就及时Dispose,以保证资源的合理使用。这也就是为什么我们使用services.AddTransient方法来注册CustomerCreatedEventHandler以及DapperEventStore的原因。

然而,事实却并非如此。究其原因,就是因为PassThroughEventBus是单例实例,它的生命周期是整个Web API服务。而在PassThroughEventBus的构造函数中,CustomerCreatedEventHandler被作为参数传入,于是,PassThroughEventBus产生了对CustomerCreatedEventHandler的依赖,而连带地也产生了对DapperEventStore的依赖。换句话说,在整个应用程序运行的过程中,IoC框架完全没有理由再去创建新的CustomerCreatedEventHandler以及DapperEventStore的实例,因为事件处理器作为强引用被注册到PassThroughEventBus中,而PassThroughEventBus至始至终没有变过!

Note:为什么PassThroughEventBus可以作为单例注册到IoC容器中?因为它提供了无状态的全局性的基础结构层服务:事件总线。在PassThroughEventBus的实现中,这种全局性体现得不明显,我们当然可以每一次HTTP请求都创建一个新的PassThroughEventBus来转发事件消息并作处理。然而,在今后我们要实现的基于RabbitMQ的事件总线中,如果我们还是每次HTTP请求都创建一个新的消息队列,不仅性能得不到保证,而且消息并不能路由到新创建的channel上。注意:我们将其注册成单例,一个很重要的依据是由于它是无状态的,但即使如此,我们也要注意在应用程序退出的时候,合理Dispose掉它所占用的资源。当然,在这里,ASP.NET Core的IoC机制会帮我们解决这个问题(因为我注册了PassThroughEventBus,但我没有显式调用Dispose方法,我仍然能从日志中看到“PassThroughEventBus已经被Dispose”的字样),然而有些情况下,ASP.NET Core不会帮我们做这些,就需要我们自己手工完成。

OMG!由于构造函数注入,使得对象之间产生了依赖关系,从而影响到了它们的生命周期,这可怎么办?既然问题是由依赖引起的,那么就需要想办法解耦。

解耦!解决事件处理器对象生命周期问题

经过分析,我们需要解除PassThroughEventBus对各种EventHandler的直接依赖。因为PassThroughEventBus是单例的,那么由它引用的所有组件也只可能具有相同的生命周期。然而,这样的解耦又该如何做呢?将EventHandler封装到另一个类中?结果还是一样,PassThroughEventBus总会通过某种对象关系,来间接引用到EventHandler上,造成EventHandler全局唯一。

或许,应该要有另一套生命周期管理体系来管理EventHandler的生命周期,使得每当PassThroughEventBus需要使用EventHandler对所订阅的事件进行处理的时候,都会通过这套体系来请求新的EventHandler实例,这样一来,PassThroughEventBus也就不再依赖于某个特定的实例了,而仅仅是引用了各种EventHandler在新的生命周期管理体系中的注册信息。每当需要的时候,PassThroughEventBus都会将事件处理器的注册信息传给新的管理体系,然后由这套新的体系来维护事件处理器的生命周期。

通过阅读微软官方的eShopOnContainers案例代码后,证实了这一想法。在案例中,有如下代码:



// namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
private async Task ProcessEvent(string eventName, string message)
{
    if (_subsManager.HasSubscriptionsForEvent(eventName))
    {
        using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
        {
            var subscriptions = _subsManager.GetHandlersForEvent(eventName);
            foreach (var subscription in subscriptions)
            {
                if (subscription.IsDynamic)
                {
                    var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                    dynamic eventData = JObject.Parse(message);
                    await handler.Handle(eventData);
                }
                else
                {
                    var eventType = _subsManager.GetEventTypeByName(eventName);
                    var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                    var handler = scope.ResolveOptional(subscription.HandlerType);
                    var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                    await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                }
            }
        }
    }
}

可以看到,高亮的这一行,通过Autofac创建了一个新的LifetimeScope,在这个Scope中,通过eventName来获得一个subscription对象(也就是EventHandler的注册信息),进而通过scope的ResolveOptional调用来获得新的EventHandler实例。基本过程就是这样,目前也不需要纠结IDynamicIntegrationEventHandler是干什么用的,也不需要纠结为什么要使用dynamic来保存事件数据。重点是,autofac的BeginLifetimeScope方法调用创建了一个新的IoC Scope,在这个Scope中解析(resolve)了新的EventHandler实例。在eShopOnContainer案例中,EventBusRabbitMQ的设计是特定的,必须依赖于Autofac作为依赖注入框架。或许这部分设计可以进一步改善,使得EventBusRabbitMQ不会强依赖于Autofac。

接下来,我们会引入一个新的概念:事件处理器执行上下文,使用类似的方式来解决对象生命周期问题。

事件处理器执行上下文

事件处理器执行上下文(Event Handler Execution Context, EHEC)为事件处理器提供了一个完整的生命周期管理机制,在这套机制中,事件处理器及其引用的对象资源可以被正常创建和正常销毁。现在让我们一起看看,如何在EdaSample的案例代码中使用事件处理器执行上下文。

事件处理器执行上下文的接口定义如下,当然,这部分接口是放在EdaSample.Common.Events目录下,作为消息系统的框架代码提供给调用方:


public interface IEventHandlerExecutionContext
{
    void RegisterHandler<TEvent, THandler>()
        where TEvent : IEvent
        where THandler : IEventHandler<TEvent>;
    void RegisterHandler(Type eventType, Type handlerType);
    bool HandlerRegistered<TEvent, THandler>()
        where TEvent : IEvent
        where THandler : IEventHandler<TEvent>;
    bool HandlerRegistered(Type eventType, Type handlerType);
    Task HandleEventAsync(IEvent @event, CancellationToken cancellationToken = default);
}

这个接口主要包含三种方法:注册事件处理器、判断事件处理器是否已经注册,以及对接收到的事件消息进行处理。整个结构还是非常清晰简单的。现在需要实现这个接口。根据上面的分析,这个接口的实现是需要依赖于IoC容器的,目前简单起见,我们仅使用微软ASP.NET Core标准的Dependency Injection框架来实现,当然,也可以使用Autofac,取决于你怎样去实现上面这个接口。需要注意的是,由于该接口的实现是需要依赖于第三方组件的(在这里是微软的Dependency Injection框架),因此,最佳做法是新建一个类库,并引用EdaSample.Common程序集,并在这个新的类库中,依赖Dependency Injection框架来实现这个接口。

以下是基于Microsoft.Extensions.DependencyInjection框架来实现的事件处理器执行上下文完整代码,这里有个兼容性问题,就是构造函数的第二个参数:serviceProviderFactory。在Microsoft.Extensions.DependencyInjection框架2.0版本之前,IServiceCollection.BuildServiceProvider方法的返回类型是IServiceProvider,但从2.0开始,它的返回类型已经从IServiceProvider接口,变成了ServiceProvider类。这里引出了框架设计的另一个原则,就是依赖较低版本的.NET Core,以便获得更好的兼容性。如果我们的EdaSample是使用.NET Core 1.1开发的,那么当下面这个类被直接用在ASP.NET Core 2.0的项目中时,如果不通过构造函数参数传入ServiceProvider创建委托,而是直接在代码中使用registry.BuildServiceProvider调用,就会出现异常。



public class EventHandlerExecutionContext : IEventHandlerExecutionContext
{
    private readonly IServiceCollection registry;
    private readonly Func<IServiceCollection, IServiceProvider> serviceProviderFactory;
    private readonly ConcurrentDictionary<Type, List<Type>> registrations = new ConcurrentDictionary<Type, List<Type>>();
    public EventHandlerExecutionContext(IServiceCollection registry,
        Func<IServiceCollection, IServiceProvider> serviceProviderFactory = null)
    {
        this.registry = registry;
        this.serviceProviderFactory = serviceProviderFactory ?? (sc => registry.BuildServiceProvider());
    }
    public async Task HandleEventAsync(IEvent @event, CancellationToken cancellationToken = default(CancellationToken))
    {
        var eventType = @event.GetType();
        if (this.registrations.TryGetValue(eventType, out List<Type> handlerTypes) &&
            handlerTypes?.Count > 0)
        {
            var serviceProvider = this.serviceProviderFactory(this.registry);
            using (var childScope = serviceProvider.CreateScope())
            {
                foreach(var handlerType in handlerTypes)
                {
                    var handler = (IEventHandler)childScope.ServiceProvider.GetService(handlerType);
                    if (handler.CanHandle(@event))
                    {
                        await handler.HandleAsync(@event, cancellationToken);
                    }
                }
            }
        }
    }
    public bool HandlerRegistered<TEvent, THandler>()
        where TEvent : IEvent
        where THandler : IEventHandler<TEvent>
        => this.HandlerRegistered(typeof(TEvent), typeof(THandler));
    public bool HandlerRegistered(Type eventType, Type handlerType)
    {
        if (this.registrations.TryGetValue(eventType, out List<Type> handlerTypeList))
        {
            return handlerTypeList != null && handlerTypeList.Contains(handlerType);
        }
        return false;
    }
    public void RegisterHandler<TEvent, THandler>()
        where TEvent : IEvent
        where THandler : IEventHandler<TEvent>
        => this.RegisterHandler(typeof(TEvent), typeof(THandler));
    public void RegisterHandler(Type eventType, Type handlerType)
    {
        Utils.ConcurrentDictionarySafeRegister(eventType, handlerType, this.registrations);
        this.registry.AddTransient(handlerType);
    }
}

好了,事件处理器执行上下文就定义好了,接下来就是在我们的ASP.NET Core Web API中使用。为了使用IEventHandlerExecutionContext,我们需要修改事件订阅器的接口定义,并相应地修改PassThroughEventBus以及Startup.cs。代码如下:



// IEventSubscriber
public interface IEventSubscriber : IDisposable
{
    void Subscribe<TEvent, TEventHandler>()
        where TEvent : IEvent
        where TEventHandler : IEventHandler<TEvent>;
}
// PassThroughEventBus
public sealed class PassThroughEventBus : IEventBus
{
    private readonly EventQueue eventQueue = new EventQueue();
    private readonly ILogger logger;
    private readonly IEventHandlerExecutionContext context;
    public PassThroughEventBus(IEventHandlerExecutionContext context,
        ILogger<PassThroughEventBus> logger)
    {
        this.context = context;
        this.logger = logger;
        logger.LogInformation($"PassThroughEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}.");
        eventQueue.EventPushed += EventQueue_EventPushed;
    }
    private async void EventQueue_EventPushed(object sender, EventProcessedEventArgs e)
        => await this.context.HandleEventAsync(e.Event);
    public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
        where TEvent : IEvent
            => Task.Factory.StartNew(() => eventQueue.Push(@event));
    public void Subscribe<TEvent, TEventHandler>()
        where TEvent : IEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        if (!this.context.HandlerRegistered<TEvent, TEventHandler>())
        {
            this.context.RegisterHandler<TEvent, TEventHandler>();
        }
    }
    #region IDisposable Support
    private bool disposedValue = false; // To detect redundant calls
    void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                this.eventQueue.EventPushed -= EventQueue_EventPushed;
                logger.LogInformation($"PassThroughEventBus已经被Dispose。Hash Code:{this.GetHashCode()}.");
            }
            disposedValue = true;
        }
    }
    public void Dispose() => Dispose(true);
    #endregion
}
// Startup.cs
public void ConfigureServices(IServiceCollection services)
{
    this.logger.LogInformation("正在对服务进行配置...");
    services.AddMvc();
    services.AddTransient<IEventStore>(serviceProvider =>
        new DapperEventStore(Configuration["mssql:connectionString"],
            serviceProvider.GetRequiredService<ILogger<DapperEventStore>>()));
    var eventHandlerExecutionContext = new EventHandlerExecutionContext(services,
        sc => sc.BuildServiceProvider());
    services.AddSingleton<IEventHandlerExecutionContext>(eventHandlerExecutionContext);
    services.AddSingleton<IEventBus, PassThroughEventBus>();
    this.logger.LogInformation("服务配置完成,已注册到IoC容器!");
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
    var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
    eventBus.Subscribe<CustomerCreatedEvent, CustomerCreatedEventHandler>();
    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }
    app.UseMvc();
}

代码修改完成后,再次执行Web API,并发送两次(或多次)创建客户的请求,然后查看日志,我们发现,每次请求都会使用新的事件处理器去处理接收到的消息,在保存消息数据时,会使用新的DapperEventStore来保存数据,而保存完成后,会及时将DapperEventStore dispose掉:

小结

本文篇幅比较长,或许你没有太多耐心将文章读完。但我尽量将问题分析清楚,希望提供给读者的内容是详细的、有理有据的。文章中黑体部分是在设计过程中的一些思考和需要注意的地方,希望能够给读者在工作和学习之中带来启发和收获。总而言之,对象生命周期的管理,在服务端应用程序中是非常重要的,需要引起足够的重视。在下文中,我们打算逐步摆脱PassThroughEventBus,基于RabbitMQ来实现消息总线的基础结构。

源代码的使用

本系列文章的源代码在https://github.com/daxnet/edasample这个Github Repo里,通过不同的release tag来区分针对不同章节的源代码。本文的源代码请参考chapter_2这个tag,如下:

相关文章: 

  • ASP.NET Core Web API下事件驱动型架构的实现(一):一个简单的实现

原文地址:https://www.cnblogs.com/daxnet/p/8270480.html


.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/322341.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

和某ZYC巨佬和XXY巨佬的随机挑战2总结

前言 一切的起点在那个炎热的酷暑&#xff0c;菜的一批的WYCWYCWYC坐在最容易被∗*∗的左下角。这时他永远都想不到&#xff0c;他与巨佬之间的挑战&#xff0c;即将开始。 正题 规则 随机跳333到蓝题&#xff0c;然后写完。 完成记录 题目博客 T1:P3100−[USACO14JAN]T1:P31…

(十四)消息中间件MQ详解及四大MQ比较

一、消息中间件相关知识 1、概述 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能&#xff0c;成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件&#xff0c;如老牌的ActiveMQ、RabbitMQ&a…

g4e基础篇#3 Git安装与配置

现在你已经对Git有了最基本的了解&#xff0c;现在让我们开始动手开始安装和配置Git环境。Git工具包括Git命令行工具&#xff0c;图形化工具和服务器环境&#xff1b;在我们这个教程中&#xff0c;我们会使用以下软件配置我们的环境&#xff1a;• Windows 操作系统&#xff08…

[认证授权] 6.Permission Based Access Control

在前面5篇博客中介绍了OAuth2和OIDC&#xff08;OpenId Connect&#xff09;&#xff0c;其作用是授权和认证。那么当我们得到OAuth2的Access Token或者OIDC的Id Token之后&#xff0c;我们的资源服务如何来验证这些token是否有权限来执行对资源的某一项操作呢&#xff1f;比如…

微软发布PowerShell Core第一个版本:支持多平台开发

微软旗下的PowerShell团队正式宣布推出PowerShell Core 6.0&#xff0c;非常诡异的是这明明是Core的第一个版本&#xff0c;但是却用了一个6.0后缀的版本号。“这是我们对PowerShell做出的最大最重要的改变&#xff01;”微软技术研究员兼PowerShell创始人Jeffrey Snover在Twit…

.NET Core单文件发布静态编译AOT CoreRT

.NET Core单文件发布静态编译AOT CoreRT&#xff0c;将.NET Core应用打包成一个可执行文件并包含运行时。支持Windows, MacOS and Linux x64 w/ RyuJIT codegen。示例项目&#xff1a;https://github.com/dotnet/corert/tree/master/samples/WebApi下面来实际体验。首先确保安装…

2019纪中暑假游记+总结

Travels总篇\texttt{Travels总篇}Travels总篇 7/4\texttt{7/4}7/4 下午才去纪中&#xff0c;早上就一大早和同学出去玩&#xff0c;看了蜘蛛侠然后到3点多才出发。 因为走南沙大桥所以很快就到了(具体有多快忘了&#xff0c;反正路上一点都不塞车)。就愉快的去整理宿舍洗个早…

使用xUnit为.net core程序进行单元测试(上)

一. 导读为什么要编写自动化测试程序&#xff08;Automated Tests&#xff09;&#xff1f;可以频繁的进行测试可以在任何时间进行测试&#xff0c;也可以按计划定时进行&#xff0c;例如&#xff1a;可以在半夜进行自动测试。肯定比人工测试要快。可以更快速的发现错误。基本上…

select2删除选中项,allowClear设置

转载自 select2删除选中项&#xff0c;allowClear设置 在使用select2过程中&#xff0c;有时候需要删除我们选中的选项&#xff0c;如下图&#xff1a; 这时候就需要设置select2的allowClear属性了。 有两种方法&#xff1a; 第一种&#xff1a; 直接用select2定义的一个c…

LeetCode算法总结-回溯法与深度优先搜索

转载自 LeetCode算法总结-回溯法与深度优先搜索 回溯法&#xff08;探索与回溯法&#xff09;是一种选优搜索法&#xff0c;又称为试探法&#xff0c;按选优条件向前搜索&#xff0c;以达到目标。但当探索到某一步时&#xff0c;发现原先选择并不优或达不到目标&#xff0c;就…

入门干货之用DVG打造你的项目主页-Docfx、Vs、Github

由于这三项技术涉及到的要点以及内容较多&#xff0c;希望大家有空能自己挖掘一下更多更深的用法。0x01、介绍VS&#xff0c;即VS2017以及以上版本&#xff0c;宇宙最好的IDE&#xff0c;集成了宇宙最有前景的平台&#xff0c;前阶段也支持了宇宙最好的语言。Github&#xff0c…

ASP.NET Core中使用IOC三部曲(一.使用ASP.NET Core自带的IOC容器)

前言本文主要是详解一下在ASP.NET Core中,自带的IOC容器相关的使用方式和注入类型的生命周期.这里就不详细的赘述IOC是什么 以及DI是什么了.. emm..不懂的可以自行百度.正文今天我们主要讲讲如何使用自带IOC容器,emm..虽然自带的功能不是那么强大,但是胜在轻量级..而且..不用引…

P4130,jzoj1214-[NOI2007]项链工厂【线段树】

正题 题目链接:https://www.luogu.org/problemnew/show/P4130 题目大意 一个环形颜色珠子链&#xff0c;位置(注意不是上面的珠子)从最上顺时针下来位置依次标号1∼n1\sim n1∼n。 然后要求支持以下操作 Rk:R\ k:R k:将所有珠子顺时针旋转kkk个。F:F:F:将所有珠子以111向下翻…

LeetCode常用算法模式大厂面试题整理

转载自 LeetCode常用算法模式&大厂面试题整理 文章目录 1、滑动窗口 2、双指针 3、快慢指针 4、合并区间 5、循环排序 6、就地反转链表 7、堆-优先队列问题 8、Top K 9、归并 10、单调栈 11、回溯法 BATJ等大厂面试真题汇总 1、滑动窗口 1 一个左指针&#xff0c;一个右…

ABPZero系列教程之拼多多卖家工具

此系列文章围绕着拼多多卖家工具来介绍ABPZero的使用&#xff0c;内容包括手机登录、手机注册、拼团提醒、微信公众号绑定帐号、有拼团发送消息到微信公众号&#xff08;只要关注过微信公众号并已绑定系统帐号&#xff09;。学习此系列必备&#xff1a;手机验证码&#xff1a;使…

g4e基础篇#4 了解Git存储库(Repo)

Git 存储库看上去就是一个文件夹&#xff0c;只是在这个文件夹中不仅仅保存了所有文件的当前版本&#xff0c;也同时保存了所有的历史记录&#xff0c;这些额外的信息都保存在当前文件夹下面的.git子目录中。因为前面我们所描述的git跟踪改动的特殊方式 &#xff0c;git可以在很…

net的retrofit--WebApiClient库

# 库简介WebApiClient是开源在github上的一个httpClient客户端库&#xff0c;内部基于HttpClient开发&#xff0c;是一个只需要定义c#接口(interface)&#xff0c;并打上相关特性&#xff0c;即可异步调用http-api的框架 &#xff0c;支持.net framework4.5、netcoreapp2.0和ne…

Sentinel(一)之简介

转载自 Sentinel: 分布式系统的流量防卫兵 Sentinel 是什么&#xff1f; 随着微服务的流行&#xff0c;服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式服务架构的流量控制组件&#xff0c;主要以流量为切入点&#xff0c;从限流、流量整形、熔断降级、系统负…

使用xUnit为.net core程序进行单元测试(中)

第一部分: 使用xUnit为.net core程序进行单元测试(上), 下面有一点点内容是重叠的....String Assert测试string是否相等&#xff1a;[Fact]public void CalculateFullName(){var p new Patient{FirstName "Nick",LastName "Carter"};Assert.Equal(&quo…