RocketMQ集群搭建,看完这篇文章你就懂了(基于2m-2s-async模式)

前言

上一篇初步认识了RocketMQ,这一篇文章我们简单来搭建一个RocketMQ集群。RocketMQ支持多种集群部署模式,其中最常用的是多主多从的异步复制模式(2m代表两个master,2s代表两个slave,async代表异步刷盘的机制)。今天我们就以这种方式来 搭建。

本篇文章除了集群的搭建,也适合面试使用,在这里希望能帮助到各位正在学习RocketMQ的小伙伴儿们!

在这里插入图片描述

一、集群架构

上一篇文章我们介绍了RocketMQ的四大组件,这里我们再重述一下这几个核心集群组件:

  1. NameServer集群:负责管理整个RocketMQ集群的路由信息。生产者和消费者在启动时会向NameServer注册自己的信息,并从NameServer获取路由信息,以便能够找到相应的Broker。
  2. Broker集群:负责存储和转发消息,提供负载均衡和故障转移功能。每个Broker都有一个独立的NameServer实例,用于与NameServer通信。Broker分为Master Broker和Slave Broker,Master Broker负责写入消息,而Slave Broker负责备份Master Broker的数据,这样的主从结构保证了消息的高可用性。
  3. 生产者集群:负责向Broker投递消息,提供负载均衡和故障转移功能。生产者通过连接到NameServer获取队列的元数据信息,然后把消息发送到指定的队列中。
  4. 消费者集群:负责从Broker中拉取消息并进行处理,提供负载均衡和故障转移功能。消费者通过连接到NameServer获取队列的元数据信息,然后从指定的队列中拉取消息。

在这里插入图片描述

RocketMQ集群模式

RocketMQ官方为我们提供了几个集群模式,包括2m-2s-async,2m-2s-sync,2m-noslave等,这些文件都在conf目录下,接下来我们先介绍一下这几种集群模式。

在这里插入图片描述

2m-2s-async(多Master多Slave异步复制模式)

RocketMQ官方文档2m-2s-async模式的特点:

  • 每个Master配置一个Slave,形成多对Master-Slave组合。

  • 高可用,采用异步复制方式,Master在接收到消息后,会立即向应用返回成功标识,随后异步地将消息复制到Slave。

  • 主备之间有短暂的消息延迟,通常是毫秒级别。

2m-2s-async模式的优点

  • 即使磁盘损坏,丢失的消息也非常少。
  • 消息的实时性不受影响,因为Master宕机后,消费者仍然可以从Slave消费
  • 性能与多个Master模式几乎一样。

2m-2s-sync(多Master多Slave同步复制模式)

RocketMQ官方文档2m-2s-sync模式的特点:

  1. 每个Master配置一个Slave,形成多对Master-Slave组合。
  2. 用同步双写模式,意思就是消息需要同时写入Master和Slave,都写成功后才向应用返回成功。

2m-2s-sync模式的优缺点:

  • 数据和服务都没有单点故障,Master宕机的情况下,消息无延迟,服务可用性与数据可用性都非常高。

  • 但是相比异步复制模式,性能会略低

2m-noslave(多Master无Slave模式)

RocketMQ官方文档2m-noslave模式的特点:

集群中全部为Master节点,没有Slave节点。

2m-noslave模式下的优缺点

  • 配置相对简单,毕竟没有从节点嘛。

  • 单个Master宕机或重启对应用没有影响。

  • 性能最高,因为所有节点都直接参与消息的处理

  • 存在单点故障风险,虽然可以通过RAID这些技术减少磁盘故障的影响,但是整个节点故障时仍然会导致消息处理中断。

  • 单个Broker宕机期间,如果这个节点上未被消费的消息在服务器恢复之前不可订阅,消息实时性会受到影响。

环境准备

软件环境 操作系统:CentOS 7
JDK版本:1.8
RocketMQ版本:4.7.1(以最新版本为准,但步骤类似)

IP地址分配:

服务器服务器IPNameServerbroker部署节点
服务器0192.168.220.134192.168.220.134:9876
服务器1192.168.220.136192.168.220.136:9876broker-a(master),broker-b-s(slave)
服务器2192.168.220.135192.168.220.135:9876broker-b(master),broker-a-s(slave)

这里可能会有小伙伴儿有疑问,为何这样分配broker?

之所以把broker-a(master)和broker-a-s(slave) 分配到不同的服务器上,这是因为当一个服务器挂掉后,另一个服务器上的a或者b节点仍然能够继续工作(a和b两个主节点,其中a-s代表a的从节点,b-s代表b的从节点
)!

2m-2s-async模式集群搭建

首先我们先克隆出来两个服务器,这两个服务器的IP不同,一个IP为192.168.220.136,另一个IP为192.168.220.135

在这里插入图片描述

启动nameserver

