EventBus In eShop -- 解析微软微服务架构eShopOnContainers(四)

引言

大家好像对分析源码厌倦了,说实在我也会厌倦,不过不看是无法分析其后面的东西,从易到难是一个必要的过程。

今天说下EventBus,前几天园里的大神已经把其解刨事件总线(Event Bus)知多少,我今天就借着大神的肩膀,分析下在eShop项目中EventBus的实现。

最近发觉转发文章不写出处的,特此加上链接:http://inday.cnblogs.com

解析源码

我们知道使用EventBus是为了解除Publisher和Subscriber之间的依赖性,这样我们的Publisher就不需要知道有多少Subscribers,只需要通过EventBus进行注册管理就好了,在eShop项目中,有一个这样的接口IEventBus(eShopOnContainers\src\BuildingBlocks\EventBus\EventBus\Abstractions)


public interface IEventBus
{   
  
void Subscribe<T, TH>(Func<TH> handler)        where T : IntegrationEvent        where TH : IIntegrationEventHandler<T>;    

   void Unsubscribe<T, TH>()        where TH : IIntegrationEventHandler<T> where T : IntegrationEvent;  

  
void Publish(IntegrationEvent @event); }

我们可以看到这个接口定义了EventBus所需的一些操作, 对比大神的EventBus,相关功能都是一致的,我们看下它的实现类:EventBusRabbitMQ,从名字上可以看出,这是一个通过RabbitMQ来进行管理的EventBus,我们可以看到它使用了IEventBusSubscriptionsManager进行订阅存储,也就是大神文中的:

private readonly ConcurrentDictionary<Type, List<Type>> _eventAndHandlerMapping;

微软在Demo中把其提取出了接口,把一些常用方法给提炼了出来,但是核心还是Dictionary<string, List<Delegate>>, 使用Dictionary进行Map映射。通过Subscribe和UnSubscribe进行订阅和取消,使用Publish方法进行发布操作。


public void Subscribe<T, TH>(Func<TH> handler)    where T : IntegrationEvent    where TH : IIntegrationEventHandler<T>{   
 
var eventName = typeof(T).Name;  
 
var containsKey = _subsManager.HasSubscriptionsForEvent<T>();  
 
if (!containsKey){      
       
if (!_persistentConnection.IsConnected){_persistentConnection.TryConnect();}      
      
using (var channel = _persistentConnection.CreateModel()){channel.QueueBind(queue: _queueName,exchange: BROKER_NAME,routingKey: eventName);}}_subsManager.AddSubscription<T, TH>(handler); }

我们看到在订阅的时候,EventBus会检查下在Map中是否有相应的注册,如果没有的话首先回去RabbitMQ中创建一个新的channel进行绑定,随后在Map中进行注册映射。

UnSubscribe则直接从Map中取消映射,通过OnEventRemoved事件判断Map下此映射的subscriber是否为空,为空则从RabbitMQ中关闭channel。

在RabbitMQ的构造方法中,我们看到这样一个创建:CreateConsumerChannel(),这里创建了一个EventingBasicConsumer,当Queue中有新的消息时会通过ProcessEvent执行Map中注册的handler(subscribers),看图可能更清晰些:


在ProcessEvent方法中,回去Map中找寻subscribers,然后通过动态反射进行执行:

private async Task ProcessEvent(string eventName, string message)
{    if (_subsManager.HasSubscriptionsForEvent(eventName)){ var eventType = _subsManager.GetEventTypeByName(eventName);        var integrationEvent = JsonConvert.DeserializeObject(message, eventType);        var handlers = _subsManager.GetHandlersForEvent(eventName);     
   
foreach (var handlerfactory in handlers){          
  
var handler = handlerfactory.DynamicInvoke();      
  
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);  
     
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });}} }

微软通过简单的代码解耦了Publisher和Subscribers之间的依赖关系,我们引用大神的总结:

应用

在catalog.api中,微软出现了EventBus,我在上一篇中也提到了,这是我的一个疑惑,因为在catalog中并没有订阅操作,直接执行了Publish操作,原先以为是一个空操作,后来看了Basket.Api我才知道为何微软要用RabbitMQ。

使用RabbitMQ,我们不仅是从类之间的解耦,更可以跨项目,跨语言,跨平台的解耦,publisher仅仅需要把消息体(IntegrationEvent)传送到RabbitMQ,Consumer从Queue中获取消息体,然后推送到Subscribers执行相应的操作。我们看下Basket.Api.Startup.cs:

protected virtual void ConfigureEventBus(IApplicationBuilder app)
{  
  
var catalogPriceHandler = app.ApplicationServices        .GetService<IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>>();    var orderStartedHandler = app.ApplicationServices.GetService<IIntegrationEventHandler<OrderStartedIntegrationEvent>>();    var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>(() => app.ApplicationServices.GetRequiredService<ProductPriceChangedIntegrationEventHandler>());eventBus.Subscribe<OrderStartedIntegrationEvent, OrderStartedIntegrationEventHandler>(() => app.ApplicationServices.GetRequiredService<OrderStartedIntegrationEventHandler>()); }

