RabbitMQ从原理到实战—基于Golang【万字详解】

文章目录

  • 前言
  • 一、MQ是什么?
    • 优势
    • 劣势
  • 二、MQ的用途
    • 1、应用解耦
    • 2、异步加速
    • 3、削峰填谷
    • 4、消息分发
  • 三、RabbitMQ是什么
    • 1、AMQP 协议
    • 2、RabbitMQ 包含的要素
    • 3、RabbitMQ 基础架构
  • 四、实战
    • 1、Simple模式(即最简单的收发模式)
    • 2、Work Queues 模型
    • 3、Publish/Subscribe 模型
    • 4、Routing 模型
    • 5、Topics 模型


前言

最近秋招开始找工作,顺便回顾消息队列并且总结。

一、MQ是什么?

消息队列(Message Queue)是一种在应用程序之间传递消息的通信模式。它通过在发送者和接收者之间建立一个消息队列来实现异步通信和解耦。

在消息队列模式中,发送者(Producer)将消息发送到一个中间件(Message Broker)中的消息队列,而接收者(Consumer)则从该队列中接收和处理消息。这种方式使得发送者和接收者可以独立地进行处理,而无需直接交互,从而实现解耦。发送者和接收者只需要知道如何与消息队列进行通信,而不需要知道彼此的存在。

优势

1. 异步通信:发送者将消息放入队列后即可继续进行其他操作,无需等待接收者的响应。接收者可以在合适的时候从队列中获取消息进行处理,实现了异步通信模式。

2. 解耦:发送者和接收者之间通过消息队列进行通信,彼此之间不直接耦合。发送者只需将消息发送到队列中,而不需要知道消息是如何被处理的。接收者只需从队列中获取消息进行处理,而不需要知道消息的来源。

3. 可靠性传输:消息队列通常提供持久化机制,确保消息在发送和接收过程中不会丢失。即使接收者暂时不可用,消息也会在队列中等待,直到接收者准备好接收为止。

4. 扩展性:消息队列可以支持多个发送者和接收者,实现系统的扩展性和高并发处理能力。

5. 缓冲和削峰填谷:通过将消息缓存到队列中,可以平衡发送者和接收者之间的处理速度差异,从而避免系统过载。

消息队列在分布式系统、微服务架构、异步任务处理、事件驱动架构等场景中被广泛应用。一些常见的消息队列系统包括RabbitMQ、Apache Kafka、ActiveMQ、Amazon SQS等。它们提供了丰富的功能和配置选项,可以根据应用需求选择合适的消息队列实现。

劣势

系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
系统复杂度提高
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
一致性问题
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

二、MQ的用途

四个用途
应用解耦:提高系统容错性和可维护性
异步提速:提升用户体验和系统吞吐量
削峰填谷:提高系统稳定性
消息分发:提高系统灵活性

1、应用解耦

应用解耦是指通过使用消息队列等中间件来降低应用程序之间的直接依赖性,从而实现独立开发、部署和升级的能力。通过解耦,每个应用程序可以通过消息队列发送和接收消息,而不需要了解其他应用程序的具体实现细节。通过应用解耦,可以实现系统的松耦合架构,提高系统的可维护性、扩展性和容错性。
没有使用MQ:
在这里插入图片描述

  • 系统的耦合性越高,容错性就越低,可维护性就越低。
  • 在这里插入图片描述使用 MQ 使得应用间解耦,提升容错性和可维护性。

2、异步加速

异步提速是指通过将耗时的操作转化为异步执行,从而提高系统的响应速度和吞吐量。通过异步处理,应用程序可以在等待某个操作完成的同时继续执行其他任务,而不需要阻塞等待结果返回。
例如,当一个应用程序需要进行网络请求并等待响应时,如果采用同步方式,应用程序会被阻塞,直到响应返回才能继续执行其他任务。而通过异步方式,应用程序可以继续执行其他任务,不需要等待网络请求的结果返回。这样可以提高系统的响应速度,使用户获得更好的体验。
没有使用MQ:
在这里插入图片描述
一个下单操作耗时:20 + 300 + 300 + 300 = 920ms
用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!
使用MQ:
在这里插入图片描述

