使用 C# 实现一个 Event Bus

使用 C# 实现一个 Event Bus

Event Bus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。它允许不同的组件通过发布和订阅事件来进行解耦和通信。

在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。

首先,我们有两个基本的约束接口:IEventIAsyncEventHandler<TEvent>。IEvent是一个空接口,用于约束事件的类型。IAsyncEventHandler<TEvent>是一个泛型接口,用于约束事件处理程序的类型。它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。

接下来,我们有一个IEventBus接口,它定义了一些操作方法用于发布和订阅事件。其中,Publish<TEvent>PublishAsync<TEvent>方法用于发布事件,而OnSubscribe<TEvent>方法用于订阅事件。

然后,我们看到一个实现了本地事件总线的类LocalEventBusManager<TEvent>。它实现了ILocalEventBusManager<TEvent>接口,用于在单一管道内处理本地事件。它使用了一个Channel<TEvent>来存储事件,并提供了发布事件的方法PublishPublishAsync。此外,它还提供了一个自动处理事件的方法AutoHandle

总的来说,Event Bus提供了一种方便的方式来实现组件之间的松耦合通信。通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。这种机制可以提高代码的可维护性和可扩展性。

Github仓库地址:soda-event-bus

实现一些基本约束

先实现一些约束,实现IEvent约束事件,实现IAsyncEvnetHandler<TEvent> where TEvent:IEvent来约束事件的处理程序。

public interface IEvent
{}public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
{Task HandleAsync(IEvent @event);void HandleException(IEvent @event, Exception ex);
}

接下来规定一下咱们的IEventBus,会有哪些操作方法。基本就是发布和订阅。

public interface IEventBus
{void Publish<TEvent>(TEvent @event) where TEvent : IEvent;Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;void OnSubscribe<TEvent>() where TEvent : IEvent;
}

实现一个本地事件总线

本地事件处理

本地事件的处理我打算采用两种方式实现,一种是LocalEventBusManager即本地事件管理,第二种是LocalEventBusPool池化本地事件。

LocalEvnetBusManager

LocalEventBusManager主要在单一管道内进行处理,集中进行消费。

public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent
{void Publish(TEvent @event);Task PublishAsync(TEvent @event) ;void AutoHandle();
}public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent>where TEvent: IEvent
{readonly IServiceProvider _servicesProvider = serviceProvider;private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();public void Publish(TEvent @event){Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");_eventChannel.Writer.WriteAsync(@event);}private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();}public async Task PublishAsync(TEvent @event){await _eventChannel.Writer.WriteAsync(@event);}public void AutoHandle(){// 确保只启动一次if (!Cts.IsCancellationRequested) return;Task.Run(async () =>{while (!Cts.IsCancellationRequested){var reader = await _eventChannel.Reader.ReadAsync();await HandleAsync(reader);}}, Cts.Token);}async Task HandleAsync(TEvent @event){var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler is null){throw new NullReferenceException($"No handler for event {@event.GetType().Name}");}try{await handler.HandleAsync(@event);}catch (Exception ex){handler.HandleException( @event, ex);}}
}

LocalEventBusPool

LocalEventBusPool即所有的Event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。

public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
{private readonly IServiceProvider _serviceProvider = serviceProvider;private class ChannelKey{public required string Key { get; init; }public int Subscribers { get; set; }public override bool Equals(object? obj){if (obj is ChannelKey key){return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);}return false;}public override int GetHashCode(){return 0;}}private Channel<IEvent> Rent(string channel){_channels.TryGetValue(new ChannelKey() { Key = channel }, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(new ChannelKey() { Key = channel }, value);return value;}private Channel<IEvent> Rent(ChannelKey channelKey){_channels.TryGetValue(channelKey, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(channelKey, value);return value;}private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();_channels.Clear();Cts.TryReset();}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);}public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{Rent(typeof(TEvent).Name).Writer.TryWrite(@event);}public void OnSubscribe<TEvent>() where TEvent : IEvent{var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??new ChannelKey() { Key = typeof(TEvent).Name };channelKey.Subscribers++;Task.Run(async () =>{try{while (!Cts.IsCancellationRequested){var @event = await ReadAsync(channelKey);var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");try{await handler.HandleAsync((TEvent)@event);}catch (Exception ex){handler.HandleException((TEvent)@event, ex);}}}catch (Exception e){throw new InvalidOperationException("Error on onSubscribe handler", e);}}, Cts.Token);}private async Task<IEvent> ReadAsync(string channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}private async Task<IEvent> ReadAsync(ChannelKey channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}
}

LocalEventBus

实现LocalEventBus继承自IEventBus即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。

public interface ILocalEventBus: IEventBus
{}public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{private  LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.Publish(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.Publish(@event);}}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");await EventBusPool.PublishAsync(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");await manager.PublishAsync(@event);}}public void OnSubscribe<TEvent>() where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.OnSubscribe<TEvent>();}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.AutoHandle();}}
}

