RabbitMq 消息确认和退回机制

一、Rabbit中消息确认和退回机制

1、发布确认

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者 (包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,(单个)如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号。

1.1 开启发布确认

//开启发布确认
channel.confirmSelect();

1.2 发布确认的两种方式

      同步确认

      就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

public static void publishMessageIndividually() throws Exception {Channel channel = RabbitMqUtils.getChannel();//队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, true, false, false, null);//开启发布确认channel.confirmSelect();long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", queueName, null, message.getBytes());//服务端返回 false 或超时时间内未返回,生产者可以消息重发boolean flag = channel.waitForConfirms();if (flag) {System.out.println("消息发送成功");}}long end = System.currentTimeMillis();System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");}
  异步确认 

    他是利用回调函数来达到消息可靠性传递的

//    异步确认发布private static void publishMessageAsync() throws Exception {Channel channel = RabbitMqUtils.getChannel();//队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, true, false, false, null);//开启发布确认channel.confirmSelect();/**线程安全有序的一个哈希表,适用于高并发的情况下1.轻松的将序号与消息进行关联2.轻松批量删除条目,只要给到序号3.支持高并发(多线程)**///消息确认成功,回调函数ConcurrentSkipListMap<Long,String> outstandingConfirms =new ConcurrentSkipListMap<>();ConfirmCallback ackCallback = (deliveryTag , multiple)->{ //确认了多少条,multiple:批量或单个if (multiple){  //批量的//2.轻松批量删除条目,只要给到序号ConcurrentNavigableMap<Long,String> confirmed =outstandingConfirms.headMap(deliveryTag); //消息的序号confirmed.clear();}else {  //单个的outstandingConfirms.remove(deliveryTag);}System.out.println("确认的消息:"+deliveryTag);};//消息确认失败,回调函数/*** 1.消息的标记* 2.是否为批量确认*/ConfirmCallback nackCallback = (deliveryTag , multiple)->{//3、打印一下未确认的消息都有哪些String message = outstandingConfirms.get(deliveryTag);System.out.println("未确认的消息是:"+message+":::未确认的消息:"+deliveryTag);};//准备消息的监听器 ,监听哪些消息成功了, 哪些失败了channel.addConfirmListener( ackCallback, nackCallback);//开始时间long begin = System.currentTimeMillis();//发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", queueName, null, message.getBytes());//1.轻松的将序号与消息进行关联,将每一条消息都存放hash表里面outstandingConfirms.put(channel.getNextPublishSeqNo(),message);}//结束时间long end = System.currentTimeMillis();System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");}

2、 退回机制

退回模式(return)说的是当消息到达交换机后,但是没有找到匹配的队列时,将消息回退给生产者。

默认情况下,如果消息没有匹配到队列会直接丢弃,采用退回模式可以在生产者端监听改消息是否被成功投递到队列中

channel.addReturnListener(new ReturnCallback() {@Overridepublic void handle(Return returnMessage) {System.out.println("消息被回退,原因:"+returnMessage.getReplyText());System.out.println(returnMessage.getExchange()); // 交换机System.out.println(returnMessage.getReplyCode()); // 返回原因的代码System.out.println(returnMessage.getReplyText()); // 返回信息,例如NO_ROUTESystem.out.println(returnMessage.getRoutingKey()); // 路由KEY}});

二、Spring boot 中Rabbit 实现消息确认和退回机制

1、 开启配置

spring:#配置rabbitMq 服务器rabbitmq:host: 127.0.0.1port: 5672username: rootpassword: root#虚拟host 可以不设置,使用server默认hostvirtual-host: JCcccHost#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true

2、配置消息确认和退回机制

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author : JCccc* @CreateTime : 2019/9/3* @Description :**/
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);// 配置发布到交换机的确认rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/***correlationData :客户端在发送原始消息时提供的对象。*ack:exchange交换机是否成功收到了消息。true成功,false代表失败。*cause:失败原因。*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);System.out.println("ConfirmCallback:     "+"确认情况:"+ack);System.out.println("ConfirmCallback:     "+"原因:"+cause);}});// 配置交换机没有找到对应的消息队列时,消息退回时的处理rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("ReturnCallback:     "+"消息:"+message);System.out.println("ReturnCallback:     "+"回应码:"+replyCode);System.out.println("ReturnCallback:     "+"回应信息:"+replyText);System.out.println("ReturnCallback:     "+"交换机:"+exchange);System.out.println("ReturnCallback:     "+"路由键:"+routingKey);}});return rabbitTemplate;}}

2.1 其他方式

package com.student.rabbitmq.config;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** 第二种方式*/
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;// 将我们实现的MyCallBack接口注入到RabbitTemplate中@PostConstructpublic void init() {// 设置确认消息交给谁处理rabbitTemplate.setConfirmCallback(this);// 设置回退消息交给谁处理rabbitTemplate.setReturnCallback(this);}/*** 交换机确认回调方法** @param correlationData 保存回调消息的ID以及相关信息* @param ack             表示交换机是否收到消息(true表示收到)* @param cause           表示消息接收失败的原因(收到消息为null)*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {System.out.println("交换机已经收到ID为:{}的消息");} else {System.out.println("交换机还未收到ID为:{}的消息,原因为:{}" + cause);}}/*** 路由出现问题的消息回退方法**/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println(new String(message.getBody())+exchange+replyText+routingKey);}}

三、参考

发布确认—发布确认逻辑和发布确认的策略

消息可靠性之发布确认、退回机制_rabbittemplate 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/39052.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【面试题】网络 IO多路复用模型 select

目录 1.概念 使用select模型的步骤 select模型特点&#xff1a; Windows 和Linux 有什么区别&#xff1f; 为什么要引入select模型呢 同步阻塞问题我们可以利用多线程 或者把socket改成非阻塞 当我们要接受数据的时候我们要来回查看接受缓冲区有没有数据这样我们就要来回切…

英伟达被“压制”的25年

十九世纪中叶的美国西部&#xff0c;掀起了一场轰轰烈烈的淘金热&#xff0c;但最终赚到钱的&#xff0c;并不是拿命去赌的淘金者。一个名叫萨姆布瑞南的商人&#xff0c;通过向淘金者出售铲子&#xff0c;成了加州历史上第一位百万富翁。 每一次风口出现时&#xff0c;总有企…

使用Qt Installer Framework在centos7中打包

文章目录 步骤 1: 安装Qt和Qt Installer Framework安装Qt安装Qt Installer Framework步骤 2: 创建项目目录结构步骤 3: 编写安装脚本配置文件(config/config.xml)Package 信息meta/package.xmldata 目录步骤 4: 编写安装脚本步骤 5: 生成安装程序总结在CentOS 7中使用Qt Inst…

k8s自动清理节点服务

要在 Kubernetes 中实现当某个节点的 CPU 或内存使用超过 90% 时清理该节点上的服务&#xff0c;你可以使用以下几种方法&#xff1a; 自定义脚本和 cron job&#xff1a;编写一个脚本监控节点的资源使用情况&#xff0c;并在超过阈值时触发清理操作。使用 DaemonSet 运行监控…

互联网下的扭蛋机小程序开发:探索其独特优势

随着互联网技术的飞速发展&#xff0c;小程序作为一种新兴的轻量级应用形式&#xff0c;已经在各个领域展现出强大的生命力和广泛的应用前景。在娱乐和零售行业&#xff0c;扭蛋机作为一种经典的随机性消费体验方式&#xff0c;结合小程序进行开发&#xff0c;带来了诸多独特优…

每日一更 EFK日志分析系统

需要docker和docker-compose环境 下面时docker-compose.yaml文件 [rootnode1 docker-EFK]# cat docker-compose.yaml version: 3.3services:elasticsearch:image: "docker.elastic.co/elasticsearch/elasticsearch:7.17.5"container_name: elasticsearchrestart: …

鲲鹏arm服务器部署paddleOCR

1. 部署环境信息查看 1.1 操作系统 $ cat /etc/os-release PRETTY_NAME"UnionTech OS Server 20" NAME"UnionTech OS Server 20" VERSION_ID"20" VERSION"20" ID"uos" PLATFORM_ID"platform:uel20" HOME_URL&q…

「AIGC」大数据开发语言Scala入门

Scala 是一种多范式编程语言,设计初衷是集成面向对象编程和函数式编程的特点。它运行在 Java 虚拟机(JVM)上,因此可以与 Java 库无缝集成。Scala 也因其在大数据处理领域的应用而受到欢迎,特别是与 Apache Spark 这类框架结合使用。 1. 环境搭建 安装 Scala:可以从 Scala…

使用 Python 五年后,我发现学 python 必看这三本书!少走一半弯路

第一本 《Python编程-从入门到实践》 适合零基础的读者 豆瓣评分&#xff1a;9.1 推荐指数&#xff1a;5颗星 推荐理由&#xff1a; 本书是针对所有层次的 Python 读者而作的 Python 入门书。全书分为两部分&#xff1a; 第一部分介绍使用Python 编程所必须了解的…

CV每日论文--2024.6.28

1、On Scaling Up 3D Gaussian Splatting Training 中文标题&#xff1a;扩展 3D 高斯泼溅训练 简介&#xff1a;3D高斯点描(3DGS)由于其卓越的视觉质量和渲染速度,越来越受欢迎用于3D重建。然而,3DGS的训练目前仅在单个GPU上进行,由于内存限制,它的处理高分辨率和大规模3D重建…

SPU和sku的区别

SPU&#xff08;Standard Product Unit&#xff09;和SKU&#xff08;Stock Keeping Unit&#xff09;是两种常见的商品管理概念&#xff0c;它们在商品分类和管理中扮演着不同的角色。 SPU&#xff08;标准产品单元&#xff09;&#xff1a; 定义&#xff1a;SPU代表了一种标…

java中的抽象类abstract

抽象类 ① 格式&#xff1a;abstract class A {} ② 抽象类不能实例化对象 ③ 抽象类可以没有抽象方法&#xff0c;但是抽象方法所在的类&#xff0c;一定是抽象类 ④ 抽象类中包含构造器&#xff0c;因为在子类实例化对象时&#xff0c;会直接或间接调用父类构造器 抽象方法…

2024 年江西省研究生数学建模竞赛题目 B题投标中的竞争策略问题---完整文章分享(仅供学习)

问题&#xff1a; 招投标问题是企业运营过程中必须面对的基本问题之一。现有的招投标平台有国家级的&#xff0c;也有地方性的。在招投标过程中&#xff0c;企业需要全面了解招标公告中的相关信息&#xff0c;在遵守招投标各种规范和制度的基础上&#xff0c;选择有效的竞争策…

新手教学系列——【Python开发】不同系统更换pip源的方法

在使用Python进行开发时,你可能会发现使用pip安装包的速度较慢,尤其是在国内进行操作时。为了提高安装速度,我们可以将pip的默认源更换为国内的一些镜像源。本文将详细介绍如何在不同操作系统上进行这一操作,并给出常用的国内镜像源。 为什么要换源 pip默认使用的是官方的…

vector::C++

在 C 标准库中&#xff0c;std::vector 是一个动态数组容器&#xff0c;提供了类似于数组的功能&#xff0c;但具有自动调整大小的能力。它是 C 标准模板库&#xff08;STL&#xff09;的一部分&#xff0c;广泛用于存储和管理一组动态大小的元素。 std::vector 的特点 动态大…

axios源码打包关于rollup.config.js文件分析

axios 项目下的rollup.config.js文件 const lib require("./package.json"); const outputFileName "axios"; const name "axios"; const namedInput "./index.js"; const defaultInput "./lib/axios.js";export defa…

怎么彻底关闭Nacos?解决启动时Spring连接Nacos报错

Nacos Nacos是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。它是阿里巴巴开源的一个项目,旨在帮助您发现、配置和管理微服务。Nacos提供了一系列丰富的特性集合,支持服务发现和服务健康监测、动态配置服务、服务元数据和流量管理等功能。 主要功能 服务…

Steam社区101错误代码/steam社区报错、打不开怎么办

Steam社区是很多游戏玩家经常逛的一个互动空间&#xff0c;玩家可以在Steam社区了解游戏的相关评价&#xff0c;也可以在Steam社区和五湖四海的游戏玩家一起讨论最近游戏的心得&#xff0c;分享游玩技巧&#xff0c;探讨游戏战术等等&#xff0c;结识不同地区的玩家。不过很多玩…

【数据库原理】总结(期末版)

题型关系范式题[数据库原理]关系范式总结&#xff08;自用&#xff09;-CSDN博客事务分析题[数据库原理]事务-CSDN博客Sql题 MySQL:MySQL基本语法 Oracle:Oracle基本语法 ​​​​​​ 关系代数[数据库原理]关系代数-CSDN博客 sql里面主要是考增删改查授权撤销权限等内容&#…

Java案例实现双色球

一问题&#xff1a; 二具体代码&#xff1a; package 重修;import java.util.Random; import java.util.Scanner;public class first {public static void main(String[] args) {int []usersnumbersusernumslect();System.out.println("用户");for (int i 0; i <…