基于Abp VNext框架设计 - Masstransit分布式消息

abp 通过IDistributedEventBus接口集成自IEventBus实现分布式事件消息的发布订阅。

IEventBus在什么时机触发PublishAsync?

  1. 当前UnitOfWork完成时,触发IEventBusPublishAsync

  2. 在没有事务环境下,同步调用IEventBusPublishAsync

abp 默认实现基于RabbitMq消息队列Volo.Abp.EventBus.RabbitMQ实现分布式消息的发布与订阅。

消息治理核心问题:

  1. 生产端如何保证投递成功的消息不能丢失。

  2. Mq自身如何保证消息不丢失。

  3. 消费段如何保证消费端的消息不丢失。

基于abp 默认实现的DistributedEventBus不能满足以下场景:

  1. Publisher 生产者无法保证消息一定能投递到MQ。

  2. Consumer 消费端在消息消费时,出现异常时,没有异常错误处理机制(确保消费失败的消息能重新被消费)。

我们引入Masstransit,来提升abp对消息治理能力。

Masstransit提供以下开箱即用功能:

  1. Publish/Send/Request-Response等几种消息投递机制。

  2. 多种IOC容器支持。

  3. 异常机制。

  4. Saga事务管理。

  5. 事务活动补偿机制(Courier)

  6. 消息审计

  7. 消息管道处理机制

Abp 框架下事件消息集成

  1. 使用MassTransit重新实现IDistributedEventBus

  2. 在消费端Consumer传递用户身份信息。

  3. 使用Asp.Net Core Web Host 作消费端Consumer宿主。

集成MassTransit

在Module初始化时,注入MassTransit实例,并启动。

