RabbitMQ高级特性1

RabbitMQ高级特性1

  • 一.消息确认
    • 1.消息确认机制
    • 2.手动确认代码
      • 肯定确认
      • 否定确认1
      • 否定确认2
      • Spring中的代码
  • 二.持久性
    • 1.交换机持久化
    • 2.队列的持久化
    • 3.消息的持久化
    • 非持久化代码实现
    • 三方面都持久化,数据也会丢失
  • 三.发送方确认
    • 1.Confirm确认模式
    • 2.return返回模式
  • 四.总结
    • RabbitMQ保证消息可靠传输

一.消息确认

1.消息确认机制

  1. 自动确认:当autoAck等于true时, RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。自动确认模式适合对于消息可靠性要求不高的场景。

  2. 手动确认:当autoAck等于false时,RabbitMQ会等待消费者显式地调用Basic.Ack命令,回复确认信号后才从内存(或者磁盘)中移去消息。这种模式适合对消息可靠性要求比较高的场景.
    在这里插入图片描述

2.手动确认代码

肯定确认

Channel.basicAck(long deliveryTag, boolean multiple)

RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了。

参数说明
1)deliveryTag:消息的唯⼀标识,它是⼀个单调递增的64位的长整型值。 deliveryTag 是每个通道(Channel)独立维护的,所以在每个通道上都是唯⼀的。当消费者确认(ack)⼀条消息时,必须使用对应的通道上进行确认。

2)multiple:是否批量确认。在某些情况下,为了减少网络流量,可以对⼀系列连续的 deliveryTag 进行批量确认。值为true则会⼀次性把ack所有小于或等于指定deliveryTag的消息。值为false,则只确认当前指定deliveryTag的消息。

否定确认1

Channel.basicReject(long deliveryTag, boolean requeue)

RabbitMQ在2.0.0版本开始引⼊了 Basic.Reject 这个命令,消费者客户端可以调用
channel.basicReject方法来告诉RabbitMQ拒绝这个消息。

参数说明

1)deliveryTag:参考channel.basicAck。

2)requeue:表示拒绝后,这条消息如何处理。如果requeue参数设置为true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下⼀个订阅的消费者。如果requeue参数设置为false,则RabbitMQ会把消息从队列中移除,而不会把它发送给新的消费者。

否定确认2

Channel.basicNack(long deliveryTag, boolean multiple,boolean requeue)

Basic.Reject命令⼀次只能拒绝⼀条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令。消费者客户端可以调用channel.basicNack方法来实现。

参数说明

前面的参数参考上述参数说明。

multiple的参数设置为true则接受deliveryTag编号之前所有未被当前消费者确认的消息,也就是批量处理未被确认的消息。

Spring中的代码

  1. AcknowledgeMode.NONE
    这种模式下,消息⼀旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ就会自动确认消息,从RabbitMQ队列中移除消息,如果消费者处理消息失败,消息可能会丢失。

ym配置

spring:application:name: rabbitmqdemorabbitmq:addresses: amqp://账号:密码@IP:端口号/虚拟机listener:simple:acknowledge-mode: none
  1. AcknowledgeMode.AUTO(默认)
    这种模式下,消费者在消息处理成功时会自动确认消息,但如果处理过程中抛出了异常,则不会确认消息,但是会一直尝试重发消息。

将yml配置中的 acknowledge-mode改成auto。

上述两种模式代码相同

Configuration


package com.example.rabbitmqdemo.config;import com.example.rabbitmqdemo.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfiguration {//消息确认//队列@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constants.ACK_QUEUE).build();}//虚拟机@Bean("directExchange")public DirectExchange directExchange() {return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();}//队列和虚拟机绑定@Bean("ackBinding")public Binding ackBinding(@Qualifier("ackQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange) {return BindingBuilder.bind(queue).to(directExchange).with("ack");}
}

** Constants**

package com.example.rabbitmqdemo.constant;
public class Constants {public static final String ACK_QUEUE = "ack.queue";public static final String ACK_EXCHANGE = "ack.exchange";
}

Controller

package com.example.rabbitmqdemo.controller;import com.example.rabbitmqdemo.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","ack is ok");return "ack is ok!";}
}

Listener

