RocketMQ系列文章
RocketMQ(一):基本概念和环境搭建
RocketMQ(二):原生API快速入门
RocketMQ(三):集成SpringBoot
RocketMQ(四):重复消费、消息重试、死信消息的解决方案
目录
- 一、重复消费
- 1、消息重复的情况
- 2、MySql唯一索引
- 3、redis分布式锁
- 二、消息重试
- 1、生产者重试
- 2、消费者重试
- 三、死信消息
- 四、消费堆积
一、重复消费
1、消息重复的情况
- 发送时消息重复
- 当一条消息已被成功发送到服务端并完成持久化
- 此时出现了网络闪断或者客户端宕机,导致
服务端对客户端应答失败
- 如果此时生产者意识到消息发送失败并尝试再次发送消息
- 消费者后续会收到两条内容相同并且 Message ID 也相同的消息
- 投递时消息重复
- 消息消费的场景下,消息已投递到消费者并完成业务处理,当
客户端给服务端反馈应答的时候网络闪断
- 为了保证消息至少被消费一次
- 消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息
- 消费者后续会收到两条内容相同并且 Message ID 也相同的消息
- 消息消费的场景下,消息已投递到消费者并完成业务处理,当
- 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
- 当消息队列RocketMQ的
Broker 或客户端重启、扩容或缩容
时 - 会触发 Rebalance,此时消费者可能会收到重复消息
- 当消息队列RocketMQ的
2、MySql唯一索引
- 因为 Message ID 有可能出现冲突(重复)的情况
- 所以用业务唯一标识作为幂等处理的关键依据
生产者
- 相同的唯一业务编号,发送两次
@Test
void test1() {// 业务唯一编号String key = "1300";Message<String> message = MessageBuilder.withPayload("我是一个带key的消息").setHeader(RocketMQHeaders.KEYS, key).build();// 相同的key发送两次rocketMQTemplate.syncSend("repeatedTopic", message);rocketMQTemplate.syncSend("repeatedTopic", message);System.out.println("发送完成");
}
消费者
- 创建user表结构,num_no字段设置为唯一索引
- 当唯一的业务id插入唯一索引的num_no字段
- 只能插入一次,第二次会报唯一索引重复
- 当获取到重复数据,直接返回即可,就不在执行业务代码
@Component
@RocketMQMessageListener(topic = "repeatedTopic", consumerGroup = "repeated-consumer-group")
public class RepeatMysqlListener implements RocketMQListener<MessageExt> {@Autowiredprivate JdbcTemplate jdbcTemplate;@Overridepublic void onMessage(MessageExt message) {// 唯一的业务id(如果是相同的两次请求,则keys值一定相同)String messageKey = message.getKeys();try {jdbcTemplate.execute("INSERT INTO `user` (`num_no`,`name`) VALUES('" + messageKey + "','名称')");} catch (DataAccessException e) {// 该message可能是重复的if (e instanceof DuplicateKeyException) {System.out.println(messageKey+"的业务编号数据重复了,直接return,就算消费了此重复数据");return;}}// 获取消息执行业务System.out.println("获取消息内容:【" + new String(message.getBody()) + "】执行业务");}
}
执行结果:
发送完成
获取消息内容:【我是一个带key的消息】执行业务
1300的业务编号数据重复了,直接return,就算消费了此重复数据
3、redis分布式锁
Redisson分布式锁配置
@Configuration
public class RedissonConfig {@Beanpublic Redisson redisson() {Config config = new Config();config.useSingleServer().setAddress("redis://localhost:6390").setPassword("xc@1234").setDatabase(0);return (Redisson) Redisson.create(config);}
}
生产者
@Test
void test1() {// 业务唯一编号String key = "1400";Message<String> message = MessageBuilder.withPayload("我是一个带key的消息").setHeader(RocketMQHeaders.KEYS, key).build();// 相同的key发送两次rocketMQTemplate.syncSend("repeatedTopic", message);rocketMQTemplate.syncSend("repeatedTopic", message);System.out.println("发送完成");
}
消费者
- 因为消费者是多线程并发消费
- 如果遇到相同的唯一业务id,则上锁依次执行
- 将执行过的唯一业务id放入redis
- 下次相同业务id进入与redis集合对比,存在则证明已经执行过了
@Component
@RocketMQMessageListener(topic = "repeatedTopic", consumerGroup = "repeated-consumer-group")
public class RepeatRedisListener implements RocketMQListener<MessageExt> {@Autowiredprivate Redisson redisson;@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Overridepublic void onMessage(MessageExt message) {// 唯一的业务id(如果是相同的两次请求,则keys值一定相同)String messageKey = message.getKeys();RLock redissonLock = redisson.getLock(messageKey);try {// 添加redisson锁并实现锁续命功能// 默认过期时间是30s,每10s触发一次锁续命功能redissonLock.lock();List<String> topicBusinessKeyList = stringRedisTemplate.opsForList().range("topicBusinessKey",0,-1);if ( ObjectUtils.isNotEmpty(topicBusinessKeyList) && topicBusinessKeyList.contains(messageKey)) {System.out.println(messageKey + "的业务编号数据重复了,直接return,就算消费了此重复数据");return;}// 获取消息执行业务System.out.println("获取消息内容:【" + new String(message.getBody()) + "】执行业务");// 讲businessKey存入redisstringRedisTemplate.opsForList().rightPush("topicBusinessKey", messageKey);} finally {redissonLock.unlock();}}
}
执行结果:
发送完成
获取消息内容:【我是一个带key的消息】执行业务
1400的业务编号数据重复了,直接return,就算消费了此重复数据
二、消息重试
1、生产者重试
- 可以分别设置同步消息和异步消息发送的重试次数
- 广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息
- 默认重试间隔时间为 1 秒,次数为2次
- 发送消息超时时间默认3000毫秒,如果因为超时,那么便不再尝试重试
application.yml配置文件设置
2、消费者重试
- 默认的重试间隔:
10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- 默认
多线程
模式下,重试16
次,设置超过 16 次的重试时间间隔均为每次 2 小时 - 某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递
- 在单线程的
顺序
模式下,重试Integer.MAX_VALUE次,间隔1秒
消费者配置
- 实现
RocketMQPushConsumerLifecycleListener
接口,从prepareStart方法中获取消费者
并设置它 - 消息最大重试次数的设置对
相同GroupID
下的所有Consumer
实例有效
@Component
@RocketMQMessageListener(topic = "retryTopic",consumerGroup = "retry-consumer-group"
)
public class RetryListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(MessageExt message) {//获取消息的重试次数System.out.println(message.getReconsumeTimes());System.out.println("消息内容:"+new String(message.getBody()));}@Overridepublic void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {// 设置消费者重试次数defaultMQPushConsumer.setMaxReconsumeTimes(2);// 实例名称-控制面板可以看到defaultMQPushConsumer.setInstanceName("消费者1号");}
}
设置重试二次的执行结果:
三、死信消息
- 当消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列
- 死信队列是死信Topic下分区数唯一的
单独队列
- 死信Topic名称为
%DLQ%原消费者组名
,死信队列的消息将不会再被消费
上一节的消费者重试两次后,就会将消息放入死信队列
处理死信消息方式一:
- 监听死信队列处理消息
@Component
@RocketMQMessageListener(topic = "%DLQ%retry-consumer-group",consumerGroup = "retry-dead-consumer-group"
)
public class RetryDeadConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息 签收了System.out.println("记录到特别的位置 文件 mysql 通知人工处理");}
}
处理死信消息方式二:
- 控制重试次数,重试几次后,直接记录到数据库等等
@Component
@RocketMQMessageListener(topic = "%DLQ%retry-consumer-group",consumerGroup = "retry-dead-consumer-group"
)
public class RetryDeadConsumer2 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {// 业务处理try {int i = 1 / 0;} catch (Exception e) {// 重试int reconsumeTimes = messageExt.getReconsumeTimes();if (reconsumeTimes >= 3) {// 不要重试了System.out.println("记录到特别的位置 文件 mysql 通知人工处理");}else {throw new RuntimeException("异常");}}}
}
四、消费堆积
一般认为单条队列消息差值>=10w时 算堆积问题
什么情况下会出现堆积
- 生产太快
- 生产方可以做业务限流
- 增加消费者数量,但是消费者数量<=队列数量,适当的设置最大的消费线程数量(
根据IO(2n)/CPU(n+1)
)
- 消费者消费出现问题