关于RabbitMQ你了解多少?
文章目录
- 关于RabbitMQ你了解多少?
- 基础篇
- 使用消息队列
- RabbitMQ
- RabbitMQ 7种用法类型
- Exchange 交换机
- 进阶篇
- 目录
- 客户端整合SpringBoot
- 消息可靠性投递
- 消费端限流
- 消息超时
- 死信和死信队列
- 延迟队列
- 事务消息
- 惰性队列
- 优先级队列
- 集群篇
- 工作机制
- 集群搭建
- 负载均衡
- 仲裁队列
- 流式队列
- 异地容灾
- 完整笔记总结
RabbitMQ是目前企业中应用非常广泛的高性能的异步通讯组件
基础篇
基础篇回顾
使用消息队列
- 同步功能耦合度高,异步功能解耦
- 同步响应时间长,响应总长,异步快速响应
- 同步并发压力传递,异步削峰限流
- 同步系统结果弹性不足,牵一发动全身,异步功能便于扩展
RabbitMQ
- 消息的发送端,生产者 Producer,和消息的接收端,消费者 Consumer
- 生产者发送消息给消费者建立连接,Connection,建立连接TCP三次握手,为实现Connection复用,在Connection中建立逻辑连接 Channel ,实际每次连接使用的是Connection中的Channel完成
- Broker就是RabbitMQ的主体,服务器本身,负责接收和分发消息,集群就是多个Broker
- 在Broker实例里面可根据划分多个逻辑上的分组,每个分组叫 Virtual Host,其中包含 Exchange 交换机,是消息到达Broker的第一站,之后发送到 Queue 队列上,队列是消息的容器,消息放在这里等待被消息取走
- 消息在交换机中只是中转,不去存储消息,存储消息需在队列中,消息发送到哪一个队列是需要 Binding 绑定关系,通过路由键方式将交换机和队列绑定到一起
RabbitMQ 7种用法类型
- Hello World 简化版Work Queues,一生产一消费
- Work Queues 一生产多消费,多消费竞争
- Publish/Subscribe 交换机绑定多队列
- Routing 交换机加入路由键,将消息确定的发送到某队列
- Topics 使用通配符匹配一个或多个路由键
- Remote procedure call(RPC) 同步调用
- Publisher Confirms 消息可靠性投递
Exchange 交换机
- 交换机 —》 路由键 —》 队列
- 交换机只负责转发消息,不具备存储消息的能力,若没任何队列与交换机绑定或没符合路由规则的队列,消息会丢失
- 交换机接收消息,如何处理消息取决交换机类型
- Work Queues 默认交换机发送到一个队列
- Publish/Subscribe指定交换机使用的是Fanout类型交换机,采用广播形式,发送给与他绑定的多个队列
- Routing指定交换机使用的是Direct类型交换机,采用定向形式,发送给符合指定routing key的队列
- Topics指定交换机使用的是Topic类型交换机,采用通配符形式,发送给符合指定routing pattern(路由模式) 的队列
进阶篇
目录
- 客户端整合SpringBoot
- 消息可靠性投递
- 消费端限流
- 消息超时
- 死信和死信队列
- 延迟队列
- 事务消息
- 惰性队列
- 优先级队列
客户端整合SpringBoot
-
搭建环境
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
基础设定:交换机名称、队列名称、绑定关系
-
发送消息:使用
RabbitTemplate
@Autowired private RabbitTemplate rabbitTemplate;@Test void send() {rabbitTemplate.convertAndSend("clw_sjzc_exchange", "clw_sjzc_routing_key", "hello world"); }
-
接收消息:使用
@RabbitListener
注解@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "test", durable = "true"),exchange = @Exchange(value = "test"),key = {"test"})) public void test(String msg, Message message, Channel channel) {System.out.println(msg); }@RabbitListener(queues = {"test"}) public void test(String msg, Message message, Channel channel) {System.out.println(msg); }
消息可靠性投递
故障研判,对症下药
-
消息没发送到消息队列
-
在生产者端进行确认,分别针对交换机和队列确认,若没成功发送到消息队列服务器上,可尝试重新发送
spring:rabbitmq:host: 192.168.0.150port: 5672username: rootpassword: rootvirtual-host: /connection-timeout: 10000#交换机确认publisher-confirm-type: CORRELATED#队列确认publisher-returns: true
@Configuration @Slf4j class RabbitConfig implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 方法会在spring容器初始化的时候执行* 当容器实例化一个带有@PostConstruct注解的@Bean时,会在调用构造函数之后,在依赖注入完成之前执行这个方法* @PostConstruct 使用条件* 1. 方法不能有任何参数* 2. 不能是静态方法* 3. 不能返回任何值*/@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {}@Overridepublic void returnedMessage(ReturnedMessage returned) {} }
-
为目标交换机指定备份交换机,当目标交换机投递失败,把消息投递至备份交换机
-
-
消息队列服务器宕机导致内存消息丢失
- 消息持久化到硬盘,服务器重启也不会导致消息丢失
-
消费端宕机或抛异常导致消息未成功消费
- 消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息
- 消费端消费消息失败,给服务器返回NACK信息,同时把消息恢复为待消费状态,可再次取回消息重试(消息端接口支持幂等性)
spring:rabbitmq:host: 192.168.0.150port: 5672username: rootpassword: rootvirtual-host: /connection-timeout: 10000#交换机确认publisher-confirm-type: CORRELATED#队列确认publisher-returns: truelistener:simple:#消息确认自动变手动acknowledge-mode: manual
/*deliveryTag:交付标签机制消息进入队列为了做区分,broker会生成一个64位整数(long)唯一标识 deliveryTag消息往消费端投递时,携带交付标签消费端将消息处理结果 ACK、NACK、Reject 返回给Broker后,Broker对对应消息执行后续操作如 删除消息、重新排队、标记死信等,那Broker就必须直到现在操作的消息具体为哪一个若交换机是Fanout模式,同一消息广播到不同队列,deliveryTag也不会重复,deliveryTag是在Broker范围内唯一multiple:每个消息有自己的deliveryTag当 multiple 为 true,批量处理之前全部消息,当 multiple 为 false,单独处理当前消息 */ @RabbitListener(queues = "test") public void test(String msg, Message message, Channel channel) {Boolean redelivered = message.getMessageProperties().getRedelivered();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {channel.basicAck(deliveryTag, false);} catch (Exception e) {if (redelivered){System.out.println("重复消费,不再尝试");channel.basicNack(deliveryTag, false, false);}else {System.out.println("消费失败,再试一次");channel.basicNack(deliveryTag, false, true);}} }
消费端限流
削峰填谷
spring:rabbitmq:host: 192.168.0.150port: 5672username: rootpassword: rootvirtual-host: /connection-timeout: 10000#交换机确认publisher-confirm-type: CORRELATED#队列确认publisher-returns: truelistener:simple:#消息确认自动变手动acknowledge-mode: manual #每次从队列中取值数prefetch: 100
消息超时
给消息设定一个过期时间,超过这个时间没有被取走的消息就会被删除
以下分为两个层面,若都设置则哪个时间短哪个生效
- 队列层面:在队列层面设定消息的过期时间,并不是队列的过期时间,这个队列中的消息全部使用同一个过期时间
- 消息本身:给具体的某个消息设定过期时间
死信和死信队列
当一个消息无法被消费,就变成死信
- 产生原因
- 拒绝:消费者拒接消息,
basicNack()
/basicReject()
,并且不把消息重新放入原目标队列,requeue = false - 溢出:队列中消息数量达到限制,根据先进先出原则,将最早消息变成死信
- 超时:消息到达超时时间未被消费
- 拒绝:消费者拒接消息,
- 处理方式
- 丢弃:对不重要的消息直接丢弃,不做处理
- 入库:把死信写入数据库,日后处理
- 监听:消息变成死信进入死信队列,设置专用消费端监听死信队列,做后续处理(常用)
延迟队列
- 借助消息超时时间+死信队列
- 给RabbitMQ安装插件,最多延迟两天
事务消息
事务只针对生产者端,有局限性,控制缓存中的事务
惰性队列
未设置惰性模式时队列是持久化机制,创建队列时,在Durability 有两个选项可以选择
- Durable:持久化队列,消息存储在硬盘上
- 持久化时机:当Durable队列满时或者 broker关闭时,此时再往消息队列存入消息会暂时阻塞
- Transient:临时队列,不做持久化操作,broker重启后消息会消失
优先级队列
基于队列先进先出的特性,通常先入队列的消息先投递,设置优先级之后,优先级高的消息更大几率先投递。关键参数:x-max-priority
RabbitMQ 允许使用一个正整数给消息设定优先级,取值范围1~255,官方建议1 ~5之间设置优先级,优先级越高占用CPU、内存等资源越多
x-max-priority
指定最大优先级,默认为0。设置优先级参数不能大于这个值
集群篇
工作机制
- 基本诉求
- 避免单点故障、大流量场景分摊负载、数据同步
- 工作机制
- 先启动一个节点,然后让后续节点加入前面启动的节点,节点直接互相发现
集群搭建
负载均衡
仲裁队列
集群中有多个节点,仲裁队列Quorum Queue
将自动分散在各个节点
流式队列
在一个仅追加的日志文件中保存接收到的消息,每个消息分配一个偏移量Offset
,消息被消费端消费之后仍不删除,可以重复消费
指定偏移量消费,如同Kafka的指针一样,随着消息一直被消费,指针一直往后走
异地容灾
完整笔记总结
完整笔记总结