python2处理耗时任务_RabbitMQ Go客户端教程2——任务队列/工作队列

本文翻译自RabbitMQ官网的Go语言客户端系列教程,本文首发于我的个人博客:liwenzhou.com,教程共分为六篇,本文是第二篇——任务队列。

这些教程涵盖了使用RabbitMQ创建消息传递应用程序的基础知识。 你需要安装RabbitMQ服务器才能完成这些教程,请参阅安装指南或使用Docker镜像。 这些教程的代码是开源的,官方网站也是如此。

先决条件

本教程假设RabbitMQ已安装并运行在本机上的标准端口(5672)。如果你使用不同的主机、端口或凭据,则需要调整连接设置。

任务队列/工作队列

(使用Go RabbitMQ客户端)

4f5ffc7ef45a82f58f23d1ec6ddeffdf.png

第一个教程中,我们编写程序从命名的队列发送和接收消息。在这一节中,我们将创建一个工作队列,该队列将用于在多个工人之间分配耗时的任务。

工作队列(又称任务队列)的主要思想是避免立即执行某些资源密集型任务并且不得不等待这些任务完成。相反,我们安排任务异步地同时或在当前任务之后完成。我们将任务封装为消息并将其发送到队列,在后台运行的工作进程将取出消息并最终执行任务。当你运行多个工作进程时,任务将在他们之间共享。

这个概念在Web应用中特别有用,因为在Web应用中不可能在较短的HTTP请求窗口内处理复杂的任务,(译注:例如注册时发送邮件或短信验证码等场景)。

准备工作

在本教程的上一部分,我们发送了一条包含“ Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。我们没有实际的任务,例如调整图像大小或渲染pdf文件,所以我们通过借助time.Sleep函数模拟一些比较耗时的任务。我们会将一些包含.的字符串封装为消息发送到队列中,其中每有一个.就表示需要耗费1秒钟的工作,例如,hello...表示一个将花费三秒钟的假任务。

我们将稍微修改上一个示例中的send.go代码,以允许从命令行发送任意消息。该程序会将任务安排到我们的工作队列中,因此我们将其命名为new_task.go

body := bodyFrom(os.Args)  // 从参数中获取要发送的消息正文
err = ch.Publish("",           // exchangeq.Name,       // routing keyfalse,        // mandatoryfalse,amqp.Publishing {DeliveryMode: amqp.Persistent,ContentType:  "text/plain",Body:         []byte(body),})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

下面是bodyFrom函数:

func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], " ")}return s
}

我们以前的receive.go程序也需要进行一些更改:它需要为消息正文中出现的每个.伪造一秒钟的工作。它将从队列中弹出消息并执行任务,因此我们将其称为worker.go

msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args
)
failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)dot_count := bytes.Count(d.Body, []byte("."))  // 数一下有几个.t := time.Duration(dot_count)time.Sleep(t * time.Second)  // 模拟耗时的任务log.Printf("Done")}
}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

请注意,我们的假任务模拟执行时间。

然后,我们就可以打开两个终端,分别执行new_task.goworker.go了。

# shell 1
go run worker.go# shell 2
go run new_task.go

循环调度

使用任务队列的优点之一是能够轻松并行化工作。如果我们的工作正在积压,我们可以增加更多的工人,这样就可以轻松扩展。

首先,让我们尝试同时运行两个worker.go脚本。它们都将从队列中获取消息,但是究竟是怎样呢?让我们来看看。

你需要打开三个控制台。其中两个将运行worker.go脚本。这些控制台将成为我们的两个消费者——C1和C2。

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C

在第三个控制台中,我们将发布新任务。启动消费者之后,你可以发布一些消息:

# shell 3
go run new_task.go msg1.
go run new_task.go msg2..
go run new_task.go msg3...
go run new_task.go msg4....
go run new_task.go msg5.....

然后我们在shell1shell2 两个窗口看到如下输出结果了:

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg1.
# => [x] Received a message: msg3...
# => [x] Received a message: msg5.....# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg2..
# => [x] Received a message: msg4....

默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。使用三个或者更多worker试一下。

消息确认

