【大数据测试之:RabbitMQ消息列队测试-发送、接收、持久化、确认、重试、死信队列并处理消息的并发消费、负载均衡、监控等】详细教程---保姆级

RabbitMQ消息列队测试教程

  • 一、环境准备
    • 1. 安装 RabbitMQ
    • 2. 安装 Python 依赖
  • 二、基本消息队列中间件实现
    • 1. 消息发送模块
    • 2. 消息接收模块
  • 三、扩展功能
    • 1. 消息持久化和队列持久化
    • 2. 消息优先级
    • 3. 死信队列(DLQ)
  • 四、并发处理和负载均衡
    • 1. 使用 Python 的 `threading` 模块启动多个消费者
  • 五、消息确认与重试机制
    • 1. 消息确认
    • 2. 重试机制
  • 六、监控与日志记录
    • 1. 日志记录
  • 七、测试步骤

一、环境准备

1. 安装 RabbitMQ

你可以在本地或通过 Docker 启动 RabbitMQ:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

默认用户名和密码为 guest。你可以通过访问 http://localhost:15672 来管理 RabbitMQ。

2. 安装 Python 依赖

我们将使用 pika 库与 RabbitMQ 进行通信。安装 pika

pip install pika

二、基本消息队列中间件实现

我们首先构建一个简单的消息队列发送和接收框架,确保消息能够成功传递。

1. 消息发送模块

send_message.py 用于将消息发送到队列:

import pika# 连接到RabbitMQ服务器并发送消息
def send_to_queue(queue_name, message):try:# 建立连接connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 确保队列存在channel.queue_declare(queue=queue_name, durable=True)# 发送消息到队列channel.basic_publish(exchange='',routing_key=queue_name,body=message,properties=pika.BasicProperties(delivery_mode=2,  # 持久化消息))print(f"Sent: {message}")except Exception as e:print(f"Error sending message: {e}")finally:# 关闭连接connection.close()if __name__ == "__main__":send_to_queue('task_queue', 'Hello, RabbitMQ!')

2. 消息接收模块

receive_message.py 用于从队列接收消息并进行处理:

import pika# 连接到RabbitMQ服务器并接收消息
def receive_from_queue(queue_name):try:# 建立连接connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 确保队列存在channel.queue_declare(queue=queue_name, durable=True)# 定义回调函数来处理接收到的消息def callback(ch, method, properties, body):print(f"Received: {body.decode()}")ch.basic_ack(delivery_tag=method.delivery_tag)  # 确认消息已处理# 开始消费消息channel.basic_consume(queue=queue_name, on_message_callback=callback)print(f"Waiting for messages in {queue_name}...")channel.start_consuming()except Exception as e:print(f"Error receiving message: {e}")if __name__ == "__main__":receive_from_queue('task_queue')

三、扩展功能

1. 消息持久化和队列持久化

为了保证消息在 RabbitMQ 重启后不会丢失,我们需要确保队列和消息都被持久化。

  • send_message.py 中,使用 delivery_mode=2 使消息持久化:

    properties=pika.BasicProperties(delivery_mode=2  # 持久化消息
    )
    
  • receive_message.py 中,确保队列是持久化的:

    channel.queue_declare(queue=queue_name, durable=True)
    

2. 消息优先级

RabbitMQ 支持设置消息的优先级,这可以用来保证高优先级的消息先被处理。你可以在声明队列时设置最大优先级:

channel.queue_declare(queue=queue_name, durable=True, arguments={'x-max-priority': 10})

然后,在发送消息时,你可以指定消息的优先级:

