SpringBoot基于RabbitMQ实现消息可靠性

文章目录

  • 1. ☃️概述
  • 2. ☃️生产者消息确认
    • 2.1 ❄️❄️概述
    • 2.2 ❄️❄️实战
      • ⛷️⛷️⛷️2.2.1 修改配置
      • ⛷️⛷️⛷️2.2.2 定义 Return 回调
      • ⛷️⛷️⛷️2.2.3 定义ConfirmCallback
  • 3. ☃️消息持久化
    • 3.1 ❄️❄️交换机持久化
    • 3.2 ❄️❄️队列持久化
    • 3.3 ❄️❄️消息持久化
  • 4. ☃️消费者消息确认
    • 4.1 ❄️❄️三种确认模式
    • 4.2 ❄️❄️消息失败重试机制
      • ⛷️⛷️⛷️4.2.1 本地重试机制
      • 4.2.2 ⛷️⛷️⛷️失败策略
  • 5. ☃️总结


在这里插入图片描述

1. ☃️概述

消息从发送到消费者接收 会经历的过程如下:

在这里插入图片描述

丢失消息的可能性

  • 发送时丢失:
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

针对这些问题,RabbitMQ分别给出了解决方案

  • 生产者确认机制
  • mq持久化
  • 消费者确认机制
  • 失败重试机制

2. ☃️生产者消息确认

2.1 ❄️❄️概述

RabbitMQ 提供了 publisher confirm 机制来避免消息发送到 MQ 过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

image.png
image.png

2.2 ❄️❄️实战

⛷️⛷️⛷️2.2.1 修改配置

spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true

配置说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

⛷️⛷️⛷️2.2.2 定义 Return 回调

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:
修改publisher服务,添加一个:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 投递失败,记录日志log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要,可以重发消息});}
}

⛷️⛷️⛷️2.2.3 定义ConfirmCallback

ConfirmCallback 可以在发送消息时指定,因为每个业务处理 confirm 成功或失败的逻辑不一定相同。

public void testSendMessage2SimpleQueue() throws InterruptedException {// 1.消息体String message = "hello, spring amqp!";// 2.全局唯一的消息ID,需要封装到 CorrelationData 中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(result -> {if(result.isAck()){// 3.1.ack,消息成功log.debug("消息发送成功, ID:{}", correlationData.getId());}else{// 3.2.nack,消息失败log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()));// 4.发送消息rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);// 休眠一会儿,等待ack回执//Thread.sleep(20);
}

3. ☃️消息持久化

生产者确认可以确保消息投递到 RabbitMQ 的队列中,但是消息发送到 RabbitMQ 以后,如果突然宕机,也可能导致消息丢失。
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
  • 消息持久化

3.1 ❄️❄️交换机持久化

@Bean
public DirectExchange simpleExchange(){// 三个参数:①交换机名称、②是否持久化、③当没有queue与其绑定时是否自动删除return new DirectExchange("simple.direct", true, false);
}

事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的。

3.2 ❄️❄️队列持久化

@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}

事实上,默认情况下,由SpringAMQP声明的队列都是持久化的。

3.3 ❄️❄️消息持久化

默认情况下,SpringAMQP 交换机 队列 以及发出的任何消息都是持久化的,不用特意指定。

4. ☃️消费者消息确认

RabbitMQ 是 阅后即焚 机制,RabbitMQ 确认消息被消费者消费后会立刻删除。

而 RabbitMQ 是通过 消费者回执 来确认消费者是否成功处理消息的:消费者获取消息后,应该向 RabbitMQ 发送 ACK 回执,表明自己已经处理消息。

设想这样的场景:

  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,消息尚未处理

这样,消息就丢失了。因此消费者返回ACK的时机非常重要。

4.1 ❄️❄️三种确认模式

而 SpringAMQP 则允许配置三种确认模式:
•manual:手动ack,需要在业务代码结束后,调用api发送ack。
•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack。
•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除(存在丢失消息的风险)。

由此可知:

  • none 模式下,消息投递是不可靠的,可能丢失
  • auto 模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的 auto 即可。

相关配置

spring:rabbitmq:listener:simple:#acknowledge-mode: none # 关闭ack#acknowledge-mode: manual  # 手动ackacknowledge-mode: auto # 自动ack

