五.RocketMQ理论及常见问题处理方案

RocketMQ的架构理论及底层原理

    • 一:生产消息
      • 1.消息生产过程
      • 2.Queue选择算法
    • 二:存储消息
      • 2.1存储介质
      • 2.2消息的存储和发送
      • 2.3消息存储结构
      • 2.4刷盘机制
    • 三:消费消息
      • 1 获取消费类型
      • 2 消费模式
      • 3 Rebalance机制
      • 4.Queue分配算法
    • 四:消息清理
    • 五:订阅关系的一致性
      • 1.正确订阅关系
      • 2.错误订阅关系
    • 六:消费幂等
      • 1.什么是消费幂等
      • 2.消息重复的场景
      • 3.通用解决方法
    • 七:消息堆积与消费延迟
      • 1.概念
      • 2.产生原因分析
      • 3.消费耗时
      • 4.消费并发度
      • 5.避免方法
    • 八:Rocket mq的高可用
      • 1.消息消费高可用
      • 2.消息发送高可用
      • 3.消息主从复制
    • 九:负载均衡
      • 1.Producer负载均衡
      • 2.Consumer负载均衡
    • 十:消息重试
      • 1.顺序消息的重试
      • 2.无序消息的重试
    • 十一:死信队列

一:生产消息

1.消息生产过程

Producer可以将消息写入到某Broker中的某Queue中,过程如下:

  • Producer发送消息之前,会先向NameServer发出获取消息Topic的路由信息的请求
  • NameServer返回该Topic的路由表及Broker列表
  • Producer根据代码中指定的Queue选择策略,从Queue列表中选出一个队列,用于后续存储消息
  • Produer对消息做一些特殊处理,例如,消息本身超过4M,则会对其进行压缩
  • Producer向选择出的Queue所在的Broker发出RPC请求,将消息发送到选择出的Queue
    在这里插入图片描述

路由表:实际是一个Map,key为Topic名称,value是一个QueueData实例列表。QueueData并不是一个Queue对应一个QueueData,而是一个Broker中该Topic的所有Queue对应一个QueueData。即,只要涉及到该Topic的Broker,一个Broker对应一个QueueData。QueueData中包含brokerName。简单来说,路由表的key为Topic名称,value则为所有涉及该Topic的BrokerName列表。

Broker列表:其实际也是一个Map。key为brokerName,value为BrokerData。不是一个Broker对应一个BrokerData实例。而是一套brokerName名称相同的Master-Slave小集群对应一个BrokerData。BrokerData中包含brokerName及一个map。该map的key为brokerId,value为该broker对应的地址。brokerId为0表示该broker为Master,非0表示Slave。

2.Queue选择算法

对于无序消息,其Queue选择算法,也称为消息投递算法,常见的有两种:

  • 轮询算法
    默认选择算法。该算法保证了每个Queue中可以均匀的获取到消息。

该算法存在一个问题:在某些Broker上的Queue可能投递延迟较严重。从而导致Producer的缓存队列中出现较大的消息积压,影响消息的投递性能。

  • 最小投递延迟算法
    该算法会统计每次消息投递的时间延迟,然后根据统计出的结果将消息投递到时间延迟最小的Queue。如果延迟相同,则采用轮询算法投递。该算法可以有效提升消息的投递性能。

该算法也存在一个问题:消息在Queue上的分配不均匀。投递延迟小的Queue其可能会存在大量的消息。而对该Queue的消费者压力会增大,降低消息的消费能力,可能会导致MQ中消息的堆积。

二:存储消息

分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。RocketMQ中的消息存储在本地文件系统中,这些相关文件默认在当前用户主目录下的store目录中。
在这里插入图片描述

  • abort:该文件在Broker启动后会自动创建,正常关闭Broker,该文件会自动消失。
  • checkpoint:其中存储着commitlog、consumequeue、index文件的最后刷盘时间戳
  • commitlog:其中存放着commitlog文件,而消息是写在commitlog文件中的
  • config:存放着Broker运行期间的一些配置数据
  • consumequeue:其中存放着consumequeue文件,队列就存放在这个目录中
  • index:其中存放着消息索引文件indexFile
  • lock:运行期间使用到的全局资源锁

