参考书籍:朱忠华的《RabbitMQ实战指南》
一、基础概念
1.Exchange
1.1 创建方法的参数,exchangeDeclare()
- exchange:交换器的名称
- type:交换器的类型
- durable:是否持久化,true代表持久化。(持久化会将交换器存入磁盘)
- autoDelete:是否自动删除,true表示自动删除。(当该交换器上绑定的最后一个队列/交换器解除绑定后,该交换器自动删除)
- internal:是否是内置的,true表示内置交换器。(生产者无法直接发消息给内置交换器,只能通过其他交换器路由到该交换器)
- argument:其他一些结构化的参数
1.2 交换器类型
1.2.1 fanout
- 把所有消息路由到与该交换器绑定的队列
1.2.2 direct
- 路由时需要BindingKey和RoutingKey完全匹配
1.2.3 topic
- 路由到BindingKey和RoutingKey相匹配的队列
- RoutingKey和BindingKey为以“.”分隔的字符串
- BindingKey中可以存在“*”(匹配一个单词),“#”(匹配多个单词,也可以是0个)
1.2.4 header
- 不依赖路由键的匹配规则来路由消息,而是根据发送的消息内容中的header属性进行匹配
1.3 备份交换器
- 生产者发送消息时未设置mandatory参数,若根据路由键没有符合的队列该消息将会丢失
- 如果设置mandatory参数则需要增加程序对应的处理逻辑,相对复杂
- 可以设置备份交换器来存储这部分消息,后面再根据需要进行处理这部分消息
2.Queue
2.1 创建方法的参数,queueDeclare()
- queue:队列的名称
- durable:是否持久化
- exclusive:是否排他,true表示排他。(队列仅对首次声明他的连接可见,并在该连接断开后自动删除)
- 排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)可以同时访问该队列
- autoDelete:是否自动删除,true表示自动删除。(至少有一个消费者连接到这个队列,之后所有与这个连接的消费者都断开时,才会自动删除)
- arguments:其他一些参数
- x-message-ttl:指定队列中消息的过期时间(以毫秒为单位)。超过指定时间的消息将被自动删除。
- x-max-length:指定队列中消息的最大数量。不设置的情况下,将没有长度限制,无限制地消耗系统资源。
- x-max-length-bytes:指定队列中消息的最大总字节数。当队列中的消息总字节数达到最大值时,新的消息将被拒绝或丢弃。
- x-expires:指定队列的过期时间(以毫秒为单位)。当队列在指定时间内未被使用时,将自动删除。
- x-dead-letter-exchange:指定一个交换机名称,用于接收被拒绝或过期的消息。这些消息将被重新路由到指定的交换机中。
- x-dead-letter-routing-key:指定一个路由键,用于重新路由被拒绝或过期的消息。
- x-overflow:队列消息数量超过限制后的策略模式。默认采用drop-head策略
- drop-head:新的消息将覆盖队列中最早的消息
- reject-publish:新的消息将被拒绝并丢弃
- reject-publish-dlx:新的消息将被拒绝并发送到死信交换机
- x-max-priority:队列消费的优先级顺序
- …
3.死信队列
DLX,全称Dead-Letter-Exchange,死信交换器。当消息在一个队列中变成死信后,会被重新发送到另一个交换器,这个交换器就是DLX,其绑定的队列称之为死信队列
3.1消息变成死信的情况
- 消息被拒绝且requeue设置为false
- 消息过期
- 队列达到最大长度,且x-overflow策略为reject-publish-dlx
3.2 流程图
4.延迟队列
延迟队列存储的对象是延迟消息,就是在某些特殊情况下,需要让消息在一定时间后再交给消费者进行消费的场景。
实现逻辑
- 和上面死信队列中的流程图一致,只是上图中消息进入死信队列的情况有三种,延迟队列只是通过消息过期来这一种情况来实现
- 延迟消息中,queue1不绑定消费者,消费者绑定queue2,等待消息过期后进行消费
5. 优先级队列
这类队列中具有高优先级的消息优先被消费
5.1 注意事项
- 首先queue需要设置x-max-priority属性,且messge也需要设置priority属性
- 如果只设置message的priority属性,不设置queue的x-max-priority属性。则消息的 priority 值只是一个消息属性,但不会影响消息的处理顺序
- 消息的优先级priority为0-9,值为0代表会被最后消费
- x-max-priority值代表该队列能识别priority的最大值,priority大于该值的消息都视同为x-max-priority的值
5.2 例子
- 如果队列的x-max-priority为5,分别有priority为0,1,2,3,4,5,6,7,8,9的消息若干条,将会以怎么样的顺序进行消费?
- 队列的x-max-priority为5,所以所有优先级大于5的消息(即优先级为6,7,8,9的消息)都将被视为优先级为5。
- 优先级为5(包括原本优先级为6,7,8,9的消息)的消息将首先被消费
- 然后消费优先级为4的消息
- 然后消费优先级为3的消息
- …
- 然后消费优先级为0的消息
6.持久化
6.1 分类
6.1.1 交换器的持久化
- 通过声明交换器的时候设置durable参数
- 如果不做持久化,重启服务后,交换器的元数据会丢失,但是消息不会丢失
6.1.2 队列的持久化
- 通过声明队列的时候设置durable参数
- 如果不做持久化,重启服务后,队列的元数据会丢失,消息数据也会丢失
6.1.3 消息的持久化
- 通过生产消息时候设置basicProperties的deliveryMode参数
6.2 注意事项
- 设置队列和消息的持久化,服务重启后,消息和队列依旧存在
- 只设置队列持久化,服务重启后,队列存在,但消息丢失
- 只设置消息持久化,服务重启后,队列消失,继而消息也会丢失
- 因为消息是存储在队列中的,如果队列丢失了,那么队列中的消息,无论是否持久化,都会丢失。
二、消息
1.生产消息
1.1 basicPublish()
- exchange:交换器名称
- routingKey:路由键
- props:消息的基本属性集
- contentType:设置消息的内容类型
- application/json --> json格式的数据
- application/xml --> XML格式
- text/plain --> 纯文本格式
- …
- contentEncoding:设置消息的内容编码
- header:对应交换器为header类型时使用
- deliveryMode:设置消息的投递模式,可选值为 1(非持久化)或 2(持久化)
- priority:设置消息的优先级,优先级由低到高为0 到 9,值为0代表最后消费。注意只有对应的队列设置了x-max-priority 参数才会根据优先级进行消费
- replyTo:设置消息的回复队列。指定一个队列,用于接收消息的回复或响应,消费者可以将回复发送到指定的队列中,使生产者能够获取到回复消息。
- expiration:设置消息的过期时间
- messageId:设置消息的唯一标识符
- timestamp:设置消息的时间戳
- type:描述消息的类型。消费者可以根据type属性的不同,分类处理不同type的消息
- contentType:设置消息的内容类型
- byte[] body:消息体
- mandatory:true表示交换器根据路由键找不到符合的队列时,会调用basicReturn将消息返回给生产者。false表示这种情况会把消息丢弃
- immediate:true表示交换机将消息路由到队列上且该队列上没有消费者时,那么该消息不会存入队列,当匹配的所有队列都没有消费者时,会通过basicReturn将消息返回给生产者
2.消费消息
2.1 消费模式
2.1.1 推模式:basicConsumer()
- queue:队列的名称
- autoAck:设置是否自动确认
- consumerTag:消费者标签。用来区分多个消费者
- noLocal:为true表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者
- exclusive:是否排他,消费者独占该队列,其他消费者无法同时访问该队列
- arguments:其他参数
- callback:设置回调函数
2.1.2 拉模式:basicGet()
2.2 消费端的确认与拒绝
2.2.1 确认
- 若消费者在订阅时设置autoAck为true,则发送出去的消息会自动被置为确认,然后删除该消息。
- 若消费者在订阅时设置autoAck为false,则发出的消息后,会等待消费者显式的回复确认信号后再删除该消息。
- 若在显式的回复确认信号前进程挂掉或者其他原因一直未发出,RabbitMQ会一直等待持有消息,等待消费者显式调用Basic.Ack为止
- 若autoAck为false,一直没收到确认信号,并且消费者已经断开连接,则会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者
2.2.2 拒绝
- 拒绝单条消息:basicReject()
- deliveryTag:消息的编号
- requeue:为true表示将该消息重新存入队列,以便发送给下一个消费者。为false表示把消息从队列中移除
- 批量拒绝消息:basicNack()
- deliveryTag
- multiple:false表示拒绝编号为delivery的这一条消息,为true表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息
- requeue
- 重新请求未确认的消息:basicRecover()
- requeue:消息未被确认的消息重新加入到队列后,为true表示同一条消息会被分配给与之前不同的消费者,为false表示同一条消息会被分配给与之前相同的消费者
3.消息过期时间TTL
3.1 实现方式
- 设置队列属性。x-message-ttl指定队列中消息的过期时间(以毫秒为单位)。超过指定时间的消息将被自动删除。
- 生产消息时候针对该消息设置过期时间。expiration设置消息的过期时间
3.2 注意事项
- 如果两种方法一起使用,消息的TTL以两者较小的数值为准
- 消息在队列中的生存时间超过TTL后,就会变成“死信”,不会再被消费者接收
- 在队列属性TTL方式中,消息过期后就会从队列中删除
- 在消息属性TTL方式中,消息过期后不回立即删除,因为消息只有在被投递到消费者的时候才会进行判定