看eShopOnContainers学一个EventBus

最近在看微软eShopOnContainers 项目,看到事件总线觉得不错,和大家分享一下

看完此文你将获得什么?

  1. eShop中是如何设计事件总线的

  2. 实现一个InMemory事件总线eShop中是没有InMemory实现的,这算是一个小小小的挑战

发布订阅模式

发布订阅模式可以让应用程序组件之间解耦,这是我们使用这种模式最重要的理由之一,如果你完全不知道这个东西,建议你先通过搜索引擎了解一下这种模式,网上的资料很多这里就不再赘述了。

eShop中的EventBus就是基于这种模式的发布/订阅
发布订阅模式核心概念有三个:发布者、订阅者、调度中心,这些概念在消息队列中就是生产者、消费者、MQ实例

在eShop中有两个EventBus的实现:

  • 基于RabbitMq的EventBusRabbitMQ

  • 基于AzureServiceBus的EventBusServiceBus

IEventBus开始

先来看一看,所有EventBus的接口IEventBus

public interface IEventBus{   
 void Publish(IntegrationEvent @event);  
  void Subscribe<T, TH>()      
    where T : IntegrationEvent      
     where TH : IIntegrationEventHandler<T>;  
    
void SubscribeDynamic<TH>(string eventName)        where TH : IDynamicIntegrationEventHandler;    

void UnsubscribeDynamic<TH>(string eventName)        where TH : IDynamicIntegrationEventHandler;    

void Unsubscribe<T, TH>()      
 where TH : IIntegrationEventHandler<T>    
     where T : IntegrationEvent; }

嗯,乍一看看是有点眼晕的,仔细看它的核心功能只有三个:

  1. Publish 发布

  2. Subscribe 订阅

  3. Unsubscribe 取消订阅

这对应着发布订阅模式的基本概念,不过对于事件总线的接口添加了许多约束:

  1. 发布的内容(消息)必须是IntegrationEvent及其子类

  2. 订阅事件必须指明要订阅事件的类型,并附带处理器类型

  3. 处理器必须是IIntegrationEventHandler的实现类

Ok,看到这里先不要管Dynamic相关的方法,然后记住这个两个关键点:

  1. 事件必须继承IntegrationEvent

  2. 处理器必须实现IIntegrationEventHandler<T>TIntegrationEvent子类

另外,看下 IntegrationEvent有什么

public class IntegrationEvent{    
public IntegrationEvent()    {Id = Guid.NewGuid();CreationDate = DateTime.UtcNow;}    public Guid Id  { get; }  
 public DateTime CreationDate { get; } }

IEventBusSubscriptionsManager是什么

public interface IEventBusSubscriptionsManager{   
 bool IsEmpty { get; }  
  event EventHandler<string> OnEventRemoved;  
   void AddDynamicSubscription<TH>(string eventName)  
       where TH : IDynamicIntegrationEventHandler;  
 void AddSubscription<T, TH>()    
   where T : IntegrationEvent       where TH : IIntegrationEventHandler<T>;  
    void RemoveSubscription<T, TH>()  
          where TH : IIntegrationEventHandler<T>    
               where T : IntegrationEvent;    void RemoveDynamicSubscription<TH>(string eventName)  
         where TH : IDynamicIntegrationEventHandler;  
 bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
 
 bool HasSubscriptionsForEvent(string eventName);    Type GetEventTypeByName(string eventName);  
 void Clear();
 IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;  
 IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);    string GetEventKey<T>(); }

这个接口看起来稍显复杂些,我们来简化下看看:

public interface IEventBusSubscriptionsManager{   
 void AddSubscription<T, TH>()  
 void RemoveSubscription<T, TH>()IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() }

最终,这三个方法就是我们要关注的,添加订阅、移除订阅、获取指定事件的订阅信息。

SubscriptionInfo是什么?

public bool IsDynamic { get; }public Type HandlerType{ get; }

