【RabbitMQ】 WorkQueues

消息分发

在【RabbitMQ】 HelloWorld中我们写了发送/接收消息的程序。这次我们将创建一个Work Queue用来在多个消费者之间分配耗时任务。

Work Queues(又称为:Task Queues)的主要思想是:尽可能的减少执行资源密集型任务时的等待时间。我们将任务封装为消息并发送到队列,在后台的工作进程将弹出任务并进行作业。当你运行很多worker,任务将在他们之间共享。

这个概念在WEB应用中尤为有效,因为在一个HTTP请求进行复杂操作是不可能的。

准备

在上一节我们发送了一条包含“Hello World”的消息。现在我们将要发送代表复杂任务的字符串。我们没有真实场景的复杂任务,例如调整图片大小或呈现PDF文件,让我们假装自己很忙 - 通过Thread.sleep()。我们将根据字符串中“.”的数量来衡量任务复杂度;每一个“.”增加1秒钟的工作时间。例如:一个“Hello...”将消耗3秒钟。

稍微修改下上一节中Send.java的代码,让我们可以从命令行参数中输入任意字符作为消息。这个程序将给我们的工作队列安排消息,命名为NewTask.java

String message = getMessage(argv);channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

一些封装方法来帮助我们从命令行参数中得到消息(简单来说就是将所有的命令行参数当做一条完整消息):

private static String getMessage(String[] strings){if (strings.length < 1)return "Hello World!";return joinStrings(strings, " ");
}private static String joinStrings(String[] strings, String delimiter) {int length = strings.length;if (length == 0) return "";StringBuilder words = new StringBuilder(strings[0]);for (int i = 1; i < length; i++) {words.append(delimiter).append(strings[i]);}return words.toString();
}

老的Recv.java程序也需要一些修改:他需要为消息中的每一个“.”伪造1秒钟的工作时间。称为Worker.java

final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");}}
};
boolean autoAck = true; // acknowledgment is covered below  消息确认,在后面会详细讲解
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

模拟任务执行:

private static void doWork(String task) throws InterruptedException {for (char ch: task.toCharArray()) {if (ch == '.') Thread.sleep(1000);}
}

循环调度

使用Task Queue的优点之一就是可以轻松的进行并行工作。如果我们正在构建一个积压的工作,我们可以仅仅通过添加更多的workers来解决。

首先,同时运行两个worker实例,他们都会从队列中得到消息,但事实上是什么样的呢?让我们看一看:

在IDEA中运行两次Worker.java,然后他们两个都会处于等待消息状态。运行NewTask.java,并携带命令行参数,可以在Edit Configurations中设置Program arguements。下面为官方教程中的命令行版本:

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....

主要观察两个worker的输出:

worker1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C[x] Received 'First message.'[x] Received 'Third message...'[x] Received 'Fifth message.....'
worker2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C[x] Received 'Second message..'[x] Received 'Fourth message....'

默认的,RabbitMQ将会按照顺序,以此发送每一条消息到每一个消费者。平均每个消费者是可以获得相同数量的消息的。这种分发消息的方式称为循环。

消息确认

 完成一个任务需要消耗一定时间,你可能想知道如果一个消费者开始了一个很长的任务,在仅仅完成了一部分的时候,死掉了,将会发生什么。在我们当前的代码中,一旦RabbitMQ分发一条消息给消费者,立即就会将该条消息从内存中删除。这种情况下,如果你杀掉一个worker,我们将会丢失它正在操作的消息。我们也会失去所有分发给他的还未处理的消息。

但是我们不想丢失任何消息。如果worker死掉,我们期望这个任务被重新分发给另一个worker。

为了确保消息从来没有丢失,RabbitMQ支持消息确认(acknowledgments)。一个确认是从消费者处发送以告诉RabbitMQ指定的消息收到了,处理完成了,RabbitMQ可以删除它了。

如果一个消费者宕机(channel关闭,connection关闭,TCP连接丢失等),没有发送ack,RabbitMQ将会知道这条消息没有处理完成,将会重新排队。如果此时存在其它消费者,将会迅速转发给其它消费者。这样你就可以确保消息不会丢失,即使进程偶尔宕机。

