如何在ASP.NET Core中使用Azure Service Bus Queue

原文:USING AZURE SERVICE BUS QUEUES WITH ASP.NET CORE SERVICES

作者:damienbod[1] 

译文:如何在ASP.NET Core中使用Azure Service Bus Queue

地址:https://www.cnblogs.com/lwqlun/p/10760227.html

作者:Lamond Lu

源代码:https://github.com/lamondlu/AzureServiceBusMessaging

640?wx_fmt=png

本文展示了如何使用Azure Service Bus Queue, 实现2个ASP.NET Core Api应用之间的消息传输。

配置Azure Service Bus Queue

你可以从官网文档中了解到如何配置一个Azure Service Bus Queue.

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal

这里我们使用Queue或者Topic来实现消息传输。Queue是一种消息传输类型,一旦一个消息被一个消费者接收了,该消息就会从Queue中被移除。

与Queue不同,Topic提供的是一对多的通讯方式。

架构图

整个应用的实现如下:

640?wx_fmt=png

•Api 1负责发送消息•Api 2负责监听Azure Service Bus,并处理接收到的消息

实现一个Service Bus Queue

这里我们首先需要引入Microsoft.Azure.ServiceBus[2]  程序集。Microsoft.Azure.ServiceBus[3]是Azure Service Bus的客户端库。针对Service Bus的连接字符串我们保存在项目的User Secret中。当部署项目的时候,我们可以使用Azure Key Valut来设置这个Secret值。

在Visual Studio中,右键点击API1, API2项目属性,选择Manage User Secrets就可以管理当前项目使用的所有私密信息。

为了发送向Azure Service Bus Queue发送消息,我们需要创建一个SendMessage方法,并接收一个消息参数。这里我们创建了一个我们自己的消息内容类型MyPayload, 将当前该MyPayload对象序列化成Json字符串, 添加到一个Message对象中。

using Microsoft.Azure.ServiceBus;using Microsoft.Extensions.Configuration;using Newtonsoft.Json;using System.Text;using System.Threading.Tasks;
namespace ServiceBusMessaging{ public class ServiceBusSender { private readonly QueueClient _queueClient; private readonly IConfiguration _configuration; private const string QUEUE_NAME = "simplequeue";
public ServiceBusSender(IConfiguration configuration) { _configuration = configuration; _queueClient = new QueueClient( _configuration .GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME); }
public async Task SendMessage(MyPayload payload) { string data = JsonConvert.SerializeObject(payload); Message message = new Message(Encoding.UTF8.GetBytes(data));
await _queueClient.SendAsync(message); } }}

在API 1和API 2中,我们需要将ServiceBusSender注册到应用程序的IOC容器中。这里为了测试方便,我们同时注册Swagger服务。

public void ConfigureServices(IServiceCollection services){    services.AddMvc();
services.AddScoped<ServiceBusSender>();
services.AddSwaggerGen(c => { c.SwaggerDoc("v1", new Info { Version = "v1", Title = "Payload View API", }); });}

接下来,我们就可以在控制器中通过构造函数注入的方式使用这个服务了。

在API1中,我们创建一个POST方法,这个方法会将API接收到Payload对象发送到Azure Service Bus Queue中。

[HttpPost][ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)][ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)]public async Task<IActionResult> Create([FromBody][Required]Payload request){    if (data.Any(d => d.Id == request.Id))    {        return Conflict($"data with id {request.Id} already exists");    }
data.Add(request);
// Send this to the bus for the other services await _serviceBusSender.SendMessage(new MyPayload { Goals = request.Goals, Name = request.Name, Delete = false });
return Ok(request);}

从Queue中获取消息

为了监听Azure Service Bus Queue, 并处理接收到的消息,我们创建了一个新类ServiceBusConsumerServiceBusConsumer实现了IServiceBusConsumer接口。

