Rabbitmq入门与应用(六)-rabbitmq的消息确认机制

rabbitmq的消息确认机制

确认消息是否发送给交换机

配置
server:port: 11111
spring:rabbitmq:port: 5672host: 192.168.201.81username: adminpassword: 123publisher-confirm-type: correlated
编码RabbitTemplate.ConfirmCallback

ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。

在配置类中编码确认回调函数。tips: 设置 rabbitTemplate.setMandatory(true);

配置类

rabbitTemplate.setConfirmCallback(ConfirmCallback confirmCallback);

CorrelationData:

1、消息ID需要封装到CorrelationData
2、correlationData.getFuture().addCallback(…)是一个回调函数:决定了每个业务处理confirm成功或失败的逻辑。

@Bean
public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();
}@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);log.debug("Rabbitmq配置启动成功,RabbitTemplate:{}设置完成",rabbitTemplate);rabbitTemplate.setMessageConverter(messageConverter());rabbitTemplate.setConfirmCallback(new RabbitConfirmCallbackImpl());return rabbitTemplate;
}/*** 确保消息是否发送到交换机*/
class RabbitConfirmCallbackImpl implements RabbitTemplate.ConfirmCallback{@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.warn("****Exchange callback-检验是否发送成功********");log.warn("correlationData->相关数据:{}",correlationData);log.warn("ack->Exchange响应:{}",ack);log.warn("cause->错误原因:{}",cause);}
}
测试发送

测试向交换机发送数据,测试交换机是否成功收到。

假设给一个错误的Exchange
@Service
public class MqServiceImpl implements IMqService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic void sendMessage(String msg) {//错误的Exchange名称,实际名称为:ssc_sc_routing_exchangefinal String EXCHANGE = "ssc_sc_routing_exchangex";final String ROUTING_KEY = "ssc_sc_routing_key";rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,msg);}
}

image-20231218164926379

如果Exchange正确
@Override
public void sendMessage(String msg) {final String EXCHANGE = "ssc_sc_routing_exchange";final String ROUTING_KEY = "ssc_sc_routing_key";rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,msg);
}

image-20231218164425398

确认消息是否从交换机发送到队列RabbitTemplate.ReturnsCallback

设置ResturnsCallback

通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调。

配置文件
spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: true     #检查是否绑定到队列中
配置
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitConfirmReturnCallbackImpl());class RabbitConfirmReturnCallbackImpl implements RabbitTemplate.ReturnsCallback{@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.warn("message:{}",returnedMessage.getMessage());log.warn("exchange:{}",returnedMessage.getExchange());log.warn("replyCode:{}",returnedMessage.getReplyCode());log.warn("replyText:{}",returnedMessage.getReplyText());log.warn("routingKey:{}",returnedMessage.getRoutingKey());}
}
测试

修改routingkey的值,让交换机不能路由到指定Queue。

package com.wnhz.ssc.cloud.mq.service.impl;import com.wnhz.ssc.cloud.mq.service.IMqService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MqServiceImpl implements IMqService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic void sendMessage(String msg) {final String EXCHANGE = "ssc_sc_routing_exchange";//修改routingkey,给一个错误的值,正确值为: ssc_sc_routing_keyfinal String ROUTING_KEY = "ssc_sc_routing_keyx";rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,msg);}
}

image-20231218165907610

返回message:

message:(
Body:'"hello confirm call back"'MessageProperties[headers={__TypeId__=java.lang.String},contentType=application/json,contentEncoding=UTF-8,contentLength=0,receivedDeliveryMode=PERSISTENT,priority=0,deliveryTag=0]
)

消费确认信息

消费监听模式
  • Simple模式

    image-20231218155921090

    Simple模式即SMLC。simple模式每个消费者都有其私有的线程,可以增加消费者,也会自动增加消费线程,不管消费者是不是在处理消息,可能会造成资源线程的浪费。 对每个消费者使用一个内部队列和一个专用线程。如果容器配置为侦听多个队列,则使用同一个消费者线程来处理所有队列。并发控制由concurrentConsumers和其他属性。当消息从 RabbitMQ 客户端到达时,客户端线程通过队列将它们传递给消费者线程。

  • Direct模式

    image-20231218160106458

    压力集中在Connection线程池上,线程可以复用与多个消费者,但是如果采用这种模式,需要设置Connection线程池合适的参数。

Message对象结构

Message对象的结构,

消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

image-20231218173406699

image-20231218173218649

消息确认方式
  1. AcknowledgeMode.AUTO:自动确认。
  2. AcknowledgeMode.NONE:根据情况确认。
  3. AcknowledgeMode.MANUAL:手动确认。

direct模式:

image-20231218173612719

simple模式:

image-20231218173833493
消费端监听发送
@RabbitListener(queues = "data_confirm_queue")
@Override
public void receiveBookFromMq(Message message, Channel channel, Book book) {log.debug("message:{}", message);log.debug("message.getMessageProperties().getHeaders()===>{}",message.getMessageProperties().getHeaders());log.debug("[order消费者:]接收到消息: {}", book);try {channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);log.debug("消息队列确认: {},{}",message.getMessageProperties().getConsumerQueue(), "接收到回调方法");} catch (IOException e) {e.printStackTrace();}
}
手动确认方式
  1. Basic.Ack 命令:用于确认当前消息。
  2. Basic.Nack 命令:用于否定当前消息(注意:这是AMQP 0-9-1的RabbitMQ扩展) 。
  3. Basic.Reject 命令:用于拒绝当前消息
