RabbitMQ教程:工作队列(Work Queues)(二)

RabbitMQ教程:工作队列(Work Queues)(二)

一、引言

在快节奏的软件开发世界中,我们经常面临需要异步处理任务的场景,比如在Web应用中处理耗时的图片处理或数据分析任务。这些任务如果直接在用户的HTTP请求中同步处理,会导致用户体验不佳,因为用户需要等待任务完成才能继续。这时,工作队列(Work Queues)就显得尤为重要。工作队列允许我们将任务排队,然后在后台异步处理,这样可以释放Web服务器来处理更多的用户请求,提高应用的响应速度和吞吐量。

在本教程中,我们将通过一个简单的例子来学习如何使用RabbitMQ实现工作队列。我们将创建一个模拟的任务分发系统,它能够在多个工作进程之间分配任务,确保任务的均衡执行和持久存储,即使在RabbitMQ服务器重启的情况下也不会丢失任务。
在这里插入图片描述

二、简介

在上一篇教程中,我们学习了如何使用RabbitMQ发送和接收消息。今天,我们将探索工作队列(Work Queues),这是一种在多个工作进程(workers)之间分配耗时任务的机制。工作队列也被称为任务队列(Task Queues),它的核心思想是避免立即执行资源密集型任务,而是将任务安排到以后执行。通过这种方式,我们可以将任务封装成消息并发送到队列中,然后由后台运行的工作进程来处理这些任务。

三、准备工作

3.1 说明

在之前的教程中,我们发送了包含“Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。由于我们没有实际的任务(比如需要调整大小的图片或需要渲染的PDF文件),我们将使用Task.Delay()函数来模拟工作负载。

3.2 生成项目

首先,我们需要生成两个项目(也可直接vs创建):

dotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs
dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs
cd NewTask
dotnet add package RabbitMQ.Client
cd ../Worker
dotnet add package RabbitMQ.Client

这些命令创建了两个新的控制台应用程序,一个用于发送任务(NewTask),另一个用于接收并处理任务(Worker)。

四、实战

4.1 修改发送程序

我们将更新NewTask程序,以便从命令行发送任意消息。这个程序将任务安排到我们的工作队列中,因此我们将其命名为NewTask

using RabbitMQ.Client;
using System.Text;await PublishMessagesAsync(20);
/// <summary>
/// 发布指定次数的消息到RabbitMQ队列
/// </summary>
/// <param name="loopCount">消息发送的次数</param>
/// <returns>Task对象,表示异步操作</returns>
async Task PublishMessagesAsync(int loopCount)
{// 循环发送指定次数的消息for (int i = 1; i <= loopCount; i++){// 调用SendMessageToQueue方法发送消息,并包含当前迭代次数await SendMessageToQueue($"Iteration {i} - Hello World");// 这里可以添加延迟,如果需要的话// await Task.Delay(1000);}Console.ReadLine();
}
/// <summary>
/// 向RabbitMQ队列发送一条消息
/// </summary>
/// <param name="message">要发送的消息内容</param>
/// <returns>Task对象,表示异步操作</returns>
async Task SendMessageToQueue(string message)
{// 创建连接工厂,并设置RabbitMQ服务器地址为localhostvar factory = new ConnectionFactory { HostName = "localhost" };// 使用异步方式创建连接using var connection = await factory.CreateConnectionAsync();// 使用异步方式创建通道using var channel = await connection.CreateChannelAsync();// 异步声明名为"task_queue"的持久队列await channel.QueueDeclareAsync(queue: "task_queue", durable: true, exclusive: false,autoDelete: false, arguments: null);// 将消息内容编码为字节数组var body = Encoding.UTF8.GetBytes(message);// 创建消息属性,并设置为持久化var properties = new BasicProperties{Persistent = true};// 异步发布消息到队列await channel.BasicPublishAsync(exchange: string.Empty, routingKey: "task_queue", mandatory: true,basicProperties: properties, body: body);// 打印消息发送确认信息Console.WriteLine($" [x] Sent {message}");
}

4.2 修改接收程序

