C# 如何实现一个事件总线

EventBus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。

它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。

首先,我们有两个基本的约束接口:IEventIAsyncEventHandler<TEvent>

IEvent是一个空接口,用于约束事件的类型。IAsyncEventHandler<TEvent>是一个泛型接口,用于约束事件处理程序的类型。它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。接下来,我们有一个IEventBus接口,它定义了一些操作方法用于发布和订阅事件。

其中,Publish<TEvent>PublishAsync<TEvent>方法用于发布事件,而OnSubscribe<TEvent>方法用于订阅事件。然后,我们看到一个实现了本地事件总线的类LocalEventBusManager<TEvent>。它实现了ILocalEventBusManager<TEvent>接口,用于在单一管道内处理本地事件。它使用了一个Channel<TEvent>来存储事件,并提供了发布事件的方法PublishPublishAsync。此外,它还提供了一个自动处理事件的方法AutoHandle

总的来说Event Bus提供了一种方便的方式来实现组件之间的松耦合通信。

通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。

这种机制可以提高代码的可维护性和可扩展性。

Github仓库地址:https://github.com/DonPangPang/soda-event-bus

实现一些基本约束

先实现一些约束,实现IEvent约束事件,实现IAsyncEvnetHandler<TEvent> where TEvent:IEvent来约束事件的处理程序。

public interface IEvent
{}public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
{Task HandleAsync(IEvent @event);void HandleException(IEvent @event, Exception ex);
}

接下来规定一下咱们的IEventBus,会有哪些操作方法。基本就是发布和订阅。

public interface IEventBus
{void Publish<TEvent>(TEvent @event) where TEvent : IEvent;Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;void OnSubscribe<TEvent>() where TEvent : IEvent;
}

实现一个本地事件总线

本地事件处理

本地事件的处理我打算采用两种方式实现,一种是LocalEventBusManager即本地事件管理,第二种是LocalEventBusPool池化本地事件。

LocalEvnetBusManager

LocalEventBusManager主要在单一管道内进行处理,集中进行消费。

public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent
{void Publish(TEvent @event);Task PublishAsync(TEvent @event) ;void AutoHandle();
}public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent>where TEvent: IEvent
{readonly IServiceProvider _servicesProvider = serviceProvider;private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();public void Publish(TEvent @event){Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");_eventChannel.Writer.WriteAsync(@event);}private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();}public async Task PublishAsync(TEvent @event){await _eventChannel.Writer.WriteAsync(@event);}public void AutoHandle(){// 确保只启动一次if (!Cts.IsCancellationRequested) return;Task.Run(async () =>{while (!Cts.IsCancellationRequested){var reader = await _eventChannel.Reader.ReadAsync();await HandleAsync(reader);}}, Cts.Token);}async Task HandleAsync(TEvent @event){var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler is null){throw new NullReferenceException($"No handler for event {@event.GetType().Name}");}try{await handler.HandleAsync(@event);}catch (Exception ex){handler.HandleException( @event, ex);}}
}
LocalEventBusPool

LocalEventBusPool即所有的Event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。

public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
{private readonly IServiceProvider _serviceProvider = serviceProvider;private class ChannelKey{public required string Key { get; init; }public int Subscribers { get; set; }public override bool Equals(object? obj){if (obj is ChannelKey key){return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);}return false;}public override int GetHashCode(){return 0;}}private Channel<IEvent> Rent(string channel){_channels.TryGetValue(new ChannelKey() { Key = channel }, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(new ChannelKey() { Key = channel }, value);return value;}private Channel<IEvent> Rent(ChannelKey channelKey){_channels.TryGetValue(channelKey, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(channelKey, value);return value;}private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();_channels.Clear();Cts.TryReset();}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);}public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{Rent(typeof(TEvent).Name).Writer.TryWrite(@event);}public void OnSubscribe<TEvent>() where TEvent : IEvent{var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??new ChannelKey() { Key = typeof(TEvent).Name };channelKey.Subscribers++;Task.Run(async () =>{try{while (!Cts.IsCancellationRequested){var @event = await ReadAsync(channelKey);var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");try{await handler.HandleAsync((TEvent)@event);}catch (Exception ex){handler.HandleException((TEvent)@event, ex);}}}catch (Exception e){throw new InvalidOperationException("Error on onSubscribe handler", e);}}, Cts.Token);}private async Task<IEvent> ReadAsync(string channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}private async Task<IEvent> ReadAsync(ChannelKey channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}
}
LocalEventBus

实现LocalEventBus继承自IEventBus即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。

public interface ILocalEventBus: IEventBus
{}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{private  LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.Publish(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.Publish(@event);}}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");await EventBusPool.PublishAsync(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");await manager.PublishAsync(@event);}}public void OnSubscribe<TEvent>() where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.OnSubscribe<TEvent>();}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.AutoHandle();}}
}

