说说 RabbiMQ 的应答模式

RabbiMQ 我们都很熟悉了,是很常用的一个开源消息队列。搞懂 RabbiMQ 的应答模式对我们排查错误很有帮助,也能避免一些坑。本文说说 RabbiMQ 的应答模式。

生产者发出一条消息给 RabbiMQ ,RabbiMQ 将消息推送给消费者,消费者处理完消息后告诉 RabbiMQ,我已经接收到消息并处理了,RabbiMQ 收到通知后会将消息从队列中删除。消费者通知 MQ 的这个过程就是消息的应答。在 RabbiMQ 中有两种应答模式:自动应答和手动应答。

版本

  • dotNET Core :3.1

  • RabbitMQ:3.8.2

  • RabbitMQ.Client:6.2.1

自动应答

当 RabbiMQ 开启了消息的自动应答,一旦 RabbiMQ 将消息分发给了消费者,就会将消息从内存中删除。这种情况下,如果正在执行的消费者挂掉,就会丢失正在处理的消息。

生产者代码

static void Main(string[] args)
{ConnectionFactory factory = new ConnectionFactory{UserName = "oec2003",Password = "000000",HostName = "10.211.55.6"};using (var connection = factory.CreateConnection())using (var channel = connection.CreateModel()){Console.WriteLine("RabbitMQ连接成功,请输入消息,输入exit退出");channel.QueueDeclare("oec2003", false, false, false, null);string input;do{input = Console.ReadLine();var body = Encoding.UTF8.GetBytes(input);channel.BasicPublish("", "oec2003", null, body);}while (input.Trim().ToLower() != "exit");}
}

消费者代码

