rabbitMQ对消息不可达处理-备份交换机/备份队列

在这里插入图片描述

生产者发送消息,在消息不可达指定队列时,可以借助扇出类型交换机(之前写过消息回退的处理方案,扇出交换机处理的方案优先级高于消息回退)处理不可达消息,然后放置一个备份队列,供消费者处理不可达消息,同时也加一个报警队列,对于不能走正常流程的消息进行消费者告警。

先用方法配置类把各个组件声明:

在这里插入图片描述

package com.esint.configs;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class BackupConfig {/*** 定义组件常量名字*///交换机- 确认交换机额public static final String EXCHANGE_SURE = "sure.ex";//交换机- 备份交换机额public static final String EXCHANGE_BACK = "backup.ex";//队列- 正常确认队列public static final String QUEUE_SURE = "sure.queue";//队列-备份队列public static final String QUEUE_BACKUP = "backup.queue";//队列-警告队列public static final String QUEUE_WARN = "warn.queue";//routing-keypublic static final String ROUTING_KEY_SURE = "key1";/*** 声明组件*///确认交换机@Bean("sureExchange")public DirectExchange sureExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("alternate-exchange",EXCHANGE_BACK);return ExchangeBuilder.directExchange(EXCHANGE_SURE).durable(true).withArguments(arguments).build();}//备份交换机@Bean("backExchange")public FanoutExchange backExchange(){return new FanoutExchange(EXCHANGE_BACK);}//确认队列@Bean("sureQueue")public Queue sureQueue(){return QueueBuilder.durable(QUEUE_SURE).build();}//备份队列@Bean("backupQueue")public Queue backupQueue(){return QueueBuilder.durable(QUEUE_BACKUP).build();}//警告队列@Bean("warnQueue")public Queue warnQueue(){return QueueBuilder.durable(QUEUE_WARN).build();}/*** 绑定组件  确认队列 绑定 确认交换机 with key1*/@Beanpublic Binding sureQueueBindingSureExchange(@Qualifier("sureQueue") Queue sureQueue,@Qualifier("sureExchange")DirectExchange sureExchange){return BindingBuilder.bind(sureQueue).to(sureExchange).with(ROUTING_KEY_SURE);}/*** 绑定组件 备份队列 绑定 备份交换机*/@Beanpublic Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue") Queue backupQueue,@Qualifier("backExchange")FanoutExchange backExchange){return  BindingBuilder.bind(backupQueue).to(backExchange);}/*** 绑定组件 警告队列 绑定 备份交换机*/@Beanpublic Binding warnQueueBindingBackupExchange(@Qualifier("warnQueue") Queue warnQueue,@Qualifier("backExchange")FanoutExchange backExchange){return  BindingBuilder.bind(warnQueue).to(backExchange);}
}
生产者: 我们做出两个方法,一个可正常进行流程,一个routingKey异常无法路由到指定队列
package com.esint.controller;import com.esint.configs.BackupConfig;
import com.esint.constants.ResponseCode;
import com.esint.entity.ResponseEntity;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.kafka.clients.producer.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.ExecutionException;@Api(value = "rabbitMQ-备份队列测试")
@RestController
@RequestMapping("/rabbit")
public class BackUpExchangeController {@Autowiredprivate RabbitTemplate rabbitTemplate;@ApiOperation(value = "routingKey正常测试",httpMethod = "GET",tags = {"去正常流程"})@ApiImplicitParams({@ApiImplicitParam(name="str",value="消息体",required = false,dataType = "String")})@ResponseBody@RequestMapping(value = "/test1", method = RequestMethod.GET)public ResponseEntity test1(String str ) {rabbitTemplate.convertAndSend(BackupConfig.EXCHANGE_SURE, BackupConfig.ROUTING_KEY_SURE,str);return new ResponseEntity(ResponseCode.SUCCESS).addData("routingKeyOk:"+str);}@ApiOperation(value = "routingKey非正常测试",httpMethod = "GET",tags = {"去备份-警告"})@ApiImplicitParams({@ApiImplicitParam(name="str",value="消息体",required = false,dataType = "String")})@ResponseBody@RequestMapping(value = "/test2", method = RequestMethod.GET)public ResponseEntity test2(String str ) {rabbitTemplate.convertAndSend(BackupConfig.EXCHANGE_SURE, BackupConfig.ROUTING_KEY_SURE+"wrong",str);return new ResponseEntity(ResponseCode.SUCCESS).addData("routingKeyWrong:"+str);}}
三个消费者分别监听正常队列 备份队列 警告队列

