文章目录
- 批量发送消息
- 消息过滤
批量发送消息
批量发送消息可以减少网络的 IO 开销,让多个消息通过 1 次网络开销就可以发送,提升数据发送的吞吐量
虽然批量发送消息可以减少网络 IO 开销,但是一次也不能发送太多消息
批量消息直接将多个消息放入集合中发送即可,生产者代码如下:
public class Producer {public static void main(String[] args) throws Exception {// 1、创建生产者对象DefaultMQProducer producer = new DefaultMQProducer("producer_group");// 2、为生产者对象设置 NameServer 地址producer.setNamesrvAddr("127.0.0.1:9876");// 3、把我们的生产者直接启动起来producer.start();// 4、创建消息、并发送消息List<Message> reqList = new ArrayList<>(12);for (int i = 0; i < 12; i++) {// public Message(String topic, String tags, String keys, byte[] body) {Message message = new Message("custom-batch-topic","batchTag","CUSTOM_BATCH",("("+i+")Hello Message From BATCH Producer, " +"date="+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())).getBytes());reqList.add(message);}// 利用生产者对象,将消息直接批量发送出去producer.send(reqList);System.out.println("Send Finished.");}
}
消费者代码如下:
public class Consumer {public static void main(String[] args) throws Exception {// 1、创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_group");// 2、为消费者对象设置 NameServer 地址consumer.setNamesrvAddr("127.0.0.1:9876");// 3、订阅主题consumer.subscribe("custom-batch-topic", "*");// 4、注册监听消息,并打印消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {String printMsg = new String(msg.getBody()) + ", recvTime: "+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());System.out.println(printMsg);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5、把消费者直接启动起来consumer.start();System.out.println("Consumer Started Finished.");}
}
消息过滤
消费者组中还可以有过滤操作,对同一个 Topic 下的消息的 Tag 标签进行过滤
但是使用消息过滤时需要 保证同一个消费组中消费的消息的 Tag 相同
,如果同一个消费者组中的两个消费者订阅了不同的 Tag,比如消费者 A 订阅了 Tag1,消费者 B 订阅了 Tag2,那么可能 B 收到了 Tag1 的数据,发现不是自己想要的,于是将 Tag1 的数据过滤掉了,那么就导致了 A 也收不到 Tag1 的数据,造成数据消失的现象
消息过滤流程图如下:
消息过滤生产者如下:
public class FilterProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("producer_group",true);producer.setNamesrvAddr("127.0.0.1:9876");producer.start();List<Order> list = new ArrayList<>();for (int i = 0; i < 12; i ++) {Order order = new Order();order.orderId = i;order.desc = "desc:" + i;order.tag = "tag" + i % 3;list.add(order);}for (Order order : list) {Message msg = new Message("Filter-Test-Topic",order.tag,(order.toString()).getBytes());msg.setKeys("Filter_Tag");msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));// 直接将 msg 发送出去producer.send(msg);}System.out.println("Send Finished.");}public static class Order {int orderId;String desc;String tag;@Overridepublic String toString() {return "orderId="+orderId+", desc="+desc+", tag="+tag;}}
}
过滤 tag 的几种用法:
过滤消息的 tag 主要修改一行代码:
consumer.subscribe("Filter-Test-Topic", "tag1");
,过滤也分几种情况:
过滤所有 tag
consumer.subscribe("Filter-Test-Topic", "*");
过滤单个 tag
consumer.subscribe("Filter-Test-Topic", "tag1");
过滤多个 tag
consumer.subscribe("Filter-Test-Topic", "TG2 || TG3");
订阅 SQL92 方式(需要修改 custom.conf 文件,添加一行配置:enablePropertyFilter=true)
consumer.subscribe("Filter-Test-Topic", MessageSelector.bySql("idx > 10"));
这里的 idx > 10 的 idx 是在生产者中通过下边这行代码放入的:
msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));
消息过滤消费者代码如下(只过滤出 tag = tag1 的消息):
public class Subscribe02_Single_Consumer {public static void main(String[] args) throws Exception {// 1、创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Subscribe02_Single_Consumer");// 2、为消费者对象设置 NameServer 地址consumer.setNamesrvAddr("127.0.0.1:9876");// 3、订阅主题consumer.subscribe("Filter-Test-Topic", "tag1");// 4、注册监听消息,并打印消息consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {String printMsg = new String(msg.getBody()) + ", recvTime: "+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());System.out.println(printMsg);}return ConsumeOrderlyStatus.SUCCESS;}});// 5、把消费者直接启动起来consumer.start();System.out.println("Consumer Started Finished.");}
}