RabbitMQ教程C#版 - 工作队列

先决条件
本教程假定RabbitMQ已经安装,并运行在localhost标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。

从哪里获得帮助
如果您在阅读本教程时遇到困难,可以通过邮件列表联系我们。

1.工作队列

(使用.NET客户端)

在第一篇教程RabbitMQ教程C#版 “Hello World”中,我们编写了两个程序,用于从一个指定的队列发送和接收消息。在本文中,我们将创建一个工作队列,用于在多个工作线程间分发耗时的任务。

工作队列(又名:任务队列)背后的主要想法是避免立即执行资源密集型、且必须等待其完成的任务。相反的,我们把这些任务安排在稍后完成。我们可以将任务封装为消息并把它发送到队列中,在后台运行的工作进程将从队列中取出任务并最终执行。当您运行多个工作线程,这些任务将在这些工作线程之间共享。

这个概念在Web应用程序中特别有用,因为在一个HTTP请求窗口中无法处理复杂的任务。

2.准备

我们将略微修改上一个示例中的Send程序,以其可以在命令行发送任意消息。
这个程序将调度任务到我们的工作队列中,所以让我们把它命名为NewTask

像教程[1],我们需要生成两个项目:

dotnet new console --name NewTask

mv NewTask/Program.cs NewTask/NewTask.cs


dotnet new console --name Worker

mv Worker/Program.cs Worker/Worker.cs


cd NewTask

dotnet add package RabbitMQ.Client

dotnet restore


cd ../Worker

dotnet add package RabbitMQ.Client

dotnet restore


var message = GetMessage(args);

var body = Encoding.UTF8.GetBytes(message);


var properties = channel.CreateBasicProperties();

properties.Persistent = true;


channel.BasicPublish(exchange: "",

                     routingKey: "task_queue",

                     basicProperties: properties,

                     body: body);

从命令行参数获取消息的帮助方法:

private static string GetMessage(string[] args)

{

    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");

}

我们旧的Receive.cs脚本也需要进行一些更改:它需要为消息体中的每个点模拟一秒种的时间消耗。它将处理由RabbitMQ发布的消息,并执行任务,因此我们把它复制到Worker项目并修改:

// 构建消费者实例。

var consumer = new EventingBasicConsumer(channel);


// 绑定消息接收事件。

consumer.Received += (model, ea) =>

{

    var body = ea.Body;

    var message = Encoding.UTF8.GetString(body);

    Console.WriteLine(" [x] Received {0}", message);


    // 模拟耗时操作。

    int dots = message.Split('.').Length - 1;

    Thread.Sleep(dots * 1000);


    Console.WriteLine(" [x] Done");

};


channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);

模拟虚拟任务的执行时间:

int dots = message.Split('.').Length - 1;

Thread.Sleep(dots * 1000);

3.循环调度

使用任务队列的优点之一是能够轻松地并行工作。如果我们正在积累积压的工作,我们仅要增加更多的工作者,并以此方式可以轻松扩展。

首先,我们尝试同时运行两个Worker实例。他们都会从队列中获取消息,但究竟如何?让我们来看看。

您需要打开三个控制台,两个运行Worker程序,这些控制台作为我们的两个消费者 - C1和C2。

# shell 1

cd Worker

dotnet run

# => [*] Waiting for messages. To exit press CTRL+C

# shell 2

cd Worker

dotnet run

# => [*] Waiting for messages. To exit press CTRL+C

在第三个控制台中,我们将发布一些新的任务。一旦你已经运行了消费者,你可以尝试发布几条消息:

# shell 3

cd NewTask

dotnet run "First message."

dotnet run "Second message.."

dotnet run "Third message..."

dotnet run "Fourth message...."

dotnet run "Fifth message....."

让我们看看有什么发送到了我们的Worker程序:

# shell 1

# => [*] Waiting for messages. To exit press CTRL+C

# => [x] Received 'First message.'

# => [x] Received 'Third message...'

# => [x] Received 'Fifth message.....'

# shell 2

# => [*] Waiting for messages. To exit press CTRL+C

# => [x] Received 'Second message..'

# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ会按顺序将每条消息发送给下一个消费者。消费者数量平均的情况下,每个消费者将会获得相同数量的消息。这种分配消息的方式称为循环(Round-Robin)。请尝试开启三个或更多的Worker程序来验证。

4.消息确认