@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消费者逻辑System.out.printf("接收到消息: %s , deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//不做具体实现的消费者业务逻辑}}
  1. AcknowledgeMode.MANUAL
    手动确认模式下,消费者必须在成功处理消息后显式调用 basicAck 方法来确认消息。如果消息未被确认,RabbitMQ会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息,这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理。

将yml配置中的 acknowledge-mode改成manual。

Listener

package com.example.rabbitmqdemo.listener;import com.example.rabbitmqdemo.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.io.UnsupportedEncodingException;/*** Created with IntelliJ IDEA.* Description:* User: hp* Date: 2025-04-03* Time: 9:26*/
@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws IOException {//消费者逻辑System.out.printf("接收到消息: %s , deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//不做具体实现的消费者业务逻辑try {//int sum = 3 / 0;//确认消息(肯定)channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e) {//否定确认//最后一个参数为true,则发生异常重新入队,false,为不再入队channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}}}

二.持久性

1.交换机持久化

交换器的持久化是通过在声明交换机时是将durable参数置为true实现的。

相当于将交换机的属性在服务器内部保存,当MQ的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新去建⽴交换机,交换机会自动建立,相当于⼀直存在。

如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换机元数据会丢失,对⼀个长期使用的交换器来说,建议将其置为持久化的。

设置交换机的持久化

在这里插入图片描述

2.队列的持久化

队列的持久化是通过在声明队列时将 durable 参数置为true实现的。

如果队列不设置持久化,那么在RabbitMQ服务重启之后,该队列就会被删掉,此时数据也会丢失。(队列没有了,消息也无处可存了)

队列的持久化能保证该队列本⾝的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将消息设置为持久化。

咱们前面用的创建队列的方式都是持久化的。

队列持久化
在这里插入图片描述
队列非持久化

在这里插入图片描述

3.消息的持久化

消息实现持久化,需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,也就是MessageDeliveryMode.PERSISTENT

设置了队列和消息的持久化,当 RabbitMQ 服务重启之后,消息依旧存在。如果只设置队列持久化,重启之后消息会丢失。

如果只设置消息的持久化,重启之后队列消失,继而消息也丢失。所以单单设置消息持久化而不设置队列的持久化显得毫无意义

非持久化代码实现

交换机、队列和绑定

//非持久化队列@Bean("presQueue")public Queue presQueue() {return QueueBuilder.nonDurable(Constants.PRES_QUEUE).build();}//非持久化交换机@Bean("presExchagne")public DirectExchange presExchange() {return ExchangeBuilder.directExchange(Constants.PRES_EXCHANGE).durable(false).build();}@Bean("presBinding")public Binding presBinding(@Qualifier("presQueue") Queue queue,@Qualifier("presExchagne") Exchange exchange) {//如果参数传递的是Exchange类型而不是DirectExchang类型就需要使用noargs作为收尾return BindingBuilder.bind(queue).to(exchange).with("pres").noargs();}

Producer

@RequestMapping("/pres")public String pres() {Message message  = new Message("Presistent test...".getBytes(),new MessageProperties());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE,"pres",message);return "pres is ok!";}

RabbitMQ服务器的虚拟机和队列

在这里插入图片描述
在这里插入图片描述

三方面都持久化,数据也会丢失

  1. 从消费者来说,如果在订阅消费队列时将autoAck参数设置为true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据居丢失。这种情况很好解决,将autoAck参数设置为false,并进行手动确认。

  2. 在持久化的消息正确存入RabbitMQ之后,还需要有⼀段时间(虽然很短,但是不可忽视)才能存⼊磁盘中。RabbitMQ并不会为每条消息都进行同步存盘(调用内核的fsync方法)的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内RabbitMQ服务节点发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。

三.发送方确认

1.Confirm确认模式

Producer在发送消息的时候,对发送端设置⼀个ConfirmCallback的监听,无论消息是否到达Exchange,这个监听都会被执行,如果Exchange成功收到,ACK( Acknowledge character ,确认字符)为true,如果没收到消息,ACK就为false。

yml配置

spring:application:name: rabbitmqdemorabbitmq:addresses: amqp://账号:Miami@IP:端口号/虚拟机listener:simple:#acknowledge-mode: none#acknowledge-mode: autoacknowledge-mode: manualpublisher-confirm-type: correlated #消息发送确认

Configuration

package com.example.rabbitmqdemo.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调函数rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack) {System.out.printf("接收到消息, 消息ID: %s \n",correlationData == null ? null : correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n",correlationData == null ? null : correlationData.getId(),cause);//相应的业务处理}}});return rabbitTemplate;}
}

Producer

@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");rabbitTemplateConfig.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm test...",correlationData);return "confirm is ok!";}

