RabbitMQ 常见问题

1. 如何保证消息顺序消费

在RabbitMQ中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,保持先进先出的原则,这个由Rabbitmq保证。而不同队列中的消息,RabbitMQ 是无法保证其顺序性。顺序消费主要是指消费者按照队列中消息出队的顺序消费,出现顺序错乱的场景主要有两种:

  • 一个queue存在多个consumer去消费,这样就可能会造成顺序的错乱。虽然consumer从MQ里面读取消息是顺序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,如果先消费的处理时间很长,后面消费的处理时间很快,这样就会出现先出队的消息要晚于后出队的消息执行,从而造成消息顺序错乱。
  • 一个quque上只有一个consumer去消费,但是这个consumer是多线程异步处理,因此也不能保证这个consumer按顺序消费;

解决方案:

  1. 单一队列顺序消费:将所有需要按顺序处理的消息发送到同一个队列中,然后只使用一个消费者单线程去处理消费队列中的消息。这样可以确保消息按照发送的顺序被消费,只是会牺牲消息消费的吞吐量。
  2. 设置消息序号标识:根据队列中消息的顺序设置一个递增的序号,当消费者消费处理时判断当前消息的序号的前一位是否已存在消息的处理记录表中,不存在说明前面一个消息还未处理完,然后不断轮询记录表中的前一个序号的记录,直到查询到前一个序号的记录之后再执行当前消息的处理逻辑,执行完成后再将当前消息的序号信息存储到记录表中。
2. 如何实现消息延时消费

有些场景需要保证队列中的消息不能立即被消费,需要延长一段时间后再进行消费。正常来说,消息到达队列后并且队列没有消息堆积的情况,只要有消费者监听了这个队列并且消费者是正常运行的,那么消息就会立刻被消费掉。实现这种延时消费的方案有两种:

  1. 延时队列:RabbitMQ 本身不支持延时队列,但是我们可以通过 RabbitMQ 的插件 rabbitmq-delayed-message-exchange来发生延迟消息。
  2. 死信队列 + 消息过期:创建一个队列并设置好对应的死信队列,该队列不设置任何消费者。先将消息放到这个队列中,并设置消息过期时间,当消息过期后,就会转存到死信队列中,然后消费者只要监听死信队列就好。
3. 如何解决消息堆积问题

消息的堆积是指在消息队列中,当生产者以较快的速度发送消息,而消费者处理消息的速度较慢或者消费者挂了的时候,导致消息在队列中积累并达到队列的存储上限。解决方案有两种:

  1. 增加更多消费者,提高消费速度:通过水平扩展消费者的数量,提高消息的处理速度,从而减少消息在队列中的滞留时间。
  2. 扩大队列容积,提高堆积上限:如果要提升队列容积,只把消息保存在内存中显然是不行的。可以使用惰性队列,惰性队列接收到消息后直接存入磁盘而非内存,消费者要消费消息时才会从磁盘中读取并加载到内存。
4. 如何保证消息的可靠性
4.1 生产者丢失消息:生产者发送消息由于网络等原因并没有发送到RabbitMq,或者消息发送到RabbitMq,但是没有找到指定的交换机或者没有匹配到对应的消息队列时,这时就会造成消息的丢失。

解决方案:

  • 消息确认机制:在生产者创建的channel上开启确认模式,并在channel上添加监听,通过回调channel.addConfirmListener()函数来创建一个ConfirmListener。如果Broker收到消息,则会给生产者返回一个应答结果,ConfirmListener监听到broker的应答结果,根据具体的应答结果对消息进行重新发送,或记录日志等后续处理。
  • 消息返回机制:对于一些不可达的消息,broker会返回一个信号通知生产端,如果消息可达,则不会返回任何信号。通过在channel上添加channel.addReturnListener()函数来创建一个ReturnListener,用于监听不可达的消息,然后进行后续的处理。
4.2 broker消息中间件自身丢失消息:RabbitMq收到生产者的消息后还没有来得及持久化到磁盘,又或者创建队列没有持久化以及消息并没有设置为持久化,在Mq故障宕机后都会有消息丢失的情况。

解决方案:

  • 做持久化设置:在创建队列以及发送消息的时候同时设置队列持久化和消息持久化。
4.3 consumer消费者丢失消息:消费者自动ack配置情况下,业务代码异常或者其他故障消息并没有处理完成也会自动ack,从而导致消息丢失。

解决方案:

  • 设置手动确认:消费者在声明队列时,设置autoAck=false,RabbitMQ就会等待消费者显式返回ack信号后才从队列中移去消息。消费者成功处理消息后,手动调用void basicAck(long deliveryTag, boolean multiple)方法通知broker该消息已经消费成功。消息处理失败后,通过回调void basicNack(long deliveryTag, boolean multiple, boolean requeue)方法或者void basicReject(long deliveryTag, boolean requeue)方法通知broker该消息消费失败。
