RabbitMQ简单模式和工作模式

RabbitMQ 是一个消息队列中间件,用于在分布式系统中进行消息传递。在 RabbitMQ 中,有几种工作模式,其中简单模式和工作模式是其中两种基本的模式之一。

  1. 简单模式(Simple Mode):

    • 在简单模式中,有一个生产者(Producer)将消息发送到一个队列(Queue)中,然后有一个消费者(Consumer)从队列中接收并处理消息。
    • 这是最基本的消息队列模式,适用于单个生产者和单个消费者的场景。
    • 生产者将消息发送到队列,而消费者从队列中接收并处理消息,消息的传递是单向的。
  2. 工作模式(Work Queue Mode):

    • 工作模式也被称为竞争消费者模式。在这种模式下,有多个消费者监听同一个队列,但每条消息只能被其中一个消费者接收和处理。
    • 当消息被发送到队列时,它将被发送给下一个空闲的消费者,从而实现消息的分发和并发处理。
    • 这种模式对于处理大量工作的情况很有用,可以通过增加消费者的数量来提高消息处理的速度。

在 RabbitMQ 中,简单模式和工作模式的实现通常使用一些基本的概念,包括生产者、消费者、队列和消息。生产者负责发送消息到队列,而消费者则负责从队列中接收和处理消息。

下面是一个使用 RabbitMQ 和 Node.js(使用 amqplib 库)以及 TypeScript 实现工作模式的简单示例。在这个例子中,我们将使用 amqplib 库来连接 RabbitMQ 服务器,并使用 TypeScript 来编写代码。

首先,确保你已经安装了 amqplib 库。可以使用以下命令进行安装:

npm install amqplib

接下来,创建一个生产者和一个消费者的 TypeScript 文件。以下是示例代码:

producer.ts:

import * as amqp from 'amqplib';async function produce() {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const queue = 'work_queue';await channel.assertQueue(queue, { durable: true });for (let i = 0; i < 10; i++) {const message = `Message ${i}`;channel.sendToQueue(queue, Buffer.from(message), { persistent: true });console.log(` [x] Sent '${message}'`);}setTimeout(() => {connection.close();process.exit(0);}, 500);
}produce();

consumer.ts:

import * as amqp from 'amqplib';async function consume() {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const queue = 'work_queue';await channel.assertQueue(queue, { durable: true });// 设置每次只处理一个消息[平均分配的概念,不会让一个work太忙和太闲]//这一行代码的作用是告诉 RabbitMQ 不要在消费者未确认(ack)之前向其发送新的消息await channel.prefetch(1);console.log(' [*] Waiting for messages. To exit press CTRL+C');await channel.consume(queue, async (msg) => {if (msg !== null) {const message = msg.content.toString();console.log(` [x] Received ${message}`);// Simulate some workawait new Promise(resolve => setTimeout(resolve, 1000));console.log(' [x] Done');channel.ack(msg);}});
}consume();

这个示例中,生产者将消息发送到名为 work_queue 的队列中,而消费者则监听该队列并处理消息。消费者使用 channel.prefetch(1) 来确保一次只接收一个消息,从而实现竞争消费者模式。

记得在运行前启动 RabbitMQ 服务器,并确保 TypeScript 文件已编译成 JavaScript。你可以使用以下命令进行编译:

tsc producer.ts
tsc consumer.ts

然后,分别运行 producer.jsconsumer.js。这样你就可以在 RabbitMQ 中看到消息的生产和消费过程。

RabbitMQ消息持久化和手动应答