用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。不需要的等待完成

3、削峰填谷

削峰填谷是一种通过平衡系统负载,减轻峰值压力和填充低谷时的资源利用率的技术。它的目标是在系统负载波动较大的情况下,合理利用资源,确保系统的稳定性和高效性。
没有使用MQ:
在这里插入图片描述

使用MQ:
在这里插入图片描述在这里插入图片描述
使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做填谷。简单来说就是慢慢分发
使用MQ后,可以提高系统稳定性。

4、消息分发

消息分发是一种将消息从发送者传递到接收者的机制,它在异步系统和事件驱动架构中起着重要的作用。消息分发可以实现解耦和灵活性,允许不同组件或模块之间通过消息进行通信,从而实现系统的松耦合和可扩展性。
下面是消息分发的一些关键概念和示例:

发布者(Publisher):发布者是消息分发系统中的发送者,它负责生成并发布消息。发布者将消息发送到消息分发系统,而不需要知道消息的具体接收者。

订阅者(Subscriber):订阅者是消息分发系统中的接收者,它通过订阅特定的消息或消息类型来表明自己对消息的兴趣。当有匹配的消息到达时,消息分发系统会将消息传递给订阅者。

主题(Topic):主题是消息分发系统中用于分类和组织消息的标识符或名称。发布者可以将消息发布到特定的主题,而订阅者可以选择订阅感兴趣的主题。通过主题,可以实现消息的细粒度过滤和选择性订阅。

三、RabbitMQ是什么

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求可能比较低了。

1、AMQP 协议

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。
AMQP三层协议:
Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
Session Layer:中间层,主要负责客户端命会发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。

AMQP组件:
交换器(Exchange):消息代理服务器中用于把消息路由到队列的组件。
队列(queue):用来存储消息的数据结构,位于硬盘或内存中。
绑定(Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。

2、RabbitMQ 包含的要素

生产者:消息队列创建者,发送消息到MQ
消费者:连接到RabbitMQ,订阅到队列上,消费消息,持续订阅和单条订阅
消息:包含有效载荷和标签,有效载荷指要传输的数据,标签描述了有效载荷,并且RabbitMQ用它来决定谁获得消息,消费者只能拿到有效载荷,并不知道生产者是谁

3、RabbitMQ 基础架构

在这里插入图片描述
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。是生产者、消费者与RabbitMQ通信的渠道,生产者publish或是消费者subscribe 一个队列都是通过信道来通信的。
信道是建立在TCP连接上的虚拟连接,就是说RabbitMQ在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在RabbitMQ都有一个唯一的ID,保证了信道私有性,对应上唯一的线程使用。
Exchange交换机:message 到达 broker 的第一站**,根据分发规则,匹配查询表中的 routing key,分发消息到queue中去。生产者将消息发送到交换器,有交换器将消息路由到一个或者多个队中。当路由不到时,或返回给生产者或直接丟弃。
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding信息被保存到 exchange 中的查询表中,用于 message 的分发依据

四、实战

RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。

1、Simple模式(即最简单的收发模式)

消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。
消费者:

package mainimport ("log""github.com/streadway/amqp"
)func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法连接到RabbitMQ服务器:%s", err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil {log.Fatalf("无法创建通道:%s", err)}defer ch.Close()// 声明一个队列queue, err := ch.QueueDeclare("hello", // 队列名false,   // 持久性false,   // 自动删除false,   // 独占false,   // 等待服务器确认nil,     // 参数)if err != nil {log.Fatalf("无法声明队列:%s", err)}// 消费消息msgs, err := ch.Consume(queue.Name, // 队列名"",         // 消费者标签true,       // 自动确认false,      // 独占false,      // 不等待服务器确认false,      // 参数)if err != nil {log.Fatalf("无法注册消费者:%s", err)}// 处理接收到的消息for msg := range msgs {log.Printf("接收到消息:%s", msg.Body)}
}