处理一项任务可能会需要几秒钟的时间。如果其中一个消费者开启了一项长期的任务并且只完成了部分就挂掉了,您可能想知道会发生什么?在我们当前的代码中,一旦RabbitMQ把消息分发给了消费者,它会立即将这条消息标记为删除。在这种情况下,如果您停掉某一个Worker,我们将会丢失这条正在处理的消息,也将丢失所有分发到该Worker但尚未处理的消息。

但是我们不想丢失任何一个任务。如果一个Worker挂掉了,我们希望这个任务能被重新分发给其他Worker。

为了确保消息永远不会丢失,RabbitMQ支持消息确认机制。消费者回发一个确认信号Ack(nowledgement)给RabbitMQ,告诉它某个消息已经被接收、处理并且可以自由删除它。

如果一个消费者在还没有回发确认信号之前就挂了(其通道关闭,连接关闭或者TCP连接丢失),RabbitMQ会认为该消息未被完全处理,并将其重新排队。如果有其他消费者同时在线,该消息将会被会迅速重新分发给其他消费者。这样,即便Worker意外挂掉,也可以确保消息不会丢失。

没有任何消息会超时;当消费者死亡时,RabbitMQ将会重新分发消息。即使处理消息需要非常非常长的时间也没关系。

默认情况下,手动消息确认模式是开启的。在前面的例子中,我们通过将autoAck(“自动确认模式”)参数设置为true来明确地关闭手动消息确认模式。一旦完成任务,是时候删除这个标志并且从Worker手动发送一个恰当的确认信号给RabbitMQ。

// 构建消费者实例。

var consumer = new EventingBasicConsumer(channel);


// 绑定消息接收事件。

consumer.Received += (model, ea) =>

{

    var body = ea.Body;

    var message = Encoding.UTF8.GetString(body);

    Console.WriteLine(" [x] Received {0}", message);

    

    // 模拟耗时操作。

    int dots = message.Split('.').Length - 1;

    Thread.Sleep(dots * 1000);


    Console.WriteLine(" [x] Done");

    

    // 手动发送消息确认信号。

    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

};


// autoAck:false - 关闭自动消息确认,调用`BasicAck`方法进行手动消息确认。

// autoAck:true  - 开启自动消息确认,当消费者接收到消息后就自动发送ack信号,无论消息是否正确处理完毕。

channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

使用上面这段代码,我们可以确定的是,即使一个Worker在处理消息时,我们通过使用CTRL + C来终止它,也不会丢失任何消息。Worker挂掉不久,所有未确认的消息将会被重新分发。

忘记确认
遗漏BasicAck是一个常见的错误。这是一个很简单的错误,但导致的后果却是严重的。当客户端退出时(看起来像是随机分发的),消息将会被重新分发,但是RabbitMQ会吃掉越来越多的内存,因为它不能释放未确认的消息。
为了调试这种错误,您可以使用rabbitmqctl来打印messages_unacknowledged字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows上,删除sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

5.消息持久化

我们已经学习了如何确保即使消费者挂掉,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务还是会丢失。

当RabbitMQ退出或崩溃时,它会忘记已存在的队列和消息,除非告诉它不要这样做。为了确保消息不会丢失,有两件事是必须的:我们需要将队列和消息标记为持久

首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了做到这一点,我们需要把队列声明是持久的(Durable)

// 声明队列,通过指定durable参数为`true`,对消息进行持久化处理。 

channel.QueueDeclare(queue: "hello",

                     durable: true,

                     exclusive: false,

                     autoDelete: false,

                     arguments: null);

虽然这个命令本身是正确的,但是它在当前设置中不会起作用。那是因为我们已经定义过一个名为hello的队列,并且这个队列不是持久化的。RabbitMQ不允许使用不同的参数重新定义已经存在的队列,并会向尝试执行该操作的程序返回一个错误。但有一个快速的解决办法 - 让我们用不同的名称声明一个队列,例如task_queue

channel.QueueDeclare(queue: "task_queue",

                     durable: true,

                     exclusive: false,

                     autoDelete: false,

                     arguments: null);


注意,该声明队列QueueDeclare方法的更改需要同时应用于生产者和消费者代码。

此时,我们可以确定的是,即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将我们的消息标记为持久的(Persistent) - 通过将IBasicProperties.Persistent设置为true

// 将消息标记为持久性。

var properties = channel.CreateBasicProperties();

properties.Persistent = true;

关于消息持久性的说明
将消息标记为Persistent并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘,但当RabbitMQ接收到消息并且尚未保存消息时仍有一段时间间隔。此外,RabbitMQ不会为每条消息执行fsync(2) - 它可能只是保存到缓存中,并没有真正写入磁盘。消息的持久化保证并不健壮,但对于简单的任务队列来说已经足够了。如果您需要一个更加健壮的保证,可以使用发布者确认。