在 RabbitMQ 中,消息持久化和手动应答是两个关键的概念,它们可以帮助确保消息的可靠传递和处理。下面简要介绍这两个概念:

  1. 消息持久化(Message Durability):

    • 默认情况下,RabbitMQ 中的消息是瞬时的,也就是说,如果 RabbitMQ 服务器停止或崩溃,所有未处理的消息都会丢失。
    • 通过将消息标记为持久化,你可以确保消息在 RabbitMQ 服务器重启后仍然可用。要实现消息持久化,需要在发送消息时设置消息的 deliveryMode 属性为 2persistent)。
    • 例如,在生产者端设置消息为持久化:
    channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
    
    • 在消费者端,你需要确保队列和消息都被声明为持久化:
    channel.assertQueue(queue, { durable: true });
    

    这样,即使 RabbitMQ 服务器重启,持久化的消息也不会丢失。

  2. 手动应答(Manual Acknowledgment):

    • 默认情况下,RabbitMQ 使用自动应答(auto-acknowledgment)模式,即一旦消息被传递给消费者,RabbitMQ 就将其标记为已处理。
    • 在某些情况下,你可能需要更细粒度的控制,以确保消息在被消费者完全处理之后才被标记为已处理。这就是手动应答的用途。
    • 在消费者端,需要将 noAck 设置为 false,表示手动应答模式:
channel.consume(queueName, async (msg: Message | null) => {if (msg) {const data: EmailTask = JSON.parse(msg.content.toString());console.log('Processing mail task:', msg.content.toString());try {//模拟邮件发送await new Promise(resolve => setTimeout(resolve, 1000));console.log(' [x] Done');channel.ack(msg);} catch (error) {console.log('error:', data);// 处理消息失败,判断是否需要进行重试if (canRetry(msg)) {// 重新入队,进行下一次尝试channel.reject(msg, true);} else {// 不进行重试,将消息从队列中移除channel.ack(msg);}}}
});
  • 在这种情况下,消费者需要在处理完消息后显式调用 channel.ack(msg) 来确认消息已被处理。如果消费者崩溃或在处理消息时发生错误,消息将保持在队列中,直到被明确确认。
  • 在 RabbitMQ 中,channel.reject 方法用于拒绝一条消息。它的参数如下channel.reject(msg, requeue);
    msg: 要拒绝的消息对象。
    requeue: 如果设置为 true,则被拒绝的消息将被重新排队,即重新放回队列。如果设置为 false,则消息将被删除。默认为 true。

综合使用消息持久化和手动应答,可以确保在面对不同情况时,消息的可靠传递和处理。

重试间隔和次数

在进行消息重试时,你可以考虑添加一些延迟和控制重试次数的逻辑。以下是修改后的代码,考虑了重试间隔和次数的情况:

function canRetry(msg: Message) {const maxRetryAttempts = 3; // 最大重试次数// 从消息的属性中获取重试次数const retryCount = msg.properties.headers['x-retry-count'] || 0;// 判断是否达到最大重试次数return retryCount < maxRetryAttempts;
}async function processMailTask(msg: Message) {const data: EmailTask = JSON.parse(msg.content.toString());console.log('Processing mail task:', msg.content.toString());try {// 模拟邮件发送await new Promise(resolve => setTimeout(resolve, 1000));console.log(' [x] Done');channel.ack(msg);} catch (error) {console.log('error:', data);// 处理消息失败,判断是否需要进行重试if (canRetry(msg)) {// 获取当前重试次数const retryCount = msg.properties.headers['x-retry-count'] || 0;// 计算下一次重试的延迟时间,可以根据重试次数进行指数退避const delay = Math.pow(2, retryCount) * 1000;// 在一定的延迟后重新入队,进行下一次尝试setTimeout(() => {channel.nack(msg, false, false);}, delay);} else {// 不进行重试,将消息从队列中移除channel.ack(msg);}}
}channel.consume(queueName, async (msg: Message | null) => {if (msg) {await processMailTask(msg);}
});

在这个示例中,我添加了一个 processMailTask 函数来处理邮件任务。在处理失败的情况下,根据重试次数计算下一次重试的延迟时间,然后使用 setTimeout 在一定的延迟后重新入队。这里使用了指数退避的策略,即每次重试的延迟时间是上一次的两倍。

channel.nack(msg, allUpTo, requeue);
  • msg: 要否定的消息对象。
  • allUpTo(可选参数): 如果设置为 true,则所有比当前消息更早的消息也将被否定。默认为 false
  • requeue(可选参数): 如果设置为 true,则被否定的消息将被重新排队,即重新放回队列。如果设置为 false,则消息将被删除。默认为 true

在你的代码中,channel.nack(msg, false, false); 表示对当前消息 msg 进行否定,并且不重新将消息放回队列,而是将其删除。这在处理重试逻辑时很重要,因为我们通过 setTimeout 自定义了重试的时间,并手动重新入队。如果 requeue 设置为 true,消息会立即被重新入队,这可能与我们的定制重试逻辑冲突。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/647735.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端 - 视觉呈现技术

前言 不知道大家有没有在微信中看见过&#xff0c;那种特别炫酷的h5&#xff0c;从事前端工作的伙伴看着这么炫酷的效果&#xff0c;又不知如何实现。本文就带你走进前端高阶视觉特效。 特效 视觉呈现做的最熟的&#xff0c;我觉得应是视频了&#xff0c;每个视频工具都能根…

使用封装函数判断素数

#include<stdio.h> int judgeprime(int n) //判断是不是素数 { int i 0; int j 1; //默认素数&#xff0c;判断之后若不是素数&#xff0c;返回j0&#xff0c;否则返回j1 for (i 2; i < n; i) {if (n%i 0) //有其他共因数&#xff0c;不为素数{j 0;break;} }…

26.删除排序数组中的重复项(力扣LeetCode)

26.删除排序数组中的重复项 题目描述 给你一个 非严格递增排列 的数组 nums &#xff0c;请你 原地 删除重复出现的元素&#xff0c;使每个元素 只出现一次 &#xff0c;返回删除后数组的新长度。元素的 相对顺序 应该保持 一致 。然后返回 nums 中唯一元素的个数。 考虑 nu…

v-if 导致 elementui 表单校验失效问题解决

问题 在使用 elementui 表单的过程中&#xff0c;某些表单项需要通过 v-if 来判断是否展示&#xff0c;但是这些表单项出现了检验失效的问题。 解决方法 1、给需要 v-if 判断的表单项添加 key 值 <el-form ref"form" :model"form"><el-form-i…

【Unity学习笔记】创建人物控制器

人物左右移动 1 导入模型&#xff0c;如果没有模型&#xff0c;则在 窗口-资产商店-free sample 找到人物模型 2 在 窗口-包管理中 导入自己的模型 3 在自己的资产文件夹中找到Prefabs Base HighQuality MaleFree1模型&#xff0c;导入到场景中 4 Assets中创建C#项目 写入如下…

四、MySQL之DML DQL

有关数据表的DML操作 INSERT 针对于数据的插入DELETE 针对于数据的删除UPDATE 针对于数据的修改 4.1 INSERT语句 INSERT INTO 表名 [(列名1,列名2,....)] VALUES (值1&#xff0c;值2&#xff0c;...); 默认情况下&#xff0c;一条插入命令只针对一行进行影响INSERT INTO 表…

Leetcode:二分搜索树层次遍历

题目&#xff1a; 给你二叉树的根节点 root &#xff0c;返回其节点值的 层序遍历 。 &#xff08;即逐层地&#xff0c;从左到右访问所有节点&#xff09;。 示例&#xff1a; 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;[[3],[9,…

使用阿里云的oss对象存储服务实现图片上传(前端vue后端java详解)

一&#xff1a;前期准备&#xff1a; 1.1&#xff1a;注册阿里云账号&#xff0c;开启对象存储oss功能&#xff0c;创建一个bucket&#xff08;百度教程多的是&#xff0c;跟着创建一个就行&#xff0c;创建时注意存储类型是标准存储&#xff0c;读写权限是公共读&#xff09;…

《吐血整理》高级系列教程-吃透Fiddler抓包教程(24)-Fiddler如何优雅地在正式和测试环境之间来回切换-中篇

1.简介 在开发或者测试的过程中&#xff0c;由于项目环境比较多&#xff0c;往往需要来来回回地反复切换&#xff0c;那么如何优雅地切换呢&#xff1f;宏哥今天介绍几种方法供小伙伴或者童鞋们进行参考。 2.实际工作场景 2.1问题场景 &#xff08;1&#xff09;已发布线上…

YOLOv8改进 | Conv篇 | 利用DualConv二次创新C2f提出一种轻量化结构(轻量化创新)

一、本文介绍 本文给大家带来的改进机制是利用DualConv改进C2f提出一种轻量化的C2f,DualConv是一种创新的卷积网络结构,旨在构建轻量级的深度神经网络。它通过结合33和11的卷积核处理相同的输入特征映射通道,优化了信息处理和特征提取。DualConv利用组卷积技术高效排列卷积…

2024美赛数学建模D题思路模型代码论文

2024美赛数学建模各题思路模型代码&#xff1a;开赛后第一时间更新&#xff0c;更新见文末 一、2023题目重述 Homer是棒球运动中的术语&#xff0c;是非正式的美式英语单词。令人惊讶的是&#xff0c;Homer&#xff08;本垒打&#xff09;在剑桥词典网站的搜索次数超过79000次…

JavaScript基础之输入输出与变量常量详解

输入和输出 输出和输入也可理解为人和计算机的交互&#xff0c;用户通过键盘、鼠标等向计算机输入信息&#xff0c;计算机处理后再展示结果给用户&#xff0c;这便是一次输入和输出的过程。 举例说明&#xff1a;如按键盘上的方向键&#xff0c;向上/下键可以滚动页面&#x…

webpack常见的loader和plugin

Webpack 中的 Loader 和 Plugin 是两个核心概念&#xff0c;它们用于处理不同类型的文件和执行一些额外的构建任务。下面是一些常见的 Webpack Loader 和 Plugin&#xff1a; 常见 Loader: babel-loader: 用于将 ECMAScript 2015 代码转译为向后兼容的 JavaScript。 style-load…

【Spring Boot 3】【JPA】日期时间类型持久化

【Spring Boot 3】【JPA】日期时间类型持久化 背景介绍开发环境开发步骤及源码工程目录结构背景 软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花…

unity 装饰器模式(实例详解)

文章目录 简介1. **组件装饰器&#xff08;Component Decorators&#xff09;**:2. **游戏对象特效装饰器&#xff08;GameObject Effects Decorator&#xff09;**:3. **输入处理装饰器&#xff08;Input Handling Decorators&#xff09;**:4. **性能优化装饰器&#xff08;P…

70从零开始学Java之Collection与Collections有什么区别?

作者:孙玉昌,昵称【一一哥】,另外【壹壹哥】也是我哦 CSDN博客专家、万粉博主、阿里云专家博主、掘金优质作者 前言 截止到现在,壹哥已经把Java里的List、Set和Map这三大集合都给大家讲解完毕了,不知道各位掌握了多少呢?如果你对之前的内容还没有熟练掌握,可以把壹哥前…

springboot核心有几层架构

Spring Boot核心有四层架构&#xff1a; 应用层&#xff1a;包含应用程序的入口点和控制器层。这层负责接收请求、处理业务逻辑&#xff0c;并返回响应结果。 服务层&#xff1a;包含业务逻辑的实现。这层负责处理各种业务逻辑&#xff0c;例如数据处理、事务管理等。 数据访…

day32WEB 攻防-通用漏洞文件上传二次渲染.htaccess变异免杀

本章节知识点&#xff1a; 1 、文件上传 - 二次渲染 2 、文件上传 - 简单免杀变异 3 、文件上传 -.htaccess 妙用 4 、文件上传 -PHP 语言特性 前置知识&#xff1a; 后门代码需要用特定格式后缀解析&#xff0c;不能以图片后缀解析脚本后门代码 ( 解析漏洞除外 ) 如&…

codeforces 1200分

文章目录 1.[B. Same Parity Summands](https://codeforces.com/contest/1352/problem/B)2.[C. Challenging Cliffs](https://codeforces.com/problemset/problem/1537/C)3.[B. Sorted Adjacent Differences](https://codeforces.com/contest/1339/problem/B)4.[C1. k-LCM (eas…

应急响应-windows-日志分析

日志概述 在windows系统中&#xff0c;日志文件包括&#xff1a;系统日志、安全性日志及应用程序日志&#xff0c;对于应急响应工程师来说这三类日志需要熟练掌握&#xff0c;其位置如下。 在windows 2000专业版/windows XP/Windows Server 200&#xff08;注意日志文件的后缀…