前言
上一篇初步认识了RocketMQ,这一篇文章我们简单来搭建一个RocketMQ集群。RocketMQ支持多种集群部署模式,其中最常用的是多主多从的异步复制模式(2m代表两个master,2s代表两个slave,async代表异步刷盘的机制)。今天我们就以这种方式来 搭建。
本篇文章除了集群的搭建,也适合面试使用,在这里希望能帮助到各位正在学习RocketMQ的小伙伴儿们!
一、集群架构
上一篇文章我们介绍了RocketMQ的四大组件,这里我们再重述一下这几个核心集群组件:
- NameServer集群:负责管理整个RocketMQ集群的路由信息。生产者和消费者在启动时会向NameServer注册自己的信息,并从NameServer获取路由信息,以便能够找到相应的Broker。
- Broker集群:负责存储和转发消息,提供负载均衡和故障转移功能。每个Broker都有一个独立的NameServer实例,用于与NameServer通信。Broker分为Master Broker和Slave Broker,Master Broker负责写入消息,而Slave Broker负责备份Master Broker的数据,这样的主从结构保证了消息的高可用性。
- 生产者集群:负责向Broker投递消息,提供负载均衡和故障转移功能。生产者通过连接到NameServer获取队列的元数据信息,然后把消息发送到指定的队列中。
- 消费者集群:负责从Broker中拉取消息并进行处理,提供负载均衡和故障转移功能。消费者通过连接到NameServer获取队列的元数据信息,然后从指定的队列中拉取消息。
RocketMQ集群模式
RocketMQ官方为我们提供了几个集群模式,包括2m-2s-async,2m-2s-sync,2m-noslave等,这些文件都在conf目录下,接下来我们先介绍一下这几种集群模式。
2m-2s-async(多Master多Slave异步复制模式)
RocketMQ官方文档2m-2s-async模式的特点:
-
每个Master配置一个Slave,形成
多对Master-Slave
组合。 -
高可用,采用
异步复制
方式,Master在接收到消息后,会立即向应用返回成功标识,随后异步地将消息复制到Slave。 -
主备之间有短暂的消息延迟,通常是毫秒级别。
2m-2s-async模式的优点
- 即使磁盘损坏,丢失的消息也非常少。
- 消息的实时性不受影响,
因为Master宕机后,消费者仍然可以从Slave消费
。 - 性能与多个Master模式几乎一样。
2m-2s-sync(多Master多Slave同步复制模式)
RocketMQ官方文档2m-2s-sync模式的特点:
- 每个Master配置一个Slave,形成多对Master-Slave组合。
- 用同步双写模式,意思就是消息需要同时写入Master和Slave,都写成功后才向应用返回成功。
2m-2s-sync模式的优缺点:
-
数据和服务都没有单点故障,Master宕机的情况下,消息无延迟,服务可用性与数据可用性都非常高。
-
但是相比异步复制模式,性能会略低。
2m-noslave(多Master无Slave模式)
RocketMQ官方文档2m-noslave模式的特点:
集群中全部为Master节点,没有Slave节点。
2m-noslave模式下的优缺点
-
配置相对简单,毕竟没有从节点嘛。
-
单个Master宕机或重启对应用没有影响。
-
性能最高,因为所有节点都直接参与消息的处理。
-
存在单点故障风险,虽然可以通过RAID这些技术减少磁盘故障的影响,但是整个节点故障时仍然会导致消息处理中断。
-
单个Broker宕机期间,如果这个节点上未被消费的消息在服务器恢复之前不可订阅,消息实时性会受到影响。
环境准备
软件环境 操作系统:CentOS 7
JDK版本:1.8
RocketMQ版本:4.7.1(以最新版本为准,但步骤类似)
IP地址分配:
服务器 | 服务器IP | NameServer | broker部署节点 |
---|---|---|---|
服务器0 | 192.168.220.134 | 192.168.220.134:9876 | |
服务器1 | 192.168.220.136 | 192.168.220.136:9876 | broker-a(master),broker-b-s(slave) |
服务器2 | 192.168.220.135 | 192.168.220.135:9876 | broker-b(master),broker-a-s(slave) |
这里可能会有小伙伴儿有疑问,为何这样分配broker?
之所以把broker-a(master)和broker-a-s(slave) 分配到不同的服务器上,这是因为当一个服务器挂掉后,另一个服务器上的a或者b节点仍然能够继续工作(a和b两个主节点,其中a-s代表a的从节点,b-s代表b的从节点
)!
2m-2s-async模式集群搭建
首先我们先克隆出来两个服务器,这两个服务器的IP不同,一个IP为192.168.220.136
,另一个IP为192.168.220.135
。
启动nameserver
然后启动三台服务器的nameserver,nameserver是⼀个轻量级的注册中心,broker把自己的信息注册到nameserver上。而且,nameserver是⽆状态的,直接启动即可。三台nameserver之间不需要通信,而是被请求⽅来关联三台nameserver的地址。
在启动这三台服务器之前,需要修改三台服务器的的runserver.sh⽂件,和上一篇文章一样,修改JVM内存默认的4g为512m。
vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
然后在每台服务器的bin⽬录下执⾏如下命令:
服务器0: nohup ./mqnamesrv -n 192.168.220.134:9876 &
服务器1: nohup ./mqnamesrv -n 192.168.220.135:9876 &
服务器2: nohup ./mqnamesrv -n 192.168.220.136:9876 &
配置broker
前提:broker-a,broker-b-s这两台broker是配置在服务器1上,broker-b,broker-a-s这两台broker是配置在服务器2上。这两对主从节点在不同的服务器上,服务器0上没有部署broker。
首先在启动之前,我们需要修改每一台broker的配置⽂件。这里需要注意,同⼀台服务器上的两个broker保存路径不能⼀样。
首先修改broker-a的master节点
在服务器1上,进入到conf/2m-2s-async文件夹内,修改broker-a.properties文件。
broker-a的master节点
# 所属集群名称
brokerClusterName=DefaultCluster
# broker名字
brokerName=broker-a
# broker所在服务器的ip
brokerIP1=192.168.220.136
# broker的id,0表示master,>0表示slave
brokerId=0
# 删除⽂件时间点,默认在凌晨4点
deleteWhen=04
# ⽂件保留时间为48⼩时
fileReservedTime=48
# broker的⻆⾊为master
brokerRole=ASYNC_MASTER
# 使⽤异步刷盘的⽅式
flushDiskType=ASYNC_FLUSH
# 名称服务器的地址列表
namesrvAddr=192.168.220.136:9876; 192.168.220.134:9876; 192.168.220.135:9876
# 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
defaultTopicQueueNums=4
# 是否允许 Broker ⾃动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker ⾃动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# broker对外服务的监听端⼝
listenPort=10911
# abort⽂件存储路径
abortFile=/usr/local/src/rocketmq-all-4.7.1-bin-release/store/abort
# 消息存储路径
storePathRootDir=/usr/local/src/rocketmq-all-4.7.1-bin-release/store
# commitLog存储路径
storePathCommitLog=/usr/local/src/rocketmq-all-4.7.1-bin-release/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.7.1-bin- release/store/consumequeue
# 消息索引存储路径
storePathIndex= /usr/local/src/rocketmq-all-4.7.1-bin-release/store/index
# checkpoint⽂件存储路径
storeCheckpoint= /usr/local/src/rocketmq-all-4.7.1-bin-release/store/checkpoint
# 限制的消息⼤⼩
maxMessageSize=65536
# commitLog每个⽂件的⼤⼩默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个⽂件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
同样在服务器2上,进⼊到conf/2m-2s-async⽂件夹内,修改broker-b-s.properties文件。
broker-b-s.properties
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerIP1=192.168.220.136
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.220.136:9876; 192.168.220.134:9876; 192.168.220.135:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=11011
abortFile=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/abort
storePathRootDir=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave
storePathCommitLog=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/commitlog
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/consumequeue
storePathIndex=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/index
storeCheckpoint=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/checkpoint
maxMessageSize=65536
然后在服务器3上,进⼊到conf/2m-2s-async文件夹内,修改broker-a-s.properties文件
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerIP1=192.168.220.135
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.220.135:9876; 192.168.220.136:9876; 192.168.220.134:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=11011
abortFile=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/abort
storePathRootDir=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave
storePathCommitLog=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/commitlog
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/consumequeue
storePathIndex=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/index
storeCheckpoint=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/checkpoint
maxMessageSize=65536
最后在服务器3上,进⼊到conf/2m-2s-async文件夹内,修改broker-b.properties文件
broker-b.properties节点
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerIP1=192.168.220.135
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.220.135:9876; 192.168.220.136:9876; 192.168.220.134:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
abortFile=/usr/local/src/rocketmq-all-4.7.1-bin-release/store/abort
storePathRootDir=/usr/local/src/rocketmq-all-4.7.1-bin-release/store
storePathCommitLog=/usr/local/src/rocketmq-all-4.7.1-bin-release/store/commitlog
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.7.1-bin-release/store/consumequeue
storePathIndex=/usr/local/src/rocketmq-all-4.7.1-bin-release/store/index
storeCheckpoint=/usr/local/src/rocketmq-all-4.7.1-bin-release/store/checkpoint
maxMessageSize=65536
这些配置文件修改完成后,还需要修改服务器2和服务器3的runbroker.sh文件(需要cd bin)进入到bin目录:修改JVM内存默认的8g为512m。
cd bin
vim runbroker.sh
将文件上面的配置修改为512m
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
启动broker
在服务器2中启动broker-a(master)和broker-b-s(slave)
nohup./mqbroker -c /usr/local/src/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async/broker-a.properties & autoCreateTopicEnable=true
nohup./mqbroker -c /usr/local/src/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async/broker-b-s.properties & autoCreateTopicEnable=true
输入完成后可以使用cat nohup.out
命令查看结果:
在服务器3中启动broker-b(master),broker-a-s(slave)
nohup./mqbroker -c /usr/local/src/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async/broker-b.properties & autoCreateTopicEnable=true
nohup./mqbroker -c /usr/local/src/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async/broker-a-s.properties & autoCreateTopicEnable=true
输入完成后同样可以使用cat nohup.out
命令查看结果:
验证RocketMQ集群
我们可以和上一篇文章一样,使用RocketMQ提供的tools⼯具验证集群是否正常工作。
首先,我们在服务器2上配置环境变量,这个用于被tools中的⽣产者和消费者程序读取该变量。
export NAMESRV_ADDR='192.168.220.134:9876;192.168.220.135:9876;192.168.220.136:9876'
然后使用工具启动⽣产者:
./tools.sh org.apache.rocketmq.example.quickstart.Producer
从图中我们可以看到每一个broker中确实有四个队列在工作,而且消息的发送均为OK状态!
最后使用工具启动消费者:
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
最后在图里也有看到消息都被消费者消费完成。
本篇文章到这里就结束了,后续会继续分享RocketMQ相关的知识,感谢各位小伙伴们的支持!