RabbitMQ(二) - RabbitMQ与消息发布确认与返回、消费确认

RabbitMQ消息确认

SpringBoot与RabbitMQ整合后,对RabbitClient的“确认”进行了封装、使用方式与RabbitMQ官网不一致;

消息发布确认

  • 生产者给交换机发送消息后、若是不管了,则会出现消息丢失;
    • 解决方案1: 交换机接受到消息、给生产者一个答复ack, 若生产者没有收到ack, 可能出现消息丢失,因此重新发送消息;
  • 解决方案1隐藏问题:若是交换机发送了ack, 出现网络延迟,则生产者没有收到ack, 就会出现消息重复发送问题, 进而衍生幂等性问题;
    • 隐藏问题解决方案1:在数据库中增加一张去重表,设置唯一索引; 生产者在消息内容中,翻入唯一ID,消费者消费时、先从数据库查询是否存在,存在则不处理该消息;
      • 适用于并发低、业务严谨的场景
    • 隐藏问题解决方案2:利用Redis的String的setnx,若key存在,则不处理、若key存在,则执行业务;
      • 适用于短时间处理大量消息,且 key不会重复;
        -这就是大名鼎鼎的幂等性问题,贼讨厌这些专有名词;

业务开发中的幂等性

  1. 前端保存数据时、点击多次保存按钮,插入多条数据;
    • 解决方案 :前端限制按钮点击、 数据库设置业务唯一索引;
  2. 消息推送中,可能出现多条内容一样的消息,又不可以重复处理 ,需要幂等性处理;
    上家公司中,后台给app客户端推送系统消息时、配置给所有用户推送消息, 其他服务给我的应用消息服务推送 RabbitMQ消息, 正常来说, 每次推送的消息, 设备ID和用户ID合起来唯一的,结果其他服务业务数据存在问题,有些旧数据没有清除, 导致相通的设备ID,用户ID, 一次给设备用户推送了十几条,安卓客户端当当当的响, 直接惊动了产品经理; 经过排查上,是上游数据有问题,代码又很老,其他服务负责人排查了好几天, 把问题数据清楚了, 结果后面又产生了问题数据;
    • 解决方案:由于会一次性处理几万条推送消息,因此对业务要求速度高,因此利用Redis的String的setNx, 以taskId + mobileDevId + userId + tenantId 组成了唯一key,若是存在,则不处理; key有限时间为60分钟, 就成功处理了该问题;
    • 吐槽:上游的业务问题,让下游服务做业务保证,属实离谱;

{
“taskId” :“xxxx”;
“mobileDevId” : “xxxx”;
“userId”:“xxx”;
“tenantId” : “xxx”;
“其他字段”: “…”
}

RabbitMQ发布确认与返回

SpringBoot发布确认与返回

配置:
第二个参数因为过时,所以要配置第三个参数为correlated,表示用来确认消息;

#生产者
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated

生产者:

  1. 通过RabbitTempldate#setConfirmCallback设置确认回调, 即交换器发送给ack给生产者,生产者调用ConfirmCallback回调, 若出现异常cause,则可重新推送;
    通过RabbitTempldate#setReturnCallback设置返回回调;
  2. 通过template#waitForConfirms(xxx)表示等待xxx毫秒后确认,超时返回false;
    • 若返回false, 则进行业务补救处理;
public class ConfirmProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{@Autowiredprivate RabbitTemplate template;@Autowiredprivate DirectExchange confirmExchange;AtomicInteger index = new AtomicInteger(0);AtomicInteger count = new AtomicInteger(0);private final String[] keys = {"sms", "mail"};@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send() throws IOException {//短信String sms = "{userName: xxx; phone:xxx}";HashMap<String, Object> map = new HashMap<>();map.put("userName", "hanxin");map.put("phone", index.getAndIncrement());template.setMandatory(true);template.setConfirmCallback(this);template.setReturnCallback(this);template.convertAndSend(confirmExchange.getName(), "confirm", map);System.out.println("send sms confirm");}@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send2() throws IOException {template.invoke((operations) -> {//短信String sms = "{userName: xxx; phone:xxx}";HashMap<String, Object> map = new HashMap<>();map.put("userName", "hanxin");map.put("phone", index.getAndIncrement());//必须设置template.setMandatory(true);template.convertAndSend(confirmExchange.getName(), "confirm", map);System.out.println("send sms confirm");return template.waitForConfirms(1000);});}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("receive confirm callback, ack = " + ack);}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("receive return call : message:" + message.getBody());System.out.println("receive return call : replyCode:" + replyCode);System.out.println("receive return call : replyText:" + replyText);System.out.println("receive return call : exchange:" + exchange);System.out.println("receive return call : routingKey:" + routingKey);}
}

业务开发中,非严谨,追求性能高业务建议使用send,这个过程是异步确认的;
严谨业务建议使用send2, 同步等待相应,出现问题好确认;

交换机:

@Configuration
public class ConfirmConfig {public final static String CONFIRM_QUEUE_NAME = "confirmQueue";public final static String CONFIRM_EXCHANGE_NAME = "confirmExchange";public final static String CONFIRM_ROUTING_NAME = "confirm";@Beanpublic DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}@Beanpublic ConfirmProducer confirmProducer() {return new ConfirmProducer();}
}

