RabbitMQ系列教程之二:工作队列(Work Queues)

今天开始RabbitMQ教程的第二讲,废话不多说,直接进入话题。   (使用.NET 客户端 进行事例演示)

  

   在第一个教程中,我们编写了一个从命名队列中发送和接收消息的程序。在本教程中,我们将创建一个工作队列,这个队列将用于在多个工人之间分配耗时的任务。


    工作队列【又名:任务队列】背后主要的思想是避免立刻执行耗时的工作任务,并且一直要等到它结束为止。相反,我们规划任务并晚些执行。我们封装一个任务作为消息发送到一个命名的消息队列中,后台运行的工作线程将获取任务并且最终执行该任务。当你运行很多的任务的时候他们会  共享工作线程和队列。

    这个概念在Web应用程序中是尤其有用的,异步执行可以在短时间内处理一个复杂Http请求。

1、准备工作

    在本系列教程的前一个教程中,我们发送了一个包含“Hello World!”的消息,现在我们发送一个代表复杂任务的字符串。我们不会创建一个真实的任务,比如对图像文件进行处理或PDF文件的渲染,因此让我们假装我们很忙-通过采用Thread.Sleep()功能来实现复杂和繁忙。我们将根据字符串中的点的数量作为它的复杂性,每一个点将占一秒钟的“工作”。例如,一个假的任务描述Hello…,有三个点,我们就需要三秒。

    我们将稍微修改一下我们以前的例子中Send 程序的代码,允许从命令行发送任意消息。这个程序将把任务发送到我们的消息队列中,所以我们叫它NewTask:

   像教程一,我们需要生成两个项目。

dotnet new console --name NewTask

    mv NewTask/Program.cs NewTask/NewTask.cs

    dotnet new console --name Worker

    mv Worker/Program.cs Worker/Worker.cs

    cd NewTask

   dotnet add package RabbitMQ.Client

   dotnet restore

   cd ../Worker

   dotnet add package RabbitMQ.Client

   dotnet restore



   var message = GetMessage(args);

   var body = Encoding.UTF8.GetBytes(message);


   var properties = channel.CreateBasicProperties();

   properties.Persistent = true;


   channel.BasicPublish(exchange: "",

                     routingKey: "task_queue",

                     basicProperties: properties,

                     body: body);

信息数据我们可以从命令行的参数获得:

private static string GetMessage(string[] args)
{  
 
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }


    我们的旧Receive.cs代码也需要一些修改:需要为消息体中每个点都需要消耗一秒钟的工作,先要计算出消息体内有几个点号,然后在乘以1000,就是这个复杂消息所消耗的时间,同时表示这是一个复杂任务。RabbitMQ将处理和发送理消息,并且执行这个任务,让我们拷贝以下代码黏贴到Worker的项目中,并进行相应的修改: 

var consumer = new EventingBasicConsumer(channel);

   consumer.Received += (model, ea) =>

   {

       var body = ea.Body;

       var message = Encoding.UTF8.GetString(body);

       Console.WriteLine(" [x] Received {0}", message);


       int dots = message.Split('.').Length - 1;

       Thread.Sleep(dots * 1000);


       Console.WriteLine(" [x] Done");

   };

   channel.BasicConsume(queue: "task_queue", noAck: true, consumer: consumer);


 我们自己假设的任务的模拟执行时间就是:

 int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);


2、轮询调度

    我们使用任务队列的好处之一就是使任务可以并行化,增加系统的并行处理能力。如果我们正在建立一个积压的工作,我们可以紧紧增加更多的Worker实例就可以完成大量工作的处理,修改和维护就很容易。

    首先,让我们同时运行两个Worker实例。他们都会从队列中得到消息,但具体如何?让我想想。

    你需要打开三个控制台的应用程序。两个控制台程序将运行Wroker程序。这些控制台程序将是我们的两个消费者C1和C2。

# shell 1

    cd Worker

    dotnet run

    # => [*] Waiting for messages. To exit press CTRL+C


    # shell 2

    cd Worker

    dotnet run

    # => [*] Waiting for messages. To exit press CTRL+C

   在第三个控制台应用程序中我们将发布新的任务。只要你已经启动了消费者程序,你可以看到一些发布的信息:

# shell 3

   cd NewTask

   dotnet run "First message."

   dotnet run "Second message.."

   dotnet run "Third message..."

   dotnet run "Fourth message...."

   dotnet run "Fifth message....."

 让我们看看交付了什么东西在Workers:

# shell 1

   # => [*] Waiting for messages. To exit press CTRL+C

   # => [x] Received 'First message.'

   # => [x] Received 'Third message...'

   # => [x] Received 'Fifth message.....'


   # shell 2

   # => [*] Waiting for messages. To exit press CTRL+C

   # => [x] Received 'Second message..'

   # => [x] Received 'Fourth message....'

默认情况下,RabbitMQ将会发送每一条消息给序列中每一个消费者。每个消费者都会得到相同数量的信息。这种分发消息的方式叫做轮询。我们尝试这三个或更多的Workers。 


3、消息确认

     处理一个任务可能需要几秒钟。如果有一个消费者开始了一个长期的任务,并且只做了一部分就发生了异常,你可能想知道到底发生了什么。我们目前的代码,一旦RabbitMQ发送一个消息给客户立即从内存中移除。在这种情况下,如果你关掉了一个Worker,我们将失去它正在处理的信息。我们也将丢失发送给该特定员工但尚未处理的所有信息。

    但我们不想失去任何任务。如果一个Worker出现了问题,我们希望把这个任务交给另一个Woker。

    为了确保消息不会丢失,RabbitMQ支持消息确认机制。ACK(nowledgement)确认消息是从【消息使用者】发送回来告诉RabbitMQ结果的一种特殊消息,确认消息告诉RabbitMQ指定的接受者已经收到、处理,并且RabbitMQ你可以自由删除它。

    如果一个【消费者Consumer】死亡(其通道关闭,连接被关闭,或TCP连接丢失)不会发送ACK,RabbitMQ将会知道这个消息并没有完全处理,将它重新排队。如果有其他用户同时在线,它就会快速地传递到另一个【消费者】。这样你就可以肯定,没有消息丢失,即使【Worker】偶尔死了或者出现问题。

    在没有任何消息超时;当【消费者】死亡的时候RabbitMQ会重新发送消息。只要是正常的,即使处理消息需要很长很长的时间也会重发消息给【消费者】。

   消息确认的机制默认是打开的。在以前的例子中,我们明确地把它们关闭设置noAck(“没有手动确认”)参数为true。是时候删除这个标志了,并且从Worker发送一个适当确认消息,一旦我们完成了工作任务。

var consumer = new EventingBasicConsumer(channel);

   consumer.Received += (model, ea) =>

   {

       var body = ea.Body;

       var message = Encoding.UTF8.GetString(body);

       Console.WriteLine(" [x] Received {0}", message);


       int dots = message.Split('.').Length - 1;

       Thread.Sleep(dots * 1000);


       Console.WriteLine(" [x] Done");


       channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

   };

   channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);


  使用这个代码,我们可以肯定的是,即使你使用Ctrl + C关掉一个正在处理消息的Worker,也不会丢失任何东西。【Worker】被杀死后,未被确认的消息很快就会被退回。

4、忘记确认

    忘记调用BasicAck这是一个常见的错误。虽然这是一个简单的错误,但后果是严重的。消息会被退回时,你的客户退出(这可能看起来像是随机的)但是RabbitMQ将会使用更多的内存保存这些任何延迟确认消息。
    为了调试这种错误,你可以使用rabbitmqctl打印messages_unacknowledged字段值:sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
    如果是在Window环境下,删除掉sudo字符就可以:rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
5、持久性的消息

    我们已经学会了如何确保即使【消费者】死亡,任务也不会丢失。但是如果RabbitMQ服务器停止了,我们的任务仍然会丢失的。
    当RabbitMQ退出或死机会清空队列和消息,除非你告诉它即使宕机也不能丢失任何东西。要确保消息不会丢失,有两件事情我们是必需要做的:我们需要将队列和消息都标记为持久的。
    首先,我们需要确保我们RabbitMQ从来都不会损失我们的的队列。为了做到这一点,我们需要声明我们的队列为持久化的:

channel.QueueDeclare(queue: "hello",durable: true,exclusive: false,autoDelete: false,arguments: null);

    虽然这个命令本身是正确的,它不会起作用在我们目前的设置中。这是因为我们已经定义了一个叫hello的队列,它不是持久化的。RabbitMQ不允许你使用不同的参数重新定义一个已经存在的队列,在任何程序代码中,都试图返回一个错误。但有一个快速的解决方法-让我们声明一个名称不同的队列,例如task_queue:

channel.QueueDeclare(queue: "task_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);

    这行代码QueueDeclare表示队列的声明,创建并打开队列,这个段代码需要应用到【生产者】和【消费者】中。

    在这一点上,我们相信,task_queue队列不会丢失任何东西即使RabbitMQ重启了。现在我们要通过设置IbasicProperties.SetPersistent属性值为true来标记我们的消息持久化的。

var properties = channel.CreateBasicProperties();
properties.Persistent
= true;

     关于消息持久性的注意

     将消息标记为持久性并不能完全保证消息不会丢失。虽然该设置告诉RabbitMQ时时刻刻把保存消息到磁盘上,但是这个时间间隔还是有的,当RabbitMQ已经接受信息但并没有保存它,此时还有可能丢失。另外,RabbitMQ不会为每个消息调用fsync(2)--它可能只是保存到缓存并没有真正写入到磁盘。虽然他的持久性保证不强,但它我们简单的任务队列已经足够用了。如果您需要更强的保证,那么您可以使用Publisher Comfirms。

6、公平调度

   你可能已经注意到,调度仍然没有像我们期望的那样的工作。例如,在两个Workers的情况下,当所有的奇数消息是沉重的,甚至消息是轻的,一个Worker忙个不停,而另一个Worker几乎没事可做。哎,RabbitMQ对上述情况一无所知,仍将消息均匀发送。

   发生这种情况是因为当有消息进入队列的时候RabbitMQ才仅仅调度了消息。它根本不看【消费者】未确认消息的数量,它只是盲目的把第N个消息发送给第N个【消费者】。 

   为了避免上述情况的发生,我们可以使用prefetchcount = 1的设置来调用BasicQos方法。这个方法告诉RabbitMQ在同一时间不要发送多余一个消息的数据给某个【Worker】。或者,换句话说,当某个消息处理完毕,并且已经收到了消息确认之后,才可以继续发送消息给那个【Worker】。相反,它将把消息分配给给下一个不忙的【Worker】。

channel.BasicQos(0, 1, false);

   注意队列大小

   如果所有的工人都很忙,你的队列可以填满。你要留意这一点,也许会增加更多的【Worker】,或者有其他的策略。


7、把所有的代码放在一起

NewTask.cs类最终的代码是:

using System;

using RabbitMQ.Client;

using System.Text;


class NewTask

{

    public static void Main(string[] args)

    {

        var factory = new ConnectionFactory() { HostName = "localhost" };

        using(var connection = factory.CreateConnection())

        using(var channel = connection.CreateModel())

        {

            channel.QueueDeclare(queue: "task_queue",

                                 durable: true,

                                 exclusive: false,

                                 autoDelete: false,

                                 arguments: null);


            var message = GetMessage(args);

            var body = Encoding.UTF8.GetBytes(message);


            var properties = channel.CreateBasicProperties();

            properties.Persistent = true;


            channel.BasicPublish(exchange: "",

                                 routingKey: "task_queue",

                                 basicProperties: properties,

                                 body: body);

            Console.WriteLine(" [x] Sent {0}", message);

        }


        Console.WriteLine(" Press [enter] to exit.");

        Console.ReadLine();

    }


    private static string GetMessage(string[] args)

    {

        return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");

    }

}

Worker.cs完整源码如下

using System;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

using System.Text;

using System.Threading;


class Worker

{

    public static void Main()

    {

        var factory = new ConnectionFactory() { HostName = "localhost" };

        using(var connection = factory.CreateConnection())

        using(var channel = connection.CreateModel())

        {

            channel.QueueDeclare(queue: "task_queue",

                                 durable: true,

                                 exclusive: false,

                                 autoDelete: false,

                                 arguments: null);


            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);


            Console.WriteLine(" [*] Waiting for messages.");


            var consumer = new EventingBasicConsumer(channel);

            consumer.Received += (model, ea) =>

            {

                var body = ea.Body;

                var message = Encoding.UTF8.GetString(body);

                Console.WriteLine(" [x] Received {0}", message);


                int dots = message.Split('.').Length - 1;

                Thread.Sleep(dots * 1000);


                Console.WriteLine(" [x] Done");


                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

            };

            channel.BasicConsume(queue: "task_queue",

                                 noAck: false,

                                 consumer: consumer);


            Console.WriteLine(" Press [enter] to exit.");

            Console.ReadLine();

        }

    }

}

