RabbitMQ 是功能强大的开源消息代理。根据官网称:也是使用量最广泛的消息队列。就像他的口号“Messaging that just works”,开箱即用使用简单,支持多种消息传输协议(AMQP、STOMP、MQTT)。
一个应用程序或者服务如何使用RabbitMq呢?
首先会有生产者和消费者两个角色;生产者连接到rabbit代理服务,创建一条AMQP信道,然后把生成的消息,通过信道发布到交换器上,交换器根据路由规则(路由key)进行绑定到或者路由到队列上面。最后消息到达队列上中。消费者跟生产者一样需要先和rabbit代理服务器创建连接,同时创建一个消息管道,并订阅到队列上,进而从队列中获取消息,进行处理。这里面涉及到消息、交换器、绑定、队列几个重要的概念,下面会一一讲解。整个过程如图所示
消息
生产者创建消息,这里的消息是指?消息包含两个部分内容:有效载荷(payload)、标签(label)。有效载荷就是你想要传输的数据。而标签是描述了有效载荷,并且RabbitMQ用它来决定谁将获得消息的拷贝。其实通过上图你也会发现,不同于tcp协议,因为AMQP没有明确的接收方,只会用标签表述这条消息,然后把消息交给Rabbit。rabbit会根据标签把消息发送给感兴趣的接收方。
队列
消息最终到达队列中并等待消费。消费者通过AMQP的Basic.Consume命令订阅。这样做会将信道设置为接受模式,直到取消对队列的订阅为止。订阅之后,消费者在消费(或者拒绝)最近的接收的那条消息之后,就能从队列中自动的接收下一条消息。
注意:什么时候消息才会从队列中删除呢?这里涉及到一个消息确认的动作。消费者接收到的每一条消息都必须进行确认。才会从队列中删除。消费者可以通过AMQP的Basic.Ack命令显式地向rabbtmq发送一个确认,或者在订阅到队列的时候就将autoAck属性设置为true;如:autoAck: true,一旦消费者接收消息,rabbitmq会自动视其确认了消息。
如果消费者接收到消费1,然后在确认之前从rabbit断开连接,rabbitmq会认为这条消息没有分发,然后重新分发下一个订阅的消费者。这样做的好处,即使你的应用程序奔溃了,也可以确保消息会被发送给另一个消费者进行处理,或者等待你的程序恢复正常连接,继续消费。假设消费者A程序与rabbit断开了连接,消息进而会被消费者B进行消费处理。如下图
只要消费者不进行确认,rabbit将不会给该消费者发送消息,因为在上一条消息被确认之前,rabbit会认为这个消费者并没有准备好接收下一条消息的能力。
在没有办法正常确认消息,不能一直堵塞呀,比如消费者有bug。那就使用AMPQ的Basic.Reject命令;明确的拒绝这条消息,其中一个参数requeue如果设置了ture的话,Rabbit会把消息重新发给下一个订阅的消费者。
如果你检测到一条消息本身有错误而任何一个消费者都无法处理的时候,就可以把requeue设置为false,rabbitmq会把消息从队里中移除,而不会把他发送给新的消费者。
注意:这里你可以使用对拒绝的消息进行特殊处理,比如发送到死信队列或者专门收集的erro队里中。
小结:队列是amqp消息通信的基础模块
1.为消息提供的处所,消息在此等待消费
2.对负载均衡来说,队列是绝佳方案。只需附加一堆消费者,并让rabbitmq以循环的方式均匀地分配发来的消息。
3.队列是rabbit中消息的最后的终点。
交换器、绑定
我们知道消费者如何获取消息,那么现在的问题是,消息是如何到达队列的呢?消息发送到交换器,会根据确定的规则,RabbitMQ将会决定消息该投递到哪个队列。这些规则称为路由键(routing key)。队列通过路由键绑定到交换器。当你发送消息到代理服务器时,消息将拥有一个路由键。如:AMPQ的Basic.Publish方法,有个参数routingKey通过他指定。即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。
交换器有四种类型:direct、fanout、topic和headers;每种类型实现了不同的路由算法,前三个比较常用。
1.direct
这种模式非常简单:路由键匹配的话,消息就被投递到对应的队列。路由算法-使用路由键和队列名称同名进行路由消息。使用场景-直接把消息发送到指定队列时使用。
默认的direct交换器,不需要进行声明, 队列声明会自动绑定到默认的交换器上,并以队列名称作为路右键。使用以下代码发送消息申明的队列中。
channel.BasicPublish(exchange: "",routingKey: "hello",basicProperties: null,body: body);
2.fanout交换器
这种模式下,可以忽略routing key,唯一需要做的就是为新的消费者写一段代码,然后声明新的队列并将其绑定到fanout交换器上。当你发送一条消息到fanout交换器上,他会把消息投递给所有附加在此交换器的队里上。路由算法-消息会路由到绑定到交换器上的所有队列。使用场景-发布订阅的广播功能
2.topic交换器
这类交换器允许不同源头的消息到达同一个队列。路由算法-根据全部或部分路由键匹配将消息路由绑定的队列上。使用场景-根据某些条件广播到特定的队列上。
小结:
本文主要总结了 AMQP几个主要元素:交换器,绑定,队列。以及一个消息创建到消费者读取消费的过程。