MASA Framework - EventBus设计

概述

利用发布订阅模式来解耦不同架构层级,亦可用于解决隔离业务之间的交互

优点:

  • 松耦合

  • 横切关注点

  • 可测试性

  • 事件驱动

发布订阅模式

发布者通过调度中心将消息发送给订阅者。调度中心解决发布与订阅者之间的关系,保证消息可以送达订阅者手中。

发布者与订阅者互不相识,发布者只管向调度中心发布消息,而订阅者只关心自己订阅的消息类型

3edff410da963ab9d2b648e5ef70b632.png

多订阅者保序执行

在常见的发布订阅模式中,的确很少见到类似的说法。但在实际业务中我们会有类似的需求,一个消息由调度中心协调多个订阅者按照顺序执行消息,同时还可以将上一个订阅者处理过的消息传递给下一个订阅者。这样既可以保留发布订阅模式的特性,又有了顺序执行逻辑的特性。

一个小思考:如果EventBus的配置支持动态调整的话,是否业务的执行顺序也可以被动态排列组合?

换句话说它或许可以为进程内工作流提供了一个可能性

dcb388de56b385d189a9fb3861b81267.png

Event Sourcing(事件溯源)

一种事件驱动的架构模式,可以用于审计和溯源

  • 基于事件驱动架构

  • 以事件为事实

  • 业务数据由事件计算产生的视图,可以持久化也可以不持久化

CQRS(命令查询的责任分离)

CQRS是一种架构模式,能够使改变模型与查询模型的实现分离

604fd16011af2bab790e6fc332142a80.png

Event Sourcing & CQRS

事件溯源可以与CQRS很好的配合

  • 在Command Handler中持久化事件到Event Store的同时实时计算一个最终视图给View DB用于查询展示

  • 在Query中既可以通过View DB获取最新状态,也可以通过Event Store来重放事件来校验View或用于更严谨的业务

79361361836b39a5abd65f458b5c7bd3.png

Saga

Saga是一个长活事务被分解成可以交错运行的子事务集合。其中每个子事务都是一个保持数据库一致性的真实事务

  • 每个Saga由一系列sub-transaction Ti 组成

  • 每个Ti 都有对应的补偿动作Ci,补偿动作用于撤销Ti造成的结果

两种执行顺序

  • T1, T2, T3...[Tx retry]...,Tn

  • T1, T2, ..., Tj, Cj,..., C2, C1

两种恢复策略

  • backward recovery,向后恢复,补偿所有已完成的事务,如果任一子事务失败。即上面提到的第二种执行顺序,其中j是发生错误的sub-transaction,这种做法的效果是撤销掉之前所有成功的sub-transation,使得整个Saga的执行结果撤销

  • forward recovery,向前恢复,重试失败的事务,假设每个子事务最终都会成功。适用于必须要成功的场景,执行顺序是类似于这样的:T1, T2, ..., Tj(失败), Tj(重试),..., Tn,其中j是发生错误的sub-transaction。该情况下不需要Ci

BuildingBlocks的类视图

作为接口标准,BuildingBlocks中并没有过多的干涉实现方式,它只保留了最基础的功能流程限制,以达到最小EventBus的功能集合。至于最终是基于接口还是特性来实现订阅关系的,交还给Contrib自行决定。

事件

用于本地事件的发布/订阅

  • IEvent:事件接口,IEvent<TResult>为带返回值的基础事件接口

  • IEventHanldler<TEvent>:事件处理器接口,ISagaEventHandler<TEvent>为Saga的实现提供了基础接口要求

  • IMiddleware<TEvent>:中间件接口,允许在事件执行前挂载预处理动作和时间执行后的收尾动作

  • IEventBus:事件总线接口,用于发送事件,并提供订阅关系维护和附加功能执行

b347dd5e3cbe454dd53d389893dacca2.png

集成事件

用于跨进程事件的发布/订阅

  • IntegrationEventLog:集成事件日志,用于实现本地消息表的消息模型

  • IIntegrationEventLogService:集成事件日志服务接口

  • ITopic:发布/订阅的主题

  • IIntegrationEvent:集成事件接口

  • IIntegrationEventBus:集成事件总线,用于跨进程调用的事件总线

4f2ae2159a5f80f4aca3bf9f90783fec.png

CQRS

用于使改变模型与查询模型的实现分离

  • IQuery<TResult>:查询的接口

  • IQueryHandler<TCommand,TResult>:查询处理器接口

  • ICommand:可用于增删改等指令的接口

  • ICommandHandler<TCommand>:指令处理器接口

39c0a9e423945b5ee99d51a4d89d2194.png

Event Bus

要完成上述的这些功能,我们需要借助于EventBus,它需要有以下基础功能

  • 接收事件

  • 维护订阅关系

  • 转发事件