在这里插入图片描述

  • 消息生成者发送消息
  • MQ收到消息,将消息进行持久化,在存储中新增一条记录
  • 返回ACK给生产者
  • MQ push 消息给对应的消费者,然后等待消费者返回ACK
  • 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
  • MQ删除消息

2.1存储介质

  • 关系型数据库DB
    ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈,若DB库宕机,MQ数据将无法存储丢失。
  • 文件系统
    RocketMQ采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。

文件系统>关系型数据库DB

2.2消息的存储和发送

1)消息存储
目前的高性能磁盘,顺序写速度可以达到600MB/s。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。
2)消息发送
一台服务器 把本机磁盘文件的内容发送到客户端,一般进行了4 次数据复制:

  1. 从磁盘复制数据到内核态内存;
  2. 从内核态内存复 制到用户态内存;
  3. 然后从用户态 内存复制到网络驱动的内核态内存;
  4. 最后是从网络驱动的内核态内存复 制到网卡中进行传输。
    在这里插入图片描述
    通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现的
采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因

2.3消息存储结构

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

  • CommitLog:存储消息的元数据
  • ConsumerQueue:存储消息在CommitLog的索引
  • IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程

2.4刷盘机制

RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时 候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。
在这里插入图片描述
1)同步刷盘
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
2)异步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
3)配置
同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。

三:消费消息

消费者从Broker中获取消息的方式有两种:pull拉取方式和push推动方式。消费者组对于消息消费的模式又分为两种:集群消费和广播消费。

1 获取消费类型

拉取式消费
Consumer主动从Broker中拉取消息,主动权由Consumer控制。一旦获取了批量消息,就会启动消费过程。不过,该方式的实时性较弱,即Broker中有了新的消息时消费者并不能及时发现并消费。

由于拉取时间间隔是由用户指定的,所以在设置该间隔时需要注意平稳:间隔太短,空请求比例会增加;间隔太长,消息的实时性太差。

推送式消费
该模式下Broker收到数据后会主动推送给Consumer。该获取方式一般实时性较高。

该获取方式是典型的发布-订阅模式,即Consumer向其关联的Queue注册了监听器,一旦发现有新的消息到来就会触发回调的执行,回调方法是Consumer去Queue中拉取消息。而这些都是基于Consumer与Broker间的长连接的。长连接的维护是需要消耗系统资源的。

对比

  • pull:需要应用去实现对关联Queue的遍历,实时性差;但便于应用控制消息的拉取
  • push:封装了对关联Queue的遍历,实时性强,但会占用较多的系统资源

实现pull和push模式可以分别通过DefaultMQPullConsumer和DefaultMQPushConsumer来实现

2 消费模式

广播消费
在这里插入图片描述
广播消费模式下,相同Consumer Group的每个Consumer实例都接收同一个Topic的全量消息。即每条消息都会被发送到Consumer Group中的每个Consumer。

集群消费
在这里插入图片描述
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊同一个Topic的消息。即每条消息只会被发送到Consumer Group中的某个Consumer。

消息进度保存

  • 广播模式:消费进度保存在consumer端。因为广播模式下consumer group中每个consumer都会消费所有消息,但它们的消费进度是不同。所以consumer各自保存各自的消费进度。
  • 集群模式:消费进度保存在broker中。consumer group中的所有consumer共同消费同一个Topic中的消息,同一条消息只会被消费一次。消费进度会参与到了消费的负载均衡中,故消费进度是 需要共享的。

消费模式可通过@RocketMQMessageListener里配置实现
MessageModel.CLUSTERING:负载均衡模式(集群模式)。MessageModel.BROADCASTING:广播模式。

3 Rebalance机制

Rebalance机制的前提是:集群消费。