这里不存在消息超时,RabbitMQ在消费者宕机后会重发消息。即使处理数据用了很长很长的时间这也是没有问题的。

默认的消息确认是被打开的。上面的例子中我们通过autoAck=true明确关闭了它。下面我们打开它,每当处理完一个任务,就发送回一个适当的确认消息。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)  每次接收一个未处理消息final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(envelope.getDeliveryTag(), false);}}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

使用现在的代码,我们可以保证即使在操作消息的时候通过CTRL+C关闭了一个消费者,也不会丢失消息。不久后,所有未处理完成的消息都会被重新发送。

Forgotten acknowledgment

忘记设置basicAck是很普通的事情,但是结果却很严重。当客户端退出(这可能听起来像随机分发)消息会被重新发送,但是RabbitMQ会吃掉越来越多的内容,因为它不会释放任何没有被确认的消息。

调试这种错误的使用rabbitmqctl来打印messages_unacknowledged的部分:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

消息持久化

我们学习了如何在消费者宕机的情况下保证数据不丢失。但是在RabbitMQ服务器宕机的情况下,数据依然是会丢失的。

当RabbitMQ退出或崩溃,它会忘记所有的队列和消息,除非你告诉它不要。两件事情来确保消息未丢失:我们需要标记队列和消息为持久化的。

首先,我们需要确保RabbitMQ从来不会丢失队列。因此我们需要声明队列为持久化的:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

这行代码是没有问题的,但是在我们的环境下是错误的。这是因为我们已经定义了一个叫做hello的非持久化队列。RabbitMQ不允许重新定义已经存在的队列(使用不用参数)。这里有一个快速的方法 - 定义一个不同名字的队列,如task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

这个queueDeclare需要同时更改生产者和消费者的代码。

现在我们确保了task_queue在RabbitMQ重启的状态下也不会丢失。现在我们需要去标记我们的消息为持久化的 - 通过设置MessageProperties(实现了BasicProperties)的常量值:PERSISTENT_TEXT_PLAIN。

import com.rabbitmq.client.MessageProperties;channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

注意消息持久化:

标记了消息为持久化也不能完全保证消息不会丢失。尽管告诉了RabbitMQ将消息保存在磁盘中,RabbitMQ刚刚接收数据,还没有保存的时候,这个时间区间是无法持久化的。同事,RabbitMQ没有对每条消息都进行fsync(2) -- 也许仅仅保存在缓存中并没有真正写入硬盘。持久化并不健壮,但是对于处理简单的任务队列已经足够了。如果你需要更加强健的保证可以使用publisher confirms

公平分发

你可能注意到有时候分发还是无法解决我们的某些问题。例如在某种情况下,有两个消费者,当所有的奇数消息非常大,偶数消息很小,一个消费者将会持续不断的工作,另一个消费者基本不工作。但是RabbitMQ并不知道这种情况,依然是依次分发。

这是因为RabbitMQ在消息进入队列是进行分发。并不探查消息的数量。仅仅是发送第n条消息给第n个消费者。

为了解决这个问题,我们可以使用basicQos方法,设置参数为prefetchCount = 1。这会告诉RabbitMQ每次只给一个消费者一条消息。或者说,不要在消费者正在处理和确认消息的时候发送新的消息给他们。相反,它将分发消息给下一个不忙的消费者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意队列的大小

如果所有的消费者都处于繁忙状态,队列会填满。可以添加更多的消费者或者其它方案。

Putting it all together

NewTask.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;public class NewTask {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);String message = getMessage(argv);channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}private static String getMessage(String[] strings) {if (strings.length < 1)return "Hello World!";return joinStrings(strings, " ");}private static String joinStrings(String[] strings, String delimiter) {int length = strings.length;if (length == 0) return "";StringBuilder words = new StringBuilder(strings[0]);for (int i = 1; i < length; i++) {words.append(delimiter).append(strings[i]);}return words.toString();}
}