Queue的连接字符串是使用IConfiguration读取的。RegisterOnMessageHandlerAndReceiveMessages方法负责注册消息处理程序ProcessMessagesAsync处理消息。ProcessMessagesAsync方法会将得到的消息转换成对象,并调用IProcessData接口完成最终的消息处理。

using Microsoft.Azure.ServiceBus;using Microsoft.Extensions.Configuration;using Microsoft.Extensions.Logging;using Newtonsoft.Json;using System.Text;using System.Threading;using System.Threading.Tasks;
namespace ServiceBusMessaging{ public interface IServiceBusConsumer { void RegisterOnMessageHandlerAndReceiveMessages(); Task CloseQueueAsync(); }
public class ServiceBusConsumer : IServiceBusConsumer { private readonly IProcessData _processData; private readonly IConfiguration _configuration; private readonly QueueClient _queueClient; private const string QUEUE_NAME = "simplequeue"; private readonly ILogger _logger;
public ServiceBusConsumer(IProcessData processData, IConfiguration configuration, ILogger<ServiceBusConsumer> logger) { _processData = processData; _configuration = configuration; _logger = logger; _queueClient = new QueueClient( _configuration.GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME); }
public void RegisterOnMessageHandlerAndReceiveMessages() { var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 1, AutoComplete = false };
_queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions); }
private async Task ProcessMessagesAsync(Message message, CancellationToken token) { var myPayload = JsonConvert.DeserializeObject<MyPayload>(Encoding.UTF8.GetString(message.Body)); _processData.Process(myPayload); await _queueClient.CompleteAsync(message.SystemProperties.LockToken); }
private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) { _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception"); var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
_logger.LogDebug($"- Endpoint: {context.Endpoint}"); _logger.LogDebug($"- Entity Path: {context.EntityPath}"); _logger.LogDebug($"- Executing Action: {context.Action}");
return Task.CompletedTask; }
public async Task CloseQueueAsync() { await _queueClient.CloseAsync(); } }}

其中IProcessData接口存在于类库项目ServiceBusMessaging中,它是用来处理消息的。

public interface IProcessData{    void Process(MyPayload myPayload);}

在Api 2中,我们创建一个ProcessData类,它实现了IProcessData接口。

public class ProcessData : IProcessData{    public void Process(MyPayload myPayload)    {        DataServiceSimi.Data.Add(new Payload        {            Name = myPayload.Name,            Goals = myPayload.Goals        });    }}

这里为了简单测试,我们创建了一个静态类DataServiceSimi,其中存放了API2中所有保存Payload对象。同时,我们还创建了一个新的控制器ViewPayloadMessagesController,在其中添加了一个GET Action,并返回了静态类DataServiceSimi中的所有数据。

[Route("api/[controller]")][ApiController]public class ViewPayloadMessagesController : ControllerBase{    [HttpGet]    [ProducesResponseType(StatusCodes.Status200OK)]    public ActionResult<List<Payload>> Get()    {        return Ok(DataServiceSimi.Data);    }}

最后我们还需要将ProcessData注册到API2的IOC容器中。

public void ConfigureServices(IServiceCollection services){    services.AddMvc();
services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>(); services.AddTransient<IProcessData, ProcessData>();}

最终效果

现在我们分别启用2个Api项目,并在Api 1的Swagger文档界面,调用POST请求,添加一个Payload

640?wx_fmt=png

操作完成之后,我们访问Api 2的/api/ViewPayloadMessages, 获得结果如下,Api 1发出的消息出现在了Api 2的结果集中,这说明Api 2从Azure Service Bus Queue中获取了消息,并保存在了自己的静态类DataServiceSimi中。

640?wx_fmt=png

References

[1] damienbod: https://damienbod.com/author/damienbod/
[2] Microsoft.Azure.ServiceBus: https://www.nuget.org/packages/Microsoft.Azure.ServiceBus
[3] Microsoft.Azure.ServiceBus: https://www.nuget.org/packages/Microsoft.Azure.ServiceBus

.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com 
640?wx_fmt=jpeg


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/316159.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

