1. 引言
事件总线这个概念对你来说可能很陌生,但提到观察者(发布-订阅)模式,你也许就很熟悉。事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。从上图可知,核心就4个角色:
事件(事件源+事件处理)
事件发布者
事件订阅者
事件总线
实现事件总线的关键是:
事件总线维护一个事件源与事件处理的映射字典;
通过单例模式,确保事件总线的唯一入口;
利用反射完成事件源与事件处理的初始化绑定;
提供统一的事件注册、取消注册和触发接口。
以上源于我在事件总线知多少(1)中对于EventBus的分析和简单总结。基于以上的简单认知,我们来梳理下eShopOnContainers中EventBus的实现机制。
2. 高屋建瓴--看类图
我们直接以上帝视角,来看下其实现机制,上类图。
我们知道事件的本质是:事件源+事件处理。 针对事件源,其定义了 Handle
方法用于响应事件。不同之处在于方法参数的类型: 第一个接受的是一个强类型的 dynamic
。 为什么要单独提供一个事件源为 dynamic
可以简化事件源的构建,更趋于灵活。
有了事件源和事件处理,接下来就是事件的注册和订阅了。为了方便进行订阅管理,系统提供了额外的一层抽象 InMemoryEventBusSubscriptionsManager
就是使用内存进行存储事件源和事件处理的映射字典。 从类图中看 SubscriptionInfo
,其主要用于表示事件订阅方的订阅类型和事件处理的类型。
我们来近距离看下
//InMemoryEventBusSubscriptionsManager.cs
//定义的事件名称和事件订阅的字典映射(1:N)
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
//保存所有的事件处理类型
private readonly List<Type> _eventTypes;
//定义事件移除后事件
public event EventHandler<string> OnEventRemoved;
//构造函数初始化
public InMemoryEventBusSubscriptionsManager()
{
_handlers = new Dictionary<string, List<SubscriptionInfo>>();
_eventTypes = new List<Type>();
}
//添加动态类型事件订阅(需要手动指定事件名称)
public void AddDynamicSubscription<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler
{
DoAddSubscription(typeof(TH), eventName, isDynamic: true);
}
//添加强类型事件订阅(事件名称为事件源类型)
public void AddSubscription<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
var eventName = GetEventKey<T>();
DoAddSubscription(typeof(TH), eventName, isDynamic: false);
if (!_eventTypes.Contains(typeof(T)))
{
_eventTypes.Add(typeof(T));
}
}
//移除动态类型事件订阅
public void RemoveDynamicSubscription<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler
{
var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName);
DoRemoveHandler(eventName, handlerToRemove);
}
//移除强类型事件订阅
public void RemoveSubscription<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent
{
var handlerToRemove = FindSubscriptionToRemove<T, TH>();
var eventName = GetEventKey<T>();
DoRemoveHandler(eventName, handlerToRemove);
}
添加了这么一层抽象,即符合了单一职责原则,又完成了代码重用。
IEventBusSubscriptionsManager
的依赖,即可完成订阅管理。 你这里可能会好奇,为什么要暴露一个 EventBusRabbitMQ
源码亲密接触。
3.3.1. 构造函数定义
IRabbitMQPersistentConnection
以便连接到对应的Broke。
使用空对象模式注入
OnEventRemoved
事件,取消队列的绑定。(这也就回答了上面遗留的问题)
3.3.2. 事件订阅的逻辑:
public void Publish(IntegrationEvent @event)
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
var policy = RetryPolicy.Handle<BrokerUnreachableException>()
.Or<SocketException>()
.WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
{
_logger.LogWarning(ex.ToString());
});
using (var channel = _persistentConnection.CreateModel())
{
var eventName = @event.GetType()
.Name;
channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
var message = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(message);
policy.Execute(() =>
{
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; // persistent
channel.BasicPublish(exchange: BROKER_NAME, routingKey: eventName, mandatory:true, basicProperties: properties, body: body);
});
}
}
这里面有以下几个知识点:
使用Polly,以2的阶乘的时间间隔进行重试。(第一次2s后,第二次4s后,第三次8s后...重试)
使用direct全匹配、单播形式的路由机制进行消息分发
消息主体是格式化的json字符串
指定
mandatory:true
告知服务器当根据指定的routingKey和消息找不到对应的队列时,直接返回消息给生产者。
3.3.4. 然后看看事件消息的监听
Received事件委托处理消息接收事件
调用
以上代码主要包括以下知识点:
4. EventBus的集成和使用
以上介绍了EventBus的实现要点,那各个微服务是如何集成呢?
1. 注册
2. 注册单例模式的
services.AddSingleton<IEventBusSubscriptionsManager,InMemoryEventBusSubscriptionsManager>();
3. 注册单例模式的
完成了以上集成,就可以在代码中使用事件总线进行事件的发布和订阅。
4. 发布事件
若要发布事件,需要根据是否需要事件源(参数传递)来决定是否需要申明相应的集成事件,需要则继承自
IEventBus
的实例的
IIntegrationEventHandler
或
IEventBus
的实例调用
TestEvent
事件,B服务订阅该事件,同样需要在B服务复制定义一个 <code class="prettyprint code-in-text prettyprinted" style="box-sizing: border-box;background: rgb(243, 241, 241);color: rgb(88, 88, 88);line-height: 18px;font-family: consolas, menlo, courier, monospace, " initial="" microsoft="" !important;"="" 0px="">TestEvent
。 这也是微服务的一个通病,重复代码。
5. 最后
通过一步一步的源码梳理,我们发现eShopOnContainers中事件总线的总体实现思路与引言部分的介绍十分契合。所以对于事件总线,不要觉得高深,明确参与的几个角色以及基本的实现步骤,那么不管是基于RabbitMQ实现也好还是基于Azure Service Bus也好,万变不离其宗!
//定义事件处理
public class ProductPriceChangedIntegrationEventHandler : IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
{
public async Task Handle(ProductPriceChangedIntegrationEvent @event)
{
//do something
}
}
//事件源的声明
public class ProductPriceChangedIntegrationEvent : IntegrationEvent
{
public int ProductId { get; private set; }
public decimal NewPrice { get; private set; }
public decimal OldPrice { get; private set; }
public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice, decimal oldPrice)
{
ProductId = productId;
NewPrice = newPrice;
OldPrice = oldPrice;
}
}
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
});
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
//...
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
Json字符串的反序列化
利用依赖注入容器解析集成事件(Integration Event)和事件处理(Event Handler)类型
反射调用具体的事件处理方法
private async Task ProcessEvent(string eventName, string message)
{
if (_subsManager.HasSubscriptionsForEvent(eventName))
{
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
{
var subscriptions = _subsManager.GetHandlersForEvent(eventName);
foreach (var subscription in subscriptions)
{
if (subscription.IsDynamic)
{
var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
dynamic eventData = JObject.Parse(message);
await handler.Handle(eventData);
}
else
{
var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
var handler = scope.ResolveOptional(subscription.HandlerType);
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
}
}
}
}
}