动手造轮子:实现简单的 EventQueue

动手造轮子:实现简单的 EventQueue

Intro

最近项目里有遇到一些并发的问题,想实现一个队列来将并发的请求一个一个串行处理,可以理解为使用消息队列处理并发问题,之前实现过一个简单的 EventBus,于是想在 EventBus 的基础上改造一下,加一个队列,改造成类似消息队列的处理模式。消息的处理(Consumer)直接使用 .netcore 里的 IHostedService 来实现了一个简单的后台任务处理。

初步设计

  • Event 抽象的事件

  • EventHandler 处理 Event 的方法

  • EventStore 保存订阅 Event 的 EventHandler

  • EventQueue 保存 Event 的队列

  • EventPublisher 发布 Event

  • EventConsumer 处理 Event 队列里的 Event

  • EventSubscriptionManager 管理订阅 Event 的 EventHandler

实现代码

EventBase 定义了基本事件信息,事件发生时间以及事件的id:

  1. public abstract class EventBase

  2. {

  3. [JsonProperty]

  4. public DateTimeOffset EventAt { get; private set; }

  5. [JsonProperty]

  6. public string EventId { get; private set; }

  7. protected EventBase()

  8. {

  9. this.EventId = GuidIdGenerator.Instance.NewId();

  10. this.EventAt = DateTimeOffset.UtcNow;

  11. }

  12. [JsonConstructor]

  13. public EventBase(string eventId, DateTimeOffset eventAt)

  14. {

  15. this.EventId = eventId;

  16. this.EventAt = eventAt;

  17. }

  18. }

EventHandler 定义:

  1. public interface IEventHandler

  2. {

  3. Task Handle(IEventBase @event);

  4. }

  5. public interface IEventHandler<in TEvent> : IEventHandler where TEvent : IEventBase

  6. {

  7. Task Handle(TEvent @event);

  8. }

  9. public class EventHandlerBase<TEvent> : IEventHandler<TEvent> where TEvent : EventBase

  10. {

  11. public virtual Task Handle(TEvent @event)

  12. {

  13. return Task.CompletedTask;

  14. }

  15. public Task Handle(IEventBase @event)

  16. {

  17. return Handle(@event as TEvent);

  18. }

  19. }

EventStore:

  1. public class EventStore

  2. {

  3. private readonly Dictionary<Type, Type> _eventHandlers = new Dictionary<Type, Type>();

  4. public void Add<TEvent, TEventHandler>() where TEventHandler : IEventHandler<TEvent> where TEvent : EventBase

  5. {

  6. _eventHandlers.Add(typeof(TEvent), typeof(TEventHandler));

  7. }

  8. public object GetEventHandler(Type eventType, IServiceProvider serviceProvider)

  9. {

  10. if (eventType == null || !_eventHandlers.TryGetValue(eventType, out var handlerType) || handlerType == null)

  11. {

  12. return null;

  13. }

  14. return serviceProvider.GetService(handlerType);

  15. }

  16. public object GetEventHandler(EventBase eventBase, IServiceProvider serviceProvider) =>

  17. GetEventHandler(eventBase.GetType(), serviceProvider);

  18. public object GetEventHandler<TEvent>(IServiceProvider serviceProvider) where TEvent : EventBase =>

  19. GetEventHandler(typeof(TEvent), serviceProvider);

  20. }

EventQueue 定义:

  1. public class EventQueue

  2. {

  3. private readonly ConcurrentDictionary<string, ConcurrentQueue<EventBase>> _eventQueues =

  4. new ConcurrentDictionary<string, ConcurrentQueue<EventBase>>();

  5. public ICollection<string> Queues => _eventQueues.Keys;

  6. public void Enqueue<TEvent>(string queueName, TEvent @event) where TEvent : EventBase

  7. {

  8. var queue = _eventQueues.GetOrAdd(queueName, q => new ConcurrentQueue<EventBase>());

  9. queue.Enqueue(@event);

  10. }

  11. public bool TryDequeue(string queueName, out EventBase @event)

  12. {

  13. var queue = _eventQueues.GetOrAdd(queueName, q => new ConcurrentQueue<EventBase>());

  14. return queue.TryDequeue(out @event);

  15. }

  16. public bool TryRemoveQueue(string queueName)

  17. {

  18. return _eventQueues.TryRemove(queueName, out _);

  19. }

  20. public bool ContainsQueue(string queueName) => _eventQueues.ContainsKey(queueName);

  21. public ConcurrentQueue<EventBase> this[string queueName] => _eventQueues[queueName];

  22. }