然后启动三台服务器的nameserver,nameserver是⼀个轻量级的注册中心,broker把自己的信息注册到nameserver上。而且,nameserver是⽆状态的,直接启动即可。三台nameserver之间不需要通信,而是被请求⽅来关联三台nameserver的地址。

在启动这三台服务器之前,需要修改三台服务器的的runserver.sh⽂件,和上一篇文章一样,修改JVM内存默认的4g为512m。

vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

然后在每台服务器的bin⽬录下执⾏如下命令:
服务器0: nohup ./mqnamesrv -n 192.168.220.134:9876 &
服务器1: nohup ./mqnamesrv -n 192.168.220.135:9876 &
服务器2: nohup ./mqnamesrv -n 192.168.220.136:9876 &

配置broker

前提:broker-a,broker-b-s这两台broker是配置在服务器1上,broker-b,broker-a-s这两台broker是配置在服务器2上。这两对主从节点在不同的服务器上,服务器0上没有部署broker。

首先在启动之前,我们需要修改每一台broker的配置⽂件。这里需要注意,同⼀台服务器上的两个broker保存路径不能⼀样。
首先修改broker-a的master节点

在服务器1上,进入到conf/2m-2s-async文件夹内,修改broker-a.properties文件。

broker-a的master节点

# 所属集群名称
brokerClusterName=DefaultCluster
# broker名字
brokerName=broker-a
# broker所在服务器的ip
brokerIP1=192.168.220.136
# broker的id,0表示master,>0表示slave
brokerId=0
# 删除⽂件时间点,默认在凌晨4点
deleteWhen=04
# ⽂件保留时间为48⼩时
fileReservedTime=48
# broker的⻆⾊为master
brokerRole=ASYNC_MASTER
# 使⽤异步刷盘的⽅式
flushDiskType=ASYNC_FLUSH
# 名称服务器的地址列表
namesrvAddr=192.168.220.136:9876; 192.168.220.134:9876; 192.168.220.135:9876
# 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
defaultTopicQueueNums=4
# 是否允许 Broker ⾃动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker ⾃动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true 
# broker对外服务的监听端⼝
listenPort=10911
# abort⽂件存储路径
abortFile=/usr/local/src/rocketmq-all-4.7.1-bin-release/store/abort
# 消息存储路径
storePathRootDir=/usr/local/src/rocketmq-all-4.7.1-bin-release/store
# commitLog存储路径
storePathCommitLog=/usr/local/src/rocketmq-all-4.7.1-bin-release/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.7.1-bin- release/store/consumequeue 
# 消息索引存储路径
storePathIndex= /usr/local/src/rocketmq-all-4.7.1-bin-release/store/index 
# checkpoint⽂件存储路径
storeCheckpoint= /usr/local/src/rocketmq-all-4.7.1-bin-release/store/checkpoint
# 限制的消息⼤⼩
maxMessageSize=65536
# commitLog每个⽂件的⼤⼩默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个⽂件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

同样在服务器2上,进⼊到conf/2m-2s-async⽂件夹内,修改broker-b-s.properties文件。

broker-b-s.properties

brokerClusterName=DefaultCluster
brokerName=broker-b
brokerIP1=192.168.220.136
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.220.136:9876; 192.168.220.134:9876; 192.168.220.135:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=11011
abortFile=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/abort
storePathRootDir=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave
storePathCommitLog=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/commitlog
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/consumequeue
storePathIndex=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/index
storeCheckpoint=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/checkpoint
maxMessageSize=65536

然后在服务器3上,进⼊到conf/2m-2s-async文件夹内,修改broker-a-s.properties文件

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerIP1=192.168.220.135
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.220.135:9876; 192.168.220.136:9876; 192.168.220.134:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=11011
abortFile=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/abort
storePathRootDir=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave
storePathCommitLog=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/commitlog
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/consumequeue
storePathIndex=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/index
storeCheckpoint=/usr/local/src/rocketmq-all-4.7.1-bin-release/store-slave/checkpoint
maxMessageSize=65536

最后在服务器3上,进⼊到conf/2m-2s-async文件夹内,修改broker-b.properties文件

broker-b.properties节点

brokerClusterName=DefaultCluster
brokerName=broker-b
brokerIP1=192.168.220.135
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.220.135:9876; 192.168.220.136:9876; 192.168.220.134:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
abortFile=/usr/local/src/rocketmq-all-4.7.1-bin-release/store/abort
storePathRootDir=/usr/local/src/rocketmq-all-4.7.1-bin-release/store
storePathCommitLog=/usr/local/src/rocketmq-all-4.7.1-bin-release/store/commitlog
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.7.1-bin-release/store/consumequeue
storePathIndex=/usr/local/src/rocketmq-all-4.7.1-bin-release/store/index
storeCheckpoint=/usr/local/src/rocketmq-all-4.7.1-bin-release/store/checkpoint
maxMessageSize=65536

