Rabbitmq消息应答,持久化,权重分配(7)

消息应答

概览

消息应答机制是 RabbitMQ 中确保消息处理的可靠性和一致性的重要机制之一。当消费者从队列中接收到消息并处理完成后,通常需要向 RabbitMQ 发送一个明确的消息应答,以告知 RabbitMQ 消息已经被处理,并可以安全地从队列中移除。

作用

  1. 确认消息已处理:消费者可以向 RabbitMQ 发送消息应答,以确认已经成功地处理了消息。

  2. 消息传递的可靠性:通过消息应答机制,可以确保消息在被消费者成功处理后才被从队列中移除,从而保证消息传递的可靠性。

模式

  1. 自动应答(Automatic Acknowledgement): 在这种模式下,消费者收到消息后立即自动发送确认,告知 RabbitMQ 消息已经被处理。这种模式下,消息一旦被发送给消费者,就会立即从队列中移除,无论消息是否被成功处理。虽然这种模式简单方便,但是可能会导致消息丢失或重复消费的问题,不适用于对消息传递的可靠性有要求的场景。

  2. 手动应答(Manual Acknowledgement): 在这种模式下,消费者需要显式地向 RabbitMQ 发送消息应答,以确认消息已经被成功处理。消费者在处理完消息后,可以调用 channel.basicAck() 方法发送确认,或调用 channel.basicNack() 方法发送拒绝确认。手动应答模式可以确保消息在被消费者成功处理后才被从队列中移除,提高了消息传递的可靠性。

手动应答模式适用于对消息传递的可靠性有要求的场景,例如需要保证消息不丢失、不重复消费的场景。通过手动应答,可以更精确地控制消息的处理过程,提高消息传递的可靠性和一致性。

 配置

方式一

全局配置

在springboot的yml文件中配置

spring.rabbitmq.listener.simple.acknowledge-mode=manual

spring.rabbitmq.listener.simple.acknowledge-mode 参数用于配置 RabbitMQ 消息监听器的应答模式。它决定了消费者接收到消息后如何向 RabbitMQ 确认消息的处理状态。该参数可以设置为以下几种值:

  1. none:表示禁用应答模式。在这种模式下,消费者不会向 RabbitMQ 确认消息的处理状态,RabbitMQ 会认为消息一直处于未处理状态,直到连接关闭。这种模式下,消息可能会被多次消费,可能会导致消息重复处理的问题。

  2. auto:表示自动应答模式。在这种模式下,消费者接收到消息后会立即自动发送确认,告知 RabbitMQ 消息已经被处理。这种模式下,消息一旦被发送给消费者,就会立即从队列中移除,无论消息是否被成功处理。自动应答模式简单方便,但可能会导致消息丢失或重复消费的问题,不适用于对消息传递的可靠性有要求的场景。

  3. manual:表示手动应答模式。在这种模式下,消费者需要显式地向 RabbitMQ 发送消息应答,以确认消息已经被成功处理。消费者在处理完消息后,可以调用 channel.basicAck() 方法发送确认,或调用 channel.basicNack() 方法发送拒绝确认。手动应答模式可以确保消息在被消费者成功处理后才被从队列中移除,提高了消息传递的可靠性。

  4. manual_batch:表示批量应答模式。批量应答模式是 RabbitMQ 3.3.0 版本引入的一种新的应答模式。在批量应答模式下,消费者可以一次性确认多个消息的处理结果,从而提高应答的效率。消费者可以通过调用 channel.basicAck() 方法一次性确认多个消息的处理,或者调用 channel.basicNack() 方法一次性拒绝多个消息的处理。批量应答模式适用于需要提高消费者效率的场景,但需要注意确保消息处理的一致性和可靠性。

方式二

对某个消费者单独配置

 @RabbitListener(queues = {"queue_normal"})public void consumeNormal(String msg, Message message, Channel channel) throws IOException {log.debug("消费者 - 普通队列 - 接收消息:" + msg);try {/***     deliveryTag:表示消息的唯一标识符,用于标识需要确认的消息。*                  每个消息都有一个唯一的 deliveryTag,由 RabbitMQ 自动生成。*                   在确认消息时,需要指定对应消息的 deliveryTag。*     multiple: 表示是否批量确认消息。如果将 multiple 参数设置为 false,*                则只确认指定 deliveryTag 对应的单个消息;如果将 multiple*                参数设置为 true,则表示确认所有 deliveryTag 小于或等于指定值的消息。*                通常情况下,建议将 multiple 参数设置为 false,以避免误操作导致确认了未处理的消息。*     requeue:表示是否重新将消息放入队列中。如果将 requeue 参数设置为 true,*              则表示消息将被重新放入队列中,等待被重新消费;如果将 requeue*              参数设置为 false,则表示消息将被丢弃,不会重新放入队列中。通常情况下,*              在确认消息时应将 requeue 参数设置为 false,以确保消息不会被重复消费。*/channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);throw new ServerException("普通队列消费消息失败!");}}