Rebalance即再均衡,指的是,将⼀个Topic下的多个Queue在同⼀个Consumer Group中的多个Consumer间进行重新分配的过程
在这里插入图片描述
Rebalance机制是为了提升消息的并行消费能力。例如,⼀个Topic下5个队列,在只有1个消费者的情况下,这个消费者将负责消费这5个队列的消息。如果此时我们增加⼀个消费者,那么就可以给其中⼀个消费者分配2个队列,给另⼀个分配3个队列,从而提升消息的并行消费能力。

Rebalance限制:
由于⼀个队列最多分配给⼀个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,多余的消费者实例将分配不到任何队列。
Rebalance危害:
1.消费暂停:在只有一个Consumer时,其负责消费所有队列;在新增了一个Consumer后会触发Rebalance的发生。此时原Consumer就需要暂停部分队列的消费,等到这些队列分配给新的Consumer后,这些暂停消费的队列才能继续被消费。
2.消费重复:Consumer 在消费新分配给自己的队列时,必须接着之前Consumer 提交的消费进度的offset继续消费。然而默认情况下,offset是异步提交的,这个异步性导致提交到Broker的offset与Consumer实际消费的消息并不一致。这个不一致的差值就是可能会重复消费的消息。

同步提交:consumer提交了其消费完毕的一批消息的offset给broker后,需要等待broker的成功ACK。当收到ACK后,consumer才会继续获取并消费下一批消息。在等待ACK期间,consumer是阻塞的。
异步提交:consumer提交了其消费完毕的一批消息的offset给broker后,不需要等待broker的成功ACK。consumer可以直接获取并消费下一批消息。

3.消费突刺:由于Rebalance可能导致重复消费,如果需要重复消费的消息过多,或者因为Rebalance暂停时间过长从而导致积压了部分消息。那么有可能会导致在Rebalance结束之后瞬间需要消费很多消息。

Rebalance产生的原因:
导致Rebalance产生的原因,无非就两个:消费者所订阅Topic的Queue数量发生变化,或消费者组中消费者的数量发生变化。

Consumer实例在接收到Rebalance产生通知后会采用Queue分配算法自己获取到相应的Queue,即由Consumer实例自主进行Rebalance。

4.Queue分配算法

一个Topic中的Queue只能由Consumer Group中的一个Consumer进行消费,而一个Consumer可以同时消费多个Queue中的消息。那么Queue与Consumer间的配对关系是如何确定的,即Queue要分配给哪个Consumer进行消费,也是有算法策略的。常见的有四种策略。这些策略是通过在创建Consumer时的构造器传进去的。
1.平均分配策略
在这里插入图片描述

该算法是要根据avg = QueueCount / ConsumerCount 的计算结果进行分配的。如果能够整除,则按顺序将avg个Queue逐个分配Consumer;如果不能整除,则将多余出的Queue按照Consumer顺序逐个分配。
2.环形平均策略
在这里插入图片描述

环形平均算法是指,根据消费者的顺序,依次在由queue队列组成的环形图中逐个分配。
3.一致性hash策略
在这里插入图片描述

该算法会将consumer的hash值作为Node节点存放到hash环上,然后将queue的hash值也放到hash环上,通过顺时针方向,距离queue最近的那个consumer就是该queue要分配的consumer。

该算法存在的问题:分配不均。
存在的意义:其可以有效减少由于消费者组扩容或缩容所带来的大量的Rebalance。

4.同机房策略
在这里插入图片描述
该算法会根据queue的部署机房位置和consumer的位置,过滤出当前consumer相同机房的queue。然后按照平均分配策略或环形平均策略对同机房queue进行分配。如果没有同机房queue,则按照平均分配策略或环形平均策略对所有queue进行分配。

四:消息清理

消息被消费过后不会被立即清理掉,消息是被顺序存储在commitlog文件的,会以commitlog文件为单位进行清理。
自动清理文件场景:

  • 文件过期,且到达清理时间点(默认为凌晨4点)后
  • 文件过期,且磁盘空间占用率已达过期清理警戒线(默认75%)后
  • 磁盘占用率达到清理警戒线(默认85%)后,按照规则清理文件,默认从最老文件开始清理
  • 磁盘占用率达到系统危险警戒线(默认90%)后,Broker将拒绝消息写入