确认队列消费者:

package com.esint.consumer;import com.esint.configs.BackupConfig;
import com.esint.configs.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class BackUpConsumer01 {@RabbitListener(queues = BackupConfig.QUEUE_SURE)public void reveiver(Message message){log.info("正常消费者C1:" +  new String(message.getBody()),"UTF-8");}}

备份队列消费者:

package com.esint.consumer;import com.esint.configs.BackupConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class BackUpConsumer02 {@RabbitListener(queues = BackupConfig.QUEUE_BACKUP)public void reveiver(Message message){log.info("备份消费者C2:" +  new String(message.getBody()),"UTF-8");}
}

警告队列消费者:

package com.esint.consumer;import com.esint.configs.BackupConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class BackUpConsumer03 {@RabbitListener(queues = BackupConfig.QUEUE_WARN)public void reveiver(Message message){log.info("警告消费者C3:" +  new String(message.getBody()),"UTF-8");}}
测试:

1.正常流程测试
在这里插入图片描述
在这里插入图片描述

 com.esint.consumer.BackUpConsumer01      : 正常消费者C1:你好啊 正常队列

2.路由不达消息测试

在这里插入图片描述
在这里插入图片描述

com.esint.consumer.BackUpConsumer03      : 警告消费者C3:这个消息不可达 routing-key不对 它去哪里了?
com.esint.consumer.BackUpConsumer02      : 备份消费者C2:这个消息不可达 routing-key不对 它去哪里了?

测试达到预期结果!

在队列消息不可达时,备份交换机处理优先级高于消息回退处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/185148.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IP地址的地理位置:固定性与动态性的平衡

IP地址的地理位置是网络通信中一个重要的元素&#xff0c;常被用于定位设备和用户。然而&#xff0c;很多人好奇&#xff0c;IP地址的地理位置是否会发生变化&#xff1f;本文将深入讨论IP地址地理位置的固定性与动态性之间的平衡&#xff0c;以及造成这种变化的因素。 1. IP地…

DevEco Studio设置每次进入 是否自动进入上一次的项目

首先 我们第一次创建项目 并不是这个界面 如果我们想在这个界面创建项目的话 可以 点击左上角 File 下的 New 下的 Create Project 这里 我们可以点击左上角 File 选择下面的 Settings… 这个界面就有非常多的配置 然后 我们选择到下图操作的位置 这里有一个Reopen projects…

MySQL进阶知识:锁

目录 前言 全局锁 表级锁 表锁 元数据锁&#xff08;MDL&#xff09; 意向锁 行级锁 行锁 行锁演示 间隙锁/临界锁 演示 前言 MySQL中的锁&#xff0c;按照锁的粒度分&#xff0c;分为以下三类 全局锁&#xff1a;锁定数据库中的所有表。表级锁&#xff1a;每次操…

民安智库(第三方市场调查公司):专业调研引领某月饼生产商企业发展

在中国的传统佳节中&#xff0c;月饼是一种重要的节日食品&#xff0c;也是送礼的首选。某月饼生产商一直以来以其高品质、独特口味的月饼而备受消费者喜爱。为了更好地了解消费者对产品的满意度&#xff0c;该月饼生产商决定委托民安智库&#xff08;湖北知名满意度测评公司&a…

el-row错位问题解决

<el-row type"flex" style"flex-wrap:wrap">

yolov8 原木识别模型

一、模型介绍 模型基于 yolov8数据集采用SKU-110k&#xff0c;这数据集太大了十几个 G&#xff0c;所以只训练了 10 轮左右就拿来微调了原木数据微调&#xff1a;纯手工标注 200 张左右原木图片&#xff0c;训练 20 轮的效果 PS&#xff1a;因为训练时间比较长 Google 的 Cola…

关于pyqt5与moviepy到打包的坑点

1,pyqt5 关于pyqt5 designer.exe 的使用主要就是了解pyqt5右侧菜单栏的功能使用 打包后的文件&#xff0c;需要继承改类&#xff0c;进行图形指令交互 关于pyqt5&#xff0c;要了解信号&#xff0c;和槽点的相互关系。 我在pyqt5中使用moviepy的时候&#xff0c;需要用到异步…

[VNCTF 2023] web刷题记录