分布式事件总线

根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/590968.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024年山东省中职“网络安全”试题——B-3:Web安全之综合渗透测试

B-3&#xff1a;Web安全之综合渗透测试 服务器场景名称&#xff1a;Server2010&#xff08;关闭链接&#xff09; 服务器场景操作系统&#xff1a;"需要环境有问题加q" 使用渗透机场景Kali中的工具扫描服务器&#xff0c;通过扫描服务器得到web端口&#xff0c;登陆…

计算机专业个人简历范文(8篇)

HR浏览一份简历也就25秒左右&#xff0c;如果你连「好简历」都没有&#xff0c;怎么能找到好工作呢&#xff1f; 如果你不懂得如何在简历上展示自己&#xff0c;或者觉得怎么改简历都不出彩&#xff0c;那请你一定仔细读完。 互联网运营个人简历范文> 男 22 本科 AI简历…

【Pytorch】Pytorch或者CUDA版本不符合问题解决与分析

NVIDIA CUDA Toolkit Release Notes Package installation issues INSTALL PYTORCH 先声毒人&#xff1a;最好资料就是上面三份资料&#xff0c;可以通过官网明确的获取一手信息&#xff0c;你所遇到的99%的问题都可以找到&#xff0c;明确的解决方案&#xff0c;建议最好看…

linux sh 脚本文件换行错误

windows 写好的脚本到服务运行不起来&#xff0c;显示换行问题 因为 windwos 的换行和 linux 的换行风格不同 解决办法&#xff1a;在使用的文本编辑器中&#xff0c;修改格式为 unix 格式 以 notepad 为例&#xff0c;在编辑 -> 文档格式转换中设置格式为 Unix

fmincon函数的决策变量可以是二维矩阵,但不建议是高维矩阵

1&#xff09;二维矩阵代码 clear all clc% 定义目标函数 fun (x) sum(sum(x.^2));% 初始矩阵 x0 2 rand(2, 2);% 定义空的线性不等式约束 A []; b [];% 定义空的线性等式约束 Aeq []; beq [];% 定义变量的上下界 lb ones(2,2); ub [];% 使用 fmincon 求解 options …

LLM大语言模型(四):在ChatGLM3-6B中使用langchain

目录 背景准备工作工具添加LangChain 已实现工具Calculator、Weather Tool配置 自定义工具自定义kuakuawo Agent 多工具使用参考 背景 LangChain是一个用于开发由语言模型驱动的应用程序的框架。它使应用程序能够: 具有上下文意识&#xff1a;将语言模型与上下文源(提示指令&…

Rancher 单节点 docker 部署备份与恢复

Rancher 单节点 docker 部署备份与恢复 1. 备份集群 获取 rancher server 容器名&#xff0c;本例为 angry_aryabhata docker ps | grep rancher/rancher6a27b8634c80 rancher/rancher:v2.5.14 xxx angry_aryabhata停止容器 docker stop angry_aryabhata创建备…

基于Spring Boot的美妆分享系统:打造个性化推荐、互动社区与智能决策

基于Spring Boot的美妆分享系统&#xff1a;打造个性化推荐、互动社区与智能决策 1. 项目介绍2. 管理员功能2.1 美妆管理2.2 页面管理2.3 链接管理2.4 评论管理2.5 用户管理2.6 公告管理 3. 用户功能3.1 登录注册3.2 分享商品3.3 问答3.4 我的分享3.5 我的收藏夹 4. 创新点4.1 …

国标GB28181对接的时候如何配置服务端口和本地端口

目 录 一、国标GB28181对接需要配置的端口等参数 二、GB28181服务器端口的配置&#xff1a;SIP服务器端口 三、GB28181设备测端口的配置&#xff1a;本地SIP端口 &#xff08;一&#xff09;本地SIP端口配置的意义 &#xff08;二&#xf…

57.网游逆向分析与插件开发-游戏增加自动化助手接口-接管游戏的自动药水设定功能

