原理是采用timewhile 实现的,源码分析可以参考
https://blog.csdn.net/sinat_14840559/article/details/129266105
除了useDelayLevel 已经默认改为false
private boolean useDelayLevel = false;
官方示意代码在public class TimerMessageProducer
for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));// This message will be delivered to consumer 10 seconds later.//message.setDelayTimeSec(10);// The effect is the same as the above// message.setDelayTimeMs(10_000L);// Set the specific delivery time, and the effect is the same as the abovemessage.setDeliverTimeMs(System.currentTimeMillis() + 10_000L);// Send the messageSendResult result = producer.send(message);System.out.printf(result + "\n");}
实际测试大部分都会提前几百毫秒发送消息。
ps:
message.getBornTimestamp() – 消息生成时间
message.getDeliverTimeMs()-- 消息触发时间,即delay设置的timestamp
可以使用以下代码验证
consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {for (MessageExt message : messages) {// Print approximate delay time periodString body = new String(message.getBody());System.out.printf("Receive message[msgId=%s %d ,%d ms later ]\n", body,System.currentTimeMillis() - message.getBornTimestamp(),System.currentTimeMillis() - message.getDeliverTimeMs() );}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});
proxy 配置如下:
{"rocketMQClusterName": "DefaultCluster","useDelayLevel":"false"
}