RabbitMQ——消息应答和持久化

文章目录

  • RabbitMQ——消息应答和持久化
    • 1、消息应答
      • 1.1、概念
      • 1.2、手动应答示例
    • 2、持久化

RabbitMQ——消息应答和持久化

1、消息应答

1.1、概念

概念

消息应答机制是指消费者在消费消息后向 RabbitMQ 确认(acknowledge)已经成功处理了消息。

这个机制有助于确保消息在被消费者处理后被正确地从队列中移除,从而防止消息的丢失。

两种消息应答机制

1、自动应答(Auto Acknowledgment): 在自动应答模式下,一旦消息被消费者接收,RabbitMQ 会立即将消息标记为已被消费,而不需要消费者明确地向 RabbitMQ 发送确认。

这种模式下,消息被认为已经成功处理,即使消费者在处理消息的过程中发生错误,消息也会从队列中删除。

开启自动应答: 在消费者订阅队列时,设置 autoAck 参数为 true

channel.basicConsume(queueName, true, consumer);

优点:简单,不需要手动确认。

适用场景:对消息的处理时机和可靠性要求不高,可以容忍一定程度的消息丢失。

2、手动应答(Manual Acknowledgment): 在手动应答模式下,消费者在处理完消息之后,需要向 RabbitMQ 发送明确的确认信号,告诉 RabbitMQ 可以安全地删除这条消息了。

这种模式下,消费者需要手动调用确认方法。

  • 关闭自动应答: 在消费者订阅队列时,设置 autoAck 参数为 false

    channel.basicConsume(queueName, false, consumer);
    
  • 手动确认消息: 在消费者处理完消息后,手动向 RabbitMQ 发送确认信号。

    channel.basicAck(deliveryTag, false);
    

优点:

1、更精细的控制消息的处理时机和可靠性,确保消息在成功处理后才被确认。

2、可以批量应答,并且减少网络拥堵

适用场景:对消息的可靠性传递有较高要求,需要在消息处理成功后才确认消息,以避免消息丢失。

1.2、手动应答示例

生产者:

package com.weipch.rabbitmq.three;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.weipch.rabbitmq.utils.RabbitMqUtils;import java.nio.charset.StandardCharsets;
import java.util.Scanner;public class Task02 {public static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();boolean durable = true;channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();
//            要求生产者发送消息为持久化消息(要求保存到磁盘中)channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));System.out.println("生产者发出消息:" + message);}}
}

消费者:

package com.weipch.rabbitmq.three;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.weipch.rabbitmq.utils.RabbitMqUtils;
import com.weipch.rabbitmq.utils.SleepUtils;public class Worker03 {public static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C1等待接收消息处理时间较短");DeliverCallback deliverCallback=(consumerTag, message)->{SleepUtils.sleep(1);System.out.println("接收到的消息:"+new String(message.getBody()));/*手动应答1.deliveryTag: 消息的唯一标识符,用于确认具体的哪条消息已经被处理2.是否批量应答 false:不批量应答信道中的消息 true:批量* */channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};boolean autoAck = false;channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag -> System.out.println(consumerTag+"消息被取消消费接口回调逻辑")));}
}

2、持久化

队列持久化

// 声明持久化的队列
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);

消息持久化

//要求生产者发送消息为持久化消息(要求保存到磁盘中)
//将消息的属性设置为 MessageProperties.PERSISTENT_TEXT_PLAIN,
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

即使消息被标记为持久化,也不能绝对保证消息在每个可能的故障场景下都不会丢失。例如:操作系统在写入磁盘时可能会使用缓存,而不是立即将数据写入磁盘。在极端情况下,如果发生硬件故障或操作系统崩溃,缓存中的数据可能还未来得及写入磁盘,导致数据丢失。

RabbitMQ的分发机制:

  • 轮询分发(不公平分发):这是RabbitMQ的默认分发策略。在这种模式下,消息会按顺序逐个发送给消费者。每条消息在被确认(acknowledged)后,即被从队列中移除,然后下一条消息会被发送给下一个消费者。这种机制确保了消息的公平分配,而不考虑消费者的处理能力或者负载情况。这可能导致某些消费者处理较快,而其他消费者处理较慢,从而影响整体的消息处理效率。

  • 公平分发(能者多劳):这种模式下,消息的分发会根据消费者的处理能力来决定。这意味着处理速度快的消费者会接收到更多的消息,而处理速度慢的消费者则会接收到较少的消息。这种方式旨在提高整体的处理效率,确保每个消费者都能根据自身的处理能力得到合理数量的消息。

    // basicQos设置为1是公平分发
    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
    