work 完成任务可能需要耗费几秒钟,如果一个worker在任务执行过程中宕机了该怎么办呢?我们当前的代码中,RabbitMQ一旦向消费者传递了一条消息,便立即将其标记为删除。在这种情况下,如果你终止一个worker那么你就可能会丢失这个任务,我们还将丢失所有已经交付给这个worker的尚未处理的消息。

我们不想丢失任何任务,如果一个worker意外宕机了,那么我们希望将任务交付给其他worker来处理。

为了确保消息永不丢失,RabbitMQ支持 href="https://www.rabbitmq.com/confirms.html">消息确认。消费者发送回一个确认(acknowledgement),以告知RabbitMQ已经接收,处理了特定的消息,并且RabbitMQ可以自由删除它。

如果使用者在不发送确认的情况下死亡(其通道已关闭,连接已关闭或TCP连接丢失),RabbitMQ将了解消息未完全处理,并将对其重新排队。如果同时有其他消费者在线,它将很快将其重新分发给另一个消费者。这样,您可以确保即使工人偶尔死亡也不会丢失任何消息。

没有任何消息超时;RabbitMQ将在消费者死亡时重新传递消息。即使处理一条消息需要很长时间也没关系。

在本教程中,我们将使用手动消息确认,方法是为“auto-ack”参数传递一个false,然后在完成任务后,使用d.Ack(false)worker发送一个正确的确认(这将确认一次传递)。

msgs, err := ch.Consume(q.Name, // queue"",     // consumerfalse,  // 注意这里传false,关闭自动消息确认false,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args
)
if err != nil {fmt.Printf("ch.Consume failed, err:%vn", err)return
}// 开启循环不断地消费消息
forever := make(chan bool)
go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)dotCount := bytes.Count(d.Body, []byte("."))t := time.Duration(dotCount)time.Sleep(t * time.Second)log.Printf("Done")d.Ack(false) // 手动传递消息确认}
}()

使用这段代码,我们可以确保即使你在处理消息时使用CTRL+C杀死一个worker,也不会丢失任何内容。在worker死后不久,所有未确认的消息都将被重新发送。

消息确认必须在接收消息的同一通道(Channel)上发送。尝试使用不同的通道(Channel)进行消息确认将导致通道级协议异常。有关更多信息,请参阅确认的文档指南

忘记确认
忘记确认是一个常见的错误。这是一个简单的错误,但后果是严重的。当你的客户机退出时,消息将被重新传递(这看起来像随机重新传递),但是RabbitMQ将消耗越来越多的内存,因为它无法释放任何未确认的消息。
为了调试这种错误,可以使用rabbitmqctl打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows平台,去掉sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久化

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止运行,我们的任务仍然会丢失。

当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久的。

首先,我们需要确保队列能够在RabbitMQ节点重新启动后继续运行。为此,我们需要声明它是持久的:

q, err := ch.QueueDeclare("hello", // nametrue,    // 声明为持久队列false,   // delete when unusedfalse,   // exclusivefalse,   // no-waitnil,     // arguments
)

虽然这个命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许你使用不同的参数重新定义现有队列,并将向任何尝试重新定义的程序返回错误。但是有一个快速的解决方法——让我们声明一个具有不同名称的队列,例如task_queue

q, err := ch.QueueDeclare("task_queue", // nametrue,         // 声明为持久队列false,        // delete when unusedfalse,        // exclusivefalse,        // no-waitnil,          // arguments
)

这种持久的选项更改需要同时应用于生产者代码和消费者代码。

在这一点上,我们确信即使RabbitMQ重新启动,任务队列队列也不会丢失。现在我们需要将消息标记为持久的——通过使用amqp.Publishing中的持久性选项amqp.Persistent

