RabbitMQ-如何保证消息不丢失

RabbitMQ常用于 异步发送,mysql,redis,es之间的数据同步 ,分布式事务,削峰填谷等.....

在微服务中,rabbitmq是我们经常用到的消息中间件。它能够异步的在各个业务之中进行消息的接受和发送,那么如何保证rabbitmq的消息不丢失就显得尤为重要。

首先要分析问题,我们就要明确rabbitmq在什么时候可能会出现消息丢失的情况呢?

我们直接说结果

RabbitMQ在每个阶段都有可能使消息发生丢失

我们在这里把他们简单归结为三个层面

层面一 :生产者发送消息没有到达交换机或者没有到达绑定的队列。

层面二:RabbitMQ宕机可能导致的消息的丢失。

层面三:消费者宕机导致消息丢失。

层面一的解决方法常见的是

1.生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到Mq的过程中丢失,消息发送到Mq以后,会返回一个结果给发送者,表示消息的发送成功。

情况一:发送成功 生产者正常发送消息到队列之后会返回一个publish-confirm ack 这个意思是告诉生产者已经接收到消息了。

情况二:发送失败 这里的发送失败有两种,一种是生产者发送到交换机失败 此时返回 publish-confirm nack  。第二种是生产者发送到队列失败 返回 publish-return ack。

开启生产者确认机制的代码如下 ,在生产者的配置文件中加入以下配置
 

spring:rabbitmq:publisher-confirm-type: correlated #开启生产者确认机制publisher-returns: true

这里的

publisher-confirm-type:有三种模式可以选择:

第一种是none:代表关闭confirm机制

第二种是 simple:表示同步阻塞并等待mq的回执消息,即发送完消息后不能干其他的事情,只能等待mq的回执,很显然这样效率很低。

第三种是correlated:MQ异步回调方式返回回执消息,即生产者发送完消息后可以干其他的事情,直到接收到mq的回执。很明显这种效率要优于第二种。

配置return callback的代码如下,每个RabbitTemplate只能配置一个 代码如下
 

package com.itheima.publisher.com.it.heima.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;/*** @Auther: QuJingChuan* @Date: 2024/1/13 10:34* @Description:*/
@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//配置回调rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.debug("收到消息return的callback,  {},{},{},{},{}",returnedMessage.getExchange(),returnedMessage.getRoutingKey(),returnedMessage.getMessage(),returnedMessage.getReplyCode(),returnedMessage.getReplyText());}});}
}

