RabbitMQ工作模式 - 简单模式和work工作模式多个竞争的消费者

RabbitMQ 是一个消息队列中间件,用于在分布式系统中进行消息传递。在 RabbitMQ 中,有几种工作模式,其中简单模式和工作模式是其中两种基本的模式之一。

  1. 简单模式(Simple Mode):

    • 在简单模式中,有一个生产者(Producer)将消息发送到一个队列(Queue)中,然后有一个消费者(Consumer)从队列中接收并处理消息。
    • 这是最基本的消息队列模式,适用于单个生产者和单个消费者的场景。
    • 生产者将消息发送到队列,而消费者从队列中接收并处理消息,消息的传递是单向的。
  2. 工作模式(Work Queue Mode):

    • 工作模式也被称为竞争消费者模式。在这种模式下,有多个消费者监听同一个队列,但每条消息只能被其中一个消费者接收和处理。
    • 当消息被发送到队列时,它将被发送给下一个空闲的消费者,从而实现消息的分发和并发处理。
    • 这种模式对于处理大量工作的情况很有用,可以通过增加消费者的数量来提高消息处理的速度。

在 RabbitMQ 中,简单模式和工作模式的实现通常使用一些基本的概念,包括生产者、消费者、队列和消息。生产者负责发送消息到队列,而消费者则负责从队列中接收和处理消息。

下面是一个使用 RabbitMQ 和 Node.js(使用 amqplib 库)以及 TypeScript 实现工作模式的简单示例。在这个例子中,我们将使用 amqplib 库来连接 RabbitMQ 服务器,并使用 TypeScript 来编写代码。

首先,确保你已经安装了 amqplib 库。可以使用以下命令进行安装:

npm install amqplib

接下来,创建一个生产者和一个消费者的 TypeScript 文件。以下是示例代码:

producer.ts:

import * as amqp from 'amqplib';async function produce() {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const queue = 'work_queue';await channel.assertQueue(queue, { durable: true });for (let i = 0; i < 10; i++) {const message = `Message ${i}`;channel.sendToQueue(queue, Buffer.from(message), { persistent: true });console.log(` [x] Sent '${message}'`);}setTimeout(() => {connection.close();process.exit(0);}, 500);
}produce();

consumer.ts:

import * as amqp from 'amqplib';async function consume() {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const queue = 'work_queue';await channel.assertQueue(queue, { durable: true });channel.prefetch(1);console.log(' [*] Waiting for messages. To exit press CTRL+C');channel.consume(queue, async (msg) => {if (msg !== null) {const message = msg.content.toString();console.log(` [x] Received ${message}`);// Simulate some workawait new Promise(resolve => setTimeout(resolve, 1000));console.log(' [x] Done');channel.ack(msg);}});
}consume();

这个示例中,生产者将消息发送到名为 work_queue 的队列中,而消费者则监听该队列并处理消息。消费者使用 channel.prefetch(1) 来确保一次只接收一个消息,从而实现竞争消费者模式。

记得在运行前启动 RabbitMQ 服务器,并确保 TypeScript 文件已编译成 JavaScript。你可以使用以下命令进行编译:

tsc producer.ts
tsc consumer.ts

然后,分别运行 producer.jsconsumer.js。这样你就可以在 RabbitMQ 中看到消息的生产和消费过程。

RabbitMQ消息持久化和手动应答

在 RabbitMQ 中,消息持久化和手动应答是两个关键的概念,它们可以帮助确保消息的可靠传递和处理。下面简要介绍这两个概念:

  1. 消息持久化(Message Durability):

    • 默认情况下,RabbitMQ 中的消息是瞬时的,也就是说,如果 RabbitMQ 服务器停止或崩溃,所有未处理的消息都会丢失。
    • 通过将消息标记为持久化,你可以确保消息在 RabbitMQ 服务器重启后仍然可用。要实现消息持久化,需要在发送消息时设置消息的 deliveryMode 属性为 2persistent)。
    • 例如,在生产者端设置消息为持久化:
    channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
    
    • 在消费者端,你需要确保队列和消息都被声明为持久化:
    channel.assertQueue(queue, { durable: true });
    

    这样,即使 RabbitMQ 服务器重启,持久化的消息也不会丢失。

  2. 手动应答(Manual Acknowledgment):

    • 默认情况下,RabbitMQ 使用自动应答(auto-acknowledgment)模式,即一旦消息被传递给消费者,RabbitMQ 就将其标记为已处理。
    • 在某些情况下,你可能需要更细粒度的控制,以确保消息在被消费者完全处理之后才被标记为已处理。这就是手动应答的用途。
    • 在消费者端,需要将 noAck 设置为 false,表示手动应答模式:
channel.consume(queueName, async (msg: Message | null) => {if (msg) {const data: EmailTask = JSON.parse(msg.content.toString());console.log('Processing mail task:', msg.content.toString());try {//模拟邮件发送await new Promise(resolve => setTimeout(resolve, 1000));console.log(' [x] Done');channel.ack(msg);} catch (error) {console.log('error:', data);// 处理消息失败,判断是否需要进行重试if (canRetry(msg)) {// 重新入队,进行下一次尝试channel.reject(msg, true);} else {// 不进行重试,将消息从队列中移除channel.ack(msg);}}}
});
  • 在这种情况下,消费者需要在处理完消息后显式调用 channel.ack(msg) 来确认消息已被处理。如果消费者崩溃或在处理消息时发生错误,消息将保持在队列中,直到被明确确认。
  • 在 RabbitMQ 中,channel.reject 方法用于拒绝一条消息。它的参数如下channel.reject(msg, requeue);
    msg: 要拒绝的消息对象。
    requeue: 如果设置为 true,则被拒绝的消息将被重新排队,即重新放回队列。如果设置为 false,则消息将被删除。默认为 true。

综合使用消息持久化和手动应答,可以确保在面对不同情况时,消息的可靠传递和处理。

重试间隔和次数

在进行消息重试时,你可以考虑添加一些延迟和控制重试次数的逻辑。以下是修改后的代码,考虑了重试间隔和次数的情况:

function canRetry(msg: Message) {const maxRetryAttempts = 3; // 最大重试次数// 从消息的属性中获取重试次数const retryCount = msg.properties.headers['x-retry-count'] || 0;// 判断是否达到最大重试次数return retryCount < maxRetryAttempts;
}async function processMailTask(msg: Message) {const data: EmailTask = JSON.parse(msg.content.toString());console.log('Processing mail task:', msg.content.toString());try {// 模拟邮件发送await new Promise(resolve => setTimeout(resolve, 1000));console.log(' [x] Done');channel.ack(msg);} catch (error) {console.log('error:', data);// 处理消息失败,判断是否需要进行重试if (canRetry(msg)) {// 获取当前重试次数const retryCount = msg.properties.headers['x-retry-count'] || 0;// 计算下一次重试的延迟时间,可以根据重试次数进行指数退避const delay = Math.pow(2, retryCount) * 1000;// 在一定的延迟后重新入队,进行下一次尝试setTimeout(() => {channel.nack(msg, false, false);}, delay);} else {// 不进行重试,将消息从队列中移除channel.ack(msg);}}
}channel.consume(queueName, async (msg: Message | null) => {if (msg) {await processMailTask(msg);}
});

在这个示例中,我添加了一个 processMailTask 函数来处理邮件任务。在处理失败的情况下,根据重试次数计算下一次重试的延迟时间,然后使用 setTimeout 在一定的延迟后重新入队。这里使用了指数退避的策略,即每次重试的延迟时间是上一次的两倍。

channel.nack(msg, allUpTo, requeue);
  • msg: 要否定的消息对象。
  • allUpTo(可选参数): 如果设置为 true,则所有比当前消息更早的消息也将被否定。默认为 false
  • requeue(可选参数): 如果设置为 true,则被否定的消息将被重新排队,即重新放回队列。如果设置为 false,则消息将被删除。默认为 true

在你的代码中,channel.nack(msg, false, false); 表示对当前消息 msg 进行否定,并且不重新将消息放回队列,而是将其删除。这在处理重试逻辑时很重要,因为我们通过 setTimeout 自定义了重试的时间,并手动重新入队。如果 requeue 设置为 true,消息会立即被重新入队,这可能与我们的定制重试逻辑冲突。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/647406.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【github】github打开慢或者打不开解决方案

目录 1、打开hosts文件&#xff08;C:\Windows\System32\drivers\etc&#xff09; 2、然末尾放入一下两个 IP 地址&#xff1a; 3、替换覆盖原文件 最近github老是打不开&#xff0c;找了一个方法试了一下管用 github网址查询&#xff1a;https://ipaddress.com/website/git…

css 中 flex 布局最后一行实现左对齐

问题 flex 布局最后一行没有进行左对齐显示&#xff1a; <div classparent><div classchild></div><div classchild></div><div classchild></div><div classchild></div><div classchild></div><div…

2022年至2023年广东省职业院校技能大赛高职组“信息安全管理与评估”赛项样题

2022 年至 2023 年广东省职业院校技能大赛高职组“信息安全管理与评估”赛项样题 一、 第一阶段竞赛项目试题 本文件为信息安全管理与评估项目竞赛第一阶段试题&#xff0c;第一阶段内容包 括&#xff1a;网络平台搭建、网络安全设备配置与防护。 本阶段比赛时间为 180 分钟…

VirtualBox如何复制虚拟机

对于vmware或virtual pc虚拟机&#xff0c;要想快速复制几个虚拟机&#xff0c;以便集群使用&#xff0c;方法比较简单&#xff0c;例如直接复制其虚拟机相应的磁盘文件和配置文件即可&#xff0c;例如对于vmware&#xff0c;修改vmx文本文件中的内容如虚拟机名称、磁盘文件路径…

#Uniapp: uni.previewImage(OBJECT) 预览图片

uni.previewImage(OBJECT) 预览图片。 api地址 媒体-图片 示例 handlePreviewImg(current) {const urls this.rightList.map(x > x.icon)uni.previewImage({urls,current})}OBJECT 参数说明 参数名类型必填说明平台差异说明countNumber否最多可以选择的图片张数&#…

【知识---ubuntu和debian之间的关系】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、ubuntu和debian之间的关系衍生关系&#xff1a;Debian作为上游源&#xff1a;软件包管理&#xff1a;版本发布&#xff1a;社区和支持&#xff1a; 二、ubu…

华为数通方向HCIP-DataCom H12-831题库(判断题:121-140)

第121题 BGP/MPLS IP VPN内层采用MP-BGP分配的标签区分不同的VPN实例,外层可采用多种隧道类型,例如GRE隧道。 正确 错误 答案: 错误 解析: VPN业务的转发需要隧道来承载,隧道类型包括GRE隧道、LSP隧道、TE隧道(即CR-LSP)。 如果网络边缘的PE设备具备MPLS功能,但骨干网核…

林浩然与Hadoop的奇幻数据之旅

林浩然与Hadoop的奇幻数据之旅 Lin Haoran and the Enchanting Data Journey with Hadoop 在一个名为“比特村”的地方&#xff0c;住着一位名叫林浩然的程序员大侠。他并非江湖上常见的武艺高强之人&#xff0c;而是凭借一把键盘、一支鼠标&#xff0c;纵横在大数据的海洋里。…

UI跟随物体的关键是什么?重要吗?

引言 UI的跟随效果 在游戏开发中&#xff0c;UI的跟随效果是提高用户体验和交互性的重要组成部分。 本文将深入介绍如何创建一个高效且可定制的UI跟随目标组件&#xff0c;并分享一些最佳实践。 本文源工程在文末获取&#xff0c;小伙伴们自行前往。 UI跟随物体的关键 UI…

MQ回顾之kafka速通

不定期更新 官网概念自查 官网&#xff1a;Apache Kafka kafka结构 和kafka相关的关键名词有&#xff1a;Producer、Broker、Topic、Partition、Replication、Message、Consumer、Consumer Group、Zookeeper。 各名词解释已经泛滥&#xff0c;如果你想看点不一样的&#xf…

如何设计一个可靠UDP

背景 通信领域存在制约三角&#xff1a;时延&#xff0c;成本和质量。TCP是增大时延和成本来保证通信质量&#xff0c;UDP牺牲了质量保证了时延和成本。一定场景使用RDP可以找到这三之间的平衡点。实现可靠UDP主要有三种方式&#xff1a; 尽力可靠&#xff1a;接收方要求发送…

A 股承担着一个什么功能?

​A 股&#xff1a;中国资本市场的核心角色 A 股&#xff0c;即人民币普通股票&#xff0c;在中国资本市场中扮演着至关重要的角色。它不仅是投资者买卖交易的场所&#xff0c;更是中国经济发展的重要引擎。 首先&#xff0c;A 股为中国的企业提供了融资平台。中国有着庞大的…

从Elasticsearch来看分布式系统架构设计

从Elasticsearch来看分布式系统架构设计 - 知乎 分布式系统类型多&#xff0c;涉及面非常广&#xff0c;不同类型的系统有不同的特点&#xff0c;批量计算和实时计算就差别非常大。这篇文章中&#xff0c;重点会讨论下分布式数据系统的设计&#xff0c;比如分布式存储系统&…

Zookeeper3.5.7源码分析

文章目录 一、Zookeeper算法一致性1、Paxos 算法1.1 概述1.2 算法流程1.3 算法缺陷 2、ZAB 协议2.1 概述2.2 Zab 协议内容 3、CAP理论 二、源码详解1、辅助源码1.1 持久化源码(了解)1.2 序列化源码 2、ZK 服务端初始化源码解析2.1 启用脚本分析2.2 ZK 服务端启动入口2.3 解析参…

鸿蒙入门学习的一些总结

前言 刚开始接触鸿蒙是从2023年开始的&#xff0c;当时公司在调研鸿蒙开发板能否在实际项目中使用。我们当时使用的是OpenHarmony的&#xff0c;基于DAYU/rk3568开发板&#xff0c;最开始系统是3.2的&#xff0c;API最高是API9&#xff0c;DevecoStudio 版本3.1的。 鸿…

List, Set, Queue, Map 四者的区别

List、Set、Queue、Map 是 Java 中常用的集合类型&#xff0c;它们的主要区别如下&#xff1a; List List 是可重复有序的集合。可以通过索引访问 List 中的元素。可以添加、删除、修改 List 中的元素。常用的实现类有 ArrayList 和 LinkedList。 Set Set 是不可重复的无序…

excel统计分析——Duncan法多重比较

参考资料&#xff1a;生物统计学 Duncan法又称新复极差检验法&#xff0c;是对S-N-K法的改进&#xff0c;根据秩次距m对临界值的显著水平α进行调整&#xff0c;是最常用的多重比较方法。最小显著极差表示如下&#xff1a; 其中&#xff0c;m为秩次距&#xff0c;df为方差分析中…

【软件测试】学习笔记-制定一份有效的性能测试方案

什么是性能测试方案&#xff1f; 性能测试方案&#xff0c;通俗一点说就是指导你进行性能测试的文档&#xff0c;包含测试目的、测试方法、测试场景、环境配置、测试排期、测试资源、风险分析等内容。一份详细的性能测试方案可以帮助项目成员明确测试计划和手段&#xff0c;更…

CentOS服务器拒绝SSH登录

当CentOS服务器拒绝SSH登录时&#xff0c;有几个可能的原因和解决方法&#xff1a; 检查网络连接&#xff1a;确保服务器与您的计算机之间的网络连接是正常的。您可以尝试使用其他网络连接或ping服务器以检查是否能够访问。 确认SSH服务正在运行&#xff1a;在服务器上确认SSH…

Python3进行pdf文件分割及转word

今天有个pdf分割的需求&#xff0c;电脑装的Python3&#xff0c;网上查资料都是Python2的代码&#xff0c;所以整理一份3的 安装&#xff1a; pip install PyPDF2 import PyPDF2def funSplitPdf():pdf_file open(/path/fileName.pdf, rb)pdf_reader PyPDF2.PdfReader(pdf_fi…