说说 RabbiMQ 的应答模式

RabbiMQ 我们都很熟悉了,是很常用的一个开源消息队列。搞懂 RabbiMQ 的应答模式对我们排查错误很有帮助,也能避免一些坑。本文说说 RabbiMQ 的应答模式。

生产者发出一条消息给 RabbiMQ ,RabbiMQ 将消息推送给消费者,消费者处理完消息后告诉 RabbiMQ,我已经接收到消息并处理了,RabbiMQ 收到通知后会将消息从队列中删除。消费者通知 MQ 的这个过程就是消息的应答。在 RabbiMQ 中有两种应答模式:自动应答和手动应答。

版本

  • dotNET Core :3.1

  • RabbitMQ:3.8.2

  • RabbitMQ.Client:6.2.1

自动应答

当 RabbiMQ 开启了消息的自动应答,一旦 RabbiMQ 将消息分发给了消费者,就会将消息从内存中删除。这种情况下,如果正在执行的消费者挂掉,就会丢失正在处理的消息。

生产者代码

static void Main(string[] args)
{ConnectionFactory factory = new ConnectionFactory{UserName = "oec2003",Password = "000000",HostName = "10.211.55.6"};using (var connection = factory.CreateConnection())using (var channel = connection.CreateModel()){Console.WriteLine("RabbitMQ连接成功,请输入消息,输入exit退出");channel.QueueDeclare("oec2003", false, false, false, null);string input;do{input = Console.ReadLine();var body = Encoding.UTF8.GetBytes(input);channel.BasicPublish("", "oec2003", null, body);}while (input.Trim().ToLower() != "exit");}
}

消费者代码

