【初始RabbitMQ】持久化的实现

RabbitMQ持久化

如何保障当 RabbitMQ 服务停掉以后消 息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化

队列如何实现持久化

之前我们创建的队列都是非持久化的,RabbitMQ如果重启的话,该队列就会被删除,如果要实现队列持久化,需要在声明队列的时候把durable参数设置持久化

 但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新 创建一个持久化的队列,不然就会出现错误

以下是控制台中持久化与非持久化队列的UI显示区,当Features列显示为D代表是持久化队列

 

消息实现持久化

要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性

生产者代码:

/*
* 消息再手动应答不丢失、放回消息队列重新消费*/
public class Task2 {//队列名称public static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();/**生成一个队列* 1.队列名称* 2.队列里面的信息是否持久化(磁盘)默认情况时在内存* 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许* 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除* 5.其他参数 延迟消息等*/channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);//从控制台中输入信息Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String message = scanner.next();channel.basicPublish("",TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println("生产者发出消息:"+message);}}
}

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是 这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了

不公平分发

在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是 很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间 处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。为了避免这种情况,我们可以设置参数 channel.basicQos(1),将想要不公平分发的消费者设置该参数

消费者01代码如下:

public class Work01 {private static final String ACK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();System.out.println("C1等待接收消息短");//消息消费的时候如何处置消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody());SleepUtils.sleep(1);System.out.println("接收到消息:"+message);/*** 1.消息标记tag* 2.是否批量应答未应答的消息*/channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};//取消消息的回调CancelCallback cancelCallback = consumerTag->{System.out.println("消息消费被中断");};/*** 消费者信息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true自动应答 false手动应答* 3.消费者微车才能更改消费的回调* 4.消费者取消消费回调*/boolean autoAck = false;//设置不公平分发channel.basicQos(1);channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}

消费者02代码如下:

public class Work02 {private static final String ACK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();System.out.println("C2等待接收消息长");//消息消费的时候如何处置消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody());SleepUtils.sleep(30);System.out.println("接收到消息:"+message);/*** 1.消息标记tag* 2.是否批量应答未应答的消息*/channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};//取消消息的回调CancelCallback cancelCallback = consumerTag->{System.out.println("消息消费被中断");};/*** 消费者信息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true自动应答 false手动应答* 3.消费者微车才能更改消费的回调* 4.消费者取消消费回调*/boolean autoAck = false;//设置不公平分发channel.basicQos(1);channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}

 

效果展示:

预取值

本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息另外来自消费 者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此 缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设 置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量, RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有 未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何 消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知 这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高 向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理 的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)应该小心使用具有无限预处理 的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的 内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范 围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这 将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境 中。对于大多数应用来说,稍微高一点的值将是最佳的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/688842.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java学习day13

流(Stream) 流是一个非常强大的概念,它提供了一种高效且便捷的方式来处理集合数据。你可以将流看作是一系列数据项的管道,你可以对这些数据进行各种操作,如过滤、映射、排序和归约。 流的创建 在Java中,…

nginx upstream server主动健康检测模块添加https检测功能[完整版]

目录 1 缘起1.1 功能定义2. 实现后的效果2.1 配置文件2.2 运行效果3. 代码实现3.1 配置指令3.1.1 配置指令定义:3.1.2 配置指令结构体:3.1.3 配置指令源码定义:3.2 模块的初始化3.3 添加新的健康检测类型的定义3.4 握手完成后的处理3. 5 发送http请求3.6 接收http响应3.7 连…

Linux中精简卷对Oracle的影响

1.精简卷介绍 redhat官网详细介绍: https://access.redhat.com/documentation/zh-cn/red_hat_enterprise_linux/8/html/configuring_and_managing_logical_volumes/creating-and-managing-thin-provisioned-volumes_configuring-and-managing-logical-volumes 2.…

GPT升级信息:能记住用户的喜好和习惯!

OpenAI刚刚宣布了ChatGPT的一项激动人心的更新! OpenAI在ChatGPT中新加了记忆功能和用户控制选项,这意味着GPT能够在与用户的互动中记住之前的对话内容,并利用这些信息在后续的交谈中提供更加相关和定制化的回答。 这一功能目前正处于测试阶段…

mysql宕机了怎么恢复数据

检查 MySQL 服务器状态:首先,确认 MySQL 服务器是否真的宕机。可以尝试连接 MySQL 服务器并执行一些简单的查询,例如 SELECT 1;。如果连接失败或查询无响应,那么可能是 MySQL 服务器宕机。 检查错误日志:在 MySQL 服务…

Shiro-11-web 介绍

配置 将Shiro集成到任何web应用程序的最简单方法是在web.xml中配置一个Servlet ContextListener和过滤器,该Servlet了解如何读取Shiro的INI配置。 INI配置格式本身的大部分是在配置页面的INI部分中定义的,但是我们将在这里介绍一些额外的特定于web的部…

【leetcode刷题之路】面试经典150题(2)——双指针+滑动窗口+矩阵

文章目录 2 双指针2.1 【双指针】验证回文串2.2 【双指针】判断子序列2.3 【双指针】两数之和 II - 输入有序数组2.4 【双指针】盛最多水的容器2.5 【双指针】三数之和 3 滑动窗口3.1 【双指针】长度最小的子数组3.2 【滑动窗口】无重复字符的最长子串3.3 【哈希表】串联所有单…

HTML板块左右排列布局——左侧 DIV 固定宽度,右侧 DIV 自适应宽度,填充满剩余页面

我们可以借助CSS中的 float 属性来实现。 实例&#xff1a; 布局需求&#xff1a; 左侧 DIV 固定宽度&#xff0c;右侧 DIV 自适应宽度&#xff0c;填充满剩余页面。 <!DOCTYPE html> <html><head><meta charset"UTF-8"><meta http-e…

介绍如何解决msvcr120.dll丢失问题,msvcp120.dll常见问题的解答

msvcr120.dll是Windows操作系统中的一个重要的动态链接库文件&#xff0c;它包含了许多与C运行库相关的函数。然而&#xff0c;有时用户在运行某些应用程序或游戏时可能会收到"msvcr120.dll丢失"的错误提示。本文将介绍如何解决msvcr120.dll丢失问题&#xff0c;并提…

【c语言】c语言转义字符详解

&#x1f388;个人主页&#xff1a;豌豆射手^ &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;c语言 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进步&…

prometheus基于consul的服务发现

文章目录 一、基础二、安装consul下载地址启动consul访问consul 三、编写服务发现文件nodes.json四、prometheus配置consul发现修改prometheus.yml重启Prometheus 参考 一、基础 二、安装consul 下载地址 https://developer.hashicorp.com/consul/install 启动consul mkdi…

HCIP-MGRE实验配置、PPP的PAP认证与CHAP认证、MGRE、GRE网络搭建、NAT

实验要求 R5为ISP,只能进行IP地址配素&#xff0c;其所有地址均为公有IP地址R1和R5间使用PPP的PAP认证&#xff0c;R5为主认证方 R2与R5之间使用PPP的chap认证&#xff0c;R5为主认证方 R3与R5之间使用HDLC封装。R1/R2/R3构建一个MGRE环境&#xff0c;R1为中心站点;R1、R4间为…

Android 13.0 SystemUI下拉状态栏定制二 锁屏页面横竖屏通知栏都居中功能实现

1.前言 在13.0的系统rom定制化开发中,在关于systemui的锁屏页面功能定制中,由于在平板横屏通知栏功能中,通知栏总是显示在右边,并且是在右边居中显示的, 由于需要和竖屏显示一样,所以就需要用到在时钟下面显示通知栏,然后同样需要居中显示通知栏,所以就来分析下相关的…

前端vue金额用逗号分隔

实现效果 代码 template部分 <el-input v-model"state.val"></el-input><div>{{ priceFor(state.val) }}</div> js部分 const state reactive({ val: });const priceFor (val)> {if(!val){return }else if(val.length<4){return…

Z分数标准化

Z分数标准化是一种常用的数据标准化方法&#xff0c;用于将不同数据集的值转换为具有相同比例和零均值、标准差为1的标准正态分布。这种标准化方法对于机器学习和统计分析中的特征缩放和数据预处理非常有用。 标准化的步骤如下&#xff1a; 计算均值和标准差&#xff1a; 对于…

关于电子/硬件试制报告(精简实用版)的一些讨论

TOC 1. 源由 从产品研发的角度&#xff0c;都有最初的工程试制阶段。这个阶段最终一定会有一份试制报告。 当然&#xff0c;整个试制报告涉及方方面面内容。通常电子行业&#xff0c;试制主要是两个方面&#xff1a; 电子/硬件试制&#xff1a;侧重在PCBA等方面结构/机械试…

进程链信任-父进程欺骗

文章目录 前记普通权限的父进程欺骗ShllCode上线进程提权基础进程提权注入 前记 父进程欺骗作用&#xff1a; 进程链信任免杀进程提权 检测&#xff1a; etw 普通权限的父进程欺骗 #include<stdio.h> #include<windows.h> #include <TlHelp32.h>DWORD …

elementui 中el-date-picker 选择年后输出的是Wed Jan 01 2025 00:00:00 GMT+0800 (中国标准时间)

文章目录 问题分析 问题 在使用 el-date-picker 做只选择年份的控制器时&#xff0c;出现如下问题&#xff1a;el-date-picker选择年后输出的是Wed Jan 01 2025 00:00:00 GMT0800 (中国标准时间)&#xff0c;输出了两次如下 分析 在 el-date-picker 中&#xff0c;我们使用…

【51单片机】如何设置中断函数(场景:在定时器工作完跳转到中断程序时,怎么识别我们的中断程序在哪里呢?)

前言 大家好吖&#xff0c;欢迎来到 YY 滴单片机系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过单片机的老铁 本章是【利用定时器和中断实现一个简单项目】中的一部分&#xff0c;感兴趣的老铁可以跳转传送门查看传送门 欢迎订阅 YY滴C专栏&#xff01;更多干货…

CI/CD部署

什么是CI&#xff0c;什么是CD CI和CD是软件开发中持续集成和持续交付的缩写。 CI代表持续集成&#xff08;Continuous Integration&#xff09;&#xff0c;是一种实践&#xff0c;旨在通过自动化构建、测试和代码静态分析等过程&#xff0c;频繁地将代码变更合并到共享存储…