NEC Programming Contest 2021 (AtCoder Beginner Contest 229)

终于开始补提了 重点 : C&#xff0c; E的倒着算&#xff0c; F的染色&#xff0c;G的相邻的转换&#xff1b; B - Hard Calculation #include <iostream> #include <algorithm> #include <cstring> #include <vector> #include <cmath> #inclu…

2021银川Problem D. Farm(不保证正确性)

2021银川Problem D. Farm &#xff08;注&#xff1a;由于没有数据&#xff0c;暂不保证正确性&#xff09; 题意&#xff1a; 有n个点&#xff0c;m个有权边&#xff0c;有q个限制条件&#xff0c;每个限制条件有两个选择&#xff1a;选u个边&#xff0c;选第v个边&#xff…

从B站的代码泄露事件中,我们能学到些什么?

先声明一下&#xff0c;本文不聊ISSUE中的七七八八&#xff0c;也不聊代码是否写的好&#xff0c;更不聊是不是跟蔡徐坤有关之类的吃瓜内容。仅站在技术人的角度&#xff0c;从这次的代码泄露事件&#xff0c;聊聊在代码的安全管理上&#xff0c;通常都需要做哪些事来预防此类事…

Educational Codeforces Round 117 (Rated for Div. 2)

A. Distance B. Special Permutation C. Chat Ban D.X-Magic Pair E. Messages F&#xff1a;没看F&#xff0c;好难的样子 G. Max Sum Array #include <iostream> #include <algorithm> #include <cstring> #include <vector> #include <cmath>…

[Abp vNext 源码分析] - 3. 依赖注入与拦截器

一、简要说明ABP vNext 框架在使用依赖注入服务的时候&#xff0c;是直接使用的微软提供的 Microsoft.Extensions.DependencyInjection 包。这里与原来的 ABP 框架就不一样了&#xff0c;原来的 ABP 框架还需要抽象出来一个 IIocManager 用来管理整个 IoC 容器&#xff0c;现在…

hdu 7111-Remove

[hdu 7111] Brunhilda’s Birthday&#xff09; 题意&#xff1a; 和P6756 [BalticOI2013] Brunhilda’s Birthday&#xff09;一样的 给你p个质数集&#xff0c;您可以进行任意多次操作&#xff0c;每一次操作时&#xff0c;您选择一个素数pip_{i}pi​,这会使得n->⌊npi⌋…

Codeforces Round #757 (Div. 2)

A. Divan and a Store B. Divan and a New Project C. Divan and bitwise operations D1. Divan and Kostomuksha (easy version) D2. Divan and Kostomuksha (hard version) E. Divan and a Cottage 排序贪心 #include <iostream> #include <algorithm> #include…

dotnet core 微服务教程

这个教程主要是对于第一次使用dotnet core开发的同学。运行环境是在centos 7 &#xff0c;使用了docker容器。即这是一篇运行在linux的docker容器上的微服务的简单应用。一. 安装.NET SDK安装.NET之前&#xff0c;先安装一些依赖&#xff0c;运行下面的命令sudo rpm -Uvh https…

cf1552F. Telepanting

cf1552F. Telepanting 题意&#xff1a; 在一个坐标轴上&#xff0c;有n个传送门&#xff0c;格式为&#xff1a;xi,yi,si,可以从xi传送到yi&#xff0c;si表示状态&#xff0c;如果si为0&#xff0c;到位置xi时不会传送&#xff0c;si变为1.如果到达xi时si为1&#xff0c;则…

Educational Codeforces Round 118 (Rated for Div. 2)

A - Long Comparison B - Absent Remainder C - Poisoned Dagger D - MEX Sequences E - Crazy Robot 拿字符串比较 #include <iostream> #include <algorithm> #include <cstring> #include <vector> #include <cmath> #include <stack>…

eShopOnContainers 是一个基于微服务的.NET Core示例框架

