【消息中间件】Rabbitmq消息可靠性、持久化机制、各种消费

原文作者:我辈李想
版权声明:文章原创,转载时请务必加上原文超链接、作者信息和本声明。


文章目录

  • 前言
  • 一、常见用法
    • 1.消息可靠性
    • 2.持久化机制
    • 3.消息积压
      • 批量消费:增加 prefetch 的数量,提高单次连接的消息数
      • 并发消费:多部署几台消费者实例
    • 4.重复消费
  • 二、其他
    • 1.队列存在大量unacked数据
    • 2.重试连接:建立连接
    • 3.rabbitmq心跳连接
    • 4.重试连接:消费ack确认前连接异常断开时


前言


一、常见用法

1.消息可靠性

RabbitMQ 提供了多种机制来确保消息的可靠性,以防止消息丢失或被意外删除。以下是几种提高消息可靠性的方法:

  1. 持久化消息(Durable Message):在发布消息时,将消息的 deliveryMode 设置为 2,即可将消息设置为持久化消息。持久化消息会将消息写入磁盘,即使 RabbitMQ 服务器重启,消息也不会丢失。

  2. 持久化队列(Durable Queue):创建队列时,将队列的 durable 参数设置为 true,即可创建一个持久化队列。持久化队列会将队列的元数据和消息都存储在磁盘上,即使消息队列服务器重启,队列的元数据和消息仍然可以恢复。

  3. 确认模式(Publisher Confirms):使用确认模式可以确保消息被成功发送到 RabbitMQ 服务器,并得到确认。通过在信道上使用 channel.confirmSelect() 启用确认模式,然后通过 channel.waitForConfirms() 方法来等待服务器的确认。

  4. 事务模式(Transactions):使用事务模式可以保证消息的原子性,要么全部发送成功,要么全部失败。通过在信道上使用 channel.txSelect() 开启事务模式,在发送消息后使用 channel.txCommit() 提交事务,或使用 channel.txRollback() 进行回滚。

  5. 消费者应答(Consumer Acknowledgement):在消费者接收和处理消息后,必须发送确认应答给 RabbitMQ 服务器。通过使用 channel.basicAck() 方法发送确认应答,以告知服务器消息已经成功处理。

通过使用上述机制,可以在 RabbitMQ 中实现消息的可靠性传输和处理,以防止消息的丢失和重复传递。
这里有篇博客,大家可以看看。

2.持久化机制