Worker.java

import com.rabbitmq.client.*;import java.io.IOException;public class Worker {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1);final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume(TASK_QUEUE_NAME, false, consumer);}private static void doWork(String task) {for (char ch : task.toCharArray()) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}}}
}

 

转载于:https://www.cnblogs.com/shiyu404/p/6251773.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/394240.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

笑看职场什么程序员才抢手,什么样的程序员涨薪多?

​程序员&#xff0c;怎么才算合格&#xff0c;不好说吧&#xff1b;他就像销售一样&#xff0c;一名销售员&#xff0c;比如网络销售卖茶叶&#xff0c;他卖茶叶很厉害呀&#xff0c;可是你让他去销售房地产&#xff0c;就算他有点销售的基础&#xff0c;也要重新去学怎么销售…

Android画布Canvas裁剪clipRect,Kotlin

Android画布Canvas裁剪clipRect&#xff0c;Kotlin private fun mydraw() {val originBmp BitmapFactory.decodeResource(resources, R.mipmap.pic).copy(Bitmap.Config.ARGB_8888, true)val newBmp Bitmap.createBitmap(originBmp.width, originBmp.height, Bitmap.Config.A…

调查|73%的公司正使用存在漏洞的超期服役设备

本文讲的是调查&#xff5c;73%的公司正使用存在漏洞的超期服役设备&#xff0c;一份新近的调查覆盖了北美350家机构的212000台思科设备。结果显示&#xff0c;73%的企业正在使用存在漏洞、超期服役的网络设备。该数字在上一年仅为60%。 Softchoice公司思科部门业务主管大卫魏格…

深度装机大师一键重装_笔记本怎么重装系统?笔记本自己如何重装系统?

如何给笔记本重装系统呢?笔记本系统使用时间长了难免会运行缓慢&#xff0c;我们第一反应就是重装系统笔记本了。但是很多小白用户们就惆怅了&#xff0c;不知道笔记本怎么重装系统&#xff0c;怎么进行重装系统笔记本呢?首先&#xff0c;笔记本电脑可以重置系统&#xff0c;…

XMLHttpRequest状态码及相关事件