上述代码首先建立了与RabbitMQ服务器的连接,然后创建了一个通道和一个名为"heo"的队列。接下来,通过ch.Consume函数注册一个消费者,用于从队列中接收消息。在fo循环中,我们处理接收到的消息,这里只是简单地打印出来。
生产者:

package mainimport ("log""github.com/streadway/amqp"
)func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法连接到RabbitMQ服务器:%s", err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil {log.Fatalf("无法创建通道:%s", err)}defer ch.Close()// 声明一个队列queue, err := ch.QueueDeclare("hello", // 队列名false,   // 持久性false,   // 自动删除false,   // 独占false,   // 等待服务器确认nil,     // 参数)if err != nil {log.Fatalf("无法声明队列:%s", err)}// 发送消息body := "Hello, RabbitMQ!"err = ch.Publish("",         // 交换机queue.Name, // 队列名false,      // 必须发送到队列false,      // 不等待服务器确认amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),},)if err != nil {log.Fatalf("无法发送消息:%s", err)}log.Printf("消息已发送:%s", body)
}

上述代码与消费者程序类似,首先建立了与RabbitMQ服务器的连接,然后创建了一个通道和一个名为"hello"的队列。接下来,通过ch.Publishi函数向队列发送一条消息。

2、Work Queues 模型

消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关[syncronize]保证一条消息只能被一个消费者使用)。
让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
消费者:

package mainimport ("fmt""log""math/rand""time""github.com/streadway/amqp"
)func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法连接到RabbitMQ服务器:%s", err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil {log.Fatalf("无法创建通道:%s", err)}defer ch.Close()// 启动多个消费者并行处理任务for i := 1; i <= 3; i++ {go startConsumer(i, ch)}// 阻塞主进程select {}
}func generateTask(id int) string {time.Sleep(time.Duration(rand.Intn(3)) * time.Second)return fmt.Sprintf("Task %d", id)
}func startConsumer(id int, ch *amqp.Channel) {// 声明一个队列queue, err := ch.QueueDeclare("tasks_queue", // 队列名true,          // 持久性false,         // 自动删除false,         // 独占false,         // 等待服务器确认nil,           // 参数)if err != nil {log.Fatalf("无法声明队列:%s", err)}// 消费任务msgs, err := ch.Consume(queue.Name, // 队列名"",         // 消费者标签false,      // 手动确认false,      // 不等待服务器确认false,      // 不使用内置的参数false,      // 参数nil,           // 参数)if err != nil {log.Fatalf("无法注册消费者:%s", err)}for msg := range msgs {task := string(msg.Body)log.Printf("消费者 %d 接收到任务:%s", id, task)log.Printf("消费者 %d 完成任务:%s", id, task)// 手动确认任务已处理msg.Ack(false)}
}

利用协城启动多个消费者进行消费。
结果如下:
在这里插入图片描述

3、Publish/Subscribe 模型

每个消费者监听自己的队列。
生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
在RabbitMQ的Publish/Subscribe模型中,生产者将消息发送到交换机,交换机负责将消息广播给所有绑定到它上面的队列。消费者创建队列并将其绑定到交换机上,从而接收交换机发送的消息。这样,一个消息可以被多个消费者接收。
在这里插入图片描述

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息、缓存消息
Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs",   // 交换机名称"fanout", // 交换机类型true,     // 是否持久化false,    // 是否自动删除false,    // 是否内部使用false,    // 是否等待服务器响应nil,      // 其他属性)failOnError(err, "Failed to declare an exchange")// 发布消息到交换机body := "Hello, RabbitMQ!"err = ch.Publish("logs", // 交换机名称"",     // 路由键,留空表示广播给所有队列false,  // 是否等待服务器响应false,  // 其他属性amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),},)failOnError(err, "Failed to publish a message")log.Printf("Message sent: %s", body)
}

连接到RabbitMQ服务器,声明了一个名为"logs"的交换机,并通过调用ch.Publish方法将消息发布到交换机上。
在示例代码中,通过指定交换机名称为"logs",路由键为空字符串,消息将被广播给所有绑定到该交换机的队列。