五:订阅关系的一致性

订阅关系的一致性指的是,同一个消费者组(Group ID相同)下所有Consumer实例所订阅的Topic与Tag及对消息的处理逻辑必须完全一致。否则,消息消费的逻辑就会混乱,甚至导致消息丢失。

1.正确订阅关系

多个消费者组订阅了多个Topic,但每个消费者组里的多个消费者实例的订阅关系topic与tag均一致。
在这里插入图片描述

2.错误订阅关系

一个消费者组里的多个Consumer实例的订阅关系Topic与Tag都不一致。

  • 订阅了不同Topic
  • 订阅了不同Tag
  • 订阅了不同数量的Topic
    在这里插入图片描述

六:消费幂等

1.什么是消费幂等

当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费过程就是消费幂等的。

2.消息重复的场景

  • 发送时消息重复
    当一条消息已被成功发送到Broker并完成持久化,此时出现了网络波动,从而导致Broker对Producer应答失败。 如果此时Producer意识到消息发送失败并尝试再次发送消息,此时Broker中就可能会出现两条内容相同并且Message ID也相同的消息,那么后续Consumer就一定会消费两次该消息。
  • 消费时消息重复
    消息已投递到Consumer并完成业务处理,当Consumer给Broker反馈应答时网络波动,Broker没有接收到消费成功响应。为了保证消息至少被消费一次的原则,Broker将在网络恢复后再次尝试投递之前已被处理过的消息。此时消费者就会收到与之前处理过的内容相同、Message ID也相同的消息。
  • Rebalance时消息重复
    当Consumer Group中的Consumer数量发生变化时,或其订阅的Topic的Queue数量发生变化时,会触发Rebalance,此时Consumer可能会收到曾经被消费过的消息。

3.通用解决方法

幂等解决方案的设计中涉及到两要素:幂等令牌与唯一性处理。

  • 幂等令牌:是生产者和消费者两者中的既定协议,通常指具备唯⼀业务标识的字符串。例如,订单号、流水号。一般由Producer随着消息一同发送来的。
  • 唯一性处理:服务端通过采用⼀定的算法策略,保证同⼀个业务逻辑不会被重复执行成功多次。例如,对同一笔订单的多次支付操作,只会成功一次。

幂等性操作的通用性解决方法:

  1. 首先通过缓存去重。在缓存中如果已经存在了某幂等令牌,则说明本次操作是重复性操作;若缓存没有命中,则进入下一步。
  2. 在唯一性处理之前,先在数据库中查询幂等令牌作为索引的数据是否存在。若存在,则说明本次操作为重复性操作;若不存在,则进入下一步。
  3. 在同一事务中完成三项操作:业务唯一性处理后,将幂等令牌写入到缓存,并将幂等令牌作为唯一索引的数据写入到DB中。

第1步已经判断过是否是重复性操作了,第2步还要再次判断原因:
一般缓存中的数据是具有有效期的。缓存中数据的有效期一旦过期,就是发生缓存穿透,使大量请求直接到达数据库

七:消息堆积与消费延迟

1.概念

消息处理流程中,如果Consumer的消费速度跟不上Producer的发送速度,MQ中未处理的消息会越来越多(进的多出的少),这部分消息就被称为堆积消息。消息出现堆积进而会造成消息的消费延迟。

以下场景需要重点关注消息堆积和消费延迟问题:

  • 业务系统上下游能力不匹配造成的持续堆积,且无法自行恢复。
  • 业务系统对消息的消费实时性要求较高,即使是短暂的堆积造成的消费延迟也无法接受(短信验证码)。

2.产生原因分析

在这里插入图片描述
Consumer使用长轮询Pull模式消费消息时,有两个阶段:

  • 消息拉取
    Consumer通过长轮询Pull模式批量拉取的方式从服务端获取消息,将拉取到的消息缓存到本地缓冲队列中。该阶段吞吐量较高,一般不会是消息堆积的瓶颈

  • 消息消费
    Consumer将本地缓冲队列的消息提交到消费线程中,使用消费业务逻辑对消息进行处理。此时Consumer的消费能力就完全依赖于消息的消费耗时消费并发度。若业务处理逻辑复杂,导致处理单条消息的耗时较长,则整体的消息吞吐量肯定不会高,就会导致Consumer本地缓冲队列达到上限,停止从服务端拉取消息。

