RabbitMQ安装|使用|概念|Golang开发

手册:http://www.rabbitmq.com/getstarted.html

安装:http://www.rabbitmq.com/download.html

参考:http://blog.csdn.net/whycold/article/details/41119807

一.介绍

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Go、Python、Ruby。用于在分布式系统中存储转发消息。

二.安装

ubuntu直接下载deb文件安装,默认已经启动,sudo敲入:

sudo  rabbitmq-server start 
sudo lsof -i:5672 

启用插件,进入UI:

sudo rabbitmq-plugins enable rabbitmq_management

登录http://127.0.0.1:15672(默认guest只能localhost访问,要远程访问,需要使用可远程访问的管理员账号)

用户名:密码=guest:guest

三.使用

# 敲入查看帮助
sudo rabbitmqctl# 创建用户 sudo rabbitmqctl add_user 登录用户名 密码 # 可以创建管理员用户,负责整个MQ的运维 sudo rabbitmqctl set_user_tags 登录用户名 administrator # 可以创建RabbitMQ监控用户,负责整个MQ的监控 sudo rabbitmqctl set_user_tags 登录用户名 monitoring # 可以创建某个项目的专用用户,只能访问项目自己的virtual hosts sudo rabbitmqctl set_user_tags 登录用户名 management # 查看用户 sudo rabbitmqctl list_users # 授权 # 该命令使用户具有/这个virtual host中所有资源的配置、写、读权限以便管理其中的资源 # set_permissions [-p <vhostpath>] <user> <conf> <write> <read> # 其中,<conf> <write> <read>的位置分别用正则表达式来匹配特定的资源,如'^(amq\.gen.*|amq\.default)$'可以匹配server生成的和默认的exchange,'^$'不匹配任何资源 sudo rabbitmqctl set_permissions -p / 登录用户名 '.*' '.*' '.*' 

四.概念入门

1.ConnectionFactory、Connection、Channel

ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。

Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。

ConnectionFactory为Connection的制造工厂。

Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

2.Queue

Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。

RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。

多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

3.消息的一些机制

3.1.消息确认Message acknowledgment

在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。

这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑…

另外pub message是没有ack的。(??)

3.2.消息持久Message durability

如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。

3.3.提前取机制Prefetch count

前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。就是变慢而已。订阅模式如何平摊?这种模式是一个消费者一次性拿很多条消息?

4.Exchange

在上一节我们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。

Exchange是按照什么逻辑将消息路由到Queue的?这个将在Binding一节介绍。

RabbitMQ中的Exchange有四种类型,不同的类型有着不同的路由策略,这将在Exchange Types一节介绍。

5.Routing key

生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。

在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。

RabbitMQ为routing key设定的长度限制为255 bytes。

6.Binding

RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。

6.1.Binding key

在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。

在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。 binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型(广播)的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。

7.Exchange Types

RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述),下面分别进行介绍。

7.1.fanout

fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。

7.2.direct

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。

以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。

7.3.topic

前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

1. routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” 2. binding key与routing key一样也是句点号“. ”分隔的字符串 3. binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个) 

以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1与Q2,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey

7.4.headers

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

该类型的Exchange没有用到过(不过也应该很有用武之地),所以不做介绍。

8.RPC

MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。

但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。

RabbitMQ中实现RPC的机制是:

  1. 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
  2. 服务器端收到消息并处理
  3. 服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性
  4. 客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理

五.Go 接口

http://www.rabbitmq.com/tutorials/tutorial-one-go.html

请看官方示例:

https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/send.go

上面的例子仔细看,有必要看源码!

GO接口库

六.实例解释

四种模式

  1. DIRECT 默认点对点模式
  2. TOPIC 话题模式
  3. FANOUT 广播模式
  4. RPC RPC模式

工作队列:默认点对点模式

发布方,一个!

package mainimport ("fmt""log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { // 拨号,下面例子都一样 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 这个是最重要的 ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 申明一个队列 // https://godoc.org/github.com/streadway/amqp#Channel.QueueDeclare q, err := ch.QueueDeclare( "task_queue", // name 有名字! true, // durable 持久性的,如果事前已经声明了该队列,不能重复声明 false, // delete when unused false, // exclusive 如果是真,连接一断开,队列删除 false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") body := bodyFrom(os.Args) // 发布 err = ch.Publish( "", // exchange 默认模式,exchange为空 q.Name, // routing key 默认模式路由到同名队列,即是task_queue false, // mandatory false, amqp.Publishing{ // 持久性的发布,因为队列被声明为持久的,发布消息必须加上这个(可能不用),但消息还是可能会丢,如消息到缓存但MQ挂了来不及持久化。 DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s } 