预取值

RabbitMQ中的预取(prefetch)是指在消费者从队列获取消息之前,队列可以推送多少条消息给消费者。设置预取值的主要目的是控制消费者获取消息的速率,防止某个消费者过快地消费消息,导致其他消费者无法及时处理消息。

在RabbitMQ中,可以通过basicQos方法来设置预取值。

// 设置预取值,这里设置为一次只推送5条消息给消费者
int prefetchCount = 5;
channel.basicQos(prefetchCount);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/704014.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

家用办公主机需要多少钱?推荐主机选购攻略!!

1.头部部分 本文将长期保持更新,您可以保存并随时查看。 过去推荐的 主持人推荐 以下家庭办公主机均采用性能强劲的12/13代i5配置,可以完美应对日常办公、平面设计、编辑等使用场景。 主机价格在4K左右,自带显示器,并附有三到…

xss靶场实战(xss-labs-master靶场)

xss-labs-master靶场链接&#xff1a;https://pan.baidu.com/s/1X_uZLF3CWw2Cmt3UnZ1bTw?pwdgk9c 提取码&#xff1a;gk9c xss-labs level 1 修改 url 地址中的name<script>alert(1)</script>&#xff0c;便可以通关 level 2 在搜索框中输入的 JS 代码无法执行 …

leetcode704--数组元素查找--二分法查找算法

第一部分---数组的基础知识介绍&#xff1a; 1.数组的定义&#xff1a;数组是存放在连续内存空间上的相同数据类型的数据的集合&#xff1b; 2.数组可以通过下标索引的方式获取到下标对应的数据&#xff1b; 3.数组下标是从0开始的&#xff0c;数组的内存空间地址是连续的&a…

用户体验设计师如何在 2024抢占先机?

01. 严峻的经济形势和就业市场 我们生活在一个通货膨胀的时代。就从超市抓几个苹果、卷心菜、鸡蛋&#xff0c;看看价格吧&#xff01;我不得不多次检查收据&#xff0c;因为我简直不敢相信。外出就餐费用上涨了 10-20%&#xff0c;现在 Spotify 和 YouTube 要求收取更高的订阅…

【达梦数据库】undo_retention 测试

最近遇见如下问题 【问题背景】备库抽数报错回滚记录版本太旧&#xff0c;涉及表为日志表&#xff08;含有两个text列&#xff09;&#xff0c;主库undo_retention90 备库undo_retention3600&#xff0c;主库每分钟有这个表的写入&#xff08;insert update&#xff09;&#x…

【力扣hot100】刷题笔记Day13

前言 元宵节快乐 ~ 周六在图书馆快乐刷题&#xff01;继续二叉树&#x1f374; 543. 二叉树的直径 - 力扣&#xff08;LeetCode&#xff09; 递归后序 class Solution:def diameterOfBinaryTree(self, root: Optional[TreeNode]) -> int:self.res 0 # 记录最长路径# 递归…

使用python写一个修改execl表格的脚本

使用的前提记得安装python的环境。 import os import pandas as pddef listdir(path): #传入根目录file_list []for file in os.listdir(path):file_path os.path.join(path, file) #获取绝对路径if os.path.isdir(file_path): #如果还是文件夹&#xff0c;就继续迭代本函数…

GPT-4:开启人工智能新纪元的多功能引擎

随着人工智能技术的飞速发展&#xff0c;GPT-4&#xff08;Generative Pre-trained Transformer 4&#xff09;作为最新一代的语言模型&#xff0c;已经成为了推动科技创新、优化人机交互体验的关键力量。本文将深入探讨GPT-4的核心作用&#xff0c;以及它如何在各个领域中展现…

STM32通用定时器输入捕获

通用定时器输入捕获部分框图介绍 通用定时器输入捕获脉宽测量原理 要测量脉宽的高电平的时间&#xff1a;t2-t1&#xff08;脉宽下降沿时间点-脉宽上升沿时间点&#xff09; 假设&#xff1a;递增计数模式 ARR&#xff1a;自动重装载寄存器的值 CCRx1&#xff1a;t1时间点CCRx…

