RabbitMQ 消息丢失解决 (高级发布确认、消息回退与重发、备份交换机)

目录

一、发布确认SpringBoot版本

确认机制图例:

代码实战:

代码架构图:

1.1交换机的发布确认

添加配置类

消息消费者

消息生产者发布消息后的回调接口

测试:

 1.2回退消息并重发(队列的发布确认)

修改回调接口

生产者:

测试:

二、备份交换机

实战

生产者

报警消费者:

测试:


一、发布确认SpringBoot版本

        首先发布消息后进行备份在缓存里,如果消息成功发布确认到交换机,则从缓存里删除该消息,如果没有成功发布,则设置一个定时任务,重新从缓存里获取消息发布到交换机,直到成功发布到交换机。

确认机制图例:

代码实战:

一个交换机:confirm.exchange,一个队列:confirm.queue,一个消费者:confirm.consumer

其中交换机类型时 direct,与队列关联的 routingKey 是 key1

代码架构图:

1.1交换机的发布确认

配置文件中添加:

server:port: 8888
spring:rabbitmq:host: 192.168.163.132port: 5672username: 2252631565password: 2252631565
#    高级发布确认 发布消息成功后将会触发回调方法publisher-confirm-type: correlated
  • NONE 值是禁用发布确认模式,是默认值
  • CORRELATED 值是发布消息成功到交换器后会触发回调方法
  • SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;

添加配置类

声明交换机和队列,并且将交换机和队列进行绑定:

@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE="confirm.exchange";public static final String CONFIRM_QUEUE="confirm.queue";public static final String ROUTING_KEY="key1";@Beanpublic DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE,false,false);}@Beanpublic Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE).build();}@Beanpublic Binding EAndQBind(@Qualifier("confirmExchange") DirectExchange exchange,@Qualifier("confirmQueue")Queue queue){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);}
}

消息生产者

也可以说是 Controller 层,在这里发送两条消息给两个交换机,其中一个交换机是我们设置好的,另一个交换机不存在;这样就可以清晰看出交换机应答效果。

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;//高级发布确认模式@GetMapping("/sendConfirmMsg/{message}")public void sendConfirmMsg(@PathVariable String message){log.info("发送一条时长为的消息给第一个队列内容是:{}",new Date().toString(),message);CorrelationData correlationData=new CorrelationData("1");correlationData.setReturnedMessage(new org.springframework.amqp.core.Message(message.getBytes()));rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY,message,correlationData);//向一个不存在的交换机发送消息log.info("发送一条时长为的消息给第一个队列内容是:{}",new Date().toString(),message);CorrelationData correlationData2=new CorrelationData("2");correlationData2.setReturnedMessage(new org.springframework.amqp.core.Message(message.getBytes()));rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE+123,ConfirmConfig.ROUTING_KEY,message,correlationData2);}}

消息消费者

监听 confirm.queue 队列

@Slf4j
@Component
public class ConfirmLetterQueue {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)public void confirmConsumer(Message message, Channel channel){log.info("收到了消息:{}",new String(message.getBody()));}
}

消息生产者发布消息后的回调接口

只要生产者发布消息,交换机不管是否收到消息,都会调用该类的 confirm 方法

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){//注入rabbitTemplate.setConfirmCallback(this::confirm);}/*交换机确认回调1.交换机收到了消息 触发回调1.1 correlationData(我们在发消息的时候自己创建的) 消息的ID以及消息内容1.2 ack 交换机收到消息 true1.3 cause 交换机收到消息的原因 null---------------------------------2.交换机未收到消息 触发回调2.1 correlationData 消息的ID以及消息内容2.2 ack 交换机未收到消息 false2.3 cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){log.info("发送消息到交换机成功!消息体为:{}",new String(correlationData.getReturned().getMessage().getBody()));}else {log.info("发送消息到交换机失败!原因为:{}",cause.toString());}}
}

测试:

 效果很明显,我们配置的交换机成功收到消息并转发给队列;不存在的交换机没有接受到消息并作出反应。

 1.2回退消息并重发(队列的发布确认)

在配置文件中开启消息回退功能

server:port: 8888
spring:rabbitmq:host: 192.168.163.133port: 5672username: 2252631565password: 2252631565
#    高级发布确认 发布消息成功后将会触发回调方法publisher-confirm-type: correlated#    消息回退 当消息未路由至队列时触发publisher-returns: true

修改回调接口

