RabbitMQ之消息的可靠性传递

系列文章目录

提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加
RabbitMQ之消息的可靠性传递


提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 系列文章目录
  • 前言
  • 一、消息的可靠性传递的概念
  • 二、三种模式的实现
    • 环境准备
    • 确认模式
    • 退回模式
    • 消费者确认
  • 总结


前言

提示:这里可以添加本文要记录的大概内容:

在当今的信息化时代,消息传递在企业级应用和分布式系统中扮演着至关重要的角色。而 RabbitMQ 作为一款强大的消息队列中间件,以其可靠性和高性能成为了众多开发者的首选。本文将深入探讨 RabbitMQ 中消息的可靠性传递机制,以及如何在实际应用中确保消息的不丢失。
通过阅读本文,您将了解到 RabbitMQ 可靠消息传递的核心概念和工作原理。我们将探讨消息确认、持久性、队列和交换机的配置以及错误处理等关键主题,以帮助您构建高度可靠的消息传递系统。
无论您是刚刚开始接触 RabbitMQ,还是已经在实际项目中使用它,本文都将为您提供有价值的见解和实用的指导。让我们一起深入了解 RabbitMQ 的可靠性特性,掌握构建可靠系统的关键技能。


提示:以下是本篇文章正文内容,下面案例可供参考

一、消息的可靠性传递的概念

在RabbitMQ中,消息投递的路径为:生产者->交换机->队列->消费者。而在消息的投递过程中,每一个环节都可能投递失败,那么RabbitMQ是通过什么方法确认消息投递成功的呢?

  • 确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
  • 退回模式(return)可以监听消息是否从交换机成功传递到队列。
  • 消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。

二、三种模式的实现

环境准备

1.首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:

spring:rabbitmq:host: 192.168.0.162port: 5672username: zhangsanpassword: zhangsanvirtual-host: /#日志格式
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

2.在生产者的配置类创建交换机和队列

@Configuration
public class RabbitConfig {private final String EXCHANGE_NAME="my_topic_exchange";private final String QUEUE_NAME="my_queue";// 1.创建交换机@Bean("bootExchange")public Exchange getExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME) // 交换机类型.durable(true) // 是否持久化.build();}// 2.创建队列@Bean("bootQueue")public Queue getMessageQueue(){return QueueBuilder.durable(QUEUE_NAME) // 队列持久化.build();}// 3.将队列绑定到交换机@Beanpublic Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();}
}

确认模式

1.生产者配置文件开启确认模式

spring:rabbitmq:host: 192.168.0.162port: 5672username: zhangsanpassword: zhangsanvirtual-host: /# 开启确认模式publisher-confirm-type: correlated

2.生产者定义确认模式的回调方法

@SpringBootTest
public class ProducerTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testConfirm(){// 定义确认模式的回调方法,消息向交换机发送后会调用confirm方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** 被调用的回调方法* @param correlationData 相关配置信息* @param ack 交换机是否成功收到了消息* @param cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){System.out.println("confirm接受成功!");}else{System.out.println("confirm接受失败,原因为:"+cause);// 做一些处理。}}});rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","send message...");}
}

退回模式

1.生产者配置文件开启退回模式

spring:rabbitmq:host: 192.168.0.162port: 5672username: zhangsanpassword: zhangsanvirtual-host: /# 开启确认模式publisher-confirm-type: correlated# 开启回退模式publisher-returns: true

2.生产者定义退回模式的回调方法

@SpringBootTest
public class ProducerTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testReturn(){// 定义退回模式的回调方法。交换机发送到队列失败后才会执行returnedMessage方法rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {/*** @param returned 失败后将失败信息封装到参数中*/@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息对象:"+returned.getMessage());System.out.println("错误码:"+returned.getReplyCode());System.out.println("错误信息:"+returned.getReplyText());System.out.println("交换机:"+returned.getExchange());System.out.println("路由键:"+returned.getRoutingKey());// 处理消息...}});rabbitTemplate.convertAndSend("my_topic_exchange","my_routing1","send message...");}
}

消费者确认

在 RabbitMQ 中,当消费者接收到消息后,会向队列发送一个确认消息,表示已经成功接收并处理了该消息。只有当确认消息被发送后,该消息才会从队列中被移除。这种机制被称为消费者消息确认(Consumer Acknowledge,简称 Ack)。这就类似于快递员派送快递时需要我们签收一样,否则快递就会一直存在于快递公司的系统中。
消息确认分为自动确认和手动确认两种方式。自动确认意味着只要消费者接收到消息,无论是否成功处理了该消息,都会自动发送确认消息,并将消息从队列中移除。然而,在实际开发过程中,如果在接收到消息后业务处理出现异常,那么消息就可能会丢失。因此,需要设置手动确认,也就是只有在业务处理成功后,才会发送确认消息通知队列;如果出现异常,则会发送拒绝消息,让消息仍然保留在队列中。

  • 自动确认:spring.rabbitmq.listener.simple.acknowledge=“none”
  • 手动确认:spring.rabbitmq.listener.simple.acknowledge=“manual”

1.消费者配置开启手动签收

spring:rabbitmq:host: 192.168.0.162port: 5672username: zhangsanpassword: zhangsanvirtual-host: /# 开启手动签收listener:simple:acknowledge-mode: manual

2.消费者处理消息时定义手动签收和拒绝签收的情况

@Component
public class AckConsumer {@RabbitListener(queues = "my_queue")public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {// 消息投递序号,消息每次投递该值都会+1long deliveryTag = message.getMessageProperties().getDeliveryTag();try {int i = 1/0; //模拟处理消息出现bugSystem.out.println("成功接受到消息:"+message);// 签收消息/*** 参数1:消息投递序号* 参数2:是否一次可以签收多条消息*/channel.basicAck(deliveryTag,true);}catch (Exception e){System.out.println("消息消费失败!");Thread.sleep(2000);// 拒签消息/*** 参数1:消息投递序号* 参数2:是否一次可以拒签多条消息* 参数3:拒签后消息是否重回队列*/channel.basicNack(deliveryTag,true,true);}}
}

总结

提示:这里对文章进行总结:

在 RabbitMQ 中,为了确保消息的可靠性传递,需要使用确认应答和重传机制。当生产者将消息发送到 RabbitMQ 服务器时,服务器会向生产者返回一个确认应答,表示消息已成功接收。如果生产者没有收到确认应答,它可以在一定时间内重传消息,直到收到确认应答或达到重试次数限制。
当消费者从队列中消费消息时,它会向 RabbitMQ 服务器发送一个确认应答,表示消息已成功处理。如果消费者在处理消息时出现异常或崩溃,RabbitMQ 服务器可以通过重传机制将消息重新推送给其他消费者进行处理,以确保消息不会丢失。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/633261.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

聚类模型评估指标

聚类模型评估指标-轮廓系数 计算样本i到同簇其它样本到平均距离ai,ai越小,说明样本i越应该被聚类到该簇(将ai称为样本i到簇内不相似度);计算样本i到其它某簇Cj的所有样本的平均距离bij,称为样本i与簇Cj的…

企业设计图纸安全、企业设计图纸安全软件

设计图纸对于企业的重要性不言而喻,因此保障设计图纸的安全显得尤为重要。以下是企业设计图纸安全需要注意的几个方面: 访问控制:只有授权人员才能访问设计图纸,需要通过账号密码或者其他验证方式进行身份认证。 加密传输&#…

常见面试题之JavaScript(1)

JS由哪三部分组成? ECMAScript:JS的核心内容,描述了语言的基础语法,比如var,for,数据类型(数组、字符串) 文档对象模型(DOM):DOM把整个HTML页面规划为元素构成的文档 浏览器对象模型 (BOM):对…

Windows下安装alipay-sdk-python时,pycrypto安装报错问题处理

1、安装alipay-sdk-python 时,保存内容如下。 Building wheels for collected packages: pycryptoBuilding wheel for pycrypto (setup.py) ... error error: subprocess-exited-with-error python setup.py bdist_wheel did not run successfully.│ exit c…

从零开始了解域名:什么是域名、域名的作用及类别

在互联网时代,域名作为一个网站在互联网上的身份标识,无论是企业或者个人建设网站,获取域名都是其中非常关键的一环。一个好的域名不仅便于记忆,还有助于强化品牌、利于宣传,让用户更好的找到你的网站。在下面的内容中…

基于YOLOv8深度学习的100种中草药智能识别系统【python源码+Pyqt5界面+数据集+训练代码】目标检测、深度学习实战

《博主简介》 小伙伴们好,我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源,可关注公-仲-hao:【阿旭算法与机器学习】,共同学习交流~ 👍感谢小伙伴们点赞、关注! 《------往期经典推…

正则表达式..

1.字符串的合法检验 现在有一个字符串验证码 我们需要检验其的合法性 什么条件才能够使得字符串合法呢?就是6-10个字符串长度 并且以字母开头 并且其中由字母、数字、下划线构成 那么我们可以先通过自定义的方式进行检验 public class Main {public static void m…

解锁文字魔法:探索自然语言处理的秘密——从技术揭秘到应用实战!

目录 前言 关键技术——揭密自然语言处理的秘密武器! 领域应用——自然语言处理技术在不同领域的奇妙表演! 超越极限——自然语言处理技术面临的顽强挑战揭秘! 科技VS伦理——自然语言处理技术的发展与伦理社会的纠结较量! 开…

买家福音:亚马逊鲲鹏系统全自动操作助你轻松搞定一切

我一直以来都是亚马逊的忠实用户,但是最近我发现了一款真正令人惊叹的工具,改变了我在平台上的经验。我想分享一下我的感受,最近,我得知并尝试了亚马逊鲲鹏系统,简直是为买家账号管理量身定制的利器。在我账号过多时&a…

yolov8的目标检测、实例分割、关节点估计的原理解析

1 YOLO时间线 这里简单列下yolo的发展时间线,对每个版本的提出有个时间概念。 2 yolov8 的简介 工程链接:https://github.com/ultralytics/ultralytics 2.1 yolov8的特点 采用了anchor free方式,去除了先验设置可能不佳带来的影响借鉴General…

PHP Fatal error: Unparenthesized `a ? b : c ? d : e` is not supported.

这个错误是关于三元运算符的错误 这个错误在php8.0以下的版本好像是没问题呢 PHP Fatal error: Unparenthesized a ? b : c ? d : e is not supported. Use either (a ? b : c) ? d : e or a ? b : (c ? d : e) in /cangku/app/common.php on line 57 这个问题是 程…

一站式获取 PieCloudDB Database 产品、社区及数据库行业全动态

第一部分 PieCloudDB Database 最新动态 PieCloudDB 推出社区版全新版本 11月14日,PieCloudDB 再度升级,推出社区版全新版本,免费面向用户开放下载,新版本将支持单机和多节点部署两种方式。欢迎试用! 下载链接&…

【Python】如何方便地在一台服务器部署多个Python环境

【背景】 项目一多,为了防止环境互相干扰,不同的项目用包含了不同包的环境跑比较安全。但是新建一个虚拟环境在某些条件下觉得麻烦,特别是不能正常下载的内网环境,有没有别的更方便地在一台服务器快速部署多个Python环境的方法呢…

linux docker-compose安装失败解决

1.去github下载到本地 https://github.com/docker/compose/releases/ 2.上传到linux 服务器 mv dokcer-compose-linux-x86_64 /usr/loacal/bin/docker-compose 3.给权限 chmod x /usr/local/bin/docker-compose 4.查看是否安装成功 docker-compose -version 5.卸载 …

第14章_集合与数据结构拓展练习(前序、中序、后序遍历,线性结构,单向链表构建,单向链表及其反转,字符串压缩)

文章目录 第14章_集合与数据结构拓展练习选择填空题1、前序、中序、后序遍历2、线性结构3、其它 编程题4、单向链表构建5、单向链表及其反转6、字符串压缩 第14章_集合与数据结构拓展练习 选择填空题 1、前序、中序、后序遍历 分析: 完全二叉树: 叶结点…

nodejs学习计划--(二)fs文件系统和path模块

1. fs模块 fs 全称为 file system ,称之为 文件系统 ,是 Node.js 中的 内置模块 ,可以对计算机中的磁盘进行操 作。 1. 文件写入 文件写入就是将 数据 保存到 文件 中,我们可以使用如下几个方法来实现该效果 |方法|说明| |-|-| |w…

Python学习之路-正则表达式

Python学习之路-正则表达式 简介 正则表达式是计算机科学的一个概念。正则表达式使用单个字符串来描述、匹配一系列匹配某个句法规则的字符串。在很多文本编辑器里,正则表达式通常被用来检索、替换那些匹配某个模式的文本。 RE模块 在Python中需要通过正则表达式…

Flink TaskManager内存管理机制介绍与调优总结

内存模型 因为 TaskManager 是负责执行用户代码的角色,一般配置 TaskManager 内存的情况会比较多,所以本文当作重点讲解。根据实际需求为 TaskManager 配置内存将有助于减少 Flink 的资源占用,增强作业运行的稳定性。 TaskManager 内…

深度解析 Compose 的 Modifier 原理 -- Modifier.layout()、LayoutModifier

"Jetpack Compose - - Modifier 原理系列文章 " 📑 《 深度解析 Compose 的 Modifier 原理 - - Modifier、CombinedModifier 》 📑 《 深度解析 Compose 的 Modifier 原理 - - Modifier.composed()、ComposedModifier 》 📑 《 深度…

【Debian】非图形界面Debian10.0.0安装xfce和lxde桌面

一、安装 1. Debian10.0.0安装xfce桌面 sudo apt update sudo apt install xfce4 startxfce4 2. Debian10.0.0安装lxde桌面 sudo apt-get install lxde安装后重启电脑。 二、说明 XFCE、LXDE 和 GNOME 是三个流行的桌面环境,它们都是为类 Unix 操作系统设计…