在RabbitMQ中,消息持久化是一种机制,可以确保消息在服务器宕机或重启之后不丢失。默认情况下,RabbitMQ的消息是存储在内存中的,如果服务器宕机,则会导致消息的丢失。要实现消息的持久化,可以采取以下步骤:

  1. 创建一个持久化的交换机(Exchange):
    在定义交换机时,将其durable参数设置为true,例如:

    channel.exchangeDeclare("exchange_name", "direct", true);
    
  2. 创建一个持久化的队列(Queue):
    在定义队列时,将其durable参数设置为true,例如:

    channel.queueDeclare("queue_name", true, false, false, null);
    
  3. 将持久化的队列与交换机进行绑定:
    使用队列和交换机的bind方法进行绑定,例如:

    channel.queueBind("queue_name", "exchange_name", "routing_key");
    
  4. 发布持久化的消息:
    在发布消息时,将消息的deliveryMode属性设置为2,表示消息是持久化的,例如:

    String message = "Hello RabbitMQ!";
    channel.basicPublish("exchange_name", "routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    

通过以上步骤,就可以实现消息的持久化。当RabbitMQ服务器宕机或重启后,消息会被保存在磁盘中,并在服务器恢复后重新投递给消费者。需要注意的是,虽然消息被持久化了,但是在发送到队列之前,仍然有可能发生丢失,所以在实际的应用中,还需要考虑一些因素,比如网络故障、消费者的可靠性等。

3.消息积压

批量消费:增加 prefetch 的数量,提高单次连接的消息数

为了提高消费性能,可以将多个消息批量进行消费,减少消费者和消息队列的交互次数。通过设置合适的批量消费大小,可以在一次网络往返中消费多个消息,从而提高消费性能。
要实现RabbitMQ的批量消费,可以使用RabbitMQ的channel.basicQos方法来设置每次消费的消息数量。以下是一个示例代码,演示如何实现批量消费:

import pikadef callback(ch, method, properties, body):print("Received message: %s" % body)# 处理消息的逻辑# 发送确认给RabbitMQch.basic_ack(delivery_tag=method.delivery_tag)def consume_messages():connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 设置每个消费者一次性获取的消息数量channel.basic_qos(prefetch_count=10)# 注册消费者并开始消费消息channel.basic_consume(queue='my_queue', on_message_callback=callback)# 进入一个循环,一直等待消息的到来channel.start_consuming()consume_messages()

在这里插入图片描述

在上面的代码中,我们通过channel.basic_qos(prefetch_count=10)设置每次处理的消息数量为10。这样,在消费者处理完10条消息之前,RabbitMQ将不会再向其发送更多消息。

这样,就实现了RabbitMQ的批量消费。你可以根据需求,在basic_qos方法中设置适合你的消息数量。

并发消费:多部署几台消费者实例

可以采用多线程或多进程的方式进行消息的并发消费,将多个消费者并行处理消息。通过增加并发消费者的数量,可以提高消息的处理速度,提高消费的性能。
使用进程池来消费RabbitMQ的消息可以更好地管理并发性能。通过使用进程池,可以在一个固定的池子中创建多个进程,并且复用它们来消费消息,从而减少进程创建和销毁的开销。

以下是一个使用进程池消费RabbitMQ消息的示例:

import multiprocessing
import os
import time
import pikadef consumer(queue_name):connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue=queue_name)def callback(ch, method, properties, body):print(f'Process {os.getpid()} received message: {body}')time.sleep(1)channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()def main():# 创建进程池pool = multiprocessing.Pool(processes=5)# 在进程池中提交任务for _ in range(5):pool.apply_async(consumer, ('my_queue',))pool.close()pool.join()if __name__ == '__main__':main()

在上述示例中,我们使用multiprocessing.Pool来创建一个包含5个进程的进程池。然后,我们使用apply_async方法向进程池中提交任务,每个任务都是调用consumer函数来消费"my_queue"队列中的消息。进程池会自动分配任务给闲置的进程来处理。通过closejoin方法,我们可以确保所有任务都被完成。

4.重复消费

  1. 消息确认:在消费者处理完一条消息后,通过调用basic_ack方法手动确认消息已经成功消费。这样,RabbitMQ就会将该消息标记为已经处理,不会再次发送给其他消费者。同时,还可以设置auto_ack参数为False,禁用自动消息确认机制,以确保消息被正确确认。

  2. 消息持久化:可以通过设置消息的delivery_mode属性为2来将消息标记为持久化消息。这样,即使消费者在处理消息时发生故障,消息也会被保存在磁盘上,待消费者恢复正常后会重新投递。

  3. 唯一消费者:可以通过设置队列的exclusive参数为True,创建一个排他队列。这样,只有一个消费者可以连接到该队列,并独占地消费其中的消息,避免重复消费。

  4. 消息去重:在消费者端可以维护一个已消费消息的记录,例如在数据库或缓存中记录已消费的消息的ID或唯一标识。每次消费消息时,先检查记录中是否已经存在该消息,如果存在则跳过,避免重复处理。

  5. 幂等操作:在消费者的处理逻辑中,要确保操作是幂等的,即多次执行同一个操作的效果和执行一次的效果是一样的。这样,即使消息被重复消费,也不会产生副作用。

二、其他

1.队列存在大量unacked数据

通过rabbitmq的后台管理,进入相应的队列,滑到最下边,找到purge。purge将清空这个队列的消息。
在这里插入图片描述

2.重试连接:建立连接

import pika
from retry import retry@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))def consume(self, callback):"""Start consuming AMQP messages in the current process"""try:self.start_consuming_message()except ConnectionClosed as e:self.clear()self.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy=1)except ChannelClosed as e:self.clear()self.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy=1)finally:self.start_consuming_message()

3.rabbitmq心跳连接

RabbitMQ 心跳是一种保持连接活跃的机制。当 RabbitMQ 与客户端建立连接后,它会定期发送心跳包来确认连接仍然有效。如果在一段时间内没有收到心跳回复,RabbitMQ 将会关闭连接。心跳属于ConnectionParameters参数heartbeat,我理解是应该用于生产者,确保能够成功发送消息,如果消费者中设置了heartbeat,一定要大于消费程序的处理时间,保证消费期间结束后,可以响应心跳。

parameters = pika.ConnectionParameters(host, int(port), '/', credentials=userx, heartbeat=int(heartbeat))

如果消费者使用心跳,还可以参考这个博客

4.重试连接:消费ack确认前连接异常断开时

这个需要开启链接断开的重试,属于ConnectionParameters的retry_delay和connection_attempts参数。rabbitmq重启,消费者中使用heartbeat时间不足以覆盖消费时间。

connectionParameters = pika.ConnectionParameters(host='localhost',virtual_host=5672,credentials=credentials,socket_timeout=10,heartbeat=0,retry_delay=10, # 连接尝试重连间隔connection_attempts=10, # 连接尝试次数
)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/586049.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

门诊病历系统教程,社区诊所电子处方系统软件操作教程

一、软件程序问答 门诊病历系统教程,社区诊所电子处方系统软件操作教程 1、电子处方软件在开处方时候,可以一键导入模板吗? 如下图,软件以 佳易王诊所电子处方软件V17.1为例说明 软件右侧点击 配方模板,只需输入症…

从零开始学Python系列课程第17课:容器型数据类型之列表(上)

前言 列表算是 Python 中比较常用的一种容器型数据类型,那么什么是列表,列表有什么样的作用致使它在 Python 中这么受欢迎呢?这便是接下来我们要一起讨论的问题。 在不久之前我们讲过变量,我们将数据使用变量保存,但是…

08.哲说建造者模式(Builder Pattern)

“The odds that we’re in ‘base reality’ is one in billions.” —— Elon Musk 这段话出自马斯克在2016年的一次演讲,“人类活在真实世界的几率,可能不到十亿分之一”。此言一出,可谓一石激起千层浪。有人嘲讽马斯克是“语不惊人死不休…

[2024] 十大免费电脑数据恢复软件——轻松恢复电脑上已删除文件

哈喽大家好!你有没有需要适用于电脑的免费数据恢复软件呢?数据丢失可是个烦心事,无论是硬件故障还是软件损坏,甚至是意外删除、格式化或计算机病毒,都让人郁闷至极。当你遇到数据丢失的情况时,你一定希望能…

rollup 源码解析 - watch 监听

文章目录 rollup watch 实现流程watchWatchEmitter 实现 watchInternalWatcher 管理整个 watch 阶段Task 运行任务FileWatcher 实现文件监听 rollup watch 实现流程 每一个配置了 watch 的配置项都会变成一个 Task 任务,每个任务通过 FileWatcher 即 chokidar 进行…

elasticsearch 索引数据多了怎么办,如何调优,部署

当Elasticsearch索引的数据量增加时,可能会遇到性能瓶颈,需要进行调优。以下是一些建议和步骤,可帮助你处理数据量增加的情况: 1. 硬件升级: 增加节点数或升级硬件,包括更快的CPU、更大的内存和更快的存储…

【Git】Git的基本操作

前言 Git是当前最主流的版本管理器,它可以控制电脑上的所有格式的文件。 它对于开发人员,可以管理项目中的源代码文档。(可以记录不同提交的修改细节,并且任意跳转版本) 本篇博客基于最近对Git的学习,简单介…

JavaScript 基础学习笔记(二):数据类型、数值类型、字符串类型、布尔类型、undefined、类型转换、隐式转换、显式转换、Number

目录 一、数据类型 1.1 数值类型 1.2 字符串类型 1.3 布尔类型 1.4 undefined 二、类型转换 2.1 隐式转换 2.2 显式转换 2.3 Number 一、数据类型 计算机程序可以处理大量的数据,为了方便数据的管理,将数据分成了不同的类型: 注&a…

Python中的用户交互函数详解,提升用户体验!