接收与转发事件

这两个功能其实可以合并为一个接口,由发布者调用Publish,再由Event Bus根据订阅关系转发即可

维护订阅关系

在.Net项目中,我们常见的用于扫描自动注册的方式是接口特性

MediatR支持接口的方式去扫描事件订阅关系,举个例子:IRequestHandler<,>

public class PingHandler : IRequestHandler<Ping, string>
{public Task<string> Handle(Ping request, CancellationToken cancellationToken){return Task.FromResult("Pong");}
}

如果你的代码洁癖程度没有高的离谱,或许你希望是这样

public class NetHandler : IRequestHandler<Ping, string>, IRequestHandler<Telnet, string>
{public Task<string> Handle(Ping request, CancellationToken cancellationToken){return Task.FromResult("Pong");}public Task<string> Handle(Telnet request, CancellationToken cancellationToken){return Task.FromResult("Success");}
}

看着好像还行?如果很多呢?

那有没有办法解决这个问题?

特性!我们来看个例子

public class NetHandler
{[EventHandler]public Task PingAsync(PingEvent @event){//TODO}[EventHandler]public Task TelnetAsync(TelnetEvent @event){//TODO}
}

似乎我们找到了一个出路

多订阅者保序执行

通过事件层层推进确实可以满足顺序执行的场景,但如果你被大量无限套娃的事件包围的时候或许你需要另外一个出路,看下例子:

public class NetHandler
{[EventHandler(0)]public Task PingAsync(PingEvent @event){//TODO}[EventHandler(1)]public Task LogAsync(PingEvent @event){//TODO}
}

只要参数是同一个Event就会按照EventHandler的Order顺序执行。

Saga

那执行失败了怎么办,如果两个方法因为其中一个需要调用远程服务而无法跟本地事务结合,能帮我回滚吗?

来吧,SAGA走起,帮你再做个取消动作,同时还支持重试机制,以及是否忽略当前步骤的取消动作。

我们先来预设一下场景:

  1. 调用CheckBalanceAsync来检查余额

  2. 调用WithdrawAsync, 抛出 exception

  3. 重试WithdrawAsync 3 次

  4. 调用CancelWithdrawAsync

代码如下:

public class TransferHandler
{[EventHandler(1)]public Task CheckBalanceAsync(TransferEvent @event){//TODO}[EventHandler(2, FailureLevels.ThrowAndCancel, enableRetry: true, retryTimes: 3)]public Task WithdrawAsync(TransferEvent @event){//TODOthrow new Exception();}[EventHandler(2, FailureLevels.Ignore, enableRetry: false, isCancel: true)]public Task CancelWithdrawAsync(TransferEvent @event){//TODO}
}

AOP

举个业务场景,给所有Command在执行前增加一个参数验证

我们提供了Middleware,允许像俄罗斯套娃一样(.Net Middleware)做横切关注点的相关的事情

public class LoggingMiddleware<TEvent>: IMiddleware<TEvent> where TEvent : notnull, IEvent
{private readonly ILogger<LoggingMiddleware<TEvent>> _logger;public LoggingMiddleware(ILogger<LoggingMiddleware<TEvent>> logger) => _logger = logger;public async Task HandleAsync(TEvent @event, EventHandlerDelegate next){_logger.LogInformation("----- Handling command {EventName} ({@Event})", typeof(TEvent).FullName, @event);await next();}
}

注册DI

builder.Services.AddTransient(typeof(IMiddleware<>), typeof(LoggingMiddleware<>))

MASA EventBus完整功能列表

  • 接收事件

  • 维护订阅关系 - 接口

  • 维护订阅关系 - 特性

  • 多订阅者顺序执行

  • 转发事件

  • Saga

  • AOP

  • UoW