SubscriptionInfo中只有两个信息,这是不是一个Dynamic类型的Event以及这个Event所对应的处理器的类型。

这是你可能会有另一个疑问:

这个和IEventBus有什么关系?

  1. IEventBusSubscriptionsManager含有更多功能:查看是否有订阅,获取事件的Type,获取事件的处理器等等

  2. IEventBusSubscriptionsManagerIEventBus使用,在RabbitMq和ServiceBus的实现中,都使用Manager去存储事件的信息,例如下面的代码:

    public void Subscribe<T, TH>()    where T : IntegrationEvent    where TH : IIntegrationEventHandler<T>
    {    // 查询事件的全名var eventName = _subsManager.GetEventKey<T>();    //向mq添加注册DoInternalSubscription(eventName);    // 向manager添加订阅_subsManager.AddSubscription<T, TH>();
    }private void DoInternalSubscription(string eventName){    var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);    if (!containsKey){        if (!_persistentConnection.IsConnected){_persistentConnection.TryConnect();}        using (var channel = _persistentConnection.CreateModel()){channel.QueueBind(queue: _queueName,exchange: BROKER_NAME,routingKey: eventName);}}
    }

    查询事件的名字是manager做的,订阅的时候是先向mq添加订阅,之后又加到manager中,manager管理着订阅的基本信息。

