EventBus/EventQueue 再思考

EventBus/EventQueue 再思考

Intro

之前写过两篇文章,造轮子系列的 EventBus/ EventQueue,回想起来觉得当前的想法有点问题,当时对 EvenStore 可能有点误解,有兴趣可以参考 

动手造轮子:实现一个简单的 EventBus

动手造轮子:实现简单的 EventQueue,

最近把 Event 相关的逻辑做了一个重构,修改 EventStore,重新设计了 Event 相关的组件

重构后的 Event

  • Event: 事件的抽象定义

  • EventHandler:事件处理器抽象定义

  • EventHandlerFactory:事件处理器工厂,用来根据事件类型获取事件处理器(新增)

  • EventPublisher:事件发布器,用于事件发布

  • EventSubscriber:事件订阅器,用于管理事件的订阅

  • EventSubscriptionManager:事件订阅管理器,在 EventSubscriber 的基础上增加了一个根据事件类型获取事件订阅器类型的方法

  • EventBus:事件总线,由 EventPubliser 和 EventSubscriber 组合而成,用来比较方便的做事件发布和订阅

  • EventQueue:事件队列,希望某些消息顺序处理的时候可以考虑用 EventQueue 的模式

  • EventStore:事件存储,事件的持久化存储(在之前的版本里,EventStore 实际作用是一个 EventSubscriptionManager,在最近的版本更新中已修改)

以上 EventSubscriberEventSubscriptionManager 一般不直接用,一般用 EventBus 来处理即可

EventHandlerFactory

这次引入了 EventHandlerFactory 用来抽象获取 EventHandler 的逻辑,原来的设计里是在处理 Event 的时候获取 EventHandler 的类型,然后从依赖注入框架中获取或创建新的 event handler 实例之后再调用 EventHandler 的 Handle 方法处理事件,有一些冗余

使用 EventHandlerFactory 之后就可以直接获取一个 EventHandler 实例集合,具体是实例化还是从依赖注入中获取就由 EventHandlerFactory 来决定了,这样就可以对依赖注入很友好,对于基于内存的简单 EventBus 来说,在服务注册之后就不需要再调用 Subscribe 去显式订阅了,因为再注册服务的时候就已经隐式实现了订阅的逻辑,这样实际就不需要 EventSubscriptionManager 来管理订阅了,订阅信息都在依赖注入框架内部,比如说 CounterEvent,要获取它的订阅信息,我只需要从依赖注入框架中获取 IEventHandler<CounterEvent> 的实例即可,实际就代替了原先 “EventStoreInMemory”,现在的 EventSubscriptionManagerInMemory

基于依赖注入的 EventHandlerFactory 定义:

public sealed class DependencyInjectionEventHandlerFactory : IEventHandlerFactory
{private readonly IServiceProvider _serviceProvider;public DependencyInjectionEventHandlerFactory(IServiceProvider serviceProvider = null){_serviceProvider = serviceProvider ?? DependencyResolver.Current;}public ICollection<IEventHandler> GetHandlers(Type eventType){var eventHandlerType = typeof(IEventHandler<>).MakeGenericType(eventType);return _serviceProvider.GetServices(eventHandlerType).Cast<IEventHandler>().ToArray();}
}

如果不使用依赖注入,也可以根据 IEventSubscriptionManager 订阅信息来实现:

public sealed class DefaultEventHandlerFactory : IEventHandlerFactory
{private readonly IEventSubscriptionManager _subscriptionManager;private readonly ConcurrentDictionary<Type, ICollection<IEventHandler>> _eventHandlers = new ConcurrentDictionary<Type, ICollection<IEventHandler>>();private readonly IServiceProvider _serviceProvider;public DefaultEventHandlerFactory(IEventSubscriptionManager subscriptionManager, IServiceProvider serviceProvider = null){_subscriptionManager = subscriptionManager;_serviceProvider = serviceProvider ?? DependencyResolver.Current;}public ICollection<IEventHandler> GetHandlers(Type eventType){var eventHandlers = _eventHandlers.GetOrAdd(eventType, type =>{var handlerTypes = _subscriptionManager.GetEventHandlerTypes(type);var handlers = handlerTypes.Select(t => (IEventHandler)_serviceProvider.GetServiceOrCreateInstance(t)).ToArray();return handlers;});return eventHandlers;}
}

EventQueue Demo

来看一下 EventQueue 的示例,示例基于 asp.net core 的,定义了一个 HostedService 来实现一个 EventConsumer 来消费 EventQueue 中的事件信息

EventConsumer 定义如下:

public class EventConsumer : BackgroundService
{private readonly IEventQueue _eventQueue;private readonly IEventHandlerFactory _eventHandlerFactory;public EventConsumer(IEventQueue eventQueue, IEventHandlerFactory eventHandlerFactory){_eventQueue = eventQueue;_eventHandlerFactory = eventHandlerFactory;}protected override async Task ExecuteAsync(CancellationToken stoppingToken){while (!stoppingToken.IsCancellationRequested){var queues = await _eventQueue.GetQueuesAsync();if (queues.Count > 0){await queues.Select(async q =>{var @event = await _eventQueue.DequeueAsync(q);if (null != @event){var handlers = _eventHandlerFactory.GetHandlers(@event.GetType());if (handlers.Count > 0){await handlers.Select(h => h.Handle(@event)).WhenAll();}}}).WhenAll();}await Task.Delay(1000, stoppingToken);}}
}

定义 PageViewEventPageViewEventHandler,用来记录和处理请求访问记录

public class PageViewEvent : EventBase
{
}
public class PageViewEventHandler : EventHandlerBase<PageViewEvent>
{public static int Count;public override Task Handle(PageViewEvent @event){Interlocked.Increment(ref Count);return Task.CompletedTask;}
}

事件很简单,事件处理也只是增加了 PageViewEventHandler 内定义的 Count。

服务注册:

// 注册事件核心组件
// 会注册 EventBus、EventHandlerFactory、EventQueue 等
services.AddEvents()// 注册 EventHanlder.AddEventHandler<PageViewEvent, PageViewEventHandler>();
// 注册 EventQueuePubliser,默认注册的 IEventPublisher 是 EventBus
services.AddSingleton<IEventPublisher, EventQueuePublisher>();
// 注册 EventConsumer
services.AddHostedService<EventConsumer>();

事件发布,定义了一个中间件来发布 PageViewEvent,定义如下:

// pageView middleware
app.Use((context, next) =>{var eventPublisher = context.RequestServices.GetRequiredService<IEventPublisher>();eventPublisher.Publish(new PageViewEvent());return next();});

然后定义一个接口来获取上面定义的 PageViewEventHandler 中的 Count

[Route("api/[controller]")]
public class EventsController : ControllerBase
{[HttpGet("pageViewCount")]public IActionResult Count(){return Ok(new { Count = PageViewEventHandler.Count });}
}

运行起来之后,访问几次接口,看上面的接口返回 Count 是否会增加,正常的话每访问一次接口就会增加 1,并发访问问题也不大,因为每个事件都是顺序处理的,即使并发访问也没有关系,事件发布之后,在队列里都是顺序处理的,这也就是引入事件队列的目的(好像上面的原子递增没什么用了...) 如果没看到了增加,稍等一会儿再访问试试,事件处理会迟到,但总会处理,毕竟是异步处理的,有些延迟很正常,而且上面我们还有一个 1s 的延迟

More

作者水平有限,如果上述有哪些不对的地方还望指出,万分感谢

Reference

  • https://github.com/WeihanLi/WeihanLi.Common/tree/dev/src/WeihanLi.Common/Event

  • https://github.com/WeihanLi/WeihanLi.Common/blob/dev/samples/AspNetCoreSample/Startup.cs

  • https://www.cnblogs.com/weihanli/p/implement-a-simple-event-bus.html

  • https://www.cnblogs.com/weihanli/p/implement-event-queue.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/309707.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网络原理题+复习资料

1.试说明运输层在协议栈中的地位和作用&#xff0c;运输层的通信和网络层的通信有什么重要区别&#xff1f;为什么运输层是必不可少的&#xff1f; 答&#xff1a;运输层处于面向通信部分的最高层&#xff0c;同时也是用户功能中的最低层&#xff0c;向它上面的应用层提供服务…

德清租房软件测试,门头沟实习生出租房

10 图2室 65㎡苏州街海淀南路小区距4号线大兴线海淀黄庄地铁站步行438m来自经纪人: 陈伟建1天前8300元8 图1室 35㎡北太平庄花园路8号院距10号线牡丹园地铁站步行1122m来自经纪人: 陈泽科1天前4800元10 图1室 45㎡西北旺芳怡园距16号线西北旺地铁站步行1128m来自经…

[推荐]大量 Blazor 学习资源(一)

预警前言 / Introduction Blazor 是什么&#xff1f;Blazor 允许您使用 C# 而不是 JavaScript 构建交互式 Web UI。Blazor 应用由使用 C#、HTML 和 CSS 实现的可重用 Web UI 组件组成。客户端和服务器代码都用 C# 编写&#xff0c;允许您共享代码和库。???? 本文主要来给大…