4.4 消费者手动确认过程中异常:消费端在消费消息过程中出现异常,还没能调用basicAck应答,此时消息将变成unacked状态,并且一直处于队列中。

解决方案:

  • 进行异常捕获:对消费者业务处理逻辑进行异常捕获,然后在catch里面调用basicNack方法返回消费失败的ack信息给broker。再将消费失败的消息记录到日志或者数据库中,后续人工进行处理。
4.5 无效消息循环重入队列:如果消费端在消费消息过程中捕获异常,并进行basicNack应答,将消息重新放入队列中,可能会出现无效的消息循环入队列的问题。假设消息或者代码本身有bug,每次处理这个消息都会报异常,那消息将一直处于消费——>报异常——>重入队列——>继续消费——>报异常。。。的死循环过程。

解决方案:

  • 禁止消息重入队:当捕获到异常的时候,调用basicNack方法时,通过设置requeue参数为false,消息不会重入队消费。再将消费失败的消息记录到日志或者数据库中,后续人工进行处理。
4.6 broker未接收到ack信息:如果消费端消费完后,调用basicAck应答过程中由于网络中断等原因,未能将ack信息发送到broker时,会导致队列中的消息一直处于unacked状态。

解决方案:

  • 在声明队列的时候添加死信队列,当broker一直未收到消费者返回的ack确认信息的消息,那么RabbitMQ消息队列就不清楚这条消息到底有没有被消费成功,就会将这条消息存放到死信队列中。后续再由人工去进行处理。
5. 如何防止消息重复消费

如果消息已经被消费者消费了,只是在消费者返回ack确认信息的时候出现异常,导致ack信息未能发送到broker,从而导致RabbitMQ不清楚这条消息到底有没有被消费成功,消息队列可能会在消费者未确认消息时进行重试,导致消息被重复消费。
解决方案:

  • 消费端在消费逻辑处理中做好幂等性,保证同一个消息不被消费者重复消费两次。实现消息幂等性的方案:生产者每次发送消息的时候会生成一个全局唯一的id放到信息中,每次消费消息之前根据这个全局id去查询db或者redis中是否存在该id的消息信息,如果有,则说明该消息已经消费过,直接返回不再做后续处理;如果没有,则说明该消息未被消费过,继续进行后续业务处理,处理成功之后再将该全局id插入到bd或者redis中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/579639.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UDP攻击是什么?遇到UDP攻击怎么办

UDP攻击,也称为UDP洪水攻击,是一种拒绝服务(DoS)或分布式拒绝服务(DDoS)攻击的形式。在此类攻击中,攻击者会发送大量的UDP流量到目标网络或服务器,以消耗其网络带宽或系统资源。由于…

爬虫工作量由小到大的思维转变---<第二十八章 Scrapy中间件说明书>

爬虫工作量由小到大的思维转变---<第二十六章 Scrapy通一通中间件的问题>-CSDN博客 前言: (书接上面链接)自定义中间件玩不明白? 好吧,写个翻译的文档点笔记,让中间件更通俗一点!!! 正文: 全局图: 爬虫中间件--->翻译笔记: from scrapy import s…

Scikit-Learn线性回归(二)

Scikit-Learn线性回归二:多项式回归 1、多项式回归2、多项式回归的原理3、Scikit-Learn多项式回归3.1、Scikit-Learn多项式回归API1、多项式回归 本文接上篇:Scikit-Learn线性回归(一) 上篇中,我们详细介绍了线性回归的概念、原理和推导,以及通过由浅入深的案例,详解了Sc…

淘宝/天猫商品API:实时数据获取与安全隐私保护的指南

一、引言 随着电子商务的快速发展,淘宝/天猫等电商平台已成为商家和消费者的重要交易场所。对于电商企业而言,实时掌握店铺商品的销售情况、库存状态等信息至关重要。然而,手动管理和更新商品信息既费时又费力。因此,淘宝/天猫提…

随笔笔记-2023

随笔 computed 是基于他们的依赖进行缓存的,。如果要随时计算 new Date().now(因为不是响应式的),那么需要用 computed。 如果不希望用缓存那么就用 methods 字符与字节 1 字节8 位1B8 bit;1KB 1024B,1MB1024KB1024*1024B 编码:…

面试题之二HTTP和RPC的区别?

面试题之二 HTTP和RPC的区别? Ask范围:分布式和微服务 难度指数:4星 考察频率:70-80% 开发年限:3年左右 从三个方面来回答该问题: 一.功能特性 1)HTTP是属于应用层的协议:超文本传输协议…

【SpringCloud】-OpenFeign实战及源码解析、与Ribbon结合

