生产者在封装Message消息时可以传入tag参数,消费者在订阅主题时可以通过tag表达式对消息进行过滤,代码示例如下。
//生产者
/**
 * Example producer for tag filtering: publishes three messages to one topic,
 * each carrying a different tag (TagA, TagB, TagC), then shuts down.
 */
public class Producer {

    /** Name server address the producer connects to. */
    private static final String NAMESRV_ADDR = "10.0.0.129:9876";

    /** Topic that all three tagged messages are published to. */
    private static final String TOPIC = "tagFilterTopic";

    /**
     * Starts the producer, sends the three tagged messages one-way and shuts the
     * producer down again.
     *
     * @param args unused
     * @throws Exception if starting the producer or sending a message fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("tagProducerGroup");
        producer.setNamesrvAddr(NAMESRV_ADDR);
        producer.start();
        try {
            // The second constructor argument is the tag attached to the message.
            // NOTE(review): bodies are encoded with the platform default charset;
            // whoever decodes them must use the same charset.
            Message msg1 = new Message(TOPIC, "TagA", ("消息A").getBytes(Charset.defaultCharset()));
            Message msg2 = new Message(TOPIC, "TagB", ("消息B").getBytes(Charset.defaultCharset()));
            Message msg3 = new Message(TOPIC, "TagC", ("消息C").getBytes(Charset.defaultCharset()));

            // One-way sends: no send result is inspected here.
            producer.sendOneway(msg1);
            producer.sendOneway(msg2);
            producer.sendOneway(msg3);

            System.out.println("消息发送完毕");
        } finally {
            // Release the started producer even when building or sending a message throws.
            producer.shutdown();
        }
    }
}
//消费者
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tagFilterConsumerGroup");consumer.setNamesrvAddr("10.0.0.129:9876");consumer.subscribe("tagFilterTopic", "TagA || TagC");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {System.out.println("消息的内容" + new String(msg.getBody(), Charset.defaultCharset()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}