JAVA面试题分享一百九十九:RabbitMQ 发布确认高级

目录

一、前言

二、发布确认SpringBoot版本

介绍

实战

添加配置类

消息生产者

消息消费者

消息生产者发布消息后的回调接口

三、回退消息

介绍

四、实战

修改配置文件

修改回调接口

五、备份交换机

介绍

实战

修改高级确认发布 配置类

报警消费者


一、前言

在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?

二、发布确认SpringBoot版本

简单的发布确认机制在应答与签收已经介绍,本内容将介绍整合了 SpringBoot 的发布确认机制。

介绍

首先发布消息后进行备份在缓存里,如果消息成功发布确认到交换机,则从缓存里删除该消息,如果没有成功发布,则设置一个定时任务,重新从缓存里获取消息发布到交换机,直到成功发布到交换机。

确认机制图例:

image

实战

一个交换机:confirm.exchange,一个队列:confirm.queue,一个消费者:confirm.consumer

其中交换机类型时 direct,与队列关联的 routingKey 是 key1

代码架构图:

image

在配置文件当中需要添加:

server:port: 8888
spring:rabbitmq:host: 192.168.91.200port: 5672username: rootpassword: 123publisher-confirm-type: correlated
  • NONE 值是禁用发布确认模式,是默认值
  • CORRELATED 值是发布消息成功到交换器后会触发回调方法
  • SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;

添加配置类

声明交换机和队列,并且将交换机和队列进行绑定

/*** @version 1.0* desc:配置类,发布确认(高级)*/
@Configuration
public class ConfirmConfig {//交换机public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";//队列public static final String CONFIRM_QUEUE_NAME = "confirm_queue";//routingKeypublic static final String CONFIRM_ROUTING_KEY = "key1";//声明交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}//声明队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//绑定@Beanpublic Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,@Qualifier("confirmExchange") DirectExchange confirmExchange){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}
}

消息生产者

也可以说是 Controller 层

