将Abp默认事件总线改造为分布式事件总线

文章目录

  • 原理
    • 创建分布式事件总线
    • 实现自动订阅和事件转发
  • 使用
    • 启动Redis服务
    • 配置
    • 传递Abp默认事件
    • 传递自定义事件
  • 项目地址

原理

本地事件总线是通过Ioc容器来实现的。

IEventBus接口定义了事件总线的基本功能,如注册事件、取消注册事件、触发事件等。

Abp.Events.Bus.EventBus是本地事件总线的实现类,其中私有成员ConcurrentDictionary<Type, List<IEventHandlerFactory>> _handlerFactories是事件订阅表。通过维护事件订阅表来实现事件处理器的注册和取消注册。当对应类型的事件触发时,通过订阅表查找所有事件处理器,通过Ioc容器来获取处理器实例,然后通过反射来调用事件处理器的"HandleEvent"方法。

创建分布式事件总线

首先,我们需要一个分布式事件总线中间件,用来将事件从本地事件总线转发到分布式事件总线。常用的中间件有RabbitMQ、Kafka、Redis等。

开源社区已经有实现好的库,本项目参考了 wuyi6216/Abp.RemoteEventBus

这里已经定义好了一个分布式事件总线接口


public interface IDistributedEventBus : IDisposable
{void MessageHandle(string topic, string message);void Publish(IDistributedEventData eventData);void Subscribe(string topic);void Unsubscribe(string topic);void UnsubscribeAll();
}

为了兼容本地事件总线,我们需要定义一个分布式事件总线接口,继承自IEventBus接口。


public interface IMultipleEventBus : IDistributedEventBus, IEventBus
{}

实现自动订阅和事件转发

当注册本地事件时,将订阅分布式事件,事件Topic为类型的字符串表现形式