package mainimport ("fmt""log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs",   // 交换机名称"fanout", // 交换机类型true,     // 是否持久化false,    // 是否自动删除false,    // 是否内部使用false,    // 是否等待服务器响应nil,      // 其他属性)failOnError(err, "Failed to declare an exchange")// 声明一个临时队列q, err := ch.QueueDeclare("",    // 队列名称,留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除(当没有任何消费者连接时)true,  // 是否排他队列(仅限于当前连接)false, // 是否等待服务器响应nil,   // 其他属性)failOnError(err, "Failed to declare a queue")// 将队列绑定到交换机上err = ch.QueueBind(q.Name, // 队列名称"",     // 路由键,留空表示接收交换机的所有消息"logs", // 交换机名称false,  // 是否等待服务器响应nil,    // 其他属性)failOnError(err, "Failed to bind a queue")// 订阅消息msgs, err := ch.Consume(q.Name, // 队列名称"",     // 消费者标识符,留空表示由RabbitMQ自动生成true,   // 是否自动应答false,  // 是否独占模式(仅限于当前连接)false,  // 是否等待服务器响应false,  // 其他属性nil,    // 其他属性)failOnError(err, "Failed to register a consumer")// 接收消息的goroutinego func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("Waiting for messages. To exit press CTRL+C")<-make(chan struct{}) // 阻塞主goroutine
}

它连接到RabbitMQ服务器,声明一个fanout类型的交换机(Exchange),创建一个临时队列,将队列绑定到交换机上,并订阅消息。

在示例代码中,创建的交换机名为"logs",交换机类型为"fanout",表示消息将被广播给所有绑定到该交换机的队列。

消费者创建了一个临时队列,并将其绑定到交换机上,这样交换机就会将消息发送到该队列中。

4、Routing 模型

在fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在这里插入图片描述

在Direct模型下:

1、队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
2、消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
3、Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

消息生产者将消息发送给交换机按照路由判断,路由是字符串(info)当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息。
生产者

package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_direct", // 交换机名称"direct",      // 交换机类型true,          // 是否持久化false,         // 是否自动删除false,         // 是否内部使用false,         // 是否等待服务器响应nil,           // 其他属性)failOnError(err, "Failed to declare an exchange")// 从命令行参数获取要发送的路由键和消息内容if len(os.Args) < 3 {log.Fatalf("Usage: %s [info] [message]", os.Args[0])}severity := os.Args[1]message := strings.Join(os.Args[2:], " ")// 发布消息到交换机,并指定路由键err = ch.Publish("logs_direct", // 交换机名称severity,      // 路由键false,         // 是否等待服务器响应false,         // 是否立即将消息写入磁盘amqp.Publishing{ContentType: "text/plain",Body:        []byte(message),},)failOnError(err, "Failed to publish a message")log.Printf("Sent message: %s", message)
}

它连接到RabbitMQ服务器,声明一个direct类型的交换机(Exchange),并通过指定路由键将消息发布到交换机。

在示例代码中,创建的交换机名为"logs_direct",交换机类型为"direct",表示消息将根据指定的路由键进行选择性地发送给队列。

生产者从命令行参数获取要发送的路由键和消息内容。路由键可以是任意字符串,用于标识消息的类型或者级别。消息内容可以是任意文本。
消费者