使用消息确认和BasicQos方法可以建立一个工作队列。持久化的选项可以让我们的任务队列保持存活即使RabbitMQ重启。  好了,写完了,翻译的不好,大家见谅。  原文地址如下:http://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html

相关文章:

  • RabbitMQ系列教程之一:我们从最简单的事情开始!Hello World

  • 如何优雅的使用RabbitMQ

  • .NET 使用 RabbitMQ 图文简介

  • RabbitMQ 高可用集群搭建及电商平台使用经验总结

  • 搭建高可用的rabbitmq集群 + Mirror Queue + 使用C#驱动连接

  • RabbitMQ消息队列应用

  • 体验Rabbitmq强大的【优先级队列】之轻松面对现实业务场景

原文地址:http://www.cnblogs.com/PatrickLiu/p/6943830.html


.NET社区新闻,深度好文,微信中搜索dotNET跨平台或扫描二维码关注

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/324909.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

单例 (Singleton)设计模式

所谓类的单例设计模式,就是采取一定的方法保证在整个的软件系统中,对某个类只能存在一个对象实例,并且该类只提供一个取得其对象实例的方法。如果我们要让类在一个虚拟机中只能产生一个对象,我们首先必须将类的构造器的访问权限设…

面试中经常会问的智力题,来看看你会做几道

转载自 面试中经常会问的智力题,来看看你会做几道 下面是大部分题目来自滴滴出行2017秋招题。开始头脑风暴吧~~~ 问题 question one 有50家人家,每家一条狗。有一天警察通知,50条狗当中有病狗,行为和正常狗不一样。每人只能通…

你喜欢什么样的课堂?

最近看了一本书《让课堂充满幽默》,里面有些内容还是觉得挺有道理的。书中开头就说:“大量的课件、复杂的网络,使得老师似乎成了信息管理员和媒体播放机,学生则成为了被灌输的对象”,看后,内心深处反问自己…

Servlet 中文乱码处理

1、为什么使用中文乱码 我们在实现登录时候 ,需要进行前台的数据。获得数据以后可能会出现中文乱码,那应该如何处理呢? 2、get方式和Post提交方式的区别 [1]get数据的传输是不安全的 ,post数据传递更加安全 [2]get方式数据传递有大…

洛谷P1346-电车【日常图论,最短路,SPFA】