我们的旧Receive.cs脚本也需要修改,以便模拟消息体中每个点号一秒的工作量。以下是修改后的代码:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// 创建连接工厂,并设置RabbitMQ服务器地址为localhost
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();
// 异步声明名为"task_queue"的持久队列
await channel.QueueDeclareAsync(queue: "task_queue", durable: true, exclusive: false,autoDelete: false, arguments: null);// 设置QoS参数,确保每次只有一个消息被分发给同一个消费者
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);Console.WriteLine(" [*] Waiting for messages.");// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);// 设置接收到消息时的事件处理程序
consumer.ReceivedAsync += async (model, ea) =>
{// 获取消息体并转换为字符串byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] Received {message}");// 模拟工作负载,延时1.5秒await Task.Delay(1500);// 手动确认消息,确保消息被正确处理await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
};// 开始消费指定队列的消息
// 这个调用会告诉RabbitMQ服务器,我们有一个消费者准备好接收"task_queue"队列中的消息
await channel.BasicConsumeAsync("task_queue", // 队列名称autoAck: false, // 是否自动确认消息,默认为false,需要手动确认consumer: consumer); // 指定消费者对象,用于接收消息
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

在这里插入图片描述

4.3 消息确认(Message Acknowledgment)

在处理任务时,可能会出现消费者处理任务时崩溃的情况。如果消费者在处理任务时崩溃,RabbitMQ会立即将消息标记为已删除,这样我们就会丢失正在处理的消息以及所有未处理的消息。

为了确保消息不会丢失,RabbitMQ支持消息确认(acknowledgment)。消费者会向RabbitMQ发送确认,告知它特定的消息已经被接收和处理,RabbitMQ可以安全地删除该消息。

如果消费者在未发送确认的情况下崩溃(例如,通道关闭、连接关闭或TCP连接丢失),RabbitMQ会将该消息重新排队。如果有其他消费者在线,RabbitMQ会迅速将该消息重新分发给其他消费者。这样,我们可以确保即使工作进程偶尔崩溃,也不会丢失任何消息。

默认情况下,消费者的确认超时时间为30分钟。这有助于检测未确认的消费者。如果需要,可以根据需要增加此超时时间。

在我们的代码中,我们已经将autoAck参数设置为false,并在处理完任务后手动发送确认。以下是确认消息的代码:

await channel.BasicConsumeAsync("task_queue", autoAck: false, consumer: consumer);

4.4 公平分发(Fair Dispatch)

你可能已经注意到,分发仍然不是我们想要的方式。例如,在有两个工作进程的情况下,如果所有奇数消息都很重,而偶数消息都很轻,一个工作进程将始终忙碌,而另一个工作进程几乎不工作。这是因为RabbitMQ在消息进入队列时就分发消息,它不会查看消费者未确认的消息数量。它只是简单地将每第n个消息分发给第n个消费者。

为了改变这种行为,我们可以使用BasicQos方法,并设置prefetchCount为1。这告诉RabbitMQ一次不要给工作进程超过一个消息。换句话说,直到工作进程处理并确认前一个消息之前,不要分发新消息给它。相反,它将分发给下一个不忙的工作进程。在这里插入图片描述

五、结论

在本教程中,我们深入探讨了RabbitMQ工作队列的概念和实现。通过构建一个模拟的任务分发系统,我们学习了如何在多个工作进程之间分配任务,以及如何确保任务的均衡执行和持久存储。以下是我们从本教程中获得的关键要点:

  1. 异步任务处理:通过使用工作队列,可以将耗时的任务异步处理,从而提高Web应用的响应速度和用户体验。

  2. 任务封装:将复杂的任务封装成消息,发送到队列中,由后台的工作进程来处理这些任务。

  3. 消息确认:实现了消息确认机制,确保了即使在消费者处理任务时崩溃,消息也不会丢失,并且可以被重新分发给其他消费者处理。

  4. 公平分发:通过设置prefetchCount为1,实现了公平分发,确保了工作负载在所有消费者之间均匀分配,避免了某些消费者过载而其他消费者空闲的情况。

  5. 持久化:将队列和消息设置为持久化,以确保即使RabbitMQ服务器重启,任务也不会丢失。

通过这些机制,我们能够建立一个健壮的工作队列系统,它不仅能够提高应用的性能,还能够在面对各种异常情况时保持任务的可靠性和持久性。这些知识为我们在实际开发中实现复杂的异步任务处理提供了坚实的基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/61089.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

乐维网管平台(七):网络稳定与高效的“安全锦囊”

试想一下&#xff0c;你给电脑升级了一个软件&#xff0c;升级完成后发现有BUG&#xff0c;经常无故卡死&#xff0c;这时候想回退或重新安装旧版本…相对地&#xff0c;一家企业的网络管理员&#xff0c;在对公司的核心交换机进行复杂的配置调整时&#xff0c;一个小小的疏忽&…

时代变迁对传统机器人等方向课程的巨大撕裂

2020年之后&#xff0c;全面转型新质课程规划&#xff0c;传统课程规划全部转为经验。 农耕-代表性生产关系-封建分配制度主要生产力-人力工业-代表性生产关系-资本分配制度工业分为机械时代&#xff0c;电气时代&#xff0c;信息时代&#xff1b;主要生产力-人力转为人脑&…

Spring6 AOP 面向切面编程

1. 概念 面向切面编程&#xff1a;一种编程思想。proxy动态代理&#xff08;实现了这种思想&#xff09;&#xff1a;在原方法执行时&#xff0c;给原方法的前面或着后面增加其他的方法。增加的方法并不会写在原方法中 原方法就是目标方法&#xff0c;增加的方法就是代理方法 …

计算机组成与原理(2) basic of computer architecture

Instruction Set Architecture (ISA) 和 Hardware System Architecture (HSA) 是计算机体系结构中两个重要的层次&#xff0c;它们各自的职责和作用如下&#xff1a; Instruction Set Architecture (ISA) 定义 ISA是指令集体系结构&#xff0c;是硬件和软件之间的接口。它定义…

window的wsl(Ubuntu)安装kafka步骤

环境&#xff1a;Win11 WSL(Linux子系统Ubuntu) apache-zookeeper-3.9.3-bin kafka_2.12-3.8.1 思路&#xff1a;apache上分别下载zookeeper和kafka&#xff0c;在wsl环境安装。在kafka上创建消息的topic&#xff0c;发送消息&#xff0c;接受消息&#xff0c;验证是否安…

数据结构树和二叉树知识点和递归序列

二叉树知识点 一.树的概念1.1关于树的名词解释 二.二叉树的概念1. 二叉树性质&#xff1a; 三.满二叉树与完全二叉树递归前序遍历递归中序遍历递归后续遍历 一.树的概念 树是一种非线性数据结构&#xff0c;它是由n个或大于n个的结点来组成具有层次关系的一个集合&#xff08;…

速通前端篇 —— CSS

找往期文章包括但不限于本期文章中不懂的知识点&#xff1a; 个人主页&#xff1a;我要学编程程(ಥ_ಥ)-CSDN博客 所属专栏&#xff1a;速通前端 目录 CSS的介绍 基本语法规范 CSS选择器 标签选择器 class选择器 id选择器 复合选择器 通配符选择器 CSS常见样式 颜…

使用 Elastic 3 步实现基于 OTel 的原生 K8s 和应用可观测性

作者&#xff1a;来自 Elastic Bahubali Shetti Elastic 的 OpenTelemetry 发行版现已支持 OTel Operator&#xff0c;可使用 EDOT SDK 自动检测应用程序&#xff0c;并管理 EDOT OTel Collector 的部署和生命周期以实现 Kubernetes 可观察性。了解如何通过 3 个简单步骤进行配…

stack、queue、priority_queue、deque的使用和模拟实现

目录 1.容器适配器 2.stack stack的常用接口及使用示例 stack的模拟实现 3.queue queue的常用接口及使用示例 queue的模拟实现 4.priority_queue priority_queue的常用接口及使用示例 priority_queue的模拟实现 5.deque 认识deque deque底层的数据结构 deque和ve…

Linux的cuDNN(cudnn)安装教程(CUDA(cuda\cuda toolkit))

CUDA(cuda\cuda toolkit&#xff09;安装教程 https://blog.csdn.net/huiyayaya/article/details/143863835?spm1001.2014.3001.5502官网下载cudnn https://developer.nvidia.com/rdp/cudnn-archive这个下载到自己的电脑 下载到本地就好 复制到服务器 切换到cudnn文件所在目…

Kafka中ACKS LSO LEO LW HW AR ISR OSR解析

名称解释 ACKS&#xff08;Acknowledgments&#xff09;确认、回执 LW&#xff08;Low watermark&#xff09;低水位、LSO&#xff08;Log start offset&#xff09;起始偏移量 HW&#xff08;High watermark&#xff09;高水位 LEO&#xff08;Log end offset&#xff09;…

C++设计模式行为模式———迭代器模式

文章目录 一、引言二、迭代器模式三、总结 一、引言 迭代器模式是一种行为设计模式&#xff0c; 让你能在不暴露集合底层表现形式 &#xff08;列表、 栈和树等&#xff09; 的情况下遍历集合中所有的元素。C标准库中内置了很多容器并提供了合适的迭代器&#xff0c;尽管我们不…

智能体Agent调研

单个智能体建模与优化现状 人类长期以来追求类似于或超越人类水平的人工智能 (AI)&#xff0c;而 基于AI的代理&#xff08;Agent&#xff09;被认为是一个有前途的研究方向。传统计算机领域的Agent有多种&#xff0c;如自动化脚本、网络爬虫、推荐系统、软件机器人能够独立自…

SAP PI/PO Proxy2JDBC SQL_QUERY动态接口示例

目录 背景&#xff1a; 完整demo步骤&#xff1a; IR: ID: SPROXY: 测试代码&#xff1a; 注意点&#xff1a; 背景&#xff1a; 中途临时帮客户项目做其他功能&#xff0c;项目上有部分开发项需要通过PO去第三方数据库取数&#xff0c;项目上的开发对PO不太熟&#xf…

【汇编语言】数据处理的两个基本问题(三) —— 汇编语言的艺术:从div,dd,dup到结构化数据的访问

文章目录 前言1. div指令1.1 使用div时的注意事项1.2 使用格式1.3 多种内存单元表示方法进行举例1.4 问题一1.5 问题一的分析与求解1.5.1 分析1.5.2 程序实现 1.6 问题二1.7 问题二的分析与求解1.7.1 分析1.7.2 程序实现 2. 伪指令 dd2.1 什么是dd&#xff1f;2.2 问题三2.3 问…

模型的评估指标——IoU、混淆矩阵、Precision、Recall、P-R曲线、F1-score、mAP、AP、AUC-ROC

文章目录 预测框的预测指标——IoU&#xff08;交并比&#xff09;分类预测指标混淆矩阵&#xff08;Confusion Matrix&#xff0c;TP、FP、FN、TN)Precision&#xff08;精度&#xff09;Recall&#xff08;召回率&#xff09;P-R曲线F1-scoreTPR、TNR、FPR、FNRROC曲线下面积…

轻量云服务器:入门级云计算的最佳选择

在云计算领域&#xff0c;轻量云服务器逐渐成为小型企业、个人开发者和初创公司关注的热点。它凭借高性价比、简单易用和稳定性能&#xff0c;正在改变传统的服务器部署方式。本文将详细解析轻量云服务器的定义、优势、适用场景及如何选择最佳方案&#xff0c;帮助您全面了解这…

【初阶数据结构篇】归并排序、计数排序

文章目录 须知 &#x1f4ac; 欢迎讨论&#xff1a;如果你在学习过程中有任何问题或想法&#xff0c;欢迎在评论区留言&#xff0c;我们一起交流学习。你的支持是我继续创作的动力&#xff01; &#x1f44d; 点赞、收藏与分享&#xff1a;觉得这篇文章对你有帮助吗&#xff1…

移远通信5G RedCap模组RG255C-CN通过中国电信5G Inside终端生态认证

近日&#xff0c;移远通信5G RedCap模组RG255C-CN荣获中国电信颁发的5G Inside终端生态认证证书。这表明&#xff0c;该产品在5G基本性能、网络兼容性、安全特性等方面已经过严格评测且表现优异&#xff0c;将进一步加速推动5G行业终端规模化应用。 中国电信5G Inside终端生态认…

Towards Reasoning in Large Language Models: A Survey

文章目录 题目摘要引言什么是推理?走向大型语言模型中的推理测量大型语言模型中的推理发现与启示反思、讨论和未来方向 为什么要推理?结论题目 大型语言模型中的推理:一项调查 论文地址:https://arxiv.org/abs/2212.10403 项目地址: https://github.com/jeffhj/LM-reason…