err = ch.Publish("",     // exchangeq.Name, // routing keyfalse,  // 立即false,  // 强制amqp.Publishing{DeliveryMode: amqp.Persistent, // 持久(交付模式:瞬态/持久)ContentType:  "text/plain",Body:         []byte(body),})
有关消息持久性的说明
将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘上,但是RabbitMQ接受了一条消息并且还没有保存它时,仍然有一个很短的时间窗口。而且,RabbitMQ并不是对每个消息都执行fsync(2)——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证不是很强,但是对于我们的简单任务队列来说已经足够了。如果您需要更强有力的担保,那么您可以使用publisher confirms

公平分发

你可能已经注意到调度仍然不能完全按照我们的要求工作。例如,在一个有两个worker的情况下,当所有的奇数消息都是重消息而偶数消息都是轻消息时,一个worker将持续忙碌,而另一个worker几乎不做任何工作。嗯,RabbitMQ对此一无所知,仍然会均匀地发送消息。

这是因为RabbitMQ只是在消息进入队列时发送消息。它不考虑消费者未确认消息的数量。只是盲目地向消费者发送信息。

b2a5fe2cd4929feef1850ba19104891a.png

为了避免这种情况,我们可以将预取计数设置为1。这告诉RabbitMQ不要一次向一个worker发出多个消息。或者,换句话说,在处理并确认前一条消息之前,不要向worker发送新消息。相反,它将把它发送给下一个不忙的worker

err = ch.Qos(1,     // prefetch count0,     // prefetch sizefalse, // global
)
关于队列大小的说明
如果所有的worker都很忙,你的queue随时可能会满。你会想继续关注这一点,也许需要增加更多的worker,或者有一些其他的策略。

完整的代码示例

我们的new_task.go的最终代码代入如下:

package mainimport ("fmt""log""os""strings""github.com/streadway/amqp"
)func main() {// 1. 尝试连接RabbitMQ,建立连接// 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {fmt.Printf("connect to RabbitMQ failed, err:%vn", err)return}defer conn.Close()// 2. 接下来,我们创建一个通道,大多数API都是用过该通道操作的。ch, err := conn.Channel()if err != nil {fmt.Printf("open a channel failed, err:%vn", err)return}defer ch.Close()// 3. 要发送,我们必须声明要发送到的队列。q, err := ch.QueueDeclare("task_queue", // nametrue,         // 持久的false,        // delete when unusedfalse,        // 独有的false,        // no-waitnil,          // arguments)if err != nil {fmt.Printf("declare a queue failed, err:%vn", err)return}// 4. 然后我们可以将消息发布到声明的队列body := bodyFrom(os.Args)err = ch.Publish("",     // exchangeq.Name, // routing keyfalse,  // 立即false,  // 强制amqp.Publishing{DeliveryMode: amqp.Persistent, // 持久ContentType:  "text/plain",Body:         []byte(body),})if err != nil {fmt.Printf("publish a message failed, err:%vn", err)return}log.Printf(" [x] Sent %s", body)
}// bodyFrom 从命令行获取将要发送的消息内容
func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], " ")}return s
}

work.go内容如下:

package mainimport ("bytes""fmt""log""time""github.com/streadway/amqp"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {fmt.Printf("connect to RabbitMQ failed, err:%vn", err)return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Printf("open a channel failed, err:%vn", err)return}defer ch.Close()// 声明一个queueq, err := ch.QueueDeclare("task_queue", // nametrue,         // 声明为持久队列false,        // delete when unusedfalse,        // exclusivefalse,        // no-waitnil,          // arguments)err = ch.Qos(1,     // prefetch count0,     // prefetch sizefalse, // global)if err != nil {fmt.Printf("ch.Qos() failed, err:%vn", err)return}// 立即返回一个Delivery的通道msgs, err := ch.Consume(q.Name, // queue"",     // consumerfalse,  // 注意这里传false,关闭自动消息确认false,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)if err != nil {fmt.Printf("ch.Consume failed, err:%vn", err)return}// 开启循环不断地消费消息forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)dotCount := bytes.Count(d.Body, []byte("."))t := time.Duration(dotCount)time.Sleep(t * time.Second)log.Printf("Done")d.Ack(false) // 手动传递消息确认}}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever
}

使用消息确认和预取计数,可以设置工作队列(work queue)。即使RabbitMQ重新启动,持久性选项也可以让任务继续存在。

有关amqp.Channel方法和消息属性的内容,可以浏览amqp API文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/459985.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用 CSS3 伪元素实现立体的照片堆叠效