工作方,多个,拿发布方的消息

package mainimport ("bytes""fmt" "github.com/streadway/amqp" "log" "time" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 指定队列! q, err := ch.QueueDeclare( "task_queue", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // Fair dispatch 预取,每个工作方每次拿一个消息,确认后才拿下一次,缓解压力 err = ch.Qos( 1, // prefetch count // 待解释 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS") // 消费根据队列名 msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack 设置为真自动确认消息 false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") // 确认消息被收到!!如果为真的,那么同在一个channel,在该消息之前未确认的消息都会确认,适合批量处理 // 真时场景:每十条消息确认一次,类似 d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever } 

发布-订阅:广播模式

发布方

package mainimport ("fmt""log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 默认模式有默认交换机,广播自己定义一个交换机,交换机可与队列进行绑定 err = ch.ExchangeDeclare( "logs", // name "fanout", // type 广播模式 true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) // 发布 err = ch.Publish( "logs", // exchange 消息发送到交换机,这个时候没队列绑定交换机,消息会丢弃 "", // routing key 广播模式不需要这个,它会把所有消息路由到绑定的所有队列 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s } 

订阅方

package mainimport ("fmt""log" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 同样要申明交换机 err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") // 新建队列,这个队列没名字,随机生成一个名字 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive 表示连接一断开,这个队列自动删除 false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 队列和交换机绑定,即是队列订阅了发到这个交换机的消息 err = ch.QueueBind( q.Name, // queue name 队列的名字 "", // routing key 广播模式不需要这个 "logs", // exchange 交换机名字 false, nil) failOnError(err, "Failed to bind a queue") // 开始消费消息,可开多个订阅方,因为队列是临时生成的,所有每个订阅方都能收到同样的消息 msgs, err := ch.Consume( q.Name, // queue 队列名字 "", // consumer true, // auto-ack 自动确认 false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever } 

高级路由 发布-订阅 新版:默认点对点模式

发布-订阅每个绑定的队列都收到一样的消息,现在不想!使用路由功能,队列绑定进行分发。

发布方

package mainimport ("fmt""log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 交换机申明,且类型为点对点默认 err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) // 发布 err = ch.Publish( "logs_direct", // exchange 发到这个交换机 severityFrom(os.Args), // routing key 且路由key是由命令行指定,如下方,指定了error false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "info" } else { s = os.Args[1] } return s } 

发消息到交换机,路由key为error

go run *.go error "Run. Run. Or it will explode."

消费方

package mainimport ("fmt""log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 惯例 err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") // 申明临时队列 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [info] [warning] [error]", os.Args[0]) os.Exit(0) } # 绑定队列和交换机,绑定多个路由key,见下方 for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s) // 下面同个队列可以收到不同路由key的消息 ,广播模式除外! err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_direct", // exchange false, nil) failOnError(err, "Failed to bind a queue") } // 消费队列 msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <forever } 

消费这些key:info warning error

go run *.go info warning error

话题 发布-订阅 新新版:话题模式

上面的路由都是标准的,就是固定字符串名字,话题模式可以使用类正则的路由,这样模糊匹配更棒!!

路由类似于这样 *.love.*

* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.

发布方

package mainimport ("fmt""log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 交换机,话题模式 err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) // 类似上面 err = ch.Publish( "logs_topic", // exchange severityFrom(os.Args), // routing key 路由可以不标准了 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "anonymous.info" } else { s = os.Args[1] } return s } 

命令运行

To receive all the logs:

go run *.go "#"

To receive all logs from the facility “kern”:

go run *.go "kern.*"

Or if you want to hear only about “critical” logs:

go run *.go "*.critical"

You can create multiple bindings:

go run *.go "kern.*" "*.critical"

消费方

package mainimport ("fmt""log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 同理 err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") // 临时队列 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [binding_key]...", os.Args[0]) os.Exit(0) } for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_topic", s) // 绑定也是类似的 err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_topic", // exchange false, nil) failOnError(err, "Failed to bind a queue") } msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <forever } 

命令运行

go run *.go "kern.critical" "A critical kernel error"

rpc:RPC模式

应答方

package mainimport ("fmt""log" "strconv" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func fib(n int) int { if n == 0 { return 0 } else if n == 1 { return 1 } else { return fib(n-1) + fib(n-2) } } func main() { // 拨号 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 声明匿名队列 q, err := ch.QueueDeclare( "rpc_queue", // name false, // durable false, // delete when usused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 公平分发 没有这个则round-robbin:https://segmentfault.com/a/1190000004492447 err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS") // 消费,等待请求 msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { //请求来了 for d := range msgs { n, err := strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to integer") log.Printf(" [.] fib(%d)", n) // 计算 response := fib(n) // 回答 err = ch.Publish( "", // exchange d.ReplyTo, // routing key 回答队列 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: d.CorrelationId, 序列号 Body: []byte(strconv.Itoa(response)), }) failOnError(err, "Failed to publish a message") // 确认回答完毕 d.Ack(false) } }() log.Printf(" [*] Awaiting RPC requests") <forever } 

请教方

package mainimport ("fmt""log" "math/rand" "os" "strconv" "strings" "time" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func randomString(l int) string { bytes := make([]byte, l) for i := 0; i < l; i++ { bytes[i] = byte(randInt(65, 90)) } return string(bytes) } func randInt(min int, max int) int { return min + rand.Intn(max-min) } func fibonacciRPC(n int) (res int, err error) { // 拨号 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 队列声明 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive 为真即连接断开就删除 false, // noWait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 消费 msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive 这个为真,服务器会认为这是该队列唯一的消费者 false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") corrId := randomString(32) // 请教! err = ch.Publish( "", // exchange "rpc_queue", // routing key 问题发到这里 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: corrId, ReplyTo: q.Name, 希望回答被发到这里 Body: []byte(strconv.Itoa(n)), }) failOnError(err, "Failed to publish a message") // 取答案 for d := range msgs { if corrId == d.CorrelationId { res, err = strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to integer") break } } return } func main() { rand.Seed(time.Now().UTC().UnixNano()) n := bodyFrom(os.Args) log.Printf(" [x] Requesting fib(%d)", n) res, err := fibonacciRPC(n) failOnError(err, "Failed to handle RPC request") log.Printf(" [.] Got %d", res) } func bodyFrom(args []string) int { var s string if (len(args) < 2) || os.Args[1] == "" { s = "30" } else { s = strings.Join(args[1:], " ") } n, err := strconv.Atoi(s) failOnError(err, "Failed to convert arg to integer") return n } 

http://www.rabbitmq.com/tutorials/tutorial-six-go.html

七.属性详解,测试!

package mainimport ( "fmt" "github.com/streadway/amqp" "log" "time" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } // Queue属性测试 // // durable属性和auto-delete属性可以同时生效; // durable属性和exclusive属性会有性质上的冲突,两者同时设置时,仅exclusive属性生效; // auto_delete属性和exclusive属性可以同时生效; // // auto_delete如果有连接存在消费者订阅该http://www.lenggirl.com/tool/RabbitMQ.htmlqueue,正常,如果消费者全部消失,自动删除队列 // 可以在没有创建consumer的情况下,创建出具有auto-delete属性的queue。 // // exclusive,如果声明该队列的连接断开,自动删除队列 // queue的存在条件是在声明该队列的连接上存在某个consumer订阅了该queue。 // func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 默认模式有默认交换机,广播自己定义一个交换机,交换机可与队列进行绑定 /* ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags. ExchangeDeclare方法在服务器声明一个exchange。如果不存在,新建一个,存在的话则确认type和durability和auto-delete的标志是否一致。 Errors returned from this method will close the channel. 如果方法返回错误,channel会被关闭。 Exchange names starting with "amq." are reserved for pre-declared and standardized exchanges. The client MAY declare an exchange starting with "amq." if the passive option is set, or the exchange already exists. Names can consists of a non-empty sequence of letters, digits, hyphen, underscore, period, or colon. 名字以"amq."开头的Exchange是为之前已经声明和标准化的exchange们保留的, 在exchange已经存在的情况下,或者passive选项设置为真,客户端才有可能声明一个这样的exchange。 exchange的名字是一个非空序列,仅能包含字母,数字,连字符-,下划线_,句号.,冒号: 另外的方法ExchangeDeclarePassive主要用来检测exchange是否已经存在。 Each exchange belongs to one of a set of exchange kinds/types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it. Once an exchange is declared, its type cannot be changed. The common types are "direct", "fanout", "topic" and "headers". Durable and Non-Auto-Deleted exchanges will survive server restarts and remain declared when there are no remaining bindings. This is the best lifetime for long-lived exchange configurations like stable routes and default exchanges. Non-Durable and Auto-Deleted exchanges will be deleted when there are no remaining bindings and not restored on server restart. This lifetime is useful for temporary topologies that should not pollute the virtual host on failure or after the consumers have completed. Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is running including when there are no remaining bindings. This is useful for temporary topologies that may have long delays between bindings. Durable and Auto-Deleted exchanges will survive server restarts and will be removed before and after server restarts when there are no remaining bindings. These exchanges are useful for robust temporary topologies or when you require binding durable queues to auto-deleted exchanges. Note: RabbitMQ declares the default exchange types like 'amq.fanout' as durable, so queues that bind to these pre-declared exchanges must also be durable. Exchanges declared as `internal` do not accept accept publishings. Internal exchanges are useful for when you wish to implement inter-exchange topologies that should not be exposed to users of the broker. When noWait is true, declare without waiting for a confirmation from the server. The channel may be closed as a result of an error. Add a NotifyClose listener to respond to any exceptions. Optional amqp.Table of arguments that are specific to the server's implementation of the exchange can be sent for exchange types that require extra parameters. func (me *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error { */ err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil) failOnError(err

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/540998.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MS的完整形式是什么?

硕士&#xff1a;理学硕士/外科硕士/ MicroSoft (MS: Master of Science / Master of Surgery / MicroSoft) 1)硕士&#xff1a;理学硕士 (1) MS: Master of Science) MS is an abbreviation of Master of Science. It is a masters degree program provided by universities i…

人工智能能够构建一个自主驱动云吗?

企业和组织可以从云计算中受益&#xff0c;但许多公司并不希望面对公共云的成本&#xff0c;性能和治理问题&#xff0c;并且认为构建自己的私有云的复杂性和运营开销并没有那么困难。 如今&#xff0c;一些云计算供应商正在使用人工智能&#xff08;AI&#xff09;来简化私有云…

前端必备的 web 安全知识手记

前言 安全这种东西就是不发生则已&#xff0c;一发生则惊人。作为前端&#xff0c;平时对这方面的知识没啥研究&#xff0c;最近了解了下&#xff0c;特此沉淀。文章内容包括以下几个典型的 web 安全知识点&#xff1a;XSS、CSRF、点击劫持、SQL 注入和上传问题等&#xff08;…

php属于脚本,php是脚本语言吗

PHP即“超文本预处理器”&#xff0c;是一种通用开源脚本语言。PHP是在服务器端执行的脚本语言&#xff0c;与C语言类似&#xff0c;是常用的网站编程语言。PHP独特的语法混合了C、Java、Perl以及 PHP 自创的语法。利于学习&#xff0c;使用广泛&#xff0c;主要适用于Web开发领…

NetMarketShare:本月桌面浏览器市场份额几乎没有变化

NetMarketShare之前关于台式机浏览器市场份额的报告表示&#xff0c;Google Chrome市场份额正在快速上升&#xff0c;而Edge浏览器市场份额正在以蜗牛的速度前进。而该公司的最新统计数据显示&#xff0c;几乎所有浏览器的市场份额或多或少保持不变。 NetMarketShare的最新统计…

lnmp解析php,LNMP之 php解析

[rootLNMP ~]# vim /usr/local/nginx/conf/nginx.conf打开以下PHP 相关项且更改 scripts$fastcgi_script_name;> /usrlocal/nginx/html$fastcgi_script_name;location ~ \.php$ {root html;fastcgi_pass 127.0.0.1:9000;fastcgi_index index.php;fastcgi_param…

计算机网络中的传输协议是_计算机网络中的传输方式

计算机网络中的传输协议是传输方式 (Transmission Modes) The mechanism of transferring data or information between two linked devices connected over a network is referred to as Transmission Modes. 在通过网络连接的两个链接的设备之间传输数据或信息的机制称为传输…

https 密钥 php,https加密方式是什么

Https加密介绍Http直接通过明文在浏览器和服务器之间传递消息&#xff0c;容易被监听抓取到通信内容。Https采用对称加密和非对称加密结合的方式来进行通信。Https不是应用层的新协议&#xff0c;而是Http通信接口用SSL和TLS来加强加密和认证机制。加密方式对称加密&#xff1a…

java get post 注解,GET/POST接收或发送数据的问题

在文章开始&#xff0c;先来回忆一下GET、POST这两种请求方式的区别。❈Http定义了与服务器交互的不同方法&#xff0c;最基本的方法有4种&#xff0c;分别是GET&#xff0c;POST&#xff0c;PUT&#xff0c;DELETE。URL全称是资源描述符&#xff0c;我们可以这样认为&#xff…

Apple新发布的APFS文件系统对用户意味着什么

2016年WWDC大会上&#xff0c;Apple除了公布watchOS、tvOS、macOS以及iOS等一系列系统和软件更新外&#xff0c;还公布了一个名为APFS&#xff08;Apple File System&#xff09;的文件系统。 这一全新文件系统专门针对闪存/SSD进行优化&#xff08;但依然可用于传统机械硬盘&a…

tps 交易量_交易处理系统(TPS)

tps 交易量A transaction is a simple process that takes place during business operations. The transaction processing system (TPS) manages the business transactions of the client and therefore helps a companys operations. A TPS registers, as well as all of i…

CYQ.Data 轻量数据层之路 自定义MDataTable绑定续章(七)

本章起&#xff0c;将续章讲解整框架当初的设计思路&#xff1a; 本章既为续章&#xff0c;说明我以前写过&#xff0c;是的&#xff0c;以前我写过内部整个MDataTable的构造&#xff0c;不过&#xff0c;当初匆匆写完后&#xff0c; 最后一步的实现MDataTable绑定GridView/Dat…

php 文字超出画布,input实现文字超出省略号(代码示例)

本篇文章给大家带来的内容是关于input实现文字超出省略号(代码示例)&#xff0c;有一定的参考价值&#xff0c;有需要的朋友可以参考一下&#xff0c;希望对你有所帮助。input实现文字省略号功能普通元素实现文字超出宽度自动变成省略号非常简单&#xff0c;给元素加个宽度&…

排序算法系列:插入排序算法

概述 直接插入排序&#xff08;Straight Insertion Sort&#xff09;的基本操作是将一个记录插入到已经排好序的有序表中&#xff0c;从而得到一个新的、记录数增1的有序表。 – 《大话数据结构》 版权说明 著作权归作者所有。商业转载请联系作者获得授权&#xff0c;非商业转载…

php点击复制按钮到我的粘贴板,js实现点击复制当前文本到剪贴板功能(兼容所有浏览器)...

最近做项目时&#xff0c;在网站框架搭建过程&#xff0c;有一个功能需要实现复制文本到剪贴板&#xff0c;相信这个功能很常用&#xff0c;但是对于不常写JS代码的我来说是一个比较大的挑战&#xff0c;回想以前做过的一个站点&#xff0c;使用window.clipboardData实现复制到…

[Phonegap+Sencha Touch] 移动开发77 Cordova Hot Code Push插件实现自己主动更新App的Web内容...

原文地址&#xff1a;http://blog.csdn.net/lovelyelfpop/article/details/50848524 插件地址&#xff1a;https://github.com/nordnet/cordova-hot-code-push 以下是我对GitHub项目readme的翻译 ——————————————————————————————————————…

java 如何重写迭代器,如何用Java按需定制自己的迭代器

编写自己的迭代器的流程是&#xff1a;首先实现Iterable接口&#xff0c;进而实现该接口中的Iterator iterator()方法&#xff0c;该方法返回接口Iterator&#xff0c;Iterator接口中封装了next&#xff0c;hasnext&#xff0c;remove等方法。实现了Iterable接口的类能够通过fo…

php整合支付宝,Thinkphp5.0整合支付宝在线下单

thinkphp5.0支付宝在线支付下单整个流程&#xff0c;包括创建订单、支付成功回调更新订单状态、最终跳转到商户订单详情页查看演示下载资源&#xff1a;17次 下载资源下载积分&#xff1a;998积分支付宝在线支付控制器代码 public function alipay() {//发起支付宝支付$order_n…

php怎么引用表单元素,表单元素:最全的各种html表单元素获取和使用方法总结...

表单是网页与用户的交互工具&#xff0c;由一个元素作为容器构成&#xff0c;封装其他任何数量的表单控件&#xff0c;还有其他任何元素里可用的标签&#xff0c;表单能够包含、、、、、等表单控件元素。表单元素有哪些呢&#xff1f;它包含了如下的这些元素&#xff0c;输入文…

数据中心部署气流遏制系统需要考虑的十大要素

数据中心气流遏制策略能够大幅提高传统数据中心制冷系统的可预测性和效率。事实上&#xff0c;绿色网格组织&#xff08;The Green Grid&#xff09;将气流管理策略称作“实施数据中心节能计划的起点”。但是&#xff0c;大多数已有数据中心由于受各种条件的制约&#xff0c;只…