EventPublisher:

  1. public interface IEventPublisher

  2. {

  3. Task Publish<TEvent>(string queueName, TEvent @event)

  4. where TEvent : EventBase;

  5. }

  6. public class EventPublisher : IEventPublisher

  7. {

  8. private readonly EventQueue _eventQueue;

  9. public EventPublisher(EventQueue eventQueue)

  10. {

  11. _eventQueue = eventQueue;

  12. }

  13. public Task Publish<TEvent>(string queueName, TEvent @event)

  14. where TEvent : EventBase

  15. {

  16. _eventQueue.Enqueue(queueName, @event);

  17. return Task.CompletedTask;

  18. }

  19. }

EventSubscriptionManager:

  1. public interface IEventSubscriptionManager

  2. {

  3. void Subscribe<TEvent, TEventHandler>()

  4. where TEvent : EventBase

  5. where TEventHandler : IEventHandler<TEvent>;

  6. }

  7. public class EventSubscriptionManager : IEventSubscriptionManager

  8. {

  9. private readonly EventStore _eventStore;

  10. public EventSubscriptionManager(EventStore eventStore)

  11. {

  12. _eventStore = eventStore;

  13. }

  14. public void Subscribe<TEvent, TEventHandler>()

  15. where TEvent : EventBase

  16. where TEventHandler : IEventHandler<TEvent>

  17. {

  18. _eventStore.Add<TEvent, TEventHandler>();

  19. }

  20. }

EventConsumer:

  1. public class EventConsumer : BackgroundService

  2. {

  3. private readonly EventQueue _eventQueue;

  4. private readonly EventStore _eventStore;

  5. private readonly int maxSemaphoreCount = 256;

  6. private readonly IServiceProvider _serviceProvider;

  7. private readonly ILogger _logger;

  8. public EventConsumer(EventQueue eventQueue, EventStore eventStore, IConfiguration configuration, ILogger<EventConsumer> logger, IServiceProvider serviceProvider)

  9. {

  10. _eventQueue = eventQueue;

  11. _eventStore = eventStore;

  12. _logger = logger;

  13. _serviceProvider = serviceProvider;

  14. }

  15. protected override async Task ExecuteAsync(CancellationToken stoppingToken)

  16. {

  17. using (var semaphore = new SemaphoreSlim(Environment.ProcessorCount, maxSemaphoreCount))

  18. {

  19. while (!stoppingToken.IsCancellationRequested)

  20. {

  21. var queues = _eventQueue.Queues;

  22. if (queues.Count > 0)

  23. {

  24. await Task.WhenAll(

  25. queues

  26. .Select(async queueName =>

  27. {

  28. if (!_eventQueue.ContainsQueue(queueName))

  29. {

  30. return;

  31. }

  32. try

  33. {

  34. await semaphore.WaitAsync(stoppingToken);

  35. //

  36. if (_eventQueue.TryDequeue(queueName, out var @event))

  37. {

  38. var eventHandler = _eventStore.GetEventHandler(@event, _serviceProvider);

  39. if (eventHandler is IEventHandler handler)

  40. {

  41. _logger.LogInformation(

  42. "handler {handlerType} begin to handle event {eventType}, eventId: {eventId}, eventInfo: {eventInfo}",

  43. eventHandler.GetType().FullName, @event.GetType().FullName,

  44. @event.EventId, JsonConvert.SerializeObject(@event));

  45. try

  46. {

  47. await handler.Handle(@event);

  48. }

  49. catch (Exception e)

  50. {

  51. _logger.LogError(e, "event {eventId} handled exception", @event.EventId);

  52. }

  53. finally

  54. {

  55. _logger.LogInformation("event {eventId} handled", @event.EventId);

  56. }

  57. }

  58. else

  59. {

  60. _logger.LogWarning(

  61. "no event handler registered for event {eventType}, eventId: {eventId}, eventInfo: {eventInfo}",

  62. @event.GetType().FullName, @event.EventId,

  63. JsonConvert.SerializeObject(@event));

  64. }

  65. }

  66. }

  67. catch (Exception ex)

  68. {

  69. _logger.LogError(ex, "error running EventConsumer");

  70. }

  71. finally

  72. {

  73. semaphore.Release();

  74. }

  75. })

  76. );

  77. }

  78. await Task.Delay(50, stoppingToken);

  79. }

  80. }

  81. }

  82. }

