rabbitmq 使用SAC队列实现顺序消息
前提
SAC: single active consumer, 是指如果有多个实例,只允许其中一个实例消费,其他实例为空闲
目的
实现消息顺序消费,操作:
- 创建4个SAC队列,
- 消息的路由key 取队列个数模,这里是4
- 发送消息到每个队列,保证每个队列只有一个消费者!!
实现
定义消息 SeqMessage
@Data
@AllArgsConstructor
public class SeqMessage implements Serializable {//消息idprivate String requestNo;//消息中顺序,1,2,3,4private int order;
}
创建 队列 绑定
@Configuration
public class OrderQueueConfiguration {public static final String EXCHANGE = "order-ex";public static final String RK_PREFIX = "rk-";public static final String ONE_QUEUE = "one-queue";public static final String TWO_QUEUE = "two-queue";public static final String THREE_QUEUE = "three-queue";public static final String FOUR_QUEUE = "four-queue";@Beanpublic DirectExchange exchange() { // 使用直连的模式return new DirectExchange(EXCHANGE, true, false);}@Beanpublic Binding oneBinding() {return BindingBuilder.bind(oneQueue()).to(exchange()).with(RK_PREFIX + 1);}@Beanpublic Binding twoBinding() {return BindingBuilder.bind(twoQueue()).to(exchange()).with(RK_PREFIX + 2);}@Beanpublic Binding threeBinding() {return BindingBuilder.bind(threeQueue()).to(exchange()).with(RK_PREFIX + 3);}@Beanpublic Binding fourBinding() {return BindingBuilder.bind(fourQueue()).to(exchange()).with(RK_PREFIX + 3);}@Beanpublic Queue oneQueue() {return createSacQueue(ONE_QUEUE);}@Beanpublic Queue twoQueue() {return createSacQueue(TWO_QUEUE);}@Beanpublic Queue threeQueue() {return createSacQueue(THREE_QUEUE);}@Beanpublic Queue fourQueue() {return createSacQueue(FOUR_QUEUE);}private static Queue createSacQueue(String queueName) {Map<String, Object> arguments = new HashMap<>(2);arguments.put("x-single-active-consumer", true);return new Queue(queueName, true, false, false, arguments);}}
重要的是 x-single-active-consumer
这个属性, 只有一个实例生效
创建 消费者
为每个队列创建一个监听消费者
@Slf4j
@Component
public class OrderListener {@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = ONE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 1))public void onMessage1(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", ONE_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = TWO_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 2))public void onMessage2(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", TWO_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = THREE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 3))public void onMessage3(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", THREE_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = FOUR_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 4))public void onMessage4(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", FOUR_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}}
生产者发送消息
@GetMapping("/send/seq/messqge")public String sendSeqMessage() throws JsonProcessingException {int cnt = 100;int mod = 4;int seqSize = 6;for (int i = 0; i < cnt; i++) {for (int j = 0; j < seqSize; j++) {int rk = i % mod + 1;SeqMessage seqMessage = new SeqMessage("seq-" + i, j);String s = objectMapper.writeValueAsString(seqMessage);log.info("routeKey: {}, send msg: {}", rk, s);rabbitTemplate.convertAndSend(EXCHANGE, RK_PREFIX + rk, s);}}return "success";}
运行结果:
two-queue recv: {"requestNo":"seq-1","order":0}
two-queue recv: {"requestNo":"seq-1","order":1}
two-queue recv: {"requestNo":"seq-1","order":2}
two-queue recv: {"requestNo":"seq-1","order":3}
two-queue recv: {"requestNo":"seq-1","order":4}
two-queue recv: {"requestNo":"seq-1","order":5}
two-queue recv: {"requestNo":"seq-5","order":0}
two-queue recv: {"requestNo":"seq-5","order":1}
two-queue recv: {"requestNo":"seq-5","order":2}
two-queue recv: {"requestNo":"seq-5","order":3}
two-queue recv: {"requestNo":"seq-5","order":4}
two-queue recv: {"requestNo":"seq-5","order":5}three-queue recv: {"requestNo":"seq-2","order":0}
three-queue recv: {"requestNo":"seq-2","order":1}
three-queue recv: {"requestNo":"seq-2","order":2}
three-queue recv: {"requestNo":"seq-2","order":3}
three-queue recv: {"requestNo":"seq-2","order":4}
three-queue recv: {"requestNo":"seq-2","order":5}
three-queue recv: {"requestNo":"seq-6","order":0}
three-queue recv: {"requestNo":"seq-6","order":1}
three-queue recv: {"requestNo":"seq-6","order":2}
three-queue recv: {"requestNo":"seq-6","order":3}
three-queue recv: {"requestNo":"seq-6","order":4}
three-queue recv: {"requestNo":"seq-6","order":5}
可以发现,消息消费是顺序的
good luck!