实现 RabbitTemplate.ReturnsCallback 接口,并实现方法

@Slf4j
@Component
public class MyCallBack  implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){//注入rabbitTemplate.setConfirmCallback(this::confirm);rabbitTemplate.setReturnsCallback(this::returnedMessage);}/*交换机确认回调1.交换机收到了消息 触发回调1.1 correlationData(我们在发消息的时候自己创建的) 消息的ID以及消息内容1.2 ack 交换机收到消息 true1.3 cause 交换机收到消息的原因 null---------------------------------2.交换机未收到消息 触发回调2.1 correlationData 消息的ID以及消息内容2.2 ack 交换机未收到消息 false2.3 cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){log.info("时间:{}发送消息到交换机成功!",new Date());}else {log.info("发送消息到交换机失败!原因为:{}",cause.toString());}}//当消息未路由到队列时触发 只有失败时才触发 若消息发送至延迟队列则一定会触发回退 记得根据交换机名称排除延迟队列@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("消息:'{}',被交换机:{}回退,回退原因为:{},routingKey为:{}",new String(returned.getMessage().getBody()),returned.getExchange(),returned.getReplyText(),returned.getRoutingKey());//10s后消息重发try {Thread.sleep(10000);log.info("时间:{},生产者重新发消息",new Date());rabbitTemplate.convertAndSend(returned.getExchange(),ConfirmConfig.ROUTING_KEY,new String(returned.getMessage().getBody()));}catch (InterruptedException e) {throw new RuntimeException(e);}}
}

生产者:

向交换机中发送消息,指定错误的routingkey,触发队列回退消息并重发消息。

    //高级发布确认模式@GetMapping("/sendConfirmMsg/{message}")public void sendConfirmMsg(@PathVariable String message){//向一个不存在的队列发送消息log.info("时间:{}生产者发送一条的消息给第一个队列内容是:{}",new Date().toString(),message);CorrelationData correlationData2=new CorrelationData("2");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY+123,message,correlationData2);}

测试:

回退未进入队列的消息并重新发送消息。 

二、备份交换机

        什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout (扇出),这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进 入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警并可以重发消息。

实战

需要一个备份交换机 backup.exchange,类型为 fanout,该交换机发送消息到队列 backup.queue 和 warning.queue。

 修改高级确认发布 配置类

@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE="confirm.exchange";public static final String CONFIRM_QUEUE="confirm.queue";public static final String ROUTING_KEY="key1";//备份交换机public static final String BACKUP_EXCHANGE="backup.exchange";//备份队列public static final String BACKUP_QUEUE="backup.queue";//报警队列public static final String WARNING_QUEUE="warning.queue";@Beanpublic DirectExchange confirmExchange(){//绑定确认交换机与备份交换机Map<String,Object> argument=new HashMap<>();argument.put("alternate-exchange",BACKUP_EXCHANGE);return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).withArguments(argument).build();}//备份交换机@Beanpublic FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE);}@Beanpublic Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE).build();}//备份队列@Beanpublic Queue backupQueue(){return QueueBuilder.durable(BACKUP_QUEUE).build();}//警告队列@Beanpublic Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE).build();}@Beanpublic Binding EAndQBind(@Qualifier("confirmExchange") DirectExchange exchange,@Qualifier("confirmQueue")Queue queue){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);}//绑定备份交换机与两个队列@Beanpublic Binding BAndBBing(@Qualifier("backupExchange") FanoutExchange exchange,@Qualifier("backupQueue")Queue queue){return BindingBuilder.bind(queue).to(exchange);}@Beanpublic Binding BAndWBing(@Qualifier("backupExchange") FanoutExchange exchange,@Qualifier("warningQueue")Queue queue){return BindingBuilder.bind(queue).to(exchange);}
}

生产者

        生产者发送两条消息 一个配置正确的路由,另一个是错误的路由。预期目标是正确路由正常接收消息,错误路由传输的信息由警告队列接收。

    //高级发布确认模式@GetMapping("/sendConfirmMsg/{message}")public void sendConfirmMsg(@PathVariable String message){log.info("时间:{}生产者发送两条消息队列内容是:{}",new Date().toString(),message);CorrelationData correlationData=new CorrelationData("1");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY,message,correlationData);//向一个不存在的队列发送消息CorrelationData correlationData2=new CorrelationData("2");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY+123,message,correlationData2);}

报警消费者:

接收不可路由的消息

