【大数据测试之:RabbitMQ消息列队测试-发送、接收、持久化、确认、重试、死信队列并处理消息的并发消费、负载均衡、监控等】详细教程---保姆级

RabbitMQ消息列队测试教程

  • 一、环境准备
    • 1. 安装 RabbitMQ
    • 2. 安装 Python 依赖
  • 二、基本消息队列中间件实现
    • 1. 消息发送模块
    • 2. 消息接收模块
  • 三、扩展功能
    • 1. 消息持久化和队列持久化
    • 2. 消息优先级
    • 3. 死信队列(DLQ)
  • 四、并发处理和负载均衡
    • 1. 使用 Python 的 `threading` 模块启动多个消费者
  • 五、消息确认与重试机制
    • 1. 消息确认
    • 2. 重试机制
  • 六、监控与日志记录
    • 1. 日志记录
  • 七、测试步骤

一、环境准备

1. 安装 RabbitMQ

你可以在本地或通过 Docker 启动 RabbitMQ:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

默认用户名和密码为 guest。你可以通过访问 http://localhost:15672 来管理 RabbitMQ。

2. 安装 Python 依赖

我们将使用 pika 库与 RabbitMQ 进行通信。安装 pika

pip install pika

二、基本消息队列中间件实现

我们首先构建一个简单的消息队列发送和接收框架,确保消息能够成功传递。

1. 消息发送模块

send_message.py 用于将消息发送到队列:

import pika# 连接到RabbitMQ服务器并发送消息
def send_to_queue(queue_name, message):try:# 建立连接connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 确保队列存在channel.queue_declare(queue=queue_name, durable=True)# 发送消息到队列channel.basic_publish(exchange='',routing_key=queue_name,body=message,properties=pika.BasicProperties(delivery_mode=2,  # 持久化消息))print(f"Sent: {message}")except Exception as e:print(f"Error sending message: {e}")finally:# 关闭连接connection.close()if __name__ == "__main__":send_to_queue('task_queue', 'Hello, RabbitMQ!')

2. 消息接收模块

receive_message.py 用于从队列接收消息并进行处理:

import pika# 连接到RabbitMQ服务器并接收消息
def receive_from_queue(queue_name):try:# 建立连接connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 确保队列存在channel.queue_declare(queue=queue_name, durable=True)# 定义回调函数来处理接收到的消息def callback(ch, method, properties, body):print(f"Received: {body.decode()}")ch.basic_ack(delivery_tag=method.delivery_tag)  # 确认消息已处理# 开始消费消息channel.basic_consume(queue=queue_name, on_message_callback=callback)print(f"Waiting for messages in {queue_name}...")channel.start_consuming()except Exception as e:print(f"Error receiving message: {e}")if __name__ == "__main__":receive_from_queue('task_queue')

三、扩展功能

1. 消息持久化和队列持久化

为了保证消息在 RabbitMQ 重启后不会丢失,我们需要确保队列和消息都被持久化。

  • send_message.py 中,使用 delivery_mode=2 使消息持久化:

    properties=pika.BasicProperties(delivery_mode=2  # 持久化消息
    )
    
  • receive_message.py 中,确保队列是持久化的:

    channel.queue_declare(queue=queue_name, durable=True)
    

2. 消息优先级

RabbitMQ 支持设置消息的优先级,这可以用来保证高优先级的消息先被处理。你可以在声明队列时设置最大优先级:

channel.queue_declare(queue=queue_name, durable=True, arguments={'x-max-priority': 10})

然后,在发送消息时,你可以指定消息的优先级:

channel.basic_publish(exchange='',routing_key=queue_name,body=message,properties=pika.BasicProperties(priority=5  # 设置消息优先级)
)

3. 死信队列(DLQ)

当消息消费失败或超时,可以将消息发送到死信队列。定义死信队列和交换机的方式如下:

channel.queue_declare(queue=queue_name,durable=True,arguments={'x-dead-letter-exchange': 'dlx_exchange','x-message-ttl': 10000  # 设置消息的超时时间}
)

然后,在死信队列中进行消息处理。

四、并发处理和负载均衡

RabbitMQ 能够支持多个消费者并行消费队列中的消息。你可以通过创建多个消费者来实现并发消费。

1. 使用 Python 的 threading 模块启动多个消费者

import threadingdef start_consumer():receive_from_queue('task_queue')# 启动5个消费者并行处理消息
for _ in range(5):threading.Thread(target=start_consumer).start()

RabbitMQ 会自动将消息分发到不同的消费者,实现负载均衡。

五、消息确认与重试机制

如果消息处理失败,可以使用 手动确认 来重试消息。

1. 消息确认

在消费者中,使用 ch.basic_ack 来确认消息已被处理:

ch.basic_ack(delivery_tag=method.delivery_tag)

如果消息处理失败,可以拒绝消息并重新排队:

ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

2. 重试机制

可以设置 死信队列 来处理失败的消息,或者使用延迟队列来定时重试消息。

channel.queue_declare(queue=retry_queue,durable=True,arguments={'x-dead-letter-exchange': 'retry_exchange','x-message-ttl': 5000  # 设置消息的超时时间}
)

六、监控与日志记录

为了有效地监控消息队列的运行情况,建议启用 RabbitMQ 的 管理插件

  • 你可以访问 RabbitMQ 管理界面:http://localhost:15672,查看队列、交换机和消费者的状态。

1. 日志记录

在生产环境中,使用 Python 的 logging 模块进行日志记录:

import logginglogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)def callback(ch, method, properties, body):try:logger.info(f"Received message: {body.decode()}")ch.basic_ack(delivery_tag=method.delivery_tag)except Exception as e:logger.error(f"Error processing message: {e}")ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

七、测试步骤

  1. 发送测试消息:使用 send_message.py 向队列发送消息。
  2. 接收测试消息:使用 receive_message.py 接收消息并确认消息是否被正确消费。
  3. 消息持久化测试:重启 RabbitMQ,确保消息不会丢失。
  4. 并发消费测试:启动多个消费者,并测试消息是否能够均匀分发。
  5. 消息重试测试:模拟消费失败,并测试死信队列和消息重试机制是否有效。

推荐阅读:《大数据测试专栏》,《Docker实践详解》


下期预告:

  • 《大数据测试之:RabbitMQ消息测试-消息加密和认证+不同消息列队测试+处理复杂的消息路由及交换机配置》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/62269.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DAMODEL丹摩|部署FLUX.1+ComfyUI实战教程

本文仅做测评体验,非广告。 文章目录 1. FLUX.1简介2. 实战2. 1 创建资源2. 1 ComfyUI的部署操作2. 3 部署FLUX.1 3. 测试5. 释放资源4. 结语 1. FLUX.1简介 FLUX.1是由黑森林实验室(Black Forest Labs)开发的开源AI图像生成模型。它拥有12…

具体的技术和工具在县级融媒体建设3.0中有哪些应用?

以下是结合数据来看县级融媒体建设3.0的一些情况: 技术应用方面 大数据:人民网舆情数据中心执行主任董盟君提到,通过大数据分析可让融媒体单位快速关注聚焦点,实现智能策划、智能推送、智能传播,推动媒体传播影响力提…

中兴机顶盒B860AV1.1刷机固件升级和教程「适用4/8G版」

准备工作: TTL 线(CH340G 按系统版本找到要对应驱动)下载 putty 软件拆开电视盒接好 TTL 线(2、5、6 针脚对应GND、RX、TX)在资源管理器的端口选项下找到 CH340G,记住端口号(如 COM4&#xff0…

SeggisV1.0 遥感影像分割软件【源代码】讲解

在此基础上进行二次开发,开发自己的软件,例如:【1】无人机及个人私有影像识别【2】离线使用【3】变化监测模型集成【4】个人私有分割模型集成等等,不管是您用来个人学习 还是公司研发需求,都相当合适,包您满…

QINQ技术

定义 QINQ即802.1q in 802.1q,因为IEEE802.1Q中定义的Vlan Tag域只有12个比特,仅能表示4096个Vlan,随网络发展被用尽,于是在原有带vlan的数据上再携带一层vlan标签用于扩展vlan数目。一般来说外层vlan是公网,内层是私…

linux基础2

声明! 学习视频来自B站up主 泷羽sec 有兴趣的师傅可以关注一下,如涉及侵权马上删除文章,笔记只是方便各位师傅的学习和探讨,文章所提到的网站以及内容,只做学习交流,其他均与本人以及泷羽sec团队无关&#…

鸿蒙千帆启新程,共绘数字生态蓝图

华为的鸿蒙千帆起计划:共筑数字未来,学习华为创新之路 在当今全球科技竞争日益激烈的背景下,华为作为中国科技企业的代表,正通过其自主创新的鸿蒙系统,引领一场移动应用生态的变革。鸿蒙千帆起计划,作为华…

Qt-系统相关(2)多线程网络

Qt多线程 在 Qt 中,多线程的处理⼀般是通过 QThread类 来实现。 QThread 代表⼀个在应⽤程序中可以独⽴控制的线程,也可以和进程中的其他线程共享数据。 QThread 对象管理程序中的⼀个控制线程。 QThread 常⽤ API: 使用线程 关于创建线程…

永久免费的PDF万能水印删除工具

永久免费的PDF万能水印删除工具 1.简介 PDF万能水印删除工具,可以去除99.9%的PDF水印。例如:XObject水印(含图片水印)、文本水印、绘图水印/曲线水印、注释水印、工件水印、剪切路径水印等等。本软件是永久免费,无有…

华三(HCL)和华为(eNSP)模拟器共存安装手册

接上章叙述,解决同一台PC上同时部署华三(HCL)和华为(eNSP)模拟器。原因就是华三HCL 的老版本如v2及以下使用VirtualBox v5版本,可以直接和eNSP兼容Oracle VirtualBox,而其他版本均使用Oracle VirtualBox v6以上的版本,…

深度理解进程的概念(Linux)

目录 一、冯诺依曼体系 二、操作系统(OS) 设计操作系统的目的 核心功能 系统调用 三、进程的概念与基本操作 简介 查看进程 通过系统调用获取进程标识符 通过系统调用创建进程——fork() 四、进程的状态 操作系统中的运行、阻塞和挂起 理解linux内核链表 Linux的进…

SQLite 管理工具 SQLiteStudio 3.4.5 发布

SQLiteStudio 3.4.5 版本现已发布,它带来了大量的 bug 修复,并增加了一些小功能。SQLiteStudio 是一个跨平台的 SQLite 数据库的管理工具。 具体更新内容包括: 现在可以使用 Collations Editor 窗口在数据库中注册 Extension-based collatio…

非常简单实用的前后端分离项目-仓库管理系统(Springboot+Vue)part 2

七、创建前端项目 你下载了nodejs吗?从cn官网下载:http://nodejs.cn/download/,或者从一个国外org网站下载,选择自己想要的版本https://nodejs.org/download/release/,双击下载好的安装文件,选择安装路径安…

继续完善wsl相关内容:基础指令

文章目录 前言一、我们需要安装wsl,这也是安装docker desktop的前提,因此我们在这篇文章里做了介绍:二、虽然我们在以安装docker desktop为目的时,不需要安装wsl的分发(distribution),但是装一个分发也是有诸多好处的:三、在使用wsl时,不建议把东西直接放到系统里,因…

20241124 Typecho 视频插入插件

博文免不了涉及到视频插入这些,网上的插件都或多或少的比较重,和Typecho的风格不搭配 后面就有了DPlay插件精简而来的VideoInsertion插件 VideoInsertion: Typecho 视频插入插件 目录结构 rockhinlink-ht2:/var/www/html/typecho/usr/plugins/VideoInsertion$ tree -h [4.…

css:项目

这是一个完整的网站制作的流程 美工会先制作一个原型图: 原型图写的不详细,就是体现一个网页大致的布局 然后美工再做一个psd样例图片 然后再交给程序员 项目 模块化开发:把代码的不同的样式封装起来,需要用到相同样式的标签就…

Qt桌面应用开发 第九天(综合项目一 飞翔的鸟)

目录 1.鸟类创建 2.鸟动画实现 3.鼠标拖拽 4.自动移动 5.右键菜单 6.窗口透明化 项目需求: 实现思路: 创建项目导入资源鸟类创建鸟动画实现鼠标拖拽实现自动移动右键菜单窗口透明化 1.鸟类创建 ①鸟类中包含鸟图片、鸟图片的最小值下标和最大值…

网络安全期末复习

第1章 网络安全概括 (1)用户模式切换到系统配置模式(enable)。 (2)显示当前位置的设置信息,很方便了解系统设置(show running-config)。 (3)显…

使用Python实现自动化邮件通知:当长时程序运行结束时

使用Python实现自动化邮件通知:当长时程序运行结束时 前提声明 本代码仅供学习和研究使用,不得用于商业用途。请确保在合法合规的前提下使用本代码。 目录 引言项目背景项目设置代码分析 导入所需模块定义邮件发送函数发送邮件 实现步骤结语全部代码…

Python学习35天

# 定义父类 class Computer: CPUNone MemoryNone diskNone def __init__(self,CPU,Memory,disk): self.disk disk self.Memory Memory self.CPU CPU def get_details(self): return f"CPU:{self.CPU}\tdisk:{self.disk}\t…