方法:channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)参数详解:

  1. deliveryTag:表示消息的唯一标识符,用于标识需要确认的消息。每个消息都有一个唯一的 deliveryTag,由 RabbitMQ 自动生成。在确认消息时,需要指定对应消息的 deliveryTag

  2. multiple:表示是否批量确认消息。如果将 multiple 参数设置为 false,则只确认指定 deliveryTag 对应的单个消息;如果将 multiple 参数设置为 true,则表示确认所有 deliveryTag 小于或等于指定值的消息。通常情况下,建议将 multiple 参数设置为 false,以避免误操作导致确认了未处理的消息。

  3. requeue:表示是否重新将消息放入队列中。如果将 requeue 参数设置为 true,则表示消息将被重新放入队列中,等待被重新消费;如果将 requeue 参数设置为 false,则表示消息将被丢弃,不会重新放入队列中。通常情况下,在确认消息时应将 requeue 参数设置为 false,以确保消息不会被重复消费。

测试

这里用之前的简单模式进行测试,如果不想看前面的,可以自己创建队列和消费者

package com.model.listener;import com.code.exception.ServiceException;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.concurrent.TimeUnit;/*** @Author: Haiven* @Time: 2024/4/19 10:29* @Description: TODO*/
@Component
@Slf4j
@RabbitListener(queuesToDeclare = @Queue(value = "${rabbitmq.simple.queue}"))
public class SimpleConsumer {@RabbitHandlerpublic void simpleHandler(String msg, Message message, Channel channel) throws IOException, ServiceException {long msgId = message.getMessageProperties().getDeliveryTag();log.debug("简单模式消费者 - simple consumer - 接收到消息:" + msg);try {//线程睡 4 sTimeUnit.SECONDS.sleep(4);//手动应答channel.basicAck(msgId, false);log.debug("简单模式消费者 - simple consumer - 处理消息成功:" + msg);}catch (Exception e){channel.basicNack(msgId, false, true);log.debug("简单模式消费者 - simple consumer - 处理消息失败:" + msg);throw new ServiceException("简单模式接收消息失败");}}
}

 发送消息

 后台接收

这里暂时看不出消息是否是第一时间消费完毕,无法确认消息是否由channel.basicAck(msgId, false);方法应答,后续回结合死信队列,会更直观的验证

 消息持久化

概览

在 RabbitMQ 中,消息持久化是一种确保消息在服务器宕机或重启后不会丢失的重要机制。通过消息持久化,可以将消息存储到磁盘上,以确保消息的可靠性和持久性。

要实现消息的持久化,需要同时确保消息和队列都被持久化。下面是实现消息持久化的步骤:

1.队列持久化:首先,需要确保队列被声明为持久化。在声明队列时,需要设置 durable 参数为 true,表示该队列是持久化的。

Springboot在创建队列的时候:

QueueBuilder.durable(name)表示为持久化队列

QueueBuilder.nonDurable(name)表示为非持久化队列

2.消息持久化:然后,在发布消息时,需要将消息标记为持久化。在发布消息时,需要设置 deliveryMode 参数为 2,表示该消息是持久化的。

发送消息时:

rabbitTemplate.convertAndSend();发送的消息默认就是持久化的

通过以上步骤,队列和消息都被声明为持久化的,确保了消息在服务器宕机或重启后不会丢失。需要注意的是,消息持久化会带来一定的性能开销,因为需要将消息写入磁盘,所以在一些对性能要求较高的场景下,需要权衡考虑是否使用消息持久化机制。

权重分配

概览

在 RabbitMQ 中,权重分配通常指的是在消息队列的消费者之间进行负载均衡,以确保消息能够在多个消费者之间均匀分配,达到最优的消息处理效率。RabbitMQ 并没有直接提供权重分配的功能,但可以通过一些方法实现类似的效果。

实现

  1. 多个消费者绑定同一个队列:可以将多个消费者绑定到同一个队列上,RabbitMQ 将会循环地将消息发送给不同的消费者,实现简单的轮询分发。这种方式可以实现简单的权重分配,但不能根据消费者的处理能力动态调整权重。

  2. 手动设置消费者优先级:在消费者处理消息时,可以根据一些策略手动设置消费者的优先级。例如,可以根据消费者的处理能力、负载情况等因素动态调整消费者的优先级,从而实现动态的权重分配。这种方式需要在业务代码中实现逻辑,并且需要维护消费者的优先级信息。

  3. 使用 Direct Exchange 进行路由:可以使用 Direct Exchange 进行消息的路由,并根据消费者的能力对消息进行标记。然后,消费者根据消息的标记选择性地接收消息,实现动态的权重分配。这种方式需要在消息生产者和消费者之间约定好消息的标记,以及消费者的能力等信息。