1.创建一个XMLHttpRequest对象 2.对XMLHttpRequest对象进行事件的监听(定义监听事件的位置不影响 3.对XMLHttpRequest对象的状态码 状态 名称描述0Uninitialized初始化状态。XMLHttpRequest 对象已创建或已被 abort() 方法重置1Open open() 方法已调用&#xff0c;但是 send()…

人工智能时代号角已吹响 COMPUTEX如何凝聚AI这股力量?

当前谈到人工智能&#xff08;AI&#xff09;&#xff0c;或许大家最直接的反应是Google的AlphaGo&#xff0c;但比起“遥不可及”的围棋机器人&#xff0c;其实AI早就融入人们生活&#xff0c;就像苹果手机中的语音助手Siri&#xff0c;如此“平易近人”。从自动驾驶、机器人、…

unity开宝箱动画_[技术博客]Unity3d 动画控制

在制作游戏时&#xff0c;导入的箱子模型本身自带动画。然而&#xff0c;它的动画是一个从打开到关闭的完整过程&#xff0c;并且没有给出控制打开关闭的方法。最直接的想法是对该动画进行拆分&#xff0c;再封装成不同的动画状态&#xff0c;但是不巧的是&#xff0c;这个动画…

如何把一个软件嵌入另一个软件_自动化正在成为一个“软件”行业

摘要在智能制造时代&#xff0c;自动化行业正在成为一个软件行业&#xff0c;它正在改变着整个产业的未来&#xff0c;也将为制造业带来更为广阔的空间。自动化正在成为一个“软件”行业&#xff0c;在智能时代&#xff0c;软件正在成为自动化行业竞争的关键。自动化已然成为软…

python怎么显示求余的除数_Python算术运算符及用法详解

算术运算符也即数学运算符&#xff0c;用来对数字进行数学运算&#xff0c;比如加减乘除。下表列出了 Python 支持所有基本算术运算符。表 1 Python 常用算术运算符运算符说明实例结果加12.45 1527.45-减4.56 - 0.264.3*乘5 * 3.618.0/除法(和数学中的规则一样)7 / 23.5//整除…

HTML td 标签的 colspan 属性

表格单元横跨两列的表格&#xff1a; <table border"1"><tr><th>Month</th><th>Savings</th></tr><tr><td colspan"2">January</td></tr><tr><td colspan"2">Fe…

Kotlin的Lambda表达式以及它们怎样简化Android开发(KAD 07)

作者&#xff1a;Antonio Leiva 时间&#xff1a;Jan 5, 2017 原文链接&#xff1a;https://antonioleiva.com/lambdas-kotlin/ 由于Lambda表达式允许更简单的方式建模式函数&#xff0c;所以它是Kotlin和任何其他现代开发语言的最强工具之一。 在Java6中&#xff0c;我们仅能下…

Pyhon进阶9---类的继承

类的继承 基本概念 定义 格式如下 继承中的访问控制 class Animal:__CNOUT 0HEIGHT 0def __init__(self,age,weight,height):self.__CNOUT self.__CNOUT 1self.age ageself.__weight weightself.HEIGHT heightdef eat(self):print({} eat.format(self.__class__.__name__…

quartz教程二

转载于:https://www.cnblogs.com/mumian2/p/10729901.html

python把图片转为字符画_Python 实现图片转换为字符画

主要使用 pillow如果没有安装 使用 pillow install pillow 安装一下看代码&#xff1a;from PIL import Imageimport argparse#字符画所用的字符集ascii_char list("$B%8&WM#*oahkbdpqwmZO0QLCJUYXzcvunxrjft/\|()1{}[]?-_~<>i!lI;:,\"^. ")def get…

76. Minimum Window Substring

最后更新 一刷 08-Jan-2017 昨天Amazon group面结束&#xff0c;刚回家。 国内以前喜欢的女生结婚了&#xff0c;嘿嘿...好开心呀~~ 这次面试感觉自己的做法完爆别人&#xff0c;比什么2 greedy好多了 总之表现比想象的好&#xff0c;最后一面的面试官真是聪明得一逼&#xff…

mysql浅拷贝_深拷贝与浅拷贝

在Python中&#xff0c;对象赋值实际上是对象的引用。当创建一个对象&#xff0c;然后把它赋给另一个变量的时候&#xff0c;Python并没有拷贝这个对象&#xff0c;而只是拷贝了这个对象的引用。1、浅拷贝&#xff1a;利用切片操作、工厂方法list方法拷贝2、深拷贝&#xff1a;…

iOS中的颜色

最近在改Bug的时候&#xff0c;才注意到iOS 中的颜色竟然也大有文章&#xff0c;特来记录一下。 先说一下问题&#xff0c;因为某界面中有用xib实现的一个view&#xff0c;而这个view 只在UIColletionView的layout 里通过nib 注册使用&#xff0c;为这个xib设置了背景色&#x…

多线程的基础知识

1、程序、进程、线程的基本概念 程序&#xff1a;为了完成某种任务用某一种语言编写的一组指令的集合就叫程序。程序就是一段静态的代码。 进程&#xff1a;进程是程序的依次执行过程&#xff0c;或者说是正在运行的一个程序。这是一个动态的过程&#xff0c;有它自身的产生运行…

springboot实现单点登录_什么是单点登录,php是如何实现单点登录的

文章来自&#xff1a;php中文网链接&#xff1a;https://www.php.cn/php-weizijiaocheng-429869.html作者&#xff1a;中文网商务合作:请加微信(QQ)&#xff1a;2230304070视频教程分享码农网&#xff1a;http://www.mano100.cn/rjyfk_url-url.html &#xff0c;升级终身会员即…

背景图处理,这是个好东西记录一下

背景图处理 rgba &#xff08;&#xff09;&#xff0c;前3个是三原色&#xff0c;第四个参数是透明度转载于:https://www.cnblogs.com/ChineseLiao/p/7479207.html