【RabbitMQ】 WorkQueues

消息分发

在【RabbitMQ】 HelloWorld中我们写了发送/接收消息的程序。这次我们将创建一个Work Queue用来在多个消费者之间分配耗时任务。

Work Queues(又称为:Task Queues)的主要思想是:尽可能的减少执行资源密集型任务时的等待时间。我们将任务封装为消息并发送到队列,在后台的工作进程将弹出任务并进行作业。当你运行很多worker,任务将在他们之间共享。

这个概念在WEB应用中尤为有效,因为在一个HTTP请求进行复杂操作是不可能的。

准备

在上一节我们发送了一条包含“Hello World”的消息。现在我们将要发送代表复杂任务的字符串。我们没有真实场景的复杂任务,例如调整图片大小或呈现PDF文件,让我们假装自己很忙 - 通过Thread.sleep()。我们将根据字符串中“.”的数量来衡量任务复杂度;每一个“.”增加1秒钟的工作时间。例如:一个“Hello...”将消耗3秒钟。

稍微修改下上一节中Send.java的代码,让我们可以从命令行参数中输入任意字符作为消息。这个程序将给我们的工作队列安排消息,命名为NewTask.java

String message = getMessage(argv);channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

一些封装方法来帮助我们从命令行参数中得到消息(简单来说就是将所有的命令行参数当做一条完整消息):

private static String getMessage(String[] strings){if (strings.length < 1)return "Hello World!";return joinStrings(strings, " ");
}private static String joinStrings(String[] strings, String delimiter) {int length = strings.length;if (length == 0) return "";StringBuilder words = new StringBuilder(strings[0]);for (int i = 1; i < length; i++) {words.append(delimiter).append(strings[i]);}return words.toString();
}

老的Recv.java程序也需要一些修改:他需要为消息中的每一个“.”伪造1秒钟的工作时间。称为Worker.java

final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");}}
};
boolean autoAck = true; // acknowledgment is covered below  消息确认,在后面会详细讲解
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

模拟任务执行:

private static void doWork(String task) throws InterruptedException {for (char ch: task.toCharArray()) {if (ch == '.') Thread.sleep(1000);}
}

循环调度

使用Task Queue的优点之一就是可以轻松的进行并行工作。如果我们正在构建一个积压的工作,我们可以仅仅通过添加更多的workers来解决。

首先,同时运行两个worker实例,他们都会从队列中得到消息,但事实上是什么样的呢?让我们看一看:

在IDEA中运行两次Worker.java,然后他们两个都会处于等待消息状态。运行NewTask.java,并携带命令行参数,可以在Edit Configurations中设置Program arguements。下面为官方教程中的命令行版本:

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....

主要观察两个worker的输出:

worker1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C[x] Received 'First message.'[x] Received 'Third message...'[x] Received 'Fifth message.....'
worker2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C[x] Received 'Second message..'[x] Received 'Fourth message....'

默认的,RabbitMQ将会按照顺序,以此发送每一条消息到每一个消费者。平均每个消费者是可以获得相同数量的消息的。这种分发消息的方式称为循环。

消息确认

 完成一个任务需要消耗一定时间,你可能想知道如果一个消费者开始了一个很长的任务,在仅仅完成了一部分的时候,死掉了,将会发生什么。在我们当前的代码中,一旦RabbitMQ分发一条消息给消费者,立即就会将该条消息从内存中删除。这种情况下,如果你杀掉一个worker,我们将会丢失它正在操作的消息。我们也会失去所有分发给他的还未处理的消息。

但是我们不想丢失任何消息。如果worker死掉,我们期望这个任务被重新分发给另一个worker。

为了确保消息从来没有丢失,RabbitMQ支持消息确认(acknowledgments)。一个确认是从消费者处发送以告诉RabbitMQ指定的消息收到了,处理完成了,RabbitMQ可以删除它了。

如果一个消费者宕机(channel关闭,connection关闭,TCP连接丢失等),没有发送ack,RabbitMQ将会知道这条消息没有处理完成,将会重新排队。如果此时存在其它消费者,将会迅速转发给其它消费者。这样你就可以确保消息不会丢失,即使进程偶尔宕机。

