动手造轮子:实现一个简单的 EventBus


动手造轮子:实现一个简单的 EventBus

Intro

EventBus 是一种事件发布订阅模式,通过 EventBus 我们可以很方便的实现解耦,将事件的发起和事件的处理的很好的分隔开来,很好的实现解耦。微软官方的示例项目 EShopOnContainers 也有在使用 EventBus 。

这里的 EventBus 实现也是参考借鉴了微软 eShopOnContainers 项目。

EventBus 处理流程:

640?wx_fmt=png

微服务间使用 EventBus 实现系统间解耦:

640?wx_fmt=png

借助 EventBus 我们可以很好的实现组件之间,服务之间,系统之间的解耦以及相互通信的问题。

起初觉得 EventBus 和 MQ 其实差不多嘛,都是通过异步处理来实现解耦合,高性能。后来看到了下面这张图才算明白为什么要用 EventBus 以及 EventBus 和 MQ 之间的关系,EventBus 是抽象的,可以用 MQ 来实现 EventBus。

640?wx_fmt=png

为什么要使用 EventBus

  1. 解耦合(轻松的实现系统间解耦)

  2. 高性能可扩展(每一个事件都是简单独立且不可更改的对象,只需要保存新增的事件,不涉及其他的变更删除操作)

  3. 系统审计(每一个事件都是不可变更的,每一个事件都是可追溯的)

  4. ...

EventBus 整体架构:

  • IEventBase :所有的事件应该实现这个接口,这个接口定义了事件的唯一id EventId 和事件发生的事件 EventAt

640?wx_fmt=png

  • IEventHandler:定义了一个 Handle 方法来处理相应的事件

640?wx_fmt=png

  • IEventStore:所有的事件的处理存储,保存事件的 IEventHandler,一般不会直接操作,通过 EventBus 的订阅和取消订阅来操作 EventStore

640?wx_fmt=png

  • IEventBus:用来发布/订阅/取消订阅事件,并将事件的某一个 IEventHandler 保存到 EventStore 或从 EventStore 中移除

640?wx_fmt=png

使用示例

来看一个使用示例,完整代码示例:

internal class EventTest	
{	public static void MainTest()	{	var eventBus = DependencyResolver.Current.ResolveService<IEventBus>();	eventBus.Subscribe<CounterEvent, CounterEventHandler1>();	eventBus.Subscribe<CounterEvent, CounterEventHandler2>();	eventBus.Subscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();	eventBus.Publish(new CounterEvent { Counter = 1 });	eventBus.Unsubscribe<CounterEvent, CounterEventHandler1>();	eventBus.Unsubscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();	eventBus.Publish(new CounterEvent { Counter = 2 });	}	
}	
internal class CounterEvent : EventBase	
{	public int Counter { get; set; }	
}	
internal class CounterEventHandler1 : IEventHandler<CounterEvent>	
{	public Task Handle(CounterEvent @event)	{	LogHelper.GetLogger<CounterEventHandler1>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");	return Task.CompletedTask;	}	
}	
internal class CounterEventHandler2 : IEventHandler<CounterEvent>	
{	public Task Handle(CounterEvent @event)	{	LogHelper.GetLogger<CounterEventHandler2>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");	return Task.CompletedTask;	}	
}

具体实现

EventStoreInMemory 实现:

EventStoreInMemory 是 IEventStore 将数据放在内存中的实现,使用了 ConcurrentDictionary 以及 HashSet 来尽可能的保证高效,具体实现代码如下:

public class EventStoreInMemory : IEventStore	
{	private readonly ConcurrentDictionary<string, HashSet<Type>> _eventHandlers = new ConcurrentDictionary<string, HashSet<Type>>();	public bool AddSubscription<TEvent, TEventHandler>()	where TEvent : IEventBase	where TEventHandler : IEventHandler<TEvent>	{	var eventKey = GetEventKey<TEvent>();	if (_eventHandlers.ContainsKey(eventKey))	{	return _eventHandlers[eventKey].Add(typeof(TEventHandler));	}	else	{	return _eventHandlers.TryAdd(eventKey, new HashSet<Type>()	{	typeof(TEventHandler)	});	}	}	public bool Clear()	{	_eventHandlers.Clear();	return true;	}	public ICollection<Type> GetEventHandlerTypes<TEvent>() where TEvent : IEventBase	{	if(_eventHandlers.Count == 0)	return  new Type[0];	var eventKey = GetEventKey<TEvent>();	if (_eventHandlers.TryGetValue(eventKey, out var handlers))	{	return handlers;	}	return new Type[0];	}	public string GetEventKey<TEvent>()	{	return typeof(TEvent).FullName;	}	public bool HasSubscriptionsForEvent<TEvent>() where TEvent : IEventBase	{	if(_eventHandlers.Count == 0)	return false;	var eventKey = GetEventKey<TEvent>();	return _eventHandlers.ContainsKey(eventKey);	}	public bool RemoveSubscription<TEvent, TEventHandler>()	where TEvent : IEventBase	where TEventHandler : IEventHandler<TEvent>	{	if(_eventHandlers.Count == 0)	return false;	var eventKey = GetEventKey<TEvent>();	if (_eventHandlers.ContainsKey(eventKey))	{	return _eventHandlers[eventKey].Remove(typeof(TEventHandler));	}	return false;	}	
}

EventBusInMemory 的实现,从上面可以看到 EventStore 保存的是 IEventHandler 对应的 Type,在 Publish 的时候根据 Type 从 IoC 容器中取得相应的 Handler 即可,如果没有在 IoC 容器中找到对应的类型,则会尝试创建一个类型实例,然后调用 IEventHandlerHandle 方法,代码如下:

/// <summary>	
/// EventBus in memory	
/// </summary>	
public class EventBus : IEventBus	
{	private static readonly ILogHelperLogger Logger = Helpers.LogHelper.GetLogger<EventBus>();	private readonly IEventStore _eventStore;	private readonly IServiceProvider _serviceProvider;	public EventBus(IEventStore eventStore, IServiceProvider serviceProvider = null)	{	_eventStore = eventStore;	_serviceProvider = serviceProvider ?? DependencyResolver.Current;	}	public bool Publish<TEvent>(TEvent @event) where TEvent : IEventBase	{	if (!_eventStore.HasSubscriptionsForEvent<TEvent>())	{	return false;	}	var handlers = _eventStore.GetEventHandlerTypes<TEvent>();	if (handlers.Count > 0)	{	var handlerTasks = new List<Task>();	foreach (var handlerType in handlers)	{	try	{	if (_serviceProvider.GetServiceOrCreateInstance(handlerType) is IEventHandler<TEvent> handler)	{	handlerTasks.Add(handler.Handle(@event));	}	}	catch (Exception ex)	{	Logger.Error(ex, $"handle event [{_eventStore.GetEventKey<TEvent>()}] error, eventHandlerType:{handlerType.FullName}");	}	}	handlerTasks.WhenAll().ConfigureAwait(false);	return true;	}	return false;	}	public bool Subscribe<TEvent, TEventHandler>()	where TEvent : IEventBase	where TEventHandler : IEventHandler<TEvent>	{	return _eventStore.AddSubscription<TEvent, TEventHandler>();	}	public bool Unsubscribe<TEvent, TEventHandler>()	where TEvent : IEventBase	where TEventHandler : IEventHandler<TEvent>	{	return _eventStore.RemoveSubscription<TEvent, TEventHandler>();	}	
}

项目实例

来看一个实际的项目中的使用,在我的活动室预约项目中有一个公告的模块,访问公告详情页面,这个公告的访问次数加1,把这个访问次数加1改成了用 EventBus 来实现,实际项目代码:https://github.com/WeihanLi/ActivityReservation/blob/67e2cb8e92876629a7af6dc051745dd8c7e9faeb/ActivityReservation/Startup.cs

0. 定义 Event 以及 EventHandler


public class NoticeViewEvent : EventBase	
{	public Guid NoticeId { get; set; }	// UserId	// IP	// ...	
}	
public class NoticeViewEventHandler : IEventHandler<NoticeViewEvent>	
{	public async Task Handle(NoticeViewEvent @event)	{	await DependencyResolver.Current.TryInvokeServiceAsync<ReservationDbContext>(async dbContext =>	{	var notice = await dbContext.Notices.FindAsync(@event.NoticeId);	notice.NoticeVisitCount += 1;	await dbContext.SaveChangesAsync();	});	}	
}

这里的 Event 只定义了一个 NoticeId ,其实也可以把请求信息如IP/UA等信息加进去,在 EventHandler里处理以便日后数据分析。

1. 册 EventBus 相关服务以及 EventHandlers

services.AddSingleton<IEventBus, EventBus>();	
services.AddSingleton<IEventStore, EventStoreInMemory>();	
//register EventHandlers	
services.AddSingleton<NoticeViewEventHandler>();


2. 订阅事件


public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IEventBus eventBus)	
{	eventBus.Subscribe<NoticeViewEvent, NoticeViewEventHandler>(); 	// ...	
}

3. 发布事件


eventBus.Publish(new NoticeViewEvent { NoticeId = notice.NoticeId });

Reference