另外一个重要功能是获取事件的处理器信息,在rabbit mq的实现中,ProcessEvent方法中用manager获取了事件的处理器,再用依赖注入获得处理器的实例,反射调用Handle方法处理事件信息:

    private async Task ProcessEvent(string eventName, string message)    {        // 从manager查询信息if (_subsManager.HasSubscriptionsForEvent(eventName)){            using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)){                // 从manager获取处理器var subscriptions = _subsManager.GetHandlersForEvent(eventName);                foreach (var subscription in subscriptions){                    // Di + 反射调用,处理事件(两个都是,只是针对是否是dynamic做了不同的处理)if (subscription.IsDynamic){ var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;                        dynamic eventData = JObject.Parse(message);                        await handler.Handle(eventData);}                    else{                        var eventType = _subsManager.GetEventTypeByName(eventName);                        var integrationEvent = JsonConvert.DeserializeObject(message, eventType);                        var handler = scope.ResolveOptional(subscription.HandlerType);                        var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);                        await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });}}}}}

IEventBusSubscriptionsManager的默认实现

在eShop中只有一个实现就是InMemoryEventBusSubscriptionsManager

这个类中有两个重要的字段

    private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;    private readonly List<Type> _eventTypes;

他们分别存储了事件列表和事件处理器信息词典

接下来就是实现一个

基于内存的事件总线

我们要做什么呢?IEventBusSubscriptionsManager 已经有了InMemory的实现了,我们可以直接拿来用,所以我们只需要自己实现一个EventBus就好了

先贴出最终代码:

public class InMemoryEventBus : IEventBus{   
   private readonly IServiceProvider _provider;
    private readonly ILogger<InMemoryEventBus> _logger;
    private readonly ISubscriptionsManager _manager;  
    private readonly IList<IntegrationEvent> _events;  
    
    public InMemoryEventBus(IServiceProvider provider,ILogger<InMemoryEventBus> logger, ISubscriptionsManager manager)    {_provider = provider;_logger = logger;_manager = manager;}  
    
    public void Publish(IntegrationEvent e)    {    
        var eventType = e.GetType();      
         var handlers = _manager.GetHandlersForEvent(eventType.FullName);        foreach (var handlerInfo in handlers){          
           var handler = _provider.GetService(handlerInfo.HandlerType);        
            var method = handlerInfo.HandlerType.GetMethod("Handle");method.Invoke(handler, new object[] { e });}}  
  
    public void Subscribe<T, TH>()      
              where T : IntegrationEvent      
                where TH : IIntegrationEventHandler<T>{_manager.AddSubscription<T, TH>();}  
  
   public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler{        throw new NotImplementedException();}  
  
    public void Unsubscribe<T, TH>()    
                    where T : IntegrationEvent        where TH : IIntegrationEventHandler<T>{_manager.RemoveSubscription<T, TH>();}  
    
    public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler{        throw new NotImplementedException();} }

首先构造函数中声明我们要使用的东西:

public InMemoryEventBus(IServiceProvider provider,ILogger<InMemoryEventBus> logger, ISubscriptionsManager manager)
{    
_provider = provider;    
_logger = logger;    
_manager = manager; }

这里要注意的就是IServiceProvider provider这是 DI容器,当我们在切实处理事件的时候我们选择从DI获取处理器的实例,而不是反射创建,这要做的好处在于,处理器可以依赖于其它东西,并且可以是单例的

public void Subscribe<T, TH>()    
where T : IntegrationEvent
   where TH : IIntegrationEventHandler<T> {_manager.AddSubscription<T, TH>();}
   
public void Unsubscribe<T, TH>()  
     where T : IntegrationEvent    where TH : IIntegrationEventHandler<T> {_manager.RemoveSubscription<T, TH>(); }

订阅和取消订阅很简单,因为我们是InMemory的所以只调用了manager的方法。

接下来就是最重要的Publish方法,实现Publish有两种方式:

  1. 使用额外的线程和Queue让发布和处理异步

  2. 为了简单起见,我们先写个简单易懂的同步的

    public void Publish(IntegrationEvent e){    
    // 首先要拿到集成事件的Type信息var eventType = e.GetType();    // 获取属于这个事件的处理器列表,可能有很多,注意获得的是SubscriptionInfovar handlers = _manager.GetHandlersForEvent(eventType.FullName);    // 不解释循环foreach (var handlerInfo in handlers){        // 从DI中获取类型的实例var handler = _provider.GetService(handlerInfo.HandlerType);        // 拿到Handle方法var method = handlerInfo.HandlerType.GetMethod("Handle");        // 调用方法method.Invoke(handler, new object[] { e });} }

OK,我们的InMemoryEventBus就写好了!

要实践这个InMemoryEventBus,那么还需要一个IntegrationEvent的子类,和一个IIntegrationEventHandler<T>的实现类,这些都不难,例如我们做一个添加用户的事件,A在添加用户后,发起一个事件并将新用户的名字作为事件数据,B去订阅事件,并在自己的处理器中处理名字信息。

思路是这样的:

  1. 写一个 AddUserEvent:IntegrationEvent,里面有一个UserId和一个UserName

  2. 写一个AddUserEventHandler:IIntegrationEventHandler<AddUserEvent>,在Handle方法中输出UserId和Name到日志。

  3. 注册DI,你要注册下面这些服务:

    IEventBus=>InMemoryEventBusISubscriptionsManager=>InMemorySubscriptionsManagerAddUserEventHandler=>AddUserEventHandler
  4. 在Startup中为刚刚写的事件和处理器添加订阅(在这里已经可以获取到IEventBus实例了)

  5. 写一个Api接口或是什么,调用IEventBus的Publish方法,new 一个新的AddUserEvent作为参数传进去。

OK!到这里一个切实可用的InMemoryEventBus就可以使用了。

相关文章:

原文:https://www.cnblogs.com/rocketRobin/p/8510198.html


.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/321982.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jzoj3682-Points and Segments【模型转化,欧拉回路】

正题 题目大意 给出若干个区间&#xff0c;然后给每个区间涂颜色(蓝或红)&#xff0c;求一种方案使得每个点的颜色数量差不超过111。 解题思路 我们可以从每个lll向rrr连一条双向边&#xff0c;若此时我们可以跑出欧拉回路&#xff0c;那么这就满足颜色差为0(从l∼rl\sim rl∼…

常用解题算法总结

一、四大基本算法 分治法 动态规划&#xff08;一次买卖股票、多次买卖股票、最大连续子序列和、最大连续子序列积、最长公共子序列&#xff09; 贪心算法 穷举法 二、常用便捷算法 异或法&#xff08;单次偶次数、顺序单次偶次数&#xff09; 位运算&#xff08;单次k次…

创建基于MailKit和MimeKit的.NET基础邮件服务

邮件服务是一般的系统都会拥有和需要的功能&#xff0c;但是对于.NET项目来说&#xff0c;邮件服务的创建和使用会较为的麻烦。.NET对于邮件功能提供了System.Net.Mail用于创建邮件服务&#xff0c;该基础服务提供邮件的基础操作&#xff0c;并且使用也较为的简单。对于真正将该…

欢乐纪中A组赛【2019.8.23】

前言 我好菜 成绩 %%%TRXdalao\%\%\% TRXdalao%%%TRXdalao RankRankRankPersonPersonPersonScoreScoreScoreAAABBBCCC888(H−2)TRX(H-2)TRX(H−2)TRX120120120202020100100100000121212(H−2)HJW(H-2)HJW(H−2)HJW100100100100100100000000181818(J−3)XXY(J-3)XXY(J−3)XXY80…

Java JVM总结

一、jvm参数 1&#xff09;内存 -Xms -Xmx -Xss -Xloggc:file -Xprof -XX:DisabledExplicitGC -XX:PreBlockSpin -XX:CompileThreshold 2&#xff09;Parallel -XX:SurvivorRatio -XX:PreTenureSizeThreshold -XX:MaxTenuringThreshold -XX:ParallelGCThreads -XX:Us…

EF Core下利用Mysql进行数据存储在并发访问下的数据同步问题

小故事在开始讲这篇文章之前&#xff0c;我们来说一个小故事&#xff0c;纯素虚构&#xff08;真实的存钱逻辑并非如此&#xff09;小刘发工资后&#xff0c;赶忙拿着现金去银行&#xff0c;准备把钱存起来&#xff0c;而与此同时&#xff0c;小刘的老婆刘嫂知道小刘的品性&…

牛客练习赛50-记录

正题 比赛链接:https://ac.nowcoder.com/acm/contest/1080#question 成绩 本届 升高二届 总结 以后还是不要写太多自己不擅长的写法&#xff0c;空间要多检查&#xff0c;不要像个傻逼一样啥都写错。 尽量不要为了省一点空间和时间写一些不舒服的东西&#xff0c;尽量在能…

物联网框架ServerSuperIO在.NetCore实现跨平台的实践路线

正所谓天下大势&#xff0c;不跟风不行。你不跨平台&#xff0c;很low嘛。java说&#xff1a;你们能跨嘛&#xff0c;跨给我看看。C#说&#xff1a;不要强人所难嘛。java说&#xff1a;能部署在云上吗&#xff1f;docker&#xff1f;微服务&#xff1f;C#说&#xff1a;不要强人…

Spring Aop总结

一、什么是AOP 面向方面的编程&#xff08;AOP&#xff09;是一种编程技术&#xff0c;是面向对象编程的补充&#xff0c;它也提供了模块化。 在面向对象编程中&#xff0c;关键的单元是对象&#xff0c;AOP的关键单元是切面&#xff0c;或者说关注点。一些切面可能有集中的代…

P3750-[六省联考2017]分手是祝愿【期望dp】

正题 题目链接:https://www.luogu.org/problem/P3750 题目大意 nnn盏灯和按钮&#xff0c;每次随机选择一个xxx按下后会让xxx的倍数的灯都取反&#xff0c;然后若最少kkk步就可以将所有灯关闭那么直接选择最优策略&#xff0c;求关闭所有灯的期望次数。 解题思路 做期望dpdpd…

使用WebApiClient请求和管理Restful Api

前言本篇文章的内容是WebApiClient应用说明篇&#xff0c;如果你没有了解过WebApiClient&#xff0c;可以先阅读以下相关文章&#xff1a;WebApi client 的面向切面编程我来给.Net设计一款HttpClient.Net45下HttpClient的几个缺陷.net的retrofit--WebApiClient库.net的retrofit…

Spring MVC总结

一、Spring MVC &#xff08;1&#xff09;介绍 Spring MVC是一个基于Java的实现了MVC设计模式的请求驱动类型的轻量级Web框架。 通过把Model&#xff0c;View&#xff0c;Controller分离&#xff0c;将web层进行职责解耦&#xff0c;把复杂的web应用分成逻辑清晰的几部分&…

拥抱.NET Core系列:MemoryCache 缓存选项

MSCache项目MSCache 目前最新的正式版是 2.0.0&#xff0c;预览版是2.1.0&#xff0c;会与 .NETCore 2.1 一起发布。本篇用了2.0.0版本开源在 GitHub 上&#xff0c;仓库地址是&#xff1a;https://github.com/aspnet/CachingNuGet地址为&#xff1a;https://www.nuget.org/pac…

牛客练习赛51-记录

正题 比赛链接:https://ac.nowcoder.com/acm/contest/1083#question 成绩 可怜的zycT3zycT3zycT3被n0n0n0卡了半天&#xff0c;这里感谢一下排雷 总结 比赛状态较好&#xff0c;后面没有T6T6T6的题解 T1:abcT1:abcT1:abc 题目大意 给出一个字符串&#xff0c;求有多少个abc…

SpringBoot总结

一、SpringBoot &#xff08;1&#xff09;简介 SpringFramework&#xff1a;最重要的特征是依赖注入。所有 SpringModules 不是依赖注入就是 IOC 控制反转。使用 DI 或者是 IOC 的时候&#xff0c;可以开发松耦合应用。松耦合应用的单元测试可以很容易的进行。 Spring MVC&…

Metrics.net + influxdb + grafana 构建WebAPI的自动化监控和预警

前言这次主要分享通过Metrics.net influxdb grafana 构建WebAPI的自动化监控和预警方案。通过执行耗时&#xff0c;定位哪些接口拖累了服务的性能&#xff1b;通过请求频次&#xff0c;设置适当的限流和熔断机制&#xff0c;拦截非法或不合理的请求&#xff0c;保障服务的可用…

jzoj6342-[NOIP2019模拟2019.9.7]Tiny Counting【树状数组,容斥】

正题 题目大意 一个序列SSS&#xff0c;求有多少个互不相同的4元组(a,b,c,d)(a,b,c,d)(a,b,c,d)使得a<b且Sa<Sba<b且S_a<S_ba<b且Sa​<Sb​ c<b且Sc>Sdc<b且S_c>S_dc<b且Sc​>Sd​ 解题思路 若可以重复其实答案就是逆序对个数乘上正序对…

jzoj6343-[NOIP2019模拟2019.9.7]Medium Counting【记忆化dfs,dp】

正题 题目大意 给出nnn个字符串SiS_iSi​&#xff0c;然后有些???号可以进行随便填字母。 然后要求Si<Si1S_i<S_{i1}Si​<Si1​的情况下求方案数。 解题思路 定义fl,r,p,cf_{l,r,p,c}fl,r,p,c​表示只考虑l∼rl\sim rl∼r的字符串&#xff0c;只考虑ppp往后的字…

EF Core 2.1路线图:视图、GROUP BY和惰性加载

Entity Framework Core一直追随着初始Entity Framework的发展&#xff0c;并不断推陈出新。它首先推出的是对视图的支持&#xff0c;这听起来有些耸人听闻。在即将推出的EF Core 2.1之前&#xff0c;EF Core并未对数据库视图提供官方的支持&#xff0c;也不支持缺少主键的数据库…

计算机网络总结

一、计算机网络体系 &#xff08;1&#xff09;OSI分层 &#xff08;7层&#xff09; 物理层、数据链路层、网络层、传输层、会话层、表示层、应用层。 &#xff08;2&#xff09;TCP/IP分层&#xff08;4层&#xff09; 网络接口层、 网际层、运输层、 应用层。 &#xff0…