这里不存在消息超时,RabbitMQ在消费者宕机后会重发消息。即使处理数据用了很长很长的时间这也是没有问题的。

默认的消息确认是被打开的。上面的例子中我们通过autoAck=true明确关闭了它。下面我们打开它,每当处理完一个任务,就发送回一个适当的确认消息。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)  每次接收一个未处理消息final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(envelope.getDeliveryTag(), false);}}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

使用现在的代码,我们可以保证即使在操作消息的时候通过CTRL+C关闭了一个消费者,也不会丢失消息。不久后,所有未处理完成的消息都会被重新发送。

Forgotten acknowledgment

忘记设置basicAck是很普通的事情,但是结果却很严重。当客户端退出(这可能听起来像随机分发)消息会被重新发送,但是RabbitMQ会吃掉越来越多的内容,因为它不会释放任何没有被确认的消息。

调试这种错误的使用rabbitmqctl来打印messages_unacknowledged的部分:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

消息持久化

我们学习了如何在消费者宕机的情况下保证数据不丢失。但是在RabbitMQ服务器宕机的情况下,数据依然是会丢失的。

当RabbitMQ退出或崩溃,它会忘记所有的队列和消息,除非你告诉它不要。两件事情来确保消息未丢失:我们需要标记队列和消息为持久化的。

首先,我们需要确保RabbitMQ从来不会丢失队列。因此我们需要声明队列为持久化的:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

这行代码是没有问题的,但是在我们的环境下是错误的。这是因为我们已经定义了一个叫做hello的非持久化队列。RabbitMQ不允许重新定义已经存在的队列(使用不用参数)。这里有一个快速的方法 - 定义一个不同名字的队列,如task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

这个queueDeclare需要同时更改生产者和消费者的代码。

现在我们确保了task_queue在RabbitMQ重启的状态下也不会丢失。现在我们需要去标记我们的消息为持久化的 - 通过设置MessageProperties(实现了BasicProperties)的常量值:PERSISTENT_TEXT_PLAIN。

import com.rabbitmq.client.MessageProperties;channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

注意消息持久化:

标记了消息为持久化也不能完全保证消息不会丢失。尽管告诉了RabbitMQ将消息保存在磁盘中,RabbitMQ刚刚接收数据,还没有保存的时候,这个时间区间是无法持久化的。同事,RabbitMQ没有对每条消息都进行fsync(2) -- 也许仅仅保存在缓存中并没有真正写入硬盘。持久化并不健壮,但是对于处理简单的任务队列已经足够了。如果你需要更加强健的保证可以使用publisher confirms

公平分发

你可能注意到有时候分发还是无法解决我们的某些问题。例如在某种情况下,有两个消费者,当所有的奇数消息非常大,偶数消息很小,一个消费者将会持续不断的工作,另一个消费者基本不工作。但是RabbitMQ并不知道这种情况,依然是依次分发。

这是因为RabbitMQ在消息进入队列是进行分发。并不探查消息的数量。仅仅是发送第n条消息给第n个消费者。

为了解决这个问题,我们可以使用basicQos方法,设置参数为prefetchCount = 1。这会告诉RabbitMQ每次只给一个消费者一条消息。或者说,不要在消费者正在处理和确认消息的时候发送新的消息给他们。相反,它将分发消息给下一个不忙的消费者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意队列的大小

如果所有的消费者都处于繁忙状态,队列会填满。可以添加更多的消费者或者其它方案。

Putting it all together

NewTask.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;public class NewTask {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);String message = getMessage(argv);channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}private static String getMessage(String[] strings) {if (strings.length < 1)return "Hello World!";return joinStrings(strings, " ");}private static String joinStrings(String[] strings, String delimiter) {int length = strings.length;if (length == 0) return "";StringBuilder words = new StringBuilder(strings[0]);for (int i = 1; i < length; i++) {words.append(delimiter).append(strings[i]);}return words.toString();}
}

Worker.java

import com.rabbitmq.client.*;import java.io.IOException;public class Worker {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1);final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume(TASK_QUEUE_NAME, false, consumer);}private static void doWork(String task) {for (char ch : task.toCharArray()) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}}}
}

 

