RabbitMQ 发布订阅视频学习地址:
简单模式下RabbitMQ 发布者发布消息 消费者消费消息
Publist/Subscribe 发布订阅
在 RabbitMQ 中,发布订阅模式是一种消息传递方式,其中发送者(发布者)不会将消息直接发送到特 定的接收者(订阅者)。而是将消息发送到一个交换机,交换机将消息转发到绑定到该交换机的每个队 列 ,每个绑定交换机的队列都将接收到消息。消费者(订阅者)监听自己的队列 并进行消费 。
场景 : 开放平台 开发者订阅了某个开放平台的 api 之后,数据有变化就会自动获取到最新的
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
P :生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 X (交换机)
C :消费者,消息的接收者,会一直等待消息到来
Queue :消息队列,接收消息、缓存消息
Exchange :交换机( X )。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递 交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange 的类型。
Exchange 有常见以下 3 种类型:
Fanout :广播,将消息交给所有绑定到交换机的队列
Direct :定向,把消息交给符合指定 routing key 的队列
Topic :通配符,把消息交给符合 routing pattern (路由模式) 的队列
Exchange (交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失 !
RabbitMQ 发布订阅模式的一些应用场景:
1. 数据提供商与应用商 :例如中国气象局向多个门户网站提供气象数据。
2. 新闻机构 :将独家新闻发布给多个订阅者,但可能需要根据新闻类型进行更精细的路由。
3. 商城系统 :新添加商品后,同时更新缓存和数据库。
4. 用户通知 :用户充值或转账成功后,通过多种方式(如短信、邮件)通知用户。
5. 消息广播 :将消息广播到多个消费者,例如系统公告、活动通知等。
6. 降低耦合 :生产者和消费者通过 RabbitMQ 进行解耦,不需要直接连接,提高系统的灵活性和可
扩展性。
7. 异步处理 :生产者发送消息后,消费者可以异步处理,提高系统的响应速度和并发处理能力。
生产者
emit_log.go
package main
import (
"context"
"log"
"os"
"strings"
"github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Printf("%s: %s", msg, err)
}
}
func bodyForm(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
func main() {
// 连接到RabbitMQ服务器
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
//声明一个交换机
err = ch.ExchangeDeclare(
"logs", //name 交换机名称
"fanout", //交换机类型 Fanout 广播
true, //durable 持久化
false, //autoDelete 是否自动删除
false, //internal 是否内部使用 设置为 false 时,表示无论如何这个交换器都不是
内置的
false, //noWait 是否等待服务器响应 参数通常默认为False,意味着操作会同步进
行并等待服务器的响应
nil, // 其他属性
)
failOnError(err, "Failed to declare an exchange")
//发送消息
body := bodyForm(os.Args)
// 发布消息到交换机,并指定路由键
err = ch.PublishWithContext(
context.Background(),
"logs", // 交换器的名称
"", // 队列名
false, // mandatory 必须发送到队列 ,false表示如果交换器无法根据自身的类型和路
由键找到一个符合条件的队列丢弃
false, //immediate 参数设置为 false 时,表示消息不需要立即被消费者接收
amqp091.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent: %s", body)
}
消费者
receive_log.go
package main
import (
"log"
"github.com/rabbitmq/amqp091-go"
)
func failOnError2(err error, msg string) {
if err != nil {
log.Printf("%s: %s", msg, err)
}
}
func main() {
//建立连接
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
failOnError2(err, "Failed to connect to RabbitMQ")
defer conn.Close()
//创建一个Channel
ch, err := conn.Channel()
failOnError2(err, "Failed to open a channel")
defer ch.Close()
//声明一个交换机
err = ch.ExchangeDeclare(
"logs", // 交换机名称
"fanout", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否内部使用
false, // 是否等待服务器响应
nil, // 其他属性
)
failOnError2(err, "Failed to declare an exchange")
// 声明一个临时队列
q, err := ch.QueueDeclare(
"", // 队列名称,留空表示由RabbitMQ自动生成
false, // 是否持久化
false, // 是否自动删除(当没有任何消费者连接时)
true, // 是否排他队列(仅限于当前连接)
false, // 是否等待服务器响应
nil, // 其他属性
)
failOnError2(err, "Failed to declare a queue")
// 将队列绑定到交换机上
err = ch.QueueBind(
q.Name, // 队列名称
"", // 路由键,留空表示接收交换机的所有消息
"logs", // 交换机名称
false, // 是否等待服务器响应
nil, // 其他属性
)
failOnError2(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标识符,留空表示由RabbitMQ自动生成
true, // 是否自动应答
false, // 是否独占模式(仅限于当前连接)
false, // 是否等待服务器响应
false, // noLocal
nil, // 其他属性
)
// msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
failOnError2(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [x] Waiting for logs. To exit press CTRL+C")
<-forever
}
运行
# 如果你想保存日志文件
go run receive_log.go > logs_from_rabbit.log
# 如果你想再终端看到日志
go run receive_log.go
# shell2
go run emit_log.go