4.2 ❄️❄️消息失败重试机制

当消费者出现异常后,消息会不断 requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次 requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:怎么办呢?

⛷️⛷️⛷️4.2.1 本地重试机制

我们可以利用 Spring 的 retry 机制,在消费者出现异常时利用 本地重试,而不是无限制的 requeue 到 mq 队列。
修改 consumer 服务的 application.yml 文件,添加内容:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初始的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 在重试3次后,SpringAMQP 会抛出异常 AmqpRejectAndDontRequeueException,说明本地重试触发了。
  • 查看 RabbitMQ 控制台,发现消息被删除了,说明最后 SpringAMQP 返回的是ack,mq删除消息了。

结论

  • 开启本地重试时,消息处理过程中抛出异常,不会 requeue 到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring 会返回 ack,消息会被丢弃。

4.2.2 ⛷️⛷️⛷️失败策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由 Spring 内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecovery 接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
1)在consumer服务中定义处理失败消息的交换机和队列
2)定义一个RepublishMessageRecoverer,关联队列和交换机
代码如下

@Configuration
public class ErrorMessageConfig {@Bean	//	处理失败消息的交换机public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Bean	//	处理失败消息的队列public Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

其实 我们在生产中会指定死信交换机来处理失败的消息


5. ☃️总结

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理


在这里插入图片描述



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/824224.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

进程、线程和协程

进程、线程和协程 进程是程序的执行实例 线程是进程的执行路径 协程是基于线程之上但又比线程更加轻量级的存在 进程与线程的区别 线程是程序执行的最小单位,而进程是操作系统分配资源的最小单位 进程和程序的区别 程序:执行特定任务的一串代码&a…

牛客Linux高并发服务器开发学习第二天

Gcc编译 利用gcc 生成应用时如果不加-o 和应用名,默认生成a.out 可以用./ a.out打开 Gcc工作流程 可执行程序Windows系统中为.exe Linux系统中为.out g也可以编辑c程序 gcc也可以编译cpp代码,只是在编译阶段gcc不能自动共和C程序使用的库进行联接&…

JS-43-Node.js02-安装Node.js和npm

Node.js是一个基于Chrome V8引擎的JavaScript运行时环境,可以让JavaScript实现后端开发,所以,首先在本机安装Node.js环境。 一、安装Node.js 官网:下载 Node.js 默认两个版本的下载: 64位windows系统的LTS(Long Tim…

CST电磁仿真物体表面的Sheet结构和生成3D Model【基础教程】

由Sheet结构生成3D Model 使用Shell Solid and Thicken Sheet! Modeling > Tools > Shape Tools > Shell Solid or Thicken Sheet Shell Solidor ThickenSheet会根据不同类型的模型提供两种完全不同的功能。 如033.由3D Model生成Cavity 所述&#xff…

飞行机器人专栏(十四)-- Kinect DK 人体骨骼点运动提取方法

系列文章目录 Ubuntu 18.04/20.04 CV环境配置(下)--手势识别TRTposeKinect DK人体骨骼识别_ubuntu kinect骨骼测试-CSDN博客文章浏览阅读1.3k次。trt_pose_ros kinect实现手势识别和人体骨骼识别,用于机器人运动控制参考_ubuntu kinect骨骼测…

Postgresql源码(126)TupleStore使用场景与原理分析

相关 《Postgresql源码(125)游标恢复执行的原理分析》 《Postgresql游标使用介绍(cursor)》 总结 开源PG中使用tuple store来缓存tuple集,默认使用work_mem空间存放,超过可以落盘。在PL的returns setof场景…

Pascal VOC(VOC 2012、VOC 2007) 数据集的简介

一、数据集介绍 PascalVOC(2005~2012)数据集是PASCAL VOC挑战官方使用的数据集。该数据集包含20类的物体。每张图片都有标注,标注的物体包括人、动物(如猫、狗、岛等)、交通工具(如车、船飞机等)、家具(如椅…

Redux极客园项目初始化搭建

基本结构搭建 实现步骤 在 Login/index.js 中创建登录页面基本结构在 Login 目录中创建 index.scss 文件,指定组件样式将 logo.png 和 login.png 拷贝到 assets 目录中 代码实现 pages/Login/index.js import ./index.scss import { Card, Form, Input, Button }…

【LLM】认识LLM

文章目录 1.LLM1.1 LLM简介1.2 LLM发展1.3 市面常见的LLM1.4 LLM涌现的能力 2.RAG2.1 RAG简介2.2 RAG 的工作流程2.3 RAG 和 Finetune 对比2.4 RAG的使用场景分析 3. LangChain3.1 LangChain简介3.2 LangChain的核心组件3.3 LangChain 入门 4.开发 RAG 应用的整体流程5. 环境配…

GPT状态和原理 - 解密OpenAI模型训练

目录 1 如何训练 GPT 助手 1.1 第一阶段 Pretraining 预训练 1.2 第二阶段:Supervised Finetuning有监督微调 1.3 第三阶段 Reward Modeling 奖励建模 1.4 第四阶段 Reinforcement Learning 强化学习 1.5 总结 2 第二部分:如何有效的应用在您的应…

原生实现ajax

1 什么是ajax AJAX Asynchronous JavaScript and XML(异步的 JavaScript 和 XML)。 AJAX 不是新的编程语言,而是一种使用现有标准的新方法。 AJAX 最大的优点是在不重新加载整个页面的情况下,可以与服务器交换数据并更新部分网…

unity学习(85)——同步节奏(tcp架构确实有问题)

挂的时间长了,就出现其他下线本地不destroy的情况了,而且此时再登录,新渲染中已经没有已经下线的玩家 unity这边就没有收到126!!!125的问题是多种多样的!!! 化简服务器w…

设计循环队列(队列oj)

1.设计循环队列 设计你的循环队列实现。 循环队列是一种线性数据结构,其操作表现基于 FIFO(先进先出)原则并且队尾被连接在队首之后以形成一个循环。它也被称为“环形缓冲器”。 循环队列的一个好处是我们可以利用这个队列之前用过的空间。…

【笔记】vscode debug进入site-packages包源码

选择左侧栏第三个图标,点击创建 launch.json 文件 选择 Python Debugger 选择Python文件 这里可以看到launch.json 文件 在configurations中添加键值对 "justMyCode": false在文件中打上断点,点击"三角符"号开始调试 按F11或者红框…

reportlab 生成pdf文件 (python)

1 安装 pip install reportlab2 应用场景 通过网页动态生成PDF文档大量的报告和数据发布用XML一步生成PDF 官网案例 3 PLATYPUS Platypus是“Page Layout and Typography Using Scripts”,是使用脚本的页面布局和印刷术的缩写,这是一个高层次页面布局…

【面试题】MySQL 事务的四大特性说一下?

事务是一个或多个 SQL 语句组成的一个执行单元,这些 SQL 语句要么全部执行成功,要么全部不执行,不会出现部分执行的情况。事务是数据库管理系统执行过程中的一个逻辑单位,由一个有限的数据库操作序列构成。 事务的主要作用是保证数…

记一次kafkakerberos认证问题

1,报错信息 排查思路:检查kerberos配置文件 kerberos.kafka.principalkafka/huawe_baseSECURITY.COM kerberos.kafka.keytabPath/etc/huawe_base.keytab kerberos.kafka.krb5ConfPath/etc/krb5.conf但是查看kafka_client_jass.conf文件,发现…

网络基础-TCP/IP和OSI协议模型

一、OSI和TCP/IP模型 二、OSI七层模型 三、TCP/IP模型 参考:https://www.cnblogs.com/f-ck-need-u/p/7623252.html

关联规则挖掘(二)

目录 三、FP-增长算法(一)算法的背景(二)构造FP-树(三)生成频繁项集 四、关联规则的评价(一)支持度和置信度的不足(二)相关性分析 三、FP-增长算法 &#xf…

.NET 发布,部署和运行应用程序

.NET应用发布 发布.Net应用有很多种方式,下面列举三种发布方式: 单文件发布跨平台发布Docker发布 单文件发布 右键工程,选择“发布”,部署模式选择“独立”,目标运行时选择自己想要部署到的系统,我这里用…