运行结果:

receive status : true

RabbitMQ发布确认

若是使用原生Rabbit MQ客户端API,则有三种方式:

  1. 声明channel是需要交换机确认的
channel.confirmSelect();
  1. 发布单条消息
for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);channel.basicPublish("", queue, null, body.getBytes());channel.waitForConfirmsOrDie(5_000);
}

channel.waitForConfirmsOrDie(5_000);这个方法就会在channel端等待RabbitMQ给出一个响应,用来表明这个消息已经正确发送到了RabbitMQ服务端。但是要注意,这个方法会同步阻塞channel,在等待确认期间,channel将不能再继续发送消息,也就是说会明显降低集群的发送速度即吞吐量。

官方说明了,其实channel底层是异步工作的,会将channel阻塞住,然后异步等待服务端发送一个确认消息,才解除阻塞。但是我们在使用时,可以把他当作一个同步工具来看待。利用一个异步转同步功能,可利用JUC实现;

然后如果到了超时时间,还没有收到服务端的确认机制,那就会抛出异常。然后通常处理这个异常的方式是记录错误日志或者尝试重发消息,但是尝试重发时一定要注意不要使程序陷入死循环。

  1. 发送批量消息
    条确认的机制会对系统的吞吐量造成很大的影响,所以稍微中和一点的方式就是发送一批消息后,再一起确认
   int batchSize = 100;int outstandingMessageCount = 0;long start = System.nanoTime();for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);ch.basicPublish("", queue, null, body.getBytes());outstandingMessageCount++;if (outstandingMessageCount == batchSize) {ch.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;}}if (outstandingMessageCount > 0) {ch.waitForConfirmsOrDie(5_000);}

存在隐藏问题:若是500条消息处理太久,超时了,则响应失败,消息重新入队、出现重新消费问题;

  1. 异步确认消息
    实现的方式也比较简单,Producer在channel中注册监听器来对消息进行确认。核心代码就是一个:
channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);

这三种确认机制都能够提升Producer发送消息的安全性。通常情况下,第三种异步确认机制的性能是最好的。第一种安全性最高。

消费者确认

当交换机接受消息后,就要转发给消费者;如何保证消息不丢失?重复消费?

SpringBoot消费确认

自动确认
  • 若是业务中,一些消息发送给消费者,若是消息出现异常,消费者返回通知交换机消息出现了异常,交换机会将消息重新入队;

  • 若是没有确认消息,交换机没有收到消息,会将消息会重新放入队列中,每次消费者启动都会把以前消费的消息重新消费;

  • SpringBoot整合RabbitMQ后, 设置参数max-attempts为最大重试次数、retry.enabled为开启重试机制;

spring.rabbitmq.listener.direct.retry.max-attempts=5
spring.rabbitmq.listener.direct.retry.enabled=true

  • 为什么要开启重试?
    不开启重试、消费者处理消息发生异常后, RabbitMQ会丢弃该消息, 通常业务开发中,是不允许的。

  • 为什么设置重试次数?
    不开启重试次数,则消息会一直重新入队,占用内存,若是错误消息过多,RabbitMQ内存爆了;

手动确认

在手动确认的模式下,不管是消费成功还是消费失败,一定要记得确认消息,不然消息会一直处于unack状态,直到消费者进程重启或者停止。
设置参数:

spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消费者:
通过使用channel#basicAck, basicNack, basicReject完成;

