一、生产者端防丢失
1. 发送方式选择
- 同步发送:使用
send()
方法,等待 Broker 确认响应(SendResult
),确保消息已成功发送。 - 异步发送:使用
sendAsync()
方法并设置回调函数,处理发送成功 / 失败的逻辑。 - 单向发送:使用
sendOneway()
,不等待响应(适用于允许少量丢失的场景)。
2. 重试机制
- 设置
maxRetryTimesWhenSendFailed
:生产者自动重试次数(默认 2 次)。 - 自定义异常处理:捕获
MQClientException
或RemotingException
,手动重试。
二、Broker 端防丢失
1.消息持久化
2.主从复制
Broker消息的零丢失方案:
- 同步刷盘:在返回应用写成功状态前,消息已经被写入磁盘。
- 异步刷盘:消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入
- 同步复制:等Master和Slave均写成功后才反馈给客户端写成功状态
- 异步复制:只要Master写成功即可反馈给客户端写成功状态
推荐:
- 刷盘方式
Master和Slave都设置成ASYNC_FLUSH的异步刷盘
- 复制方式
Master配置成SYNC_MASTER 同步复制
三:消费者端防丢失
1.广播消费(BROADCASTING
)
2.事务消息(半消息)
3.死信队列
怎么保证不丢失?
- 生产者
-
- 开启confirm模式,重试的机制
- rocketMQ
-
- 开启持久化(增大
commitLog
刷盘间隔)
- 开启持久化(增大
- 消费者
-
- ack的机制
消息持久化机制:Broker接收到消息后,会立即将消息写入磁盘,并返回确认信息给生产者。RocketMQ支持同步刷盘和异步刷盘两种方式,其中同步刷盘方式在消息写入磁盘后才返回确认,可靠性更高
消费失败后的常见的处理方法:
- 方式 1:返回 Action.ReconsumeLater(推荐) 重试
- 方式 2:返回 Null
- 方式 3:抛出异常