package com.ldj.rocketmq.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;/*** User: ldj* Date: 2024/5/26* Time: 15:09* Description: 局部顺序消息*/
/**
 * Producer demo for partially ordered messages: every message that shares
 * the same order id is routed to the same queue of {@code OrderTopic}.
 */
public class OrderProducer {

    /**
     * Sends 5 x 5 messages; the outer loop index is used as the order id and
     * the inner loop index as the step within that order.
     *
     * @param args unused
     * @throws Exception if the producer cannot be started or a send fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("produce-group-order");
        producer.setNamesrvAddr("192.168.208.190:9876;192.168.208.191:9876;192.168.208.192:9876");
        producer.start();
        try {
            // Two-level scheme: the outer id selects the queue index, so all
            // messages of the same first level end up in the same queue.
            for (int i = 0; i < 5; i++) {
                for (int j = 0; j < 5; j++) {
                    int orderId = i;
                    Message message = new Message(
                            "OrderTopic",
                            "orderMessage",
                            ("order_step[" + orderId + "-" + j + "]").getBytes(StandardCharsets.UTF_8));
                    producer.send(message, (mqs, msg, arg) -> {
                        Integer id = (Integer) arg;
                        if (id == null) {
                            throw new IllegalArgumentException("缺少决定消息顺序的id!");
                        }
                        // floorMod keeps the index inside [0, size) even when
                        // the hash code is negative; plain % would not.
                        return mqs.get(Math.floorMod(id.hashCode(), mqs.size()));
                    }, orderId);
                }
            }
        } finally {
            // Always release the producer, including when a send throws.
            producer.shutdown();
        }
    }
}
消费者:将同一个一级 id(订单 id)下的所有消息收集到一个 map 中,模拟顺序消费消息。
package com.ldj.rocketmq.consumer;import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CopyOnWriteArrayList;
import org.springframework.util.CollectionUtils;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** User: ldj* Date: 2024/5/26* Time: 16:08* Description: 顺序消息消费者*/
public class OrderConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group-order");consumer.setNamesrvAddr("192.168.208.190:9876;192.168.208.191:9876;192.168.208.192:9876");consumer.subscribe("OrderTopic", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setMessageModel(MessageModel.CLUSTERING);//订单id做为key,消息作为valueMap<String, List<String>> concurrentHashMap = new ConcurrentHashMap<>();consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {msgs.forEach(msg -> {String str = new String(msg.getBody());int left = str.indexOf("[");int right = str.indexOf("]");String substring = str.substring(left + 1, right);String[] k_v = substring.split("-");List<String> messages = concurrentHashMap.get(k_v[0]);if (CollectionUtils.isEmpty(messages)) {List<String> msgFirstList = new ArrayList<>();msgFirstList.add(str);concurrentHashMap.put(k_v[0], msgFirstList);} else {List<String> msgAddList = concurrentHashMap.get(k_v[0]);msgAddList.add(str);concurrentHashMap.put(k_v[0], msgAddList);}});System.out.println(JSON.toJSONString(concurrentHashMap));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.println("消费者准备就绪...");}
}
复制控制台输出的最后一行(JSON),并将其格式化后查看结果。