【Shashlik.EventBus】.NET 事件总线,分布式事务最终一致性简介

分布式事务、CAP定理、事件总线,在当前微服务、分布式、集群大行其道的架构前提下,是不可逃避的几个关键字,在此不会过多阐述相关的理论知识。Shashlik.EventBus就是一个基于.NET6的开源事件总线解决方案,同时也是分布式事务最终一致性、延迟事件解决方案。Shashlik.EventBus采用的是异步确保的思路(本地消息表),将消息数据与业务数据在同一事务中进行提交或回滚,以此来保证消息数据的可靠性。其设计目标是高性能、简单、易用、易扩展,为抛弃历史包袱,仅支持NET6,采用最宽松的 MIT 开源协议。

 https://github.com/dotnet-shashlik/shashlik.eventbus

各位爷高兴了给个star呗。

5b8f39c44a253261525473d4dd5435c0.png

如图所示,消息数据需要和业务数据在同一的事务中进行提交或者回滚,最后Shashlik.EventBus会检查消息数据是否已提交,如果已提交才会执行真正的消息发送。所以要求事务的隔离级别最低为读已提交(RC)。

关于消息幂等

Shashlik.EventBus不能保证业务消息的幂等性,为了保证消息的可靠传输,EventBus以及消息中间件对消息QOS处理等级必须为at least once (至少到达一次),一般消息中间件都需要开启消息持久化避免消息丢失。简而言之就是一个事件处理类可能处理多次同一个事件,事件消息的幂等性应该由业务方进行处理。比如用户订单付款完成为一个事件,付款完成后需要修改订单状态为待发货,也就是在付款完成事件处理类中可能收到多次这个订单的付款完成事件,那么业务的幂等性处理就可以使用锁,判断订单状态,如果订单状态已经为待发货,则直接返回并忽略本次事件响应。

延迟事件

Shashlik.EventBus支持基于本地的延迟事件机制,考虑到不是所有的消息中间件都支持延迟功能,且为了最大程度保证消息的可靠性,最后采用了System.Timers.Timer来执行延迟功能。

延迟事件同样适用于分布式事务最终一致性,但如果延迟事件处理类处理异常由重试器介入处理后,那么最终的延迟执行时间和期望的延迟时间就会产生较大的差异,是否忽略这里的时间差需要由具体的业务来决定。比如订单30分钟未付款需要关闭订单,30分钟后关闭订单出现了异常,最后由重试器到了40分钟后才关闭,也不影响订单,那么认为这个时间差可以容忍。又比如双11啦,发布一个延迟事件,晚上12点叫醒我起来买买买,只有1分钟时间,过了就买不到了,那么这种情况可以在事件处理类中,自行根据当前时间、事件发送时间、延迟执行时间等要素,自行决定业务如何处理。

延迟事件和普通事件在事件定义和事件处理类声明和处理时没有任何区别,仅仅是在发布事件时需要指定延迟时间。

上代码

需求:一个新用户注册以后有以下需求:

  1. 发送欢迎注册短信;

  2. 发放新用户优惠券;

  3. 30分钟后推送新用户优惠活动信息。

1. 服务配置,这里以MySql + RabbitMQ为例:

services.AddEventBus(r =>{// 这些都是缺省配置,可以直接services.AddEventBus()// 运行环境,注册到MQ的事件名称和事件处理名称会带上此后缀r.Environment = "Production";// 最大失败重试次数,默认60次r.RetryFailedMax = 60;// 消息重试间隔,默认2分钟r.RetryInterval = 60 * 2;// 单次重试消息数量限制,默认100r.RetryLimitCount = 100;// 成功的消息过期时间,默认3天,失败的消息永不过期,必须处理r.SucceedExpireHour = 24 * 3;            // 消息处理失败后,重试器介入时间,默认5分钟后r.StartRetryAfter = 60 * 5;            // 事务提交超时时间,单位秒,默认60秒r.TransactionCommitTimeout = 60;// 重试器执行时消息锁定时长r.LockTime = 110;})// 使用ef DbContext mysql.AddMySql<DemoDbContext>()// 配置RabbitMQ.AddRabbitMQ(r =>{r.Host = "localhost";r.UserName = "rabbit";r.Password = "123123";});
  1. 定义事件

// 新用户注册完成事件,实现接口IEventpublic class NewUserEvent : IEvent{public string Id { get;set; }public string Name { get; set; }}// 定义新用户注册延迟活动推送事件public class NewUserPromotionEvent : IEvent{public string Id { get;set; }public string Name { get; set; }public string PromotionId { get; set; }}
  1. 发布事件

public class UserManager{public UserManager(IEventPublisher eventPublisher, DemoDbContext dbContext){EventPublisher = eventPublisher;DbContext = dbContext;}private IEventPublisher EventPublisher { get; }private DemoDbContext DbContext { get; }public async Task CreateUserAsync(UserInput input){// 开启本地事务using var tran = await DbContext.DataBase.BeginTransactionAsync();try{// 创建用户逻辑处理...// 发布新用户事件// 通过注入IEventPublisher发布事件,需要传入事务上下文数据await EventPublisher.PublishAsync(new NewUserEvent{Id = user.Id,Name = input.Name}, DbContext.GetTransactionContext());// 发布延迟事件// 通过ef扩展,直接使用DbContext发布事件,自动使用当前上下文事务await DbContext.PublishEventAsync(new NewUserPromotionEvent{Id = user.Id,Name = input.Name,PromotionId = "1"}, DatetimeOffset.Now.AddMinutes(30));// 提交本地事务await tran.CommitAsync();}catch(Exception ex){// 回滚事务,消息数据也将回滚不会发布await tran.RollbackAsync();}}}
  1. 定义事件处理类

// 一个事件可以有多个处理类,可以分布在不同的微服务中// 用于发送短信的事件处理类public class NewUserEventForSmsHandler : IEventHandler<NewUserEvent>{public async Task Execute(NewUserEvent @event, IDictionary<string, string> items){// 发送短信...}}// 用于发放消费券的事件处理类public class NewUserEventForCouponsHandler : IEventHandler<NewUserEvent>{public async Task Execute(NewUserEvent @event, IDictionary<string, string> items){// 业务处理...}}// 用于新用户延迟活动的事件处理类,将在指定时间执行public class NewUserPromotionEventHandler : IEventHandler<NewUserPromotionEvent>{public async Task Execute(NewUserPromotionEvent @event, IDictionary<string, string> items){// 业务处理...}}

默认的,发布、声明到消息中间件的事件、事件处理器名称生产规则为{Type.Name}.{Options.Environment},在分布式架构下需要,您需要了解这个默认规则,这点不同于CAP框架必须显示声明,当然Shashlik.EventBus也可以使用EventBusNameAttribute特性来显示声明,详细说明请上github查看wiki文档。

XA事务支持(TransactionScope)

虽然尽可能的不要使用TransactionScope,但在某些场景仍然是需要的,Shashlik.EventBus对其提供了事务支持,可以通过XaTransactionContext.Current获取当前环境的事务上下文,发布事件如下:

public class UserManager{public UserManager(IEventPublisher eventPublisher, DemoDbContext dbContext){EventPublisher = eventPublisher;DbContext = dbContext;}private IEventPublisher EventPublisher { get; }private DemoDbContext DbContext { get; }public async Task CreateUserAsync(UserInput input){// 开启事务using var scope = new TransactionScope();try{// 创建用户逻辑处理...// 发布新用户事件// 通过注入IEventPublisher发布事件,需要传入事务上下文数据await EventPublisher.PublishAsync(new NewUserEvent{Id = user.Id,Name = input.Name// 使用 XaTransactionContext.Current}, XaTransactionContext.Current);// 提交事务await scope.Complete();}catch(Exception ex){// 回滚事务,消息数据也将回滚不会发布await tran.RollbackAsync();}}}

扩展

如果默认实现不能满足你的需求,可以自行实现可扩展接口,并注册即可。

  • IMsgIdGenerator:消息Id生成器,是指传输的全局唯一id,不是指存储的id。默认guid

  • IEventPublisher:事件发布处理器。

  • IMessageSerializer:消息序列化、反序列化处理类。默认Newtonsoft.Json。

  • IReceivedMessageRetryProvider:已接收消息重试器。

  • IPublishedMessageRetryProvider:已发布消息重试器。

  • IEventHandlerInvoker: 事件处理执行器

  • IEventNameRuler:事件名称规则生成(对应消息队列topic/route)。

  • IEventHandlerNameRuler:事件处理名称规则生成(对应消息队列queue/group)。

  • IEventHandlerFindProvider:事件处理类查找器

  • IExpiredMessageProvider:已过期消息删除处理器。

  • IMessageListener:消息监听处理器。

  • IRetryProvider:重试执行器。

  • IPublishHandler:消息发布处理器。

  • IReceivedHandler:消息接收处理器。

  • IMessageStorageInitializer:存储介质初始化。

  • IMessageStorage:消息存储、读取等操作。

例:

// 替换默认的IMsgIdGeneratorservice.AddSingleton<IMsgIdGenerator, CustomMsgIdGenerator>();service.AddEventBus().AddMemoryQueue().AddMemoryStorage();

后续计划

  • 功能

    •  Dashboard

  • 消息中间件支持

    •  RabbitMQ

    •  Kafka

    •  RocketMQ

    •  ActiveMQ

    •  Pulsar

    •  Redis

  • 存储支持

    •  MySql

    •  PostgreSql

    •  SqlServer

    •  Oracle

    •  MongoDb

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/282820.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

5个超实用的Visual Studio插件

工欲善其事&#xff0c;必先利其器,整理的一些我必装的5款Visual Studio插件&#xff0c;希望你们能get到。01 CodeMaidCodeMaid快速整理代码文件&#xff0c;规范你的代码&#xff0c;提高代码阅读体验。代码自动对齐&#xff0c;格式化代码&#xff08;ps&#xff1a;不用再按…

BZOJ1509: [NOI2003]逃学的小孩(树的直径)

Time Limit: 5 Sec Memory Limit: 64 MBSubmit: 1126 Solved: 567[Submit][Status][Discuss]Description Input 第一行是两个整数N&#xff08;3  N  200000&#xff09;和M&#xff0c;分别表示居住点总数和街道总数。以下M行&#xff0c;每行给出一条街道的信息。第i1行…

Blazor University (52)依赖注入 —— 拥有多个依赖项:正确的方式

原文链接&#xff1a;https://blazor-university.com/dependency-injection/component-scoped-dependencies/owning-multiple-dependencies-the-right-way/拥有多个依赖项&#xff1a;正确的方式在上一节[1]中&#xff0c;我们看到了将多个拥有的依赖项注入组件的错误方法。本节…

Gradle 1.12用户指南翻译——第五十四章. 构建原生二进制文件

其他章节的翻译请参见&#xff1a;http://blog.csdn.net/column/details/gradle-translation.html翻译项目请关注Github上的地址&#xff1a;https://github.com/msdx/gradledoc本文翻译所在分支&#xff1a;https://github.com/msdx/gradledoc/tree/1.12。直接浏览双语版的文档…

android 调用c wcf服务,如何使用命名管道从c调用WCF方法?

更新&#xff1a;通过协议here,我无法弄清楚未知的信封记录.我在网上找不到任何例子.原版的&#xff1a;我有以下WCF服务static void Main(string[] args){var inst new PlusFiver();using (ServiceHost host new ServiceHost(inst,new Uri[] { new Uri("net.pipe://loc…

VK Cup 2015 - Qualification Round 1 A. Reposts(树)

传送门 Description One day Polycarp published a funny picture in a social network making a poll about the color of his handle. Many of his friends started reposting Polycarps joke to their news feed. Some of them reposted the reposts and so on. These event…

Lombok@Builder和@NoArgsConstructor冲突

问题 今天在使用lombok简化model类时。使用Builder建造者模式。报以下异常 解决办法。 去掉NoArgsConstructor添加AllArgsConstructor源码分析 下图是编译后的源码 只使用Builder会自动创建全参构造器。而添加上NoArgsConstructor后就不会自动产生全参构造器

现在商业有种竞争叫“跨界打击”

随着互联网的发展&#xff0c;“跨界打击”的事情可谓是无处不在。行业跨界打击会抢占某个行业的市场份额&#xff0c;甚至可能淘汰一个行业。跨界打击者可能是某个行业的新进入者&#xff0c;也可能是现有竞争者&#xff0c;更可能是彻底的替代者或颠覆者。跨界打击&#xff0…

架构之美阅读笔记之一

寒假生活开始了&#xff0c;关于软件架构这部分的学习&#xff0c;我选择的是《架构之美》这本书。这本出版于2009年的书&#xff0c;由浅入深地讲述了从架构的概述&#xff0c;到企业级应用架构&#xff0c;系统架构&#xff0c;最终用户应用架构&#xff0c;再到语言与架构模…

ntop linux,Linux下开源监控软件Ntop的性能提升方案

摘要&#xff1a;Ntop是一款Linux下常见的开源监控软件&#xff0c;它可以监测的数据包括&#xff1a;网络流量、使用协议、系统负载、端口情况、数据包发送时间等。正常情况下它工作的时候就像一部被动声纳&#xff0c;默默的接收看来自网络的各种信息&#xff0c;通过对这些数…

Java异常处理教程

异常是在没有定义正常执行路径时在Java程序的执行期间可能出现的条件。Java通过将执行操作的代码与处理错误的代码分离来处理错误。 当发生异常时&#xff0c;Java会创建一个包含有关异常的所有信息的对象&#xff0c;并将其传递给相应的异常处理代码。有关异常的信息包括异常的…

性能优化8--内存泄露

一.根源&#xff1a; 内存泄露简单说就是已经没有用的资源&#xff0c;但是由于被其他资源引用着无法被GC销毁。 二.内存泄露常见场景 1.单例导致内存泄露 单例的静态特性使得它的生命周期同应用的生命周期一样长&#xff0c;如果一个对象已经没有用处了&#xff0c;但是单例还…

那些年,登山徒步记录,立贴

2018年1月-9月份暂无数据。&#xff08;惨无人道&#xff0c;已经丧失了自我。&#xff09; 10月份2017年2月份02月12日 25.00KM 牛木外线3月份暂无数据。 4月份1.04月09日 16.00KM 火凤线 5月份1.05月06日 20.00KM 渔帽线&#xff08;第一机耕路&#xff09; 6月份1.06月11日 …

记一次 .NET 某打印服务 非托管内存泄漏

一&#xff1a;背景 1. 讲故事前段时间有位朋友在微信上找到我&#xff0c;说他的程序出现了内存泄漏&#xff0c;能不能帮他看一下&#xff0c;这个问题还是比较经典的&#xff0c;加上好久没上非托管方面的东西了&#xff0c;这篇就和大家分享一下&#xff0c;话不多说&#…

android静态方法如何测试,android – 如何使用mock()和spy()测试静态方法

通常情况下,如果你最终使用PowerMock,这是一个很好的迹象,表明你最有可能是错误的方式.如果不是直接引用毕加索,而是创建一个组件,它的职责是加载图像,让我们说类ImageLoader.这会给你什么&#xff1f;>关注点分离&#xff1a;如果明天你决定转移到Glide,你不应该改变你使用…

mysql经典的8小时问题-wait_timeout

2019独角兽企业重金招聘Python工程师标准>>> 前段时间 现网突然频繁报出 连接不上数据库&#xff0c;偶滴的妖孽&#xff0c;其他地方都是用mysql&#xff0c;也没遇到这个问题呀。 java.io.EOFExceptionat at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:1913…

Chrome DevTools — Network

记录网络请求 默认情况下&#xff0c;只要DevTools在开启状态&#xff0c;DevTools会记录所有的网络请求&#xff0c;当然&#xff0c;记录都是在Network面板展示的。 停止记录网络请求 点击Stop recording network log红色图标&#xff0c;当它变为灰色时&#xff0c;表示DevT…

Blazor University 中文版网站已上线

在学习 Blazor 的过程中&#xff0c;找到了一个网站 Blazor University&#xff08;https://blazor-university.com&#xff09;。发现网站内容非常详实&#xff0c;正像首页所说的&#xff1a;通过浏览本网站中的信息&#xff0c;我打算带您从完全的新手到Blazor的所有方面的专…

android:paddingtop 百分比,相对层中的百分比宽度

相对层中的百分比宽度我正在为登录进行表单布局。Activity在我的Android应用程序中。下面的图片是我希望它看起来的样子&#xff1a;我能够通过以下方式实现这个布局XML..问题是&#xff0c;这有点麻烦。我不得不对主机EditText的宽度进行硬编码。具体而言&#xff0c;我必须具…

MySQL 查看表结构简单命令

一、简单描述表结构&#xff0c;字段类型 desc tabl_name; 显示表结构&#xff0c;字段类型&#xff0c;主键&#xff0c;是否为空等属性&#xff0c;但不显示外键。 例如&#xff1a;desc table_name 二、查询表中列的注释信息 select * from information_schema.columns wher…