CSS3 里引入的伪元素让 Web 开发人员能够在不需要额外添加 HTML 标签的情况下制作出复杂的视觉效果。例如&#xff0c;:before 和 :after 这个两个 CSS3 伪元素就可以帮助你实现很多有趣的效果。本教程将告诉你如何使用 CSS3 为元素创建一组漂亮的图片堆叠效果。 效果演示 …

启发式搜索A*算法

A* 寻路算法 (2011-02-15 10:53:11) 转载▼标签&#xff1a; 游戏 分类&#xff1a; 算法概述 虽然掌握了 A* 算法的人认为它容易&#xff0c;但是对于初学者来说&#xff0c; A* 算法还是很复杂的。 搜索区域(The Search Area) 我们假设某人要从 A 点移动到 B 点&#xff0c…

centos7全盘备份到本地_CentOS7下制作openssl1.1.1i RPM包并升级

点击上方"walkingcloud"关注&#xff0c;并选择"星标"公众号CentOS7下制作openssl1.1.1i RPM包并升级OpenSSL最新漏洞 OpenSSL官方发布了拒绝服务漏洞风险通告&#xff0c;漏洞编号为CVE-2020-1971漏洞详情OpenSSL是一个开放源代码的软件库包&#xff0c;应…

计算机网络基础专业找工作,2021计算机网络技术前景怎么样? 好找工作吗

很多同学想知道计算机网络技术前景怎么样&#xff0c;以下是一些相关信息的整理&#xff0c;希望能对同学们有所帮助&#xff01;计算机网络技术前景从目前的情况看&#xff0c;企业的IT技术管理岗位一般设置为企业信息主管、总监等&#xff1b;工程技术岗位设置为网络工程师、…

安装 VMware Tools

第一步&#xff1a;挂载VMware Tools 第二步&#xff1a; 将上图VMware Tools-9.2.0 XXXX.tar.gz 复制到tmp目录&#xff08;其他目录也行&#xff0c;没有权限操作需要 chmod 777 XXX 修改来获取权限&#xff09; 第三步&#xff1a;解压 tar xvzf VMwareTools-9.2.0-799703.t…

LINUX添加一块网卡地址配置及问题

如何如何网卡服务重启慢关掉service NetworkManager stopchkconfig NetworkManager off 关于NetworkManager外链&#xff1a;http://www.linuxidc.com/Linux/2013-08/88809.htm 查看cat /etc/udev/rules.d/70-persistent-net.rules是否网卡名有重复VMware虚拟机安装好CentOS6.…

[每日一题jQuery] jQuery选择器总结:进一步过滤、同级操作、后代操作

jQuery选择器继承自CSS的风格&#xff0c;可以通过jQuery选择器找出特定的DOM元素&#xff0c;在此基础上对该元素做相应处理。jQuery不仅支持简单的标签选择器、类选择器、id选择器&#xff0c;还针对表单状态、子元素、元素顺序提供相应的选择器。如果熟练运用&#xff0c;则…

计算机桌面备份在哪里,电脑备份文件在哪里

电脑是我们经常使用的工具&#xff0c;为了放在电脑出现系统问题&#xff0c;有的朋友会将电脑系统备份&#xff0c;但是不知道电脑备份文件放在什么地方了&#xff0c;电脑备份文件在哪里呢&#xff1f;很多朋友还是不知道的&#xff0c;所以针对电脑备份文件保存在哪里的问题…

如何用python打印田字格_如何用 3D 打印一双顶级跑鞋回形针

马拉松赛场上的跑鞋五光十色&#xff0c;除了美观之外&#xff0c;它们还有一个重要作用&#xff1a;保护运动员不受伤害。跑步时&#xff0c;从脚碰到地面开始会受到地面的反作用力。从着陆的减速到随着脚步推蹬过程的加速&#xff0c;这一过程中脚和地面的压力会达到两个峰值…

什么可以作为gcroot_面包果既能当水果又可以作为粮食,国内却无法普及,这是为什么?...

水果是我们大多数人日常都会吃的一种食物&#xff0c;现在国内的水果种类也是非常多&#xff0c;不管是国内本有的还是从国外进口的。但是你吃过面包果吗&#xff1f;可千万不要把它和非洲大陆上的面包树混为一谈&#xff0c;面包果和它半毛钱关系没有&#xff0c;它的原产地在…