  • 自动开启和关闭事务

Integration Event Bus

用于跨服务的Event Bus,支持最终一致性,本地消息表

Pub/Sub

提供了Pub Sub接口,并基于Dapr Pub/Sub提供默认实现

本地消息表

提供了本地消息保存和UoW联动接口,并基于EF Core提供默认实现

使用方法

启用Dapr Event Bus

builder.Services.AddDaprEventBus<IntegrationEventLogService>(options=>{options.UseUoW<CatalogDbContext>(dbOptions => dbOptions.UseSqlServer("server=localhost;uid=sa;pwd=Password;database=test")).UseEventLog<CatalogDbContext>();)});

定义Integration Event

public class DemoIntegrationEvent : IntegrationEvent
{public override string Topic { get; set; } = nameof(DemoIntegrationEvent);//dapr topic name//todo other properties
}

定义DbContext(非必须,定义DbContext可以将本地消息表与业务事务联动)

public class CustomDbContext : IntegrationEventLogContext
{public DbSet<User> Users { get; set; } = null!;public CustomDbContext(MasaDbContextOptions<CustomDbContext> options) : base(options){}
}

发送 Event

IIntegrationEventBus eventBus; // from DI
await eventBus.PublishAsync(new DemoIntegrationEvent());

订阅 Event(基于Dapr Pub/Sub的版本)

[Topic("pubsub", nameof(DomeIntegrationEvent))]
public async Task DomeIntegrationEventHandleAsync(DomeIntegrationEvent @event)
{//todo
}

Domain Event Bus

在领域中同时提供Event Bus和Integration Event Bus的能力,允许实时发送事件或在Save时一次性触发

Domain Event Bus是最完整的能力,所以使用Domain Event Bus相当于已经开启了Event Bus和Integration Event Bus,在Domain Event Bus内部会自动协调事件分类往Event Bus和Integration Event Bus分流

启用Domain Event Bus

builder.Services
.AddDomainEventBus(options =>
{options.UseEventBus()//Use in-process events.UseUoW<CustomDbContext>(dbOptions => dbOptions.UseSqlServer("server=localhost;uid=sa;pwd=P@ssw0rd;database=idientity")).UseDaprEventBus<IntegrationEventLogService>()///Use cross-process events.UseEventLog<LocalMessageDbContext>().UseRepository<CustomDbContext>();
})

添加DomainCommand

Domain Event是进程内事件,IntegrationDomainEvent是跨进程事件

public class RegisterUserSucceededIntegrationEvent : IntegrationDomainEvent
{public override string Topic { get; set; } = nameof(RegisterUserSucceededIntegrationEvent);public string Account { get; set; } = default!;
}public class RegisterUserSucceededEvent : DomainEvent
{public string Account { get; set; } = default!;
}

进程内事件订阅

[EventHandler]
public Task RegisterUserHandlerAsync(RegisterUserDomainCommand command)
{//TODO
}

跨进程事件订阅

[Topic("pubsub", nameof(RegisterUserSucceededIntegrationEvent))]
public async Task RegisterUserSucceededHandlerAsync(RegisterUserSucceededIntegrationEvent @event)
{//todo
}

发送DomainCommand

IDomainEventBus eventBus;//from DI
await eventBus.PublishAsync(new RegisterUserDomainCommand());

使用场景

  • 兼顾遗留系统对接

  • 游走在云与非云中

  • 流计算

  • 微服务解耦和跨集群通信(需要将Dapr Pub/Sub改为Dapr Binding,不难)