这些配置文件修改完成后,还需要修改服务器2和服务器3的runbroker.sh文件(需要cd bin)进入到bin目录:修改JVM内存默认的8g为512m。

cd bin
vim runbroker.sh

将文件上面的配置修改为512m

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"

启动broker

在服务器2中启动broker-a(master)和broker-b-s(slave)

nohup./mqbroker -c /usr/local/src/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async/broker-a.properties & autoCreateTopicEnable=true 
nohup./mqbroker -c /usr/local/src/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async/broker-b-s.properties & autoCreateTopicEnable=true 

输入完成后可以使用cat nohup.out命令查看结果:
在这里插入图片描述

在服务器3中启动broker-b(master),broker-a-s(slave)

nohup./mqbroker -c /usr/local/src/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async/broker-b.properties & autoCreateTopicEnable=true 
nohup./mqbroker -c /usr/local/src/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async/broker-a-s.properties & autoCreateTopicEnable=true 

输入完成后同样可以使用cat nohup.out命令查看结果:

在这里插入图片描述

验证RocketMQ集群

我们可以和上一篇文章一样,使用RocketMQ提供的tools⼯具验证集群是否正常工作。
首先,我们在服务器2上配置环境变量,这个用于被tools中的⽣产者和消费者程序读取该变量。

export NAMESRV_ADDR='192.168.220.134:9876;192.168.220.135:9876;192.168.220.136:9876'

然后使用工具启动⽣产者:

./tools.sh org.apache.rocketmq.example.quickstart.Producer

在这里插入图片描述
从图中我们可以看到每一个broker中确实有四个队列在工作,而且消息的发送均为OK状态!

最后使用工具启动消费者:

./tools.sh org.apache.rocketmq.example.quickstart.Consumer

在这里插入图片描述

最后在图里也有看到消息都被消费者消费完成。

本篇文章到这里就结束了,后续会继续分享RocketMQ相关的知识,感谢各位小伙伴们的支持!

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/49418.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

乐鑫ESP32-H2设备联网芯片,集成多种安全功能方案,启明云端乐鑫代理商

在数字化浪潮的推动下,物联网正以前所未有的速度融入我们的日常生活。然而,随着设备的激增,安全问题也日益成为公众关注的焦点。 乐鑫ESP32-H2致力于为所有开发者提供高性价比的安全解决方案,这款芯片经过专门设计以集成多种安全…

[Armbian] 部署Docker版Home Assistent,安装HACS并连接米家设备

