21.发布确认模式-高级

问题

生产环境中由于一些不明原因,导致rabbitmq重启,在重启的期间生产者消息投递失败,导致消息丢失,需要手动处理恢复。那么如何才能进行rabbitmq的消息可靠性投递?特别是在极端的情况,rabbitmq集群不可用的时候,无法投递的消息该如何处理?

例如这样的异常信息:

方案

生产者将发送的消息发给rabbitmq的同时,将消息备份到缓存中。如果rabbitmq宕机了。会有一个定时任务会对未成功发送的消息进行重新投递。如果交换机成功收到消息会从缓存中清除已收到的消息。

分析

造成消息丢失会有两种情况,一种是交换机故障,另一个中是队列故障。

交换机确认消息是否收到的解决办法

 启用发布确认的配置

spring:rabbitmq:host: 192.168.171.128username: adminpassword: 123port: 5672publisher-confirm-type: correlated

 默认是none值,是不开启的,禁用发布确认模式。

correlated,发布消息到交换机后会触发回调方法。

simple, 单个确认消息。

代码

配置类

package com.xkj.org.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {//交换机public static final String EXCHANGE_NAME = "confirm.exchange";//队列public static final String QUEUE_NAME = "confirm.queue";//Routing Keypublic static final String ROUTING_KEY = "key1";@Beanpublic DirectExchange confirmExchange() {return new DirectExchange(EXCHANGE_NAME);}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}@Beanpublic Binding bindingQueueToExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,@Qualifier("confirmQueue") Queue confirmQueue) {return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);}}

回调接口

package com.xkj.org.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** 回调接口*/
@Slf4j
@Component // 1.第一步实例化MyCallback这个bean
public class MyCallback implements RabbitTemplate.ConfirmCallback {@Autowired // 2.第二步将rabbitTemplate实例依赖注入进来private RabbitTemplate rabbitTemplate;@PostConstructpublic void init() { //3.第三步执行此方法//将本类对象(ConfirmCallback的实现类对象)注入到RabbitTemplate中rabbitTemplate.setConfirmCallback(this);}/*** 交换机确认回调方法* @param correlationData* @param ack* @param cause* 1.发消息 交换机收到 调用*  1.1 correlationData 回调消息的id及相关信息*  1.2 交换机收到消息 ack = true*  1.3 cause null* 2.发消息 交换机接收失败 回调*  2.1 correlationData 回调消息的id及相关信息*  2.2 交换机收到消息 ack = false*  2.3 cause 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId(): "";if(ack) {log.info("交换机已经接收到id为:{}的消息", id);}else {log.info("交换机还未收到id为:{}的消息,原因:{}", id, cause);}}
}

消费者

package com.xkj.org.listener;import com.rabbitmq.client.Channel;
import com.xkj.org.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class ConfirmQueueConsumer {@RabbitListener(queues = ConfirmConfig.QUEUE_NAME)public void receiveMsg(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), "UTF-8");log.info("收到队列的消息:{}",  msg);}
}

生产者

@ApiOperation("测试发布确认发消息")@GetMapping("/sendMessage/{msg}")public void sendMessage(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message) {CorrelationData correlation = new CorrelationData("1");rabbitTemplate.convertAndSend("confirm.exchange", "key1", message, correlation);log.info("发送消息内容{}", message);}

结果

小技巧:如果要是测试交换机接收失败的回调,可以通过修改生产者发消息的交换机的名字为一个不存在的名字即可。

@ApiOperation("测试发布确认发消息")@GetMapping("/sendMessage/{msg}")public void sendMessage(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message) {CorrelationData correlation = new CorrelationData("1");rabbitTemplate.convertAndSend("confirm.exchange"+"123", "key1", message, correlation);log.info("发送消息内容{}", message);}

 问题:上面只能保证交换机收到消息的确认回调,不能保证队列收到消息的确认回调?

队列确认消息是否收到的解决办法

比如routingKey错了,或者队列出了问题,队列也将无法收到消息。

在仅开启生产者确认机制情况下,接换机接收到消息后,会直接给消息生产者发送确认消息。如果发现该消息不可路由,那么消息会直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

解决办法

通过设置mandatory参数可以在当消息传递过程中不可达目的时将消息返回给生产者。

添加配置

spring:rabbitmq:host: 192.168.171.128username: adminpassword: 123port: 5672publisher-confirm-type: correlatedpublisher-returns: true

publiser-returns发布退回消息。

说明:这里为了测试故意把routingkey写错

代码

生产者

估计把routingKey改成错误的 key1123

