RabbitMQ进阶

1.消息可靠性

消息从发送,到消费者接收,会经历多个过程:

其中的每一步都可能导致消息丢失,常见的丢失原因包括:

- 发送时丢失:
  - 生产者发送的消息未送达exchange
  - 消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机

针对这些问题,RabbitMQ分别给出了解决方案:

- 生产者确认机制
- mq持久化
- 消费者确认机制
- 失败重试机制

1.1.生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

  • publisher-confirm,发送者确认

    • 消息成功投递到交换机,返回ack

    • 消息未投递到交换机,返回nack

  • publisher-return,发送者回执

    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

 

1.1.1.修改配置

首先,修改publisher服务中的application.yml文件,添加下面的内容:

spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true

说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:

    • simple:同步等待confirm结果,直到超时

    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback

  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback

  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

1.1.2.定义Return回调

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:

修改publisher服务,添加一个:

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 投递失败,记录日志log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要,可以重发消息});}
}

1.1.3.定义ConfirmCallback

 ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。

public void testSendMessage2SimpleQueue() throws InterruptedException {// 1.消息体String message = "hello, spring amqp!";// 2.全局唯一的消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(result -> {if(result.isAck()){// 3.1.ack,消息成功log.debug("消息发送成功, ID:{}", correlationData.getId());}else{// 3.2.nack,消息失败log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()));// 4.发送消息rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);// 休眠一会儿,等待ack回执Thread.sleep(2000);
}

1.2.消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。

要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

  • 交换机持久化

  • 队列持久化

  • 消息持久化

1.2.1.交换机持久化

RabbitMQ中交换机默认是非持久化的,mq重启后就丢失。

SpringAMQP中可以通过代码指定交换机持久化:

@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange("simple.direct", true, false);
}

 事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的。

可以在RabbitMQ控制台看到持久化的交换机都会带上D的标示:

1.2.2.队列持久化

RabbitMQ中队列默认是非持久化的,mq重启后就丢失。

SpringAMQP中可以通过代码指定交换机持久化:

@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}

事实上,默认情况下,由SpringAMQP声明的队列都是持久化的。

可以在RabbitMQ控制台看到持久化的队列都会带上D的标示:

1.2.3.消息持久化

利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:

  • 1:非持久化

  • 2:持久化

用java代码指定:

 默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。

1.3.消费者消息确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。

而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

设想这样的场景:

  • 1)RabbitMQ投递消息给消费者

  • 2)消费者获取消息后,返回ACK给RabbitMQ

  • 3)RabbitMQ删除消息

  • 4)消费者宕机,消息尚未处理

这样,消息就丢失了。因此消费者返回ACK的时机非常重要。

而SpringAMQP则允许配置三种确认模式:

•manual:手动ack,需要在业务代码结束后,调用api发送ack。

•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

由此可知:

  • none模式下,消息投递是不可靠的,可能丢失

  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack

  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可。

1.3.1.演示none模式

修改consumer服务的application.yml文件,添加下面内容:

### 1.3.1.演示none模式修改consumer服务的application.yml文件,添加下面内容:

 修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {log.info("消费者接收到simple.queue的消息:【{}】", msg);// 模拟异常System.out.println(1 / 0);log.debug("消息处理完成!");
}

 测试可以发现,当消息处理抛异常时,消息依然被RabbitMQ删除了。

1.3.2.演示auto模式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/750234.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】浮点型在内存中的存储

文章目录 例题引入剖析原因浮点型的二进制转换(M)正负号之分(S)科学记数法(E)关于 S E M 在内存中的存储存取浮点型时的情况讨论 例题解析整形存储为浮点型并输出浮点型存储为整形并输出 在我的上一篇博客中…

接雨水-热题 100?-Lua 中文代码解题第4题

接雨水-热题 100?-Lua 中文代码解题第4题 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图,计算按此排列的柱子,下雨之后能接多少雨水。 示例 1: 输入:height [0,1,0,2,1,0,1,3,2,1,2,1] 输出:6 解释…

算法——前缀和之除自身以外数组的乘积、和为K的子数组、和可被K整除的子数组、连续数组、矩阵区域和

这几道题对于我们前面讲过的一维、二维前缀和进行了运用,包含了面对特殊情况的反操作 目录 4.除自身以外数组的乘积 4.1解析 4.2题解 5.和为K的子数组 5.1解析 5.2题解 6.和可被K整除的子数组 6.1解析 6.2题解 7.连续数组 7.1题解 7.2题解 8.矩阵区域和 8.1解析 …

GET和POST方法的区别

GET和POST的区别 在我们开发项目的时候常常会在Controller层使用到POST方法或者GET方法,犹豫到底将接口定义为GET方法还是POST方法?那这两者之间有什么区别呢? 看一下官方定义: GET 和 POST 是 HTTP 协议中最常用的两种请求方法…

爬虫学习 Scrapy中间件代理UA随机selenium使用

