messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
延迟消息级别
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {//事务消息处理if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// 如果是延迟消息if (msg.getDelayTimeLevel() > 0) {// 如果设置的值过大,则设置为最大延迟级别if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}// 修改Topictopic = ScheduleMessageService.SCHEDULE_TOPIC;// 根据延迟级别,决定要将其投递到那个队列中queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// 记录原始的 topic 和 队列信息MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));// 修改topic和队列信息msg.setTopic(topic);msg.setQueueId(queueId);}}public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";RocketMQ的Broker端在存储生产者写入消息时,首先将其写入CommitLog里,为了不让用户立刻就能消费到这条消息,
这里先将Topic的名称修改为SCHEDULE_TOPIC_XXXX,并且根据设置的延迟级别选择将消息投放到哪一个队列里。
整个流程;
- 生产者发送延迟消息到Broker里
- 把消息转发到SCHEDULE_TOPIC_XXXX主题下的队列中
- 延迟服务定期消费SCHEDULE_TOPIC_XXXX主题下的消息,到时间了就把它拿到CommitLog中
- 消息重新被投放到目标Topic里
- 消费者消费延迟消息