@ApiOperation("测试发布确认发消息")@GetMapping("/sendMessage/{msg}")public void sendMessage(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message) {CorrelationData correlation = new CorrelationData("1");rabbitTemplate.convertAndSend("confirm.exchange", "key1"+"123", message, correlation);log.info("发送消息内容{}", message);}

消费者

package com.xkj.org.listener;import com.rabbitmq.client.Channel;
import com.xkj.org.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class ConfirmQueueConsumer {@RabbitListener(queues = ConfirmConfig.QUEUE_NAME)public void receiveMsg(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), "UTF-8");log.info("收到队列的消息:{}",  msg);}
}

配置

package com.xkj.org.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {//交换机public static final String EXCHANGE_NAME = "confirm.exchange";//队列public static final String QUEUE_NAME = "confirm.queue";//Routing Keypublic static final String ROUTING_KEY = "key1";@Beanpublic DirectExchange confirmExchange() {return new DirectExchange(EXCHANGE_NAME);}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}@Beanpublic Binding bindingQueueToExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,@Qualifier("confirmQueue") Queue confirmQueue) {return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);}}

回调接口

package com.xkj.org.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;/*** 回调接口*/
@Slf4j
@Component // 1.第一步实例化MyCallback这个bean
public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowired // 2.第二步将rabbitTemplate实例依赖注入进来private RabbitTemplate rabbitTemplate;@PostConstructpublic void init() { //3.第三步执行此方法//将本类对象(ConfirmCallback的实现类对象)注入到RabbitTemplate中rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机确认回调方法* @param correlationData* @param ack* @param cause* 1.发消息 交换机收到 调用*  1.1 correlationData 回调消息的id及相关信息*  1.2 交换机收到消息 ack = true*  1.3 cause null* 2.发消息 交换机接收失败 回调*  2.1 correlationData 回调消息的id及相关信息*  2.2 交换机收到消息 ack = false*  2.3 cause 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId(): "";if(ack) {log.info("交换机已经接收到id为:{}的消息", id);}else {log.info("交换机还未收到id为:{}的消息,原因:{}", id, cause);}}/*** 可以在当消息传递过程中不可达目的时将消息返回给生产者* 注意此方法是消息传递失败才会调用,成功就不会执行* @param message* @param replyCode* @param replyText* @param exchange* @param routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {try {log.error("消息:{},被交换机:{},给退回了,原因:{},RoutingKey={}",new String(message.getBody(), "UTF-8"),exchange,replyText,routingKey);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}

测试结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/51241.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python 教程(六):函数式编程

目录 专栏列表前言函数定义参数返回值 示例函数类型普通函数空函数匿名函数(Lambda 函数)嵌套函数函数装饰器高阶函数 函数参数位置参数默认参数可变位置参数可变关键字参数 函数属性和方法__name____doc__func.__dict__func.__defaults__func.__annotat…

黑马头条Day11- 实时计算热点文章、KafkaStream

一、今日内容 1. 定时计算与实时计算 2. 今日内容 KafkaStream 什么是流式计算KafkaStream概述KafkaStream入门案例SpringBoot集成KafkaStream 实时计算 用户行为发送消息KafkaStream聚合处理消息更新文章行为数量替换热点文章数据 二、实时流式计算 1. 概念 一般流式计…

4、Python+MySQL+Flask的文件管理系统【附源码,运行简单】

4、PythonMySQLFlask的文件管理系统【附源码,运行简单】 总览 1、《文件管理系统》1.1 方案设计说明书设计目标工具列表 2、详细设计2.1 登录2.2 注册2.3 个人中心界面2.4 文件上传界面2.5 其他功能贴图 3、下载 总览 自己做的项目,禁止转载&#xff0c…

UART 通信协议

文章目录 一 简介二 电平标准三 引脚定义四 数据格式五 波特率 一 简介 ​ UART (Universal Asynchronous Receiver/Transmitter),通用异步收发器,是一种串行、异步、全双工通信协议。 串行:利用一条传输线,将数据一位一位地传送…

【七】Hadoop3.3.4基于ubuntu24的分布式集群安装

文章目录 1. 下载和准备工作1.1 安装包下载1.2 前提条件 2. 安装过程STEP 1: 解压并配置Hadoop选择环境变量添加位置的原则检查环境变量是否生效 STEP 2: 配置Hadoop2.1. 修改core-site.xml2.2. 修改hdfs-site.xml2.3. 修改mapred-site.xml2.4. 修改yarn-site.xml2.5. 修改hado…

引用的项目“xxxx/tsconfig.node.json”可能不会禁用发出。

vue3 报错: 引用的项目“xxxx/tsconfig.node.json”可能不会禁用发出。 解决: 进入对应的 json 文件: 修改: "noEmit": false 当 noEmit 设置为 false 时,TypeScript 编译器将根据项目配置生成相应的输出文…

基于Java的微博传播分析系统的设计与实现

1 项目介绍 1.1 摘要 本文致力于展示一项创新的微博传播分析系统设计与应用研究,该系统基于Java技术,巧妙利用大数据环境下的社交媒体——微博的庞大用户群及高度活跃特性,旨在深度探索信息传播的内在逻辑与社会影响机制。研究开篇明确定了…

OpenCV 灰度直方图

一 直方图的定义,意义和特征 1 定义 在统计学中,直方图是一种对数据分布情况的图形表示,是一种二维统计图表,他的两个坐标分别是统计样本(图像、视频帧)和样本的某种属性(亮度,像素…

bugku-web-cookies

进来以后看到一个巨长的字符串, 源码同样,发现url后面是base64编码解码得keys.txt 还有一个line参数,修改并没有发生任何变化。我想不到要改keys.txt成index.php(base64加密格式:aW5kZXgucGhw) line1时: line2时&…

AcWing 802. 区间和

var说明add存储了插入操作,在指定 x x x下标所在位置 a [ x ] c a[x]c a[x]cquery是求 [ L , R ] [L,R] [L,R]区间和用到的数组,最后才用到alls 是存储离散化之后的值 , 对于会访问到的每个下标,统统丢到 a l l s 里面 ,会把 x 和 [ L , R …

【Golang 面试 - 基础题】每日 5 题(七)

✍个人博客:Pandaconda-CSDN博客 📣专栏地址:http://t.csdnimg.cn/UWz06 📚专栏简介:在这个专栏中,我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话,欢迎点赞👍收藏…

数据结构----算法复杂度

1.数据结构前言 数据是杂乱无章的,我们要借助结构将数据管理起来 1.1 数据结构 数据结构(Data Structure)是计算机存储、组织数据的⽅式,指相互之间存在⼀种或多种特定关系的数 据元素的集合。没有⼀种单⼀的数据结构对所有⽤途都有⽤,所…

查看路由表 netstat -r

“Kernel IP routing table” 是Linux系统中用于展示和配置IP路由的表。它告诉操作系统如何将数据包从一个网络接口发送到另一个网络或主机。下面是对您给出的路由表条目的解释: Destination:目的地地址,可以是具体的IP地址,也可…

ctfshow解题方法

171 172 爆库名->爆表名->爆字段名->爆字段值 -1 union select 1,database() ,3 -- //返回数据库名 -1 union select 1,2,group_concat(table_name) from information_schema.tables where table_schema库名 -- //获取数据库里的表名 -1 union select 1,group_concat(…

Python爬虫入门02:Fiddler下载使用教程

文章目录 手机抓包全攻略:Fiddler 工具深度解析引言Fiddler 工具简介为什么选择 Fiddler? 安装与配置 Fiddler步骤一:下载与安装步骤二:配置浏览器代理步骤三:安装 HTTPS 证书 配置手机以使用 Fiddler步骤一&#xff1…

操作系统面试知识点总结5

#来自ウルトラマンメビウス(梦比优斯) 1 IO管理概述 1.1 I/O 设备 I/O 设备的类型分类。 1.1.1 按使用特性 人机交互类外部设备,例如打印机、显示器等。存储设备,例如磁盘、光盘等。网络通信设备,例如网络接口等。 1…

【计算机网络】IP地址和子网掩码(IP地址篇)

个人主页:【😊个人主页】 系列专栏:【❤️计算机网络】 文章目录 前言IP地址网络地址网络地址的定义与组成作用分类网络地址的分配与管理 广播地址(Broadcast Address)定义构成类型作用注意事项 广播地址功能 组播地址…

HiveSQL题——炸裂+开窗

一、每个学科的成绩第一名是谁? 0 问题描述 基于学生成绩表输出每个科目的第一名是谁呢? 1 数据准备 with t1 as(selectzs as name,[{"Chinese":80},{"Math":70}],{"English"…

CompletableFuture使用详解

简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务,往往并不是独立的,很多 时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。如果要我们手动用 Future 实现,是非常麻烦的。 CompletableFutur…

JMeter基本使用

一、JMeter线程组相关 进程:正在运行的程序。线程:是进程中的执行线索。线程组:进程中有许多线程,为了方便管理,可以对线程按照性质分组,分组的结果就是线程组。PS:三者关系,一个进…