/*** @version 1.0* desc:高级消息发布 消息生产者*/
@Slf4j
@RequestMapping("/confirm")
@RestController
public class ProductController {@Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息,测试确认@GetMapping("/sendMessage/{message}")public void sendMessage(@PathVariable("message") String message){//指定消息 id 为 1CorrelationData correlationData1 = new CorrelationData("1");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,message+"key1",correlationData1);log.info("发送消息内容:{}",message+"key1");//指定消息 id 为 2CorrelationData correlationData2 = new CorrelationData("2");String CONFIRM_ROUTING_KEY = "key2";rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,CONFIRM_ROUTING_KEY,message+"key2",correlationData2);log.info("发送消息内容:{}",message+"key2");}}

消息消费者

监听 confirm.queue 队列

/*** @version 1.0* desc:接受消息*/
@Slf4j
@Component
public class Consumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void receiveConfirmMessage(Message message){String msg = new String(message.getBody());log.info("接受到的队列confirm.queue消息:{}",msg);}
}

消息生产者发布消息后的回调接口

只要生产者发布消息,交换机不管是否收到消息,都会调用该类的 confirm 方法

/*** @version 1.0* desc:回调接口*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {//注入@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){//注入rabbitTemplate.setConfirmCallback(this);}/*** 交换机不管是否收到消息的一个回调方法* 1. 发消息 交换机接收到了 回调* @param correlationData  保存回调信息的Id及相关信息* @param ack              交换机收到消息 为true* @param cause            未收到消息的原因**/@Overridepublic void confirm(CorrelationData correlationData, boolean ack,String cause) {String id = correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到了ID为:{}的消息",id);}else {log.info("交换机还未收到ID为:{}的消息,由于原因:{}",id,cause);}}
}

http://localhost:8888/confirm/sendMessage/大家好啊

结果分析:

image

可以看到,发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为 "key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。

丢弃的消息交换机是不知道的,需要解决告诉生产者消息传送失败。

三、回退消息

介绍

获取回退的消息,首先在配置文件开启该功能,然后需要自定义类实现 RabbitTemplate.ReturnsCallback 接口,并且初始化时,使用该自定义类作为回退消息的处理类,同时开启 Mandatory,设置为 true

在启动开启 Mandatory,或者在代码里手动开启 Mandatory 参数,或者都开启😸

配置类文件开启:

# 新版
spring:rabbitmq:template:mandatory: true# 旧版
spring:rabbitmq:mandatory: true

代码中开启:

rabbitTemplate.setMandatory(true);

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

四、实战

修改配置文件

spring:rabbitmq:host: 192.168.91.200port: 5672username: rootpassword: 123publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true
server:port: 8888

修改回调接口

实现 RabbitTemplate.ReturnsCallback 接口,并实现方法

/*** @version 1.0* desc:回调接口*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {//注入@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){//注入rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}/*** 交换机不管是否收到消息的一个回调方法* 1. 发消息 交换机接收到了 回调* @param correlationData  保存回调信息的Id及相关信息* @param ack              交换机收到消息 为true* @param cause            未收到消息的原因**/@Overridepublic void confirm(CorrelationData correlationData, boolean ack,String cause) {String id = correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到了ID为:{}的消息",id);}else {log.info("交换机还未收到ID为:{}的消息,由于原因:{}",id,cause);}}//可以在当消息传递过程中不可达目的地时将消息返回给生产者//只有不可达目的地的时候 才进行回退/*** 当消息无法路由的时候的回调方法*  message      消息*  replyCode    编码*  replyText    退回原因*  exchange     从哪个交换机退回*  routingKey   通过哪个路由 key 退回*/@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("消息{},被交换机{}退回,退回原因:{},路由key:{}",new String(returned.getMessage().getBody()),returned.getExchange(),returned.getReplyText(),returned.getRoutingKey());}
}

打开浏览器访问地址:http://localhost:8888/confirm/sendMessage/大家好啊

image

五、备份交换机

介绍

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。 在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。

什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进 入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

实战

需要一个备份交换机 backup.exchange,类型为 fanout,该交换机发送消息到队列 backup.queue 和 warning.queue

代码结构图:

image

修改高级确认发布 配置类

/*** @version 1.0* desc:配置类,发布确认(高级)*/
@Configuration
public class ConfirmConfig {//交换机public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";//队列public static final String CONFIRM_QUEUE_NAME = "confirm_queue";//routingKeypublic static final String CONFIRM_ROUTING_KEY = "key1";//关于备份的//交换机public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";//队列public static final String BACKUP_QUEUE_NAME = "backup_queue";//报警队列public static final String WARNING_QUEUE_NAME = "warning_queue";//声明交换机,设置该交换机的备份交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();}//声明队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//绑定@Beanpublic Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,@Qualifier("confirmExchange") DirectExchange confirmExchange){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}//备份交换机的创建@Bean("backupExchange")public FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//声明备份队列@Bean("backupQueue")public Queue backupQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}//声明报警队列@Bean("warningQueue")public Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}//绑定 备份队列绑定备份交换机@Beanpublic Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue") Queue backupQueue,@Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(backupQueue).to(backupExchange);}//绑定 报警队列绑定备份交换机@Beanpublic Binding warningQueueBindingBackupExchange(@Qualifier("warningQueue") Queue warningQueue,@Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(warningQueue).to(backupExchange);}}

报警消费者

/*** @version 1.0* decs:报警消费者
*/
@Slf4j
@Component
public class WarningConsumer {//接收报警信息@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)public void receiveWarningMsg(Message message){String msg = new String(message.getBody());log.error("报警发现不可路由消息:{}",msg);}
}

由于之前写过 confirm.exchange 交换机,当更改配置了,需要删掉,不然会报错

打开浏览器访问地址:http://localhost:8888/confirm/sendMessage/大家好啊

image

Mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/237158.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于单片机智能自动浇花系统设计

**单片机设计介绍,基于单片机智能自动浇花系统设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机的智能自动浇花系统是一种可以自动感知周围环境,并执行相应动作的系统。通过使用传感器检测土…

【Netty】NIO与Netty核心概念

目录 NIO编程NIO介绍NIO和BIO的比较缓冲区(Buffer)基本介绍常用API缓冲区对象创建添加数据读取数据 通道(Channel)基本介绍Channel常用类ServerSocketChannelSocketChannel Selector (选择器)基本介绍常用API介绍示例代码 NIO 三大核心原理 Netty核心概念Netty 介绍原生 NIO 存…

【QT表格-6】QTableWidget的currentCellChanged实现中途撤销

背景: 【QT表格-1】QStandardItem的堆内存释放需要单独delete,还是随QStandardItemModel的remove或clear自动销毁?-CSDN博客 【QT表格-2】QTableWidget单元格结束编辑操作endEditting_qtablewidget 单元格编辑事件-CSDN博客 【QT表格-3】Q…

【Chrome】ERR_SSL_PROTOCOL_ERROR问题

文章目录 前言一、下载二、使用步骤总结 前言 Edge升级最新版后,有的https访问不了,报如下错误 发现新版Chrome以及Chromium内核访问nginx ssl时报错,顺着这个思路接着查看到大佬的结论:服务器nginx使用的openssl版本过低&#…

C++入门【12-C++ 数组】

C 数组 C 支持数组数据结构,它可以存储一个固定大小的相同类型元素的顺序集合。数组是用来存储一系列数据,但它往往被认为是一系列相同类型的变量。 数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99&#xff0…

控制理论simulink+matlab

控制理论下的simulink和matlab使用 根轨迹LQR控制器简单使用状态观测器设计 根轨迹 z [-1]; %开环传递函数的零点 p [0 -2 -3 -4]; %开环传递函数的系统极点 k 1; %开环传递函数的系数,反映在比例上 g zpk(z,p,k); %生成开环传递函数%生成的传递函数如…

社交网络分析(汇总)

这里写自定义目录标题 写在最前面社交网络分析系列文章汇总目录 提纲问题一、社交网络相关定义和概念提纲问题1. 社交网络、社交网络分析;2. 六度分隔理论、贝肯数、顿巴数;3. 网络中的数学方法:马尔科夫过程和马尔科夫链、平均场理论、自组织…

使用JDBC对数据库进行简单操作

用Connection获得了数据库连接对象后,可以用Statement类型进行数据库操作。 在Statement对象中,有三种,分别是Statement,PrepareStatement,CallableStatement。 这三个的区别在于: Statement 用于执行不…

KubePi JWT 默认密钥权限绕过漏洞复现(CVE-2023-22463)

0x01 产品简介 KubePi 是一款简单易用的开源 Kubernetes 可视化管理面板。 0x02 漏洞概述 KubePi 存在权限绕过漏洞,攻击者可通过默认 JWT 密钥获取管理员权限控制整个平台,使用管理员权限操作核心的功能。 0x03 影响范围 KubePi <= 1.6.2 0x04 复现环境 FOFA: ti…

【Jenkins】远程API接口:Java 包装接口使用示例

jenkins-rest 库是一个面向对象的 Java 项目&#xff0c;它通过编程方式提供对 Jenkins REST API 的访问&#xff0c;以访问 Jenkins 提供的一些远程 API。它使用 jclouds 工具包构建&#xff0c;可以轻松扩展以支持更多 REST 端点。其功能集不断发展&#xff0c;用户可以通过拉…

怎么压缩过大的GIF图片?几个步骤轻松搞定!

GIF图片由于其图片格式&#xff0c;本身就会很大&#xff0c;但是微信QQ还有一些其他的社交平台对上传的表情包是有限制的&#xff0c;这个时候就需要借助一些图片处理工具对GIF进行压缩。 下面就向大家介绍三种好用的方法并展示具体的操作步骤。 一、使用嗨格式压缩大师进行压…

RouterSrv-路由功能

2023年全国网络系统管理赛项真题 模块B-Windows解析 题目 安装Remote Access服务开启路由转发,为当前实验环境提供路由功能。启用网络地址转换功能,实现内部客户端访问互联网资源。答题步骤 安装Remote Access服务开启路由转发,为当前实验环境提供路由功能。 配置网卡 加…

Day67力扣打卡

打卡记录 美丽塔 II&#xff08;前缀和 单调栈&#xff09; 链接 class Solution:def maximumSumOfHeights(self, maxHeights: List[int]) -> int:n len(maxHeights)stack collections.deque()pre, suf [0] * n, [0] * nfor i in range(n):while stack and maxHeights…

eNSP拓扑建立:RIP静态路由

实验名称&#xff1a; RIP动态路由协议 实验目的&#xff1a; 1、掌握RIPv1的配置方法 2、查看RIP路由分析过程 3、掌握测试RIP网络连通性的方法 步骤一:建立拓扑图 步骤二&#xff1a;配置PC终端的ip、子网掩码、网关。 步骤三&#xff1a;配置路由器&#xff0c;如图所示 步…

【K8s】3# 使用kuboard管理K8s集群(NFS存储安装)

文章目录 1.NFS是什么2.配置NFS服务器2.1.执行以下命令安装 nfs 服务器所需的软件包2.2.执行命令 vim /etc/exports&#xff0c;创建 exports 文件&#xff0c;文件内容如下2.3.执行以下命令&#xff0c;启动 nfs 服务2.4.检查配置是否生效 3.在客户端测试NFS3.1.执行以下命令安…

easyexcle处理复杂动态单元格合并问题,合并动态行列

GetMapping("getAddDelSummaryExport") ApiOperation("新增删除比例报表--导出") ApiImplicitParams({ApiImplicitParam(name "season", value "季节", paramType "query", dataType "String"),ApiImplicitPa…

Electron Vite打包后,部分图标未显示的解决方案

背景 这个问题&#xff0c;弄了一晚上&#xff0c;头都大了&#xff0c;找了一堆博客也没解决。主要参考这个&#xff1a;https://blog.csdn.net/m0_73845616/article/details/129741099。 下面讲一下我的解决方案。 解决方案 上面链接里的方法&#xff0c;我采用第二、三个都…

C# Onnx Yolov8 Detect 物体检测 多张图片同时推理

目录 效果 模型信息 项目 代码 下载 C# Onnx Yolov8 Detect 物体检测 多张图片同时推理 效果 模型信息 Model Properties ------------------------- date&#xff1a;2023-12-18T11:47:29.332397 description&#xff1a;Ultralytics YOLOv8n-detect model trained on …

Istio 社区周报(第一期):2023.12.11 - 12.17

欢迎来到 Istio 社区周报 Istio 社区朋友们&#xff0c;你们好&#xff01; 我很高兴呈现第一期 Istio 社区周报。作为 Istio 社区的一员&#xff0c;每周我将为您带来 Istio 的最新发展、有见地的社区讨论、专业提示和重要安全新闻内容。 祝你阅读愉快&#xff0c;并在下一期中…

第二十二章 : Spring Boot 集成定时任务(一)

第二十二章 &#xff1a; Spring Boot 集成定时任务&#xff08;一&#xff09; 前言 本章知识点&#xff1a; 介绍使用Spring Boot内置的Scheduled注解来实现定时任务-单线程和多线程&#xff1b;以及介绍Quartz定时任务调度框架&#xff1a;简单定时调度器&#xff08;Simp…