static void Main(string[] args)
{ConnectionFactory factory = new ConnectionFactory{UserName = "oec2003",Password = "000000",HostName = "10.211.55.6"};using (var connection = factory.CreateConnection())using (var channel = connection.CreateModel()){Console.WriteLine("消费者开始监听......");channel.QueueDeclare("oec2003", false, false, false, null);EventingBasicConsumer consumer = new EventingBasicConsumer(channel);consumer.Received += (ch, ea) =>{string message = Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}:收到消息: {message}");System.Threading.Thread.Sleep(10000);};channel.BasicConsume("oec2003", true, consumer);Console.ReadKey();}
}
  • channel.BasicConsume 方法的第二个参数设置为 true 表示自动应答;

  • 开启自动应答后,消息是生产者发布后,当有消费者连接上后,所有的消息都会被自动确认,并且从内存中删除,这时如果消费者进程挂掉,没有处理的消息会丢失,正在处理中的消息也不会被重新投递;

  • 自动应答的好处是消息队列不会处于堵塞状态,但代价有点大,生产环境中还是不建议使用。

手动应答

手动应答,当消费者接收到消息处理完后,需要发送一个回执,告诉 RabbiMQ 服务端,这时 RabbiMQ 才会将该消息删除。

生产者的代码和上面的一样,消费者代码需要做相关调整,如下:

static void Main(string[] args)
{ConnectionFactory factory = new ConnectionFactory{UserName = "oec2003",Password = "000000",HostName = "10.211.55.6"};using (var connection = factory.CreateConnection())using (var channel = connection.CreateModel()){Console.WriteLine("消费者开始监听......");EventingBasicConsumer consumer = new EventingBasicConsumer(channel);consumer.Received += (ch, ea) =>{string message = Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}:收到消息: {message}");channel.BasicAck(ea.DeliveryTag, false);};channel.BasicConsume("oec2003", false, consumer);Console.ReadKey();}
}
  • channel.BasicConsume 方法的第二个参数设置为 false ,表示手动应答模式;

  • 在处理完消息后调用 channel.BasicAck(ea.DeliveryTag, false); 来进行应答,告诉 RabbiMQ 消息已经收到,RabbiMQ 收到这个回执后,才会删除消息。

可能遇到的问题

流量控制问题

在手动模式下,生产者发送消息后消息会从 Ready 进入到 Unacked 中,当消费者进行应答之后消息从 Unacked 中删除。

如果消息的产生速度远远大于消费者的处理速度,这时消息就会都在消费者处进行积压了。我们会看到 Unacked 中的数量会越来越大,这样消费者的压力就会越来越大,这时就需要使用 Qos 来进行限流。

Qos

在消费者中使用 channel.BasicQos(0, 2, false); 来进行 Qos 的设置,如下图:

BasicQos 方法有三个参数:

  • prefetchSize:批量获取消息的总大小,0为不限制;

  • prefetchCount:每次处理消息的个数,比如 prefetchCount 设置为 2 ,那么处于 Unacked 状态的消息最多就 2 条,当其中一条进行了得到了应答后,才会从 Ready 中转入一条到 Unacked

  • global:设置为 true 表示对 channel 进行控制,否则对每个消费者进行限制,一个 channel 可以有多个消费者

为什么使用 Qos :

  • 提高服务稳定性,因为有 prefetchCount 参数的控制,不会有海量的数据涌进来导致消费者服务挂掉;

  • 提高吞吐量,当队列有多个消费者时,每个消费者的能力不一样,我们可以通过 prefetchCount 参数来合理安排每个消费者的处理能力,不会出现有的空闲,有的积压。

prefetchCount 是一个非常关键的参数,当消费者处理消息时,出现一些异常情况,导致无法进行 Ack 应答,没有应答的数量大于等于 prefetchCount 时,队列就会发生堵塞。所以我们一定要确保消息的处理能够被异常捕获,并在 finally 中进行 Ack 应答,代码如下:

try
{string message = Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}:收到消息: {message}");if (message == "error"){throw new Exception("mq error");}else if (message == "sleep"){System.Threading.Thread.Sleep(60000);}
}
catch (Exception)
{//处理异常
}
finally
{channel.BasicAck(ea.DeliveryTag, false);
}

一旦队列堵塞了,一种处理方式就是断掉客户端,这样,处在 Unacked 中的消息会重新回到 Ready 中,会重新进行投递进行消费。

总结

1、自动应答模式需要慎用,特别是生产环境;

2、不开启 Qos ,消费者可能会面临很大压力,但消息不会堵塞(测试过 500 个未进行 Ack 没有造成堵塞),现在不确定在没有 Qos 的情况下,有没有默认的最大 prefetchCount ;

3、开启 Qos ,prefetchCount 的值很关键,并且需要做好异常处理,防止堵塞。

希望本文对您有所帮助!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/305785.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

php 输出json utf8,php json_encode utf-8中文问题

以前碰到最多的是json_encode是gbk 编码时出现乱码,今天发现uft8也会出现中文乱码了,下面我们一起看问题如何解决吧。utf-8字符json_encode为变成转成utf16编码,也就是介个样子: 代码如下复制代码$ ./php/bin/php -r echo json_en…

2005-5-29+ 认识httphandler

httphandler是做什么的?用任何一个搜索引擎都可以找到一大堆资料,我就不多说了,把我今天学习的一些认识写在这里,希望可以从另一个侧面让大家认识httphandler。所谓httphandler,其实是.net用来处理页面请求的。大家可以…

php oracle 锁表,ORACLE 用户锁定问题

在开发环境下,由于直接修改了数据库用户的密码,之后一直不能连接,及时执行alter user username account unlock 还是提示用户锁定。刚开始只是怀疑是数据库的问题,就一直在数据库上找问题,其实最终的问题是更改密码后&…

微软2020开源回顾:止不住的挨骂,停不下的贡献

喜欢就关注我们吧!2020年,Linus Torvalds 开启“飚骚话”模式,言语不再激烈。看到“大喷子” Linus 都有机会变慈祥,料想微软近年来主动拥抱开源并示好的行为应该能改变他人的看法。然而事实并非如此,虽然微软积极投身…

下午回家啦~

决定回家,今天买火车票也很顺利,晚上七点多就到家了。老爸老妈,还有很多亲人朋友,俺就要见到你们了,激动、开心!

用matlab实现机械臂的仿真,基于MATLAB的SCARA机械臂仿真与性能评估

工业机器人以其代替人类单调繁重的体力劳动,便于实现自动化提高生产效率等优点,而被广泛应用于工程机械、汽车零部件、轨道交通、轻工造纸等行业,具有可观的经济效益。到2015年,中国机器人市场将成世界最大规模的市场。与此同时,它又是融合了多个学科的复杂机电一体化产品。近年…

每日一题——LeetCode859

方法一 个人方法: 首先s和goal要是长度不一样或者就只有一个字符这两种情况可以直接排除剩下的情况s和goal的长度都是一样的,s的长度为2也是特殊情况,只有s的第一位等于goal的第二位,s的第二位等于goal的第一位才能满足剩下的我们…

4倍速!ML.NET Model Builder GPU 与 CPU 对比测试

当我们使用 Visual Studio 进行机器学习开发时,一般都会推荐安装 ML.NET Model Builder ,这让我们的开发更加可视化,并且按照步骤载入相关的训练集,选择好模型就够了,一切就是如此朴实无华。说到 ML.NET Model Builder…

疯狂的爬网

今天下午13:00~16:00,来自61.152.236.158的访客对博客园进行了疯狂的爬网。访问次数 :45506 频率:13次/秒访问目标地址:主要是个人Blog各个分类的RSS。看来需要对各个分类的RSS进行进一步的性能优化。

php链表和联表的区别,PHP_浅谈PHP链表数据结构(单链表),链表:是一个有序的列表,但 - phpStudy...

浅谈PHP链表数据结构(单链表)链表:是一个有序的列表,但是它在内存中是分散存储的,使用链表可以解决类似约瑟夫问题,排序问题,搜索问题,广义表单向链表,双向链表,环形链表PHP的底层是…

Xamarin.Forms 5.0 来了

Xamarin.Forms 5.0 已经正式发布,并带来其新功能,具体看官方博客https://devblogs.microsoft.com/xamarin/xamarin-forms-5-0-is-here/。在很大程度上,它是关于稳定版本功能的正式发布的,这些功能已在实验版本中获得使用&#xff…

中国的怪现象

在中国,因为各种各样的原因,会冒出一些怪现象。以前我经常与人谈的一个就是“盈利机构在亏损,非盈利机构在赚钱”。我不知道这个算不算是共识,但是,我的确看到一些这样的情况。因为我们经常看到某些应该盈利的国有企业…

PHP秒杀截流原理,节流阀和去抖动的基本实现方法介绍

本篇文章给大家带来的内容是关于节流阀和去抖动的基本实现方法介绍,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。节流阀throttle触发的事件以周期的形式去执行,而非实时。如滴水的水龙头。function throttle (f…

ML.NET 推荐引擎中一类矩阵因子分解的缺陷

ML.NET 作为 .NET 跨平台的机器学习套件,在回归、预测、分类甚至是图像识别、异常检测都展现出简洁快速的优势,以往的文章已介绍过不再赘述。其实机器学习场景中还有一类非常常见的,就是推荐,特别是在线购物、文娱产品为了提升用户…

建立用户、密码类型

每次做用户密码等类型的验证都很麻烦,最近想了个办法来简化判断的逻辑。.Net中提供了隐式转换,虽然降低了代码可读性,但是使用确实很方便。而且我认为,定义自己的类型使用它并不会降低可读性。可以定义用户类型UserName1 usingSys…

tars框架php,TarsPHP: TARS-PHP是针对php使用tars二进制协议,以及tars平台整体运维、RPC等一系列能力的解决方案...

TARS-PHPTARS-PHP是针对php使用tars二进制协议,以及tars平台整体运维、RPC等一系列能力的解决方案。它主要由如下的几个部分组成:如果你想要快速的体验tars-server,请进入examples目录,里面有详尽的三种服务的部署guideline和开发guidelinetars-clientta…

晋升新一线的合肥,跨平台的.NET氛围究竟如何?

大伙可能不知道,2020年合肥已经成功晋升为新一线城市了。本文通过对目前合肥.NET招聘信息以及公众号的相关数据的分析来看下目前合肥.NET的大环境。就着2020中国.NET开发者峰会顺利举行的东风,同时为进一步提升合肥地区的.NET技术氛围,一些合…

软件工程心理学之1----开篇

最近偶然翻了下以前做过的项目或者做过的咨询或者监理的一些记录,文档之类的,又听了和看了一些讲座和相关的资料,觉得应该要重视软件工程中的心理学应用。因为之前自己在大学学过心理学,当时以为没什么用,但现在回顾一…

开源项目barcodelib-C#条形码图像生成库

介绍该库旨在为开发人员提供一个简单的类,以便他们在需要根据数据字符串生成条形码图像时使用。用法该库包含一个名为BarcodeLib的类,其中包含三个构造函数:Barcode(); Barcode(string); Barcode(string, BarcodeLib.TYPE);如果决定使用参数创…

成功测试管理者的九大原则

简介 许多测试管理者是从技术部门进到管理阶层的。尽管他们有可能受过很多测试或软件工程的培训和指导,但他们还是很难经常从失败和错误中学到管理技巧。作为一个管理者,你有两项基本工作:找出为你工作的最好 的员工并且建立一个能够使员工完…