【RabbitMQ】golang客户端教程2——工作队列

任务队列/工作队列

在这里插入图片描述
在上一个教程中,我们编写程序从命名的队列发送和接收消息。在这一节中,我们将创建一个工作队列,该队列将用于在多个工人之间分配耗时的任务。

工作队列(又称任务队列)的主要思想是避免立即执行某些资源密集型任务并且不得不等待这些任务完成。相反,我们安排任务异步地同时或在当前任务之后完成。我们将任务封装为消息并将其发送到队列,在后台运行的工作进程将取出消息并最终执行任务。当你运行多个工作进程时,任务将在他们之间共享。

这个概念在Web应用中特别有用,因为在Web应用中不可能在较短的HTTP请求窗口内处理复杂的任务,(译注:例如注册时发送邮件或短信验证码等场景)。

准备工作

在本教程的上一部分,我们发送了一条包含“ Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。我们没有实际的任务,例如调整图像大小或渲染pdf文件,所以我们通过借助time.Sleep函数模拟一些比较耗时的任务。我们会将一些包含.的字符串封装为消息发送到队列中,其中每有一个.就表示需要耗费1秒钟的工作,例如,hello...表示一个将花费三秒钟的假任务。

我们将稍微修改上一个示例中的send.go代码,以允许从命令行发送任意消息。该程序会将任务安排到我们的工作队列中,因此我们将其命名为new_task.go

body := bodyFrom(os.Args)  // 从参数中获取要发送的消息正文
err = ch.Publish("",           // exchangeq.Name,       // routing keyfalse,        // mandatoryfalse,amqp.Publishing {DeliveryMode: amqp.Persistent,ContentType:  "text/plain",Body:         []byte(body),})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

下面是bodyFrom函数:

func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], " ")}return s
}

我们以前的receive.go程序也需要进行一些更改:它需要为消息正文中出现的每个.伪造一秒钟的工作。它将从队列中弹出消息并执行任务,因此我们将其称为worker.go

msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args
)
failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)dot_count := bytes.Count(d.Body, []byte("."))  // 数一下有几个.t := time.Duration(dot_count)time.Sleep(t * time.Second)  // 模拟耗时的任务log.Printf("Done")}
}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

请注意,我们的假任务模拟执行时间。

然后,我们就可以打开两个终端,分别执行new_task.goworker.go了。

# shell 1
go run worker.go
# shell 2
go run new_task.go

循环调度

使用任务队列的优点之一是能够轻松并行化工作。如果我们的工作正在积压,我们可以增加更多的工人,这样就可以轻松扩展。

首先,让我们尝试同时运行两个worker.go脚本。它们都将从队列中获取消息,但是究竟是怎样呢?让我们来看看。

你需要打开三个控制台。其中两个将运行worker.go脚本。这些控制台将成为我们的两个消费者——C1和C2。

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C

在第三个控制台中,我们将发布新任务。启动消费者之后,你可以发布一些消息:

# shell 3
go run new_task.go msg1.
go run new_task.go msg2..
go run new_task.go msg3...
go run new_task.go msg4....
go run new_task.go msg5.....

然后我们在shell1shell2 两个窗口看到如下输出结果了:

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg1.
# => [x] Received a message: msg3...
# => [x] Received a message: msg5.....
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg2..
# => [x] Received a message: msg4....

默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。使用三个或者更多worker试一下。

消息确认

work 完成任务可能需要耗费几秒钟,如果一个worker在任务执行过程中宕机了该怎么办呢?我们当前的代码中,RabbitMQ一旦向消费者传递了一条消息,便立即将其标记为删除。在这种情况下,如果你终止一个worker那么你就可能会丢失这个任务,我们还将丢失所有已经交付给这个worker的尚未处理的消息。

我们不想丢失任何任务,如果一个worker意外宕机了,那么我们希望将任务交付给其他worker来处理。

为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发送回一个确认(acknowledgement),以告知RabbitMQ已经接收,处理了特定的消息,并且RabbitMQ可以自由删除它。