更多Python学习内容:ipengtao.com 用户进行交互的Python应用程序,有许多常用的用户交互函数可以帮助创建更具吸引力和友好的用户界面。本文将介绍一些常用的Python用户交互函数,并提供详细的示例代码,以帮助大家更好地理解它们的用…

右键添加 idea 打开功能

1.开始运行regedit 2.找到: HKEY_LOCAL_MACHINE\SOFTWARE\Classes\Directory\shell _3.开始设置 一、右键shell目录新建项Idea二、右键Idea新建command三、选择Idea 右侧空白出新建字符串 名字为Icon 值填入idea的运行程序地址 四、选择command 默认项填入idea的运行程序地址…

技术探秘:在RISC Zero中验证FHE——RISC Zero应用的DevOps(2)

1. 引言 前序博客: 技术探秘:在RISC Zero中验证FHE——由隐藏到证明:FHE验证的ZK路径(1) 技术探秘:在RISC Zero中验证FHE——由隐藏到证明:FHE验证的ZK路径(1) 中&…

项目中cesium使用方法

cesium方法整理 一、安装依赖 // 安装cesium npm install cesium --save // 安装turf工具 npm install truf/turf --save // 安装cesium vite插件 npm install vite-plugin-cesim --save 二、项目中引用 import * as Cesium from cesium import cesium/Build/Cesium/Widget…

【privateGPT】使用privateGPT训练您自己的LLM

了解如何在不向提供商公开您的私人数据的情况下训练您自己的语言模型 使用OpenAI的ChatGPT等公共人工智能服务的主要担忧之一是将您的私人数据暴露给提供商的风险。对于商业用途,这仍然是考虑采用人工智能技术的公司最大的担忧。 很多时候,你想创建自己…

【GOLANG】使用插件 Goanno 的方式来对方法、接口、结构体注释模板配置

直接 使用插件 Goanno 的方式来对方法、接口、结构体注释模板配置 1、简单安装 Goanno 插件 File->Settings->Plugins , 搜索 Goanno Normal Method 配置内容如下: // Title ${function_name} // Description ${todo} // Author mumu ${date} ${time} // Par…

技能分析:这就是人们写在简历上的内容

您希望您的技能部分听起来像其他人一样吗?另一方面,您是否想遗漏一项顶级技能,因为许多其他简历也列出了它?在脱颖而出和涵盖雇主寻求的所有技能之间找到平衡可能是一个挑战。 优秀的简历技能部分会考虑到每个雇主所寻求的特质。…

海云安亮相2023北京国际金融安全论坛,助力金融企业数字化转型降本增效

近日,2023北京国际金融安全论坛暨金融科技标准认证生态大会在北京金融安全产业园成功举办。深圳海云安网络安全技术有限公司(以下简称“海云安”)受邀参展亮相此次大会。海云安作为国内领先的金融科技服务商,展示了开发安全系列产…

C#中的集合

一、集合的概念 数组可以保存多个对象,但在某些情况下无法确定到底需要保存多少个对象,由于数组的长度不可变,因此数组将不再适用。 如何保存数目不确定的对象呢? 为了保存这些数目不确定的对象,C#中提供了一系列特殊…

Unity坦克大战开发全流程——开始场景——排行榜数据逻辑

开始场景——排行榜数据逻辑 排行榜单条数据 排行榜列表 然后在数据管理类中声明一个对应的字段 初始化数据 然后再在上一节课所编写的UpdatePanelInfo函数中处理数据更新的逻辑 时间换算算法 然后再在数据管理类中编写一个在排行榜中添加数据的方法以提供给外部 直到当前RankI…

【BERT】深入理解BERT模型1——模型整体架构介绍

前言 BERT出自论文:《BERT:Pre-training of Deep Bidirectional Transformers for Language Understanding》 2019年 近年来,在自然语言处理领域,BERT模型受到了极为广泛的关注,很多模型中都用到了BERT-base或者是BE…

搜索算法和推荐算法、广告算法的区别

广告和推荐算法的技术框架比较相似,在线计算时都分为召回(candidates generating)和排序(candidates ranking)两个阶段(这似乎是计算资源有限条件下,所有检索问题的通用架构)。 在某…