package mainimport ("fmt""log""os""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_direct", // 交换机名称"direct",      // 交换机类型true,          // 是否持久化false,         // 是否自动删除false,         // 是否内部使用false,         // 是否等待服务器响应nil,           // 其他属性)failOnError(err, "Failed to declare an exchange")// 声明一个临时队列q, err := ch.QueueDeclare("",    // 队列名称,留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除(当没有任何消费者连接时)true,  // 是否排他队列(仅限于当前连接)false, // 是否等待服务器响应nil,   // 其他属性)failOnError(err, "Failed to declare a queue")// 从命令行参数获取要绑定的路由键if len(os.Args) < 2 {log.Fatalf("Usage: %s [info] [warning] [error]", os.Args[0])}severities := os.Args[1:]// 将队列绑定到交换机上,并指定要接收的路由键for _, severity := range severities {err = ch.QueueBind(q.Name,        // 队列名称severity,      // 路由键"logs_direct", // 交换机名称false,         // 是否等待服务器响应nil,           // 其他属性)failOnError(err, "Failed to bind a queue")}// 订阅消息msgs, err := ch.Consume(q.Name, // 队列名称"",     // 消费者标识符,留空表示由RabbitMQ自动生成true,   // 是否自动应答false,  // 是否独占模式(仅限于当前连接)false,  // 是否等待服务器响应false,  // 其他属性nil,    // 其他属性)failOnError(err, "Failed to register a consumer")// 接收消息的goroutinego func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("Waiting for messages. To exit press CTRL+C")<-make(chan struct{}) // 阻塞主goroutine
}

上述代码实现了一个Routing模型的消费者。它连接到RabbitMQ服务器,声明一个direct类型的交换机(Exchange),创建一个临时队列,并将队列绑定到交换机上,同时指定要接收的路由键。

在RabbitMQ的Routing模型中,生产者将消息发送到交换机,并在发送消息时指定一个路由键(routing key)。交换机根据路由键将消息发送给与之绑定的队列。消费者创建队列并将其绑定到交换机上,并通过指定要接收的路由键来选择性地接收消息。

在示例代码中,创建的交换机名为"logs_direct",交换机类型为"direct",表示消息将根据指定的路由键进行选择性地发送给队列。

消费者创建了一个临时队列,并通过循环将该队列绑定到交换机上,并指定要接收的路由键。路由键可以是任意字符串,用于标识消息的类型或者级别。在示例中,我们通过命令行参数传入要绑定的路由键。

最后,消费者通过调用ch.Consume方法订阅消息。该方法返回一个消息通道msgs,消费者可以从该通道接收到消息。在示例中,我们使用一个goroutine来异步接收消息,并在收到消息时打印出来。

5、Topics 模型

在这里插入图片描述
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

统配符
* 匹配不多不少恰好1个词
# 匹配一个或多个词

如:
fan.# 匹配 fan.one.two 或者 fan.one 等
fan.* 只能匹配 fan.one
生产者

func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_topic", // 交换机名称"topic",      // 交换机类型true,         // 是否持久化false,        // 是否自动删除false,        // 是否内部使用false,        // 是否等待服务器响应nil,          // 其他属性)failOnError(err, "Failed to declare an exchange")// 定义要发送的消息的路由键和内容routingKey := "example.key.das"message := "Hello, RabbitMQ!"// 发布消息到交换机,并指定路由键err = ch.Publish("logs_topic", // 交换机名称routingKey,   // 路由键false,        // 是否等待服务器响应false,        // 是否立即发送amqp.Publishing{ContentType: "text/plain",Body:        []byte(message),},)failOnError(err, "Failed to publish a message")log.Printf("Sent message: %s", message)
}

消费者


func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_topic", // 交换机名称"topic",      // 交换机类型true,         // 是否持久化false,        // 是否自动删除false,        // 是否内部使用false,        // 是否等待服务器响应nil,          // 其他属性)failOnError(err, "Failed to declare an exchange")// 声明一个临时队列q, err := ch.QueueDeclare("",    // 队列名称,留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除(当没有任何消费者连接时)true,  // 是否排他队列(仅限于当前连接)false, // 是否等待服务器响应nil,   // 其他属性)failOnError(err, "Failed to declare a queue")// 将队列绑定到交换机上,并指定要接收的路由键err = ch.QueueBind(q.Name,       // 队列名称"example.#",  // 路由键,可以使用通配符*匹配多个单词"logs_topic", // 交换机名称false,        // 是否等待服务器响应nil,          // 其他属性)failOnError(err, "Failed to bind a queue")// 创建一个消费者通道msgs, err := ch.Consume(q.Name, // 队列名称"",     // 消费者标识符,留空表示由RabbitMQ自动生成true,   // 是否自动应答false,  // 是否排他消费者false,  // 是否阻塞false,  // 是否等待服务器响应nil,    // 其他属性)failOnError(err, "Failed to register a consumer")// 接收和处理消息forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("Waiting for messages...")<-forever
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/61301.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++贪吃蛇(控制台版)

