目录
- 一、简介
- 1.1、消费模式
- 二、消费者
- 2.1、maven依赖
- 2.2、application配置
- 2.3、消费监听
- 三、生产者
- 3.1、发送消息
- 3.2、运行结果
- 四、其他
一、简介
在之前的文章中,我们讲过了,同步发送单条消息,异步发送单条消息,发送单向消息,发送顺序消息,批量发送消息,事务消息,我们使用的模式都是 集群消费模式(Cluster),本文就来讲另外一种消息消费模式,也就是广播消费模式(Broadcast)
1.1、消费模式
在 Apache RocketMQ 中,实现消息消费的方式主要是两种:
-
集群消费模式(Cluster):
在集群消费模式下,同一个消费者组(Consumer Group)中的每个消费者都会消费消息的一个副本。消息会被分发到不同的消费者实例上,但是同一个消息只会被同一个消费者组中的一个消费者消费。 -
广播消费模式(Broadcast):
在广播消费模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,即每个消费者都会独立地消费消息。消息会被广播到同一个消费者组中的所有消费者实例上。
那么怎么使用广播消费模式呢?其实很简单,通过在消费者的 @RocketMQMessageListener 注解中设置 messageModel 参数为 MessageModel.BROADCASTING,即可将消费者设置为广播模式。在广播模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,每个消费者都会独立地消费消息,从而实现了消息的广播消费。接下里看看具体操作吧。
二、消费者
2.1、maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq</artifactId><groupId>com.alian</groupId><version>1.0.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>11-broadcasting-message-one</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties></project>
2.2、application配置
application.properties
server.port=8011# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默认的消费者组
rocketmq.consumer.group=BROADCASTING_CONSUMER_GROUP
# 批量拉取消息的数量
rocketmq.consumer.pull-batch-size=10
# 广播消费模式
rocketmq.consumer.message-model=BROADCASTING
实际上对于本文来说,下面两个配置不用配置,也不会生效。
# 默认的消费者组
rocketmq.consumer.group=BROADCASTING_CONSUMER_GROUP
# 广播消费模式
rocketmq.consumer.message-model=BROADCASTING
因为优先的是@RocketMQMessageListener 注解中设置 consumerGroup 和messageModel 参数。
2.3、消费监听
@RocketMQMessageListener是RocketMQ提供的注解,用于配置消费者监听器的相关属性。
package com.alian.broadcasting;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
@RocketMQMessageListener(topic = "broadcasting_string_message_topic",consumerGroup = "BROADCASTING_CONSUMER_GROUP",messageModel = MessageModel.BROADCASTING)
public class StringMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("第一个消费者接收到的字符串消息: {}", message);// 处理消息的业务逻辑}
}
关于这里@RocketMQMessageListener的参数做个简单解释:
- topic:必填,指定该消费者订阅的Topic名称
- consumerGroup:必填,指定该消费者所属的消费者组名称,同一个组内的消费者实例通常进行负载均衡消费
- messageModel:设置消费模式,取值范围CLUSTERING(集群消费)、BROADCASTING(广播消费)
MessageModel.java
public enum MessageModel {BROADCASTING("BROADCASTING"),CLUSTERING("CLUSTERING");private final String modeCN;MessageModel(String modeCN) {this.modeCN = modeCN;}public String getModeCN() {return this.modeCN;}
}
三、生产者
生产者我就复用前面批量消息发送的模块了
3.1、发送消息
@Slf4j
@SpringBootTest
public class SendBatchedBroadcastingMessageTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void syncSendStringMessagesWithBuilder() {String topic = "broadcasting_string_message_topic";for (int i = 0; i < 10; i++) {String message = "广播消息:" + i;Message<String> rocketMessage = MessageBuilder.withPayload(message).build();rocketMQTemplate.convertAndSend(topic, rocketMessage);}}@Testpublic void syncSendBatchStringMessagesWithBuilder() {String topic = "string_message_topic";String message = "批量广播消息:";List<Message<String>> messageList = new ArrayList<>();for (int i = 0; i < 10; i++) {Message<String> rocketMessage = MessageBuilder.withPayload(message + i)// 设置消息类型.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build();// 加入到列表messageList.add(rocketMessage);}// 使用syncSend发送批量消息SendResult sendResult = rocketMQTemplate.syncSend(topic, messageList);log.info("批量消息发送结果:{}",sendResult);}@AfterEachpublic void waiting() {try {Thread.sleep(3000L);} catch (InterruptedException e) {e.printStackTrace();}}}
我们先启动消费者,然后生产者发送消息。
3.2、运行结果
运行结果:
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:1
[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:0
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:3
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:2
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:9
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:0
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:2
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:4
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:5
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:3
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:1
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:6
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:9
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:8[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:0
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:1
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:2
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:3
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:9
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:4
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:6
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:2
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:3
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:8
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:1
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:0
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:9
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:5
四、其他
RocketMQ 通过消费者组(Consumer Group)来维护不同消费者的消费进度。每个消费者组都有一个消费进度(offset),用于标记该组下的消费者在某个主题(Topic)和队列(Queue)上已经消费到的位置。所以:不同的消费者组会被视为不同的消费者;如果消费者重启或重新加入组,就能从对应Queue的offset处继续消费。
不过使用广播消费模式时,Consumer Group 的概念基本上没有作用,因为每个消费者实例都会独立地收到消息的一个副本。在广播模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,每个消费者都会独立地消费消息,而不像集群消费模式中那样,一个消费者组中的消费者会共同消费消息。
广播消费模式在RocketMQ中最好的好处就是消费者解耦:不同的消费者可以独立消费消息,相互之间不受影响,提高了系统的扩展性,它的适用场景有:
- 日志收集 - 需要将日志数据分发给多个日志收集系统,每个系统都需要收到全量日志。
- 数据备份 - 实时备份数据到多个存储系统,确保数据有冗余副本。
- 信息推送 - 向多个推送通道投递并发送消息通知,如站内信、短信、Push等。
- 状态同步 - 将数据变更实时同步到集群的所有节点,保证集群节点状态一致。
- 负载均衡 - 将任务或请求广播给所有服务实例,由每个实例独立处理,实现负载分担。
- 监控告警 - 将系统监控数据广播给多个监控系统,多视角分析。