一、背景介绍 二、正文 OpenFeign是什么? OpenFeign(简称Feign)是一个声明式的Web服务客户端,用于简化服务之间的HTTP通信。与Nacos和Ribbon等组件协同,以支持在微服务体系结构中方便地进行服务间的通信&#xff1b…

Linux账号与权限管理

目录 一、账号管理 1. 用户账号 1.1 概述 1.2 用户账号类型 1.3 用户账号区分 2. 组账号 2.1 概述 2.2 区分 3. 用户账号管理 3.1 重要文件存放位置 3.2 基本信息 3.3 添加用户 3.3.1 通式 3.3.2 选项示例 3.4 密码管理 3.4.1 通式 3.4.2 选项示例 3.4.3 免交…

macos Apple开发证书 应用签名p12证书 获取生成方法 codesign 证书获取

在开发macos应用的时候必须要对自己开发的应用进行签名才能使用, 下面介绍个人如何获取Apple开发签名证书. 必备条件, 你需要先安装 xcode , 注册一个苹果开发者账号 免费的就可以, 以下为获取流程 You need to create a cert through xcode. Additionally, you need to have…

深信服技术认证“SCCA-C”划重点:云计算基础

为帮助大家更加系统化地学习云计算知识,高效通过云计算工程师认证,深信服特推出“SCCA-C认证备考秘笈”,共十期内容。“考试重点”内容框架,帮助大家快速get重点知识。 划重点来啦 *点击图片放大展示 深信服云计算认证&#xff08…

[Angular] 笔记 10:服务与依赖注入

什么是 Services & Dependency Injection? chatgpt 回答: 在 Angular 中,Services 是用来提供特定功能或执行特定任务的可重用代码块。它们可以用于处理数据、执行 HTTP 请求、管理应用程序状态等。Dependency Injection(依赖注入&#…

浅谈互联网架构演变

更好的阅读体验 \large{\color{red}{更好的阅读体验}} 更好的阅读体验 前言 可以将某个项目或产品的架构体系按照如下方式分层: 业务层面:项目业务体系技术层面: 数据架构:数据持久层策略应用架构:应用层的实现方式 …

云原生Kubernetes系列 | Kubernetes Secret及ConfigMap

云原生Kubernetes系列 | Kubernetes Secret及Configmap 1. Secret及Configmap使用背景简介2. Secret2.1. Secret创建方式2.1.1. 命令行方式2.1.2. 文件方式2.1.3. 变量方式2.1.4. YAML文件方式2.2. Secret使用方式2.2.1. 用于传递配置文件2.2.3. 用于传递变量3. ConfigMap1. Se…

CPU亲和性和NUMA架构

何为CPU的亲和性 CPU的亲和性,进程要在某个给定的 CPU 上尽量长时间地运行而不被迁移到其他处理器的倾向性,进程迁移的频率小就意味着产生的负载小。亲和性一词是从affinity翻译来的,实际可以称为CPU绑定。 在多核运行的机器上,…

AlignBench:量身打造的中文大语言模型对齐评测

对齐(Alignment),是指大语言模型(LLM)与人类意图的一致性。换言之,就是让LLM生成的结果更加符合人类的预期,包括遵循人类的指令,理解人类的意图,进而能产生有帮助的回答等…

SadTalker数字人增加视频输出mp4质量精度

最近在用数字人简易方案,看到了sadtalker虽然效果差,但是可以作为一个快速方案,没有安装sd的版本,随便找了个一键安装包 设置如上 使用倒是非常简单,但是出现一个问题,就是输出的mp4都出马赛克了 界面上却…

Servlet见解2

4 创建servlet的三种方式 4.1 实现Servlet接口的方式 import javax.servlet.*; import javax.servlet.annotation.WebServlet; import java.io.IOException;WebServlet("/test1") public class Servlet1 implements Servlet {Overridepublic void init(ServletConf…

怎么实现Servlet的自动加载

在实际开发时,有时候会希望某些Servlet程序可以在Tomcat启动时随即启动。但在默认情况下,第一次访问servlet的时候,才创建servlet对象。 如果servlet构造函数里面的代码或者init方法里面的代码比较多,就会导致用户第一次访问serv…

制作一个可以离线安装的Visual Studio安装包

须知 前提条件,需要电脑可以正常上网且网速还行,硬盘可以空间容量足够大,怎么判断容量够用?由组件数量的多少来决定。Visual Studio 频道和发布节奏 https://learn.microsoft.com/zh-cn/visualstudio/productinfo/release-rhythm…

AutoSAR(基础入门篇)3.2-Autosar中RTE的Ports【S/R】与【C/S】

目录 一、RTE的Ports【S/R】 1、特征 1.1、扮演SWCs和BSW的交流途径 1.2、其他特征