分布式事件总线

根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/687831.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

上位机图像处理和嵌入式模块部署(cmake的使用)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 过去我们编写windows程序的时候&#xff0c;习惯上都是直接使用visual studio创建工程开发。而开发linux程序的时候&#xff0c;则是编写好c、cpp代…

MySQL 基础知识(九)之视图

目录 1 视图的介绍 2 视图算法 3 创建视图 4 查看视图结构 5 修改视图 6 删除视图 7 参考文档 1 视图的介绍 视图是一张并不存储数据的虚拟表&#xff0c;其本质是根据 SQL 语句动态查询数据库中的数据。数据库中只存放了视图的定义&#xff0c;通过 SQL 语句使用视图时…

去掉图片水印但是不伤原图?看完这些方法就知道了

小伙伴们&#xff0c;你们是不是经常在网上找一些好看的图片作为壁纸呢&#xff1f;有时候会遇到一些带着平台水印的图片&#xff0c;是不是觉得不太美观呢&#xff1f;别着急&#xff0c;其实我们可以使用一些去水印软件来将这些水印去除掉&#xff0c;让图片更加美观。那么&a…

Graph + LLM图数据库技术如何助力行业大语言模型应用落地

随着 AI 人工智能技术的迅猛发展和自然语言处理领域的研究日益深入&#xff0c;如何构建强大的大语言模型对于企业来说愈发重要&#xff0c;尤其是在特定行业领域中。 图数据库作为处理复杂数据结构的有力工具&#xff0c;为企业构建行业大语言模型提供了强大的支持。本文将探…

腾讯云4核8G服务器配置性能测评,2024更新

4核8G服务器支持多少人同时在线访问&#xff1f;阿腾云的4核8G服务器可以支持20个访客同时访问&#xff0c;关于4核8G服务器承载量并发数qps计算测评&#xff0c;云服务器上运行程序效率不同支持人数在线人数不同&#xff0c;公网带宽也是影响4核8G服务器并发数的一大因素&…

扫描电子显微镜(SEM)样品制备要求与方法解析

扫描电子显微镜&#xff08;Scanning Electron Microscope&#xff0c;简称SEM&#xff09;是一种强大的分析工具&#xff0c;广泛应用于材料科学、生物学、医学、半导体材料和化学化工等领域。SEM能够提供高分辨率的表面形貌图像&#xff0c;因此样品制备成为获取准确、清晰图…

Python第十七章(继承)

继承&#xff1a;子类继承父类的所有方法和属性 一。单继承&#xff1a;一个子类继承一个父类 注释&#xff1a;B是子类&#xff0c;继承了A的函数方法&#xff0c;当调用B时候&#xff0c;会同时使用A中的全部方法&#xff0c;object类是顶级类或者基类&#xff0c;其他子类叫…

IP定位技术助力网络安全保护

随着网络技术的不断发展&#xff0c;网络安全问题日益凸显&#xff0c;如何有效保护网络安全已成为亟待解决的问题。IP定位技术作为一种前沿的网络安全防护手段&#xff0c;正在逐步成为网络安全保护的重要工具。 首先&#xff0c;我们要明确什么是IP定位技术。IP定位技术是一…

express如何挂载前端项目

