【Shashlik.EventBus】.NET 事件总线,分布式事务最终一致性简介

分布式事务、CAP定理、事件总线,在当前微服务、分布式、集群大行其道的架构前提下,是不可逃避的几个关键字,在此不会过多阐述相关的理论知识。Shashlik.EventBus就是一个基于.NET6的开源事件总线解决方案,同时也是分布式事务最终一致性、延迟事件解决方案。Shashlik.EventBus采用的是异步确保的思路(本地消息表),将消息数据与业务数据在同一事务中进行提交或回滚,以此来保证消息数据的可靠性。其设计目标是高性能、简单、易用、易扩展,为抛弃历史包袱,仅支持NET6,采用最宽松的 MIT 开源协议。

 https://github.com/dotnet-shashlik/shashlik.eventbus

各位爷高兴了给个star呗。

5b8f39c44a253261525473d4dd5435c0.png

如图所示,消息数据需要和业务数据在同一的事务中进行提交或者回滚,最后Shashlik.EventBus会检查消息数据是否已提交,如果已提交才会执行真正的消息发送。所以要求事务的隔离级别最低为读已提交(RC)。

关于消息幂等

Shashlik.EventBus不能保证业务消息的幂等性,为了保证消息的可靠传输,EventBus以及消息中间件对消息QOS处理等级必须为at least once (至少到达一次),一般消息中间件都需要开启消息持久化避免消息丢失。简而言之就是一个事件处理类可能处理多次同一个事件,事件消息的幂等性应该由业务方进行处理。比如用户订单付款完成为一个事件,付款完成后需要修改订单状态为待发货,也就是在付款完成事件处理类中可能收到多次这个订单的付款完成事件,那么业务的幂等性处理就可以使用锁,判断订单状态,如果订单状态已经为待发货,则直接返回并忽略本次事件响应。

延迟事件

Shashlik.EventBus支持基于本地的延迟事件机制,考虑到不是所有的消息中间件都支持延迟功能,且为了最大程度保证消息的可靠性,最后采用了System.Timers.Timer来执行延迟功能。

延迟事件同样适用于分布式事务最终一致性,但如果延迟事件处理类处理异常由重试器介入处理后,那么最终的延迟执行时间和期望的延迟时间就会产生较大的差异,是否忽略这里的时间差需要由具体的业务来决定。比如订单30分钟未付款需要关闭订单,30分钟后关闭订单出现了异常,最后由重试器到了40分钟后才关闭,也不影响订单,那么认为这个时间差可以容忍。又比如双11啦,发布一个延迟事件,晚上12点叫醒我起来买买买,只有1分钟时间,过了就买不到了,那么这种情况可以在事件处理类中,自行根据当前时间、事件发送时间、延迟执行时间等要素,自行决定业务如何处理。

延迟事件和普通事件在事件定义和事件处理类声明和处理时没有任何区别,仅仅是在发布事件时需要指定延迟时间。

上代码

需求:一个新用户注册以后有以下需求:

  1. 发送欢迎注册短信;

  2. 发放新用户优惠券;

  3. 30分钟后推送新用户优惠活动信息。

1. 服务配置,这里以MySql + RabbitMQ为例:

services.AddEventBus(r =>{// 这些都是缺省配置,可以直接services.AddEventBus()// 运行环境,注册到MQ的事件名称和事件处理名称会带上此后缀r.Environment = "Production";// 最大失败重试次数,默认60次r.RetryFailedMax = 60;// 消息重试间隔,默认2分钟r.RetryInterval = 60 * 2;// 单次重试消息数量限制,默认100r.RetryLimitCount = 100;// 成功的消息过期时间,默认3天,失败的消息永不过期,必须处理r.SucceedExpireHour = 24 * 3;            // 消息处理失败后,重试器介入时间,默认5分钟后r.StartRetryAfter = 60 * 5;            // 事务提交超时时间,单位秒,默认60秒r.TransactionCommitTimeout = 60;// 重试器执行时消息锁定时长r.LockTime = 110;})// 使用ef DbContext mysql.AddMySql<DemoDbContext>()// 配置RabbitMQ.AddRabbitMQ(r =>{r.Host = "localhost";r.UserName = "rabbit";r.Password = "123123";});
  1. 定义事件

// 新用户注册完成事件,实现接口IEventpublic class NewUserEvent : IEvent{public string Id { get;set; }public string Name { get; set; }}// 定义新用户注册延迟活动推送事件public class NewUserPromotionEvent : IEvent{public string Id { get;set; }public string Name { get; set; }public string PromotionId { get; set; }}
  1. 发布事件

public class UserManager{public UserManager(IEventPublisher eventPublisher, DemoDbContext dbContext){EventPublisher = eventPublisher;DbContext = dbContext;}private IEventPublisher EventPublisher { get; }private DemoDbContext DbContext { get; }public async Task CreateUserAsync(UserInput input){// 开启本地事务using var tran = await DbContext.DataBase.BeginTransactionAsync();try{// 创建用户逻辑处理...// 发布新用户事件// 通过注入IEventPublisher发布事件,需要传入事务上下文数据await EventPublisher.PublishAsync(new NewUserEvent{Id = user.Id,Name = input.Name}, DbContext.GetTransactionContext());// 发布延迟事件// 通过ef扩展,直接使用DbContext发布事件,自动使用当前上下文事务await DbContext.PublishEventAsync(new NewUserPromotionEvent{Id = user.Id,Name = input.Name,PromotionId = "1"}, DatetimeOffset.Now.AddMinutes(30));// 提交本地事务await tran.CommitAsync();}catch(Exception ex){// 回滚事务,消息数据也将回滚不会发布await tran.RollbackAsync();}}}
  1. 定义事件处理类

// 一个事件可以有多个处理类,可以分布在不同的微服务中// 用于发送短信的事件处理类public class NewUserEventForSmsHandler : IEventHandler<NewUserEvent>{public async Task Execute(NewUserEvent @event, IDictionary<string, string> items){// 发送短信...}}// 用于发放消费券的事件处理类public class NewUserEventForCouponsHandler : IEventHandler<NewUserEvent>{public async Task Execute(NewUserEvent @event, IDictionary<string, string> items){// 业务处理...}}// 用于新用户延迟活动的事件处理类,将在指定时间执行public class NewUserPromotionEventHandler : IEventHandler<NewUserPromotionEvent>{public async Task Execute(NewUserPromotionEvent @event, IDictionary<string, string> items){// 业务处理...}}

默认的,发布、声明到消息中间件的事件、事件处理器名称生产规则为{Type.Name}.{Options.Environment},在分布式架构下需要,您需要了解这个默认规则,这点不同于CAP框架必须显示声明,当然Shashlik.EventBus也可以使用EventBusNameAttribute特性来显示声明,详细说明请上github查看wiki文档。

XA事务支持(TransactionScope)

虽然尽可能的不要使用TransactionScope,但在某些场景仍然是需要的,Shashlik.EventBus对其提供了事务支持,可以通过XaTransactionContext.Current获取当前环境的事务上下文,发布事件如下:

public class UserManager{public UserManager(IEventPublisher eventPublisher, DemoDbContext dbContext){EventPublisher = eventPublisher;DbContext = dbContext;}private IEventPublisher EventPublisher { get; }private DemoDbContext DbContext { get; }public async Task CreateUserAsync(UserInput input){// 开启事务using var scope = new TransactionScope();try{// 创建用户逻辑处理...// 发布新用户事件// 通过注入IEventPublisher发布事件,需要传入事务上下文数据await EventPublisher.PublishAsync(new NewUserEvent{Id = user.Id,Name = input.Name// 使用 XaTransactionContext.Current}, XaTransactionContext.Current);// 提交事务await scope.Complete();}catch(Exception ex){// 回滚事务,消息数据也将回滚不会发布await tran.RollbackAsync();}}}

扩展

如果默认实现不能满足你的需求,可以自行实现可扩展接口,并注册即可。

  • IMsgIdGenerator:消息Id生成器,是指传输的全局唯一id,不是指存储的id。默认guid

  • IEventPublisher:事件发布处理器。

  • IMessageSerializer:消息序列化、反序列化处理类。默认Newtonsoft.Json。

  • IReceivedMessageRetryProvider:已接收消息重试器。

  • IPublishedMessageRetryProvider:已发布消息重试器。

  • IEventHandlerInvoker: 事件处理执行器

  • IEventNameRuler:事件名称规则生成(对应消息队列topic/route)。

  • IEventHandlerNameRuler:事件处理名称规则生成(对应消息队列queue/group)。

  • IEventHandlerFindProvider:事件处理类查找器

  • IExpiredMessageProvider:已过期消息删除处理器。

  • IMessageListener:消息监听处理器。

  • IRetryProvider:重试执行器。

  • IPublishHandler:消息发布处理器。

  • IReceivedHandler:消息接收处理器。

  • IMessageStorageInitializer:存储介质初始化。

  • IMessageStorage:消息存储、读取等操作。

例:

// 替换默认的IMsgIdGeneratorservice.AddSingleton<IMsgIdGenerator, CustomMsgIdGenerator>();service.AddEventBus().AddMemoryQueue().AddMemoryStorage();

后续计划

  • 功能

    •  Dashboard

  • 消息中间件支持

    •  RabbitMQ

    •  Kafka

    •  RocketMQ

    •  ActiveMQ

    •  Pulsar

    •  Redis

  • 存储支持

    •  MySql

    •  PostgreSql

    •  SqlServer

    •  Oracle

    •  MongoDb

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/282820.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

5个超实用的Visual Studio插件

工欲善其事&#xff0c;必先利其器,整理的一些我必装的5款Visual Studio插件&#xff0c;希望你们能get到。01 CodeMaidCodeMaid快速整理代码文件&#xff0c;规范你的代码&#xff0c;提高代码阅读体验。代码自动对齐&#xff0c;格式化代码&#xff08;ps&#xff1a;不用再按…

BZOJ1509: [NOI2003]逃学的小孩(树的直径)

Time Limit: 5 Sec Memory Limit: 64 MBSubmit: 1126 Solved: 567[Submit][Status][Discuss]Description Input 第一行是两个整数N&#xff08;3  N  200000&#xff09;和M&#xff0c;分别表示居住点总数和街道总数。以下M行&#xff0c;每行给出一条街道的信息。第i1行…

Lombok@Builder和@NoArgsConstructor冲突

问题 今天在使用lombok简化model类时。使用Builder建造者模式。报以下异常 解决办法。 去掉NoArgsConstructor添加AllArgsConstructor源码分析 下图是编译后的源码 只使用Builder会自动创建全参构造器。而添加上NoArgsConstructor后就不会自动产生全参构造器

ntop linux,Linux下开源监控软件Ntop的性能提升方案

摘要&#xff1a;Ntop是一款Linux下常见的开源监控软件&#xff0c;它可以监测的数据包括&#xff1a;网络流量、使用协议、系统负载、端口情况、数据包发送时间等。正常情况下它工作的时候就像一部被动声纳&#xff0c;默默的接收看来自网络的各种信息&#xff0c;通过对这些数…

性能优化8--内存泄露

一.根源&#xff1a; 内存泄露简单说就是已经没有用的资源&#xff0c;但是由于被其他资源引用着无法被GC销毁。 二.内存泄露常见场景 1.单例导致内存泄露 单例的静态特性使得它的生命周期同应用的生命周期一样长&#xff0c;如果一个对象已经没有用处了&#xff0c;但是单例还…

记一次 .NET 某打印服务 非托管内存泄漏

一&#xff1a;背景 1. 讲故事前段时间有位朋友在微信上找到我&#xff0c;说他的程序出现了内存泄漏&#xff0c;能不能帮他看一下&#xff0c;这个问题还是比较经典的&#xff0c;加上好久没上非托管方面的东西了&#xff0c;这篇就和大家分享一下&#xff0c;话不多说&#…

mysql经典的8小时问题-wait_timeout

2019独角兽企业重金招聘Python工程师标准>>> 前段时间 现网突然频繁报出 连接不上数据库&#xff0c;偶滴的妖孽&#xff0c;其他地方都是用mysql&#xff0c;也没遇到这个问题呀。 java.io.EOFExceptionat at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:1913…

Chrome DevTools — Network

记录网络请求 默认情况下&#xff0c;只要DevTools在开启状态&#xff0c;DevTools会记录所有的网络请求&#xff0c;当然&#xff0c;记录都是在Network面板展示的。 停止记录网络请求 点击Stop recording network log红色图标&#xff0c;当它变为灰色时&#xff0c;表示DevT…

MySQL 查看表结构简单命令

一、简单描述表结构&#xff0c;字段类型 desc tabl_name; 显示表结构&#xff0c;字段类型&#xff0c;主键&#xff0c;是否为空等属性&#xff0c;但不显示外键。 例如&#xff1a;desc table_name 二、查询表中列的注释信息 select * from information_schema.columns wher…

简单获取任意app的URL Schemes

简单说明 最近业务需要&#xff0c;一直在查询App的scheme相关信息&#xff0c;找到一种比较可靠的方法&#xff0c;分享给大家 步骤如下&#xff1a; 在电脑上使用iTunes下载那个app下载完后&#xff0c;在itunes里点击这个app&#xff0c;选择->Show in Finder&#xff0c…

Dnslog在SQL注入中的利用

参考文献&#xff1a;www.anquanke.com/post/id/98096https://bbs.pediy.com/thread-223881.htm DNSlog在Web攻击的利用 在某些无法直接利用漏洞获得回显的情况下&#xff0c;但是目标可以发起DNS请求&#xff0c;这个时候就可以通过DNSlog把想获得的数据外带出来。 常用情况 S…

让泛型的思维扎根在脑海——深刻理解泛型

1.前言往往一些刚接触C#编程的初学者&#xff0c;对于泛型的认识就是直接跳到对泛型集合的使用上&#xff0c;虽然微软为我们提供了很多内置的泛型类型&#xff0c;但是如果我们只是片面的了解调用方式&#xff0c;这会导致我们对泛型盲目的使用。至于为什么要使用泛型&#xf…

android 系统ui修改器,分享两个效果 - Android 系统 UI 管理

SystemUIManage.gifDimming the System Bars (沉浸模式)知乎 和 Medium 中都使用到了这个效果&#xff0c;作为沉浸式阅读模式。// This example uses decor view, but you can use any visible view.View decorView getWindow().getDecorView();int uiOptions View.SYSTEM_U…

打游戏要存进度-备忘录模式

打游戏要存进度-备忘录模式 学习自 《大话设计模式》 备忘录模式漫谈 备忘录的这种设计思想是非常常见的&#xff0c;比如说围棋游戏的悔棋&#xff0c;绘图软件的撤销功能等等&#xff0c;都或多或少的使用了备忘录模式来处理对象的状态。 备忘录(Memento): 在不破坏封装性的前…

利用lay-ui结合ajax实现分页功能(不借助框架,简单易懂)

效果图: 1.创建html页面 01.html(前台文件) 2.创建index.php(后台文件) ------------------热身结束,开始正式分页之旅------------------ 3.在html页面中引入layui需要用到的css以及js,还有我们自己额外需要用到的jquery 4.在html文件中,将基本的分页栏显示出来 5.好啦,htm…

酷派手机android版本,系统版本迎来升级

系统版本迎来升级这个应该是两个版本之间最大但是却不那么直观的不同了&#xff0c;因为从TD版酷派大神F1采用的CoolLife UI 5.0版本&#xff0c;再到联通版酷派大神F1所搭载的CoolLife UI 5.5版本&#xff0c;它们之间经历了一个比较不错的升级。在图标ICON&#xff0c;功能设…

最终用户计算安全——特权访问控制

本篇算是系列的第二篇&#xff0c;之前写了一篇关于勒索软件攻击的&#xff0c;坦白说写这样的文很费脑子&#xff0c;而且喜欢看的读者估计也不多…不过我觉得整理一下思路&#xff0c;对于通过最终用户计算产品或方案来提升组织安全还是有很大的意义的。所以一边喝着清茶吃着…

详述 IntelliJ IDEA 插件的安装及使用方法

首先&#xff0c;进入插件安装界面&#xff1a; Mac&#xff1a;IntelliJ IDEA -> Preferences -> Plugins;Windows&#xff1a;File -> Settings -> Plugins.标注 1&#xff1a;显示 IntelliJ IDEA 的插件分类&#xff0c; All plugins&#xff1a;显示 IntelliJ …

杭漂两年,深漂两年,宇宙的尽头到底在哪儿

hi&#xff0c;这里是桑小榆。这次分享的是一位杭漂两年&#xff0c;深漂两年的码农伙伴的经历。首先他能够在大学期间就寻找到自己的热爱并持之以恒值得令人学习。其次他的工作经历可以说是非常的“程序员”&#xff0c;因为程序员所面对的职业生涯中&#xff0c;所谓的实习&a…