定时延时发送消息
任务需要延迟一段时间再进行处理。
生产者
public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("producer_group");producer.setNamesrvAddr("ip:9876");producer.start();List<Order> F = OrderBuilder.build(1, "A", "B", "C");List<Order> S = OrderBuilder.build(2, "D", "Q");List<Order> T = OrderBuilder.build(3, "N", "Q", "R");ArrayList<Order> orders = new ArrayList<Order>() {{addAll(F);addAll(S);addAll(T);}};for (Order order : orders) {Message msg = new Message("test-topic", "test-topic_str", order.toString().getBytes());msg.setKeys("test-topic_trace");// 官网提供了这些延迟级别 分别对应 0 1 2// messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";// 重要的逻辑在这里设置队列延迟等级msg.setDelayTimeLevel(3);producer.send(msg);}System.out.println("finish");// 这里发送了两个Tag 的消息// 下面这个消息没有设置延迟时间for (Order order : orders) {Message msg = new Message("test-topic", "test-topic_str_other", ("other" + order.toString()).getBytes());msg.setKeys("test-topic_trace_other");producer.send(msg);}System.out.println("finish");}
}
消费者1订阅tag 为*的消息
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");consumer.setNamesrvAddr("ip:9876");// *表示订阅所有的消息consumer.subscribe("test-topic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
可以看到我们没有设置延迟发送和延迟发送的sentTime和recvTime是很有区别的:
我们看到test-group里面的消息总共有16条
消费者订阅tag为test-topic_str_other的消息
consumer.subscribe("test-topic", "test-topic_str_other");
消费者订阅tag为test-topic_str的消息
consumer.subscribe("test-topic", "test-topic_str");
通过上面的案例验证了:
// 后面这个值是根据消息的tag进行正则匹配的
consumer.subscribe("test-topic", "*");
源码的注解也有说明: