topic交换器(主题交换器)
发送到topic交换器
的消息不能具有随意的routing_key
——它必须是单词列表,以点分隔。这些词可以是任何东西,但通常它们指定与消息相关的某些功能。一些有效的routing_key
示例:“stock.usd.nyse”
,“nyse.vmw”
,“quick.orange.rabbit”
。routing_key
中可以包含任意多个单词,最多255个字节。
绑定键也必须采用相同的形式。topic交换器
背后的逻辑类似于direct交换器
——用特定路由键发送的消息将传递到所有匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况: - *
(星号)可以代替一个单词。 - #
(井号)可以替代零个或多个单词。
通过下面这个示例可以很容易看明白这一点:
在这个例子中,我们将发送一些都是描述动物的信息。将使用包含三个词(两个点)的路由密钥发送消息。路由键中的第一个单词将描述速度,第二个是颜色,第三个是种类:“<speed>.<colour>.<species>”
。
我们创建了三个绑定关系:Q1与绑定键“ * .orange. * ”绑定,Q2与“* .* .rabbit”和“lazy.#”绑定。
这些绑定可以总结为:
- Q1对所有橙色动物都感兴趣
- Q2想接收有关兔子(rabbit)的一切消息,以及有关懒惰(lazy)动物的一切消息。
路由键设置为“quick.orange.rabbit”
的消息将传递到两个队列。消息“lazy.orange.elephant”
也将发送给他们两个。另一方面,“quick.orange.fox”
将仅进入第一个队列,而“lazy.brown.fox”
将仅进入第二个队列。即使“lazy.pink.rabbit”
与两个绑定匹配(匹配Q2的两个绑定),也只会传递到第二个队列一次。 “quick.brown.fox”
与任何绑定都不匹配,因此将被丢弃。
如果我们打破约定并发送一个或四个单词的消息,例如“orange”
或“quick.orange.male.rabbit”
,会发生什么?好吧,这些消息将不匹配任何绑定,并且将会丢失。
另外,“lazy.orange.male.rabbit”
即使有四个单词,也将匹配最后一个绑定,并将其传送到第二个队列。
topic交换器
topic交换器功能强大,可以像其他交换器一样运行。
当队列用
“#”
(井号)绑定键绑定时,它将接收所有消息,而与路由键无关,就像在fanout
交换器中一样。当在绑定中不使用特殊字符
“*”
(星号)和“#”
(井号)时,topic交换器的行为就像direct交换器一样。
完整示例
我们将在日志记录系统中使用topic
交换器。我们将从一个可行的假设开始,即日志的路由键将包含两个词:“<facility>.<severity>”
。
emit_log_topic.go
的代码:
package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_topic", // name"topic", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs_topic", // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}func bodyFrom(args []string) string {var s stringif (len(args) < 3) || os.Args[2] == "" {s = "hello"} else {s = strings.Join(args[2:], " ")}return s
}func severityFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "anonymous.info"} else {s = os.Args[1]}return s
}
receive_logs_topic.go
的代码:
package mainimport ("log""os""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_topic", // name"topic", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("", // namefalse, // durablefalse, // delete when unusedtrue, // exclusivefalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [binding_key]...", os.Args[0])os.Exit(0)}// 绑定topicfor _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_topic", s)err = ch.QueueBind(q.Name, // queue names, // routing key"logs_topic", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")}msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto ackfalse, // exclusivefalse, // no localfalse, // no waitnil, // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}
想要接收所有的日志:
go run receive_logs_topic.go "#"
要从“kern”
接收所有日志:
go run receive_logs_topic.go "kern.*"
或者,如果你只想接收“critical”
日志:
go run receive_logs_topic.go "*.critical"
你可以创建多个绑定:
go run receive_logs_topic.go "kern.*" "*.critical"
并发出带有路由键“kern.critical”
的日志:
go run emit_log_topic.go "kern.critical" "A critical kernel error"
你可以自己尝试玩一下这个程序。请注意,代码没有对路由键或绑定键进行任何假设,你可能希望使用两个以上的路由键参数。
源自:https://www.rabbitmq.com/getstarted.html