/*** 报警消费者*/
@Slf4j
@Component
public class WarningConsumer {//监听报警消息@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE)public void receiveWarningMsg(Message message){String msg=new String(message.getBody());log.info("报警发现不可路由消息:{},重发消息",msg);}
}

测试:

         生产者发送两条消息 一个配置正确的路由,另一个是错误的路由。预期目标是正确路由正常接收消息,错误路由传输的信息由警告队列接收。

        在此案例中,也设置了消息回退的配置,但是没有触发消息回退。由此得出:备份交换机的优先级更高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/146574.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

完整版解答!2023年数维杯国际大学生数学建模挑战赛B题

B题完整版全部5问&#xff0c;问题解答、代码&#xff0c;完整论文、模型的建立和求解、各种图表代码已更新&#xff01; 大家好&#xff0c;目前已完成2023数维杯国际赛B题全部5问的代码和完整论文已更新&#xff0c;部分展示如下&#xff1a; 部分解答图表 问题分析 B题前三…

TensorFlow案例学习:图片风格迁移

准备 官方教程&#xff1a; 任意风格的快速风格转换 模型下载地址&#xff1a; https://tfhub.dev/google/magenta/arbitrary-image-stylization-v1-256/2 学习 加载要处理的内容图片和风格图片 # 用于将图像裁剪为方形def crop_center(image):# 图片原始形状shape image…

计算机视觉:人脸识别与检测

目录 前言 识别检测方法 本文方法 项目解析 完整代码及效果展示 前言 人脸识别作为一种生物特征识别技术&#xff0c;具有非侵扰性、非接触性、友好性和便捷性等优点。人脸识别通用的流程主要包括人脸检测、人脸裁剪、人脸校正、特征提取和人脸识别。人脸检测是从获取的图…

拼多多百亿补贴商品详情API接口系列

拼多多API接口是拼多多网提供的一种应用程序接口&#xff0c;允许开发者通过程序访问拼多多网站的数据和功能。通过拼多多API接口&#xff0c;开发者可以开发各种应用程序&#xff0c;如店铺管理工具、数据分析工具、购物比价工具等。在本章中&#xff0c;我们将介绍拼多多API接…

wpf devexpress 开始点

此教程示范如何创建registration form和DevExpress WPF Data Editors 开始点 此项目源码 这个解决方案包含几个项目-每一个项目对应一个教程 RegistrationForm.BaseProject项目是基于工作的解决方案。项目包含三个视图&#xff1a;MainView&#xff0c;RegistraionView&…

安装最新版IntelliJ IDEA来开发Java应用程序

安装最新版IntelliJ IDEA来开发Java应用程序 Install the Latest Version of IntelliJ IDEA to Develop Java Applications 本文简要介绍如何安装配置JetBrains IntelliJ IDEA集成开发环境&#xff0c;从而开发Java应用程序&#xff1b;文中侧重实际操作和编程步骤&#xff0…

医院绩效考核系统源码 医院绩效考核系统方案

医院绩效考核系统源码 医院绩效考核系统是现代医院管理的重要方法和科学的管理工具。良好的绩效管理&#xff0c;有助于带动全院职工的工作积极性&#xff0c;有助于提高工作效率、提高医疗质量、改善服务水平、降低运营成本&#xff0c;全面提升医院的精细化管理水平。 医院绩…

ubuntu 23.04从源码编译安装rocm运行tensorflow-rocm

因为ubuntu22.04的RDP不支持声音转发&#xff0c;所以下载了ubuntu23.04.但官方的rocm二进制包最高只支持ubuntu22.04&#xff0c;不支持ubuntu 23.04&#xff0c;只能自己从源码编译虽然有网友告诉我可以用docker运行rocm。但是我已经研究了好几天&#xff0c;沉没成本太多&am…

【Linux】C文件系统详解(二)——什么是fd文件描述符以及理解“一切皆文件“

文章目录 fd-文件描述符如何深度理解"一切皆文件"**我们使用OS的本质:**FILEFILE是什么?谁提供的?和我们刚刚讲的内核的struct有关系吗FILE是一个结构体.该结构体内部一定要有以下字段:FILE是C语言标准库提供的.FILE和我们刚刚讲的内核的struct没有关系,最多就是上…

【STM32】串口和printf