内容来源于&#xff1a;易道云信息技术研究院VIP课 码云地址&#xff08;master分支&#xff09;&#xff1a;https://gitee.com/dye_your_fingers/sro_-ex.git 码云版本号&#xff1a;51307d6bf69f2f3c645c70d09f841f5e32da79b9 代码下载地址&#xff0c;在 SRO_EX 目录下&…

全志R128使用SPI驱动ST7789V1.47寸LCD

R128 平台提供了 SPI DBI 的 SPI TFT 接口&#xff0c;具有如下特点&#xff1a; Supports DBI Type C 3 Line/4 Line Interface ModeSupports 2 Data Lane Interface ModeSupports data source from CPU or DMASupports RGB111/444/565/666/888 video formatMaximum resoluti…

开源在线客服系统源码全端通吃:聊天记录云端实时保存 附带完整的搭建教程

随着互联网的普及和消费者对客户服务体验的要求提高&#xff0c;传统的电话客服已经不能满足用户的需求。企业需要一个更加便捷、高效、实时的在线客服系统来提供更好的客户服务。然而&#xff0c;市场上的许多在线客服系统要么功能不全&#xff0c;要么价格昂贵。在这种情况下…

速盾网络:2024欢迎用户来选择速盾

尊敬的用户&#xff1a; 新年伊始&#xff0c;速盾网络迎来了崭新的一年。在这个充满希望和机遇的时刻&#xff0c;我们由衷地欢迎您选择速盾&#xff0c;与我们一同踏上网络安全的旅程。 速盾网络一直致力于为用户提供卓越的网络安全解决方案&#xff0c;以应对不断演变的网…

Spring Boot 自动配置功能介绍

Spring Boot 自动配置功能介绍 Spring Boot 是一个流行的 Java 开发框架&#xff0c;它提供了许多便利的功能和工具&#xff0c;帮助开发者快速构建应用程序。其中一个最引人注目的特性是其强大的自动配置功能。 什么是自动配置&#xff1f; 在传统的 Java 开发中&#xff0…

系统监视工具 | htop

引言 Htop 是一个交互式的系统监视器&#xff0c;提供了更加直观和友好的界面来显示系统的资源使用情况。是 top 命令的替代品&#xff0c;具有更多的功能和更好的可视化效果。 Htop 最初由 Hisham Muhammad 开发&#xff0c;在 2004 年发布第一个版本。它的目标是提供一个更…

基于SpringBoot的宠物领养系统

文章目录 项目介绍主要功能截图:部分代码展示设计总结项目获取方式🍅 作者主页:超级无敌暴龙战士塔塔开 🍅 简介:Java领域优质创作者🏆、 简历模板、学习资料、面试题库【关注我,都给你】 🍅文末获取源码联系🍅 项目介绍 基于SpringBoot的宠物领养系统,java项目…

jsp结合servlet

servlet配置 环境配置2023.12.31 idea配置搭建 创建一个普通的java项目 由于新版idea去除了add framework support的ui显示&#xff0c;可以在左边项目栏中使用快捷键shiftk或者setting中搜索add framework support在修改对应的快捷键 点击ok然后应该就是下面这样的结果 这里…

openCv读取外网URL链接图片

安装指定库 要使用OpenCV读取URL链接中的图像&#xff0c;你可以使用urllib库下载图像&#xff0c;并使用OpenCV对其进行处理。以下是一个简单的例子&#xff1a; 首先&#xff0c;确保你已经安装了OpenCV和urllib库&#xff0c;终端执行下面语句。 pip install opencv-pytho…

HarmonyOS应用开发-搭建开发环境

本文介绍如何搭建 HarmonyOS 应用的开发环境&#xff0c;介绍下载安装 DevEco Studio 开发工具和 SDK 的详细流程。华为鸿蒙 DevEco Studio 是面向全场景的一站式集成开发环境&#xff0c;面向全场景多设备&#xff0c;提供一站式的分布式应用开发平台&#xff0c;支持分布式多…

监控电脑的软件(无感知、无进程、无图标)

当下&#xff0c;电脑监控软件扮演着越来越重要的角色。然而&#xff0c;在探讨这一话题时&#xff0c;我们必须首先明确一个重要的前提&#xff1a;任何未经他人同意的监控行为都是不道德的&#xff0c;并且可能构成违法行为。因此&#xff0c;本文将专注于合法的、经过授权的…