@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public class ConfirmConsumer {//    @RabbitHandler : 标记的方法只能有一个参数,类型为String ,若是传Map参数、则需要传入map参数// @RabbitListener:标记的方法可以传入Channel, Message参数@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void listenObjectQueue(Channel channel, Message message, Map<String, Object> msg) throws IOException {System.out.println("接收到object.queue的消息" + msg);System.out.println("消息ID : " + message.getMessageProperties().getDeliveryTag());try {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch (IOException exception) {//拒绝确认消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);//拒绝消息
//            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}@RabbitHandlerpublic void listenObjectQueue2(Map<String, Object> msg) throws IOException {System.out.println("接收到object.queue的消息" + msg);}}
  • 确认收到一个或多个消息

    • void basicAck(long deliveryTag, boolean multiple) throws IOException;
    • deliveryTag :消息传递标识
    • multiple:是否批量确认,为true,则确认后,其他消息deliveryTag小于当前消息的deliveryTag的消息全部变为确认;(慎重)
  • 拒绝一个或多个消息:

    • void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
    • deliveryTag:消息的传递标识。
    • multiple: 如果为true,则拒绝所有consumer获得的小于deliveryTag的消息。(慎重)
    • requeue: 设置为true 会把消费失败的消息从新添加到队列的尾端,设置为false不会重新回到队列。
  • 拒绝一个消息:

    • void basicReject(long deliveryTag, boolean requeue) throws IOException;
    • deliveryTag:消息的传递标识。
    • requeue: 设置为false 表示不再重新入队,如果配置了死信队列则进入死信队列。
  • channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息。

个人还是推荐手动确认,可控性更高;
SpringBoot消费者手动确认调用的API与RabbitMQClient原生API一致,都是通过这三个方法完成确认操作;

业务开发中,@RabbitHandler注解用的少,因为注解标记的方法只能传入 消息内容参数, 无法传Channel, Message, 获取到的消息有限, 而@RabbitListener则相反;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/23858.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring源码之XML文件中Bean标签的解析1

读取XML文件&#xff0c;创建对象 xml文件里包含Bean的信息&#xff0c;为了避免多次IO&#xff0c;需要一次性读取xml文件中所有bean信息&#xff0c;加入到Spring工厂。 读取配置文件 new ClassPathResource("applicationContext.xml")ClassPathResource是Sprin…

10倍提升效率,号称取代Elasticsearch?

[Manticore Search](https://github.com/manticoresoftware/manticoresearch/) 是一个使用 C 开发的高性能搜索引擎&#xff0c;创建于 2017 年&#xff0c;其前身是 Sphinx Search 。Manticore Search 充分利用了 Sphinx&#xff0c;显着改进了它的功能&#xff0c;修复了数百…

Charles抓包工具使用(一)(macOS)

Fiddler抓包 | 竟然有这些骚操作&#xff0c;太神奇了&#xff1f; Fiddler响应拦截数据篡改&#xff0c;实现特殊场景深度测试&#xff08;一&#xff09; 利用Fiddler抓包调试工具&#xff0c;实现mock数据特殊场景深度测试&#xff08;二&#xff09; 利用Fiddler抓包调试工…

Linux下TCP网络服务器与客户端通信程序入门

文章目录 目标服务器与客户端通信流程TCP服务器代码TCP客户端代码 目标 实现客户端连接服务器&#xff0c;通过终端窗口发送信息给服务器端&#xff0c;服务器接收到信息后对信息数据进行回传&#xff0c;客户端读取回传信息并返回。 服务器与客户端通信流程 TCP服务器代码 …

etcd

文章目录 etcd单机安装设置键值对watch操作读取键过往版本的值压缩修订版本lease租约&#xff08;过期机制&#xff09;授予租约撤销租约keepAlive续约获取租约信息 事务基于etcd实现分布式锁原生实现官方 concurrency 包实现 服务注册与发现Go 操作 Etcd 参考 etcd etcd 是一…

02|Oracle学习(数据类型、DDL)

1. 数据类型&#xff1a; 通常为&#xff1a;字符型、数值型、日期型以及大字段型大字段型&#xff1a;存放大数据及文件。 存储大数据时&#xff0c;基本上blob就能满足。 2. DDL&#xff08;数据库定义语言&#xff09; 主要包括对数据库对象的创建、删除及修改的操作。…

2.文件的逻辑结构

第四章 文件管理 2.文件的逻辑结构 顺序文件采用顺序存储则意味着各个逻辑上相邻的记录在物理上也是相邻的存储的。所以如果第0号记录的逻辑地址为0的话&#xff0c;则i号记录的逻辑为i *L。 特别的如果这个定长记录的顺序文件采用串结构&#xff0c;也就是这些记录的顺序和他…

go 结构体 - 值类型、引用类型 - 结构体转json类型 - 指针类型的种类 - 结构体方法 - 继承 - 多态(interface接口) - 练习

目录 一、结构体 1、python 与 go面向对象的实现&#xff1a; 2、初用GO中的结构体&#xff1a;&#xff08;实例化一个值类型的数据&#xff08;结构体&#xff09;&#xff09; 输出结果不同的三种方式 3、实例化一个引用类型的数据&#xff08;结构体&#xff09; 4、…

使用vscode+ssh免密远程Linux

使用vscodessh免密远程Linux 使用 SSH 密钥对&#xff1a;使用 SSH Agent&#xff1a;ssh-agent的使用场景 使用 SSH 密钥对&#xff1a; 确保你的本地机器上已经生成了 SSH 密钥对。如果没有&#xff0c;请使用以下命令生成密钥对&#xff1a; ssh-keygen -t rsa这将在 ~/.ssh…

Tomcat添加第三方jar包、如何在IDEA中启动部署Web模板

前言:公司最近维护老项目,是最原始的web项目,servlet和jsp结合的web项目,启动的时候配置了好几遍, 都起不来,很折磨人,这个文档比较全配置一遍准备工作 首先 拉取代码: git clone xxx.git ,如需要别的操作,自行baidu 也可以在idea中拉取第一步File ->Project Structure->…

Redis两种持久化方案RDB持久化和AOF持久化

Redis持久化 Redis有两种持久化方案&#xff1a; RDB持久化AOF持久化 1.1.RDB持久化 RDB全称Redis Database Backup file&#xff08;Redis数据备份文件&#xff09;&#xff0c;也被叫做Redis数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启…

VSCode插件Todo Tree的使用

在VSCode中安装插件Todo Tree。按下快捷键ctrlshiftP&#xff0c;输入setting.jspn&#xff0c;选择相应的配置范围&#xff0c;我们选择的是用户配置 Open User Settings(JSON)&#xff0c;将以下代码插入其中。 //todo-tree 标签配置从这里开始 标签兼容大小写字母(很好的功…

html2canvas以及new QRCode生成带有二维码的海报的问题

全局定义new QRCode var posterQrCode new QRCode(document.getElementById("qrcode"), {// text: data,width: 82,height: 82,correctLevel: 3 // 解决二维码识别不高的问题}); 取消修改的时候给posterQrCode传值posterQrCode.makeCode(data);解决二维码海报弹框出…

Spring Boot 集成Seata

Seata的集成方式有&#xff1a; 1. Seata-All 2. Seata-Spring-Boot-Starter 3. Spring-Cloud-Starter-Seata 本案例使用Seata-Spring-Boot-Starter演示&#xff1a; 第一步&#xff1a;下载Seata 第二步&#xff1a;为了更好看到效果&#xff0c;我们将Seata的数据存储改…

linuxARM裸机学习笔记(2)----汇编LED灯实验

MX6ULL 的 IO IO的复用功能 这里的只使用了低五位&#xff0c;用来配置io口&#xff0c;其中bit0~bit3(MUX_MODE)就是设置 GPIO1_IO00 的复用功能的&#xff0c;GPIO1_IO00 一共可以复用为 9种功能 IO&#xff0c;分别对应 ALT0~ALT8。每种对应了不同的功能 io的属性配置 HY…

SolidWorks 3D Interconnect介绍

目前市面上有的三维设计软件有很多&#xff0c;如UG、Pro/E、CATIA等&#xff0c;而且每个三维设计软件都会生成自己文件格式。由于产品设计的原因&#xff0c;我们避免不了的会需要去使用不同三维设计软件的文件&#xff0c;这对于工程师来说其实是一件比较麻烦的事。 为什么…

代码随想录第38天 | 动态规划理论基础、509. 斐波那契数、70. 爬楼梯、746. 使用最小花费爬楼梯

动态规划理论基础 如果某一问题有很多重叠子问题&#xff0c;使用动态规划是最有效的。所以动态规划中每一个状态一定是由上一个状态推导出来的。 动态规划解题步骤&#xff1a; 确定dp数组&#xff08;dp table&#xff09;以及下标的含义确定递推公式dp数组如何初始化确定…

[腾讯云Cloud Studio实战训练营]基于Cloud Studio完成图书管理系统

[腾讯云Cloud Studio实战训练营]基于Cloud Studio完成图书管理系统 ⭐前言&#x1f31c;Cloud Studio产品介绍1.登录2.创建工作空间3.工作空间界面简介4.环境的使用 ⭐实验实操&#x1f31c;Cloud Studio实现图书管理系统1.实验目的 2. 实验过程2.实验环境3.源码讲解3.1添加数据…

下载Windows 10光盘镜像(ISO文件)

文章目录 下载Windows 10镜像文件 下载Windows 10镜像文件 打开微软官网下载地址 立即下载工具 找到下载工具&#xff0c;双击运行&#xff0c;等待 接受条款&#xff0c;等待 选择为另一台电脑安装介质 选择Windows10&#xff0c;下一步 选择ISO文件&#xff0c;…

后端开发1.项目的搭建

创建maven项目 pom文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.ap…