文章目录 象棋王子电子木鱼BabyGo 象棋王子 考点&#xff1a;前端js代码审计 直接查看js源码&#xff0c;搜一下alert 丢到控制台即可 电子木鱼 考点&#xff1a;整数溢出 main.rs我们分段分析 首先这段代码是一个基于Rust的web应用程序中的路由处理函数。它使用了Rust的异步…

SpringMVC多种类型数据响应

SpringMVC多种类型数据响应入门 1.概念 RequestMapping 作用&#xff1a;用于建立请求URL和处理请求方法之间的对应关系 位置&#xff1a; 类上&#xff0c;请求URL的第一级访问目录。此处不写的话&#xff0c;就相当于应用的根目录 方法上&#xff0c;请求URL的第二级访问目…

交叉熵损失函数(Cross-Entropy Loss Function)

交叉熵损失函数&#xff08;Cross-Entropy Loss Function&#xff09; 在处理机器学习或深度学习问题时&#xff0c;损失/成本函数用于在训练期间优化模型。目标几乎总是最小化损失函数。损失越低&#xff0c;模型越好。交叉熵损失是最重要的成本函数。它用于优化分类模型。对…

10.0 输入输出 I/O

IO操作主要是指使用Java程序完成输入&#xff08;Input&#xff09;、输出&#xff08;Output&#xff09;操作。所谓输入是指将文件内容以数据流的形式读取到内存中&#xff0c;输出是指通过Java程序将内存中的数据写入到文件中&#xff0c;输入、输出操作在实际开发中应用较为…

TiDB专题---2、TiDB整体架构和应用场景

上个章节我们讲解了TiDB的发展和特性&#xff0c;这节我们讲下TiDB具体的架构和应用场景。首先我们回顾下TiDB的优势。 TiDB的优势 与传统的单机数据库相比&#xff0c;TiDB 具有以下优势&#xff1a; 纯分布式架构&#xff0c;拥有良好的扩展性&#xff0c;支持弹性的扩缩容…

一、Linux系统概述和安装

目录 1、Linux系统概述 2、Linux发行版介绍 3、虚拟机软件介绍 4、VMware安装 5、Linux系统&#xff08;CentOS&#xff09;系统安装 6、登录并查看IP地址 7、Linux连接工具CRT使用 7.1 概述 7.2 CRT安装 7.3 使用步骤 7.4 文件上传 8、Linux的快照 8.1 作用 8.2…

Go 从编译到执行

一、Go运行编译简介 Go语言&#xff08;也称为Golang&#xff09;自从2009年由Google发布以来&#xff0c;已成为现代软件开发中不可或缺的一部分。设计者Rob Pike, Ken Thompson和Robert Griesemer致力于解决多核处理器、网络系统和大型代码库所引发的现实世界编程问题。我们…

kubeadm快速搭建k8s高可用集群

1.安装及优化 1.1基本环境配置 1.环境介绍 &#xff08;1&#xff09;.高可用集群规划 主机名ip地址说明k8s-master01192.168.2.96master节点k8s-master02192.168.2.97master节点k8s-master03192.168.2.98master节点k8s-node01192.168.2.99node节点k8s-node02192.168.2.100n…

【10张图带你搞清楚生成树协议】

STP协议分类 BPDU&#xff0c;网桥协议数据单元 STP路径开销&#xff0c;以链路带宽为准&#xff0c;两个标准&#xff0c;现在主要以NEW为准 在网络刚开始运行的阶段&#xff0c;所有交换机都会从所有端口发送BPDU&#xff0c;大家都认为自己是root&#xff0c;随着B…

基于YOLOv8深度学习的火焰烟雾检测系统【python源码+Pyqt5界面+数据集+训练代码】目标检测、深度学习实战

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

增强静态数据的安全性

静态数据是数字数据的三种状态之一&#xff0c;它是指任何静止并包含在永久存储设备&#xff08;如硬盘驱动器和磁带&#xff09;或信息库&#xff08;如异地备份、数据库、档案等&#xff09;中的数字信息。 静态数据是指被动存储在数据库、文件服务器、端点、可移动存储设备…

多线程05

前言 前面我们说到了死锁以及线程可见性的问题 我们将线程可见性主要归结于是JVM自身的一个bug 一个线程写一个线程读 会将一直不变的变量优化到直接从寄存器中读取,而不是缓存等读取,因为这样我们就设置了使用volatile关键字使得用到这个变量的时候必须从内存中读取数据 死锁主…