channel.basic_publish(exchange='',routing_key=queue_name,body=message,properties=pika.BasicProperties(priority=5  # 设置消息优先级)
)

3. 死信队列(DLQ)

当消息消费失败或超时,可以将消息发送到死信队列。定义死信队列和交换机的方式如下:

channel.queue_declare(queue=queue_name,durable=True,arguments={'x-dead-letter-exchange': 'dlx_exchange','x-message-ttl': 10000  # 设置消息的超时时间}
)

然后,在死信队列中进行消息处理。

四、并发处理和负载均衡

RabbitMQ 能够支持多个消费者并行消费队列中的消息。你可以通过创建多个消费者来实现并发消费。

1. 使用 Python 的 threading 模块启动多个消费者

import threadingdef start_consumer():receive_from_queue('task_queue')# 启动5个消费者并行处理消息
for _ in range(5):threading.Thread(target=start_consumer).start()

RabbitMQ 会自动将消息分发到不同的消费者,实现负载均衡。

五、消息确认与重试机制

如果消息处理失败,可以使用 手动确认 来重试消息。

1. 消息确认

在消费者中,使用 ch.basic_ack 来确认消息已被处理:

ch.basic_ack(delivery_tag=method.delivery_tag)

如果消息处理失败,可以拒绝消息并重新排队:

ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

2. 重试机制

可以设置 死信队列 来处理失败的消息,或者使用延迟队列来定时重试消息。

channel.queue_declare(queue=retry_queue,durable=True,arguments={'x-dead-letter-exchange': 'retry_exchange','x-message-ttl': 5000  # 设置消息的超时时间}
)

六、监控与日志记录

为了有效地监控消息队列的运行情况,建议启用 RabbitMQ 的 管理插件

  • 你可以访问 RabbitMQ 管理界面:http://localhost:15672,查看队列、交换机和消费者的状态。

1. 日志记录

在生产环境中,使用 Python 的 logging 模块进行日志记录:

import logginglogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)def callback(ch, method, properties, body):try:logger.info(f"Received message: {body.decode()}")ch.basic_ack(delivery_tag=method.delivery_tag)except Exception as e:logger.error(f"Error processing message: {e}")ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

七、测试步骤

  1. 发送测试消息:使用 send_message.py 向队列发送消息。
  2. 接收测试消息:使用 receive_message.py 接收消息并确认消息是否被正确消费。
  3. 消息持久化测试:重启 RabbitMQ,确保消息不会丢失。
  4. 并发消费测试:启动多个消费者,并测试消息是否能够均匀分发。
  5. 消息重试测试:模拟消费失败,并测试死信队列和消息重试机制是否有效。

推荐阅读:《大数据测试专栏》,《Docker实践详解》


下期预告:

  • 《大数据测试之:RabbitMQ消息测试-消息加密和认证+不同消息列队测试+处理复杂的消息路由及交换机配置》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/62269.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DAMODEL丹摩|部署FLUX.1+ComfyUI实战教程

本文仅做测评体验,非广告。 文章目录 1. FLUX.1简介2. 实战2. 1 创建资源2. 1 ComfyUI的部署操作2. 3 部署FLUX.1 3. 测试5. 释放资源4. 结语 1. FLUX.1简介 FLUX.1是由黑森林实验室(Black Forest Labs)开发的开源AI图像生成模型。它拥有12…

具体的技术和工具在县级融媒体建设3.0中有哪些应用?

以下是结合数据来看县级融媒体建设3.0的一些情况: 技术应用方面 大数据:人民网舆情数据中心执行主任董盟君提到,通过大数据分析可让融媒体单位快速关注聚焦点,实现智能策划、智能推送、智能传播,推动媒体传播影响力提…

中兴机顶盒B860AV1.1刷机固件升级和教程「适用4/8G版」

准备工作: TTL 线(CH340G 按系统版本找到要对应驱动)下载 putty 软件拆开电视盒接好 TTL 线(2、5、6 针脚对应GND、RX、TX)在资源管理器的端口选项下找到 CH340G,记住端口号(如 COM4&#xff0…

论 AI(人工智能)的现状

刚直播完,就写篇文章,说说我对AI的看法。 一直以来,想对AI 有一个总结,现在AI工具层出不穷,但是它的本质上发展还是缓慢,通过其他方式来扩展本身的能力。 我认为的人工智能 只有像钢铁侠与贾维斯以及终结…

volcano k8s 部署

下载volcano-development文件 官网 https://volcano.sh/zh/docs/installation/volcano-development.yaml wget https://raw.githubusercontent.com/volcano-sh/volcano/master/installer/volcano-development.yaml部署volcano 查下需要下载的镜像 grep vc- volcano-develo…

SeggisV1.0 遥感影像分割软件【源代码】讲解

在此基础上进行二次开发,开发自己的软件,例如:【1】无人机及个人私有影像识别【2】离线使用【3】变化监测模型集成【4】个人私有分割模型集成等等,不管是您用来个人学习 还是公司研发需求,都相当合适,包您满…

Dubbo 最基础的 RPC 应用(使用 ZooKeeper)

看国内的一些项目时 Dubbo 这个词经常闪现,一直也不以为然,未作搜索,当然也不知道它是做什么用的。直到最近阅读关于大型网站架构相关的书中反复提到 Dubbo 后,觉得不能再对它视而不见。Google 了一下,它是在阿里巴巴创…

QINQ技术

定义 QINQ即802.1q in 802.1q,因为IEEE802.1Q中定义的Vlan Tag域只有12个比特,仅能表示4096个Vlan,随网络发展被用尽,于是在原有带vlan的数据上再携带一层vlan标签用于扩展vlan数目。一般来说外层vlan是公网,内层是私…

用哪两种方式来实现集合的排序?

集合(Set)是一种常见的数据结构,用于存储不重复的元素。在某些情况下,我们需要对集合中的元素进行排序。虽然集合本身是无序的,但我们可以将集合转换为其他有序的数据结构(如列表)来实现排序。 …

数据库与大数据管理 头歌作业 期末复习

1、 下列说法错误的是?c A、UserCF算法推荐的是那些和目标用户有共同兴趣爱好的其他用户所喜欢的物品 B、ItemCF算法推荐的是那些和目标用户之前喜欢的物品类似的其他物品 C、UserCF算法的推荐更偏向个性化 D、UserCF随着用户数目的增大,用户相似度…

在 Linux 系统中根据pid查找软件位置

在 Linux 系统中,如果您知道一个进程的 PID(进程标识符),并且想要找到该进程对应的可执行文件的位置,可以使用以下几种方法: 方法一:使用 ps 命令 ps 命令可以显示进程的详细信息,包括可执行文件的路径。假设您的 PID 是 1234,可以使用以下命令: ps -p 1234 -o co…

未成年人模式护航,保障安全健康上网

为保护未成年人的上网环境,预防未成年人沉迷网络,帮助未成年人培养积极健康的用网习惯,HarmonyOS SDK 提供未成年人模式功能,在华为设备上加强对面向未成年人的产品和服务的管理。 场景介绍(应用跟随系统未成年人模式…

linux基础2

声明! 学习视频来自B站up主 泷羽sec 有兴趣的师傅可以关注一下,如涉及侵权马上删除文章,笔记只是方便各位师傅的学习和探讨,文章所提到的网站以及内容,只做学习交流,其他均与本人以及泷羽sec团队无关&#…

如何分析 Nginx 日志

分析 Nginx 日志可以帮助我们了解服务器性能、流量来源、用户行为,以及诊断问题(如错误和攻击)。以下是详细的分析方法: 1. 日志类型 Nginx 有两种主要日志: 访问日志 (Access Log):记录客户端对服务器的…

博客摘录「 【UnityEditor编辑器扩展】遍历子节点,带中断实现」2024年11月28日

Unity项目内,全局搜索预制体 var title ""; var progress 0; var files SearchFiles(Application.dataPath, new []{"*.prefab"}); EditorUtility.DisplayProgressBar(title, assetPath, progress / filesCount); EditorUtility.ClearProgr…

数据结构入门(C语言复习)malloc开辟free释放

/*void* malloc(size_t) 如果成功&#xff0c;会返回从堆内存上分配的内存指针 如果失败&#xff0c;会返回空指针*/ #include<stdio.h> #include<stdlib.h>//malloc要用 #include<string.h> typedef struct { int x;//如果是char x&#xff0c;内存对齐…

Git仓库迁移到远程仓库(源码、分支、提交)

单个迁移仓库 一、迁移仓库 1.准备工作 > 手动在电脑创建一个临时文件夹&#xff0c;CMD进入该目录 > 远程仓库上创建一个同名的空仓库 2.CMD命令&#xff1a;拉取旧Git仓库&#xff08;包含提交、分支、源码&#xff09; $ git clone --bare http://git.domain.cn/…

鸿蒙千帆启新程,共绘数字生态蓝图

华为的鸿蒙千帆起计划&#xff1a;共筑数字未来&#xff0c;学习华为创新之路 在当今全球科技竞争日益激烈的背景下&#xff0c;华为作为中国科技企业的代表&#xff0c;正通过其自主创新的鸿蒙系统&#xff0c;引领一场移动应用生态的变革。鸿蒙千帆起计划&#xff0c;作为华…

React Native 组件详解之 ActivityIndicator、Button、FlatList、Image、ImageBackground

React Native 组件详解&#xff1a;ActivityIndicator、Button、FlatList、Image 和 ImageBackground 在 React Native 中&#xff0c;组件是构建用户界面的基本单元。它们可以是简单的 UI 元素&#xff0c;如按钮或图像&#xff0c;也可以是复杂的列表或表单。了解这些组件的…

高级java每日一道面试题-2024年11月28日-JVM篇-调优命令有哪些?

如果有遗漏,评论区告诉我进行补充 面试官: 调优命令有哪些? 我回答: 在Java高级面试中&#xff0c;调优命令是面试官常问的问题之一。以下是对Java调优命令的详细介绍&#xff1a; 一、主要调优命令 1. jps&#xff08;JVM Process Status Tool&#xff09; 功能&#x…