基于Abp VNext框架设计 - Masstransit分布式消息

abp 通过IDistributedEventBus接口集成自IEventBus实现分布式事件消息的发布订阅。

IEventBus在什么时机触发PublishAsync?

  1. 当前UnitOfWork完成时,触发IEventBusPublishAsync

  2. 在没有事务环境下,同步调用IEventBusPublishAsync

abp 默认实现基于RabbitMq消息队列Volo.Abp.EventBus.RabbitMQ实现分布式消息的发布与订阅。

消息治理核心问题:

  1. 生产端如何保证投递成功的消息不能丢失。

  2. Mq自身如何保证消息不丢失。

  3. 消费段如何保证消费端的消息不丢失。

基于abp 默认实现的DistributedEventBus不能满足以下场景:

  1. Publisher 生产者无法保证消息一定能投递到MQ。

  2. Consumer 消费端在消息消费时,出现异常时,没有异常错误处理机制(确保消费失败的消息能重新被消费)。

我们引入Masstransit,来提升abp对消息治理能力。

Masstransit提供以下开箱即用功能:

  1. Publish/Send/Request-Response等几种消息投递机制。

  2. 多种IOC容器支持。

  3. 异常机制。

  4. Saga事务管理。

  5. 事务活动补偿机制(Courier)

  6. 消息审计

  7. 消息管道处理机制

Abp 框架下事件消息集成

  1. 使用MassTransit重新实现IDistributedEventBus

  2. 在消费端Consumer传递用户身份信息。

  3. 使用Asp.Net Core Web Host 作消费端Consumer宿主。

集成MassTransit

在Module初始化时,注入MassTransit实例,并启动。

