RocketMQ简介
RocketMQ 简介
Apache RocketMQ 是一款开源的 分布式消息中间件(Message Queue, MQ),由阿里巴巴团队研发并捐赠给 Apache 基金会,现已成为顶级项目。它专为 高吞吐、低延迟、高可靠 的分布式场景设计,广泛应用于 电商、金融、物流、IoT 等领域。
1. 核心特性
特性 | 说明 |
---|---|
高吞吐 | 单机支持 10万+ TPS(消息吞吐量),适用于大流量场景(如双11、秒杀)。 |
低延迟 | 毫秒级 消息投递,满足实时业务需求。 |
高可靠 | 支持 同步刷盘、多副本存储,确保数据不丢失。 |
事务消息 | 提供 两阶段提交(2PC),保证分布式事务一致性。 |
顺序消息 | 支持 分区有序(如订单ID相同的消息严格有序)。 |
定时/延时消息 | 可设定消息在 指定时间 或 延迟后 投递。 |
消息回溯 | 支持按 时间戳 重新消费历史消息。 |
消息过滤 | 支持 Tag 过滤 和 SQL92 表达式 筛选消息。 |
2. 核心架构
RocketMQ 采用 发布-订阅(Pub/Sub) 模型,主要由以下组件构成:
(1)NameServer(路由中心)
- 轻量级 无状态服务,负责 Broker 发现 和 路由管理。
- 类似 Kafka 的 ZooKeeper,但更简单高效。
(2)Broker(消息存储与转发)
- 主从架构(Master-Slave),支持 同步/异步复制,确保高可用。
- 存储模型:
- CommitLog:所有消息 顺序写入,提升磁盘 IO 性能。
- ConsumeQueue:逻辑队列,存储消息索引,加速消费。
(3)Producer(生产者)
- 负责发送消息,支持 同步、异步、单向(Oneway) 发送模式。
(4)Consumer(消费者)
- 支持 集群消费(负载均衡) 和 广播消费(全量投递)。
- 提供 Push(服务端推送) Pull(客户端拉取) 两种消费模式。
3. 典型应用场景
(1)削峰填谷(流量控制)
- 场景:秒杀、大促等高并发场景,避免系统过载。
- 方案:消息队列缓冲请求,后端按处理能力消费。
(2)异步解耦
- 场景:订单支付成功后,异步通知库存、物流、营销系统。
- 方案:Producer 发送消息,Consumer 订阅处理,降低系统耦合。
(3)分布式事务
- 场景:跨服务事务(如订单+库存)。
- 方案:使用 事务消息,确保最终一致性。
(4)日志收集
- 场景:实时采集业务日志,传输至大数据系统(如 Flink、Hadoop)。
- 方案:RocketMQ + RocketMQ Connect 构建数据管道。
4. 对比其他MQ
对比项 | RocketMQ | Kafka | RabbitMQ |
---|---|---|---|
设计目标 | 金融级可靠消息 | 日志流处理 | 企业级AMQP |
吞吐量 | 10万+ TPS | 百万级 TPS | 万级 TPS |
延迟 | 毫秒级 | 毫秒~秒级 | 微秒~毫秒级 |
事务支持 | ✅(2PC事务消息) | ❌ | ⚠️(有限支持) |
消息回溯 | ✅(按时间戳) | ✅(按Offset) | ❌ |
适用场景 | 电商、金融、IoT | 日志、大数据 | 企业应用集成 |
5. 快速体验
(1)启动服务
# 启动 NameServer
nohup sh bin/mqnamesrv &# 启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
(2)发送消息(Java示例)
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult result = producer.send(msg); // 同步发送
producer.shutdown();
(3)消费消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {System.out.println("收到消息: " + new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
6. 总结
✅ 适合场景:
- 需要 高可靠、事务支持、顺序消息 的业务(如支付、订单)。
- 大规模分布式系统,要求 高吞吐、低延迟。
❌ 不适合场景:
- 极低延迟(微秒级)场景(可考虑 RabbitMQ)。
- 纯日志流处理(可考虑 Kafka)。
📌 推荐使用:
- 金融、电商、物流、IoT 等对 数据一致性 要求高的领域。
🔗 官方资源:
- 官网:https://rocketmq.apache.org/
- GitHub:https://github.com/apache/rocketmq
RocketMQ 凭借其 高性能、高可靠、丰富的消息模式,已成为企业级消息中间件的首选之一。 🚀
RocketMQ安装
RocketMQ 安装指南(Linux/Windows/Mac)
RocketMQ 支持多种操作系统,以下是详细的安装步骤:
1. 环境准备
- JDK 1.8+(推荐 OpenJDK 或 Oracle JDK)
- 4GB+ 内存(Broker 默认需要 2GB 堆内存)
- 磁盘空间(建议 10GB+,用于消息存储)
2. 下载 RocketMQ
从官方镜像站下载最新稳定版(以 5.0.0 为例):
# Linux/Mac
wget https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
unzip rocketmq-all-5.0.0-bin-release.zip
cd rocketmq-5.0.0# Windows
下载地址:https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
解压后进入 `rocketmq-5.0.0/bin` 目录。
3. 启动 NameServer
NameServer 是轻量级路由中心,无状态服务。
Linux/Mac
# 启动 NameServer(后台运行)
nohup sh bin/mqnamesrv &# 查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log
Windows
# 在 CMD 中执行
.\bin\mqnamesrv.cmd
4. 启动 Broker
Broker 是消息存储和转发节点,需配置内存参数。
Linux/Mac
# 修改 JVM 内存(可选,默认 2GB)
export JAVA_OPT="-Xms2g -Xmx2g"# 启动 Broker(单机模式)
nohup sh bin/mqbroker -n localhost:9876 &# 查看日志
tail -f ~/logs/rocketmqlogs/broker.log
Windows
# 修改内存(编辑 bin/runbroker.cmd)
set "JAVA_OPT=-Xms1g -Xmx1g"# 启动 Broker
.\bin\mqbroker.cmd -n localhost:9876
5. 验证安装
(1)检查进程
# Linux/Mac
jps -l | grep -E 'NamesrvStartup|BrokerStartup'# 预期输出
12345 org.apache.rocketmq.namesrv.NamesrvStartup
67890 org.apache.rocketmq.broker.BrokerStartup
(2)发送测试消息
# 使用自带工具测试
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer# 预期输出
SendResult [sendStatus=SEND_OK, msgId=...]
(3)消费测试消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer# 预期输出
Receive message: Hello RocketMQ 0
Receive message: Hello RocketMQ 1
...
6. 关闭服务
# 关闭 Broker
sh bin/mqshutdown broker# 关闭 NameServer
sh bin/mqshutdown namesrv
7. 常见问题
(1)内存不足
- 报错:
Java heap space
- 解决:修改
bin/runbroker.sh
或runbroker.cmd
,调整-Xms
和-Xmx
。
(2)端口冲突
- 报错:
Address already in use
- 解决:检查
9876
(NameServer)和10911
(Broker)端口是否被占用。
(3)Windows 启动失败
- 报错:
找不到主类
- 解决:确保 JDK 环境变量配置正确,或使用绝对路径运行脚本。
8. 生产环境建议
- 集群部署:至少 2 台 NameServer + 主从 Broker。
- 持久化配置:
# conf/broker.conf brokerClusterName=DefaultCluster brokerName=broker-a brokerId=0 # 0=Master, >0=Slave storePathRootDir=/data/rocketmq/store
- 监控工具:集成 Prometheus + Grafana(官方提供 Dashboard)。
9. 卸载 RocketMQ
# 删除安装目录
rm -rf rocketmq-5.0.0# 清理日志和数据
rm -rf ~/logs/rocketmqlogs
rm -rf ~/store
通过以上步骤,你可以快速完成 RocketMQ 的安装和验证。如需更高级配置(如 ACL 权限、多副本同步),请参考 官方文档。 🚀
RocketMQ快速入门
Java 中使用 RocketMQ 快速入门
以下是一个完整的 Java 示例,涵盖 Producer(生产者) 和 Consumer(消费者) 的基本用法,使用 RocketMQ 5.x 客户端。
1. 添加 Maven 依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.0.0</version> <!-- 使用最新版本 -->
</dependency>
2. 生产者(Producer)示例
发送同步消息(阻塞等待 Broker 响应):
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.client.producer.SendResult;public class ProducerExample {public static void main(String[] args) throws Exception {// 1. 创建生产者(指定 Producer Group)DefaultMQProducer producer = new DefaultMQProducer("test_producer_group");// 2. 设置 NameServer 地址producer.setNamesrvAddr("localhost:9876");// 3. 启动生产者producer.start();// 4. 创建消息(Topic, Tag, Body)Message msg = new Message("test_topic", "TagA", "Hello RocketMQ".getBytes());// 5. 发送消息(同步方式)SendResult result = producer.send(msg);System.out.println("发送结果: " + result);// 6. 关闭生产者producer.shutdown();}
}
关键参数说明:
Producer Group
:生产者组,用于事务消息和故障转移。Topic
:消息的分类(如订单、支付)。Tag
:消息的子分类(如TagA
表示普通消息,TagB
表示紧急消息)。
3. 消费者(Consumer)示例
使用 Push 模式(服务端推送消息):
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;public class ConsumerExample {public static void main(String[] args) throws Exception {// 1. 创建消费者(指定 Consumer Group)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");// 2. 设置 NameServer 地址consumer.setNamesrvAddr("localhost:9876");// 3. 订阅 Topic 和 Tag(* 表示所有 Tag)consumer.subscribe("test_topic", "*");// 4. 注册消息监听器
.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,ConsumeConcurrentlyContext context) {for (MessageExt msg : messages) {System.out.printf("收到消息: Topic=%s, Tag=%s, Body=%s %n",msg.getTopic(),msg.getTags(),new String(msg.getBody()));}// 返回消费状态(成功/失败)return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5. 启动消费者consumer.start();System.out.println("消费者已启动,等待消息...");}
}
关键参数说明:
Consumer Group
:消费者组,同组内负载均衡。MessageListenerConcurrently
:并发消费(默认方式)。ConsumeConcurrentlyStatus.CONSUME_SUCCESS
:确认消费成功(失败则重试)。
4. 运行流程
- 启动 NameServer 和 Broker(参考安装指南)。
- 运行 Producer:发送消息到 Broker。
- 运行 Consumer:自动从 Broker 拉取消息并处理。
预期输出:
# Producer 输出
发送结果: SendResult [sendStatus=SEND_OK, msgId=...]# Consumer 输出
收到消息: Topic=test_topic, Tag=TagA, Body=Hello RocketMQ
5. 高级功能示例
(1)发送异步消息
producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("异步发送成功: " + sendResult);}@Overridepublic void onException(Throwable e) {System.out.println("异步发送失败: " + e.getMessage());}
});
(2)发送顺序消息
// 发送时指定 ShardingKey(相同 Key 的消息保证顺序)
SendResult result = producer.send(msg, (mqs, msg, arg) -> {Integer id = (Integer) arg;return mqs.get(id % mqs.size());
}, orderId); // orderId 作为分区键
(3)消费模式(集群 vs 广播)
// 集群模式(同组消费者共享消息)
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播模式(同组消费者各自消费全量消息)
consumer.setMessageModel(MessageModel.BROADCASTING);
6. 常见问题
(1)连接失败
- 问题:
No route info of this topic
- 解决:检查 Broker 是否启动,或手动创建 Topic:
sh bin/mqadmin updateTopic -n localhost:9876 -t test_topic
(2)重复消费
- 原因:消费者返回
RECONSUME_LATER
或超时未响应。 - 解决:确保消费逻辑幂等,或使用
MessageExt.getReconsumeTimes()
判断重试次数。
(3)消息堆积
- 解决:增加消费者实例,或调整
pullBatchSize
:consumer.setPullBatchSize(32); // 默认 32
7. 完整项目结构
src/
├── main/
│ ├── java/
│ │ ├── ProducerExample.java
│ │ └── ConsumerExample.java
│ └── resources/
│ └── logback.xml # 日志配置(可选)
pom.xml
通过以上代码,你可以快速上手 RocketMQ 的核心消息收发功能。更多高级特性(如事务消息、延迟消息)请参考 官方文档。 🚀
RocketMQ集群搭建
RocketMQ 核心角色
角色 | 职责 | 部署要求 | 数据流向 |
---|---|---|---|
NameServer | 路由管理中心 | 至少2节点 | 无状态 |
Broker-Master | 消息读写 | 奇数节点 | 持久化存储 |
Broker-Slave | 数据备份 | 与Master配对 | 同步/异步复制 |
Producer | 消息生产 | 可水平扩展 | → Broker |
Consumer | 消息消费 | 可水平扩展 | ← Broker |
RocketMQ集群核心特点详解
1. NameServer 无状态设计
- 无状态集群:节点间零数据同步,完全独立运行
- 轻量级:单节点资源消耗低(1核2GB内存足够)
- 高可用:建议至少部署2个节点(不同可用区)
- 数据时效:
- Broker每30秒上报路由信息
- 客户端每30秒刷新路由缓存
2. Broker 主从架构
配置维度 | Master | Slave |
---|---|---|
BrokerId | 必须设为0 | 必须≥1(建议顺序递增) |
BrokerName | 同组主从必须相同(如broker-a) | 同组主从必须相同 |
连接管理 | 与所有NameServer保持长连接 | 额外与对应Master建立数据通道 |
数据同步 | 主动推送数据 | 支持两种模式: |
• SYNC_MASTER(强一致) | ||
• ASYNC_MASTER(高性能) |
3. Producer 工作模式
- 无状态设计:支持无限水平扩展
- 智能路由:
- 首次随机选择NameServer
- 自动感知Broker拓扑变化
- 容错机制:
// 建议生产环境配置 producer.setRetryTimesWhenSendFailed(3); producer.setSendMsgTimeout(5000);
4. Consumer 消费规则
- 订阅策略:
- 默认从Master消费(最新数据)
- Master不可用时自动切换Slave
- 负载均衡:
// 集群模式下自动分配队列 consumer.setMessageModel(MessageModel.CLUSTERING);
- 位点控制:
// 支持三种初始位点 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET // 常用配置 );
5. 生产环境黄金法则
-
部署建议:
- NameServer:奇数节点(3节点最佳)
- Broker:SYNC_MASTER + 至少1个Slave
-
性能调优:
# broker.conf mappedFileSizeCommitLog=1073741824 # 1GB CommitLog文件 flushIntervalCommitLog=1000 # 1秒刷盘间隔
-
监控指标:
指标类别 关键指标 报警阈值 消息堆积 consumerOffset落后量 >10万条 系统资源 CPU使用率 >70%持续5分钟 网络吞吐 入站流量 接近带宽上限80%
RocketMQ集群模式
RocketMQ 支持多种集群部署模式,主要分为以下三种,每种模式适用于不同的场景和需求:
1. 单 Master 模式
- 特点:只有一个 Master 节点,无 Slave 节点。
- 优点:部署简单,资源消耗低。
- 缺点:存在单点故障风险,一旦 Master 宕机,整个服务不可用。
- 适用场景:仅用于测试或开发环境,不推荐生产使用。
2. 多 Master 模式
- 特点:多个 Master 节点组成集群,无节点。
- 优点:
- 高性能:所有节点均可读写,无主从复制延迟。
- 高可用:单个 Master 宕机不影响其他节点。
- 缺点:
- 宕机期间未被消费的消息可能丢失(未配置同步刷盘时)。
- 无法自动故障转移(需人工干预)。
- 适用场景:对消息丢失不敏感的高吞吐场景(例如日志收集)。
3. 多 Master 多 Slave 模式
- 子模式:
- 同步复制(Sync):Master 和 Slave 数据完全一致后返回写入成功。
- 异步复制(Async):Master 写入成功后立即返回,数据异步复制到 Slave。
- 优点:
- 高可用:Master 宕机后,Slave 可自动或手动切换为 Master。
- 数据可靠性高(尤其是同步复制模式)。
- 缺点:
- 同步复制性能较低(需等待 Slave 完成复制)。
- 异步复制可能丢失少量数据(Master 宕机时未复制的数据)。
- 适用场景:对数据可靠性要求高的生产环境(如金融交易)。
简单对比表
模式 | 可靠性 | 性能 | 自动容灾 | 适用场景 |
---|---|---|---|---|
单 Master | 低 | 最高 | 不支持 | 测试/开发 |
多 Master | 中 | 高 | 不支持 | 允许少量丢失的高吞吐场景 |
多 Master 多 Slave(异步) | 中高 | 中高 | 支持 | 平衡可靠性与性能的生产环境 |
多 Master 多 Slave(同步) | 最高 中 | 支持 | 强一致性要求的场景 |
选择建议
- 追求简单和性能:多 Master 模式。
- 追求高可用和数据可靠:多 Master 多 Slave + 同步复制。
- 折中方案:多 Master 多 Slave + 异步复制(需容忍少量丢失)。
RocketMQ 的集群模式灵活,可根据业务需求在性能、可靠性和复杂度之间权衡。
RocketMQ双主双从搭建
一、架构设计
二、配置要点
关键参数 | Master配置 | Slave配置 |
---|---|---|
brokerRole | SYNC_MASTER | SLAVE |
brokerId | 0 | 1 |
listenPort | 10911 | 11011 |
三、部署步骤
- 启动NameServer:
nohup sh bin/mqnamesrv &
- 启动Master节点:
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a.properties &
- 启动Slave节点:
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a-s.properties &
RocketMQ消费模式
一、Push模式(长轮询)
核心机制:
- Broker主动推送(实际基于长轮询)
- 客户端注册MessageListener
- 服务端hold连接(默认15s超时)
特点:
- 实时性高(消息延迟<1s)
- 客户端资源占用较多
- 服务端维护消费进度
示例代码:
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 处理消息return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
二、Pull模式
核心机制:
- 客户端定时拉取(需自行控制频率)
- 可精确控制消费位点
- 需手动提交offset
特点:
- 灵活性高(可自定义拉取逻辑)
- 容易产生空轮询
- 客户端维护消费进度
示例代码:
PullResult pullResult = consumer.pullBlockIfNotFound(new MessageQueue("topic", "broker", queueId),"*",getMessageQueueOffset(),32 // 单次拉取条数
);
三、对比总结表
特性 | Push模式 | Pull模式 |
---|---|---|
实现原理 | 服务端长轮询 | 客户端主动请求 |
实时性 | 高(毫秒级) | 依赖拉取频率 |
资源消耗 | 服务端连接开销大 | 客户端控制开销大 |
消费控制 | 自动提交offset | 需手动管理offset |
适用场景 | 常规实时业务 | 特殊调度需求/批量处理 |
四、选型建议
-
默认用Push模式:
- 适合99%的常规场景
- 简化客户端代码
// 推荐配置(并发消费) consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(64);
-
考虑Pull模式当:
- 需要精确控制消费速率时
- 实现自定义存储offset逻辑
// 典型Pull控制逻辑 while(true) {if(业务允许消费){pullResult = consumer.pull(...);// 处理消息updateOffset();}Thread.sleep(100); // 控制频率 }
注意事项:
- Push模式实际仍基于Pull封装(长轮询机制)
- Pull模式注意处理
NO_NEW_MSG
和OFFSET_ILLEGAL
状态 - 两种模式可混合使用(不同消费者实例)
RocketMQ中消费者组的概念
一、基础定义
消费者组是RocketMQ中逻辑上的订阅单元,由多个消费者实例组成,共同消费同一Topic下的消息。关键特性:
// 消费者组声明示例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("订单处理组"); // ← 这里定义消费者组名
二、核心特性图解
三、工作模式
-
集群模式(CLUSTERING)
- 同组消费者均分消息(每条消息只被组内一个实例消费)
- 实现横向扩展和负载均衡
# 消费模式配置 messageModel=CLUSTERING # 默认模式
-
广播模式(BROADCAST)
- 组内每个消费者收到全量消息
- 适用于日志收集等场景
consumer.setMessageModel(MessageModel.BROADCASTING);
四、关键机制
-
队列分配策略
- 平均分配(AllocateMessageQueueAveragely)
- 环形分配(AllocateMessageQueueAveragelyByCircle)
- 自定义策略(实现AllocateMessageQueueStrategy接口)
// 设置分配策略 consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
-
消费位点管理
- 持久化到Broker的
consumerOffset.json
- 支持重置位点(控制台或API操作)
# 重置消费位点命令 sh bin/mqadmin resetOffsetByTime -n 127.0.0.1:9876 -g 订单组 -t OrderTopic -s now
- 持久化到Broker的
-
重平衡机制
- 触发条件:
- 消费者实例增减
- Topic队列数变化
- 网络分区恢复
- 过程耗时通常<1秒(生产环境实测)
- 触发条件:
五、生产实践要点
-
组名规范建议
业务域_环境_功能 示例:payment_prod_notify
-
消费者数量设置
最优消费者数量 = min(订阅Topic的队列总数, 业务需要的并行度)
-
异常处理方案
问题类型 解决方案 单实例堆积 增加消费者实例 全组消费停滞 检查网络/重启Broker 位点丢失 通过控制台重置offset