rabbitmq载在.net中批量消费的问题记录

背景

最近遇到了一个问题,在使用rabbitmq的时候出现了丢消息、消息重复消费等一系列的问题,使用的是.net框架,背景是高并发压力下的mq消费,按理说即使队列中堆了几百条消息,我客户端可以同处理5个消息。

原因是多线程同时处理时导致的内存混乱。

官方文档已经解释的很全面了:https://www.rabbitmq.com/dotnet-api-guide.html

一个简易的单线程消费者

注意如下代码,这只是一个简易的单线程同步的消费者;
每次消费1条消息,消息消费完进行手动ack;

Task.Run(() =>{AutoResetEvent autoResetEvent = new AutoResetEvent(false);ConnectionFactory factory = new ConnectionFactory();// "guest"/"guest" by default, limited to localhost connectionsfactory.UserName = user;factory.Password = pass;factory.VirtualHost = vhost;factory.HostName = hostName;// this name will be shared by all connections instantiated by// this factoryfactory.ClientProvidedName = "app:audit component:event-consumer";IConnection conn = factory.CreateConnection();using (IModel channel = conn .CreateModel()){channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);channel.QueueDeclare(queueName, false, false, false, null);channel.QueueBind(queueName, exchangeName, routingKey, null);consumer.Received += (ch, ea) =>{var body = ea.Body.ToArray();// copy or deserialise the payload// and process the message// ...channel.BasicAck(ea.DeliveryTag, false);};channel.BasicConsume(queue: "my-queue",autoAck: false,consumer: consumer);}ConsoleUtil.WriteLine("mq started");autoResetEvent.WaitOne();ConsoleUtil.WriteLine("mq shutdown");}
});

批量消费

好的,那么我现在想要同时消费5条消息,想要达到并行的效果,需要如何改代码呢?看下面的改动:

改动1:

先看两个概念

  1. prefetchCount(预取计数):

    • prefetchCount 是一个用来限制每个消费者一次性从队列中获取的消息数量的参数。
    • 当你有多个消费者同时连接到同一个队列时,RabbitMQ 可以将消息均匀地分发给这些消费者。
    • 通过设置 prefetchCount,你可以告诉 RabbitMQ 每个消费者一次最多获取多少条消息。
    • 这个参数的目的是确保消息在被消费者处理之前不会全部放到内存中,从而提高系统的稳定性和性能。它有助于避免 一个消费者获取了太多消息而导致其他消费者无法获取任何消息的情况。
  2. concurrentConsumers(并发消费者):

    • concurrentConsumers 是指在同一队列上允许多少个并发消费者。
    • 每个并发消费者都会独立地处理消息,这有助于提高系统的处理能力和吞吐量。
    • 通过增加 concurrentConsumers 数量,你可以增加并发处理消息的能力。
    • 注意,这个参数不同于 prefetchCount,它控制的是同时运行的消费者的数量,而不是单个消费者一次性获取的消息数量。
// 参数1:prefetchSize:可接收消息的大小,如果设置为0,那么表示对消息本身的大小不限制
// 参数2:prefetchCount:处理消息最大的数量。相当于消费者能一次接受的队列大小
// 参数3:global:是不是针对整个 Connection 的,因为一个 Connection 可以有多个 Channel
// global=false:针对的是这个 Channel
// global=ture: 针对的是这个 Connection
channel.BasicQos(0, 5, false);
factory.ConsumerDispatchConcurrency = 5;

好的,这时候我配置了同时处理5条消息,看起来没问题了,但是官网文档有这样一句话:

IModel instance usage by more than one thread simultaneously should be avoided. Application code should maintain a clear notion of thread ownership for IModel instances.

This is a hard requirement for publishers: sharing a channel (an IModel instance) for concurrent publishing will lead to incorrect frame interleaving at the protocol level. Channel instances must not be shared by threads that publish on them.

If more than one thread needs to access a particular IModel instances, the application should enforce mutual exclusion. One way of achieving this is for all users of an IModel to lock the instance itself:

大概意思就是应该避免多个线程同时使用IModel实例,也就是channel对象,如果这么做的后果就是高负载情况下导致内存混乱,有可能你的线程1消费到了线程5本该消费的消息,这听起来后果是很严重的,那么我们应该怎么改动呢?官网也给方案了,就是给channel对象加锁,看下面的代码改动:

改动2

consumer.Received += (ch, ea) =>{	var body = ea.Body.ToArray();// copy or deserialise the payload// and process the message// ...lock (channel){channel.BasicAck(ea.DeliveryTag, false);}
};lock (channel){channel.BasicConsume(queue: "my-queue",autoAck: false,consumer: consumer);
}

异步支持

新增一个配置:

factory.DispatchConsumersAsync = true;

然后修改消费者:

var consumer = new AsyncEventingBasicConsumer(channel);consumer.Received += async (model, ea) =>
{await Task.Run(() =>{var body = ea.Body.ToArray();// copy or deserialise the payload// and process the message// ...lock (channel){channel.BasicAck(ea.DeliveryTag, false);}});
};

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/69211.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

0010Java程序设计-springboot+vue影院售票系统设计与实现

摘 要目 录系统实现开发环境 摘 要 看电影已经成为了人们生活中不可缺少的一部分,电影院售票及管理系统是电影院的日常管理及售票任务的核心, 在电影院中, 工作人员并非只是放映电影, 还有诸如票房统计、影片放映、影片场次安排、…

离线数仓同步数据2

业务数据_全量表数据同步 1 gen_import_config.py脚本2 gen_import_config.sh脚本3 全量表数据同步脚本 2.2.5.4 DataX配置文件生成脚本 方便起见,此处提供了DataX配置文件批量生成脚本,脚本内容及使用方式如下。 1 gen_import_config.py脚本 1&#xf…

PE文件格式详解

摘要 本文描述了Windows系统的PE文件格式。 PE文件格式简介 PE(Portable Executable)文件格式是一种Windows操作系统下的可执行文件格式。PE文件格式是由Microsoft基于COFF(Common Object File Format)格式所定义的&#xff0c…

交换机和路由器的区别?

分析&回答 工作层次不同: 交换机主要工作在数据链路层(第二层)路由器工作在网络层(第三层) 转发依据不同: 交换机转发所依据的对象时:MAC地址。(物理地址)路由转…

淘宝数据库,主键如何设计的?

聊一个实际问题:淘宝的数据库,主键是如何设计的? 某些错的离谱的答案还在网上年复一年的流传着,甚至还成为了所谓的 MySQL 军规。其中,一个最明显的错误就是关于MySQL 的主键设计。 大部分人的回答如此自信&#xff…

论文阅读_大模型_ToolLLM

英文名称: ToolLLM: Facilitating Large Language Models to Master 16000 Real-world APIs 中文名称: TOOLLLM:帮助大语言模型掌握16000多个真实世界的API 文章: http://arxiv.org/abs/2307.16789 代码: https://github.com/OpenBMB/ToolBench 作者: Yujia Qin 日期…

保姆级 C++ 学习路线

上周有小伙伴留言求安排一手C/C学习路线,这周一份保姆级的C语言安排上! 以前就写过C语言的学习路线:可能是北半球最好的零基础C语言学习路线,这次把C的学习路线也安排上,专门花了一个多月写了这篇学习路线,…

桥接模式:连接抽象与实现

欢迎来到设计模式系列的第八篇文章!在之前的几篇文章中,我们已经学习了许多常见的设计模式,今天我们将继续探讨另一个重要的设计模式——桥接模式。 桥接模式简介 桥接模式是一种结构型设计模式,它主要用于将抽象部分与实现部分…

[Linux]编写一个极简版的shell(版本1)

[Linux]编写一个极简版的shell-version1 文章目录 [Linux]编写一个极简版的shell-version1命令行提示符打印接收命令行参数将命令行参数进行解释执行用户命令完整代码 本文能够帮助Linux系统学习者通过代码的角度更好地理解命令行解释器的实现原理。 命令行提示符打印 Linux操…

ARP欺骗原理和防护

ARP是什么? ARP是在局域网中根据上层协议的IP查找它的的Mac地址的网络层协议。 ARP欺骗原理 如果主机A要和主机B通信,它首先要检查自己的ARP缓存表,查看其中是否有和主机B对应的Mac地址,如果没有,则需要发送广播寻找主…

常用命令之mysql命令之show命令

一、mysql show命令简介 mysql数据库中show命令是一个非常实用的命令,SHOW命令用于显示MySQL数据库中的信息。它可以用于显示数据库、表、列、索引和用户等各种对象的信息。我们常用的有show databases,show tables,show full processlist等&…

SpringMVC常用注解、参数传递及页面跳转

一.SpringMVC常用注解 1.1.RequestMapping RequestMapping注解是一个用来处理请求地址映射的注解,可用于映射一个请求或一个方法,可以用在类或方法上。 标注在方法上运行代码 用于方法上,表示在类的父路径下追加方法上注解中的地址将会访…

无涯教程-JavaScript - NORMDIST函数

NORMDIST函数替代Excel 2010中的NORM.DIST函数。 描述 该函数返回指定均值和标准差的正态分布。此功能在统计中有非常广泛的应用,包括假设检验。 语法 NORMDIST(x,mean,standard_dev,cumulative)争论 Argument描述Required/OptionalXThe value for which you want the dis…

大数据课程K19——Spark的电影推荐案例推荐系统的冷启动问题

文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州 ▲ 本章节目的 ⚪ 掌握Spark的案例——电影推荐; ⚪ 掌握Spark的模型存储; ⚪ 掌握Spark的模型加载; ⚪ 掌握Spark的推荐系统的冷启动问题; 一、案例——电影推荐 1. 基于用户的推荐 1. 说明 我们现…

Redis——认识Redis

简单介绍 Redis诞生于2009年,全称是Remote Dictionary Server,远程词典服务器,是一个基于内存的键值型NoSQL数据库。 特征 键值(Key-value)型,value支持多种不同数据结构,功能丰富单线程&…

《C++ Primer》第2章 变量(一)

参考资料: 《C Primer》第5版《C Primer 习题集》第5版 2.1 基本内置类型(P30) C 定义的基本类型包括算术类型(arithmetic type)和空类型(void),其中算术类型包括字符、整型、布尔…

菜鸟教程《Python 3 教程》笔记(17):输入和输出

菜鸟教程《Python 3 教程》笔记(17) 17 输入和输出17.1 读取键盘输入17.2 读和写文件17.3 文件对象的方法17.3.1 read()、readline()、readlines() 17.3.2 tell()17.3.3 seek()17.4 pickle 模块(没看懂) 笔记带有个人侧重点&#…

LeetCode 面试题 03.01. 三合一

文章目录 一、题目二、C# 题解 一、题目 三合一。描述如何只用一个数组来实现三个栈。 你应该实现push(stackNum, value)、pop(stackNum)、isEmpty(stackNum)、peek(stackNum)方法。stackNum表示栈下标,value表示压入的值。 构造函数会传入一个stackSize参数&#x…

Linux CentOS7 awk的反转功能

处理文本文件,经常会遇到反向输出的要求。 可用命令rev对待处理的文件或标准输入快速完成。 可用命令tac对文件快速完成反向查看。 而对行中字符串(单词)可借助其他命令达到反向输出的目标。 我们在文章《Linux CentOS7sed的替换及逆转功能》讨论了sed流编辑器对…

学习Bootstrap 5的第五天

目录 图像 图像形状 实例 对齐图像 实例 居中图像 实例 响应式图像 实例 Jumbotron 实例 图像 图像形状 .rounded 类可以用于为图像或任何具有边框的元素添加圆角。这个类适用于Bootstrap的所有版本&#xff0c;并且在最新版本中得到了进一步的增强。 实例 <…