在这个方法里,我们看到了Subscribe操作,想想之前的提问有点搞笑,不过研究明白了也不错,对吧!

总结

今天我们看了EventBus在Demo中的应用,总结一下。

1、EventBus可以很好的解耦订阅者和发布者之间的依赖

2、使用RabbitMQ能够跨项目、跨平台、跨语言的解耦订阅者和发布者

虽然在Demo中我们看到对订阅者的管理是通过Dictionary内存的方式,所以我们的Subscribe仅仅只在Basket.Api中看到,但微软是通过IEventBusSubscriptionsManager接口定义的,我们可以通过自己的需求来进行定制,可以做成分布式的,比如使用memcached。

写在最后

每个月到下旬就会比较忙,所以文章发布会比较慢,但我也会坚持学习完eShop的,为了学习,我建了个群,大家可以进来一起学习,有什么建议和问题都可以进来哦。

eShop虽好,但不建议大家放到生产环境,毕竟是一个Demo,而且目前还是ALPHA版本,用来学习是一个很好的教材,这就是一个大杂烩,学习中你会学到很多新的东西,大家如果看好core的发展,可以一起研究下。

QQ群:376248054

相关文章:

原文地址:http://www.cnblogs.com/inday/p/eventbus-in-eshopcontainers.html


.NET社区新闻,深度好文,微信中搜索dotNET跨平台或扫描二维码关注

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/324919.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

再有人问你synchronized是什么,就把这篇文章发给他。

转载自 再有人问你synchronized是什么&#xff0c;就把这篇文章发给他。 在再有人问你Java内存模型是什么&#xff0c;就把这篇文章发给他中我们曾经介绍过&#xff0c;Java语言为了解决并发编程中存在的原子性、可见性和有序性问题&#xff0c;提供了一系列和并发处理相关的…

基本数据类型、包装类、String三者之间的相互转换