cookies的存值问题

2019独角兽企业重金招聘Python工程师标准>>> cookies存值问题&#xff0c;项目中遇到问题初始化时由于cookies中存在冒号导致存到cookies中时取出来被转码了&#xff0c;冒号的转码从cookies中取出来为%A3 &#xff0c;当然一开始我的设想是保存到cookies换一种保存…

c++ 一个函数包括多个返回值判断_Python函数的概念和使用

函数为了便于程序的维护和更好的实现模块化&#xff0c;好的程序都会分解为很多函数。可以这么说&#xff0c;对于任何的编程语言&#xff0c;函数都是一个非常重要的概念。python 不仅简化了函数的定义过程&#xff0c;而且还大量借鉴了其他函数编程语言中的优秀特性。本章内容…

微型计算机中JNZ,微机原理jnz是什么指令_微机原理内存分配图

微机原理 数据传送指令微机原理 4指令系统 3通过数据传输地址 地址传送指令 标志传送指令微机原理 4指令系统 3通过数据传输地址 地址传送指令 标志传送指令微机原理指令 微机原理sub指令 csdn微机原理 4指令系统 3通过数据传输地址 地址传送指令 标志传送指令微机原理 4指令系…

分段处理_连续油管无限级可开关固井滑套分段压裂工艺

连续油管无限级可开关固井滑套分压工艺具有无需射孔、处理级数不受限制、施工效率高等特点&#xff0c;同时可为后期水平井控水及重复压裂提供井筒条件。作业前滑套与套管管柱一趟下入井内&#xff0c;正常进行固井后&#xff0c;依靠固井水泥实现压裂层间封隔。通过连续油管下…

access 根据id删除数据_小程序云开发之数据库自动备份丨云开发101

钻石有价&#xff0c;数据无价。我们通常会把重要的业务数据存放在数据库中&#xff0c;并需要对数据库做定时的自动备份工作&#xff0c;防止数据异常丢失&#xff0c;造成无法挽回的损失。小程序云开发提供了方便的云数据库供我们直接使用&#xff0c;云开发使用了腾讯云提供…

国家职业资格计算机调试维修技师试题,电工国家职业资格三级(高级)理论试题...

电工国家职业资格三级理论试题一、单选题(第1题&#xff5e;第60题。选择一个正确的答案&#xff0c;将相应的字母填入题内的括号中。每题1.0分&#xff0c;满分60分。)1.异步测速发电机的空心杯转子是用( )材料做成的。(A)低电阻 (B)高电阻 (C)低导磁 (D…

PHP解决方案@时间差异计算函数

为什么80%的码农都做不了架构师&#xff1f;>>> 方案解决目标&#xff1a;计算时间差异 function ago($time){ $periods array("second","minute","hour","day","week","month","year",…

baidumap vue 判断范围_vue 数据渲染

本文转载于 SegmentFault 社区社区专栏&#xff1a;山外de楼作者&#xff1a;山外de楼前言 vue 是如何将编译器中的代码转换为页面真实元素的&#xff1f;这个过程涉及到模板编译成 AST 语法树&#xff0c;AST 语法树构建渲染函数&#xff0c;渲染函数生成虚拟 dom&#xff0c;…

jtoken判断是否包含键_Redis 数据库、键过期的实现

今天看看作为内存数据库&#xff0c;Redis 是怎么存储数据的以及键是怎么过期的。阅读这篇文章你将会了解到&#xff1a;Redis 的数据库实现Redis 键过期的策略数据库的实现我们先看代码 server.h/redisServerstruct redisServer{ ... //保存 db 的数组 redisDb *db; //db 的数…

JBoss配置详解

为什么80%的码农都做不了架构师&#xff1f;>>> 2.0.1 JBOSS 的一点说明 $JBOSS-HOME/server/下有3个目录&#xff0c;all/default/minimal&#xff0c;它们是表示3种配置&#xff0c;全部的配置、默认配置、最小配置&#xff0c;我们在启动JBOSS服务时&#xff0c…