RabbitMq-发布确认高级(避坑指南版)

在初学rabbitMq的时候,伙伴们肯定已经接触到了“发布确认”的概念,但是到了后期学习中,会接触到“springboot”中使用“发布确认”高级的概念。后者主要是解决什么问题呢?或者是什么样的场景引出这样的概念呢?

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 rabbitmq 重启期间生产者投递失败,导致消息丢失,需要手动处理和恢复。因此为了确保rabbitmq 的消息可靠投递,特别是在这样比较极端的情况,rabbitmq 集群不可用的时候,对无法投递的消息进行处理。

废话不说直接开始撸代码!!!在代码中解决实际问题~

一、代码架构分析:

        接触到这里,对于一条完整的“rabbitmq消息”发布链的构成大家已经不陌生了。主要是由:“消息生产者”、“交换机”、“队列”、“消费者”四个方面构成,如图所示:

二、构造“配置类”代码: 

声明交换机“confirm_exchange”、声明队列“confirm_queue”、通过routing-key对交换机和队列进行绑定。

package com.example.rabbitmq_demo.fabuquerengaoji;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @ClaseName: ConfirmConfig$* @Description:配置类 发布确认(高级)* @Author: wuhs* @Date: 2023/8/16$ 14:32$* 快捷键ctrl+shift+u  字母大小写转化*/
@Configuration
public class ConfirmConfig {// 交换机public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";// 队列public static final String CONFIRM_QUEUE_NAME = "confirm_queue";// ROUTING-KEYpublic static final String CONFIRM_ROUTING_KEY = "key1";@Beanpublic DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}@Beanpublic Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange directExchange,@Qualifier("confirmQueue") Queue queue) {//一般使用在项目中使用@Qualifier来限定注入的Bean。return BindingBuilder.bind(queue).to(directExchange).with(CONFIRM_ROUTING_KEY);}
}

三、构建消费者代码:

通过@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)来监听队列,以此“充当”消费者。这一块也没啥好说的,直接上代码!

package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @ClaseName: Consumer$* @Description:消费者* @Author: wuhs* @Date: 2023/8/16$ 15:18$】*/
@Slf4j
@Component
public class Consumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void reciverConfirmMessage(Message message) {String msg = new String(message.getBody());log.info("接收到的队列confirm.queue消息:{}", msg);}
}

四、创建“回调”方法

        在最开始我们说到“确保rabbitmq 的消息可靠投递”的概念,那么具体如何确保呢?如果我们在消费者每次消费成功、未消费成功交换机都能进行“回调”确认,是不是就能知道哪些消息消费成功、哪些没有消费成功呢?

        在RabbitTemplate中有一个方法接口(ConfirmCallback),我们只需要实现这个接口并实现“confirm”方法,并将它注入进RabbitTemplate工具中即可创建“回调”。具体代码如下:

package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** @ClaseName: MyCallBack$* @Description:* @Author: wuhs* @Date: 2023/8/16$ 16:17$*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 注入rabbitTemplate.setConfirmCallback(this);}//交换机确认回调方法  @Overridepublic void confirm(CorrelationData correlationData, boolean ack, String reason) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {//发消息 交换机接收到了消息 回调log.info("交换机已经收到了ID为:{}的消息", id);} else {//发消息 交换机没有接收到了消息 回调log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);}}
}

confirm方法参数介绍:

* 1. correlationData 保存回调消息的ID及相关信息
* 2. 交换机是否收到了消息 ack=true(收到)、ack=false(未收到)
* 3. reason 失败的原因

 六、配置类声明:(application.yml)

        在这里需要注意!!这也是最容易踩得坑,不知道有没有小伙伴遇没遇到,“publisher-confirm-type: correlated”也声明了,但是项目创建启动发布消息之后“没有成功回调”的情况,查看了很多的文章,很多博主只配置了publisher-confirm-type、但是并没有开启“confirm 确认机制”,所以会存在“误导”,导致一直找不到失败的原因~具体正确配置,看代码:

server:port: 8899spring:rabbitmq:host: 124.221.94.214port: 5672username: xgsmpassword: xgsm123# 发送者开启 confirm 确认机制publisher-confirms: truepublisher-confirm-type: correlated

 publisher-confirm-type参数介绍:

publisher-confirm-type这个参数一共有三种配置方法:

# NONE:禁用发布确认,是默认值。

# CORRELATED:发布消息后,交换机会触发回调方法。

# SIMPLE:有两种效果:

1:和CORRELATED一样会触发回调方法

2:发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,

# 要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

 七、创建Controller层(消息生产者)

        这里演示三种情况。第一种为正常情况下,发送成功后的回调;第二种消息为发送失败、当交换机不存在则发送失败(模拟发送失败),所以将交换机名称修改即可

package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @ClaseName: ProducerController$* @Description:消息生产者* @Author: wuhs* @Date: 2023/8/16$ 14:58$*/
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;// @PathVariable主要作用:映射URL绑定的占位符@RequestMapping("/sendMessage/{message}")public void sendMessage(@PathVariable String message) {//正常发送CorrelationData correlationData = new CorrelationData("1");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);//发送失败-交换机不存在的情况CorrelationData correlationData2 = new CorrelationData("2");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"2", ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData2);log.info("发送的消息为:{}", message);}
}