结论:
消息堆积的主要瓶颈在于客户端的消费能力,而消费能力由消费耗时和消费并发度决定。一般优先优化消费耗时问题

3.消费耗时

影响消息处理时长的主要因素是代码逻辑。而代码逻辑中可能会影响处理时长代码主要有两种类型:

  • CPU内部计算型代码
    主要是代码中复杂的递归和循环
  • 外部I/O操作型代码
    读写数据库,读写缓存系统,下游系统调用

内部计算相对外部I/O操作来说耗时几乎可以忽略。

4.消费并发度

一般情况下,消费者的消费并发度由单节点线程数(单个Consumer的线程数)和节点数量(即Consumer Group所包含的Consumer数)共同决定,其值为单节点线程数*节点数量。

注意:顺序消息的消费并发度比较特殊,由于需要顺序消费,则对于全局顺序消息而言,并发度为1。对于分区顺序消息,Topic有多个Queue分区,其仅可以保证该Topic的每个Queue分区中的消息被顺序消费,则并发度为Topic的分区数量。

5.避免方法

需要在前期设计阶段对整个业务逻辑进行梳理。其中最重要的就是梳理消息的消费耗时和设计消息消费的并发度。

  1. 梳理消息的消费耗时
    通过压测获取消息的消费耗时,对逻辑进行优化评估:
    1)消息消费逻辑计算复杂度是否过高,代码是否存在无限循环和递归等缺陷。
    2)消息消费逻辑中的I/O操作是否是必须的,能否用本地缓存等方案规避。
    3)消费逻辑中的复杂耗时的操作是否可以做异步化处理。
  2. 设置消费并发度
    1)逐步调大单个Consumer节点的线程数,并观测节点的系统指标,得到单节点最优的消费线程数和消息吞吐量。
    2)根据上下游流量峰值计算出需要设置的节点数(消费者数)
    节点数 = 流量峰值 / 单个节点消息吞吐量

八:Rocket mq的高可用

在这里插入图片描述
RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的。

Master角色的Broker支持读和写,Slave角色的Broker仅支持读,即Producer只能和Master角色的Broker连接写入消息;Consumer可以连接 Master角色的Broker,也可以连接Slave角色的Broker来读取消息。

1.消息消费高可用

Consumer是从Master读还是从Slave 读,不需做任何操作,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。

2.消息发送高可用

在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(即搭建Broker主从集群,如多主多从),这样当一个Broker组的Master不可 用后,其他组的Master仍然可用,Producer仍然可以发送消息。 RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足,需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文件,用新的配置文件启动Broker。

注:如果向某一个broker发送失败后,会设置一个规避策略,如接下来5分钟内轮询选择队列时,发生错误的broker中的队列将不参与队列负载

3.消息主从复制

一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式:

  • 同步复制
    Master和Slave均写成功后才反馈给客户端写成功状态;
    优缺点:如果Master出故障, Slave上有全部的备份数据,易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量
  • 异步复制
    只要Master写成功即反馈给客户端写成功状态。
    优缺点:系统拥有较低的延迟和较高的吞吐量,但Master出了故障,有些数据因为没有被写入Slave,可能会丢失;

同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,可设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个

通常配置为:异步刷盘,同步复制

九:负载均衡

1.Producer负载均衡

每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下

2.Consumer负载均衡

1)集群模式
在集群消费模式下,每条消息会投递到订阅这个topic的Consumer Group下的一个实例。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
2)广播模式
一条消息需要投递到一个消费组下面所有的消费者实例

十:消息重试

1.顺序消息的重试

对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

2.无序消息的重试

对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,可以通过设置返回状态达到消息重试的结果。无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