为了方便使用定义了一个 Event 扩展方法:

  1. public static IServiceCollection AddEvent(this IServiceCollection services)

  2. {

  3. services.TryAddSingleton<EventStore>();

  4. services.TryAddSingleton<EventQueue>();

  5. services.TryAddSingleton<IEventPublisher, EventPublisher>();

  6. services.TryAddSingleton<IEventSubscriptionManager, EventSubscriptionManager>();

  7. services.AddSingleton<IHostedService, EventConsumer>();

  8. return services;

  9. }

使用示例

定义 PageViewEvent 记录请求信息:

  1. public class PageViewEvent : EventBase

  2. {

  3. public string Path { get; set; }

  4. }

这里作为示例只记录了请求的Path信息,实际使用可以增加更多需要记录的信息

定义 PageViewEventHandler,处理 PageViewEvent

  1. public class PageViewEventHandler : EventHandlerBase<PageViewEvent>

  2. {

  3. private readonly ILogger _logger;

  4. public PageViewEventHandler(ILogger<PageViewEventHandler> logger)

  5. {

  6. _logger = logger;

  7. }

  8. public override Task Handle(PageViewEvent @event)

  9. {

  10. _logger.LogInformation($"handle pageViewEvent: {JsonConvert.SerializeObject(@event)}");

  11. return Task.CompletedTask;

  12. }

  13. }

这个 handler 里什么都没做只是输出一个日志

这个示例项目定义了一个记录请求路径的事件以及一个发布请求记录事件的中间件

  1. // 发布 Event 的中间件

  2. app.Use(async (context, next) =>

  3. {

  4. var eventPublisher = context.RequestServices.GetRequiredService<IEventPublisher>();

  5. await eventPublisher.Publish("pageView", new PageViewEvent() { Path = context.Request.Path.Value });

  6. await next();

  7. });

Startup 配置:

  1. public void ConfigureServices(IServiceCollection services)

  2. {

  3. // ...

  4. services.AddEvent();

  5. services.AddSingleton<PageViewEventHandler>();// 注册 Handler

  6. }

  7. // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.

  8. public void Configure(IApplicationBuilder app, IHostingEnvironment env, IEventSubscriptionManager eventSubscriptionManager)

  9. {

  10. eventSubscriptionManager.Subscribe<PageViewEvent, PageViewEventHandler>();

  11. app.Use(async (context, next) =>

  12. {

  13. var eventPublisher = context.RequestServices.GetRequiredService<IEventPublisher>();

  14. await eventPublisher.Publish("pageView", new PageViewEvent() { Path = context.Request.Path.Value });

  15. await next();

  16. });

  17. // ...

  18. }

使用效果:

More

注:只是一个初步设计,基本可以实现功能,还是有些不足,实际应用的话还有一些要考虑的事情

  1. Consumer 消息逻辑,现在的实现有些问题,我们的应用场景目前比较简单还可以满足,如果事件比较多就会而且每个事件可能处理需要的时间长短不一样,会导致在一个批次中执行的 Event 中已经完成的事件要等待其他还没完成的事件完成之后才能继续取下一个事件,理想的消费模式应该是各个队列相互独立,在同一个队列中保持顺序消费即可

  2. 上面示例的 EventStore 的实现只是简单的实现了一个事件一个 Handler 的处理情况,实际业务场景中很可能会有一个事件需要多个 Handler 的情况

  3. 这个实现是基于内存的,如果要在分布式场景下使用就不适用了,需要自己实现一下基于redis或者数据库的以满足分布式的需求

  4. and more...

上面所有的代码可以在 Github 上获取,示例项目 Github 地址:https://github.com/WeihanLi/AspNetCorePlayground/tree/master/TestWebApplication