public IDisposable Register(Type eventType, IEventHandlerFactory factory)
{GetOrCreateHandlerFactories(eventType);List<IEventHandlerFactory> currentLists;if (_handlerFactories.TryGetValue(eventType, out currentLists)){lock (currentLists){if (currentLists.Count == 0){//Register to distributed eventthis.Subscribe(eventType.ToString());}currentLists.Add(factory);}}return new FactoryUnregistrar(this, eventType, factory);
}

创建TriggerRemote,此方法用于将本地事件参数打包成为分布式事件消息payload,并发布该消息

public void TriggerRemote(Type eventType, object eventSource, IEventData eventData)
{var exceptions = new List<Exception>();eventData.EventSource = eventSource;try{var payloadDictionary = new Dictionary<string, object>{{ PayloadKey, eventData }};var distributedeventData = new DistributedEventData(eventType.ToString(), payloadDictionary);Publish(distributedeventData);}catch (Exception ex){exceptions.Add(ex);}if (exceptions.Any()){if (exceptions.Count == 1){exceptions[0].ReThrow();}throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions);}
}

当触发本地事件时,将消息转发至分布式事件总线。
在Trigger方法中调用TriggerRemote,事件状态回调和事件异常回调将不会被转发。

if (!(typeof(DistributedEventBusEvent) == eventType|| typeof(DistributedEventBusEvent).IsAssignableFrom(eventType)|| typeof(DistributedEventMessageHandleExceptionData) == eventType|| typeof(DistributedEventHandleExceptionData) == eventType))
{if (typeof(DistributedEventArgs) != eventType){TriggerRemote(eventType, eventSource, eventData);}
}

在消费端接收到分布式事件消息时,从Topic中解析类型,转发给本地事件。若此类型在本地事件注册过,则将消息反序列化为本地事件参数,然后触发本地事件。
本地事件处理器将触发最终的处理方法。


public virtual void MessageHandle(string topic, string message)
{Logger.Debug($"Receive message on topic {topic}");try{var eventData = _remoteEventSerializer.Deserialize<DistributedEventData>(message);var eventArgs = new DistributedEventArgs(eventData, topic, message);Trigger(this, new DistributedEventBusHandlingEvent(eventArgs));if (!string.IsNullOrEmpty(eventData.Type)){string pattern = @"(.*?)\[(.*?)\]";Match match = Regex.Match(eventData.Type, pattern);if (match.Success){var type = match.Groups[1].Value;var type2 = match.Groups[2].Value;var localTriggerType = typeFinder.Find(c => c.FullName == type).FirstOrDefault();var genericType = typeFinder.Find(c => c.FullName == type2).FirstOrDefault();if (localTriggerType != null && genericType != null){if (localTriggerType.GetTypeInfo().IsGenericType&& localTriggerType.GetGenericArguments().Length == 1&& !genericType.IsAbstract && !genericType.IsInterface){var localTriggerGenericType = localTriggerType.GetGenericTypeDefinition().MakeGenericType(genericType);if (eventData.Data.TryGetValue(PayloadKey, out var payload)){var payloadObject = (payload as JObject).ToObject(localTriggerGenericType);Trigger(localTriggerGenericType, this, (IEventData)payloadObject);}}}}else{var localTriggerType = typeFinder.Find(c => c.FullName == eventData.Type).FirstOrDefault();if (localTriggerType != null && !localTriggerType.IsAbstract && !localTriggerType.IsInterface){if (eventData.Data.TryGetValue(PayloadKey, out var payload)){var payloadObject = (payload as JObject).ToObject(localTriggerType);Trigger(localTriggerType, this, (IEventData)payloadObject);}}}Trigger(this, new DistributedEventBusHandledEvent(eventArgs));}}catch (Exception ex){Logger.Error("Consume remote message exception", ex);Trigger(this, new DistributedEventMessageHandleExceptionData(ex, topic, topic));}
}

使用

DistributedEventBus有不同的实现方式,这里以Redis为例

启动Redis服务

下载Redis并启动服务,使用默认端口6379

配置

生产者和消费者端都需要配置分布式事件总线

首先引用Abp.DistributedEventBus.Redis,并配置Abp模块依赖

[DependsOn(typeof(AbpDistributedEventBusRedisModule))]

在PreInitialize方法中配置Redis连接信息

 Configuration.Modules.DistributedEventBus().UseRedis().Configure(setting =>{setting.Server = "127.0.0.1:6379";});

用MultipleEventBus替换Abp默认事件总线

 //todo: 事件总线Configuration.ReplaceService(typeof(IEventBus),() => IocManager.IocContainer.Register(Component.For<IEventBus>().ImplementedBy<MultipleEventBus>()));

传递Abp默认事件

我们知道在使用仓储时,Abp会自动触发一些事件,如创建、更新、删除等。我们来测试这些事件是否能通过分布式事件总线来传递。

定义一个实体类,用于传递实体的增删改事件。


public class Person : FullAuditedEntity<int>
{public string Name { get; set; }public int Age { get; set; }public string PhoneNumber { get; set; }}

在消费者端,定义一个事件处理器,用于处理实体的增删改事件。


public class RemoteEntityChangedEventHandler :IEventHandler<EntityUpdatedEventData<Person>>,IEventHandler<EntityCreatedEventData<Person>>,IEventHandler<EntityDeletedEventData<Person>>,ITransientDependency
{void IEventHandler<EntityUpdatedEventData<Person>>.HandleEvent(EntityUpdatedEventData<Person> eventData){var person = eventData.Entity;Console.WriteLine($"Remote Entity Updated - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");}void IEventHandler<EntityCreatedEventData<Person>>.HandleEvent(EntityCreatedEventData<Person> eventData){var person = eventData.Entity;Console.WriteLine($"Remote Entity Created - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");}void IEventHandler<EntityDeletedEventData<Person>>.HandleEvent(EntityDeletedEventData<Person> eventData){var person = eventData.Entity;Console.WriteLine($"Remote Entity Deleted - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");}
}

在生产者端,用IRepository对实体进行增删改操作。


var person = new Person()
{Name = "John",Age = 36,PhoneNumber = "18588888888"};personRepository.Insert(person);var person2 = new Person()
{Name = "John2",Age = 36,PhoneNumber = "18588888889"};
personRepository.Insert(person2);var persons = personRepository.GetAllList();
foreach (var p in persons)
{p.Age += 1;personRepository.Update(p);Console.WriteLine($"Entity Updated - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber}");}
foreach (var p in persons)
{personRepository.Delete(p);Console.WriteLine($"Entity Deleted - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber}");}

运行程序(同时运行消费者端和生产者端),可以看到消费者端打印出了实体的增删改事件。

在这里插入图片描述

注意:

分布式事件总线在两个独立系统间传递事件,所以需要定义一个共同的类型对象,用于事件参数的传递。
因此消费者端需要引用生产者端的模块,以便获取共同的类型对象。

public override Assembly[] GetAdditionalAssemblies()
{var clientModuleAssembly = typeof(Person).GetAssembly();return [clientModuleAssembly];
}

传递自定义事件

定义NotificationEventData,用于传递自定义事件。


public class NotificationEventData : EventData
{public int Id { get; set; }public string Title { get; set; }public string Message { get; set; }public bool IsRead { get; set; }
}

在消费者端,定义一个事件处理器,用于处理自定义事件。

public class NotificationEventHandler :IEventHandler<NotificationEventData>,      ITransientDependency
{void IEventHandler<NotificationEventData>.HandleEvent(NotificationEventData eventData){Console.WriteLine($"Id: {eventData.Id}");Console.WriteLine($"Title: {eventData.Title}");Console.WriteLine($"Message: {eventData.Message}");Console.WriteLine($"IsRead: {eventData.IsRead}");}
}

在生产者端,触发自定义事件。

var eventBus = IocManager.Instance.Resolve<IEventBus>();eventBus.Trigger<NotificationEventData>(new NotificationEventData()
{Title = "Hi",Message = "Customized definition event test!",Id = 100,IsRead = true,
});

运行程序(同时运行消费者端和生产者端),可以看到消费者端打印出了自定义事件。

在这里插入图片描述

项目地址

Github:DistributedEventBus

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/234402.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LTD256次升级 |一分钟创建小程序官网 • 官网内容可在小程序分享

1、 商品关联表单支持上传图片&#xff1b; 2、 「我的咨询」新增快捷添加入口&#xff1b; 3、 极速官微新增官网内容分享页面&#xff1b;优化创建流程&#xff1b; 4、 极速官微支持编辑方式添加文章与产品&#xff1b; 5、 极速官微新增数据枢纽入口与网站设置页面&#xf…

(9)Linux Git的介绍以及缓冲区

&#x1f4ad; 前言 本章我们先对缓冲区的概念进行一个详细的探究&#xff0c;之后会带着大家一步步去编写一个简陋的 "进度条" 小程序。最后我们来介绍一下 Git&#xff0c;着重讲解一下 Git 三板斧&#xff0c;一般只要掌握三板斧就基本够用了。 缓冲区&#xff…

邮政快递单号查询入口,标记需要的单号记录

批量查询邮政快递单号的物流信息&#xff0c;对需要的单号记录进行标记。 所需工具&#xff1a; 一个【快递批量查询高手】软件 邮政快递单号若干 操作步骤&#xff1a; 步骤1&#xff1a;运行【快递批量查询高手】软件&#xff0c;并登录 步骤2&#xff1a;点击主界面左上角…

一文教你提高写代码效率,程序员别错过!

首先&#xff0c;每个程序员都是会利用工具的人&#xff0c;也有自己囊里私藏的好物。独乐乐不如众乐乐&#xff0c;今天笔者整理了 3 个辅助我们写代码的黑科技&#xff0c;仅供参考。如果你有更好的工具&#xff0c;欢迎评论区分享。 1、Google/Stackoverflow——搜索解决方…

Guideline 2.3.2 - Performance - Accurate Metadata问题如何解决

当您的应用程序在苹果应用商城审核过程中被拒绝时&#xff0c;苹果会向您发送一封邮件&#xff0c;其中提供了关于拒绝原因的详细信息。本文将指导您如何正确处理Guideline 2.3.2 - Performance - Accurate Metadata问题&#xff0c;并提供解决方案&#xff0c;以确保您的应用程…

RK3568 android11 调试mipi摄像头 gc2093

一&#xff0c;摄像头简介 GC2093是一个高质量的1080P CMOS图像传感器&#xff0c;用于安全相机产品、数码相机产品和手机相机应用程序。包含了一个1920H x 1080V像素阵列、片上10位ADC和图像信号处理器。高性能和低功耗功能的全面集成使GC2093最适合设计&#xff0c;减少了实…

安全运营之安全加固和运维

安全运营是一个将技术、流程和人有机结合的复杂系统工程&#xff0c;通过对已有安全产品、工具和服务产出的数据进行有效的分析&#xff0c;持续输出价值&#xff0c;解决安全问题&#xff0c;以确保网络安全为最终目标。 安全加固和运维是网络安全运营中的两个重要方面。 安全…

【Proteus仿真】【Arduino单片机】视力保护仪

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真Arduino单片机控制器&#xff0c;使LCD1602液晶&#xff0c;DS18B20温度传感器、按键、蜂鸣器、继电器开关、HC05蓝牙模块等。 主要功能&#xff1a; 系统运行后&#xff0c;LCD16…

js键盘事件keydown事件,防止重复触发,组合键的配合使用

js键盘事件keydown事件&#xff0c;防止重复触发 键盘事件类型主要有三种&#xff1a; keydown 、keypress(不建议使用&#xff0c;部分浏览器已放弃)和 keyup 。 添加普通键盘keydown事件 // 监听键盘按下事件document.addEventListener(keydown, function(event) {// 输出按…

搭建动态网站之——基于Redhat8.6搭建Discuz论坛

目录 一、动态网站与静态网站区别 1、提供用户互动接口的动态网站 2、搭建动态网站的需求&#xff1a; 二、搭建步骤 第一步&#xff1a;www服务器配置 第二步;编辑网页文件 第三步&#xff1a;使用xftp 将Discuz包传到/discuz解压 1、将Discuz包移动到/discuz 2、解压…

ArkTS @Observed、@ObjectLink状态装饰器的使用

作用 Observed、ObjectLink装饰器用于在涉及嵌套对象或者数组元素为对象的场景中进行双向数据同步。 状态的使用 1.嵌套对象 我们将父类设置为Observed状态&#xff0c;这个时候&#xff0c;子应该设置ObjectLink才能完成数据的双向绑定&#xff0c;所以我们构建一个组件&…

C语言插入排序算法及代码

一、原理 在待排序的数组里&#xff0c;从数组的第二个数字开始&#xff0c;通过构建有序序列&#xff0c;对于未排序数据&#xff0c;在已排序序列中从后向前扫描&#xff0c;找到相应位置并插入。 二、代码部分 #include<stdio.h> #include<stdlib.h> int ma…

Android的基础开发

基础开发 listView ListView就是列表条目&#xff0c;可以向下滚动&#xff0c;也可以点击。 首先设置两个视图布局 activity_main2.xml【充当容器{ListView}】 <ListViewandroid:layout_width"match_parent"android:layout_height"match_parent"a…

AWS向量数据库Amazon OpenSearch Service使用测评

前言 在大模型盛行的当今&#xff0c;选择适宜的数据库显得尤为重要。因为你需要面对海量训练数据&#xff0c;快速的检索至关紧要&#xff0c;以及对于存储的要求也是至关重要的。对于海量的数据查询和存储是需要巨大的算力支持。向量数据库常用在一些图像文本或者视频的生成…

【大数据存储与处理】实验二 HBase 过滤器操作

实验二 HBase 过滤器操作 【实验目的】&#xff1a; 1.掌握使用 HBase 过滤器进行全表扫描。 【实验内容与要求】&#xff1a; 在 HBase 中&#xff0c;Get 和 Scan 操作都可以使用过滤器来设置输出的范围&#xff0c;类似于 SQL 里面 的 Where 查询条件。使用 show_filte…

【VScode和Leecode的爱恨情仇】command ‘leetcode.signin‘ not found

文章目录 一、关于command ‘leetcode.signin‘ not found的问题二、解决方案第一&#xff0c;没有下载Nodejs&#xff1b;第二&#xff0c;有没有在VScode中配置Nodejs第三&#xff0c;力扣的默认在VScode请求地址中请求头错误首先搞定配置其次搞定登入登入方法一&#xff1a;…

obswebsocket+douyinAPI+python,教你如何三步搭建自己的AI美女直播间,24小时的永动机

一&#xff1a;什么是AI直播美女直播间 就是在直播的时候通过弹幕进行选择不同的ai人物进行跳舞的直播间大致就是 可以看到左边是有提示&#xff0c;根据观众刷礼物的不同进行选择某一个AI人物进行展示&#xff0c;怎么通过技术手段实现呢 二&#xff1a;你需要懂的 其实还…

vmware安装银河麒麟V10高级服务器操作系统

vmware安装银河麒麟V10高级服务器操作系统 1、下载银河麒麟V10镜像2、VMware安装银河麒麟V10高级服务器操作系统2.1、新建虚拟机2.2、安装虚拟机 3、配置银河麒麟V10高级服务器操作系统3.1、安装vmware tools3.2、配置静态IP地址 和 dns3.3、查看磁盘分区 1、下载银河麒麟V10镜…

AI工具网站汇总——学习的好帮手

一、聊天AI 1.ChatGPT 地表最强AI聊天机器人 网址&#xff1a;https://chat.openai.com 2.Anthropic Anthropic发布的与ChatGPT竞争的聊天机器人 网址&#xff1a;https://www.anthropic.com 3.文心一言 百度全新知识增强大语言模型&#xff01;国产聊天机器人 网址&…

so-vits-svc的使用

1. 启动工程 找到工程的路径&#xff0c;找到启动的bat文件&#xff0c;这里以 d:/so-vits-svc为例。 2. 启动过程 启动后会出现cmp的一个弹框&#xff0c;初始启动相对较慢&#xff0c;请耐心等待一会儿&#xff0c;启动完成后&#xff0c;会出现一个页面&#xff0c;如下…