2.return返回模式

Configuration

@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调函数rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack) {System.out.printf("接收到消息, 消息ID: %s \n",correlationData == null ? null : correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n",correlationData == null ? null : correlationData.getId(),cause);//相应的业务处理}}});//return模式rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("消息退回: " + returnedMessage);}});return rabbitTemplate;}

四.总结

RabbitMQ保证消息可靠传输

在这里插入图片描述

Producer -> Broker:发送方确认

  1. Producer -> Exchange :Confirm模式(网络问题)
  2. Exchange -> Queue : return模式(代码或者配置层错误,导致消息路由失败)
  3. 队列移除:死信等

Broker:持久化(RabbitMQ服务器宕机导致消息丢失)

  1. 交换机持久化
  2. 队列持久化
  3. 消息持久化

Broker -> Consumer 消息确认方式(消费者未来得及消费信息,就宕机了)

  1. 自动确认
  2. 手动确认

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/75551.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java网络编程NIO

一、NIO是什么? NIO可以说是比BIO更强大的IO,可以设置非阻塞模式(通过事件的方式监听数据的到来) BIO是基于socket通信,一个线程对应一个socket连接,读取数据要一直等待 NIO是基于channel通信,一个线程管…

【动态规划】二分优化最长上升子序列

最长上升子序列 II 题解 题目传送门:AcWing 896. 最长上升子序列 II 一、题目描述 给定一个长度为 N 的数列,求数值严格单调递增的子序列的长度最长是多少。 输入格式: 第一行包含整数 N第二行包含 N 个整数,表示完整序列 输…

Dify接口api对接,流式接收流式返回(.net)

试了好多种方法除了Console.WriteLine()能打印出来,试了好些方法都不行,不是报错就是打印只有一行,要么就是接收完才返回...下面代码实现调用api接收流式数据,并进行流式返回给前端: using Furion.HttpRemote; using …

19-元素显示模式及浮动(CSS3)

知识目标 掌握标准文档流的解析规则掌握元素的显示模式掌握元素浮动属性语法与使用掌握浮动塌陷解决方法 1. 标准文档流 2. 元素显示模式 元素显示模式就是元素&#xff08;标签&#xff09;以什么方式进行显示&#xff0c;比如<div>独占一行&#xff0c;一行可以放多…

HTML jQuery 项目 PDF 批注插件库在线版 API 示例教程

本文章介绍 HTML && jQuery Web项目中 PDF 批注插件库 ElasticPDF 在线版 API 示例教程&#xff0c;API 包含 ① 导出批注后PDF数据&#xff1b;② 导出纯批注 json 数据&#xff1b;③ 加载旧批注&#xff1b;④ 切换文档&#xff1b;⑤ 切换用户&#xff1b;⑥ 清空批…

CATIA装配体全自动存储解决方案开发实战——基于递归算法的产品结构树批量处理技术

一、功能定位与技术架构 本工具针对CATIA V5装配体文件管理场景&#xff0c;实现了一套全自动递归存储系统&#xff0c;主要功能包括&#xff1a; ​智能路径选择&#xff1a;通过Tkinter目录对话框实现可视化路径选择​产品结构递归解析&#xff1a;深度优先遍历装配体中的子…

C#:接口(interface)

目录 接口的核心是什么&#xff1f; 1. 什么是接口&#xff08;Interface&#xff09;&#xff0c;为什么要用它&#xff1f; 2. 如何定义和使用接口&#xff1f; 3.什么是引用接口&#xff1f; 如何“引用接口”&#xff1f; “引用接口”的关键点 4. 接口与抽象类的区…

基于卷积神经网络CNN实现电力负荷多变量时序预测(PyTorch版)

前言 系列专栏:【深度学习:算法项目实战】✨︎ 涉及医疗健康、财经金融、商业零售、食品饮料、运动健身、交通运输、环境科学、社交媒体以及文本和图像处理等诸多领域,讨论了各种复杂的深度神经网络思想,如卷积神经网络、循环神经网络、生成对抗网络、门控循环单元、长短期记…

关于inode,dentry结合软链接及硬链接的实验

一、背景 在之前的博客 缺页异常导致的iowait打印出相关文件的绝对路径-CSDN博客 里 2.2.3 一节里&#xff0c;我们讲到了file&#xff0c;fd&#xff0c;inode&#xff0c;dentry&#xff0c;super_block这几个概念&#xff0c;在这篇博客里&#xff0c;我们针对inode和dentr…

游戏引擎学习第201天

仓库:https://gitee.com/mrxiao_com/2d_game_5 回顾之前的内容&#xff0c;并遇到了一次一阶异常&#xff08;First-Chance Exception&#xff09;。 欢迎来到新一期的开发过程&#xff0c;我们目前正在编写调试接口代码。 当前&#xff0c;我们已经在布局系统上进行了一些工…

计算机视觉算法实战——基于YOLOv8的行人流量统计系统

✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连✨ ​​​ ​​​​​​​​​ ​​ 引言:智能客流分析的市场需求 在零售、交通、安防等领域,准确的行人流量统计对于商业决策、公共安全管理…

Redis是什么?架构是怎么样的?

目录 前言 一,Redis架构 1.1 本地缓存 1.2 远程缓存 二,强大的Redis优点 2.1 支持多种数据类型 2.2 内存过期策略 2.3 内存淘汰策略 2.4 持久化 三,Redis是什么 前言 我是一个程序员,维护了一个商品服务,它的背后直连Mysql数据库,假设商品服务对外每秒需要提供1万次…

蓝桥杯真题——传送阵

原题连接&#xff1a;蓝桥杯2024年第十五届省赛真题-传送阵 - C语言网 知识点&#xff1a;并查集 题目描述 小蓝在环球旅行时来到了一座古代遗迹&#xff0c;里面并排放置了 n 个传送阵&#xff0c;进入第 i 个传送阵会被传送到第 ai 个传送阵前&#xff0c;并且可以随时选择…

彩虹表攻击

1. 引言 密码安全一直是信息安全领域的重要课题。攻击者可以利用**暴力破解(Brute-Force Attack)和字典攻击(Dictionary Attack)等方式尝试破解密码。然而,计算机性能的提升使得这些方法的效率不断提高,其中彩虹表攻击(Rainbow Table Attack)**是一种极具威胁性的密码…

Vue2 监听器 watcher

文章目录 前言监听器的作用&#xff1a;工作流程&#xff1a;基本用法1. 简单监听2. 对象形式配置 使用场景1. 执行异步操作2. 监听路由变化3. 复杂对象/数组变化 关键配置项与计算属性的区别动态添加监听器注意事项 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&a…

Linux系统程序设计:从入门到高级Day02

这一篇 我带大家复习一下&#xff0c;C语言中的文件 那一部分 大家注意 这里的图并非原创 是当时我老师的图片 本片作用主要是 后续会有文件相关操作&#xff0c;这篇帮大家复习C语言文件中的内容 有助于大家后面的理解。 文章中代码大多是图片格式&#xff0c;是因为这是我…

N元语言模型的时间和空间复杂度计算

对于N元语言模型&#xff0c;时间复杂度是O(V ^ {N-1})&#xff0c;空间复杂度是O(V ^ {N})&#xff0c;N是词汇表的大小。 空间复杂度&#xff1a;存储所有可能的N-1元组及其对应的词的频次需要大量的存储空间。例如&#xff0c;对于一个三元模型&#xff08;N3&#xff09;&…

Tmux 核心操作速查指南

Tmux 最常用操作笔记 1. 基本概念 会话&#xff08;Session&#xff09;&#xff1a;一个tmux会话可以包含多个窗口&#xff0c;适合长期任务管理。窗口&#xff08;Window&#xff09;&#xff1a;每个窗口是一个独立的终端界面&#xff0c;可包含多个面板。面板&#xff08…

哈希表系列一>两数之和

目录 题目&#xff1a;方法&#xff1a;暴力代码&#xff1a;优化后代码&#xff1a; 题目&#xff1a; 链接: link 方法&#xff1a; 暴力代码&#xff1a; public int[] twoSum(int[] nums, int target) {解法一&#xff1a;暴力解法&#xff1a;int n nums.length;for(int…

端到端机器学习流水线(MLflow跟踪实验)

目录 端到端机器学习流水线(MLflow跟踪实验)1. 引言2. 项目背景与意义2.1 端到端机器学习流水线的重要性2.2 MLflow的作用2.3 工业级数据处理需求3. 数据集生成与介绍3.1 数据集构成3.2 数据生成方法4. 机器学习流水线与MLflow跟踪4.1 端到端机器学习流水线4.2 MLflow跟踪实验…