最近在看微软eShopOnContainers 项目,看到事件总线觉得不错,和大家分享一下
看完此文你将获得什么?
eShop中是如何设计事件总线的
实现一个InMemory事件总线eShop中是没有InMemory实现的,这算是一个小小小的挑战
发布订阅模式
发布订阅模式可以让应用程序组件之间解耦,这是我们使用这种模式最重要的理由之一,如果你完全不知道这个东西,建议你先通过搜索引擎了解一下这种模式,网上的资料很多这里就不再赘述了。
eShop中的EventBus就是基于这种模式的发布/订阅。
发布订阅模式核心概念有三个:发布者、订阅者、调度中心,这些概念在消息队列中就是生产者、消费者、MQ实例。
在eShop中有两个EventBus的实现:
基于RabbitMq的
EventBusRabbitMQ
基于AzureServiceBus的
EventBusServiceBus
。
从IEventBus
开始
先来看一看,所有EventBus的接口IEventBus
public interface IEventBus{
void Publish(IntegrationEvent @event);
void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler;
void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler;
void Unsubscribe<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent;
}
嗯,乍一看看是有点眼晕的,仔细看它的核心功能只有三个:
Publish 发布
Subscribe 订阅
Unsubscribe 取消订阅
这对应着发布订阅模式的基本概念,不过对于事件总线的接口添加了许多约束:
发布的内容(消息)必须是
IntegrationEvent
及其子类订阅事件必须指明要订阅事件的类型,并附带处理器类型
处理器必须是
IIntegrationEventHandler
的实现类
Ok,看到这里先不要管Dynamic
相关的方法,然后记住这个两个关键点:
事件必须继承
IntegrationEvent
处理器必须实现
IIntegrationEventHandler<T>
且T
是IntegrationEvent
子类
另外,看下 IntegrationEvent
有什么
public class IntegrationEvent{
public IntegrationEvent() {Id = Guid.NewGuid();CreationDate = DateTime.UtcNow;} public Guid Id { get; }
public DateTime CreationDate { get; }
}
IEventBusSubscriptionsManager是什么
public interface IEventBusSubscriptionsManager{
bool IsEmpty { get; }
event EventHandler<string> OnEventRemoved;
void AddDynamicSubscription<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
void AddSubscription<T, TH>()
where T : IntegrationEvent where TH : IIntegrationEventHandler<T>;
void RemoveSubscription<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent; void RemoveDynamicSubscription<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
bool HasSubscriptionsForEvent(string eventName); Type GetEventTypeByName(string eventName);
void Clear();
IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;
IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName); string GetEventKey<T>();
}
这个接口看起来稍显复杂些,我们来简化下看看:
public interface IEventBusSubscriptionsManager{
void AddSubscription<T, TH>()
void RemoveSubscription<T, TH>()IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>()
}
最终,这三个方法就是我们要关注的,添加订阅、移除订阅、获取指定事件的订阅信息。
SubscriptionInfo
是什么?
public bool IsDynamic { get; }public Type HandlerType{ get; }
SubscriptionInfo
中只有两个信息,这是不是一个Dynamic类型的Event以及这个Event所对应的处理器的类型。
这是你可能会有另一个疑问:
这个和IEventBus
有什么关系?
IEventBusSubscriptionsManager
含有更多功能:查看是否有订阅,获取事件的Type,获取事件的处理器等等IEventBusSubscriptionsManager
由IEventBus
使用,在RabbitMq和ServiceBus的实现中,都使用Manager去存储事件的信息,例如下面的代码:public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { // 查询事件的全名var eventName = _subsManager.GetEventKey<T>(); //向mq添加注册DoInternalSubscription(eventName); // 向manager添加订阅_subsManager.AddSubscription<T, TH>(); }private void DoInternalSubscription(string eventName){ var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); if (!containsKey){ if (!_persistentConnection.IsConnected){_persistentConnection.TryConnect();} using (var channel = _persistentConnection.CreateModel()){channel.QueueBind(queue: _queueName,exchange: BROKER_NAME,routingKey: eventName);}} }
查询事件的名字是manager做的,订阅的时候是先向mq添加订阅,之后又加到manager中,manager管理着订阅的基本信息。
另外一个重要功能是获取事件的处理器信息,在rabbit mq的实现中,ProcessEvent方法中用manager获取了事件的处理器,再用依赖注入获得处理器的实例,反射调用Handle
方法处理事件信息:
private async Task ProcessEvent(string eventName, string message) { // 从manager查询信息if (_subsManager.HasSubscriptionsForEvent(eventName)){ using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)){ // 从manager获取处理器var subscriptions = _subsManager.GetHandlersForEvent(eventName); foreach (var subscription in subscriptions){ // Di + 反射调用,处理事件(两个都是,只是针对是否是dynamic做了不同的处理)if (subscription.IsDynamic){ var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; dynamic eventData = JObject.Parse(message); await handler.Handle(eventData);} else{ var eventType = _subsManager.GetEventTypeByName(eventName); var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var handler = scope.ResolveOptional(subscription.HandlerType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });}}}}}
IEventBusSubscriptionsManager的默认实现
在eShop中只有一个实现就是InMemoryEventBusSubscriptionsManager
类
这个类中有两个重要的字段
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers; private readonly List<Type> _eventTypes;
他们分别存储了事件列表和事件处理器信息词典
接下来就是实现一个
基于内存的事件总线
了
我们要做什么呢?IEventBusSubscriptionsManager 已经有了InMemory的实现了,我们可以直接拿来用,所以我们只需要自己实现一个EventBus就好了
先贴出最终代码:
public class InMemoryEventBus : IEventBus{
private readonly IServiceProvider _provider;
private readonly ILogger<InMemoryEventBus> _logger;
private readonly ISubscriptionsManager _manager;
private readonly IList<IntegrationEvent> _events;
public InMemoryEventBus(IServiceProvider provider,ILogger<InMemoryEventBus> logger, ISubscriptionsManager manager) {_provider = provider;_logger = logger;_manager = manager;}
public void Publish(IntegrationEvent e) {
var eventType = e.GetType();
var handlers = _manager.GetHandlersForEvent(eventType.FullName); foreach (var handlerInfo in handlers){
var handler = _provider.GetService(handlerInfo.HandlerType);
var method = handlerInfo.HandlerType.GetMethod("Handle");method.Invoke(handler, new object[] { e });}}
public void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>{_manager.AddSubscription<T, TH>();}
public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler{ throw new NotImplementedException();}
public void Unsubscribe<T, TH>()
where T : IntegrationEvent where TH : IIntegrationEventHandler<T>{_manager.RemoveSubscription<T, TH>();}
public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler{ throw new NotImplementedException();}
}
首先构造函数中声明我们要使用的东西:
public InMemoryEventBus(IServiceProvider provider,ILogger<InMemoryEventBus> logger, ISubscriptionsManager manager)
{
_provider = provider;
_logger = logger;
_manager = manager;
}
这里要注意的就是IServiceProvider provider
这是 DI容器,当我们在切实处理事件的时候我们选择从DI获取处理器的实例,而不是反射创建,这要做的好处在于,处理器可以依赖于其它东西,并且可以是单例的
public void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{_manager.AddSubscription<T, TH>();}
public void Unsubscribe<T, TH>()
where T : IntegrationEvent where TH : IIntegrationEventHandler<T>
{_manager.RemoveSubscription<T, TH>();
}
订阅和取消订阅很简单,因为我们是InMemory的所以只调用了manager的方法。
接下来就是最重要的Publish方法,实现Publish有两种方式:
使用额外的线程和Queue让发布和处理异步
为了简单起见,我们先写个简单易懂的同步的
public void Publish(IntegrationEvent e){
// 首先要拿到集成事件的Type信息var eventType = e.GetType(); // 获取属于这个事件的处理器列表,可能有很多,注意获得的是SubscriptionInfovar handlers = _manager.GetHandlersForEvent(eventType.FullName); // 不解释循环foreach (var handlerInfo in handlers){ // 从DI中获取类型的实例var handler = _provider.GetService(handlerInfo.HandlerType); // 拿到Handle方法var method = handlerInfo.HandlerType.GetMethod("Handle"); // 调用方法method.Invoke(handler, new object[] { e });} }
OK,我们的InMemoryEventBus就写好了!
要实践这个InMemoryEventBus,那么还需要一个IntegrationEvent
的子类,和一个IIntegrationEventHandler<T>
的实现类,这些都不难,例如我们做一个添加用户的事件,A在添加用户后,发起一个事件并将新用户的名字作为事件数据,B去订阅事件,并在自己的处理器中处理名字信息。
思路是这样的:
写一个
AddUserEvent:IntegrationEvent
,里面有一个UserId和一个UserName
。写一个
AddUserEventHandler:IIntegrationEventHandler<AddUserEvent>
,在Handle
方法中输出UserId和Name到日志。注册DI,你要注册下面这些服务:
IEventBus=>InMemoryEventBusISubscriptionsManager=>InMemorySubscriptionsManagerAddUserEventHandler=>AddUserEventHandler
在Startup中为刚刚写的事件和处理器添加订阅(在这里已经可以获取到IEventBus实例了)
写一个Api接口或是什么,调用IEventBus的Publish方法,new 一个新的
AddUserEvent
作为参数传进去。
OK!到这里一个切实可用的InMemoryEventBus就可以使用了。
相关文章:
利用Service Fabric承载eShop On Containers
开篇有益-解析微软微服务架构eShopOnContainers(一)
EventBus In eShop -- 解析微软微服务架构eShopOnContainers(四)
我眼中的ASP.NET Core之微服务
微服务中的异步消息通讯
.NET Core 事件总线,分布式事务解决方案:CAP
微服务的概念——《微服务设计》读书笔记
微服务架构师的职责——《微服务设计读书笔记》
建模:确定服务的边界——《微服务设计》读书笔记
微服务集成——《微服务设计》读书笔记
服务的协作:服务间的消息传递——《微服务设计》读书笔记
拆分:分解单块系统——《微服务设计》读书笔记
部署:持续集成(CI)与持续交付(CD)——《微服务设计》读书笔记
测试——《微服务设计》读书笔记
监控——《微服务设计》读书笔记
安全——《微服务设计》读书笔记
康威定律和系统设计——《微服务设计》读书笔记
规模化微服务——《微服务设计》读书笔记
Net分布式系统之:微服务架构
原文:https://www.cnblogs.com/rocketRobin/p/8510198.html
.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com