如果使用者在不发送确认的情况下死亡(其通道已关闭,连接已关闭或TCP连接丢失),RabbitMQ将了解消息未完全处理,并将对其重新排队。如果同时有其他消费者在线,它将很快将其重新分发给另一个消费者。这样,您可以确保即使工人偶尔死亡也不会丢失任何消息。

没有任何消息超时;RabbitMQ将在消费者死亡时重新传递消息。即使处理一条消息需要很长时间也没关系。

在本教程中,我们将使用手动消息确认,方法是为“auto-ack”参数传递一个false,然后在完成任务后,使用d.Ack(false)worker发送一个正确的确认(这将确认一次传递)。

msgs, err := ch.Consume(q.Name, // queue"",     // consumerfalse,  // 注意这里传false,关闭自动消息确认false,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args
)
if err != nil {fmt.Printf("ch.Consume failed, err:%v\n", err)return
}// 开启循环不断地消费消息
forever := make(chan bool)
go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)dotCount := bytes.Count(d.Body, []byte("."))t := time.Duration(dotCount)time.Sleep(t * time.Second)log.Printf("Done")d.Ack(false) // 手动传递消息确认}
}()

使用这段代码,我们可以确保即使你在处理消息时使用CTRL+C杀死一个worker,也不会丢失任何内容。在worker死后不久,所有未确认的消息都将被重新发送。

消息确认必须在接收消息的同一通道(Channel)上发送。尝试使用不同的通道(Channel)进行消息确认将导致通道级协议异常。

忘记确认

忘记确认是一个常见的错误。这是一个简单的错误,但后果是严重的。当你的客户机退出时,消息将被重新传递(这看起来像随机重新传递),但是RabbitMQ将消耗越来越多的内存,因为它无法释放任何未确认的消息。

为了调试这种错误,可以使用rabbitmqctl打印messages_unacknowledged字段:

> sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
> ```
>
> 在Windows平台,去掉sudo
>
> ```bash
> rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
> ```### 消息持久化我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止运行,我们的任务仍然会丢失。当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久的。首先,我们需要确保队列能够在RabbitMQ节点重新启动后继续运行。为此,我们需要声明它是持久的:```go
q, err := ch.QueueDeclare("hello", // nametrue,    // 声明为持久队列false,   // delete when unusedfalse,   // exclusivefalse,   // no-waitnil,     // arguments
)

虽然这个命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许你使用不同的参数重新定义现有队列,并将向任何尝试重新定义的程序返回错误。但是有一个快速的解决方法——让我们声明一个具有不同名称的队列,例如task_queue

q, err := ch.QueueDeclare("task_queue", // nametrue,         // 声明为持久队列false,        // delete when unusedfalse,        // exclusivefalse,        // no-waitnil,          // arguments
)

这种持久的选项更改需要同时应用于生产者代码和消费者代码。

在这一点上,我们确信即使RabbitMQ重新启动,任务队列队列也不会丢失。现在我们需要将消息标记为持久的——通过使用amqp.Publishing中的持久性选项amqp.Persistent

