学习之前呢需要会使用linux的基础命令
一.RocketMQ 主题与队列的协同作用解析
在 RocketMQ 中,主题(Topic)与队列(Queue)的协同设计实现了消息系统的逻辑抽象与物理存储分离。虽然队列实际存储在不同集群的 Broker 节点上,但主题作为逻辑层面的核心单元,仍具有不可替代的作用:
1. 主题是消息的逻辑分类与路由标识
- 逻辑聚合:主题将同一类业务消息聚合为逻辑单元(例如订单消息归入
OrderTopic
),生产者只需关注发送到哪个主题,无需感知底层队列的物理分布。 - 路由依据:消息通过主题名称路由到对应集群的队列中。例如,生产者发送到
PaymentTopic
的消息会被自动分配到该主题关联的队列(可能分布在多个集群)。 - 跨集群协同:若队列分布在多个集群,主题的元数据(如队列分布规则)由 NameServer 统一管理,确保生产者/消费者能准确找到目标队列。
2. 主题实现消息的订阅与消费隔离
- 订阅关系管理:消费者通过订阅主题来接收消息,而非直接绑定队列。即使队列分布在多个集群,消费者组仍可通过主题名称统一订阅,RocketMQ 自动完成队列分配和负载均衡。
- 权限控制:主题支持独立的权限配置(如读写权限),实现不同业务消息的访问隔离。例如,财务系统仅能访问
FinanceTopic
,与订单系统隔离。 - Tag 过滤扩展:在主题基础上,通过 Tag 进一步细分消息类型(如
OrderTopic:TagA
),消费者可选择性订阅特定 Tag,减少无关消息处理。
3. 主题为运维提供统一管理入口
- 队列动态扩展:当集群扩容时,通过调整主题的队列数量(如从 8 队列增至 16 队列),新队列可自动分配到新增集群的 Broker,无需修改业务代码。
- 监控与告警:主题级别的监控指标(如消息堆积量、消费 TPS)便于快速定位问题。例如,
InventoryTopic
的消费延迟告警可直接关联到库存业务。 - 数据生命周期:主题支持独立的消息保留策略(如保存 3 天),不同业务可按需配置,避免全局策略的局限性。
4. 主题与队列的协同模型
维度 | 主题(Topic) | 队列(Queue) |
---|---|---|
定位 | 逻辑分类单元,定义消息身份与权限 | 物理存储单元,实际承载消息数据 |
扩展性 | 通过增加队列数量实现横向扩展 | 依赖 Broker 集群扩容提升单队列容量 |
消费模式 | 支持集群消费(单队列单消费者)或广播消费 | 仅作为存储载体,不直接决定消费模式 |
运维操作 | 配置权限、Tag 规则、保留策略等 | 调整副本数、存储路径、故障迁移等 |
实际场景示例
- 场景:电商系统的订单消息(
OrderTopic
)需要支持每秒 10 万条吞吐量。 - 实现:
- 创建
OrderTopic
并设置 32 个队列。 - 将队列分布在 4 个集群的 Broker 节点上(每个集群 8 队列)。
- 消费者组订阅
OrderTopic
,RocketMQ 自动将 32 个队列均分给消费者实例。 - 当某个集群故障时,NameServer 将故障队列的路由指向其他集群的副本队列,保障高可用
- 创建
小结
主题的核心价值在于逻辑抽象,它屏蔽了底层队列的物理复杂性,同时提供业务层面的统一管理入口。队列的分布式存储解决的是性能与可靠性问题,而主题则定义了消息的业务语义与协作规则,两者共同构成 RocketMQ 高并发、高可用的基石。
二.RocketMQ管理命令
1、updateTopic
作用:修改或创建一个Topic
命令:mqadmin updateTopic -b | -c [-h] [-n ] [-o ] [-p ] [-r ] [-s ] -t [-u ] [-w ]
参数:
- -n: name server地址列表
- -c: cluster 名称,表示topic 建在该集群
- -t: 设置topic名称
- -h: 打印help信息
- -o: 设置topic是否为有序的 取值:true、false(默认)
- -p: 设置topic的权限
示例:
mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic
2、deleteTopic
作用:从broker和nameserver删除topic
命令:mqadmin deleteTopic -c [-h] [-n ] -t
参数:
- -n: name server地址列表
- -c: cluster 名称,表示topic 建在该集群
- -t: 设置topic名称
- -h: 打印help信息
示例:
mqadmin deleteTopic -n localhost:9876 -c DefaultCluster -t testtopic
3、topicList
作用:从nameserver列出所有topic
命令:mqadmin topicList [-c] [-h] [-n ]
参数:
- -n: name server地址列表
- -c: cluster 名称,表示topic 建在该集群
- -h: 打印help信息
示例:
mqadmin topicList -n localhost:9876
4、topicStatus
作用:检查topic的状态信息
命令:mqadmin topicStatus [-h] [-n ] -t
参数:
- -n: name server地址列表
- -c: cluster 名称,表示topic 建在该集群
- -t: 设置topic名称
- -h: 打印help信息
示例
mqadmin topicStatus -n localhost:9876 -t testtopic
5、cleanUnusedTopic
作用:清理未使用的topic
命令:mqadmin cleanUnusedTopic [-b ] [-c ] [-h] [-n ]
参数:
- -n: name server地址列表
- -b: broker地址
- -c: 集群名称
- -h: 打印help信息
6、关闭namesrv和broker服务
mqshutdown namesrv
mqshutdown broker
三.RocketMQ安装与配置
1、首先修改环境变量
vim /etc/profile
# 文件末尾追加改信息
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=$PATH:$ROCKETMQ_HOME/bin
# 生效环境变量
source /etc/profile
2、解压文件
unzip rocketmq-all-5.1.4-bin-release.zip
我们上传安装包至opt目录下,使用该命令将其解压至opt目录下,最后使用mv命令对其进行位置移动和改名-----/usr/local/rocketmq
# 解压到当前目录(/opt)
unzip rocketmq-all-5.0.0-bin-release.zip# 将解压后的目录移动到/usr/local
mv rocketmq-all-5.0.0-bin-release /usr/local/# 改名
mv rocketmq-all-5.0.0-bin-release rocketmq
进入bin目录如下
3、启动NameServer
nohup sh mqnamesrv &
4、启动broker
nohup sh ./mqbroker -n localhost:9876 &
使用jps命令查看后发现broker未启动
启动成功后的输出结果
使用 cat nohup.out 查看日志如下
很常见的问题
1、java.lang.IllegalAccessError: class org.apache.rocketmq.common.UtilAll (in unnamed module @0x58acb723) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module
解决:添加启动参数
修改runbroker文件,添加红色参数 $JAVA ${JAVA_OPT} --add-exports=java.base/sun.nio.ch=ALL-UNNAMED $@
2、堆空间初始值太大也报错
修改文件runbroker.sh(将光标位置修改为如下数字即可)
通过上述修改,将初始堆内存512M,最大堆内存设置为512M,新生代(Java中用于存储创建对象的部分)设置为256M,修改完成后便可以正常启动以及查看日志。