MassTransit中RequestResponse基本使用

MassTransit 是一个自由、开源、轻量级的消息总线基于.Net框架, 用于创建分布式应用程序。方便搭建基于消息的松耦合异步通信的应用程序和服务。MassTransit 在现有消息传输上提供了一组广泛的功能, 从而使开发人员能够友好地使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠和可扩展的方式。

官网地址:http://masstransit-project.com/

发布订阅模式

这种场景十分常见,发送一个消息(或事件)到消息队列中,有一个或是多个订阅方对预期的消息接收处理。

基于需要搭建了两个WebApi程序,用于模拟发送方和订阅方,其中的RabbitMQ已预先搭建好了,只在程序中引用包配置下即可。

<PackageReference Include="MassTransit" Version="7.2.0" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.2.0" />
<PackageReference Include="MassTransit.RabbitMQ" Version="7.2.0" />

发布端配置

在Startup中增加MassTransit需要的服务及初始化配置。

  • 对RabbitMQ的连接地址端口、虚拟主机、访问账号密码等系列配置。

  • 对发送方需要发送的消息初始化一个请求客户端,配置请求信息及推送到MQ的地址。

services.AddMassTransit(x =>
{x.UsingRabbitMq((context, cfg) =>{cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>{h.Username(Configuration["RabbitmqConfig:Username"]);h.Password(Configuration["RabbitmqConfig:Password"]);});});x.AddRequestClient<ValueEntered>(new Uri(GetServiceAddress("events-valueentered")));
});
services.AddMassTransitHostedService();

为了快速了解,使用Controller在Action中发起对MQ的消息推送

[ApiController]
[Route("[controller]")]
public class ValueController : ControllerBase
{readonly IPublishEndpoint _publishEndpoint;public ValueController(IPublishEndpoint publishEndpoint){_publishEndpoint = publishEndpoint;}[HttpPost]public async Task<ActionResult> Post(string value){await _publishEndpoint.Publish<ValueEntered>(new{Value = value});return Ok();}
}

订阅端配置

订阅端也创建一个WebApi应用,在Startup中增加MassTransit的服务,使用到的Nuget包和发布端一样。

  • 对RabbitMQ的连接地址端口、虚拟主机、访问账号密码等系列配置。

  • 为订阅端增加一个订阅处理的Handler,即如下的ValueEnteredEventConsumer

  • 增加一个接受点,指定队列名称,即发送端发送的队列名称,设置该队列消费处理的Consumer,即ValueEnteredEventConsumer

services.AddMassTransit(x =>
{x.AddConsumer<ValueEnteredEventConsumer>();x.UsingRabbitMq((context, cfg) =>{cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>{h.Username(Configuration["RabbitmqConfig:Username"]);h.Password(Configuration["RabbitmqConfig:Password"]);});cfg.ReceiveEndpoint("events-valueentered", e =>{e.ConfigureConsumer<ValueEnteredEventConsumer>(context);});});
});
services.AddMassTransitHostedService();

如此一来,通过Postman发送一个请求,经发布端发布一个消息到RabbitMQ,订阅端侦听消息,处理消息,一切都很熟悉。

请求响应模式

在发布订阅的基础上,改变以往的习惯,当发布一个消息后,等待订阅方的处理,并将消息推送回RabbitMQ,发送方接受到处理后的消息继续执行。

请求端

在Startup中新加上一个用于发送消息(CheckOrderStatus)的请求客户端及指定消息队列名称(为每一个消息创建一个单独的队列)。

x.AddRequestClient<CheckOrderStatus>(new Uri(GetServiceAddress("events-checkorderstatus")));

增加一个Controller及Action,来请求及获取处理结果(OrderStatusResult)。