package com.wdl.day13;import org.junit.Test;/** 包装类的使用:* 1.java提供了8种基本数据类型对应的包装类&#xff0c;使得基本数据类型的变量具有类的特征** 2.掌握的&#xff1a;基本数据类型、包装类、String三者之间的相互转换****/ public class WrapperTest {//Strin…

很简单很简单的DBHelper类

记录一个简单的DBHelper类吧&#xff0c;用的时候在上来拿&#xff01; /// <summary>/// 数据库连接工具类/// </summary>public class DBHelper{string constr "Data Source.;Initial CatalogschoolDB;Integrated SecurityTrue";private SqlConnecti…

洛谷P3371-【模板】单源最短路【SPFA】

题目 一个有向图&#xff0c;求一点到所有点的最短距离 输入 4 6 1(4个点&#xff0c;6条边&#xff0c;从1出发) 1 2 2(1点到2点有一条权值2的线) 2 3 2 2 4 1 1 3 5 3 4 3 1 4 4 输出 0 2 4 3 解题思路 这题数据非常的大 说明 时空限制&#xff1a;1000ms,128M …

DDD理论学习系列(1)-- 通用语言

1.引言 在开始之前&#xff0c;我想我们有必要先了解以下DDD的主要参与者。因为毕竟语言是人说的吗&#xff0c;就像我们面向对象编程一样&#xff0c;那通用语言面向的是&#xff1f;DDD的主要参与者&#xff1a;领域专家开发人员领域专家&#xff1a;精通业务的任何人。开发…

Java中枚举的线程安全性及序列化问题

转载自 Java中枚举的线程安全性及序列化问题 Java SE5提供了一种新的类型-Java的枚举类型&#xff0c;关键字enum可以将一组具名的值的有限集合创建为一种新的类型&#xff0c;而这些具名的值可以作为常规的程序组件使用&#xff0c;这是一种非常有用的功能。本文将深入分析枚…

Servlet 参数读取

1、配置参数读取的意义&#xff1a; 把参数提取到配置的信息中这样就大大的增加了整个代码的使用性方面后期的代码维护 需要知道&#xff0c; 在这里的参数读取仅仅是读到程序里面。 2、实现代码 package com.bjsxt.servlet;import javax.servlet.ServletException; import j…

定了!对于本周四(7.16日)抽奖活动取消简要说明,新抽奖活动暂定下周三(7.22日)...

大家好&#xff0c;我是雄雄&#xff0c;对于本周四&#xff08;7.16日&#xff09;抽奖活动取消简要说明&#xff0c;新抽奖活动暂定下周三&#xff08;7.22日&#xff09;&#xff0c;欢迎各位粉丝积极参与&#xff0c;奖品已经准备好了&#xff0c;你&#xff01;准备好了吗…

洛谷P1144-最短路计算【日常最短路,日常图论,SPFA】

题目 一个无向图&#xff0c;求点1到每个点的最短路的路径数量 输入 5 7(5个点,7条边) 1 2(表示1到2有边) 1 3 2 4 3 4 2 3 4 5 4 5 输出 &#xff08;答案mod100003&#xff09; 1 1 1 2 4 解题思路 注意这是无向图&#xff0c;然后请看数据范围 对于20%的数…

RabbitMQ系列教程之二:工作队列(Work Queues)

今天开始RabbitMQ教程的第二讲&#xff0c;废话不多说&#xff0c;直接进入话题。 (使用.NET 客户端 进行事例演示) 在第一个教程中&#xff0c;我们编写了一个从命名队列中发送和接收消息的程序。在本教程中&#xff0c;我们将创建一个工作队列&#xff0c;这个队列将用于在…

单例 (Singleton)设计模式

所谓类的单例设计模式&#xff0c;就是采取一定的方法保证在整个的软件系统中&#xff0c;对某个类只能存在一个对象实例&#xff0c;并且该类只提供一个取得其对象实例的方法。如果我们要让类在一个虚拟机中只能产生一个对象&#xff0c;我们首先必须将类的构造器的访问权限设…

面试中经常会问的智力题,来看看你会做几道

转载自 面试中经常会问的智力题&#xff0c;来看看你会做几道 下面是大部分题目来自滴滴出行2017秋招题。开始头脑风暴吧~~~ 问题 question one 有50家人家&#xff0c;每家一条狗。有一天警察通知&#xff0c;50条狗当中有病狗&#xff0c;行为和正常狗不一样。每人只能通…

你喜欢什么样的课堂?

最近看了一本书《让课堂充满幽默》&#xff0c;里面有些内容还是觉得挺有道理的。书中开头就说&#xff1a;“大量的课件、复杂的网络&#xff0c;使得老师似乎成了信息管理员和媒体播放机&#xff0c;学生则成为了被灌输的对象”&#xff0c;看后&#xff0c;内心深处反问自己…

Servlet 中文乱码处理

1、为什么使用中文乱码 我们在实现登录时候 &#xff0c;需要进行前台的数据。获得数据以后可能会出现中文乱码&#xff0c;那应该如何处理呢&#xff1f; 2、get方式和Post提交方式的区别 [1]get数据的传输是不安全的 &#xff0c;post数据传递更加安全 [2]get方式数据传递有大…

洛谷P1346-电车【日常图论,最短路,SPFA】

题目 一个有向图&#xff0c;每个点有个默认方向和若干个其他方向&#xff0c;走默认方向权值为0&#xff0c;其他方向权值为1&#xff0c;求最短路 输入 3 2 1(3个点&#xff0c;点2到点1) 2 2 3&#xff08;2个点&#xff0c;起点为1&#xff0c;2为默认点&#xff0c;3为…

main()方法

main()方法的使用说明&#xff1a; main()方法作为程序的入口main()方法也是一个普通的静态方法main()方法可以作为我们与控制台交互的方式。&#xff08;之前&#xff1a;使用Scanner&#xff09;

实现自己的.NET Core配置Provider之EF

《10分钟就能学会.NET Core配置》里详细介绍了.NET Core配置的用法&#xff0c;另外我还开源了自定义的配置Provider&#xff1a;EF配置Provider和Yaml配置Provider。本文先来聊聊EF配置Provider的实现&#xff0c;其中会涉及到EntityFramework Core的知识&#xff0c;不熟悉也…

今天的雪糕格外好吃!

赤日炎炎&#xff0c;室外的温度超过30℃。午休罢&#xff0c;教室里一片寂静&#xff0c;大家都有一个目的——等待老师的进来&#xff0c;继续上课。偶尔有几位同学貌似等待焦急&#xff0c;遂将目光瞥向窗外&#xff0c;若有所思。还有几位好动同学在自己的座位上左右摇动&a…

学习分布式不得不会的BASE理论

转载自 学习分布式不得不会的BASE理论 eBay的架构师Dan Pritchett源于对大规模分布式系统的实践总结&#xff0c;在ACM上发表文章提出BASE理论&#xff0c;BASE理论是对CAP理论的延伸&#xff0c;核心思想是即使无法做到强一致性&#xff08;Strong Consistency&#xff0c;…

Servelt 中文乱码

1、为什么使用中文乱码 我们在实现登录时候 &#xff0c;需要进行前台的数据。获得数据以后可能会出现中文乱码&#xff0c;那应该如何处理呢&#xff1f; 2、get方式和Post提交方式的区别 [1]get数据的传输是不安全的 &#xff0c;post数据传递更加安全 [2]get方式数据传递有大…