title: [Armbian] 部署Docker版Home Assistent,安装HACS并连接米家设备 date: 2024-07-21T10:51:23Z lastmod: 2024-07-21T11:40:39Z [Armbian] 部署Docker版Home Assistent,安装HACS并连接米家设备 官网:Home Assistant (home-assistant.i…

FoundationDB 基本使用

目录 一、FoundationDB介绍 二、安装单机版FoundationDB 2.1 下载安装程序 2.2 安装FoundationDB 2.3 修改配置信息 2.4 管理FoundationDB服务 三、fdbcli的常用命令 3.1连接数据库 3.2退出fdbcli 3.3查看版本 3.4 写模式 3.5写入键值 3.6读取键值 3.7删除键值 …

无法连接到internet怎么办?已连接但无internet访问,其实并不难

有时我们会遇到无法连接到Internet的问题,由多种原因引起,包括硬件故障、软件设置问题、网络供应商故障等。本文将介绍无法连接到Internet时可以采取的步骤。 简述 当你无法连接到Internet时,可以按照以下步骤进行检查和解决: 1…

python实现特征检测算法4

python实现Richardson-Lucy 反卷积算法 Richardson-Lucy 反卷积算法算法原理Python实现详细解释Richardson-Lucy算法的优缺点应用领域Richardson-Lucy 反卷积算法 Richardson-Lucy反卷积算法是一种迭代算法,用于恢复因成像系统中的点扩散函数(PSF)导致的模糊图像。该算法最…

TCP系列(一)-介绍TCP

服务 TCP和UDP同样使用IP提供的服务,但是TCP提供的是面向连接,可靠的字节流服务 面向连接 使用TCP进行通信双方,必须先建立连接,然后进行数据交换 可靠服务 将应用数据分割成固定大小的报文段每次发出报文,会启动定时…

分享从零开始学习网络设备配置--任务6.1 实现计算机的安全接入

项目描述 随着网络技术的发展和应用范围的不断扩大,网络已经成为人们日常生活中必不可少的一部分。园区网作为给终端用户提供网络接入和基础服务的应用环境,其存在的网络安全隐患不断显现出来,如非人为的或自然力造成的故障、事故&#xff1b…

昇思25天学习打卡营第09天|使用静态图加速

MindSpore支持两种运行模式:动态图(PyNative模式)和静态图(Graph模式)。 动态图模式下,计算图的构建和计算同时发生,适合调试和开发,但不利于优化。 静态图模式下,计算…

【概率论】-2-概率论公理(Axioms of Probability)

上一篇文章我们学习了基本的概率论内容-排列组合,本次我们学习概率论公理的内容,正式开始计算概率,在开始前我们需要学习一些基本概念。 目录 一.样本空间和事件 1.样本空间 2.事件 3.交并补 二、概率公理 1.基本公理 2.对称差 2.布尔…

vuex的工作流程,模块化使用案例分享,及状态持久化

文章目录 一、Vuex 是什么?二、核心概念三、Vuex 的工作流程四、什么情况下我应该使用 Vuex?五、Vuex 的使用六、使用示例七、状态持久化1、手动利用HTML5的本地存储2、利用vuex-persistedstate插件2.1、安装2.2、配置 一、Vuex 是什么? Vue…

USART串口理论知识总结

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 USART串口理论知识总结 1、通讯的串行和并行1.串口采用发送数据代码并用printf重代码 1、通讯的串行和并行 1.串口采用发送数据代码并用printf重代码 #include <stdint.h…

GPT-4o mini 时代:开发者的机遇、挑战与经验共享

在当今科技飞速发展的时代&#xff0c;OpenAI 最新发布的 GPT-4o mini 模型以其卓越的性能和极具竞争力的价格&#xff0c;在技术领域掀起了一股热潮&#xff0c;引发了广泛的关注。对于广大开发者来说&#xff0c;这无疑是一个令人振奋的新契机。 GPT-4o mini 模型的诞生&…

【C++】流插入和流提取运算符重载

目录 前言ostream和istream自定义类型的流插入重载自定义类型的流提取重载解决私有问题日期类总接口 前言 我们在上一节实现日期类时&#xff0c;在输入和输出打印时&#xff0c;经常会调用两个函数&#xff1a; void Insert()//输入函数{cin >> _year;cin >> _mo…

放眼全局做好真正的IT系统架构

一、系统架构存在的问题 当再次复盘业务架构、应用架构、技术架构、数据架构时这些过程域时&#xff0c;发现公司的这些架构如同一盘散沙。 1、业务架构随意&#xff0c;想到什么做什么&#xff0c;想法一天一个&#xff0c;天马行空。要么就是信息不对称&#xff0c;不统一。…

【Java】/* 浅谈String(下) */

目录 一、字符串的不可变性 二、字符串的修改 三、StringBuilder和StringBuffer 四、面试题 一、字符串的不可变性 1. 如上图所示&#xff0c;String类的是被final修饰的类(不能被继承)&#xff0c;成员变量value值是一个被final修饰的字节型数组。 2. 以下图代码为例&…

JavaWeb笔记_Session

Session概述 Session是一种在服务端记录用户会话信息的技术 Session的创建和获取 /*** HttpServletRequest对象中的方法:* public HttpSession getSession()* 如果当前服务端没有session,那就在服务端新建一个session对象* 如果在服务端有这个session,那么就直…

Java企业微信服务商代开发获取AccessToken示例

这里主要针对的是企业微信服务商代开发模式 文档地址 可以看到里面大致有三种token&#xff0c;一个是服务商的token&#xff0c;一个是企业授权token&#xff0c;还有一个是应用的token 这里面主要有下面几个参数 首先是服务商的 corpid 和 provider_secret &#xff0c;这个可…

mysql常用函数五大类

mysql常用函数 1. 第一类&#xff1a;数值函数1.1 圆周率pi的值1.2 求绝对值1.3 返回数字的符号1.4 开平方&#xff0c;根号1.5 求两个数的余数1.6 截取正数部分1.7 向上取整数1.8 向下取整数1.9 四舍五入函数1.10 随机数函数1.11 数值左边补位函数1.12 数值右边补位函数1.13 次…

83. UE5 RPG 实现属性值的设置

在前面&#xff0c;我们实现了角色升级相关的功能&#xff0c;在PlayerState上记录了角色的等级和经验值&#xff0c;并在变动时&#xff0c;通过委托广播的形式向外广播&#xff0c;然后在UI上&#xff0c;通过监听委托的变动&#xff0c;进行修改等级和经验值。 在这一篇里&a…

鸿蒙开发仓颉语言【Hyperion: 一个支持自定义编解码器的TCP通信框架】组件

Hyperion: 一个支持自定义编解码器的TCP通信框架 特性 支持自定义编解码器高效的ByteBuffer实现&#xff0c;降低请求处理过程中数据拷贝自带连接池支持&#xff0c;支持连接重建、连接空闲超时易于扩展&#xff0c;可以积木式添加IoFilter处理入栈、出栈消息 组件 hyperio…