[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{private readonly IRequestClient<CheckOrderStatus> _client;public OrderController(IRequestClient<CheckOrderStatus> client){_client = client;}public async Task<OrderStatusResult> Get(string id){var response = await _client.GetResponse<OrderStatusResult>(new { OrderId = id });return response.Message;}
}

响应端

同样在响应端Startup中对新的消息设置下消息侦听队列以及相应的Handler如下的ValueEnteredEventConsumer去消费消息并返回处理结果。

x.AddConsumer<CheckOrderStatusConsumer >();
x.UsingRabbitMq((context, cfg) =>
{// ...cfg.ReceiveEndpoint("events-checkorderstatus", e =>{e.ConfigureConsumer<CheckOrderStatusConsumer >(context);});
});

Consumer中获取请求参数,执行请求,返回执行结果。

public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus>
{public async Task Consume(ConsumeContext<CheckOrderStatus> context){if (context.Message.OrderId == "9527"){throw new InvalidOperationException("Order not found");}Console.WriteLine($"OrderId:{context.Message.OrderId}");await context.RespondAsync<OrderStatusResult>(new{OrderId = context.Message.OrderId,Timestamp = Guid.NewGuid().ToString(),StatusCode = "1",StatusText = "Close"});}
}

这样一来,当请求端发起一个消息(事件)到RabbitMQ,响应端侦听并处理完毕返回处理结果到RabbitMQ,请求端依照响应结果继续执行后续请求。

HTTP方式差异

与以往的Http请求方式有所不同,通过httpClient.PostAsync发送请求,接收端处理并返回结果,而走requestClient发送请求到RabbitMQ,再由RabbitMQ推送到侦听节点消费并返回结果,如下第一二部分结构。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/300317.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

php2612,达人曝光LGLSNJ2612AR质量好吗?怎么样呢?体验报告揭秘

其实这个LGLSNJ2612AR还可以的 哟&#xff0c;超凡的造型设计和外观&#xff0c;做工细腻&#xff0c;不少的人想要了解这LGLSNJ2612AR怎么样&#xff1f;质量好不好&#xff1f;实实在在的说这款LGLSNJ2612AR使用体验后个人感觉蛮可以的哈&#xff0c;刚购没多久的&#xff0c…

vim编辑技巧

转载于:https://blog.51cto.com/ovcer/1353415

IT基础设施最佳实践ITIL

IT基础设施最佳实践ITIL1. 前言1.1.困扰问题随着IT技术在企业中的应用越广&#xff0c;企业业务流程的正常运作就越离不开IT部门的支持。IT在给企业业务带来效益的同时&#xff0c;也带来了成本和风险的 困扰&#xff0c;尤其是在某些特殊行业&#xff0c;例如电信、金融等行业…

一个本科生,只用了两年就拿下诺贝尔奖,拯救了无数糖尿病患者

全世界只有3.14 % 的人关注了青少年数学之旅早在三千五百年前&#xff0c;古埃及就已经有对糖尿病的描述&#xff0c;两千多年前&#xff0c;古希腊医生亚的阿勒特奥斯把这种主要症状为“排泄多且甜的尿液”的疾病命名为“Diabetes”&#xff0c;也就是糖尿病。几千年来&#x…

C#中使用jieba.NET、WordCloudSharp制作词云图

词云简介“词云”由美国西北大学新闻学副教授、新媒体专业主任里奇戈登&#xff08;Rich Gordon&#xff09;于2006年最先使用&#xff0c;是通过形成“关键词云层”或“关键词渲染”&#xff0c;对文本中出现频率较高的“关键词”的视觉上的突出。网上大部分文章介绍的是使用P…

js框架页弹出页面关闭

2019独角兽企业重金招聘Python工程师标准>>> <!-- lang: js --> function to(href) {if (top.location self.location) {//window.opener null;window.opener.location.reload();window.open(, _self);window.close();} else {window.location.href href;}…

狂言50年要拿30个诺奖的日本,如今怎么样了?

全世界只有3.14 % 的人关注了青少年数学之旅2019年10月9日&#xff0c;日本化学家吉野彰&#xff08;Akira Yoshino&#xff09;因在锂离子电池的发明和应用领域做出的卓越贡献&#xff0c;与美国科学家 John B. Goodenough、英国科学奖 M. Stanley Whittingham 一起荣获2019年…

VS2022+.NET6+C#10,.NET开发起飞

VS2022.NET6C#10一起体验是啥感觉&#xff1f;爽&#xff01;令人印象深刻的是VS2022打开超大项目的流畅&#xff0c;.NET6极致简化的MiniAPI框架&#xff0c;C#10各种炫酷新语法。看看下图的代码你能认识吗&#xff1f;来自.NET6的MiniAPI框架&#xff0c;直接在MapGet里面使用…

jdk8之lambda

2019独角兽企业重金招聘Python工程师标准>>> Oracle号称今年一定发布jdk8, 即使有bug, jdk8无疑最大的宠儿就是lambda表达式了&#xff0c;还是直接上代码&#xff0c;看看lambda表达式怎么了。 环境&#xff1a; openjdk8, eclipse4.3.1(支持jdk8编译) 实例代码&…

【转】Asp.Net中Excel操作权限的问题

近日在打开原来写的一个网页程序运行时&#xff0c;出现了Excel操作权限的问题&#xff0c;具体的说就是在代码中调用下面这段与Excel操作有关的语句时 Application curExcelApp new ApplicationClass(); 提示权限不足&#xff0c;具体的提示内容如下&#xff1a; 检索 COM 类…

Visual Studio 2022 预览版2 发布啦

我们很高兴地宣布 Visual Studio 2022 的第二个预览版发布啦&#xff01;预览版 1 是有史以来第一个 64 位 Visual Studio&#xff0c;提供了改进的可扩展性。从预览版 2 开始&#xff0c;我们专注于提供有关个人和团队生产力、现代开发和不断创新等主题的新功能。在本文中&…

批作业是小学老师的一大乐趣 | 今日最佳

全世界只有3.14 % 的人关注了青少年数学之旅

【小安翻唱】凉宫春日的忧郁--冒険でしょでしょ第五届外语歌曲大赛助兴节目~绫魂论坛送aya的生日礼物筹备开始~...

第五届外语歌曲大赛如火如荼的进行的听了那么多参赛选手的歌曲自己也不由的也想唱唱正好遇上绫魂论坛的各位管理员为aya筹备生日礼物 好吧~ 我也来小唱一首 小小的final版送上~&#xff08;非正式版 请无视错词和中间以及最后的杂音 灭哈哈&#xff09; 顺便再借用了一下某位选…

WCF后续之旅(11): 关于并发、回调的线程关联性(Thread Affinity)

对于一般的多线程操作&#xff0c;比如异步地进行基于文件系统的IO操作&#xff1b;异步地调用Web Service&#xff1b;或者是异步地进行数据库访问等等&#xff0c;是和具体的线程无关的。也就是说&#xff0c;对于这些操作&#xff0c;任意创建一个新的线程来执行都是等效的。…

成年人改变生活的方式,都是从它开始

全世界只有3.14 % 的人关注了青少年数学之旅2019已经不足80天&#xff0c;年初立下的flag倒了几个&#xff1f;史蒂夫马丁有一句话&#xff1a;“所有的人生谜语都可以从阅读中找到答案。”无论是读影评、读好书&#xff0c;亦或者涉猎趣闻轶事、汲取犀利观点&#xff0c;总会悄…

表白和被表白时遇到的骚操作,最后一个天秀!| 今日最佳

全世界只有3.14 % 的人关注了青少年数学之旅【1】【2】【3】【4】【5】【6】【7】【8】【9】

这么简单的序列化 System.Text.Json.Serialization 也报错了?

咨询区 kofifus&#xff1a;我准备将项目中的 JSON.NET 切换到原生的 System.Text.Json&#xff0c;但我遇到了个意外报错&#xff0c;测试代码如下&#xff1a;using System.Text.Json.Serialization; using Newtonsoft.Json;public class C {public C(string PracticeName) {…

直观机械动图,一秒看懂原理!

全世界只有3.14 % 的人关注了青少年数学之旅这些神奇唯美的机械动图&#xff0c;让我们不得不感叹制造的魅力&#xff01;仿生四翼飞行机器人▲单手磁性拉链▲风洞实验——模拟飞机飞行▲可以说是最快的折弯机了▲这是谁设计的死循环▲高压水除锈▲摩擦焊接▲打地洞▲能轻松把车…

数据校验器架构模式组

刘 岳林 (yuelin_liumsn.com), 软件工程师2007 年 1 月 15 日本文阐述软件架构与设计模式&#xff0c;它为架构师和开发人员提供了一组关于数据校验的架构模式&#xff08;隔离校验器&#xff0c;可组装校验器&#xff0c;动态策略校验器&#xff0c;动态注册校验器等&#xff…

罗斯文2007(Northwind 2007)数据库、Access 2007 样列数据库分析[转]

Northwind 2007 是 MS 的一套销售类样列库&#xff0c;本片文章主要描述&#xff0c;它的结构及关系。 起因:   前些阵子一直想做一个虚拟项目来学习巩固一下 .net3.0 的一些新技术、为什么要做虚拟项目&#xff1f; 当然是系统的学习一些东西了、我对虚拟项目的要求是不能太…