1.数据通信的基本知识 1.串行/并行通信 2.单工/半双工/全双工通信 类似于【广播 对讲 电话】 不是有两根线就是全双工&#xff0c;而是输入和输出都有对应的数据线。 3.同步/异步通信 区分同步/异步通信的根本&#xff1a;判断是否有时钟信号&#xff08;时钟线&#xff09;。…

ai剪辑矩阵系统源码+无人直播系统源码技术开发

开发AI剪辑矩阵系统和无人直播系统源码&#xff0c;需要以下步骤&#xff1a; 1. 市场调研&#xff1a;了解市场需求和竞品情况&#xff0c;明确系统的功能和特点。 2. 系统设计&#xff1a;设计系统的整体架构和功能模块&#xff0c;包括视频剪辑、直播推流、实时互动、数据分…

【LeetCode刷题-滑动窗口】-- 795.区间子数组个数

795.区间子数组个数 class Solution {public int numSubarrayBoundedMax(int[] nums, int left, int right) {return lessEqualsThan(nums,right) - lessEqualsThan(nums,left - 1);}private int lessEqualsThan(int[] nums,int k){int len nums.length;int res 0,left 0,ri…

基于Genio 700 (MT8390)芯片的AR智能眼镜方案

AR眼镜是一种具有前所未有发展机遇的设备&#xff0c;无论是显示效果、体积还是功能都有明显的提升。AR技术因其智能、实时、三维、多重交互和开放世界的特点备受关注。 AR眼镜集成了AR技术、语音识别、智能控制等多项高科技功能&#xff0c;可以帮助用户实现更加便捷、高效、个…

一种基于NB‑IOT的粮库挡粮门异动监测装置

一种基于NB‑IOT的粮库挡粮门异动监测装置,包括若干个NB‑IOT开门监测装置、物联网后台管理系统、NB‑IOT低功耗广域网络和用户访问终端;各个NB‑IOT开门监测装置通过NB‑IOT低功耗广域网络与物联网后台管理系统连接,物联网后台管理系统与用户访问终端连接。 我国以往粮食收储…

将 ONLYOFFICE 文档编辑器与 Node.js 应用集成

我们来了解下&#xff0c;如何将 ONLYOFFICE 文档编辑器与您的 Web 应用集成。 许多 Web 应用都可以从文档编辑功能中获益。但是要从头开始创建这个功能&#xff0c;需要花费大量时间和精力。幸运的是&#xff0c;您可以使用 ONLYOFFICE——这是一款开源办公套件&#xff0c;可…

一文总结MySQL的指令是如何工作的

当你输入一条MySQL指令时候有没有想过会发生什么&#xff1f; 建立连接 首先你得先连到数据库上才行&#xff0c;这又分为长连接和短链接&#xff0c;短链接就是你查询一次就断开连接&#xff0c;长连接是你可以多次查询直到主动断开连接&#xff08;也可能被杀死进程&#x…

C语言——冒泡排序

一、冒泡排序是什么 冒泡排序&#xff1a; 冒泡排序(Bubble Sort)&#xff0c;又被称为气泡排序或泡沫排序。升序时&#xff1a;它会遍历若干次需要排序的数列&#xff0c;每次遍历时&#xff0c;它都会从前往后依次的比较相邻两个数的大小&#xff1b;如果前者比后者大&#x…

三十二、W5100S/W5500+RP2040树莓派Pico<UPnP示例>

文章目录 1 前言2 简介2 .1 什么是UPnP&#xff1f;2.2 UPnP的优点2.3 UPnP数据交互原理2.4 UPnP应用场景 3 WIZnet以太网芯片4 UPnP示例概述以及使用4.1 流程图4.2 准备工作核心4.3 连接方式4.4 主要代码概述4.5 结果演示 5 注意事项6 相关链接 1 前言 随着智能家居、物联网等…

centos虚拟机无法接受消息(防火墙)

1.利用wireshark抓包&#xff0c; 发现发送信息后&#xff0c; 虚拟机返回 :host administratively prohibited 2.发现是centos虚拟机未关闭防火墙 &#xff08;关闭后可正常接收消息&#xff09;

rabbitMQ的Topic模式的生产者与消费者使用案例

topic模式 RoutingKey 按照英文单词点号多拼接规则填充。其中消费者匹配规则时候 * 代表一个单词&#xff0c;#表示多个单词 消费者C1的RoutingKey 规则按照*.orange.* 匹配 绑定队列Q1 package com.esint.rabbitmq.work05;import com.esint.rabbitmq.RabbitMQUtils; import …