C自学精简实践教程 目录(必读) 目录 主要考察 需求 输入文件 运行效果 实现思路 枚举类型 enum class 启动代码 输入文件data.txt 的内容 参考答案 学生实现的效果 主要考察 模块划分 文本文件读取 UI与业务分离 控制台交互 数据抽象 需求 用户输入字母表示方…

学习JAVA打卡第四十七天

日期的格式化 程序可能希望按照某种习惯来输出时间。例如时间的顺序&#xff1a;年/月/日或年/月/日/时/分/秒。可以直接使用String类调用format方法对日期进行格式化。 Format方法 Format方法&#xff1a; format&#xff08;格式化模式,日期列表&#xff09; 按照“格式…

T8461 ICS TRIPLEX 通过商业编程工具进行离线编辑

T8461 ICS TRIPLEX 通过商业编程工具进行离线编辑 ICS Triplex isa graf及其日本总经销商Nissin高兴地宣布Almega AX-V6焊接机器人获得成功。由国际公认的弧焊电源和工业机器人供应商OTC DAIHEN Corporation制造的Almega机器人以其高速和平稳运动而闻名&#xff0c;可缩短生产…

NameError: name ‘_mysql‘ is not defined

报错信息 Traceback (most recent call last):File "/Users/xuruilong/Desktop/cmabc_back/.enve/lib/python3.9/site-packages/MySQLdb/__init__.py", line 18, in <module>from . import _mysql ImportError: dlopen(/Users/xuruilong/Desktop/cmabc_back/.…

提高工作效率的一键查询和保存大量快递物流信息的技巧

在如今快速发展的电商行业中&#xff0c;物流服务的准确与便捷是保证顺利交付商品的重要环节。为了方便用户追踪物流&#xff0c;固乔快递查询助手应运而生。这款软件不仅能够快速查询快递单号的物流信息&#xff0c;还具备保存查询结果的功能&#xff0c;方便用户随时查看。 首…

泰迪大数据实训平台产品介绍

大数据产品包括&#xff1a;大数据实训管理平台、大数据开发实训平台、大数据编程实训平台等 大数据实训管理平台 泰迪大数据实训平台从课程管理、资源管理、实训管理等方面出发&#xff0c;主要解决现有实验室无法满足教学需求、传统教学流程和工具低效耗时和内部教学…

【UE5】给模型指定面添加自定义材质

实现步骤 1. 首先我们向UE中导入一个简单的模型&#xff0c;可以看到目前该模型的材质插槽只有一个&#xff0c;当我们修改材质时会使得模型整体的材质全部改变&#xff0c;如果我们只想改变模型的某些面的材质就需要继续做后续操作。 2. 选择建模模式 3. 在模式工具栏中点击…

基于STM32的空气质量检测LCD1602显示报警仿真设计(仿真+程序+讲解)

本设计 基于STM32的空气质量检测报警仿真设计(仿真程序讲解&#xff09; 1.主要功能2.仿真3. 程序4. 资料清单&下载链接 基于STM32的空气质量检测报警仿真设计(仿真程序讲解&#xff09; 仿真图proteus 8.9 程序编译器&#xff1a;keil 5 编程语言&#xff1a;C语言 设…

Java虚拟机内部组成

1、栈区 public class Math {public int compute(){//一个方法对应一块栈帧内存区域int a l;int b 2;int c (a b)*10;return c; } public static void main(String[] args){Math math new, Math() ;math.compute() ;System.out.println("test");}} 栈是先进后出…

怎么用postman连接websocket

点击右侧栏的Collections&#xff0c;然后点击旁边的New&#xff0c;然后点击其中的WebSocket Request,然后输入Url&#xff0c;点击Connection&#xff0c;这里需要注意的是Url不能加上http://&#xff0c;因为这个不是http协议。

Springboot 接口方式硬通知实现 动态刷新配置值,@ConfigurationProperties 、@Value 都可以

前言 看到这个文章标题&#xff0c;也许有的看官就觉得很多余&#xff0c; 因为Nacos 可以设置 NacosValue(value "${XXX}",autoRefreshed true) 实现动态刷新&#xff1b; 又因为cloud config的RefreshScope 实现动态刷新&#xff1b; 还有阿波罗...等 这…

进程Start

Linux中的命令解释器和Windows的程序管理器explorer.exe一样地位,都是在用户态下运行的进程 共享变量发生不同进程间的指令交错&#xff0c;就可能会数据出错 进程只作为除CPU之外系统资源的分配单位 CPU的分配单位是线程 每个进程都有自己的独立用户空间 内核空间是OS内核的…

docker-compose管理创建LNMP服务并运行Wordpress网站平台

文章目录 一&#xff0e;项目环境1. 环境描述2.项目需求 二&#xff0e;部署过程1.安装Docker2.安装Docker加速器3.Docker-Compose安装部署4.准备依赖文件、配置nginx5.配置mysql6.配置php7.编写docker-compose.yml8.验证 三.容器快照&#xff0c;然后将Docker镜像打包成tar包备…

springmvc没有绿标,怎么配置tomcat插件运行?

一、添加插件后&#xff0c;刷新&#xff0c;自动从maven仓库下载tomcat插件 二、写好项目后&#xff0c;添加tomcat配置 三、即可点击绿标运行

【广州华锐互动】AR昆虫认知学习系统实现对昆虫形态的捕捉和还原

随着科技的不断发展&#xff0c;人们对自然界的认识也在不断加深。在这个过程中&#xff0c;AR&#xff08;增强现实&#xff09;技术的出现为人们带来了全新的体验方式。为此&#xff0c;广州华锐互动开发了AR昆虫认知学习系统&#xff0c;本文将为大家详细介绍这款系统的特点…

jvm的内存划分区域

jvm划分5个区域&#xff1a; java虚拟机栈、本地方法栈、堆、程序计数器、方法区。 各个区各自的作用&#xff1a; 1.本地方法栈&#xff1a;用于管理本地方法的调用&#xff0c;里面并没有我们写的代码逻辑&#xff0c;其由native修饰&#xff0c;由 C 语言实现。 2.程序计数…

HTML基础--Form表单--内联元素

目录 Form表单 表单元素 创建表单 () 文本输入 () 密码输入 单选按钮 () 和 复选框 () 下拉列表 () 和 选项 ()提交按钮 () 重置按钮 () 块元素与行内元素&#xff08;内联元素&#xff09; Form表单 HTML中的表单&#xff08;<form>&#xff09;是一个重要的元…

Kubernetes技术-Kubernetes集群环境搭建准备

1.搭建环境规划 在搭建k8s的时候可以分为两种: 单master集群,故名思意,只有一个master管理节点和多个node节点。如下图所示: 多master集群,故名思意,有多个master管理节点和多个node节点。如下图所示: 2.服务器硬件要求 (1).测试环境要求(教学、研究等环境) Master节点:至…

[蓝桥复盘] 算法赛内测赛2 20230831

[蓝桥复盘] 算法赛内测赛2 20230831 总结新一与基德的身高大战1. 题目描述2. 思路分析3. 代码实现 肖恩的投球游戏加强版1. 题目描述2. 思路分析3. 代码实现 体育健将1. 题目描述2. 思路分析3. 代码实现 小桥的奇异旋律1. 题目描述2. 思路分析3. 代码实现 区间or划分1. 题目描…

css让多个盒子强制自动等宽

1.width: calc( 100 / n% ) 2.display:flex; flex:1;width:100px; 3.display:grid;grid-template-columns: repeat(auto-fit, minmax(100px, 1fr)); 但是其中某一个内容较长的时候 会破坏1:1:1的平衡 这个时候发现附件名字过长导致不等比例&#xff0c;通过查看阮一峰flex文…