  • 部分AOP类场景

总结

事件驱动可以解决一些特定场景的问题,凡事都有两面性,在本来就很简单的业务场景中使用如此复杂的模式会带来不小的负担。

学以致用,学无止境。

开源地址

MASA.BuildingBlocks:https://github.com/masastack/MASA.BuildingBlocks

MASA.Contrib:https://github.com/masastack/MASA.Contrib

MASA.Utils:https://github.com/masastack/MASA.Utils

MASA.EShop:https://github.com/masalabs/MASA.EShop

如果你对我们的MASA Framework感兴趣,无论是代码贡献、使用、提Issue,欢迎联系我们

4e0e50610cfb61c3b05b5c029f99920b.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/292725.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

wireshark-win64-3.4.0安装_这9类轴承的安装方法,你可都知道?有哪些需要注意的呢?...

轴承是当代机械设备中一种重要零部件。随着时间的推移&#xff0c;轴承会发生磨损&#xff0c;合理的安装和使用可以让机械设备减少不必要的安全隐患。前面文章讲了如何拆卸轴承&#xff0c;今天就给大家讲讲各类轴承应该如何安装&#xff01;一、轴承安装前的准备工作轴承的安…

史上最牛数学简史

全世界只有3.14 % 的人关注了爆炸吧知识“中国现代数学之父”华罗庚曾说过宇宙之大&#xff0c;粒子之微火箭之速&#xff0c;化工之巧地球之变&#xff0c;生物之谜日用之繁&#xff0c;无处不用数学回首往昔数学始终伴随我们左右纵横交错的几何、繁琐复杂的运算难以求解的方程…

linux下使用pidcat找bug

第一步&#xff1a; 安装pidcat 第二步&#xff1a; 找到APP的包名比如adb shell ps | grep sangforadb shell pm list package第三步&#xff1a; 在ubuntu终端输入pidcat.py 包名结果&#xff1a;

创建与删除索引

索引是加速查询的主要手段&#xff0c;特别对于涉及多个表的查询更是如此。本节中&#xff0c;将介绍索引的作用、特点&#xff0c;以及创建和删除索引的语法。13.4.1 使用索引优化查询索引是高速定位数据的技术&#xff0c;首先通过一个演示样例来了解其含义及作用&#xff0…

r vector 4 elements_Vector类与Enumeration接口

Vector类用于保存一组对象&#xff0c;由于java不支持动态数组&#xff0c;Vector可以用于实现跟动态数组差不多的功能。如果要将一组对象存放在某种数据结构中&#xff0c;但是不能确定对象的个数时&#xff0c;Vector是一个不错的选择。例&#xff1a;将键盘上输入的一个数字…

[深入JUnit] 测试运行的入口

阅读前提 了解JUnit 对JUnit的内部实现有兴趣 不妨看看[深入JUnit] Before, After, Test的秘密] 代码版本: junit 4.12代码搜索工具&#xff1a; http://grepcode.com/常用符号 _: 用来略去代码段中无关紧要的parameter ...: 用来略去无关紧要的代码实现 本文的展开方式&…

.NET6之MiniAPI(七):中间件

http协议&#xff0c;是由客户端发出请求&#xff0c;服务端响应结果并返回&#xff0c;我们把这个请求来回抽象成一个请求管道&#xff0c;那中间件就是这个管道上的阀门&#xff0c;控制着流量的进出和中断。每一个请求都要经过中间件的过滤&#xff0c;滤掉不合格的请求&…

linux之telnet命令使用

telnet命令通常用来远程登录。telnet程序是基于TELNET协议的远程登录客户端程序。Telnet协议是TCP/IP协议族中的一员&#xff0c;是Internet远程登陆服务的标准协议和主要方式。它为用户提供了在本地计算机上完成远程主机工作的 能力。在终端使用者的电脑上使用telnet程序&…

arraylist从大到小排序_经典排序方法的python实现和复杂度分析

1.冒泡排序:冒泡排序算法的运作如下&#xff1a;比较相邻的元素。如果第一个比第二个大(升序)&#xff0c;就交换他们两个。对每一对相邻元素作同样的工作&#xff0c;从开始第一对到结尾的最后一对。这步做完后&#xff0c;最后的元素会是最大的数。针对所有的元素重复以上的步…

HOOK学习笔记与心得

一、 Hook介绍钩子(Hook)&#xff0c;是Windows消息处理机制的一个平台,应用程序可以在上面设置子程以监视指定窗口的某种消息&#xff0c;而且所监视的窗口可以是其他进程所创建的。当消息到达后&#xff0c;在目标窗口处理函数之前处理它。钩子机制允许应用程序截获处理wind…

access函数_ACCESS中的DLookUp函数是如何运算的?

​一、DLookUp函数介绍1. DLookUp函数的用途&#xff1a;可以用于从指定集合(一个域)中获取符合条件的特定字段的值。2. DLookUp函数的格式为&#xff1a;DLookUp( expr , domain , [criteria] )其中&#xff1a;expr 为字段名&#xff0c;或以字段名为基础的表达式字符串domai…

汇编语言之基础知识

1、机器语言 说到汇编语言的产生&#xff0c;首先要讲一下机器语言。机器语言是机器指令的集合。什么是机器指令&#xff1f;我们在使用CE时&#xff0c;常常见到。 请看下图&#xff1a; 图中所示的就是机器指令&#xff08;或称机器码&#xff09;&#xff0c;这是十六进制的…

Entity Framework 简单增删改操作

前言 在 Entity Framework 简单查询操作 中主要是学习了在Entity Framework中的几种不同模式的查询操作&#xff0c;现在主要来学习一下简单的增加、删除、修改操作。 增加 在EF中添加操作一般有两种方式&#xff1a;一是直接创建对象&#xff0c;然后调用“DbSet”的”Add()”…

华为云服务器初探二(完结)

在上一篇《华为云服务器初探》 中介绍了在使用华为云服务器部署时的一些关键点&#xff0c;本篇继续&#xff0c;内容涉及如下:中间件的部署问题解决NAT 网关使用数据库服务的访问dotNET Core 程序的构建Redis首先更正上一篇中的一个错误&#xff0c;在运行参数中进行密码设置&…

for in for of区别_Python 第5课:for…in循环黄金搭档之列表

乐学趣学Py● 05&#xff1a;for…in循环黄金搭档之列表●Python趣味小百科Python程序中有一个有彩蛋&#xff0c;在IDLE Pythton模式下输入import this会出现一首(The Zen of Python, by Tim Peters)‘Pyton之禅’的小诗。这首小诗表明了用Python编写代码时遵循的原则&#xf…

开源虎墩同名电影《小虎墩大英雄》定档大年初一

文末有福利&#xff0c;记得看到最后哦~| 作者&#xff1a;虎虎生风的开源虎墩组| 编辑&#xff1a;刘雪洁| 责编&#xff1a;王玥敏开源虎墩诞生记&#xff1a;小源机器人2.0大家还记得 2020 疫情肆虐的时候开源社与来自全国各地的开源爱好者隔空合作&#xff0c;共同打造的疫…