如何在express的web服务器中放置前端项目呢&#xff1f; 或者说express如何挂载前端打包后的dist目录&#xff1f; 三个步骤&#xff1a; ①导入path const path require(path);②静态托管当前目录 app.use(express.static(path.join(__dirname, dist)));目的是访问到dist目…

ES6的重要特性

1. 块级作⽤域&#xff1a;引⼊ let 和 const 关键字&#xff0c;允许在块级作⽤域中声明变量&#xff0c;解决了变量提升和作⽤域污染的问题。 2. 箭头函数&#xff1a;使⽤箭头( > )定义函数&#xff0c;简化了函数的书写&#xff0c;并且⾃动绑定了 this 。 3. 模板字…

6.s081 学习实验记录(七)Multithreading

文章目录 一、Uthread: switching between threads简介提示实验代码实验结果 二、Using threads简介实验代码 三、Barrier简介实验代码实验结果 一、Uthread: switching between threads 简介 切换到 thread 分支 git fetchgit checkout threadmake clean 实现用户态线程的…

从Unity到Three.js(动态创建mesh)

js var let const基础 手动创建模型mesh功能测试&#xff0c;此功能跑通就可以实现很多功能了&#xff0c;如点云转mesh&#xff0c;磨碎效果等等。 import * as THREE from three;const scene new THREE.Scene(); const camera new THREE.PerspectiveCamera(60, window.in…

Python学习路线图

防止忘记&#xff0c;温故知新 进阶路线

使用手持激光三维扫描仪进行建筑立面测量需要注意些什么?

在进行采集作业前&#xff0c;首先需对作业区域进行实地勘察。对于某些有设计感、结构较为特殊的建筑物&#xff0c;若不提前对作业区域勘探&#xff0c;直接进行采集工作&#xff0c;往往会漏掉建筑物的某些结构特征&#xff0c;造成返工。对于建筑物结构相对简单的场景&#…

LLM(2)之指令提示词(Prompt)基础教学

LLM(2)之指令提示词 Author&#xff1a;Once Day Date&#xff1a;2024年2月15日 全系列专栏请查看:LLM实践成长_Once_day的博客-CSDN博客 参考文章&#xff1a; 中文完整版全9集ChatGPT提示工程师&#xff5c;AI大神吴恩达教你写提示词ChatGPT Shortcut - 简单易用的 Chat…

WordPress主题YIA移动端文章页的面包屑不显示怎么办?

平时我们一般都会在文章页导航菜单下方显示面包屑&#xff0c;类似于“当前位置&#xff1a;boke112百科 WordPress 正文”。平时用浏览器调试站点的时候&#xff0c;在Edge浏览器的“切换设备仿真”中&#xff0c;不管是选择什么设备都会显示面包屑。具体如下图所示&#xf…

数据库数据加密的 4 种常见思路的对比

应用层加解密方案数据库前置处理方案磁盘存取环节&#xff1a;透明数据加密DB 后置处理 最近由于工作需要&#xff0c;我对欧洲的通用数据保护条例做了调研和学习&#xff0c;其中有非常重要的一点&#xff0c;也是常识性的一条&#xff0c;就是需要对用户的个人隐私数据做好加…

【Java程序设计】【C00252】基于Springboot的实习管理系统(有论文)

基于Springboot的实习管理系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的实习管理系统 本系统分为前台功能模块、管理员功能模块、教师功能模块、学生功能模块以及实习单位功能模块。 前台功能模块&#xf…

MySQL篇之索引创建与失效

一、索引创建的原则 1). 针对于数据量较大&#xff0c;且查询比较频繁的表建立索引。 2). 针对于常作为查询条件&#xff08;where&#xff09;、排序&#xff08;order by&#xff09;、分组&#xff08;group by&#xff09;操作的字段建立索引。 3). 尽量选择区分度高的列作…

Vue-router中使用pinia,const xxxStore = usexxxStore()报错

“getActivePinia()” was called but there was no active Pinia. Are you trying to use a store before calling “app.use(pinia)”? See https://pinia.vuejs.org/core-concepts/outside-component-usage.html for help. This will fail in production. at useStore (pin…