Reference

  • https://github.com/WeihanLi/AspNetCorePlayground/tree/master/TestWebApplication/Event

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/312923.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【.NET Core 跨平台 GUI 开发】第二篇:Gtk# 布局入门,初识HBox 和 VBox

这是 Gtk# 系列博文的第二篇。在上一篇博文《编写你的第一个 Gtk# 应用》中&#xff0c;我们提到“一个 Gtk.Window 只能直接包含一个部件”。这意味着&#xff0c;在不做其他额外操作的情况下&#xff0c;如果你向一个 GtkWindow 中添加了一个 GtkLabel &#xff08;就像上一篇…

Java开发Web Service的几种解决方案

转自&#xff1a;http://blog.csdn.net/zolalad/article/details/25158995 Java开发中经常使用到的几种WebService技术实现方案 随着异构系统互联需求的不断增加&#xff0c;WebService的重要性也日益彰显出来。凭借webservice&#xff0c;我们可以实现基于不同程序语言的项目的…

【.NET Core 跨平台 GUI 开发】第一篇:编写你的第一个 Gtk# 应用

本文是【.NET Core 跨平台 GUI 开发】系列博文的第一篇。该系列博文是一个关于 Gtk# 跨平台应用开发的初级随笔集合。该随笔集合介绍了 GTK 和 Gtk# 的基本信息以及开发方法&#xff0c;并展示了如何使用 .NET Core 技术栈开发基于 Gtk# 的跨平台 GUI 程序。博文假设你已经对 C…

ASP.NET Core快速入门(第4章:ASP.NET Core HTTP介绍)--学习笔记

点击蓝字关注我们课程链接&#xff1a;http://video.jessetalk.cn/course/explore良心课程&#xff0c;大家一起来学习哈&#xff01;任务22&#xff1a;课程介绍1.HTTP 处理过程2.WebHost 的配置与启动3.Middleware 与管道4.Routing MiddleWare 介绍任务23&#xff1a;Http请求…

Java使用JWS API开发Web Service

JAX-WS&#xff0c;即Java API for XML Web Service&#xff0c;是Java开发基于SOAP协议的Web Service的标准。使用JWS API就可以直接开发简单的Web Service应用。 一、创建Web Service 打开Eclipse&#xff0c;新建一个Java Project&#xff0c;如下图所示&#xff1a; 新建了…

ASP.NET Core快速入门(第3章:依赖注入)--学习笔记

点击蓝字关注我们课程链接&#xff1a;http://video.jessetalk.cn/course/explore良心课程&#xff0c;大家一起来学习哈&#xff01;任务16&#xff1a;介绍1、依赖注入概念详解从UML和软件建模来理解从单元测试来理解2、ASP.NET Core 源码解析任务17&#xff1a;从UML角度来理…

使用wsimport命令创建Web Service客户端

一、wsimport简介 在jdk的bin文件夹中&#xff0c;有一个wsimport.exe工具。这个工具可以依据Web Service的描述文件wsdl生成相应的类文件&#xff0c;然后用这些类文件&#xff0c;被Web Service的客户端导入之后&#xff0c;就可以像调用本地的类一样调用WebService提供的方法…

读《持续交付2.0》

几年前看过《持续交付(发布可靠软件的系统方法)》&#xff0c;感触不是很深&#xff0c;最近看了这本书的译者乔梁编写的《持续交付2.0》&#xff0c;结合工作中的种种&#xff0c;又有一种相见恨晚的感觉。可见好书是需要经常翻阅的&#xff0c;每次都会带来新的收获和思考。全…

Java使用Apache CXF开发Web Service

转自&#xff1a;http://blog.csdn.net/hu_shengyang/article/details/38384597 以前工作中也用CXF,但都是用别人现成搭好的环境&#xff0c;这次自己重头搭建一遍环境。过程中也有遇到的问题&#xff0c;也做了简单的整理。 对于CXF是干什么用的&#xff0c;我不想多说&#x…

程序员修神之路--kubernetes是微服务发展的必然产物

菜菜哥&#xff0c;我昨天又请假出去面试了战况如何呀&#xff1f;多数面试题回答的还行&#xff0c;但是最后让我介绍微服务和kubernetes的时候&#xff0c;挂了话说微服务和kubernetes内容确实挺多的那你给我大体介绍一下呗可以呀&#xff0c;不过要请和coffee哦◆◆kubernet…