测试结果: 

 如果是routing-key错误,这种情况会触发回调嘛?让我们验证一下;修改routing-key为“错误值”

  CorrelationData correlationData3 = new CorrelationData("3");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"key3", message,correlationData3);

测试结果:

         通过结果可以看出,消息发送成功了,而且也触发了“成功的回调”。但是我们知道的是,由于路由失败,这里消费者并没有对消息进行消费,这是为什么呢?那是因为,在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。解决方式为:通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。具体操作如下

1、application.yml文件中添加消息回退配置

# 发送者开启 return 确认机制publisher-returns: true

 2、实现RabbitTemplate中的方法接口ReturnCallback,并实现“returnedMessage”方法,最后将类注入到RabbitTemplate的RabbitTemplate中,详细代码如下:

package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** @ClaseName: MyCallBack$* @Description:* @Author: wuhs* @Date: 2023/8/16$ 16:17$*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 注入rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机确认回调方法* 1、发消息 交换机接收到了消息 回调* 1.1 correlationData 保存回调消息的ID及相关信息* 1.2 交换机收到消息 ack=true* 2、发消息 交换机接收失败了 回调* 2.1 correlationData 保存回调消息的ID及相关信息* 2.2 交换机接收到消息 ack=false* 2.3 reason 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String reason) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {log.info("交换机已经收到了ID为:{}的消息", id);} else {log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);}}//可以在当消息传递的过长中不可达目的地时将消息返回给生产者// 只有不可待目的地的时候 才进行回退@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息{},被交换机{}退回,退回原因:{},路由key:{}", message, exchange, replyText, routingKey);}
}

测试结果:

2023-08-17 10:36:32.476  INFO 21108 --- [221.94.214:5672] c.e.r.fabuquerengaoji.MyCallBack : 消息(Body:'消息确认发布测试' MessageProperties [headers={spring_returned_message_correlation=3}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),被交换机confirm_exchange退回,退回原因:NO_ROUTE,路由key:key1key3

问题解决!~ 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/43211.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

day45 ● 70. 爬楼梯 (进阶)● 322. 零钱兑换 ● 279.完全平方数

70. 爬楼梯 class Solution {public int climbStairs(int n) {if(n <2) return n;int[] dp new int [n];dp[0] 1;dp[1] 2;for(int i 2; i< n;i){dp[i] dp[i-1] dp[i-2];}return dp[n-1];} } 322. 零钱兑换 class Solution {public int coinChange(int[] coins, in…

为什么kafka 需要 subscribe 的 group.id?我们是否需要使用 commitSync 手动提交偏移量?

目录 一、为什么需要带有 subscribe 的 group.id二、我们需要使用commitSync手动提交偏移量吗&#xff1f;三、如果我想手动提交偏移量&#xff0c;该怎么做&#xff1f; 一、为什么需要带有 subscribe 的 group.id 消费概念&#xff1a; Kafka 使用消费者组的概念来实现主题的…

vscode | linux | c++ intelliense 被弃用解决方案

每日一句&#xff0c;vscode用的爽是爽&#xff0c;主要是可配置太强了。如果也很会研究&#xff0c;可以直接去咸鱼接单了 废话少说&#xff0c;直接整。 用着用着说是c intelliense被弃用&#xff0c;很多辅助功能无法使用&#xff0c;像查看定义、查看引用、函数跳转、智能提…

基于Rust的QuickLZ压缩算法的详细实现与分析

1. 引言 QuickLZ是一种被广泛应用的高效压缩算法。在许多应用中&#xff0c;快速的数据压缩和解压缩是非常关键的&#xff0c;特别是在网络传输和存储空间有限的场景中。为了满足现代软件开发的需求&#xff0c;我们将使用Rust语言来实现这一算法。Rust是一种专为系统级编程而…

Nodejs沙箱逃逸--总结

一、沙箱逃逸概念 JavaScript和Nodejs之间有什么区别&#xff1a;JavaScript用在浏览器前端&#xff0c;后来将Chrome中的v8引擎单独拿出来为JavaScript单独开发了一个运行环境&#xff0c;因此JavaScript也可以作为一门后端语言&#xff0c;写在后端&#xff08;服务端&#…

七夕特辑——3D爱心(可监听鼠标移动)

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 「推荐专栏」&#xff1a; ★java一站式服务 ★ ★ React从入门到精通★ ★前端炫酷代码分享 ★ ★ 从0到英雄&#xff0c;vue成神之路★ ★ uniapp-从构建到提升★ ★ 从0到英雄&#xff…

【005】ts学习笔记【函数扩展】

函数扩展 参数类型 //注意&#xff0c;参数不能多传&#xff0c;也不能少传 必须按照约定的类型来 const fn (name: string , age : number ) : string > {return name age }let desc fn( "张三", 18) console.log(desc)可选参数与默认值 //可选的参数 和 默…

深入理解Flink Mailbox线程模型

文章目录 整体设计processMail1.Checkpoint Tigger2.ProcessingTime Timer Trigger processInput兼容SourceStreamTask 整体设计 Mailbox线程模型通过引入阻塞队列配合一个Mailbox线程的方式&#xff0c;可以轻松修改StreamTask内部状态的修改。Checkpoint、ProcessingTime Ti…

@Repeatable的作用以及具体如何使用

文章目录 1. 前言2. 先说结论3. 案例演示 1. 前言 最近无意看到某些注解上有Repeatable&#xff0c;出于比较好奇&#xff0c;因此稍微研究并写下此文章。 2. 先说结论 Repeatable的作用&#xff1a;使被他注释的注解可以在同一个地方重复使用。 具体使用如下&#xff1a; T…

CentOS7源码安装MySQL详细教程

&#x1f60a; 作者&#xff1a; Eric &#x1f496; 主页&#xff1a; https://blog.csdn.net/weixin_47316183?typeblog &#x1f389; 主题&#xff1a;CentOS7源码安装MySQL详细教程 ⏱️ 创作时间&#xff1a; 2023年08月014日 文章目录 1、安装的四种方式2、源码安装…

深度解析:DDoS攻击与先进防御策略

目录 DDoS 介绍 DDoS 攻击理论 DDoS 介绍 DDoS&#xff08;分布式拒绝服务&#xff09;攻击是一种恶意网络活动&#xff0c;旨在通过同时向目标系统发送大量请求或流量&#xff0c;使其无法正常运行或提供服务。攻击者通常利用网络上的多个计算机和设备&#xff0c;形成一个&…

极智嘉x吉利汽车 x京东物流,引领汽车行业智慧物流新变革!

近日&#xff0c;中国领先的汽车制造商吉利汽车携手中国领先的技术驱动的供应链解决方案及物流服务商京东物流、全球仓储机器人引领者极智嘉(Geek)&#xff0c;在西安吉利汽车制造基地RDC仓库率先落地SkyPick上存下拣解决方案&#xff0c;实现了全物流链精益化、智能化、一体化…

Spring-4-掌握Spring事务传播机制

今日目标 能够掌握Spring事务配置 Spring事务管理 1 Spring事务简介【重点】 1.1 Spring事务作用 事务作用&#xff1a;在数据层保障一系列的数据库操作同成功同失败 Spring事务作用&#xff1a;在数据层或业务层保障一系列的数据库操作同成功同失败 1.2 案例分析Spring…

STM32--TIM定时器(2)

文章目录 输出比较PWM输出比较通道参数计算舵机简介直流电机简介TB6612 PWM基本结构PWM驱动呼吸灯PWM驱动舵机PWM控制电机 输出比较 输出比较&#xff0c;简称OC&#xff08;Output Compare&#xff09;。 输出比较的原理是&#xff0c;当定时器计数值与比较值相等或者满足某种…

【数据结构OJ题】有效的括号

原题链接&#xff1a;https://leetcode.cn/problems/valid-parentheses/ 目录 1. 题目描述 2. 思路分析 3. 代码实现 1. 题目描述 2. 思路分析 这道题目主要考查了栈的特性&#xff1a; 题目的意思主要是要做到3点匹配&#xff1a;类型、顺序、数量。 题目给的例子是比较…

【Hibench 】完成 HDP-Spark 性能测试

&#x1f341; 博主 "开着拖拉机回家"带您 Go to New World.✨&#x1f341; &#x1f984; 个人主页——&#x1f390;开着拖拉机回家_Linux,Java基础学习,大数据运维-CSDN博客 &#x1f390;✨&#x1f341; &#x1fa81;&#x1f341; 希望本文能够给您带来一定的…

单片机实训报告

这周我们进行了单片机实训&#xff0c;一周中我们通过七个项目1&#xff1a;P1 口输入/输出 2&#xff1a;继电器控制 3 音频控制 4&#xff1a;子程序设计 5&#xff1a;字符碰头程序设计 6&#xff1a;外部中断 7&#xff1a; 急救车与交通信号灯&#xff0c;练习编写了子程…

mysql 设置 mysql 日志时间与系统时间保持一致

临时设置 mysql> show variables like %log_timestamps%;-----------------------| Variable_name | Value |-----------------------| log_timestamps | UTC |-----------------------1 row in set (0.00 sec)系统是 CST &#xff0c; nysql 是 UTC当UTC时间为0点时&am…

docker的使用方法总结

Docker是一个非常强大的工具&#xff0c;它可以用于创建、部署和运行应用程序。以下是一些docker相关的常用指令&#xff0c; 1、查看docker版本 docker version 2、查看正在运行的Docker容器 docker ps 3、查看所有的docker容器&#xff08;包括没有运行的容器&#xff0…

Python 之 Http 获取网页的 html 数据,并去掉 html 格式等相关信息

Python之 Http 获取网页的 html 数据,并去掉 html 格式等相关信息 目录 Python之 Http 获取网页的 html 数据,并去掉 html 格式等相关信息