转载于:https://www.cnblogs.com/shiyu404/p/6251773.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/394240.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python matplotlib库安装出错_使用pip install Matplotlib时出现内存错误

我使用的是Python2.7&#xff0c;如果我试图安装Matplotlib&#xff0c;如果我使用“pip install Matplotlib”&#xff0c;就会出现这个错误Exception:Traceback (most recent call last):File "/usr/local/lib/python2.7/dist-packages/pip/basecommand.py", line …

笑看职场什么程序员才抢手,什么样的程序员涨薪多?

​程序员&#xff0c;怎么才算合格&#xff0c;不好说吧&#xff1b;他就像销售一样&#xff0c;一名销售员&#xff0c;比如网络销售卖茶叶&#xff0c;他卖茶叶很厉害呀&#xff0c;可是你让他去销售房地产&#xff0c;就算他有点销售的基础&#xff0c;也要重新去学怎么销售…

Android画布Canvas裁剪clipRect,Kotlin

Android画布Canvas裁剪clipRect&#xff0c;Kotlin private fun mydraw() {val originBmp BitmapFactory.decodeResource(resources, R.mipmap.pic).copy(Bitmap.Config.ARGB_8888, true)val newBmp Bitmap.createBitmap(originBmp.width, originBmp.height, Bitmap.Config.A…

调查|73%的公司正使用存在漏洞的超期服役设备

本文讲的是调查&#xff5c;73%的公司正使用存在漏洞的超期服役设备&#xff0c;一份新近的调查覆盖了北美350家机构的212000台思科设备。结果显示&#xff0c;73%的企业正在使用存在漏洞、超期服役的网络设备。该数字在上一年仅为60%。 Softchoice公司思科部门业务主管大卫魏格…

为什么要做稀疏编码_为什么我每天都要编码一年,所以我也学到了什么,以及如何做。...

为什么要做稀疏编码by Paul Rail由Paul Rail 为什么我每天都要编码一年&#xff0c;所以我也学到了什么&#xff0c;以及如何做。 (Why I coded every day for a year, what I learned, and how you can do it, too.) I was looking to switch careers. The world today is no…

深度装机大师一键重装_笔记本怎么重装系统?笔记本自己如何重装系统?

如何给笔记本重装系统呢?笔记本系统使用时间长了难免会运行缓慢&#xff0c;我们第一反应就是重装系统笔记本了。但是很多小白用户们就惆怅了&#xff0c;不知道笔记本怎么重装系统&#xff0c;怎么进行重装系统笔记本呢?首先&#xff0c;笔记本电脑可以重置系统&#xff0c;…

leetcode剑指 Offer 11. 旋转数组的最小数字(二分查找)

把一个数组最开始的若干个元素搬到数组的末尾&#xff0c;我们称之为数组的旋转。输入一个递增排序的数组的一个旋转&#xff0c;输出旋转数组的最小元素。例如&#xff0c;数组 [3,4,5,1,2] 为 [1,2,3,4,5] 的一个旋转&#xff0c;该数组的最小值为1。 示例 1&#xff1a; 输…

XMLHttpRequest状态码及相关事件

