RabbitMQ怎么保证可靠性

RabbitMQ怎么保证可靠性

  • 前言
  • 生产端问题
    • 解决方案
    • 代码
    • 验证
  • RabbitMQ问题
  • 消费端问题
    • 解决方案
    • 代码
    • 验证
  • 总结

前言

RabbitMQ相信大家都非常熟悉了,今天咱们来聊聊怎么保证RabbitMQ的可靠性。

那什么时候会出现问题呢?

第一种是生产端出现的问题。我们向队列中发送消息的时候,消息不一定可以发送到MQ中,这个时候如果我们不做任何处理,这样消息丢失了。

第二种则是RabbitMQ出现的问题。也就是说现在生产端的成功将消息发送到了RabbitMQ,但由于MQ并没有做持久化,这样宕机重启之后消息可能就丢失了。

第三种则是消费端的问题。消费端处理消息时如果出现异常,默认的解决方式是在重复消费多次,当次数超过阈值时直接删除消息,这也导致消息丢失。

接下来咱们就看看怎么应对以上三种问题。

生产端问题

解决方案

这里我们需要清楚发送的一个大体流程。

生产端发送消息到MQ之后,会收到一个结果,这个结果有acknack两种。

其中ack代表消息成功到达了交换机,但并不意味者消息到达了队列。不过ack的情况下消息未送达队列,会有相应的错误信息提醒。

nack就代表消息并未送达交换机

那么,怎么才能知道消息发送情况呢?

可以设置callback来获取消息发送结果。

代码

局部callback设置如下

    @GetMapping("testmq")public Result testmq(){String orderId = String.valueOf(UUID.randomUUID());String messageData = "下订单!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String,Object> map=new HashMap<>();map.put("orderId",orderId);map.put("messageData",messageData);map.put("createTime",createTime);//        设置发送的callbackCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());correlationData.getFuture().addCallback(result -> {// 判断结果if (result.isAck()) {log.info("发送成功");} else {log.error("消息未达到交换机,发送失败");}}, ex -> {log.error("出现异常,发送失败");});rabbitTemplate.convertAndSend(RabbitMQConfig.NORMALEXCHANGE, RabbitMQConfig.TESTROUTING, map, message -> {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}, correlationData);return Result.succ("ok");}

验证

消息发送成功
在这里插入图片描述
交换机名称有误
在这里插入图片描述
队列路由出错
虽然没有错误,但给了我们warning。
在这里插入图片描述

RabbitMQ问题

这里就比较简单了,那就是做下持久化就可以了

首先是交换机,队列和消息的持久化
交换机

@BeanDirectExchange normalExchange() {/*** durable 是否持久化* autoDelete 没有queue绑定时是否自动删除*/return new DirectExchange(NORMALEXCHANGE, true, false);}

队列

@Beanpublic Queue cleanDQueue() {return QueueBuilder.durable(CLEANQUEUE).build();}

消息的持久化

rabbitTemplate.convertAndSend(RabbitMQConfig.NORMALEXCHANGE, RabbitMQConfig.TESTROUTING, map, message -> {// 设置消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}, correlationData);

消费端问题

解决方案

消费端出现错误时,会进行重试,当重试次数超过阈值之后有三种解决方案,如下

  • RejectAndDontRequeueRecoverer:超过阈值,直接丢失消息。
  • ImmediateRequeueMessageRecoverer:超过阈值,返回nack,然后消息重新入队。
  • RepublishMessageRecoverer:超过阈值直接将消费失败的消息投递到指定交换机。

这里我们以RepublishMessageRecoverer为例做下演示。

代码

首先需要声明消费消息失败后传递的交换机和队列

   @BeanDirectExchange normalExchange() {/*** durable 是否持久化* autoDelete 没有queue绑定时是否自动删除*/return new DirectExchange(NORMALEXCHANGE, true, false);}
//  用于处理消费失败消息的队列@Beanpublic Queue republishQueue() {return QueueBuilder.durable(REPULISHQUEUE).build();}
//  绑定失败消费消息队列@BeanBinding bindingRepublish() {return BindingBuilder.bind(republishQueue()).to(normalExchange()).with(REPULISHROUTING);}

然后配置下RepublishMessageRecoverer策略,随便找个config注入下bean就可以。

 //  设置RepublishMessageRecoverer,消费失败的消息转移到另一队列中,交给管理员手动处理@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {/*** NORMALEXCHANGE   接收消费失败消息的交换机* REPULISHROUTING  接收消费失败消息的路由key*/return new RepublishMessageRecoverer(rabbitTemplate, NORMALEXCHANGE, REPULISHROUTING);}

验证

咱们看看如果消费出错会咋样

我们可以看到被消费的队列中信息被删除了。
在这里插入图片描述
然后我们设置的转入队列中的消息数加一,这时候我们可以接收下该队列中的信息,存储到数据库中,方便维护人员手动进行处理。
在这里插入图片描述

总结

从生产端、RabbitMQ以及消费端三方面介绍了一下怎么保证RabbitMQ的可靠性,另外还有关于死信队列和延迟队列的内容在这篇博客中,大家有兴趣可以看一下。

RabbitMQ的死信队列和延迟队列

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/20753.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第五十天 进入子序列问题 | 300.最长递增子序列 674.最长连续递增序列 718.最长重复子数组

题目&#xff1a;300.最长递增子序列 1.dp数组的定义&#xff1a; 以nums[i]为结尾的最长递增子序列的长度 为什么一定表示 “以nums[i]结尾的最长递增子序” &#xff0c;因为我们在 做 递增比较的时候&#xff0c;如果比较 nums[j] 和 nums[i] 的大小&#xff0c;那么两个递…

MySQL-事务日志

事务的隔离性由 锁机制 实现 事务的原子性、一致性、隔离性 由事务的 redo日志 和 undo 日志来保证 redo log 称为 重做日志&#xff0c;提供再写入操作&#xff0c;恢复提交事务修改的页操作&#xff0c;用来保证事务的持久性。undo log 称为 回滚日志&#xff0c;回滚行记录…

selenium自动化介绍

文章目录 一、selenium原理 安装二、selenium使用1.创建浏览器对象&#xff0c;访问网址2.消除警告提示3.不显示浏览器中受控制字样4.防检测5.设置延时5.1强制延时5.2隐式延时 6.设置浏览器窗口大小 三、案例实战&#xff1a;百度搜索四、iframe标签五、案例实战&#xff1a;Q…

第一周 数据结构与算法以及复杂度分析

数据结构与算法 算法定义 算法&#xff08;algorithm&#xff09;是在有限时间内解决特定问题的一组指令或操作步骤&#xff0c;它具有以下特性。 1.问题是明确的&#xff0c;包含清晰的输入和输出定义。 2.具有可行性&#xff0c;能够在有限步骤、时间和内存空间下完成。 3.…

【第五节】C++的多态性与虚函数

目录 前言 一、子类型 二、静态联编和动态联编 三、虚函数 四、纯虚函数和抽象类 五、虚析构函数 六、重载&#xff0c;重定义与重写的异同 前言 面向对象程序设计语言的三大核心特性是封装性、继承性和多态性。封装性奠定了基础&#xff0c;继承性是实现代码重用和扩展…

Linux内网中安装jdk1.8详细教程

本章教程,主要介绍如何在内网环境中配置JDK1.8环境变量 一、下载Linux版压缩包 下载地址:https://www.oracle.com/java/technologies/downloads/#java8 下载完成之后,通过XFTP等工具,将安装包上传到内网服务器 二、安装配置步骤 1、解压压缩包 tar -zxvf /usr/local/jdk-…

linux--自动备份文件

问题&#xff1a; 1&#xff0c;rm删除无法找回&#xff1b; 2&#xff0c;使用git的时候会出现各种可能导致文件丢失&#xff0c;无法找回的情况。 3&#xff0c;......。 设置自动备份文件和目录

使用Python, 用shp文件边界裁剪tif文件

在Python中, 用shp文件边界裁剪tif文件 from osgeo import gdal import osgdal.PushErrorHandler("CPLQuietErrorHandler")def subset_by_shp(shape_fn, raster_fn, raster_out):"""根据 shapefile 对栅格文件进行裁剪并输出结果:param shape_fn: sh…

根据PDF模版填充数据并生成新的PDF

准备模版 使用 福昕高级PDF编辑器 &#xff08;本人用的这个&#xff0c;其他的也行&#xff0c;能作模版就行&#xff09;打开PDF文件点击 表单 选项&#xff0c;点击 文本域在需要填充数据的位置设计文本域设置 名称、提示名称相当于 属性名&#xff0c;提示就是提示&#x…

基于SSM的“基于Apriori算法的网络书城”的设计与实现(源码+数据库+文档)

基于SSM的“基于Apriori算法的网络书城”的设计与实现&#xff08;源码数据库文档) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SSM 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 网站功能展示图 首页 商品分类 热销 新品 我的订单 个…

二位偏序,P3660 [USACO17FEB] Why Did the Cow Cross the Road III G

一、题目 1、题目描述 2、输入输出 2.1输入 2.2输出 3、原题链接 P3660 [USACO17FEB] Why Did the Cow Cross the Road III G - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 二、解题报告 1、思路分析 二维偏序问题 我们将坐标按照第一维排序 然后树状数组维护区间内的…

【深度学习】【STWave】时空图预测,车流量预测,Efficient Spectral Graph Attention Network

Spatio-Temporal meets Wavelet: Disentangled Traffic Flow Forecasting via Efficient Spectral Graph Attention Network 代码&#xff1a;https://github.com/LMissher/STWave 论文&#xff1a;https://arxiv.org/abs/2112.02740 帮助&#xff1a; https://docs.qq.com/s…

C++STL---vector模拟实现

通过上篇文章&#xff0c;我们知道vector的接口实际上和string是差不多的&#xff0c;但是他俩的内部结构却大不一样&#xff0c;vector内有三个成员变量&#xff1a;_start、_finish、_endofstorage: _start指向容器的头元素&#xff0c;_finish指向有效元素末尾的元素&#x…

Vue2 + Element UI 封装 Table 递归多层级列表头动态

1、在 components 中创建 HeaderTable 文件夹&#xff0c;在创建 ColumnItem.vue 和 index.vue。 如下&#xff1a; 2、index.vue 代码内容&#xff0c;如下&#xff1a; <template><div><el-table:data"dataTableData"style"width: 100%"…

OSM历史10年(2014-2024)全国数据下载(路网、建筑物、POI、水系、地表覆盖利用······)

点击下方全系列课程学习 点击学习—>ArcGIS全系列实战视频教程——9个单一课程组合系列直播回放 零、前沿 这次向大家介绍一下OSM&#xff08;OpenStreetMap&#xff09;十年历史数据&#xff08;2014—2014&#xff09;的下载方法。当然我们也下载好分享给大家&#xff…

JAVA web期末复习总结

C/S结构与B/S结构区别&#xff1a; 在C/S结构中&#xff0c;客户端通常是一个独立的应用程序&#xff0c;需要在用户的计算机上安装和运行。而在BS结构中&#xff0c;客户端是一个Web浏览器&#xff0c;用户只需要通过浏览器打开网页&#xff0c;不需要安装额外的应用程序。 C…

python正则表达式中的分组功能

在Python的re模块中&#xff0c;group()方法是用于从一个匹配的对象&#xff08;例如&#xff0c;re.match或re.search返回的对象&#xff09;中提取匹配的字符串。 当你使用正则表达式进行匹配时&#xff0c;匹配对象会包含原始字符串中与模式匹配的部分。group()方法可以用来…

Reactive 踩坑

vue 响应式踩坑 let questionInfo reactive([ , ]) api.getQuestions( id ).then(function (response){// 这里用法有问题questionInfo response.data.data.questions;concole.log(questionInfo) })响应式数据本身是个函数&#xff0c;&#xff08;不然咋帮你动态变化页面…

k8s_设置dns

配置k8s dns 在 Kubernetes 集群中&#xff0c;CoreDNS 是默认的 DNS 服务器&#xff0c;它负责处理集群内所有的 DNS 请求。 kubectl edit cm coredns -n kube-system (此命令修改coredns 配置) kubectl describe cm coredns -n kube-system&#xff08;此命令查看coredns 配…

程序员上岸指南

如果你还在996&#xff0c;大小周&#xff0c;感觉身体被掏空&#xff0c;那么你可以看看下面这篇文章&#xff0c;我特意搜集了一些苦逼程序员的上岸教程。 人生真的就是做几道选择题&#xff0c;选错了&#xff0c;忙也是瞎忙。选对了&#xff0c;躺着都能赢。总的来说&#…