1)重试次数
RocketMQ 默认允许每条消息最多重试 16 次,如果消息重试 16 次后仍然失败,消息将不再投递。
2)配置方式

  • 消费失败后,重试配置方式

    集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

    • 返回 Action.ReconsumeLater (推荐)
    • 返回 Null
    • 抛出异常
    public class MessageListenerImpl implements MessageListener {@Overridepublic Action consume(Message message, ConsumeContext context) {//处理消息doConsumeMessage(message);//方式1:返回 Action.ReconsumeLater,消息将重试return Action.ReconsumeLater;//方式2:返回 null,消息将重试return null;//方式3:直接抛出异常, 消息将重试throw new RuntimeException("Consumer Message exceotion");}
    }
    
  • 消费失败后,不重试配置方式
    集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回 Action.CommitMessage,此后这条消息将不会再重试。

    public class MessageListenerImpl implements MessageListener {@Overridepublic Action consume(Message message, ConsumeContext context) {try {doConsumeMessage(message);} catch (Throwable e) {//捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;return Action.CommitMessage;}//消息处理正常,直接返回 Action.CommitMessage;return Action.CommitMessage;}
    }
    

十一:死信队列

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列(死信队列)中。

死信消息特性

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

死信队列特性

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/42710.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

html+css+JavaScript 实现两个输入框的反转动画

开发时遇到了一个输入框交换的动画 做完之后觉得页面上加些许过渡或动画,其变化虽小,却能极大的提升页面质感,给人一种顺畅、丝滑的视觉体验。它的实现过程主要是通过css中的transition和animation来实现的。平时在开发的时候增加一些动画效…

使用qt creator配置msvc环境(不需要安装shit一样的宇宙第一IDE vs的哈)

1. 背景 习惯使用Qt编程的童鞋,尤其是linux下开发Qt的童鞋一般都是使用qt creator作为首选IDE的,通常在windows上使用Qt用qt creator作为IDE的话一般编译器有mingw和msvc两种,使用mingw版本和在linux下的方式基本上一样十分简单,不…

如何在Ubuntu环境下使用加速器配置Docker环境

一、安装并打开加速器 这个要根据每个加速器的情况来安装并打开,一般是会开放一个代理端口,比如1087 二、安装Docker https://docs.docker.com/engine/install/debian/#install-using-the-convenience-script 三、配置Docker使用加速器 3.1 修改配置…

UE5 04-重新加载当前场景

给关卡加一个淡出的效果 给关卡加一个淡入的效果, 这个最好放置在Player 上,这样切关卡依然有这个效果

防火墙基础及登录(华为)

目录 防火墙概述防火墙发展进程包过滤防火墙代理防火墙状态检测防火墙UTM下一代防火墙(NGFW) 防火墙分类按物理特性划分软件防火墙硬件防火墙 按性能划分百兆级别和千兆级别 按防火墙结构划分单一主机防火墙路由集成式防火墙分布式防火墙 华为防火墙利用…

centos7|操作系统|升级openssl-1.0.2k到openssl-3.3.0

一、 前言: opensssl是什么软件?openssl的版本是怎样的?为什么需要升级openssl?如何升级openssl? 1、openssl是一个什么样软件? OpenSSL是一个开源的安全套接字层(Secure Sockets Layer&…

MySQL8.0在windows下的下载安装及详细使用

下载mysql8.0二进制包 下载地址:MySQL :: Download MySQL Community Server 编辑my.ini配置文件 解压二进制包,新建/编辑my.ini配置文件(如果不存在则新建) [client] #客户端设置,即客户端默认的连接参数 # 设置mysql客户端连接服务端时…

Canvas:掌握颜色线条与图像文字设置

想象一下,用几行代码就能创造出如此逼真的图像和动画,仿佛将艺术与科技完美融合,前端开发的Canvas技术正是这个数字化时代中最具魔力的一环,它不仅仅是网页的一部分,更是一个无限创意的画布,一个让你的想象…

回溯 | Java | LeetCode 39, 40, 131 做题总结

Java Arrays.sort(数组) //排序 不讲究顺序的解答,都可以考虑一下排序是否可行。 39. 组合总和 错误解答 在写的时候需要注意,sum - candidates[i];很重要,也是回溯的一部分。 解答重复了。是因为回溯的for循环理解错了。 class Solutio…

使用OpenCV与PySide(PyQt)的视觉检测小项目练习

OpenCV 提供了丰富的图像处理和计算机视觉功能,可以实现各种复杂的图像处理任务,如目标检测、人脸识别、图像分割等。 PyQt(或PySide)是一个创建GUI应用程序的工具包,它是Python编程语言和Qt库的成功融合。Qt库是最强大的GUI库之一。Qt的快速…

【开放集目标检测】Grounding DINO

一、引言 论文: Grounding DINO: Grounding DINO: Marrying DINO with Grounded Pre-Training for Open-Set Object Detection 作者: IDEA 代码: Grounding DINO 注意: 该算法是在Swin Transformer、Deformable DETR、DINO基础上…

逆变器学习笔记(三)

DCDC电源芯片外围器件选型_dcdc的comp补偿-CSDN博客、 1.芯片的COMP引脚通常用于补偿网络: 芯片的COMP引脚通常用于补偿网络,在控制环路中发挥重要作用。COMP引脚接电容和电阻串联接地,主要是为了稳定控制环路、调整环路响应速度和滤波噪声…

java Lock接口

在 Java 中,Lock 接口的实现类ReentrantLock 类提供了比使用 synchronized 方法和代码块更广泛的锁定机制。 简单示例: import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;public class ReentrantLockExampl…

关闭vue3中脑瘫的ESLine

在创建vue3的时候脑子一抽选了ESLine,然后这傻卵子ESLine老是给我报错 博主用的idea开发前端 ,纯粹是用不惯vscode 关闭idea中的ESLine,这个只是取消红色波浪线, 界面中的显示 第二步,在vue.config.js中添加 lintOnSave: false 到这里就ok了,其他的我试过了一点用没有

阿里云 OSS - 开通到使用、服务端签名直传(前后端代码 + 效果演示)

目录 开始 OSS 相关术语须知 阿里云 OSS 开通 阿里云 OSS 使用 官方文档教程 实战开发 阿里云 OSS 自动配置 环境配置 实战开发 服务端签名直传 概述 代码实现 开始 OSS 相关术语须知 中文 英文 说明 存储空间 Bucket 存储空间是您用于存储对象(Ob…

DB-GPT-PaperReading

DB-GPT: Empowering Database Interactions with Private Large Language Models 1. 基本介绍 DB-GPT 旨在理解自然语言查询,提供上下文感知响应,并生成高精度的复杂 SQL 查询,使其成为从新手到专家的用户不可或缺的工具。DB-GPT 的核心创新在于其私有 LLM 技术,该技术在…

FL Studio 2024 发布,添加 FL Cloud 插件、AI 等功能

作为今年最受期待的音乐制作 DAW 更新之一,FL Studio 2024发布引入了新功能,同时采用了新的命名方式,从现在起将把发布年份纳入其名称中。DAW 的新增功能包括在 FL Cloud 中添加插件、AI 驱动的音乐创作工具和 FL Studio 的新效果。 FL Cloud…

ThinkPHP定时任务是怎样实现的?

接到一个需求:定时检查设备信息,2分钟没有心跳的机器,推送消息给相关人员,用thinkphp5框架,利用框架自带的任务功能与crontab配合来完成定时任务。 第一步:分析需求 先写获取设备信息,2分钟之…

力扣双指针算法题目:快乐数

目录 1.题目 2.思路解析 3.代码展示 1.题目 . - 力扣(LeetCode) 2.思路解析 题目意思是将一个正整数上面的每一位拿出来,然后分别求平方,最后将这些数字的平方求和得到一个数字,如此循环,如果在此循环中…

【做一道算一道】和为 K 的子数组

给你一个整数数组 nums 和一个整数 k ,请你统计并返回 该数组中和为 k 的子数组的个数 。 子数组是数组中元素的连续非空序列。 示例 1: 输入:nums [1,1,1], k 2 输出:2 示例 2: 输入:nums [1,2,3],…