1.创建一个XMLHttpRequest对象 2.对XMLHttpRequest对象进行事件的监听(定义监听事件的位置不影响 3.对XMLHttpRequest对象的状态码 状态 名称描述0Uninitialized初始化状态。XMLHttpRequest 对象已创建或已被 abort() 方法重置1Open open() 方法已调用&#xff0c;但是 send()…

-code vs 1474 十进制转m进制

1474 十进制转m进制 时间限制: 1 s空间限制: 128000 KB题目等级 : 白银 Silver题解查看运行结果题目描述 Description将十进制数n转换成m进制数 m<16 n<100 输入描述 Input Description共一行 n和m 输出描述 Output Description共一个数 表示n的m进制 样例输入 Sample In…

人工智能时代号角已吹响 COMPUTEX如何凝聚AI这股力量?

当前谈到人工智能&#xff08;AI&#xff09;&#xff0c;或许大家最直接的反应是Google的AlphaGo&#xff0c;但比起“遥不可及”的围棋机器人&#xff0c;其实AI早就融入人们生活&#xff0c;就像苹果手机中的语音助手Siri&#xff0c;如此“平易近人”。从自动驾驶、机器人、…

python写入文字到txt只写入最后一行_python文件写入:向txt写入内容的设置

创建文本流的最简单方法是使用 open(),可以选择指定编码: f=open("myfile.txt","r",encoding="utf-8") 但是更为安全的方法是: with open("myfile.txt","w",encoding="utf-8") as f: f.write(str) 还可以设置…

python自带ide和pycharm哪个好_排名前三的Python IDE你选择哪个?我选PyCharm

世界上最好的 Python 编辑器或 IDE 是什么&#xff1f;炫酷的界面、流畅的体验&#xff0c;我们投 PyCharm一票&#xff0c;那么你呢&#xff1f;编辑Python程序&#xff0c;您有许多选项。有些人仍然喜欢一个基本的文本编辑器&#xff0c;如Emacs&#xff0c;VIM或Gedit&#…

leetcode1254. 统计封闭岛屿的数目(dfs)

有一个二维矩阵 grid &#xff0c;每个位置要么是陆地&#xff08;记号为 0 &#xff09;要么是水域&#xff08;记号为 1 &#xff09;。 我们从一块陆地出发&#xff0c;每次可以往上下左右 4 个方向相邻区域走&#xff0c;能走到的所有陆地区域&#xff0c;我们将其称为一座…

Dash的快速入门将使您在5分钟内进入“ Hello World”

by Anuj Pahade由Anuj Pahade Dash的快速入门将使您在5分钟内进入“ Hello World” (This quick intro to Dash will get you to “Hello World” in under 5 minutes) Dash is an open source library for creating reactive apps in Python. You can create amazing dashboa…

JSON/xml

JSON是什么&#xff1a; JSON(JavaScriptObject Notation, JS 对象简谱) 是一种轻量级的数据交换格式。它基于ECMAScript(欧洲计算机协会制定的js规范)的一个子集&#xff0c;采用完全独立于编程语言的文本格式来存储和表示数据。简洁和清晰的层次结构使得 JSON 成为理想的数据…

unity开宝箱动画_[技术博客]Unity3d 动画控制

在制作游戏时&#xff0c;导入的箱子模型本身自带动画。然而&#xff0c;它的动画是一个从打开到关闭的完整过程&#xff0c;并且没有给出控制打开关闭的方法。最直接的想法是对该动画进行拆分&#xff0c;再封装成不同的动画状态&#xff0c;但是不巧的是&#xff0c;这个动画…

php上传大文件时,服务器端php.ini文件中需要额外修改的选项

几个修改点&#xff1a; 1、upload_max_filesize 上传的最大文件 2、post_max_size 上传的最大文件 3、max_execution_time 修改为0表示无超时&#xff0c;一直等待 4、max_input_time 参考网址&#xff1a; 在php.ini中把max_input_time 和 max_execution_time设置得特别长…

《中国人工智能学会通讯》——11.21 结束语

11.21 结束语 本文针对交通流的网络性、异质性和动态性特点&#xff0c;结合当前多任务学习方法的不足提出了相应的解决方案。然而&#xff0c;在实际的应用场景中还存在更多的挑战&#xff0c;需要进一步深入的研究方向包括&#xff1a;① 高维任务的共同学习方法。在高维数据…

如何把一个软件嵌入另一个软件_自动化正在成为一个“软件”行业

摘要在智能制造时代&#xff0c;自动化行业正在成为一个软件行业&#xff0c;它正在改变着整个产业的未来&#xff0c;也将为制造业带来更为广阔的空间。自动化正在成为一个“软件”行业&#xff0c;在智能时代&#xff0c;软件正在成为自动化行业竞争的关键。自动化已然成为软…

leetcode1020. 飞地的数量(dfs)

给出一个二维数组 A&#xff0c;每个单元格为 0&#xff08;代表海&#xff09;或 1&#xff08;代表陆地&#xff09;。 移动是指在陆地上从一个地方走到另一个地方&#xff08;朝四个方向之一&#xff09;或离开网格的边界。 返回网格中无法在任意次数的移动中离开网格边界…