6.公平调度

您可能已经注意到调度仍然无法完全按照我们期望的方式工作。例如,在有两个Worker的情况下,假设所有奇数消息都很庞大、偶数消息都很轻量,那么一个Worker将会一直忙碌,而另一个Worker几乎不做任何工作。是的,RabbitMQ并不知道存在这种情况,它仍然会平均地分发消息。

发生这种情况是因为RabbitMQ只是在消息进入队列后就将其分发。它不会去检查每个消费者所拥有的未确认消息的数量。它只是盲目地将第n条消息分发给第n位消费者。

为了改变上述这种行为,我们可以使用参数设置prefetchCount = 1basicQos方法。

这就告诉RabbitMQ同一时间不要给一个Worker发送多条消息。或者换句话说,不要向一个Worker发送新的消息,直到它处理并确认了前一个消息。
相反,它会这个消息调度给下一个不忙碌的Worker。

channel.BasicQos(0, 1, false);

关于队列大小的说明
如果所有的Worker都很忙,您的队列可能会被填满。请留意这一点,可以尝试添加更多的Worker,或者使用其他策略。

7.组合在一起

我们NewTask.cs类的最终代码:

using System;

using RabbitMQ.Client;

using System.Text;


class NewTask

{

    public static void Main(string[] args)

    {

        // 实例化连接工厂。

        var factory = new ConnectionFactory() { HostName = "localhost" };

        

        // 创建连接、信道。

        using(var connection = factory.CreateConnection())

        using(var channel = connection.CreateModel())

        {

            // 声明队列,标记为持久性。

            channel.QueueDeclare(queue: "task_queue",

                                 durable: true,

                                 exclusive: false,

                                 autoDelete: false,

                                 arguments: null);

            

            // 获取发送消息。

            var message = GetMessage(args);

            var body = Encoding.UTF8.GetBytes(message);

            

            // 将消息标记为持久性。

            var properties = channel.CreateBasicProperties();

            properties.Persistent = true;

            

            // 发送数据包。

            channel.BasicPublish(exchange: "",

                                 routingKey: "task_queue",

                                 basicProperties: properties,

                                 body: body);

                                 

            Console.WriteLine(" [x] Sent {0}", message);

        }


        Console.WriteLine(" Press [enter] to exit.");

        Console.ReadLine();

    }


    private static string GetMessage(string[] args)

    {

        return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");

    }

}

(NewTask.cs源码)

还有我们的Worker.cs

using System;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

using System.Text;

using System.Threading;


class Worker

{

    public static void Main()

    {

        // 实例化连接工厂。

        var factory = new ConnectionFactory() { HostName = "localhost" };

        

         // 创建连接、信道。

        using(var connection = factory.CreateConnection())

        using(var channel = connection.CreateModel())

        {

            // 声明队列,标记为持久性。

            channel.QueueDeclare(queue: "task_queue",

                                 durable: true,

                                 exclusive: false,

                                 autoDelete: false,

                                 arguments: null);

            

            // 告知RabbitMQ,在未收到当前Worker的消息确认信号时,不再分发给消息,确保公平调度。

            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);


            Console.WriteLine(" [*] Waiting for messages.");


            // 构建消费者实例。

            var consumer = new EventingBasicConsumer(channel);

            

            // 绑定消息接收事件。

            consumer.Received += (model, ea) =>

            {

                var body = ea.Body;

                var message = Encoding.UTF8.GetString(body);

                Console.WriteLine(" [x] Received {0}", message);


                // 模拟耗时操作。

                int dots = message.Split('.').Length - 1;

                Thread.Sleep(dots * 1000);


                Console.WriteLine(" [x] Done");


                // 手动发送消息确认信号。

                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

            };

            

            channel.BasicConsume(queue: "task_queue",

                                 autoAck: false,

                                 consumer: consumer);


            Console.WriteLine(" Press [enter] to exit.");

            Console.ReadLine();

        }

    }

}

(Worker.cs源码)

使用消息确认机制和BasicQ您可以创建一个工作队列。即使RabbitMQ重新启动,通过持久性选项也可让任务继续存在。

有关IModel方法和IBasicProperties的更多信息,您可以在线浏览RabbitMQ .NET客户端API参考。

现在,我们可以继续阅读教程[3],学习如何向多个消费者发送相同的消息。

8.写在最后

本文翻译自RabbitMQ官方教程C#版本。本文介绍如与官方有所出入,请以官方最新内容为准。