找到一个好的示例框架很难&#xff0c;但不是不可能。大多数是小型Todo风格的应用程序&#xff0c;通常基于SimpleCRUD。值得庆幸的是&#xff0c;Microsoft已经为eShopOnContainers创建了一个基于微服务的.NET Core示例应用程序。eShopOnContainers是 .NET Core示例应用框架&a…

P1447 [NOI2010] 能量采集

P1447 [NOI2010] 能量采集 题意&#xff1a; 如果一棵植物与能量汇集机器(坐标为0&#xff0c;0)连接而成的线段上有 k 棵植物&#xff0c;则能量的损失为 2k 1 给你一个n*m的植物园&#xff0c;问能量损失是多少 1<n,m<1e5 题解&#xff1a; 本题所求式子为&#x…

P1232 [NOI2013] 树的计数

…调半天别的东西写错了&#xff0c;心力交瘁。 思路还是不会。。 具体就是二分&#xff0c;没想到&#xff0c;然后再贪心。 一直没整明白一个数它要往别的树走的条件是什么&#xff0c;日后研究。 #include <iostream> #include <cstdio> #include <cstring&…

「拥抱开源, 又见 .NET」系列第三次线下活动简报

「拥抱开源, 又见 .NET」随着 .NET Core的发布和开源&#xff0c;.NET又重新回到人们的视野。自2016年 .NET Core 1.0 发布以来&#xff0c;其强大的生命力让越来越多技术爱好者对她的未来满怀憧憬&#xff0c;越来越多的平台、框架热衷于为.NET Core不断更新的版本提供最有力的…

P3302 SDOI2013森林

P3302 [SDOI2013]森林 题意&#xff1a; 一片森林&#xff0c;有n个节点&#xff0c;m个边&#xff0c;现在有t个操作&#xff0c; Q x y k&#xff1a;Q x y k 查询点 x 到点 y 路径上所有的权值中&#xff0c;第 k 小的权值是多少 L x y 在点 x 和点 y 之间连接一条边。保证…

Codeforces Round #759 (Div. 2, based on Technocup 2022 Elimination Round 3)

感觉E思路明确只用了stl树状数组&#xff0c;F线段树复合修改&#xff0c;为什么都是2400。 A. Life of a Flower B. Array Eversion C. Minimize Distance E. Frequency Queries F. Non-equal Neighbours F : 首先 dp[i][j]sum[dp[i−1]]−dp[i−1][j]dp[i][j] sum[dp[i-1]] …

请给你的短信验证码接口加上SSL双向验证

序言去年年底闲来几天&#xff0c;有位同事专门在网上找一些注册型的app和网站&#xff0c;研究其短信接口是否安全&#xff0c;半天下来找到30来家&#xff0c;一些短信接口由于分析难度原因&#xff0c;没有继续深入&#xff0c;但差不多挖掘到20来个&#xff0c;可以肆意被调…

P2498 [SDOI2012]拯救小云公主

P2498 [SDOI2012]拯救小云公主 题意&#xff1a; 一个row * line的矩形&#xff0c;英雄在左下角(1,1),公主在右上角(row,line),有n个位置是boss。英雄现在要去公主那里&#xff0c;但是要避开boos&#xff0c;英雄决定找一条路径使到距离boss的最短距离最远。雄走的方向是任…

AtCoder Beginner Contest 230

A - AtCoder Quiz 3 B - Triple Metre C - X drawing 暂无 D - Destroyer Takahashi 暂无 贪心好难啊 E - Fraction Floor Sum F - Predilection G - GCD Permutation H - Bullion无 AAA int t;scanf("%d", &t);t t>42;printf("AGC%03d", t);BBB …

在Asp.Net Core中集成Kafka

在我们的业务中&#xff0c;我们通常需要在自己的业务子系统之间相互发送消息&#xff0c;一端去发送消息另一端去消费当前消息&#xff0c;这就涉及到使用消息队列MQ的一些内容&#xff0c;消息队列成熟的框架有多种&#xff0c;这里你可以读这篇文章来了解这些MQ的不同&#…