static void Main(string[] args)
{ConnectionFactory factory = new ConnectionFactory{UserName = "oec2003",Password = "000000",HostName = "10.211.55.6"};using (var connection = factory.CreateConnection())using (var channel = connection.CreateModel()){Console.WriteLine("消费者开始监听......");channel.QueueDeclare("oec2003", false, false, false, null);EventingBasicConsumer consumer = new EventingBasicConsumer(channel);consumer.Received += (ch, ea) =>{string message = Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}:收到消息: {message}");System.Threading.Thread.Sleep(10000);};channel.BasicConsume("oec2003", true, consumer);Console.ReadKey();}
}
  • channel.BasicConsume 方法的第二个参数设置为 true 表示自动应答;

  • 开启自动应答后,消息是生产者发布后,当有消费者连接上后,所有的消息都会被自动确认,并且从内存中删除,这时如果消费者进程挂掉,没有处理的消息会丢失,正在处理中的消息也不会被重新投递;

  • 自动应答的好处是消息队列不会处于堵塞状态,但代价有点大,生产环境中还是不建议使用。

手动应答

手动应答,当消费者接收到消息处理完后,需要发送一个回执,告诉 RabbiMQ 服务端,这时 RabbiMQ 才会将该消息删除。

生产者的代码和上面的一样,消费者代码需要做相关调整,如下:

static void Main(string[] args)
{ConnectionFactory factory = new ConnectionFactory{UserName = "oec2003",Password = "000000",HostName = "10.211.55.6"};using (var connection = factory.CreateConnection())using (var channel = connection.CreateModel()){Console.WriteLine("消费者开始监听......");EventingBasicConsumer consumer = new EventingBasicConsumer(channel);consumer.Received += (ch, ea) =>{string message = Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}:收到消息: {message}");channel.BasicAck(ea.DeliveryTag, false);};channel.BasicConsume("oec2003", false, consumer);Console.ReadKey();}
}
  • channel.BasicConsume 方法的第二个参数设置为 false ,表示手动应答模式;

  • 在处理完消息后调用 channel.BasicAck(ea.DeliveryTag, false); 来进行应答,告诉 RabbiMQ 消息已经收到,RabbiMQ 收到这个回执后,才会删除消息。

可能遇到的问题

流量控制问题

在手动模式下,生产者发送消息后消息会从 Ready 进入到 Unacked 中,当消费者进行应答之后消息从 Unacked 中删除。

如果消息的产生速度远远大于消费者的处理速度,这时消息就会都在消费者处进行积压了。我们会看到 Unacked 中的数量会越来越大,这样消费者的压力就会越来越大,这时就需要使用 Qos 来进行限流。

Qos

在消费者中使用 channel.BasicQos(0, 2, false); 来进行 Qos 的设置,如下图:

BasicQos 方法有三个参数:

  • prefetchSize:批量获取消息的总大小,0为不限制;

  • prefetchCount:每次处理消息的个数,比如 prefetchCount 设置为 2 ,那么处于 Unacked 状态的消息最多就 2 条,当其中一条进行了得到了应答后,才会从 Ready 中转入一条到 Unacked

  • global:设置为 true 表示对 channel 进行控制,否则对每个消费者进行限制,一个 channel 可以有多个消费者

为什么使用 Qos :

  • 提高服务稳定性,因为有 prefetchCount 参数的控制,不会有海量的数据涌进来导致消费者服务挂掉;

  • 提高吞吐量,当队列有多个消费者时,每个消费者的能力不一样,我们可以通过 prefetchCount 参数来合理安排每个消费者的处理能力,不会出现有的空闲,有的积压。

prefetchCount 是一个非常关键的参数,当消费者处理消息时,出现一些异常情况,导致无法进行 Ack 应答,没有应答的数量大于等于 prefetchCount 时,队列就会发生堵塞。所以我们一定要确保消息的处理能够被异常捕获,并在 finally 中进行 Ack 应答,代码如下:

try
{string message = Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}:收到消息: {message}");if (message == "error"){throw new Exception("mq error");}else if (message == "sleep"){System.Threading.Thread.Sleep(60000);}
}
catch (Exception)
{//处理异常
}
finally
{channel.BasicAck(ea.DeliveryTag, false);
}

一旦队列堵塞了,一种处理方式就是断掉客户端,这样,处在 Unacked 中的消息会重新回到 Ready 中,会重新进行投递进行消费。

总结

1、自动应答模式需要慎用,特别是生产环境;

2、不开启 Qos ,消费者可能会面临很大压力,但消息不会堵塞(测试过 500 个未进行 Ack 没有造成堵塞),现在不确定在没有 Qos 的情况下,有没有默认的最大 prefetchCount ;

3、开启 Qos ,prefetchCount 的值很关键,并且需要做好异常处理,防止堵塞。

希望本文对您有所帮助!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/305785.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微软2020开源回顾:止不住的挨骂,停不下的贡献

喜欢就关注我们吧!2020年,Linus Torvalds 开启“飚骚话”模式,言语不再激烈。看到“大喷子” Linus 都有机会变慈祥,料想微软近年来主动拥抱开源并示好的行为应该能改变他人的看法。然而事实并非如此,虽然微软积极投身…

每日一题——LeetCode859

方法一 个人方法: 首先s和goal要是长度不一样或者就只有一个字符这两种情况可以直接排除剩下的情况s和goal的长度都是一样的,s的长度为2也是特殊情况,只有s的第一位等于goal的第二位,s的第二位等于goal的第一位才能满足剩下的我们…

4倍速!ML.NET Model Builder GPU 与 CPU 对比测试

当我们使用 Visual Studio 进行机器学习开发时,一般都会推荐安装 ML.NET Model Builder ,这让我们的开发更加可视化,并且按照步骤载入相关的训练集,选择好模型就够了,一切就是如此朴实无华。说到 ML.NET Model Builder…

ML.NET 推荐引擎中一类矩阵因子分解的缺陷

ML.NET 作为 .NET 跨平台的机器学习套件,在回归、预测、分类甚至是图像识别、异常检测都展现出简洁快速的优势,以往的文章已介绍过不再赘述。其实机器学习场景中还有一类非常常见的,就是推荐,特别是在线购物、文娱产品为了提升用户…

晋升新一线的合肥,跨平台的.NET氛围究竟如何?

大伙可能不知道,2020年合肥已经成功晋升为新一线城市了。本文通过对目前合肥.NET招聘信息以及公众号的相关数据的分析来看下目前合肥.NET的大环境。就着2020中国.NET开发者峰会顺利举行的东风,同时为进一步提升合肥地区的.NET技术氛围,一些合…

开源项目barcodelib-C#条形码图像生成库

介绍该库旨在为开发人员提供一个简单的类,以便他们在需要根据数据字符串生成条形码图像时使用。用法该库包含一个名为BarcodeLib的类,其中包含三个构造函数:Barcode(); Barcode(string); Barcode(string, BarcodeLib.TYPE);如果决定使用参数创…

ctf php 读取flag,BugkuCTF flag.php(反序列化)

进去后是个登录页面,但是login根本不会跳转,看源码也没提示,但是这道题给了一个提示:hint,那么盲猜应该是一个get参数,因为post不能跳转,那么get总有内容吧,跟上hint参数&#xff0c…

2021技术人新展望

一、前言各位小伙伴们还有几天新的一年即将来临,这篇文章作为今年的结束吧。不知道大家对自己每一年的技术发展规划是什么,我在这里分享一下我2021年的新的规划,这里非常感谢各位小伙伴对我的关注。二、内容概要2021的布局客户端技术分享服务…

在 ASP.NET 中实现会话状态的基础

简介 在 Web 应用程序这样的无状态环境中,了解会话状态的概念并没有实际的意义。尽管如此,有效的状态管理对于大多数 Web 应用程序来说都是一个必备的功能。Microsoft ASP.NET 以及许多其他服务器端编程环境都提供了一个抽象层,允许应用程序基…

【One by One系列】IdentityServer4(三)使用用户名和密码

继续介绍IdentityServer4,我们上篇介绍了IdentityServer4实现OAuth2.0的授权方式之一的客户端凭证,接下来我们继续介绍OAuth2.0的另外一种授权方式密码式,Resource Owner Password Credentials。post请求token?grant_typepassword&usernameUSERNAME…

【One by One系列】IdentityServer4(二)使用Client Credentials保护API资源

书接上回,我们将会正式开始介绍IdentityServer4。IdentityServer4是实现了OAuth2.0OpenId Connect两种协议的优秀第三方库,属于.net生态中的优秀成员。可以很容易集成至ASP.NET Core,颁发token。使用Id4基本步骤如下:**1.**在Startup.Configu…

使用 Avalonia 开发 UOS 原生应用

最近很火的是国产的操作系统 UOS 系统,这个系统现在存在的问题就是生态没搭建起来。作为 dotnet 开发者,可以通过很多不同方向的技术为 UOS 系统生态贡献应用,如 Xamarin 和 GTK# 或 wine 和 win32 应用,或 mono 和 WinForms 应用…

云原生那些顶级开源项目,你都用过哪些?

CNCF,英文全称为Cloud Native Computing Foundation,中文译为“云原生计算基金会”, 构建可持续生态系统,并围绕一系列高质量开源项目促进社区的发展,本文大概介绍下几个主要的项目。???? Kubernetes⭐ Star: 73.5k https://g…

2021年首届.NET线下沙龙上海站 - 2021 .NET Meetup in Shanghai

.NET Conf 2020 刚刚在苏州落下帷幕, .NET 开发者们的热情不减,来自五湖四海的朋友一起参加疫情之下的 .NET 盛会。2021年上海第一场线下活动就要来了,快来加入我们一起学习.NET 吧!活动时间:2021年1月23日 13:00-18:0…

跟我一起学Redis之加个哨兵让主从复制更加高可用

前言主从复制的实现在上一篇已经分享过,虽然主从复制本身的确让读写分离更加高效,但是对于整体高可用存在很大的劣势:当主节点宕机了之后还需要人为重新进行主从关系配置;这不是开玩笑嘛,这样人为干预,故障…

智能实验室-全能优化(Guardio) 4.0.0.670 beta 8

怎样获取最新版本?□全新下载:全能优化(Guardio):http://files.cnblogs.com/unruledboy/Guardio.Release.zip智能实验室- 全能优化(Guardio) 4.0.0.670 beta 8 更新记录: 1.添加:系统加速 之 系统速度&…

如何在 ASP.Net Core 中实现 健康检查

健康检查 常用于判断一个应用程序能否对 request 请求进行响应,ASP.Net Core 2.2 中引入了 健康检查 中间件用于报告应用程序的健康状态。ASP.Net Core 中的 健康检查 落地做法是暴露一个可配置的 Http 端口,你可以使用 健康检查 去做一个最简单的活性检…

【One by One系列】IdentityServer4(四)授权码流程

接下来我们介绍新内容,OAuth2.0叫做授权码(authorization code),在OpenID Connect中则属于OpenId Connect Flow,称为授权码流程(Authorization Code Flow),这种方式主要场景:保密客户端,服务器端的web应用“例如asp.net core mvc,…

Nemuria UML架构图 第3次迭代. 重构

Nemuria UML架构图 第3次迭代. 重构