channel.basicAck(long deliveryTag,boolean multiple)

basicAck 方法用于确认当前消息。

public void basicAck(long deliveryTag, boolean multiple) throws IOException {this.delegate.basicAck(deliveryTag, multiple);
}
  • deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。

  • multiple:为了减少网络流量,手动确认可以被批处理。

    • true: 代表批量应答 channel 上未应答的消息,比当前tag小的未应答的也一并应答(如5,6,7未应答)。

    image-20240221084206673

    • false: 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

      image-20240221084249669

basicNack

basicNack 方法用于否定当前消息。 由于 basicReject 方法一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现

public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {this.delegate.basicNack(deliveryTag, multiple, requeue);
}
basicReject(long deliveryTag, boolean requeue)

basicNack 方法用于否定当前消息。basicReject 方法用于明确拒绝当前的消息而不是确认。

public void basicReject(long deliveryTag, boolean requeue) throws IOException {this.delegate.basicReject(deliveryTag, requeue);
}

消息遗弃或入队,一般建议消息丢弃重新发。

  • requeue: true :重回队列,false :丢弃,我们在nack方法中必须设置 false,否则重发没有意义。
出现异常的解决方案
package com.wnhz.mq.order.service.impl;import com.rabbitmq.client.Channel;
import com.wnhz.domain.Book;
import com.wnhz.mq.order.service.IOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.io.IOException;@Slf4j
@Service
public class OrderServiceImpl implements IOrderService {private void buildException(){throw  new RuntimeException("[消费者:] 消费出现异常......");}@RabbitListener(queues = "data_confirm_queue")@Overridepublic void receiveBookFromMq(Message message, Channel channel, Book book) {try {//制造异常测试buildException();log.debug("message:{}", message);log.debug("message.getMessageProperties().getHeaders()===>{}",message.getMessageProperties().getHeaders());log.debug("[order消费者:]接收到消息: {}", book);channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);log.debug("消息队列确认: {},{}",message.getMessageProperties().getConsumerQueue(), "接收到回调方法");} catch (Exception e) {log.debug("消费异常: {}",e.getMessage());try {log.debug("尝试丢弃:{}消息.....................",book);channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);} catch (IOException ex) {ex.printStackTrace();}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/693189.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python学习笔记——自定义函数(基础知识)

自定义函数非常简洁有效地实现了代码的复用,让程序编写、阅读、测试和修改变得更加容易。 下面记录Python自定义函数的使用。 1、定义函数: def describe_pet(pet_name,animal_typedog):显示宠物的信息print(f"\nI have a {animal_type}.")…

仿12306校招项目-前后端运行

目录 1.git 克隆 2.设置JDK版本 3.sql脚本导入数据 4.启动中间件 5.运行后端 6.运行前端 1.git 克隆 打开 IntelliJ IDEA,菜单栏顶部找到 Git -> Clone 选项。找到 Clone 这个按钮输入 gitgitee.com:nageoffer/12306.git或者https://gitee.com/nageoffer/…

C# CAD交互界面-模态窗体与非模态窗体调用方式

运行环境Visual Studio 2022 c# cad2016 一、模态窗体调用方式: 当一个模态窗体打开时,它会阻塞主窗体的所有输入,直到关闭该模态窗体为止。例如,弹出一个对话框让用户必须完成某些操作后才能继续使用主程序。 [CommandMethod(&q…

C++正则表达式笔记

最近翻了翻正则表达式的一些资料&#xff0c;做个记录。 1、微软官方 <regex> 函数 | Microsoft Learn 2、正则表达式语法简介 正则表达式语法简介 - 简书 3、正则表达式基础语法大全 正则表达式基础语法大全_正则表达式语法大全-CSDN博客 4、练习 &#xff08;1…

HarmonyOS - 实现多设备协同开发实战教程~

前言 现在随着个人设备越来越多&#xff0c;越来越需要多个设备之间相互感知和连接&#xff0c;设备和设备之间可以相互联动&#xff0c;形成互联互通的场景&#xff0c;而搭载HarmonyOS的设备恰好可以满足这一点 。下面通过开发一个HarmonyOS的多端分布式表白应用来实现设备之…

python coding with ChatGPT 打卡第21天| 二叉树:最近公共祖先

相关推荐 python coding with ChatGPT 打卡第12天| 二叉树&#xff1a;理论基础 python coding with ChatGPT 打卡第13天| 二叉树的深度优先遍历 python coding with ChatGPT 打卡第14天| 二叉树的广度优先遍历 python coding with ChatGPT 打卡第15天| 二叉树&#xff1a;翻转…

hope实验室预备役第4次测试题解

目录 1.Foreign Exchange 2.Takahashi Gets Lost 3.Sasha and the Beautiful Array 4.Sasha and the Drawing 5.Sasha and the Casino 6.Only one of two 7.村村通 8.传送门 1.Foreign Exchange 原题链接 Sample 1 InputcopyOutputcopy 4 5 7 0 3 2 2 4 3 5 25 Sample…

【AI绘画】Stable Diffusion简介_stable diffusion变现

手把手教你入门绘图超强的AI绘画&#xff0c;用户只需要输入一段图片的文字描述&#xff0c;即可生成精美的绘画。给大家带来了全新保姆级教程资料包 &#xff08;文末可获取&#xff09; Stable Diffusion是2022年发布的深度学习文本到图像生成模型&#xff0c;它主要用于根据…

ncnn之三(补充):window环境下vs2022安装ncnn+protobuf

启动VS2022 下面的 x64 Native Tools Command Prompt for VS2022 protobuf git clone gitgithub.com:protocolbuffers/protobuf.git# 或者 下载 https://github.com/google/protobuf/archive/v3.11.2.zip cmake -G"NMake Makefiles" -DCMAKE_BUILD_TYPERelease -D…

HTML的特殊字符

HTML的特殊字符 有些特殊的字符在 html 文件中是不能直接表示的&#xff0c;例如: 空格&#xff0c;小于号(<)&#xff0c;大于号(>)&#xff0c;按位与(&)。 空格 示例代码&#xff1a; 运行结果&#xff1a; 由于html 标签就是用 < > 表示的&#xff0…

【快速搞定Webpack5】修改输出文件目录及自动清理上次打包文件(五)

介绍 默认情况下webpack打包后&#xff0c;我们的图片和js等文件都会被打包到dist目录下&#xff0c;文件多了混淆在一起一方面不利于文件的查找和管理&#xff0c;另外一方面看上去也不美观。 所以今天我们学习的内容就是控制输出后的文件进入不同的目录。 一、配置 新增4…

BioTech - 大型蛋白质复合物的组装流程 (CombFold)

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/136187314 CombFold是用于预测大型蛋白质复合物结构的组合和分层组装算法&#xff0c;利用AlphaFold2预测的亚基之间的成对相互作用。CombFold的组…

MES系统的功能有哪些?

阅读本文&#xff0c;你将了解&#xff1a;一、MES系统是什么&#xff1b;二、MES系统的功能&#xff1b;三、MES系统的使用场景与案例分析&#xff1b;四、如何更高效地利用MES系统。 这是我们公司正在使用的MES系统&#xff0c;已为大家搭建好模板了&#xff0c;无需下载&…

美团外卖商超销量数据

字段内容&#xff1a; shop_id varchar(50) NOT NULL, shop_id_str varchar(50) NOT NULL, shop_name varchar(400) DEFAULT NULL, shop_min_price varchar(10) DEFAULT NULL, shop_score varchar(10) DEFAULT NULL, shop_wm_score varchar(10) DEFAULT NU…

【Vuforia+Unity】AR02-长方体物体识别

1.创建模型 选择多维长方体图&#xff0c;这个长方体是生活中的真实物体的拍摄图&#xff0c;提前把6个面拍摄好并裁剪干净。 官网创建模型https://developer.vuforia.com/targetmanager/project/targets?projectId0ddbb5c17e7f4bf090834650bbea4995&avfalse 设置长宽高…

0220作业

C语言实现LED1闪烁 led.h #ifndef __LED_H__ #define __LED_H__//RCC寄存器封装 #define RCC_MP_AHB4_ENSETR (*(volatile unsigned int*)0x50000A28) //寄存器封装//GPIO寄存器封装 typedef struct{volatile unsigned int MODER; //00volatile unsigned int OTYPER; //04vol…

java数据结构与算法刷题-----LeetCode144. 二叉树的前序遍历

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 解题思路 利用递归&#xff0c;每次入栈一个结点&#xff08;每次递归都是…

优思学院【六西格玛案例】美国医院急诊部满意度提升

今天&#xff0c;优思学院来分享一个早期六西格玛项目的案例&#xff0c;项目背景是这样的&#xff0c;多年前&#xff0c;美国犹他州盐湖城的LDS医院已经实施了许多最佳实践。医院提供了床旁登记、高级分诊协议、护理点测试和实时放射学检查。一个强大而全面的持续质量改进计划…

CSS三大定位方式(浮动、定位、弹性盒)详细解析

CSS三大定位方式 前言&#xff1a;作为一名前端开发&#xff0c;已经工作2年了。由于自己是半路出家&#xff0c;从嵌入式方向转到前端开发&#xff0c;都是边百度边开发&#xff0c;很多基础都不了解&#xff0c;只要解决问题就好&#xff0c;但是近来为了让自己知识体系化&a…

北京高考数学填空题真题练一练(2014-2023)

距离2024年高考还有不到四个月的时间&#xff0c;今天我们来看看北京市的高考数学题真题。最近几年&#xff0c;只有北京、天津、上海三个直辖市的高考题是自主命题&#xff0c;其他省份全部是使用教育部统一命题的试卷。而且北京、天津、上海的数学现在也不再区分文理卷了&…