  • https://github.com/dotnet-architecture/eShopOnContainers

  • https://docs.microsoft.com/zh-cn/previous-versions/msp-n-p/jj591559(v=pandp.10)

  • https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/integration-event-based-microservice-communications

  • https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/subscribe-events

  • https://github.com/sheng-jie/EventBus

  • https://github.com/WeihanLi/ActivityReservation

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/314807.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通过 nginx-proxy 实现自动反向代理和 HTTPS

本章节代码已经上传至 https://github.com/siegrainwong/.NET-Core-with-Docker/tree/master/Part3系列大纲这次我们讲第三篇&#xff1a;用 docker-compose 启动 WebApi 和 SQL Server在容器中集成 Skywalking APM通过 nginx-proxy 对 Portainer、Skywalking、WebApi 实现自动…

P4781 【模板】拉格朗日插值

传送门 把公式实现一下即可&#xff1a; 当xxx连续的时候可以优化为O(N)O(N)O(N)。 // Problem: P4781 【模板】拉格朗日插值 // Contest: Luogu // URL: https://www.luogu.com.cn/problem/P4781 // Memory Limit: 125 MB // Time Limit: 1000 ms // // Powered by CP Edi…

MediatR-进程内的消息通信框架

MediatR是一款进程内的消息订阅、发布框架&#xff0c;提供了Send方法用于发布到单个处理程序、Publish方法发布到多个处理程序&#xff0c;使用起来非常方便。目前支持 .NET Framework4.5、.NET Stardand1.3、.NET Stardand2.0等版本&#xff0c;可跨平台使用。要在项目中使用…

不好意思,这么久没有更新《从零开始掌握ASP.NET Core 》

点击上方蓝字&#xff0c;关注「我们」等了快个月了&#xff0c;终于开始更新了。因为感冒&#xff0c;弄的嗓子有点沙哑。所以停了半个月才是更新&#xff0c;目前一口气更新了12个章节&#xff0c;大家可以耐心观看内容了。《从零开始学ASP.NET Core 》-- 更新通知视频课程更…

使用Azure云原生构建博客是怎样一种体验?(下篇)

点击上方蓝字关注“汪宇杰博客”接上篇《使用Azure云原生构建博客是怎样一种体验&#xff1f;&#xff08;上篇&#xff09;》DNSAzure DNS 是一套分布全球的域名解析服务。具有超高可用性和接近实时的记录更新及生效速度。我的博客也使用了这项服务。Azure 现在可以提供域名注…

2021牛客暑期多校训练营1 G Game of Swapping Numbers 思维 + 巧妙的转换

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 给你两个数组A,BA,BA,B&#xff0c;你可以选择AAA的两个位置i,j,i<ji,j,i<ji,j,i<j交换Ai,AjA_i,A_jAi​,Aj​&#xff0c;需要交换正好kkk次&#xff0c;问你最大的∑i1n∣Ai−Bi∣\sum_{i1}^n|A_…

.NET Core 3.0之深入源码理解HttpClientFactory(一)

写在前面创建HttpClient实例的时候&#xff0c;在内部会创建HttpMessageHandler链&#xff0c;我们知道HttpMessageHandler是负责建立连接的抽象处理程序&#xff0c;所以HttpClient的维护实际上就是维护HttpMessageHandler的使用&#xff0c;释放HttpClient并不会及时释放连接…

WTM 构建DotNetCore开源生态,坐而论道不如起而行之

作为一个8岁开始学习编程&#xff0c;至今40岁的老程序员&#xff0c;这辈子使用过无数种语言&#xff0c;从basic开始&#xff0c;到pascal, C, C&#xff0c;到后来的 java, c#,perl,php,再到现在流行的python。小时候的我总觉得多掌握一门语言&#xff0c;我的技术能力就又前…

架构杂谈《六》

超时处理模式在服务化或者微服务架构里&#xff0c;传统的整体应用拆分成多个职责单一的微服务&#xff0c;微服务之间通过某种网络通信协议互相通信和交互&#xff0c;完成特定的功能&#xff0c;然而由于网络通信的不稳定&#xff0c;在设计系统时必须考虑到对网络通信的容错…

【BZOJ4543】Hotel加强版【神仙树形dp】【长链剖分】

题意&#xff1a;给一棵 nnn 个点的树&#xff0c;求两两距离相等的三元组个数。 n≤105n\leq 10^5n≤105 显然相当于是找一个点到这三个点距离相等。子树内和子树外到当前点的距离为某个值的点的个数可以长链剖分快速得到&#xff0c;但统计答案非常棘手。 接下来是个鬼才想…

基于surging 的stage组件设计,谈谈我眼中的微服务

一、前言surging 开源地址&#xff1a;https://github.com/dotnetcore/surging随着业务的发展&#xff0c;并发量的增多&#xff0c;业务的复杂度越来越大&#xff0c;对于系统架构能力要求越来越高&#xff0c;这时候微服务的设计思想应运而生&#xff0c;但是对于微服务需要引…

HDU - 6971 K - I love max and multiply sosdp

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 思路&#xff1a; 直接求i&j>ki\And j>ki&j>k不是很好求&#xff0c;所以转换成i&jki\And jki&jk的情况。 考虑对a,ba,ba,b求一遍超集&#xff0c;让后从[0,n−1][0,n-1][0,n−1]扫…

推荐10个技术圈优质的公众号大号

公众号有很多但需要什么只有自己知道本次筛选了一批技术圈优质的公众号&#xff0c;主要与python、人工智能、机器学习、技术人生相关希望对你有所帮助!▼★长按二维码&#xff0c;选择“识别二维码”进行关注。▲长按二维码&#xff0c;识别关注简介&#xff1a;Python爱好者社…

使用Kubeadm创建k8s集群之部署规划(三十一)

前言 上一篇我们讲述了使用Kubectl管理k8s集群&#xff0c;那么接下来&#xff0c;我们将使用kubeadm来启动k8s集群。部署k8s集群存在一定的挑战&#xff0c;尤其是部署高可用的k8s集群更是颇为复杂&#xff08;后续会讲&#xff09;。因此本教程会在部署的过程中穿插讲…

HDU - 6967 G I love data structure 线段树维护矩阵 + 细节

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 给你两个长度为nnn的数组a,ba,ba,b&#xff0c;你需要完成如下四种操作&#xff1a; 思路&#xff1a; 思路还是比较简单的&#xff0c;首先建一颗线段树&#xff0c;线段树中维护a,b,a2,b2,aba,b,a^2,b^…

荐读|属性与可直接访问的数据成员之间应该如何选

写在前面在书写C#代码的时候你是否有过这样的经历&#xff1a;经常混用属性以及公有的数据成员。毕竟他们的用法基本一致&#xff0c;对于使用来说好像没什么区别啊。其实我也经常使用类的公有的数据成员来定义一些常量&#xff0c;为了简单&#xff0c;在一些仅仅需要对外暴露…

2021牛客暑期多校训练营3 I Kuriyama Mirai and Exclusive Or 差分 + 二进制分治

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 给你一个数组aaa&#xff0c;让你实现以下两个操作之后输出数组aaa。 n≤6e5,ai≤230−1n\le6e5,a_i\le2^{30}-1n≤6e5,ai​≤230−1 思路&#xff1a; 下面介绍的思路清奇&#xff0c;反正我想不到。 对…

Lock VS Monitor

介绍介绍对开发人员来说&#xff0c;处理关键代码部分的多线程应用程序是非常重要的。Monitor和lock是c#语言中多线程应用程序中提供线程安全的方法(lock关键字的本质就是对Monitor的封装)。两者都提供了一种机制来确保只有一个线程同时执行代码&#xff0c;以避免代码功能被其…

程序员修神之路--做好分库分表其实很难之二

菜菜哥&#xff0c;上次听你给我讲了分库的情况后&#xff0c;我明白了很多&#xff0c;能再给我讲讲分表吗有收获就好&#xff0c;分表其实有很多情况和分库类似还有不一样的情况吗&#xff1f;有呀&#xff0c;本来数据库和表是不同层面的东西&#xff0c;肯定有差异那你给讲…

2021牛客暑期多校训练营3 B Black and white 最小生成树 + 思维

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 思路&#xff1a; 对于每个数的位置(i,j)(i,j)(i,j)&#xff0c;如果将这个位置染黑&#xff0c;那么我们连一个i−>jni->jni−>jn的边&#xff0c;可以发现我们的操作不影响连通性。如果想要全部染…