err = ch.Publish("",     // exchangeq.Name, // routing keyfalse,  // 立即false,  // 强制amqp.Publishing{DeliveryMode: amqp.Persistent, // 持久(交付模式:瞬态/持久)ContentType:  "text/plain",Body:         []byte(body),})

有关消息持久性的说明

将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘上,但是RabbitMQ接受了一条消息并且还没有保存它时,仍然有一个很短的时间窗口。而且,RabbitMQ并不是对每个消息都执行fsync(2)——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证不是很强,但是对于我们的简单任务队列来说已经足够了。如果您需要更强有力的担保,那么您可以使用publisher confirms

公平分发

你可能已经注意到调度仍然不能完全按照我们的要求工作。例如,在一个有两个worker的情况下,当所有的奇数消息都是重消息而偶数消息都是轻消息时,一个worker将持续忙碌,而另一个worker几乎不做任何工作。嗯,RabbitMQ对此一无所知,仍然会均匀地发送消息。

这是因为RabbitMQ只是在消息进入队列时发送消息。它不考虑消费者未确认消息的数量。只是盲目地向消费者发送信息。
在这里插入图片描述
为了避免这种情况,我们可以将预取计数设置为1。这告诉RabbitMQ不要一次向一个worker发出多个消息。或者,换句话说,在处理并确认前一条消息之前,不要向worker发送新消息。相反,它将把它发送给下一个不忙的worker

err = ch.Qos(1,     // prefetch count0,     // prefetch sizefalse, // global
)

关于队列大小的说明

如果所有的worker都很忙,你的queue随时可能会满。你会想继续关注这一点,也许需要增加更多的worker,或者有一些其他的策略。

完整的代码实例

我们的new_task.go的最终代码代入如下:

package mainimport ("github.com/streadway/amqp""log""os""strings"
)func failOnErrorNew(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}
func bodyFrom(args []string) string {var s stringif len(args) < 2 || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], "")}return s
}func main() {//建立连接conn, err := amqp.Dial("amqp://licong:123456@8.130.85.112:5672/")failOnErrorNew(err, "Failed to connect to RabbitMQ")defer conn.Close()//获取channelch, err := conn.Channel()failOnErrorNew(err, "Failed to open a channel")defer ch.Close()q, err := ch.QueueDeclare("task_queue", //nametrue,         //durablefalse,        //delete when unusedfalse,        //exclusivefalse,        //no-waitnil,          //arguments)failOnErrorNew(err, "Failed to declare a queue ")body := bodyFrom(os.Args)err = ch.Publish("",     // exchangeq.Name, // routing keyfalse,  // mandatoryfalse,amqp.Publishing{DeliveryMode: amqp.Persistent, //表示将消息设置为持久模式,确保在 RabbitMQ 重启后消息能够被恢复。ContentType:  "text/plain",Body:         []byte(body),})failOnErrorNew(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)}

work.go内容如下:

package mainimport ("bytes""github.com/streadway/amqp""log""time"
)func failOnErrorWork(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}
func main() {//建立连接conn, err := amqp.Dial("amqp://licong:123456@8.130.85.112:5672/")failOnErrorWork(err, "Failed to connect to RabbitMQ")defer conn.Close()//获取channelch, err := conn.Channel()failOnErrorWork(err, "Failed to open a channel")defer ch.Close()//声明队列q, err := ch.QueueDeclare("task_queue", //nametrue,         //durablefalse,        //delete when unusedfalse,        //exclusivefalse,        //no-waitnil,          //argument)failOnErrorWork(err, "Failed to declare a queue")err = ch.Qos(1,     //prefetch count 消费者一次能获取的最大未确认消息数量0,     //prefetch size 消费者一次能获取的最大未确认消息的总大小(以字节为单位)false, //global  是否将预取配置应用于所有通道。如果为 true,则应用于所有通道;如果为 false,则仅应用于当前通道。)//获取接收消息的Delivery通道msgs, err := ch.Consume(q.Name, //queue"",     //consumerfalse,  //注意这里传false,关闭自动消息确认false,  //exclusivefalse,  //no-localfalse,  //no-waitnil,    //args)failOnErrorWork(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Reveived a message:%s", d.Body)dot_count := bytes.Count(d.Body, []byte(".")) //数一下有几个t := time.Duration(dot_count)time.Sleep(t * time.Second)log.Printf("Done")d.Ack(false) //手动传递消息确认}}()log.Printf("[*] Waiting for messages. To exit press CTRL+C")<-forever
}

源自:https://www.rabbitmq.com/getstarted.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/15736.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis 异常处理:连接失败、数据库满、缓存雪崩、缓存击穿和缓存穿透

目录 1. 连接失败2. 数据库满3. 缓存雪崩4. 缓存击穿&#xff1a;5. 缓存穿透&#xff1a; Redis使用过程中&#xff0c;可能会遇到各种异常情况&#xff0c;例如&#xff1a;连接失败、数据库满、缓存雪崩、缓存击穿和缓存穿透等。这些异常情况可能会导致系统崩溃&#xff0c;…

pip安装lap出现问题

解决方法一 用conda安装&#xff0c;用以下命令&#xff1a; conda install -c conda-forge lap解决方法二 用pip安装&#xff0c;用以下命令&#xff1a; pip install gitgit://github.com/gatagat/lap.git文章目录 解决方法一解决方法二摘要YoloV8改进策略&#xff1a;基…

opencv介绍及环境搭建

文章目录 前言一、opencv介绍二、开发环境搭建三、测试总结前言 本篇文章开始学习opencv的知识,opencv主要用于图像处理和识别,在生活中到处都是可以见到的,那么本篇文章就正式带大家来学习opencv。 一、opencv介绍 OpenCV(Open Source Computer Vision Library)是一个…

P5718 【深基4.例2】找最小值

题目描述 给出 n n n 和 n n n 个整数 a i a_i ai​&#xff0c;求这 n n n 个整数中最小值是什么。 输入格式 第一行输入一个正整数 n n n&#xff0c;表示数字个数。 第二行输入 n n n 个非负整数&#xff0c;表示 a 1 , a 2 … a n a_1,a_2 \dots a_n a1​,a2​……

短视频矩阵源码

一、短视频矩阵源码搭建解析&#xff1a; 目录 一、短视频矩阵源码搭建解析&#xff1a; 二、短视频矩阵源码的开发路径分享&#xff1a; 三、短视频矩阵系统开发应具备哪些能力&#xff1f; 短视频技术开发能力&#xff1a; 开发人员应具备短视频相关技术能力&#xff0c…

Vcenter 创建 虚拟机配置 Thin Provision 模式 disk

介绍 在vCenter中选择虚拟磁盘格式通常也取决于您的需求和使用情况。 vSphere支持多种虚拟磁盘格式&#xff0c;以下是一些常见的格式&#xff1a; Thick Provision Lazy Zeroed&#xff1a;这是vSphere中的默认格式。它会预分配虚拟磁盘所需的存储空间&#xff0c;但只有在虚…

深度学习(32)——CycleGAN

深度学习&#xff08;32&#xff09;——CycleGAN 文章目录 深度学习&#xff08;32&#xff09;——CycleGAN1. GAN原理2. CycleGAN&#xff08;1&#xff09;原理&#xff08;2&#xff09;核心思想&#xff08;3&#xff09;优点&#xff08;4&#xff09;缺点&#xff08;5…

安全测试国家标准解读——并发程序安全

本系列文章主要围绕《GB/T 38674—2020 信息安全技术 应用软件安全编程指南》进行讲解&#xff0c;该标准是2020年4月28日&#xff0c;由国家市场监督管理总局、国家标准化管理委员会发布&#xff0c;2020年11月01日开始实施。我们对该标准中一些常见的漏洞进行了梳理&#xff…

clickhouse MPPDB数据库 运维实用SQL总结III

文章目录 CH问题处理使用remote函数报URL "xxxx:9000" is not allowed in configuration fileclickhouse MPPDB数据库 运维实用SQL总结 clickhouse MPPDB数据库 运维实用SQL总结II clickhouse MPPDB数据库 运维实用SQL总结III CH server相关的配置参见 : clickhous…

2023最新Ubuntu安装部署Gitlab详细教程(每个步骤均配图)

Ubuntu安装配置Gitlab详细步骤 安装依赖 打开终端&#xff0c;运行如下命令&#xff1a; sudo apt updatesudo apt-get upgradesudo apt-get install curl openssh-server ca-certificates postfix接下来会遇到如下界面&#xff0c;Tab切换到“确定”按钮&#xff0c;然后回…

Tomcat 安装配置教程及成功后,启动失败报错解决方案

解决方案 我的报错原因是因为我的JDK是1.8的而我的Tomcat是10版本的&#xff0c;可能是因为版本原因吧&#xff0c;我重新装了Tomcat 9就可以启动成功了&#xff01; 简单说下安装的时候需要注意哪些步骤吧 今天我在安装tomcat10的时候&#xff0c;安装成功后&#xff0c;启…

RT1052的定时器

文章目录 1 通用定时器1.1 定时器框图1.2 实现周期性中断 2 相关寄存器3 定时器配置3.1 时钟使能3.2 初始化GPT1定时器3.2.1 base3.2.2 initConfig3.2.2.1 clockSorce3.2.2.2 divider3.2.2.3 enablexxxxx 3.3 设置 GPT1 比较值3.3.1 base3.3.2 channel3.3.3 value 3.4 设置 GPT…

分布式存储Ceph部署

前言 Ceph 和 GlusterFS 都是出色的分布式存储&#xff0c;其中Ceph 广泛由于Openstack以及K8S。 官网 ## 官网 https://docs.ceph.com/en/quincy/## 参考文档 https://zhuanlan.zhihu.com/p/386560160

代码随想录算法训练营第五十五天|动态规划part15|● 392.判断子序列 ● 115.不同的子序列

● 392.判断子序列 Is Subsequence - LeetCode dp[i][j] 以i - 1为结尾的和以 j - 1为结尾的子串的相同子序列长度 if (s[i - 1] t[j - 1]&#xff09;dp[i][j] dp[i - 1][j - 1] 1 else dp[i][j] dp[i][j - 1] for (int i 1; i < s.length(); i) for (int j 1; j …

AD21 PCB设计的高级应用(三)PCB多板互连装配设计

&#xff08;三&#xff09;PCB多板互连装配设计 一旦模块在多板原理图上相互连接,就可以验证板到板的连接。这将检测网络到引脚分配错误和引脚到引脚的互连布线错误。可以解决这些错误并将修改信息更新到对应的 PCB 中,或者重新更新到源系统原理图。 印制电路板不是孤立存在的…

MFC第二十三天 HBrush对闭合图形的填充、CPen、CFont类常用功能与LOGFONT和LOGPEN结构体

文章目录 HBrush对闭合图形的填充HBITMAP位图资源的加载和平铺填充CFont类常用功能与LOGFONT结构体CPen类简介 HBrush对闭合图形的填充 HBRUSH创建&#xff1a; a)实色填充&#xff1a; HBRUSH CreateSolidBrush( COLORREF color);b)栅格线填充&#xff1a; HBRUSH CreateHa…

uni-app如何生成正式的APK

第一步&#xff1a; 进入dcloud官网https://dcloud.io/&#xff0c;点击开发者后台进入登录注册页面 第二步&#xff1a;登录之后跳到项目列表&#xff0c;选择自己想要打包的项目 点击进去如果没有生成证书&#xff0c;点击生成证书&#xff0c;如果显示证书已生成就不用管了…

OpenJDK下载安装教程

《OpenJDK官网》 进入官网&#xff0c;点击Installing----》jdk.java.net 随便进去一个JDK版本 根据自己的操作系统下载对应的二进制JDK即可&#xff08;我的是Windows&#xff09; 配置环境变量 JAVA_HOME 你的JDK路径\jdk-17PATH追加内容 WINR输入CMD java -version

参数量仅有50KB的超轻量级unet变种网络egeunet【参数和计算量降低494和160倍】医疗图像分割实践

今天看到一篇挺有意思的文章&#xff0c;做的是跟医疗图像分割相关的工作&#xff0c;但是不像之前看到的一些工作一味地去追求高精度&#xff0c;因为医疗领域本身就是一个相对特殊的行业&#xff0c;对于模型产生的结果的精确性要求是很高的&#xff0c;带来的是参数量级的庞…

el-cascader 数据的回显

<el-cascaderplaceholder"试试搜索":options"allOptions":props"{ multiple: true }"v-model"options"filterable style"width: 80%;max-height:240px;overflow-y:scroll;"></el-cascader> allOptions里面包含…