2024最新可用免费天气预报API接口

天气API接口数据, 数据字段最全&#xff0c;免费&#xff0c;稳定的实况天气预报接口 5分钟左右更新一次&#xff0c;支持全国3000多个市区县, 包含基本天气信息、24小时逐小时天气、气象预警列表、湿度、能见度、气压、降雨量、紫外线、风力风向风速、日出日落、空气质量、pm2…

细粒度目标检测问题剖析

问题剖析 相对于一般目标检测任务&#xff0c;细粒度目标更容易出现类内差异大、类间差异小等现象。 所谓细粒度目标识别&#xff0c;是指在目标检测的基础上&#xff0c;识别出目标的具体型号与类别&#xff0c;例如不只识别出飞机目标&#xff0c;还能识别出飞机型号。粗粒…

洛谷P1106题解

题目描述 键盘输入一个高精度的正整数 N&#xff08;不超过 250 位&#xff09;&#xff0c;去掉其中任意 k 个数字后剩下的数字按原左右次序将组成一个新的非负整数。编程对给定的 N 和 k&#xff0c;寻找一种方案使得剩下的数字组成的新数最小。 输入格式 输入两行正整数。…

Google炸场,推出开“放”可商用的大语言模型Gemma!超级轻量,个人电脑即可运行

与OpenAI的封闭式大型模型不同&#xff0c;谷歌、Meta等科技巨头正致力于开发开源模型&#xff0c;以期实现技术上的快速追赶。 介绍 2月21日&#xff0c;谷歌发布了其最新一代的开源AI模型——Gemma&#xff08;https://ai.google.dev/gemma&#xff09;&#xff0c;这是一个…

2024.2.29 模拟实现 RabbitMQ —— 项目展示

目录 项目介绍 核心功能 核心技术 演示直接交换机 演示扇出交换机 演示主题交换机 项目介绍 此处我们模拟 RabbitMQ 实现了一个消息队列服务器 核心功能 提供了 虚拟主机、交换机、队列、绑定、消息 概念的管理九大核心 API 创建队列、销毁队列、创建交换机、销毁交换机、…

【LeetCode】升级打怪之路 Day 06:哈希表的应用

今日题目&#xff1a; 349. 两个数组的交集 | LeetCode202. 快乐数 | LeetCode1. 两数之和 | LeetCode205. 同构字符串 | LeetCode599. 两个列表的最小索引总和 | LeetCode 目录 应用 1 -- 哈希集合LeetCode 349. 两个数组的交集 【easy】LeetCode 202. 快乐数 【小技巧】 应用…

基于JSP的毕业设计选题系统的设计与实现

基于JSP的毕业设计选题系统的设计与实现 (源代码论文) A. 项目简介 毕业设计选题系统就是能够使学生通过互联网完成毕业设计课题的选定&#xff0c;它采用Web方式&#xff0c;同时适用于局域网和Internet&#xff0c;它要实现审核&#xff0c;权限管理&#xff0c;邮件通知…

Python中的atexit模块:优雅地处理程序退出

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站AI学习网站。 目录 前言 atexit模块概述 atexit模块的基本用法 示例代码&#xff1a;文件操作时的应用场景 典型应用场景 1 资源释放…

【Java】Deque接口与List接口中的remove方法

Deque接口与List接口中的remove方法的区别 太长不看系列&#xff1a; Deque的remove(Object o)。指定的是元素&#xff0c;List的remove(int index)&#xff0c;指定的是索引。 在刷力扣113.路径总和 II 时使用Deque的remove方法出现错误&#xff0c;记录一下原因和理清相关概念…

云里物里轻薄系列电子价签,如何革新零售?

云里物里的DS轻薄系列电子价签&#xff0c;凭借轻巧外观和强劲性能&#xff0c;为零售行业提供了更便捷的商品改价方案。这不仅是对纸质价标的替代&#xff0c;更以其安全性和可持续发展性&#xff0c;实现对零售行业的效率升级&#xff0c;让商家们轻松迎接数字化时代的挑战&a…

【Vue3】学习watch监视:深入了解Vue3响应式系统的核心功能(下)

&#x1f497;&#x1f497;&#x1f497;欢迎来到我的博客&#xff0c;你将找到有关如何使用技术解决问题的文章&#xff0c;也会找到某个技术的学习路线。无论你是何种职业&#xff0c;我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章&#xff0c;也欢…