  4. 使用 RabbitMQ 插件:RabbitMQ 社区提供了一些插件,例如 Consistent Hash Exchange、Priority Queue 等,可以实现更复杂的消息路由和权重分配策略。可以根据实际需求选择合适的插件来实现权重分配。

总的来说,权重分配是一个比较复杂的问题,需要根据实际业务需求和系统架构选择合适的方法。在设计消息队列系统时,需要考虑消息的生产和消费速度、消费者的处理能力、系统的稳定性等因素,综合考虑选择合适的权重分配策略。

配置

package com.model.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @Author: Haiven* @Time: 2024/4/19 11:51* @Description: TODO*/
@Component
@Slf4j
public class WorkConsumer {@RabbitListener(queues = {"work"}, concurrency = "1")public void consumer01(String msg){log.debug("消费者 -01- 接收消息:" + msg );}@RabbitListener(queues = {"work"}, concurrency = "6")public void consumer02(String msg){log.debug("消费者 -02- 接收消息:" + msg );}
}

@RabbitListener(queues = {"work"}, concurrency = "5")

concurrency 参数设置了消费者实例的数量为 5。通过调整 concurrency 参数的值,可以实现不同消费者实例的权重分配。

需要注意的是,concurrency 参数表示每个消费者实例的并发线程数,而不是消费者的数量。例如,如果 concurrency 参数设置为 5,则表示每个消费者实例将会启动 5 个并发线程来处理消息,如果需要配置多个消费者实例,可以通过创建多个 MyMessageListener Bean 来实现。

 测试

向之前的工作队列发送多条消息

 后台接收