Confirm Callback需要每次发消息的时候都要配置(要制定发消息的id方便回执的时候直到是谁发的消息)这里写一个测试类方便大家看。

 @Testvoid testConfirmCallback() throws InterruptedException {//创建cd 参数为每次发送消息的idCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());//添加confirmCallBackcorrelationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {//这种情况一般是运行出现bug,一般不会发生。log.error("消息回调失败",ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {log.debug("收到confirm callback 回执");if (result.isAck()){//消息发送成功log.debug("消息发送成功收到ack");}else {//消息发送失败log.debug("消息发送失败收到nack,原因:{}",result.getReason());//TODO 重发消息等业务}}});rabbitTemplate.convertAndSend("amqp.test","amqptest","hello qjc",correlationData);Thread.sleep(2000);}

那么我们如何解决这个问题呢
方案一:重发消息 

方案二:记录日志

方案三:保存到数据库中定时发送,发送成功后删除表中的数据。

方案四:交给人工处理。

~生产者确认机制需要额外的网络和系统的资源开销,尽量不要使用。

~如果业务需要,那么无需开启publisher-return机制,因为一般路由失败都是自己业务的原因。

~对于nack消息可以有限次数的重试,依然失败则记录异常消息。

层面二的解决方法常见的是

2.消息持久化

由于mq是基于内存存储消息的,那么在mq服务宕机等一些情况下可能导致消息的丢失。同时内存空间有限,当消费者出现故障或者处理过慢,会导致消息积压,mq会对消息做迁移(page out 写入磁盘)从而引发mq阻塞。我们将消息存储在磁盘上就避免了这个问题。

一 :持久化交换机。

这里要选择Durable,因为Transient是临时交换机,当mq宕机后会消失。

代码展示
 

 @Beanpublic DirectExchange simpleExchange(){//分别是三个参数 交换机名称 是否持久化 当没有队列绑定时是否自动删除return new DirectExchange("qjc.exchange",true,false);}

二 :持久化队列。

这个与交换机类似,在此不做赘述。

代码展示

@Beanpublic Queue simpleQueue(){//springamqp在使用QueueBuilder来创建队列的时候,默认就是持久化的return QueueBuilder.durable("qjc.queue").build();}

三 :持久化消息。

这里选择delivery mode 选择2 ,1是不持久的。

代码展示

 Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
如果不选择持久化队列,交换机,消息的话我们还有另一种方案

Lazy Queue(惰性队列)

惰性队列的特征如下

~接受到消息的时候直接存入磁盘而非内存(内存中只保留最近的消息)

~消费者需要消息的时候才会从磁盘中取出数据加载到内存

~支持数百万条的消息存储

在mq3.12版本后,所有的队列都是Lazy Queue模式,无法更改。

如果各位小伙伴的版本低于3.12那我这里提供了两种方式创建惰性队列

或用注解声明

    @RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode",value = "lazy")))public void listenLazyQueue(String msg){log.debug("接收到lazyqueue的消息" + msg);}

3.消费者确认机制

RabbitMQ支持消费者确认机制,即:当消费者处理消息后可以向mq发送ack回执,mq收到消息后会在队列中删除该消息。

SpringAMQP已经实现了消息确认的功能,并且允许我们通过配置文件选择ack的处理方式,有三种方式。

- none: 不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用  
- manual: 手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活  
- auto: 自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.  
当业务出现异常时,根据异常判断返回不同结果:  
- 如果是业务异常,会自动返回nack  
- 如果是消息处理或校验异常,自动返回reject

注意我们需要再消费者的配置文件中加入参数

这就是mq保证消息不丢失的一些方式和解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/655126.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

档案数字化转型面临问题

档案数字化转型面临以下问题&#xff1a; 1. 技术问题&#xff1a;档案数字化需要借助先进的技术手段和设备&#xff0c;包括扫描仪、存储设备和数据管理软件等。这些技术的成本高、操作复杂&#xff0c;需要专业的人员进行操作和维护。 2. 安全问题&#xff1a;档案数字化后的…

【Java程序设计】【C00176】基于SSM的图书管理系统(论文+PPT)

基于SSM的图书管理系统&#xff08;论文PPT&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于ssm的图书管理系统 本系统分为前台系统、后台管理员以及后台学员3个功能模块。 前台系统&#xff1a;当游客打开系统的网址后&#xff0c;首先看到的就…

gmsh 01 对多个面,及体进行剖分

#include <set> #include <cmath> #include <gmsh.h>#include <iostream>int main(int argc, char** argv) {gmsh::initialize(argc, argv); // 初始化gmsh::model::add("t2"); // 创建 t2 modeldouble lc 0.05; gmsh::model::geo::add…

二手交易|校园二手交易小程序|基于微信小程序的闲置物品交易平台设计与实现(源码+数据库+文档)

校园二手交易小程序目录 目录 基于微信小程序的闲置物品交易平台设计与实现 一、前言 二、系统功能设计 三、系统实现 1、用户信息管理 2、商品信息管理 3、公告信息管理 4、论坛信息管理 四、数据库设计 1、实体ER图 五、核心代码 六、论文参考 七、最新计算机毕…

斜率优化dp模型整理

300. 任务安排1&#xff08;300. 任务安排1 - AcWing题库&#xff09; 思路&#xff1a;很明显这些任务是按顺序排好的&#xff0c;我们能执行的操作只是对它们进行分批&#xff0c;我们可以发现每一批之前的开始时间s&#xff0c;影响的不仅仅是当前这一批的结束时间&#xff…

8.6跳跃游戏②(LC45-M)

算法&#xff1a; 与上一题一样&#xff0c;还是看最大覆盖范围 要从覆盖范围出发&#xff0c;不管怎么跳&#xff0c;覆盖范围内一定是可以跳到的&#xff0c;以最小的步数增加覆盖范围&#xff0c;覆盖范围一旦覆盖了终点&#xff0c;得到的就是最少步数&#xff01; 这里…

C++大学教程(第九版)7.30 打印array对象 7.31 逆序打印字符串(递归练习题)

文章目录 题目代码运行截图题目代码运行截图 题目 (打印array对象)编写一个递归函数printArray它以一个array对象一个开始下标和一个结束下标作为实参&#xff0c;不返回任何值并打印这个array对象。当开始下标和结束下标相等时&#xff0c;这个函数应该停止处理并返回。 代码…

跨镜网络解决方案:SD-WAN专线,实现企业全球互联

在全球化的背景下&#xff0c;越来越多的企业需要在海外社媒平台推广、研发访问海外平台、实现海外分部和国内互联互通等需求。然而&#xff0c;面对不同地区之间网络连接不稳定、高延迟、高成本等问题&#xff0c;如何实现企业的全球互联成为了一个亟待解决的难题。 幸运的是&…

laravel学习笔记

这两天公司活不多&#xff0c;学习了一下laravel框架。经过几天的学习&#xff0c;也对它有一些大概的了解。现在我就把我所学的到在这儿做下笔记吧。 一、laravel集合 其实&#xff0c;这里所说的集合&#xff0c;跟前端es6中的集合概念还是有那一点的不太一样。前端es6的集合…

C++ 数论相关题目:卡特兰数应用、快速幂求组合数。满足条件的01序列

给定 n 个 0 和 n 个 1 &#xff0c;它们将按照某种顺序排成长度为 2n 的序列&#xff0c;求它们能排列成的所有序列中&#xff0c;能够满足任意前缀序列中 0 的个数都不少于 1 的个数的序列有多少个。 输出的答案对 1097 取模。 输入格式 共一行&#xff0c;包含整数 n 。 …

Java 集合 02 综合练习+基本数据类型对应的包装类

练习1、 自己写的代码&#xff1a; import java.util.ArrayList; public class practice {public static void main(String[] args) {//定义一个集合ArrayList<String> list new ArrayList<>();list.add("aaa");list.add("bbb");list.add(…

python实现豆瓣电影搜索并自动添加相关信息

1.豆瓣电影搜索 2.豆瓣电影信息自动添加 搜索 众所周知&#xff0c;豆瓣搜索有加密&#xff0c;得解密才行&#xff0c;还好网上众多大神都给破解了&#xff0c;那咱们拿来直接使用就行 相关代码仓库&#xff1a;xadmin-server/movies/utils/douban/search.py at movies nin…

React中使用LazyBuilder实现页面懒加载方法一

前言&#xff1a; 在一个表格中&#xff0c;需要展示100条数据&#xff0c;当每条数据里面需要承载的内容很多&#xff0c;需要渲染的元素也很多的时候&#xff0c;容易造成页面加载的速度很慢&#xff0c;不能给用户提供很好的体验时&#xff0c;懒加载是优化页面加载速度的方…

AJAX进阶(重点)

目录 ◆ 同步代码和异步代码 ◆ 回调函数地狱和 Promise 链式调用 什么是回调函数地狱&#xff1f; Promise - 链式调用 什么是Promise链式调用&#xff1f; Promise 链式应用 &#xff08;重点&#xff09; ◆ async 和 await 使用 async函数和await_捕获错误 ◆ 事…

二分查找|详细讲解|两种写法

二分查找 目录 1 介绍2 例题引入3 “左闭右闭”写法4 “左闭右开”写法 1 介绍 二分查找适用于从一个递增或递减的有序数列中查找某一个值 因此&#xff0c;使用二分查找的条件是&#xff1a; 用于查找的内容从逻辑上来看是有序的查找的数量只能是一个而不是多个 在二分查…

RAG应用-七个最常见的故障点

近日&#xff0c;国外研究者发布了一篇论文《Seven Failure Points When Engineering a Retrieval Augmented Generation System》&#xff0c;探讨了在实际工程落地RAG应用过程中容易出的七类问题。 论文地址&#xff1a;https://arxiv.org/pdf/2401.05856.pdf 一、丢失内容&…

【Go-zero】手把手带你在goland中创建api文件并设置高亮

【Go-zero】手把手带你在goland中创建api文件并设置高亮 大家好 我是寸铁&#x1f44a; 总结了一篇手把手带你在goland中创建api文件并设置高亮解决方案的文章✨ 喜欢的小伙伴可以点点关注 &#x1f49d; 问题复盘 在使用go-zero 框架时&#xff0c;常常需要用到goctl 一键生成…

简单易用的购物车

实现了购物基本功能&#xff0c;那来修改一下就可以用&#xff0c;app,h5,小程序都可以 购物车插件 - DCloud 插件市场

会话技术复习笔记

一.登录校验的需求 什么是登录校验&#xff1f; 所谓登录校验&#xff0c;指的是我们在服务器端接收到浏览器发送过来的请求之后&#xff0c;首先我们要对请求进行校验。先要校验一下用户登录了没有&#xff0c;如果用户已经登录了&#xff0c;就直接执行对应的业务操作就可以…

数据结构——用Java实现二分搜索树

目录 一、树 二、二分搜索树 1.二叉树 2.二分搜索树 三、代码实现 1.树的构建 2.获取树中结点的个数 3.添加元素 4.查找元素 &#xff08;1&#xff09;查找元素是否存在 &#xff08;2&#xff09;查找最小元素 &#xff08;3&#xff09;查找最大元素 5.二分搜索…