Copy/// <summary>
/// 配置DistributedEventBus
/// </summary>
/// <param name="context"></param>
/// <param name="configuration"></param>
/// <param name="hostingEnvironment"></param>
private void ConfigureDistributedEventBus(ServiceConfigurationContext context, IConfiguration configuration, IWebHostEnvironment hostingEnvironment)
{var options = context.Services.GetConfiguration().GetSection("Rabbitmq").Get<MassTransitEventBusOptions>();var mqConnectionString = "rabbitmq://" + options.ConnectionString;context.Services.AddMassTransit(mtConfig =>{//inject consumers into IOC from assemblymtConfig.AddConsumers(typeof(AuthCenterEventBusHostModule));mtConfig.AddBus(provider =>{var bus = Bus.Factory.CreateUsingRabbitMq(mqConfig =>{var host = mqConfig.Host(new Uri(mqConnectionString), h =>{h.Username(options.UserName);h.Password(options.Password);});// set special message serializermqConfig.UseBsonSerializer();// integrated existed logger compontentmqConfig.UseExtensionsLogging(provider.GetService<ILoggerFactory>());mqConfig.ReceiveEndpoint(host, "authcenter-queue", q =>{//set rabbitmq prefetch countq.PrefetchCount = 200;//set message retry policyq.UseMessageRetry(r => r.Interval(3, 100));q.Consumer<SmsTokenValidationCreatedEventConsumer>(provider);EndpointConvention.Map<SmsTokenValidationCreatedEvent>(q.InputAddress);});mqConfig.ReceiveEndpoint(host, "user-synchronization", q =>{//set rabbitmq prefetch countq.PrefetchCount = 50;//q.UseRateLimit(100, TimeSpan.FromSeconds(1));//q.UseConcurrencyLimit(2);//set message retry policyq.UseMessageRetry(r => r.Interval(3, 100));q.Consumer<UserSyncEventConsumer>(provider);EndpointConvention.Map<UserSyncEvent>(q.InputAddress);});mqConfig.ConfigureEndpoints(provider);mqConfig.UseAuditingFilter(provider, o =>{o.ReplaceAuditing = true;});});// set authtication middleware for user identitybus.ConnectAuthenticationObservers(provider);return bus;});});
}

在MassTransit中,使用IBusControl接口 StartAsync 或 StopAsync 来启动或停止。

使用IPublishEndpoint重新实现IDistributedEventBus接口,实现与abp分布式事件总线集成。

Copy public class MassTransitDistributedEventBus : IDistributedEventBus, ISingletonDependency{private readonly IPublishEndpoint _publishEndpoint;//protected IHybridServiceScopeFactory ServiceScopeFactory { get; }protected AbpDistributedEventBusOptions DistributedEventBusOptions { get; }public MassTransitDistributedEventBus(IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,IPublishEndpoint publishEndpoint){//ServiceScopeFactory = serviceScopeFactory;_publishEndpoint = publishEndpoint;DistributedEventBusOptions = distributedEventBusOptions.Value;//Subscribe(distributedEventBusOptions.Value.Handlers);}/**  Not Implementation*/public Task PublishAsync<TEvent>(TEvent eventData)where TEvent : class{return _publishEndpoint.Publish(eventData);}public Task PublishAsync(Type eventType, object eventData){return _publishEndpoint.Publish(eventData, eventType);}}

到此,我们实现了MassTransit与Abp集成。

事件消息传递User Claims

在实际业务实现过程中,我们会用消息队列实现“削峰填谷”的效果。异步消息队列中传递用户身份信息如何实现呢?

我们先看看abp在WebApi中,如何确定当前用户?

ICurrentUser 提供当前User Claims抽象。而ICurrentUser依赖于ICurrentPrincipalAccessor,在Asp.Net core中利用HttpContext User 来记录当前用户身份。

在MassTransit中,利用IPublishObserver > IConsumeObserver 生产者/消费端的观察者,来实现传递已认证的用户Claims。

Copy    /// <summary>/// 生产者传递当前用户Principal/// </summary>public class AuthPublishObserver : IPublishObserver{private readonly ICurrentPrincipalAccessor _currentPrincipalAccessor;private readonly IClaimsPrincipalFactory _claimsPrincipalFactory;public AuthPublishObserver(ICurrentPrincipalAccessor currentPrincipalAccessor,IClaimsPrincipalFactory claimsPrincipalFactory){_currentPrincipalAccessor = currentPrincipalAccessor;_claimsPrincipalFactory = claimsPrincipalFactory;}public Task PrePublish<T>(PublishContext<T> context) where T : class{var claimsPrincipal = _claimsPrincipalFactory.CreateClaimsPrincipal(_currentPrincipalAccessor.Principal);if (claimsPrincipal != null){context.Headers.SetAuthenticationHeaders(claimsPrincipal);}return TaskUtil.Completed;}public Task PostPublish<T>(PublishContext<T> context) where T : class => TaskUtil.Completed;public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class => TaskUtil.Completed;}
Copy/// <summary>/// 消费端从MqMessage Heads 中获取当前用户Principal,并赋值给HttpContext/// </summary>public class AuthConsumeObserver : IConsumeObserver{private readonly IHttpContextAccessor _httpContextAccessor;private readonly IServiceScopeFactory _factory;public AuthConsumeObserver(IHttpContextAccessor httpContextAccessor, IServiceScopeFactory factory){_httpContextAccessor = httpContextAccessor;_factory = factory;}public Task PreConsume<T>(ConsumeContext<T> context) where T : class{if (_httpContextAccessor.HttpContext == null){_httpContextAccessor.HttpContext = new DefaultHttpContext{RequestServices = _factory.CreateScope().ServiceProvider};}var abpClaimsPrincipal = context.Headers.TryGetAbpClaimsPrincipal();if (abpClaimsPrincipal != null && abpClaimsPrincipal.IsAuthenticated){var claimsPrincipal = abpClaimsPrincipal.ToClaimsPrincipal();_httpContextAccessor.HttpContext.User = claimsPrincipal;Thread.CurrentPrincipal = claimsPrincipal;}return TaskUtil.Completed;}public Task PostConsume<T>(ConsumeContext<T> context) where T : class{_httpContextAccessor.HttpContext = null;return TaskUtil.Completed;}public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class{_httpContextAccessor.HttpContext = null;return TaskUtil.Completed;}}

使用Asp.Net Core Web Host 作消费端Consumer宿主

基于以下几点原因,我们使用Asp.Net Core Web Host 作为消息端Consumer宿主

  1. 部署在Linux环境下,Asp.Net Core Web Host 通常使用守护进程来启动服务实例,这样可以保证服务不被中断。

  2. 根据abp vnext DDD 项目分层,最大程度利用Application层应用方法,复用abp vnext 框架机制。

MassTransit 深入研究

  1. 延迟消息

  2. 限流、熔断降级

  3. 批量消费

  4. Saga

References

  1. abp vnext disctributed event bus

  2. MassTransit

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/311482.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

16进制数用空格分开 tcp_面试时,你是否被问到过TCP/IP协议?

点击蓝字关注我们看到这句话&#xff0c;有没有感到很熟悉呀&#xff1f;相信很多人在面试的时候都被要求&#xff0c;很多人会觉得我们在实际开发中一般用不到这些知识&#xff0c;所以对这些东西不屑一顾。但是小编认为想要成为一个完美的网工,那么对这些基础知识必须要有一定…

直接使用汇编编写 .NET Standard 库

前言Common Language Runtime&#xff08;CLR&#xff09;是一个很强大的运行时&#xff0c;它接收 Common Intermediate Language&#xff08;CIL&#xff09; 的输入并最终产生机器代码并执行。CIL 在 CLR 上相当于 ASM 汇编代码的存在。CLR 之上的语言 C#、F#、VB.NET 等语言…

[蓝桥杯2016决赛]七星填数-next_permutation枚举

题目描述 如下图所示&#xff1a; 在七角星的14个节点上填入1~14 的数字&#xff0c;不重复&#xff0c;不遗漏。要求每条直线上的四个数字之和必须相等。 图中已经给出了3个数字。请计算其它位置要填充的数字&#xff0c;答案唯一。 填好后&#xff0c;请提交绿色节点的4个数…

系统蓝屏的几种姿势,确定不了解下么?

前言在 蓝屏&#xff08;BSOD&#xff09;转储设置&#xff0c;看本文就够了&#xff01;这篇文章里比较详细的介绍了蓝屏转储设置。做好设置后&#xff0c;我们就可以在需要的时候使系统蓝屏了。本文介绍几种使系统蓝屏的办法&#xff0c;当然肯定还有其它办法&#xff0c;如果…

最长公共子串-dp

题目: 给定两个字符串&#xff0c;求出它们之间最长的相同子字符串的长度。 公共子串和公共子序列不同&#xff0c;公共子序列不要求连续&#xff0c;但公共子串必须是连续的。如: A “helloworld” B “loop” A和B的最长公共子序列是"loo",但最长公共子串是&quo…

智能对话引擎:两天快速打造疫情问答机器人

01微软AI技术开源知识库疫情机器人近一个月来&#xff0c;“新冠肺炎疫情”成了所有人的热点话题&#xff0c;抗击疫情的战役在全国紧张有序地进行着。随着全国各地的企业陆续复工&#xff0c;怎样防范、保护自己和家人成了当下每个人的焦点。为了配合奋战在一线的医护人员打赢…

数码管

题目背景 小明的单片机上面的LED显示屏坏掉了&#xff0c;于是他请你来为他修显示屏。 屏幕上可以显示0~9的数字&#xff0c;其中每个数字由7个小二极管组成&#xff0c;各个数字对应的表示方式如图所示&#xff1a; 题目描述 为了排除电路故障&#xff0c;现在你需要计算&am…

fh 幅频特性曲线怎么画fl_初学者怎么练习线条?教你如何画出流畅线条的技巧...

初学者怎么练习线条&#xff1f;怎样才能画出流畅线条&#xff1f;画出流畅线条有哪些技巧&#xff1f;想必这些问题都是绘画初学者们比较伤脑筋的问题&#xff0c;那么到底怎样才能画出流畅线条呢&#xff1f;今天灵猫课堂老师就在网络上收集整理了关于初学者怎么练习线条&…

.NET Core开发实战(第12课:配置变更监听)--学习笔记

12 | 配置变更监听&#xff1a;配置热更新能力的核心这一节讲解如何使用代码来监视配置变化并做出一些动作当我们需要追踪配置发生的变化&#xff0c;可以在变化发生时执行一些特定的操作配置主要提供了一个 GetReloadToken 方法&#xff0c;这就是跟踪配置的关键方法接着使用上…

ASP.NET Core Web API基于RESTFul APIs的集合结果过滤和分页

译者荐语&#xff1a;如何在RESTFul APIs中进行集合结果分页&#xff1f;还是用客户端来拼接链接地址么&#xff1f;原文来自互联网&#xff0c;由长沙DotNET技术社区【邹溪源】翻译。如译文侵犯您的版权&#xff0c;请联系小编&#xff0c;小编将在24小时内删除。在ASP.NET Co…

.net 微服务实践

l 前言本文记录了我的一次.net core 微服务架构实践经验&#xff0c;以及所用到的技术l 优点每个服务聚焦于一块业务&#xff0c;无论在开发阶段或是部署阶段都是独立的&#xff0c;更适合被各个小团队开发维护&#xff0c;团队对服务的整个生命周期负责&#xff0c;工作在独…

redis过期监听性能_基于Redis的延迟处理

延迟处理是一个非常常用的一个功能;例如, 下单成功后,在30分钟内没有支付,自动取消订单;延迟队列便是延迟处理中最常见的实现方式;先一起看下JDK中延迟队列是如何实现的.JUC的DelayQueue在JDK中, 提供了一套延迟队列的实现, 是JUC包中DelayQueue类.在使用时只需要让处理的元素对…

【译】来看看WebWindow,一个跨平台的.NET Core webview 库

本文翻译自 ASP.NET 项目组的 Steve Sanderson 的博客&#xff0c;发表于 2019 年 11 月 18 日。Steve Sanderson 是 Blazor 最早的创造者。它类似于 Electron&#xff0c;但没有捆绑 Node.js 和 Chromium&#xff0c;也没有大部分 API。我的上一篇文章研究了如何用 web 渲染的…

sql if 和insert_拼多多面试:Mybatis是如何实现SQL语句复用功能的?

在工作中&#xff0c;往往有这样的需求&#xff0c;对于同一个sql条件查询&#xff0c;首先需要统计记录条数&#xff0c;用以计算pageCount&#xff0c;然后再对结果进行分页查询显示&#xff0c;看下面一个例子。<sql id"studentProperties"><!--sql片段-…

代码演示C#各版本新功能

代码演示C#各版本新功能C#各版本新功能其实都能在官网搜到&#xff0c;但很少有人整理在一起&#xff0c;并通过非常简短的代码将每个新特性演示出来。代码演示C#各版本新功能C# 2.0版 - 2005泛型分部类型匿名方法可以为null的值类型迭代器协变和逆变C# 3.0版 - 2007自动实现的…

《C++ Primer》1.52节练习

练习1.23 #include <iostream> #include "Sales_item.h"using namespace std;int main() {Sales_item trans1, trans2;cout << "请输入若干销售记录:" << endl;if (cin >> trans1) {int num 1;while (cin >> trans2)if (t…

ASP.NET Core 反向代理部署知多少

引言最近在折腾统一认证中心&#xff0c;看到开源项目[IdentityServer4.Admin&#xff1a;https://github.com/skoruba/IdentityServer4.Admin]集成了IdentityServer4和管理面板&#xff0c;就直接拿过来用了。在尝试Nginx部署时遇到了诸如虚拟目录映射&#xff0c;请求头超长、…

函数传参string_JavaScript 高阶函数入门浅析

原文&#xff1a;https://www.freecodecamp.org/news/a-quick-intro-to-higher-order-functions-in-javascript-1a014f89c6b/译者&#xff1a;jingruzhang校对者&#xff1a;acusp高阶函数高阶函数可以接收函数作为参数&#xff0c;同时也可以返回一个新的函数。高阶函数之所以…

.NET Core开发实战(第13课:配置绑定:使用强类型对象承载配置数据)--学习笔记...

13 | 配置绑定&#xff1a;使用强类型对象承载配置数据要点&#xff1a;1、支持将配置值绑定到已有对象2、支持将配置值绑定到私有属性上继续使用上一节代码首先定义一个类作为接收配置的实例class Config {public string Key1 { get; set; }public bool Key5 { get; set; }pub…