.NET core3.0 使用Jwt保护api

摘要&#xff1a;本文演示如何向有效用户提供jwt&#xff0c;以及如何在webapi中使用该token通过JwtBearerMiddleware中间件对用户进行身份认证。认证和授权区别&#xff1f;首先我们要弄清楚认证&#xff08;Authentication&#xff09;和授权&#xff08;Authorization&#…

Java ArrayList的实现原理详解

ArrayList是Java List类型的集合类中最常使用的&#xff0c;本文基于Java1.8&#xff0c;对于ArrayList的实现原理做一下详细讲解。 &#xff08;Java1.8源码&#xff1a;http://docs.oracle.com/javase/8/docs/api/&#xff09; 一、ArrayList实现原理总结 ArrayList的实现原…

.NET开发者的机遇与Web Blazor基础(有彩蛋)

一.唠唠WebAssembly的发展历程目前有很多支持WebAssembly的项目&#xff0c;但发展最快的是Blazor&#xff0c;这是一个构建单页面的.NET技术&#xff0c;目前已经从Preview版本升级到了beta版本&#xff0c;微软计划在2020年5月发布Blazor的第一个版本。Blazor是什么&#xff…

Java LinkedList的实现原理详解

LinkedList是Java List类型的集合类的一种实现&#xff0c;此外&#xff0c;LinkedList还实现了Deque接口。本文基于Java1.8&#xff0c;对于LinkedList的实现原理做一下详细讲解。 &#xff08;Java1.8源码&#xff1a;http://docs.oracle.com/javase/8/docs/api/&#xff09…

知乎高赞:中国有哪些不错的开源软件产品?

点击蓝字“dotNET匠人”关注我哟加个“星标★”&#xff0c;每日 7:15&#xff0c;好文必达&#xff01;在知乎上&#xff0c;有个问题问“中国有什么拿得出手的开源软件产品&#xff08;在 GitHub 等社区受欢迎度较好的&#xff09;&#xff1f;”事实上&#xff0c;还不少呢~…

容器日志管理 (2) 开源日志管理方案 ELK/EFK

本篇已加入《.NET Core on K8S学习实践系列文章索引》&#xff0c;可以点击查看更多容器化技术相关系列文章。上一篇《容器日志管理&#xff08;1&#xff09;》中介绍了Docker自带的logs子命令以及其Logging driver&#xff0c;本篇将会介绍一个流行的开源日志管理方案ELK/EFK…

关于Scrum起源,读这一篇论文就足够啦!《新新产品开发游戏》

关于Scrum的起源&#xff0c;我们经常会提到1986年发表在HBR上的一篇论文&#xff0c;《The New New Product Development Game》&#xff0c;今天我们把它重新翻译&#xff0c;一起重温为何Scrum会如此设置3355&#xff1f;为何会用橄榄球的术语来代表Scrum&#xff1f;The Ne…

Java HashMap的实现原理详解

HashMap是Java Map类型的集合类中最常使用的&#xff0c;本文基于Java1.8&#xff0c;对于HashMap的实现原理做一下详细讲解。 &#xff08;Java1.8源码&#xff1a;http://docs.oracle.com/javase/8/docs/api/&#xff09; 一、HashMap实现原理总结 HashMap的实现原理总结如下…

ASP.NET Core快速入门(第5章:认证与授权)--学习笔记

点击蓝字关注我们课程链接&#xff1a;http://video.jessetalk.cn/course/explore良心课程&#xff0c;大家一起来学习哈&#xff01;任务31&#xff1a;课时介绍1.Cookie-based认证与授权2.Cookie-based认证实现3.Jwt认证与授权介绍4.Jwt认证与授权实现5.Jwt认证与授权6.Role …

Java HashSet的实现原理详解

HashSet是Java Map类型的集合类中最常使用的&#xff0c;本文基于Java1.8&#xff0c;对于HashSet的实现原理做一下详细讲解。 &#xff08;Java1.8源码&#xff1a;http://docs.oracle.com/javase/8/docs/api/&#xff09; 一、HashSet实现原理总结 HashSet的实现原理总结如下…