水平有限,翻译的不好请见谅,如有翻译错误还请指正。

  • 原文链接:RabbitMQ tutorial - Work Queues

  • 实验环境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code

  • 最后更新:2018-04-03


相关文章: 

  • .net core 使用Redis的发布订阅

  • RabbitMQ知多少

  • RabbitMQ系列教程之四:路由(Routing)

  • RabbitMQ系列教程之三:发布/订阅(Publish/Subscribe)

  • RabbitMQ系列教程之二:工作队列(Work Queues)

  • 如何优雅的使用RabbitMQ

  • .NET 使用 RabbitMQ 图文简介

  • RabbitMQ 高可用集群搭建及电商平台使用经验总结

  • .NET Core 使用RabbitMQ

  • ASP.NET Core Web API下事件驱动型架构的实现(三):基于RabbitMQ的事件总线

  • RabbitMQ教程C#版 “Hello World”

原文地址:https://www.cnblogs.com/esofar/p/rabbitmq-work-queues.html 


.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/321757.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

动态规划训练23 [Making the Grade POJ - 3666 ]

Making the Grade POJ - 3666 这道题目有点意思。 我们定义dp[i][j]表示的含义是把包含前i个元素的子序列变成非递减的子序列,并且最后一个元素变成j所需要耗费的最小代价 那么状态转移方程可以写出来就是: dp[i][j] min(dp[i-1][k] abs(num[i] - j…

jzoj3889-序列问题【dp,高精度】

正题 题目链接:https://jzoj.net/senior/#main/show/3889 题目大意 一个序列nnn&#xff0c;求两个集合S,T∈[1..n]S,T\in[1..n]S,T∈[1..n]使得ax(x∈S)a_x(x\in S)ax​(x∈S)的xorxorxor和就是ay(y∈T)a_y(y\in T)ay​(y∈T)的andandand和&#xff0c;且x<y(x∈S,y∈T)x…

10、java中文件的抽象表示

java中使用File类来作为文件和目录路径名的抽象表示&#xff0c;是对于文件或者目录本身的属性来说的&#xff0c;而不是针对于文件的内容。 一些关于File类基本操作的代码如下&#xff1a; public class FileTest {/*** 可用于操作文件或者目录* author chaizepeng** param a…

IdentityServer4实战 - 基于角色的权限控制及Claim详解

一.前言大家好&#xff0c;许久没有更新博客了&#xff0c;最近从重庆来到了成都&#xff0c;换了个工作环境&#xff0c;前面都比较忙没有什么时间&#xff0c;这次趁着清明假期有时间&#xff0c;又可以分享一些知识给大家。在QQ群里有许多人都问过IdentityServer4怎么用Role…

动态规划训练24 [Phalanx HDU - 2859 ]

Phalanx HDU - 2859 这是一道非常好的题目&#xff0c;我实在是没想到该怎么做&#xff0c;看了一下大神的题解才恍然大悟&#xff08;还有这种操作&#xff1f;&#xff09; 由于对称矩阵是以对称轴进行对称的&#xff08;废话&#xff09;&#xff0c;所以我们可以用dp[i][j…

jzoj3890-长途旅行【同余最短路】

正题 题目链接:https://jzoj.net/senior/#main/show/3890 题目大意 nnn个点mmm条边的图&#xff0c;询问是否有111到nnn长度为TTT的路径。 解题思路 让WWW等于连接111的最小权值的两倍&#xff0c;然后用fi,jf_{i,j}fi,j​表示到第iii个点是否有权值%Wj\%Wj%Wj。然后用fi,T%W…

11、java中的I/O流(1)

我对于流的理解是这样的&#xff0c;计算机的本质本来就是对输入的数据进行操作&#xff0c;然后将结果输出的一种工具&#xff0c;数据在各个数据源节点之间进行流动&#xff0c;感觉流就是对这种状态的一种抽象&#xff0c;一个数据流表示的就是一系列数据序列&#xff0c;ja…

nssl1436-赛艇表演【最短路】

正题 题目大意 nnn个点mmm条边的无向图&#xff0c;每个点有门票费&#xff0c;对于每个点求一个点使得去那里看完赛艇并回来消耗的时间最小。 解题思路 因为是无向图&#xff0c;所以去和回是同一条路&#xff0c;把每个点作为起点将门票费压入然后跑最短路。 codecodecode …

ASP.NET Core 集成测试

集成测试集成测试&#xff0c;也叫组装测试或联合测试。在单元测试的基础上&#xff0c;将所有模块按照设计要求&#xff08;如根据结构图&#xff09;组装成为子系统或系统&#xff0c;进行集成测试。实践表明&#xff0c;一些模块虽然能够单独地工作&#xff0c;但并不能保证…

12、java中的I/O流(2)

再介绍一下其他一些流的使用 数据操作流&#xff0c;数据输入流允许应用程序以独立于机器的方式从底层输入流读取原始Java数据类型&#xff0c;意思就是平台无关&#xff0c;相关的两个类DataInputStream、DataOutputStream&#xff0c;使用如下&#xff1a; public class Da…

nssl1437-逮虾户【二分答案】

正题 题目大意 nnn段路&#xff0c;每一段路速度是vidv_idvi​d&#xff0c;长度是sis_isi​&#xff0c;在ttt秒跑完了&#xff0c;求ddd的值。 解题思路 指数域二分&#xff0c;然后判断是否跑完即可。 codecodecode #include<cstdio> #include<cstring> #incl…

动态规划训练25 [Food Delivery ZOJ - 3469 ]好题

Food Delivery ZOJ - 3469 区间DP的一道好题。 在这道题里&#xff0c;无非就是从出发点向左走到x1再向右走到有y1&#xff0c;再向左走到x2&#xff0c;再向右走到y2.。。。这样&#xff0c;一直将所有的顾客遍历完。 显然&#xff0c;起点这个点是非常特殊的一个点&#xf…

使用C#开发Android应用之WebApp

近段时间了解了一下VS2017开发安卓应用的一些技术&#xff0c;特地把C#开发WebApp的一些过程记录下来&#xff0c;欢迎大家一起指教、讨论&#xff0c;废话少说&#xff0c;是时候开始表演真正的技术了。。1、新建空白Android应用2、拖一个WebView控件进来3、打开模拟器Genymot…

13、字符集和字符编码

字符集&#xff1a;字符集是多个字符的集合&#xff0c;常见字符集有&#xff1a;ASCII字符集、GB2312字符集、GB18030字符集、Unicode字符集等。 ASCII字符集&#xff1a;是英文大小写字符、阿拉伯数字和西文符号的一个集合。&#xff08;可以看一下电脑键盘上的键&#xff0c…

nssl1438-战略威慑【枚举,树的直径】

正题 题目大意 nnn个点的无根树&#xff0c;求两条不相交的路径使它们长度之积最大。 解题思路 我们暴力枚举第一条&#xff0c;然后求树的直径即可。 codecodecode #include<cstdio> #include<cstring> #include<algorithm> using namespace std; const i…

ASP.NET Core依赖注入深入讨论

这篇文章我们来深入探讨ASP.NET Core、MVC Core中的依赖注入&#xff0c;我们将示范几乎所有可能的操作把依赖项注入到组件中。依赖注入是ASP.NET Core的核心&#xff0c;它能让您应用程序中的组件增强可测试性&#xff0c;还使您的组件只依赖于能够提供所需服务的某些组件。举…

15、java中的集合(2)

说一下单列集合&#xff0c;java中的单列集合的顶级接口是Collection&#xff0c;它有两个子接口&#xff1a;List、Set&#xff0c;本篇介绍一下List接口及其实现类的功能方法和基本实现原理。 List集合是有序集合&#xff0c;这里的有序并不是指存入List集合的元素会被自动排…

P2467-[SDOI2010]地精部落【dp】

正题 题目链接:https://www.luogu.org/problem/P2467 题目大意 求长度为nnn的波动序列的个数。 解题思路 我们先考虑第一个是上升的&#xff0c;然后乘2即可。 设fi,jf_{i,j}fi,j​表示填1∼i1\sim i1∼i个&#xff0c;最前面的是jjj的个数。然后我们只要是1∼i−j11\sim i…

双向广搜 8数码问题

转载自&#xff1a;http://blog.sina.com.cn/s/blog_8627bf080100ticx.html Eight 题目链接&#xff1a;http://acm.hdu.edu.cn/showproblem.php?pid1043讲到双向广搜&#xff0c;那就不能不讲经典的八数码问题&#xff0c;有人说不做此题人生不完整 。所谓双向广搜&am…

使用静态基类方案让 ASP.NET Core 实现遵循 HATEOAS Restful Web API

Hypermedia As The Engine Of Application State (HATEOAS)HATEOAS&#xff08;Hypermedia as the engine of application state&#xff09;是 REST 架构风格中最复杂的约束&#xff0c;也是构建成熟 REST 服务的核心。它的重要性在于打破了客户端和服务器之间严格的契约&…