目录 中间件UA、代理处理---process_requestUA随机 代理处理seleniumscrapy 中间件 控制台操作 (百度只起个名 scrapy startproject mid scrapy genspider baidu baidu.com setting.py内 ROBOTSTXT_OBEY FalseLOG_LEVEL "WARNING"运行 scrapy crawl baidu middle…

ArcGIS分享图层数据的最佳方法

在工作中,经常需要将图层数据分享给其他人。 如下图所示,需要分享的是【CJDCQ】和【GHDLTB】,图层带有符号系统: 一、分享gdb数据库及lyr文件 分享数据自然要找到源数据: 但是,gdb数据是不带符号系统的&a…

微信小程序开发系列(三十四)·自定义组件的创建、注册以及使用(数据和方法事件的使用)

目录 1. 分类和简介 2. 公共组件 2.1 创建 2.2 注册 2.3 使用 3. 页面组件 3.1 创建 3.2 注册 3.3 使用 4. 组件的数据和方法的使用 4.1 组件数据的修改 4.2 方法事件的使用 1. 分类和简介 小程序目前已经支持组件化开发,可以将页面中的功能…

springboot基于spring boot的在线答题微信小程序

摘 要 在线答题微信小程序是考试中重要的一环,在线答题是学生获取任务信息的主要渠道。为了方便学生能够在网站上查看任务信息、考试,于是开发了基于 springboot框架设计与实现了一款简洁、轻便的在线答题微信小程序。本微信小程序解决了在线答题事务中的…

2.3 HTML5新增的常用标签

2.3.1 HTML5新增文档结构标签 在HTML5版本之前通常直接使用<div>标签进行网页整体布局&#xff0c;常见布局包括页眉、页脚、导航菜单和正文部分。为了区分文档结构中不同的<div>内容&#xff0c;一般会为其配上不同的id名称。例如&#xff1a; <div id"h…

每天学习一个Linux命令之rsync

每天学习一个Linux命令之rsync 介绍 在Linux系统中&#xff0c;有许多强大的工具可用于文件和目录之间的同步和备份。其中一个最常用且功能强大的工具是rsync。rsync是一款开源的命令行工具&#xff0c;用于快速、安全和有效地将文件和目录从一个位置同步到另一个位置。 rsy…

FFmpeg转码参数说明及视频转码示例

-b : 设置音频或者视频的转码码率 -b:v 只设置视频码率 -b:a 只设置音频码率 -ab: 只设置音频码率, 默认码率大小为: 128k bit/s -g: 设置视频GOP大小,表示I帧之间的间隔,默认为12 -ar: 设置音频采样率,默认0 -ac: 设置音频通道数量 默认0 -bf: 设置连…

CTFHUB-web-信息泄漏

题目所在位置&#xff1a;技能树->web->信息泄漏 目录遍历 打开题目&#xff0c;我们进入的是这个页面 翻译过来就是 得到的信息就是&#xff1a;flag要在这些目录里面寻找&#xff0c;我们直接一个一个点开查看就行 发现得到一个flag.txt&#xff0c;点击打开得到flag …

开发K8S Operator

原理 大多数人使用Kubernetes的方式是使用原生资源(如pod、deployment、service等)部署应用程序。但是,也可以扩展Kubernetes的功能,从而添加满足特定需求的新业务逻辑,这就是Operator的作用。 作用 Operator应用,当我们在运用集群能力部署应用时,可能需要一组资源同时部…

鸿蒙Harmony应用开发—ArkTS声明式开发(容器组件:UIExtensionComponent (系统接口))

UIExtensionComponent用于支持在本页面内嵌入其他应用提供的UI。展示的内容在另外一个进程中运行&#xff0c;本应用并不参与其中的布局和渲染。 通常用于有进程隔离诉求的模块化开发场景。 说明&#xff1a; 该组件从API Version 10开始支持。后续版本如有新增内容&#xff0…

npm设置淘宝镜像地址

介绍 npm设置淘宝镜像 命令 npm config set registry https://registry.npmmirror.com/

【Java】List, Set, Queue, Map 区别?

目录 List, Set, Queue, Map 区别&#xff1f; Collection和Collections List ArrayList 和 Array区别&#xff1f; ArrayList与LinkedList区别? ArrayList 能添加null吗&#xff1f; ArrayList 插入和删除时间复杂度&#xff1f; LinkedList 插入和删除时间复杂度&…

k8s编排系统

Kubernetes&#xff08;简称K8s&#xff09;是一个开源的容器编排系统&#xff0c;由Google基于其内部的Borg项目开发&#xff0c;并于2014年正式对外发布。目前&#xff0c;Kubernetes已成为云原生计算基金会&#xff08;Cloud Native Computing Foundation, CNCF&#xff09;…

SDN网络简单认识(2)——南向接口

目录 一、概述 二、南向接口与南向协议 2.1 南向接口&#xff08;Southbound Interfaces&#xff09; 2.2 南向协议&#xff08;Southbound Protocols&#xff09; 2.3 区别与联系 三、常见南向协议 2.1 OpenFlow 2.2 OVSDB&#xff08;Open vSwitch Database Manageme…

计算机网络-数据链路层

一、认识以太网 "以太网" 不是⼀种具体的网络&#xff0c;而是一种技术标准; 既包含了数据链路层的内容, 也包含了⼀些物理 层的内容。 例如&#xff1a;规定了网络拓扑结构&#xff0c;访问控制方式&#xff0c;传输速率等; 例如&#xff1a;以太网中的网线必须使用…

vxe-table表格组件的使用已经query函数扩展

最近新项目使用vue3typescript开发后台管理系统&#xff0c;基本上展示内容一致表格的方式展示&#xff0c;所以使用vxe-table组件来开发&#xff0c;主要是为了方便使用工具栏&#xff0c;以及其他表格操作。 vxe-table 开发文档&#xff1a;https://vxetable.cn/#/table/sta…