RocketMQ—消费者的两种消费模式
RocketMQ消息消费的模式分为两种:负载均衡模式和广播模式,负载均衡模式表示多个消费者交替消费同一个主题里面的消息;广播模式表示每个每个消费者都消费一遍订阅的主题的消息。
负载均衡模式
CLUSTERING 集群模式下 队列会被消费者分摊, 队列数量>=消费者数量 消息的消费位点 mq服务器会记录处理
代码如下
/*** CLUSTERING 集群模式下 队列会被消费者分摊, 队列数量>=消费者数量 消息的消费位点 mq服务器会记录处理*/
@Component
@RocketMQMessageListener(topic = "modeTopic",consumerGroup = "mode-consumer-group-a",messageModel = MessageModel.CLUSTERING, // 集群模式 负载均衡consumeThreadNumber = 40)
public class DC1 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是mode-consumer-group-a组的第一个消费者:" + message);}
}@Component
@RocketMQMessageListener(topic = "modeTopic",consumerGroup = "mode-consumer-group-a",messageModel = MessageModel.CLUSTERING // 集群模式
)
public class DC2 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是mode-consumer-group-a组的第二个消费者:" + message);}
}
广播模式
BROADCASTING 广播模式下 消息会被每一个消费者都处理一次, mq服务器不会记录消费点位,也不会重试。
@Component
@RocketMQMessageListener(topic = "modeTopic",consumerGroup = "mode-consumer-group-b",messageModel = MessageModel.BROADCASTING // 广播模式
)
public class DC4 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是mode-consumer-group-b组的第一个消费者:" + message);}
}
@Component
@RocketMQMessageListener(topic = "modeTopic",consumerGroup = "mode-consumer-group-b",messageModel = MessageModel.BROADCASTING // 广播模式
)
public class DC5 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是mode-consumer-group-b组的第二个消费者:" + message);}
}