Copy/// <summary>
/// 配置DistributedEventBus
/// </summary>
/// <param name="context"></param>
/// <param name="configuration"></param>
/// <param name="hostingEnvironment"></param>
private void ConfigureDistributedEventBus(ServiceConfigurationContext context, IConfiguration configuration, IWebHostEnvironment hostingEnvironment)
{var options = context.Services.GetConfiguration().GetSection("Rabbitmq").Get<MassTransitEventBusOptions>();var mqConnectionString = "rabbitmq://" + options.ConnectionString;context.Services.AddMassTransit(mtConfig =>{//inject consumers into IOC from assemblymtConfig.AddConsumers(typeof(AuthCenterEventBusHostModule));mtConfig.AddBus(provider =>{var bus = Bus.Factory.CreateUsingRabbitMq(mqConfig =>{var host = mqConfig.Host(new Uri(mqConnectionString), h =>{h.Username(options.UserName);h.Password(options.Password);});// set special message serializermqConfig.UseBsonSerializer();// integrated existed logger compontentmqConfig.UseExtensionsLogging(provider.GetService<ILoggerFactory>());mqConfig.ReceiveEndpoint(host, "authcenter-queue", q =>{//set rabbitmq prefetch countq.PrefetchCount = 200;//set message retry policyq.UseMessageRetry(r => r.Interval(3, 100));q.Consumer<SmsTokenValidationCreatedEventConsumer>(provider);EndpointConvention.Map<SmsTokenValidationCreatedEvent>(q.InputAddress);});mqConfig.ReceiveEndpoint(host, "user-synchronization", q =>{//set rabbitmq prefetch countq.PrefetchCount = 50;//q.UseRateLimit(100, TimeSpan.FromSeconds(1));//q.UseConcurrencyLimit(2);//set message retry policyq.UseMessageRetry(r => r.Interval(3, 100));q.Consumer<UserSyncEventConsumer>(provider);EndpointConvention.Map<UserSyncEvent>(q.InputAddress);});mqConfig.ConfigureEndpoints(provider);mqConfig.UseAuditingFilter(provider, o =>{o.ReplaceAuditing = true;});});// set authtication middleware for user identitybus.ConnectAuthenticationObservers(provider);return bus;});});
}

在MassTransit中,使用IBusControl接口 StartAsync 或 StopAsync 来启动或停止。

使用IPublishEndpoint重新实现IDistributedEventBus接口,实现与abp分布式事件总线集成。

Copy public class MassTransitDistributedEventBus : IDistributedEventBus, ISingletonDependency{private readonly IPublishEndpoint _publishEndpoint;//protected IHybridServiceScopeFactory ServiceScopeFactory { get; }protected AbpDistributedEventBusOptions DistributedEventBusOptions { get; }public MassTransitDistributedEventBus(IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,IPublishEndpoint publishEndpoint){//ServiceScopeFactory = serviceScopeFactory;_publishEndpoint = publishEndpoint;DistributedEventBusOptions = distributedEventBusOptions.Value;//Subscribe(distributedEventBusOptions.Value.Handlers);}/**  Not Implementation*/public Task PublishAsync<TEvent>(TEvent eventData)where TEvent : class{return _publishEndpoint.Publish(eventData);}public Task PublishAsync(Type eventType, object eventData){return _publishEndpoint.Publish(eventData, eventType);}}

到此,我们实现了MassTransit与Abp集成。

事件消息传递User Claims

在实际业务实现过程中,我们会用消息队列实现“削峰填谷”的效果。异步消息队列中传递用户身份信息如何实现呢?

我们先看看abp在WebApi中,如何确定当前用户?

ICurrentUser 提供当前User Claims抽象。而ICurrentUser依赖于ICurrentPrincipalAccessor,在Asp.Net core中利用HttpContext User 来记录当前用户身份。

在MassTransit中,利用IPublishObserver > IConsumeObserver 生产者/消费端的观察者,来实现传递已认证的用户Claims。

Copy    /// <summary>/// 生产者传递当前用户Principal/// </summary>public class AuthPublishObserver : IPublishObserver{private readonly ICurrentPrincipalAccessor _currentPrincipalAccessor;private readonly IClaimsPrincipalFactory _claimsPrincipalFactory;public AuthPublishObserver(ICurrentPrincipalAccessor currentPrincipalAccessor,IClaimsPrincipalFactory claimsPrincipalFactory){_currentPrincipalAccessor = currentPrincipalAccessor;_claimsPrincipalFactory = claimsPrincipalFactory;}public Task PrePublish<T>(PublishContext<T> context) where T : class{var claimsPrincipal = _claimsPrincipalFactory.CreateClaimsPrincipal(_currentPrincipalAccessor.Principal);if (claimsPrincipal != null){context.Headers.SetAuthenticationHeaders(claimsPrincipal);}return TaskUtil.Completed;}public Task PostPublish<T>(PublishContext<T> context) where T : class => TaskUtil.Completed;public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class => TaskUtil.Completed;}
Copy/// <summary>/// 消费端从MqMessage Heads 中获取当前用户Principal,并赋值给HttpContext/// </summary>public class AuthConsumeObserver : IConsumeObserver{private readonly IHttpContextAccessor _httpContextAccessor;private readonly IServiceScopeFactory _factory;public AuthConsumeObserver(IHttpContextAccessor httpContextAccessor, IServiceScopeFactory factory){_httpContextAccessor = httpContextAccessor;_factory = factory;}public Task PreConsume<T>(ConsumeContext<T> context) where T : class{if (_httpContextAccessor.HttpContext == null){_httpContextAccessor.HttpContext = new DefaultHttpContext{RequestServices = _factory.CreateScope().ServiceProvider};}var abpClaimsPrincipal = context.Headers.TryGetAbpClaimsPrincipal();if (abpClaimsPrincipal != null && abpClaimsPrincipal.IsAuthenticated){var claimsPrincipal = abpClaimsPrincipal.ToClaimsPrincipal();_httpContextAccessor.HttpContext.User = claimsPrincipal;Thread.CurrentPrincipal = claimsPrincipal;}return TaskUtil.Completed;}public Task PostConsume<T>(ConsumeContext<T> context) where T : class{_httpContextAccessor.HttpContext = null;return TaskUtil.Completed;}public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class{_httpContextAccessor.HttpContext = null;return TaskUtil.Completed;}}

使用Asp.Net Core Web Host 作消费端Consumer宿主

基于以下几点原因,我们使用Asp.Net Core Web Host 作为消息端Consumer宿主

  1. 部署在Linux环境下,Asp.Net Core Web Host 通常使用守护进程来启动服务实例,这样可以保证服务不被中断。

  2. 根据abp vnext DDD 项目分层,最大程度利用Application层应用方法,复用abp vnext 框架机制。

MassTransit 深入研究

  1. 延迟消息

  2. 限流、熔断降级

  3. 批量消费

  4. Saga

References

  1. abp vnext disctributed event bus

  2. MassTransit

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/311482.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[蓝桥杯2018决赛]换零钞-枚举

题目描述 x星球的钞票的面额只有&#xff1a;100元&#xff0c;5元&#xff0c;2元&#xff0c;1元&#xff0c;共4种。 小明去x星旅游&#xff0c;他手里只有2张100元的x星币&#xff0c;太不方便&#xff0c;恰好路过x星银行就去换零钱。 小明有点强迫症&#xff0c;他坚持要…

16进制数用空格分开 tcp_面试时,你是否被问到过TCP/IP协议?

点击蓝字关注我们看到这句话&#xff0c;有没有感到很熟悉呀&#xff1f;相信很多人在面试的时候都被要求&#xff0c;很多人会觉得我们在实际开发中一般用不到这些知识&#xff0c;所以对这些东西不屑一顾。但是小编认为想要成为一个完美的网工,那么对这些基础知识必须要有一定…

直接使用汇编编写 .NET Standard 库

前言Common Language Runtime&#xff08;CLR&#xff09;是一个很强大的运行时&#xff0c;它接收 Common Intermediate Language&#xff08;CIL&#xff09; 的输入并最终产生机器代码并执行。CIL 在 CLR 上相当于 ASM 汇编代码的存在。CLR 之上的语言 C#、F#、VB.NET 等语言…

[蓝桥杯2016决赛]七星填数-next_permutation枚举

题目描述 如下图所示&#xff1a; 在七角星的14个节点上填入1~14 的数字&#xff0c;不重复&#xff0c;不遗漏。要求每条直线上的四个数字之和必须相等。 图中已经给出了3个数字。请计算其它位置要填充的数字&#xff0c;答案唯一。 填好后&#xff0c;请提交绿色节点的4个数…

python的argsort函数_python——argsort函数

numpy中argsort函数用法&#xff0c;有需要的朋友可以参考下。在Python中使用help帮助>>> import numpy>>> help(numpy.argsort)Help on function argsort in module numpy.core.fromnumeric:argsort(a, axis-1, kindquicksort, orderNone)Returns the indic…

第三个一千行+500行总结-数据结构C复习--知识点总结3--七到九章

第七章 (接知识点总结2) 图 图的遍历: //深度优先搜索 #define OK 1 #define True 1 #define Error -1 #define False 0 typedef enum{DG, DN, UDG. UDN}Graph; int visited[MAX]; //Graph代表图的一种存储结构比如邻接表,邻接矩阵 void TranverseGraph(Graph g){ int…

系统蓝屏的几种姿势,确定不了解下么?

前言在 蓝屏&#xff08;BSOD&#xff09;转储设置&#xff0c;看本文就够了&#xff01;这篇文章里比较详细的介绍了蓝屏转储设置。做好设置后&#xff0c;我们就可以在需要的时候使系统蓝屏了。本文介绍几种使系统蓝屏的办法&#xff0c;当然肯定还有其它办法&#xff0c;如果…

最长公共子串-dp

题目: 给定两个字符串&#xff0c;求出它们之间最长的相同子字符串的长度。 公共子串和公共子序列不同&#xff0c;公共子序列不要求连续&#xff0c;但公共子串必须是连续的。如: A “helloworld” B “loop” A和B的最长公共子序列是"loo",但最长公共子串是&quo…

the python challenge_The Python Challenge 谜题全解(持续更新)

Python Challenge(0-2)是个很有意思的网站&#xff0c;可以磨练使用python的技巧&#xff0c;每一关都有挑战&#xff0c;要编写相应的代码算出关键词&#xff0c;才可以获取下一关的url&#xff0c;还是很好玩的QAQLEVEL 0显然是计算图片中的\(2^{38}\)&#xff0c;结果为2748…

智能对话引擎:两天快速打造疫情问答机器人

01微软AI技术开源知识库疫情机器人近一个月来&#xff0c;“新冠肺炎疫情”成了所有人的热点话题&#xff0c;抗击疫情的战役在全国紧张有序地进行着。随着全国各地的企业陆续复工&#xff0c;怎样防范、保护自己和家人成了当下每个人的焦点。为了配合奋战在一线的医护人员打赢…

数码管

题目背景 小明的单片机上面的LED显示屏坏掉了&#xff0c;于是他请你来为他修显示屏。 屏幕上可以显示0~9的数字&#xff0c;其中每个数字由7个小二极管组成&#xff0c;各个数字对应的表示方式如图所示&#xff1a; 题目描述 为了排除电路故障&#xff0c;现在你需要计算&am…

fh 幅频特性曲线怎么画fl_初学者怎么练习线条?教你如何画出流畅线条的技巧...

初学者怎么练习线条&#xff1f;怎样才能画出流畅线条&#xff1f;画出流畅线条有哪些技巧&#xff1f;想必这些问题都是绘画初学者们比较伤脑筋的问题&#xff0c;那么到底怎样才能画出流畅线条呢&#xff1f;今天灵猫课堂老师就在网络上收集整理了关于初学者怎么练习线条&…

.NET Core开发实战(第12课:配置变更监听)--学习笔记

12 | 配置变更监听&#xff1a;配置热更新能力的核心这一节讲解如何使用代码来监视配置变化并做出一些动作当我们需要追踪配置发生的变化&#xff0c;可以在变化发生时执行一些特定的操作配置主要提供了一个 GetReloadToken 方法&#xff0c;这就是跟踪配置的关键方法接着使用上…

icoding复习1,2

icoding复习 1 链表 倒数查找 1. 已知一个带有表头结点的单链表, 假设链表只给出了头指针L。在不改变链表的前提下&#xff0c;请设计一个尽可能高效的算法&#xff0c; 查找链表中倒数第k个位置上的结点&#xff08;k为正整数&#xff09;。 函数原型为&#xff1a;int lnk_s…

密电破译-dp

题目背景 墨家家主召集弟子的原因是因为截获了密电并破获了重大情报&#xff0c;“公主薨&#xff0c;国王失踪&#xff0c;墨家即将面临灭顶之灾”。 题目描述 密电是由大小写字母组成字符串&#xff0c;密电之所以能破译是因为墨家掌握了破解方法&#xff0c;密钥是一个整数…

ASP.NET Core Web API基于RESTFul APIs的集合结果过滤和分页

译者荐语&#xff1a;如何在RESTFul APIs中进行集合结果分页&#xff1f;还是用客户端来拼接链接地址么&#xff1f;原文来自互联网&#xff0c;由长沙DotNET技术社区【邹溪源】翻译。如译文侵犯您的版权&#xff0c;请联系小编&#xff0c;小编将在24小时内删除。在ASP.NET Co…

icoding复习6 图

icoding复习6 1. 邻接表1 试在邻接表存储结构上实现图的基本操作 insert_vertex 和 insert_arc&#xff0c;相关定义如下&#xff1a; typedef int VertexType; typedef enum{ DG, UDG }GraphType; typedef struct ArcNode{ int adjvex; InfoPtr *info; stru…

python帮助系统函数_【Python】【基础知识】【内置函数】【help的使用方法】

原英文帮助文档&#xff1a;help([object])Invoke the built-in help system. (This function is intended for interactive use.) If no argument is given, the interactive help system starts on the interpreter console. If the argument is a string, then the string i…

统计二进制数-dp

题目描述 输入一个正整数m,请输出从0到m中每一个数字二进制数中含有1的个数的总和,由于数值较大结果需要模100000. 输入格式 一个m 输出格式 二进制数中含有1的个数的总和s 输入输出样例 输入 2 输出 2 输入 5 输出 7 说明/提示 样例说明 20%的数据 m<500 50%的数据 m<…

.net 微服务实践

l 前言本文记录了我的一次.net core 微服务架构实践经验&#xff0c;以及所用到的技术l 优点每个服务聚焦于一块业务&#xff0c;无论在开发阶段或是部署阶段都是独立的&#xff0c;更适合被各个小团队开发维护&#xff0c;团队对服务的整个生命周期负责&#xff0c;工作在独…