题目 一个有向图,每个点有个默认方向和若干个其他方向,走默认方向权值为0,其他方向权值为1,求最短路 输入 3 2 1(3个点,点2到点1) 2 2 3(2个点,起点为1,2为默认点,3为…

main()方法

main()方法的使用说明: main()方法作为程序的入口main()方法也是一个普通的静态方法main()方法可以作为我们与控制台交互的方式。(之前:使用Scanner)

实现自己的.NET Core配置Provider之EF

《10分钟就能学会.NET Core配置》里详细介绍了.NET Core配置的用法,另外我还开源了自定义的配置Provider:EF配置Provider和Yaml配置Provider。本文先来聊聊EF配置Provider的实现,其中会涉及到EntityFramework Core的知识,不熟悉也…

今天的雪糕格外好吃!

赤日炎炎,室外的温度超过30℃。午休罢,教室里一片寂静,大家都有一个目的——等待老师的进来,继续上课。偶尔有几位同学貌似等待焦急,遂将目光瞥向窗外,若有所思。还有几位好动同学在自己的座位上左右摇动&a…

学习分布式不得不会的BASE理论

转载自 学习分布式不得不会的BASE理论 eBay的架构师Dan Pritchett源于对大规模分布式系统的实践总结,在ACM上发表文章提出BASE理论,BASE理论是对CAP理论的延伸,核心思想是即使无法做到强一致性(Strong Consistency,…

Servelt 中文乱码

1、为什么使用中文乱码 我们在实现登录时候 ,需要进行前台的数据。获得数据以后可能会出现中文乱码,那应该如何处理呢? 2、get方式和Post提交方式的区别 [1]get数据的传输是不安全的 ,post数据传递更加安全 [2]get方式数据传递有大…

洛谷P2296-寻找道路【日常图论,最短路,SPFA】

题目 一个有向图,要求满足要求的最短路径,要求为: 路径上的所有点的出边所指向的点都直接或间接与终点连通。 输入1 3 2 (3个点,2条边) 1 2 (1和2之间可以连接) 2 1 1 3 (从1到3) 输出1 -1 输入2 6 6 1 2 1 3 2 6 2 5 4 5 3…

对属性可以赋值的位置

对属性可以赋值的位置: ①默认初始化 ②显式初始化/⑤在代码块中赋值 ③构造器中初始化 ④有了对象以后,可以通过"对象.属性"或"对象.方法"的方式,进行赋值 执行的先后顺序:① - ② / ⑤ - ③ - ④ package …

活动 | Unity带你亲临王者荣耀KPL总决赛,领略电竞的魅力

《王者荣耀》是由腾讯出品的 Made with Unity的现象级游戏。在Unite Shanghai 2017中的案例专场,我们也曾邀请了《王者荣耀》项目组的技术总监邓君,分享过《王者技术修炼之路》。 从移动游戏到电子竞技,KPL(King Pro League)也是《王者荣耀》…

《此生未完成》痛句摘录(一)

System.out.print("今天开始继续读书摘录"); //不知道官方让不让我在博客里面记录 //如果不让的话我可能得转到别的上面记录 System.out.print("现在开始看《此生未完成》"); System.out.println("今天是第一天!");人应该把快乐建立在…

Servlet 登录时数据校验

包含JavaScript登陆时校验和服务器端的校验 JSP校验意义 客户端验证:使用JavaScript验证 好处:降低服务器端负担 不足: 不安全,可以跳过只能进行格式验证,无法进行业务验证(用户名是否存在) 服务…

Java的并发编程中的多线程问题到底是怎么回事儿?

转载自 Java的并发编程中的多线程问题到底是怎么回事儿? 在我之前的一篇《再有人问你Java内存模型是什么,就把这篇文章发给他。》文章中,介绍了Java内存模型,通过这篇文章,大家应该都知道了Java内存模型的概念以及作…

DDD理论学习系列(2)-- 领域

1. 引言 领域一词,主要有以下两个意思: 一国主权所达之地。学术思想或社会活动的范围。 不管是指国家的主权范围也好还是学术活动范围,都是在讲一个范围,一个界限。比如我们常说的,学术领域、思想领域、技术领域、语…

洛谷P1462-通往奥格玛瑞的路【日常图论,二分查找,最短路,SPAF】

前言 这道题感谢朋友的帮助,这里是他的博客地址: http://blog.csdn.net/sugar_free_mint 题目 一个无向图,每个点和边都有一定的权值,要求从点1到点2在经过边的权值小于b的情况下经过点的最大权值尽量小 输入 4 4 8(4个点,4条…

《此生未完成》痛句摘录(2)

System.out.print("今天开始继续读书摘录"); //不知道官方让不让我在博客里面记录 //如果不让的话我可能得转到别的上面记录 System.out.print("现在开始看《此生未完成》"); System.out.println("今天是第二天!");他说他已经不再是二…

java正则表达式中的坑String.matches(regex)、Pattern.matches(regex, str)和Matcher.matches()

问题:程序会计算表达式的值 //将数值转换以K为单位 String value "10*1000*1000"; String regex"\\s*\\*\\s*1000\\s*"; boolean isMatch value .matches(regex); if(isMatch){value value.replaceFirst(regex,""); }else{String…