网络原理往期考试题+部分详解+最终版

一&#xff0e;填空题&#xff1a; &#xff08;号代表出现次数&#xff0c;无则说明一次&#xff09; 1&#xff0e; 在采用电信号表达数据的系统中&#xff0c;数据有数字数据和__模拟数据__两种。 2. 国际标准化组织ISO提出的不基于特定机型、操作系统或公司的网络体系结…

计算机博士两篇一区两篇会议,本科博士联手!西电陈渤团队两篇论文被顶级会议录用...

第34届神经信息处理系统大会(Neural Information Processing Systems, NeurIPS&#xff0c;https://neurips.cc/)将于12月06日—12月12日&#xff0c;通过线上举行。该会议是跨学科的&#xff0c;主要包括人工智能和自然神经信息处理&#xff0c;代表着热门科研领域的最前沿&am…

从零开始实现 ASP.NET Core MVC 的插件式开发(七) - 问题汇总及部分问题解决方案...

标题&#xff1a;从零开始实现 ASP.NET Core MVC 的插件式开发(七) - 问题汇总及部分问题解决方案作者&#xff1a;Lamond Lu地址&#xff1a;https://www.cnblogs.com/lwqlun/p/12930713.html源代码&#xff1a;https://github.com/lamondlu/Mystique前景回顾从零开始实现 ASP…

C++实现顺序串(完整代码)

代码如下: #include<iostream> #include <cstring> #define _CRT_SECURE_NO_WARNINGS using namespace std;class String { public:String(){size 0;str new char[size 1];str[0] \0;}String(const String &obj){size obj.size;str new char[size 1];i…

投影仪硬件边缘融合服务器,带你了解投影融合的边缘融合显示技术

原标题&#xff1a;带你了解投影融合的边缘融合显示技术边缘融合显示系统是一个专业、复杂的视屏显示系统。在设计组建的时候务必考虑周密&#xff0c;消除各类不良因素。因为边缘融合系统建设具有相关器材多、系统连接复杂、易受环境因素干扰的特性&#xff0c;所以如果没有在…

Sql Server之旅——第六站 为什么都说状态少的字段不能建索引

我们在学sqlserver的时候&#xff0c;大多教科书和前辈们都说状态少的字段不要建索引&#xff0c;由此带来的开销还不如不建索引&#xff0c;但是这句话有多少人真的知道&#xff0c;或者说有多少人真的对此有比较深刻的理解&#xff0c;而不是听别人道听途说。。。这样记得快&…

概率论复习题+部分详解

概率论与数理统计练习题 1.假设检验中&#xff0c;显著性水平α\alphaα 限制&#xff08;第一类错误&#xff08;拒真错误&#xff09;#&#xff09;的概率 分析&#xff1a; &#xff08;1&#xff09;&#xff0e;原假设为真时拒绝原假设的概率不超过α &#xff08;2&…

【壹刊】Azure AD B2C(一)初识

一&#xff0c;引言&#xff08;上节回顾&#xff09;上一节讲到Azure AD的一些基础概念&#xff0c;以及如何运用 Azure AD 包含API资源&#xff0c;Azure AD 是微软提供的云端的身份标识和资源访问服务&#xff0c;帮助员工/用户/管理员访问一些外部资源和内部资源&#xff1…

英语期末复习unit 3-4课后习题第一题及背诵段落

unit 3 背诵段落&#xff1a; 2 When a recent college graduate came into my office not too long ago looking for a sales job, I asked him what he had done to prepare for the interview. He said he’d read something about us somewhere. 不久前一个新近毕业的大…

操作系统知识点总结+最终版

1、测试题要搞明白 点击可得测试题详解 2、操作系统的四个基本特征&#xff0c;基本功能 操作系统的目标:方便性、有效性、可扩充性、开放性。 操作系统的四大基本特征&#xff1a;1、并发2、共享3、虚拟4、异步&#xff1b; 操作系统的五大功能分别是处理器管理、存储器管理…

如何看云服务器性能,从存储速度看云服务器性能测试

阿 贝云提供免 费云服务器、免 费云虚拟主机&#xff0c;大家有兴趣的可以看看&#xff0c;物超所值喔。衡量存储性能一般看吞吐量(传输速度)和IOPS两个指标。吞吐量主要指大文件的连续读写速度&#xff0c;在大文件的复制、备份等场景适用&#xff0c;用“HD Tune专业版”中的…