 后台接收的频次比例为 1:6(之前为轮训分发)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/3811.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何消除浏览器SmartScreen对网站“不安全”提示?

面对互联网时代用户对网站安全性和可信度的严苛要求,网站运营者时常遭遇Microsoft Defender SmartScreen(SmartScreen)提示网站不安全的困扰。本文将剖析SmartScreen判定网站不安全的原因,并为运营者提供应对策略,以恢…

[最新]CentOS7设置开机自启动Hadoop集群

安装好Hadoop后我们可以使用开机自启动的方式,节约敲命令的时间。注意是centOS7版本!!!和centOS6版本区别非常大!!! 1、切换到系统目录 [rootmaster ~]# cd /etc/systemd [rootmaster systemd]# ll total 32 -rw-r--r-- 1 root root 720 Jun 30 23:11 bootcha…

ip https证书360

https证书主要作用是保障网络安全,在http协议的基础上通过SSL/TLS加密技术实现安全通信协议。对客户端以及服务器之间的传输数据进行加密,确保数据的完整性和机密性,维护用户隐私。通过HTTPS协议,我们可以安全地进行在线购物、网上…

【threejs教程7】threejs聚光灯、摄影机灯和汽车运动效果

【图片完整效果代码位于文章末】 在上一篇文章中我们实现了汽车模型的加载,这篇文章主要讲如何让汽车看起来像在运动。同时列出聚光灯和摄像机灯光的加载方法。 查看上一篇👉【threejs教程6】threejs加载glb模型文件(小米su7)&…

Kubernetes学习-核心概念篇(一) 初识Kubernetes

🏷️个人主页:牵着猫散步的鼠鼠 🏷️系列专栏:Kubernetes渐进式学习-专栏 🏷️个人学习笔记,若有缺误,欢迎评论区指正 目录 1. 前言 2. 什么是Kubernetes 3. 为什么需要Kubernetes 3.1. 应…

【高校科研前沿】东北地理所在遥感领域顶刊RSE发布中国主要红树植物群落遥感分类成果

目录 01 文章简介 02 研究内容 03 文章引用 01 文章简介 论文名称:Mangrove species mapping in coastal China using synthesized Sentinel-2 high-separability images(基于Sentinel-2高分离度图像的中国沿海红树群落制图) 第一作者及…

口才培训需要多久才能看到成效?

口才培训需要多久才能看到成效? 口才培训需要多久才能看到成效,这个问题的答案因个体差异而异,受到多种因素的影响。以下是对此问题的详细分析: 首先,每个人的口才基础和学习能力不同。有些人可能天生具备良好的口才…

面试C++(基础篇)- C++是如何工作的?

1:C是如何工作的&#xff1f; 首先以一个最简单的Hello word程序入门来看&#xff1a; #include <iostream>int main() {std::cout << "Hello World!\n"<< std::endl;std::cin.get(); }1&#xff1a;#include是预编译命令&#xff0c;发生在编译…

WoodMart主题下载:为您的电商网站带来自然而优雅的购物体验

在电子商务的激烈竞争中&#xff0c;一个设计精良、用户友好的在线商店是吸引和保留客户的关键。WoodMart主题&#xff0c;作为一款专为Shopify平台设计的高级主题&#xff0c;以其自然美学和强大的功能&#xff0c;帮助您的商店在众多竞争对手中脱颖而出。 [WoodMart主题的核…

开源框架-链路追踪(SkyWalking)

SkyWalking 极简入门 | Apache SkyWalking 开发环境配置&#xff1a; -javaagent:D:\xxxxx\yyyy\skywalking-agent.jar -DSW_AGENT_NAMEspringboot-xxxx-demo -DSW_AGENT_COLLECTOR_BACKEND_SERVICES127.0.0.1:11800

多行Textview 计算切分后的长度,并回退长度

实现类似的效果&#xff0c;一个多行的 textview&#xff0c; 如果赋值一个超长的字符&#xff0c;尾部长度回退部分&#xff0c;并添加 ... 最后添加一个详情按钮。 如果不超长则不显示详情 效果如图&#xff1a; 获取截断之后的字符长度 fun getLimitedCharacterCount(textV…

Docker与Linux容器:“探索容器化技术的奥秘”

目录 一、Docker概述 二、容器技术的起源&#xff1a; 三、Linux容器 四、Docker的出现 五、Docker容器特点&#xff1a; 六、Docker三大概念&#xff1a; 容器&#xff1a; 镜像&#xff1a; 仓库&#xff1a; 七、Docker容器常用命令 一、Docker概述 在云原生时代&…

每周题解:拯救大兵瑞恩

题目描述 1944 年&#xff0c;特种兵麦克接到国防部的命令&#xff0c;要求立即赶赴太平洋上的一个孤岛&#xff0c;营救被敌军俘虏的大兵瑞恩。 瑞恩被关押在一个迷宫里&#xff0c;迷宫地形复杂&#xff0c;但幸好麦克得到了迷宫的地形图。 迷宫的外形是一个长方形&#x…

架构师的六大生存法则与价值创造

目录 什么影响架构的成败 架构师的六大生存法则 一、所有的架构规划必须有且只有一个正确的目标 二、架构活动需要尊重和顺应人性 三、架构活动在有限的资源下最大化商业价值 四、架构师要考虑依赖的商业模块和技术生命周期 五、架构师为什么要关注技术体系的外部适应性…

【DINO】环境配置

1. DINO简介 作为一款基于Transformer性能强劲的计算机视觉算法&#xff0c;一经发布即受追捧&#xff0c;本文记录下在DINO官方代码在集群上的环境配置及训练自己的数据集过程。 DINO原文&#xff1a;https://arxiv.org/abs/2203.03605 DINO源代码&#xff1a;https://github.…

2021长城杯(部分复现)

2021年4月25日&#xff0c;上午8点左右&#xff0c;警方接到被害人金某报案&#xff0c;声称自己被敲诈数万元&#xff1b;经询问&#xff0c;昨日金某被嫌疑人诱导裸聊&#xff0c;下载了某“裸聊”软件&#xff0c;导致自己的通讯录和裸聊视频被嫌疑人获取&#xff0c;对其进…

Oracle数据库的AI能力分析,释放企业数据价值

解锁Oracle数据库的AI潜力 Oracle数据库提供了一系列的AI能力&#xff0c;旨在帮助企业和开发者更高效地利用人工智能技术。以下是Oracle数据库AI能力的一些关键点&#xff1a;1. AI向量相似性搜索&#xff1a;Oracle Database 23c引入了AI Vector Search功能&#xff0c;该功…

看企业中很多老师傅都说没前途,该不该放弃嵌入式单片机行业?

在企业中&#xff0c;我们经常会听到很多老师傅感叹嵌入式单片机行业没有前途&#xff0c;这也让不少人陷入了迷茫&#xff0c;不知道该不该放弃这个行业。其实&#xff0c;我发现很多新手在嵌入式和单片机领域都存在一个误区&#xff0c;那就是他们过于专注于工作技能的提升&a…

Win10装机(EasyU优启通制作优盘装机)

文章目录 EasyU优启通制作U盘WIndow 10 环境下载将Win10环境放在C盘之外的磁盘目录下&#xff0c;如D:/ 安装1. 进入BIOS2. 格式化C盘3. WinNTSetup4. 设置5.就绪&#xff1f;无需其他选项开始即可6. 重启&#xff0c;拔出U盘&#xff0c;就将自动安装6. 安装好后配置即可 参考…

C++中的queue(容器适配器)

目录 一、成员函数 一、构造函数 二、入栈 push 三、出栈 pop 四、判空 empty 五、队列大小 size 六、取队头元素 front 七、取队尾元素 back 八、入栈 emplace 九、交换函数 swap 二、非成员函数重载 一、关系运算符重载 二、交换函数 swap C中的queue不再是容…