动手造轮子:实现一个简单的 EventBus


动手造轮子:实现一个简单的 EventBus

Intro

EventBus 是一种事件发布订阅模式,通过 EventBus 我们可以很方便的实现解耦,将事件的发起和事件的处理的很好的分隔开来,很好的实现解耦。微软官方的示例项目 EShopOnContainers 也有在使用 EventBus 。

这里的 EventBus 实现也是参考借鉴了微软 eShopOnContainers 项目。

EventBus 处理流程:

640?wx_fmt=png

微服务间使用 EventBus 实现系统间解耦:

640?wx_fmt=png

借助 EventBus 我们可以很好的实现组件之间,服务之间,系统之间的解耦以及相互通信的问题。

起初觉得 EventBus 和 MQ 其实差不多嘛,都是通过异步处理来实现解耦合,高性能。后来看到了下面这张图才算明白为什么要用 EventBus 以及 EventBus 和 MQ 之间的关系,EventBus 是抽象的,可以用 MQ 来实现 EventBus。

640?wx_fmt=png

为什么要使用 EventBus

  1. 解耦合(轻松的实现系统间解耦)

  2. 高性能可扩展(每一个事件都是简单独立且不可更改的对象,只需要保存新增的事件,不涉及其他的变更删除操作)

  3. 系统审计(每一个事件都是不可变更的,每一个事件都是可追溯的)

  4. ...

EventBus 整体架构:

  • IEventBase :所有的事件应该实现这个接口,这个接口定义了事件的唯一id EventId 和事件发生的事件 EventAt

640?wx_fmt=png

  • IEventHandler:定义了一个 Handle 方法来处理相应的事件

640?wx_fmt=png

  • IEventStore:所有的事件的处理存储,保存事件的 IEventHandler,一般不会直接操作,通过 EventBus 的订阅和取消订阅来操作 EventStore

640?wx_fmt=png

  • IEventBus:用来发布/订阅/取消订阅事件,并将事件的某一个 IEventHandler 保存到 EventStore 或从 EventStore 中移除

640?wx_fmt=png

使用示例

来看一个使用示例,完整代码示例:

internal class EventTest	
{	public static void MainTest()	{	var eventBus = DependencyResolver.Current.ResolveService<IEventBus>();	eventBus.Subscribe<CounterEvent, CounterEventHandler1>();	eventBus.Subscribe<CounterEvent, CounterEventHandler2>();	eventBus.Subscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();	eventBus.Publish(new CounterEvent { Counter = 1 });	eventBus.Unsubscribe<CounterEvent, CounterEventHandler1>();	eventBus.Unsubscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();	eventBus.Publish(new CounterEvent { Counter = 2 });	}	
}	
internal class CounterEvent : EventBase	
{	public int Counter { get; set; }	
}	
internal class CounterEventHandler1 : IEventHandler<CounterEvent>	
{	public Task Handle(CounterEvent @event)	{	LogHelper.GetLogger<CounterEventHandler1>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");	return Task.CompletedTask;	}	
}	
internal class CounterEventHandler2 : IEventHandler<CounterEvent>	
{	public Task Handle(CounterEvent @event)	{	LogHelper.GetLogger<CounterEventHandler2>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");	return Task.CompletedTask;	}	
}

具体实现

EventStoreInMemory 实现:

EventStoreInMemory 是 IEventStore 将数据放在内存中的实现,使用了 ConcurrentDictionary 以及 HashSet 来尽可能的保证高效,具体实现代码如下:

public class EventStoreInMemory : IEventStore	
{	private readonly ConcurrentDictionary<string, HashSet<Type>> _eventHandlers = new ConcurrentDictionary<string, HashSet<Type>>();	public bool AddSubscription<TEvent, TEventHandler>()	where TEvent : IEventBase	where TEventHandler : IEventHandler<TEvent>	{	var eventKey = GetEventKey<TEvent>();	if (_eventHandlers.ContainsKey(eventKey))	{	return _eventHandlers[eventKey].Add(typeof(TEventHandler));	}	else	{	return _eventHandlers.TryAdd(eventKey, new HashSet<Type>()	{	typeof(TEventHandler)	});	}	}	public bool Clear()	{	_eventHandlers.Clear();	return true;	}	public ICollection<Type> GetEventHandlerTypes<TEvent>() where TEvent : IEventBase	{	if(_eventHandlers.Count == 0)	return  new Type[0];	var eventKey = GetEventKey<TEvent>();	if (_eventHandlers.TryGetValue(eventKey, out var handlers))	{	return handlers;	}	return new Type[0];	}	public string GetEventKey<TEvent>()	{	return typeof(TEvent).FullName;	}	public bool HasSubscriptionsForEvent<TEvent>() where TEvent : IEventBase	{	if(_eventHandlers.Count == 0)	return false;	var eventKey = GetEventKey<TEvent>();	return _eventHandlers.ContainsKey(eventKey);	}	public bool RemoveSubscription<TEvent, TEventHandler>()	where TEvent : IEventBase	where TEventHandler : IEventHandler<TEvent>	{	if(_eventHandlers.Count == 0)	return false;	var eventKey = GetEventKey<TEvent>();	if (_eventHandlers.ContainsKey(eventKey))	{	return _eventHandlers[eventKey].Remove(typeof(TEventHandler));	}	return false;	}	
}

EventBusInMemory 的实现,从上面可以看到 EventStore 保存的是 IEventHandler 对应的 Type,在 Publish 的时候根据 Type 从 IoC 容器中取得相应的 Handler 即可,如果没有在 IoC 容器中找到对应的类型,则会尝试创建一个类型实例,然后调用 IEventHandlerHandle 方法,代码如下:

/// <summary>	
/// EventBus in memory	
/// </summary>	
public class EventBus : IEventBus	
{	private static readonly ILogHelperLogger Logger = Helpers.LogHelper.GetLogger<EventBus>();	private readonly IEventStore _eventStore;	private readonly IServiceProvider _serviceProvider;	public EventBus(IEventStore eventStore, IServiceProvider serviceProvider = null)	{	_eventStore = eventStore;	_serviceProvider = serviceProvider ?? DependencyResolver.Current;	}	public bool Publish<TEvent>(TEvent @event) where TEvent : IEventBase	{	if (!_eventStore.HasSubscriptionsForEvent<TEvent>())	{	return false;	}	var handlers = _eventStore.GetEventHandlerTypes<TEvent>();	if (handlers.Count > 0)	{	var handlerTasks = new List<Task>();	foreach (var handlerType in handlers)	{	try	{	if (_serviceProvider.GetServiceOrCreateInstance(handlerType) is IEventHandler<TEvent> handler)	{	handlerTasks.Add(handler.Handle(@event));	}	}	catch (Exception ex)	{	Logger.Error(ex, $"handle event [{_eventStore.GetEventKey<TEvent>()}] error, eventHandlerType:{handlerType.FullName}");	}	}	handlerTasks.WhenAll().ConfigureAwait(false);	return true;	}	return false;	}	public bool Subscribe<TEvent, TEventHandler>()	where TEvent : IEventBase	where TEventHandler : IEventHandler<TEvent>	{	return _eventStore.AddSubscription<TEvent, TEventHandler>();	}	public bool Unsubscribe<TEvent, TEventHandler>()	where TEvent : IEventBase	where TEventHandler : IEventHandler<TEvent>	{	return _eventStore.RemoveSubscription<TEvent, TEventHandler>();	}	
}

项目实例

来看一个实际的项目中的使用,在我的活动室预约项目中有一个公告的模块,访问公告详情页面,这个公告的访问次数加1,把这个访问次数加1改成了用 EventBus 来实现,实际项目代码:https://github.com/WeihanLi/ActivityReservation/blob/67e2cb8e92876629a7af6dc051745dd8c7e9faeb/ActivityReservation/Startup.cs

0. 定义 Event 以及 EventHandler


public class NoticeViewEvent : EventBase	
{	public Guid NoticeId { get; set; }	// UserId	// IP	// ...	
}	
public class NoticeViewEventHandler : IEventHandler<NoticeViewEvent>	
{	public async Task Handle(NoticeViewEvent @event)	{	await DependencyResolver.Current.TryInvokeServiceAsync<ReservationDbContext>(async dbContext =>	{	var notice = await dbContext.Notices.FindAsync(@event.NoticeId);	notice.NoticeVisitCount += 1;	await dbContext.SaveChangesAsync();	});	}	
}

这里的 Event 只定义了一个 NoticeId ,其实也可以把请求信息如IP/UA等信息加进去,在 EventHandler里处理以便日后数据分析。

1. 册 EventBus 相关服务以及 EventHandlers

services.AddSingleton<IEventBus, EventBus>();	
services.AddSingleton<IEventStore, EventStoreInMemory>();	
//register EventHandlers	
services.AddSingleton<NoticeViewEventHandler>();


2. 订阅事件


public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IEventBus eventBus)	
{	eventBus.Subscribe<NoticeViewEvent, NoticeViewEventHandler>(); 	// ...	
}

3. 发布事件


eventBus.Publish(new NoticeViewEvent { NoticeId = notice.NoticeId });

Reference

  • https://github.com/dotnet-architecture/eShopOnContainers

  • https://docs.microsoft.com/zh-cn/previous-versions/msp-n-p/jj591559(v=pandp.10)

  • https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/integration-event-based-microservice-communications

  • https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/subscribe-events

  • https://github.com/sheng-jie/EventBus

  • https://github.com/WeihanLi/ActivityReservation

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/314807.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Codeforces Round #733 (Div. 1 + Div. 2) E. Minimax 分情况讨论 + 思维

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 给你一个串&#xff0c;你可以随意安排这个串&#xff0c;使得这个串的每个前缀的kmpkmpkmp数组最大值最小&#xff0c;定义为f(a)f(a)f(a)&#xff0c;并且字典序最小&#xff0c;输出安排之后的串。 n≤1e…

【Ynoi2011】成都七中【树论】【点分树】【离线】【树状数组】

题意&#xff1a;给一棵树&#xff0c;点有颜色&#xff0c;qqq 次询问&#xff0c;每次给定 l,r,xl,r,xl,r,x &#xff0c;求只保留编号在 [l,r][l,r][l,r] 中的点时点 xxx 所在连通块的颜色数。 所有数 ≤105\leq 10^5≤105 题目背景好评 首先所有颜色不同的话就是数连通块…

通过 nginx-proxy 实现自动反向代理和 HTTPS

本章节代码已经上传至 https://github.com/siegrainwong/.NET-Core-with-Docker/tree/master/Part3系列大纲这次我们讲第三篇&#xff1a;用 docker-compose 启动 WebApi 和 SQL Server在容器中集成 Skywalking APM通过 nginx-proxy 对 Portainer、Skywalking、WebApi 实现自动…

P4781 【模板】拉格朗日插值

传送门 把公式实现一下即可&#xff1a; 当xxx连续的时候可以优化为O(N)O(N)O(N)。 // Problem: P4781 【模板】拉格朗日插值 // Contest: Luogu // URL: https://www.luogu.com.cn/problem/P4781 // Memory Limit: 125 MB // Time Limit: 1000 ms // // Powered by CP Edi…

【HNOI/AHOI2018】毒瘤【容斥】【虚树/动态dp】

题意&#xff1a;nnn 个点 mmm 条边的连通无向图的独立集个数模 998244353998244353998244353。 n≤105,m≤n10n\leq 10^5,m\leq n10n≤105,m≤n10 为什么标题要把两个算法写一起&#xff1f;因为这两个东西在这类问题上是本质相同的&#xff0c;这也是写这篇博客的原因。 显…

MediatR-进程内的消息通信框架

MediatR是一款进程内的消息订阅、发布框架&#xff0c;提供了Send方法用于发布到单个处理程序、Publish方法发布到多个处理程序&#xff0c;使用起来非常方便。目前支持 .NET Framework4.5、.NET Stardand1.3、.NET Stardand2.0等版本&#xff0c;可跨平台使用。要在项目中使用…

Codeforces Round #586 (Div. 1 + Div. 2) D. Alex and Julian 数学 + 思维

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 给你一个无限个点的坐标轴&#xff0c;一个集合BBB&#xff0c;如果存在∣i−j∣bk|i-j|b_k∣i−j∣bk​的话&#xff0c;那么i,ji,ji,j之间就连边。现在问你至少要从集合BBB中去掉多少个数才能使得连完边之…

【十二省联考2019】字符串问题【后缀自动机】【拓扑排序】

题意&#xff1a;给一个字符串 SSS&#xff0c;以子串的形式给出一些 A 类串和 B 类串以及 mmm 对 A 类串支配 B 类串的关系。求一个总长度最长的 A 类串序列&#xff0c;使得每个串都存在一个 B 类串前缀被后一个串支配。无穷输出 −1-1−1。 ∣S∣,m≤2105|S|,m\leq 2\times …

Codeforces Round #586 (Div. 1 + Div. 2) B. Multiplication Table 思维 + 公式

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 给你一个n∗nn*nn∗n的矩阵&#xff0c;每个位置由ai∗aja_i*a_jai​∗aj​得来&#xff0c;主对角线为000&#xff0c;让你求出来aia_iai​。 n≤1e3n\le1e3n≤1e3 思路&#xff1a; 由公式ai,j∗ai,kaj,…

不好意思,这么久没有更新《从零开始掌握ASP.NET Core 》

点击上方蓝字&#xff0c;关注「我们」等了快个月了&#xff0c;终于开始更新了。因为感冒&#xff0c;弄的嗓子有点沙哑。所以停了半个月才是更新&#xff0c;目前一口气更新了12个章节&#xff0c;大家可以耐心观看内容了。《从零开始学ASP.NET Core 》-- 更新通知视频课程更…

【NOI2018】你的名字【后缀自动机】【可持久化线段树合并】【乱搞】

题意&#xff1a;给一个串 SSS&#xff0c;qqq 次询问&#xff0c;每次给定串 TTT 和 l,rl,rl,r &#xff0c;求有多少个本质不同的串是 TTT 的子串而不是 Sl…rS_{l\dots r}Sl…r​ 的子串。 ∣S∣≤5105,q≤105,∑∣T∣≤106|S|\leq 5\times 10^5,q\leq 10^5,\sum|T|\leq 10^…

2021牛客暑期多校训练营1 A.Alice and Bob 博弈 SG函数

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 有两堆石子&#xff0c;两个人每次可以进行如下操作&#xff1a;从某一堆狮子中拿出x(x>0)x(x>0)x(x>0)个&#xff0c;从另一堆石子中拿出s∗x(s>0)s*x(s>0)s∗x(s>0)个。谁不能操作谁输&…

【CTSC2010】珠宝商【后缀自动机】【点分治】【根号分治】

题意&#xff1a;给一棵 nnn 个点的树&#xff0c;每个点有个字符&#xff0c;另给一个长度为 mmm 的特征串&#xff0c;求树上 n2n^2n2 条有向路径在特征串中出现的次数之和。 n,m≤5104n,m\leq 5\times 10^4n,m≤5104 看到母串先建 SAM &#xff08;bushi 树上路径统计问题…

使用Azure云原生构建博客是怎样一种体验?(下篇)

点击上方蓝字关注“汪宇杰博客”接上篇《使用Azure云原生构建博客是怎样一种体验&#xff1f;&#xff08;上篇&#xff09;》DNSAzure DNS 是一套分布全球的域名解析服务。具有超高可用性和接近实时的记录更新及生效速度。我的博客也使用了这项服务。Azure 现在可以提供域名注…

2021牛客暑期多校训练营1 G Game of Swapping Numbers 思维 + 巧妙的转换

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 给你两个数组A,BA,BA,B&#xff0c;你可以选择AAA的两个位置i,j,i<ji,j,i<ji,j,i<j交换Ai,AjA_i,A_jAi​,Aj​&#xff0c;需要交换正好kkk次&#xff0c;问你最大的∑i1n∣Ai−Bi∣\sum_{i1}^n|A_…

.NET Core 3.0之深入源码理解HttpClientFactory(一)

写在前面创建HttpClient实例的时候&#xff0c;在内部会创建HttpMessageHandler链&#xff0c;我们知道HttpMessageHandler是负责建立连接的抽象处理程序&#xff0c;所以HttpClient的维护实际上就是维护HttpMessageHandler的使用&#xff0c;释放HttpClient并不会及时释放连接…

WTM 构建DotNetCore开源生态,坐而论道不如起而行之

作为一个8岁开始学习编程&#xff0c;至今40岁的老程序员&#xff0c;这辈子使用过无数种语言&#xff0c;从basic开始&#xff0c;到pascal, C, C&#xff0c;到后来的 java, c#,perl,php,再到现在流行的python。小时候的我总觉得多掌握一门语言&#xff0c;我的技术能力就又前…

【WC2014】时空穿梭【组合数】【莫比乌斯反演】【整除分块】【暴力多项式】

题意&#xff1a;TTT 组数据&#xff0c;给一个 nnn 维空间&#xff0c;第 iii 维大小为 [1,mi]∩Z[1,m_i]\cap \Z[1,mi​]∩Z&#xff0c;求大小为 ccc 的严格偏序上升的共线点集个数。答案模 100071000710007。 T≤100,n≤11,m≤105,c≤20T\leq 100,n\leq 11,m\leq 10^5,c\le…

2021牛客暑期多校训练营1 H Hash Function FFT\NTT

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 给你一个数组aaa&#xff0c;你需要找一个最小的模数xxx&#xff0c;使得aaa中每个数都模上xxx之后互不相同。 n≤5e5,ai≤5e5,ai!ajn\le5e5,a_i\le5e5,a_i!a_jn≤5e5,ai​≤5e5,ai​!aj​ 思路&#xff1a…

架构杂谈《六》

超时处理模式在服务化或者微服务架构里&#xff0c;传统的整体应用拆分成多个职责单一的微服务&#xff0c;微服务之间通过某种网络通信协议互相通信和交互&#xff0c;完成特定的功能&#xff0c;然而由于网络通信的不稳定&#xff0c;在设计系统时必须考虑到对网络通信的容错…