RocketMQ学习笔记

kafka适合于日志收集的场景(不需要太多topic;topic下面的partition多了会造成写文件的速度变慢,因为要造很多索引)
RocketMQ更适合于电商场景(适用于topic特别多的情况)

快速安装RocketMQ

RocketMQ的官网地址: http://rocketmq.apache.org ,github地址是 https://github.com/apache/rocketmq 。
最新版本的RocketMQ可以到官网上进行下载。历史版本需要到Github仓库中下载。下载地址:https://github.com/apache/rocketmq/releases。

运行RocketMQ需要先安装JDK。
配置环境变量。使用 vi ~/.bash_profile编辑文件,在下面加入以下内容:

export JAVA_HOME=/app/jdk1.8/
PATH=$JAVA_HOME/bin:$PATH:$HOME/.local/bin:$HOME/bin
export PATH

编辑完成后,执行 source ~/.bash_profile让环境变量生效。输入java -version能查看到以下内容表明JDK安装成功了。

[oper@worker1 ~]$ java -version
java version "1.8.0_171"
Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)

然后把下载的压缩包在本地完成解压,并上传到/app/rocketmq目录。完成后,把rocketmq的bin目录也配置到环境变量当中。
vi ~/.bash_profile,加入以下内容,并执行source ~/.bash_profile让环境变量生效:

export JAVA_HOME=/app/jdk1.8/
export ROCKETMQ_HOME=/app/rocketmq/rocketmq-all-4.9.1-bin-release
PATH=$ROCKETMQ_HOME/bin:$JAVA_HOME/bin:$PATH:$HOME/.local/bin:$HOME/bin
export PATH
这个ROCKETMQ_HOME的环境变量是必须要单独配置的,如果不配置的话,启动NameSever和Broker都会报错。
这个环境变量的作用是用来加载$ROCKETMQ_HOME/conf下的除broker.conf以外的几个配置文件。
所以实际情况中,可以不按这个配置,但是一定要能找到配置文件

NameServer服务搭建

启动NameServer非常简单, 在$ROCKETMQ_HOME/bin目录下有个mqadminsrv。直接执行这个脚本就可以启动RocketMQ的NameServer服务。但是要注意,RocketMQ默认预设的JVM内存是4G,这是RocketMQ是最佳配置。但是通常用虚拟机的话都是不够4G内存的,所以需要调整下JVM内存大小。修改的方式是直接修改runserver.sh。 用vi runserver.sh编辑这个脚本,在脚本中找到这一行调整内存大小为512M

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -
XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

然后用静默启动的方式启动NameServer服务:

nohup bin/mqnamesrv &

启动完成后,在nohup.out里看到这一条关键日志就是启动成功了。并且使用jps指令可以看到有一个NamesrvStartup进程。

Broker服务搭建

启动Broker的脚本是runbroker.sh。Broker的默认预设内存是8G,启动前,如果内存不够,同样需要调整下JVM内存。vi runbroker.sh,找到这一行,进行内存调整

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"

然后需要找到$ROCKETMQ_HOME/conf/broker.conf, vi指令进行编辑,在最下面加入一个配置:

autoCreateTopicEnable=true

然后也以静默启动的方式启动runbroker.sh

nohup ./mqbroker &

启动完成后,同样是检查nohup.out日志,有这一条关键日志就标识启动成功了。 并且jps指令可以看到一个BrokerStartup进程。

在观察runserver.sh和runbroker.sh时,还可以查看到其他的JVM执行参数,这些参数都可以进行定制。
观察到一个比较有意思的地方,nameServer使用的是CMS垃圾回收器,而Broker使用的是G1垃圾回收器。

命令行启动客户端

在RocketMQ的安装包中,提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ服务。在worker2上进入RocketMQ的安装目录:

首先需要配置一个环境变量NAMESRV_ADDR指向启动的NameServer服务。

export NAMESRV_ADDR='localhost:9876'

然后启动消息生产者发送消息:默认会发1000条消息

bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

然后启动消息消费者接收消息:

 bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
关闭RocketMQ服务
# 1.关闭NameServer
sh bin/mqshutdown namesrv
# 2.关闭Broker
sh bin/mqshutdown broker

集群架构解析

在这里插入图片描述

一个完整的RocketMQ集群中,有如下几个角色

  • Producer:消息的发送者;举例:发信者
  • Consumer:消息接收者;举例:收信者
  • Broker:暂存和传输消息;举例:邮局
  • NameServer:管理Broker;举例:各个邮局的管理机构
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
  • Message Queue:相当于是Topic的分区;用于并行发送和接收消息

RocketMQ集群搭建与优化

准备三台虚拟机,硬盘空间建议大于4G。配置机器名。

#vi /etc/hosts
192.168.232.128 worker1
192.168.232.129 worker2
192.168.232.130 worker3

创建用户–可选

useradd oper
passwd oper (密码输入 123qweasd)

系统配置
免密登录
切换oper用户,在worker1上 生成key

ssh-kengen

然后分发给其他机器

ssh-copy-id worker1
ssh-copy-id worker2
ssh-copy-id worker3

这样就可以在worker1上直接ssh 或者scp到另外的机器,不需要输密码了。

关闭防火墙

systemctl stop firewalld.service
firewall-cmd --state
配置RocketMQ主从集群

搭建一个2主2从异步刷盘的集群,所以会使用conf/2m-2s-async下的配置文件。预备设计的集群情况如下:

机器名nemaeServer节点部署broker节点部署
worker1nameserver
worker2nameserverbroker-a, broker-b-s
worker3nameserverbroker-b,broker-a-s
所以修改的配置文件是进入rocketmq的config目录下修改2m-2s-async的配置文件。主要是配置broker.conf文件。
在rocketmq的config目录下可以看到rocketmq建议的各种配置方式:
2m-2s-async: 2主2从异步刷盘(吞吐量较大,但是消息可能丢失),
2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全),
2m-noslave:2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置。
而dleger就是用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,其他的就都是follower。
通常正式环境都会采用这种方式来搭建集群。
配置第一组broker-a

在worker2上先配置borker-a的master节点。先配置2m-2s-async/broker-a.properties

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=worker1:9876;worker2:9876;worker3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/app/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/app/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/app/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

该节点对应的从节点在worker3上。修改2m-2s-async/broker-a-s.properties

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=worker1:9876;worker2:9876;worker3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/storeSlave
#commitLog 存储路径
storePathCommitLog=/app/rocketmq/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/storeSlave/index
#checkpoint 文件存储路径
storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
#abort 文件存储路径
abortFile=/app/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

配置第二组Broker-b
这一组broker的主节点在worker3上,所以需要配置worker3上的config/2m-2sasync/broker-b.properties

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=worker1:9876;worker2:9876;worker3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/app/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/app/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/app/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

然后他对应的slave在worker2上,修改work2上的 conf/2m-2s-async/broker-b-s.properties

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=worker1:9876;worker2:9876;worker3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/storeSlave
#commitLog 存储路径
storePathCommitLog=/app/rocketmq/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/storeSlave/index
#checkpoint 文件存储路径
storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
#abort 文件存储路径
abortFile=/app/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

这样2主2从的集群配置基本就完成了。搭建过程中需要注意的配置项:
1、同一机器上两个实例的store目录不能相同,否则会报错 Lock failed,MQ already started
2、同一机器上两个实例的listenPort也不能相同。否则会报端口占用的错nameserver不需要进行配置,直接启动就行。这也看出nameserver是无状态的。
3、如果是多网卡的机器,比如云服务器,那么需要在broker.conf中增加brokerIP1属性,指定所在机器的外网网卡地址。

启动RocketMQ

直接调用bin目录下的脚本就行。只是启动之前要注意看下他们的JVM内存配置,默认的配置都比较高。
1、先启动nameServer
修改三个节点上的bin/runserver.sh,调整里面的jvm内存配置。找到下面这一行调整下内存

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

直接在三个节点上启动nameServer。

nohup bin/mqnamesrv &

启动完成后,在nohup.out里看到这一条关键日志就是启动成功了
在这里插入图片描述
使用jps指令可以看到一个NamesrvStartup进程
2、再启动broker
启动broker是使用的mqbroker指令,只是注意启动broker时需要通过-c 指定对应的配置文件

在worker2上启动broker-a的master节点和broker-b的slave节点

nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &

在work3上启动broker-b的master节点和broker-a的slave节点

nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &

3、启动状态检查
使用jps指令,能看到一个NameSrvStartup进程和两个BrokerStartup进程。
nohup.out中也有启动成功的日志。
对应的日志文件:

# 查看nameServer日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
# 查看broker日志
tail -500f ~/logs/rocketmqlogs/broker.log

4、测试mqadmin管理工具
RocketMQ源码中并没有提供管理控制台,只提供了一个mqadmin指令来管理
RocketMQ。指令的位置在bin目录下。直接使用该指令就会列出所有支持的命令。
在这里插入图片描述
使用方式都是 mqadmin {command} {args}。 如果有某个指令不会使用,可以
使用 mqadmin help {command} 指令查看帮助。

5、命令行快速验证
RocketMQ提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ服
务。例如,在worker2机器上进入RocketMQ的安装目录:
发送消息:默认会发1000条消息

bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

接收消息:

bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
注意,这是官方提供的Demo,但是官方的源码中,这两个类都是没有指定nameServer的,
所以运行会有点问题。要指定NameServer地址,可以配置一个环境变量NAMESRV_ADDR,
这样默认会读取这个NameServer地址。可以配到.bash_profile里或者直接临时指定。
export NAMESRV_ADDR='worker1:9876;worker2:9876;worker3:9876'

这个tooles.sh实际上是封装了一个简单的运行RocketMQ的环境,上面指令中指定的Java类,都在lib/rocketmq-example-4.7.1.jar包中。未来如果有一些客户端示例,也可以打成jar包放到这个lib目录下,通过tools.sh运行。

搭建管理控制台

RocketMQ源代码中并没有提供控制台,但是有一个Rocket的社区扩展项目中提供了一个控制台,地址: https://github.com/apache/rocketmq-dashboard

下载下来后,解压并进入对应的目录,使用maven进行编译

mvn clean package -Dmaven.test.skip=true

编译完成后,获取target下的jar包,就可以直接执行。但是这个时候要注意,在这个项目的application.yml中需要指定nameserver的地址。默认这个属性是指向本地。如果配置为空,会读取环境变量NAMESRV_ADDR。

那可以在jar包的当前目录下增加一个application.yml文件,覆盖jar包中默认的一个属性:

rocketmq:config:namesrvAddrs:- worker1:9876- worker2:9876- worker3:9876

然后执行:

java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar

搭建Dledger高可用集群–了解

通过这种方式,搭建了一个主从结构的RocketMQ集群,但是要注意,这种主从结构是只做数据备份,没有容灾功能的。也就是说当一个master节点挂了后,slave节点是无法切换成master节点继续提供服务的。注意这个集群至少要是3台,允许少于一半的节点发生故障。

如果slave挂了,对集群的影响不会很大,因为slave只是做数据备份的。
但是影响也是会有的,例如,当消费者要拉取的数据量比较大时,
RocketMQ有一定的机制会优先保证Master节点的性能,只让Master节点返回一小部分数据,
而让其他部分的数据从slave节点去拉取。另外,需要注意,Dleger会有他自己的CommitLog机制,也就是说,
使用主从集群累计下来的消息,是无法转移到Dleger集群中的。

而如果要进行高可用的容灾备份,需要采用Dledger的方式来搭建高可用集群。

搭建方法
要搭建高可用的Broker集群,只需要配置conf/dleger下的配置文件就行。

这种模式是基于Raft协议的,是一个类似于Zookeeper的paxos协议的选举协议,也是会在集群中随机选举出一个leader,其他的就是follower。只是他选举的过程跟paxos有点不同。Raft协议基于随机休眠机制的,选举过程会比paxos相对慢一点。

系统参数调优 – 重要

到这里,整个RocketMQ的服务就搭建完成了。但是在实际使用时,RocketMQ的吞吐量、性能都很高,那要发挥RocketMQ的高性能,还需要对RocketMQ以及服务器的性能进行定制
1、配置RocketMQ的JVM内存大小:
之前提到过,在runserver.sh中需要定制nameserver的内存大小,在runbroker.sh中需要定制broker的内存大小。这些默认的配置可以认为都是经过检验的最优化配置,但是在实际情况中都还需要根据服务器的实际情况进行调整。这里以runbroker.sh中对G1GC的配置举例,在runbroker.sh中的关键配置:

JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_broker_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"

-XX:+UseG1GC: 使用G1垃圾回收器, -XX:G1HeapRegionSize=16m 将G1的region块大小设为16M,
-XX:G1ReservePercent:在G1的老年代中预留25%空闲内存,这个默认值是10%,RocketMQ把这个参数调大了。-XX:InitiatingHeapOccupancyPercent=30:当堆内存的使用率达到30%之后就会启动G1垃圾回收器尝试回收垃圾,默认值是45%,RocketMQ把这个参数调小了,也就是提高了GC的频率,但是避免了垃圾对象过多,一次垃圾回收时间太长的问题

然后,后面定制了GC的日志文件,确定GC日志文件的地址、打印的内容以及控制每个日志文件的大小为30M并且只保留5个文件。这些在进行性能检验时,是相当重要的参考内容。

2、RocketMQ的其他一些核心参数
例如在conf/dleger/broker-n0.conf中有一个参数:sendMessageThreadPoolNums=16。这一个参数是表明RocketMQ内部用来发送消息的线程池的线程数量是16个,其实这个参数可以根据机器的CPU核心数进行适当调整,例如如果你的机器核心数超过16个,就可以把这个参数适当调大。

3、Linux内核参数定制
在部署RocketMQ的时候,还需要对Linux内核参数进行一定的定制。例如

  • ulimit,需要进行大量的网络通信和磁盘IO。

  • vm.extra_free_kbytes,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟。(与具体内核版本相关)

  • vm.min_free_kbytes,如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。

  • vm.max_map_count,限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。

  • vm.swappiness,定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。

  • File descriptor limits,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。建议设置文件描述符的值为655350。

      这些参数在CentOS7中的配置文件都在 /proc/sys/vm目录下。另外,RocketMQ的bin目录下有个os.sh里面设置了RocketMQ建议的系统内核参数,可以根据情况进行调整。
    

RocketMQ消息转发模型

1 消息模型(Message Model)
​ RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

2 消息生产者(Producer)
​ 负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

​ 生产者中,会把同一类Producer组成一个集合,叫做生产者组。同一组的Producer被认为是发送同一类消息且发送逻辑一致。

3 消息消费者(Consumer)
​ 负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

  • 拉取式消费的应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

  • 推动式消费模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

​ 消费者同样会把同一类Consumer组成一个集合,叫做消费者组,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

  • 集群消费模式下, 相同Consumer Group的每个Consumer实例平均分摊消息。
  • 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
    4 主题(Topic)
    ​ 表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

​ Topic只是一个逻辑概念,并不实际保存消息。同一个Topic下的消息,会分片保存到不同的Broker上,而每一个分片单位,就叫做MessageQueue。MessageQueue是一个具有FIFO特性的队列结构,生产者发送消息与消费者消费消息的最小单位。

5 代理服务器(Broker Server)
​ 消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

Broker Server是RocketMQ真正的业务核心,包含了多个重要的子模块:

  • Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
  • Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
  • Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
  • HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
  • Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

而Broker Server要保证高可用需要搭建主从集群架构。RocketMQ中有两种Broker架构模式:

  • 普通集群:
    这种集群模式下会给每个节点分配一个固定的角色,master负责响应客户端的请求,并存储消息。slave则只负责对master的消息进行同步保存,并响应部分客户端的读请求。消息同步方式分为同步同步和异步同步。

这种集群模式下各个节点的角色无法进行切换,也就是说,master节点挂了,这一组Broker就不可用了。

  • Dledger高可用集群:
    Dledger是RocketMQ自4.5版本引入的实现高可用集群的一项技术。这个模式下的集群会随机选出一个节点作为master,而当master节点挂了后,会从slave中自动选出一个节点升级成为master。

Dledger技术做的事情:1、从集群中选举出master节点 2、完成master节点往slave节点的消息同步。

6 名字服务(Name Server)
​ 名称服务充当路由消息的提供者。Broker Server会在启动时向所有的Name Server注册自己的服务信息,并且后续通过心跳请求的方式保证这个服务信息的实时性。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

​ 这种特性也就意味着NameServer中任意的节点挂了,只要有一台服务节点正常,整个路由服务就不会有影响。当然,这里不考虑节点的负载情况。

7 消息(Message)
​ 消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题Topic。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

​ 并且Message上有一个为消息设置的标志,Tag标签。用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/1264.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++进阶:搜索树

目录 1. 二叉搜索树1.1 二叉搜索树的结构1.2 二叉搜索树的接口及其优点与不足1.3 二叉搜索树自实现1.3.1 二叉树结点结构1.3.2 查找1.3.3 插入1.3.4 删除1.3.5 中序遍历 2. 二叉树进阶相关练习2.1 根据二叉树创建字符串2.2 二叉树的层序遍历I2.3 二叉树层序遍历II2.4 二叉树最近…

一、MinIO基本知识

MinIO基本知识 一、简介1.许可 二、部署1.Docker部署1.1 部署容器 1.2 MinIO页面访问1.3 创建Bucket![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/6c8aa92975f146b691f1f36ce1033e7c.png) 三、Python-API1.安装包2.Bucket、Object概念3.Bucket-API4.MinIOClient-…

C++修炼之路之多态--多态的条件与例外,重载+重写+重定义

目录 前言 一:构成多态的条件及一些特殊情况(前提是构成父子类) 1.多态是在不同的继承关系的类对象,去调用同一函数,产生了不同的结果 2.两个条件 3.三同的两个例外 1.协变---返回值类型可以不同,但必…

c++ qt6.5 打包sqlite组件无法使用,尽然 也需要dll支持!这和开发php 有什么区别!

运行 程序会默认使用当前所在文件夹中的 dll 文件,若文件不存在,会使用系统环境变量路径中的文件;又或者是需要在程序源代码中明确指定使用的 dll 的路径。由于我安装 Qt 时将相关 dll 文件路径都添加到了系统环境变量中,所以即使…

【R语言】混合图:小提琴图+箱线图

{ggstatsplot} 是 {ggplot2} 包的扩展,用于创建图形,其中包含信息丰富的绘图本身中包含的统计测试的详细信息。在典型的探索性数据分析工作流程中,数据可视化和统计建模是两个不同的阶段:可视化通知建模,而建模又可以建…

MapReduce工作流程(Hadoop3.x)

MapReduce 是一种用于并行处理大规模数据集的——编程模型和处理框架。它通常用于分布式计算环境中,如Apache Hadoop。 工作流程 1. 切分阶段(Splitting): 数据集被分成多个数据块,每个数据块的大小通常在64MB到12…

kaggle 泰坦尼克使用xgboost 得分0.73684

流程 导入所要使用的包引入kaggle的数据集csv文件查看数据集有无空值填充这些空值提取特征分离训练集和测试集调用模型 导入需要的包 import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns import warnings warnings.filterwarni…

zabbix自定义监控、自动发现和注册以及代理设置

前言 监控项的定制和新设备的注册往往需要大量手动操作,这会导致维护成本的增加和监控效率的降低。本文将介绍如何利用 Zabbix 的自定义功能,实现监控项的动态发布和新设备的自动注册以及代理设置、从而简化运维工作并实现更高效的监控管理。 Zabbix 监…

【大模型开源篇1】彦宏您怎么看LLaMA3的开源

Meta LLaMA是Meta公司开源的大模型,作为大模型开源界得鼻祖, 刚刚发布LLaMA3。从ChatGPT 拉开了大模型竞赛的序幕,Meta 选择了开源,至此大模型也开始百花齐放的时期,但是开源模型一直无法超过必源模型,如今…

EA包图上嵌套的包位置不对

Extreme 2024-4-11 11:36 我从工具栏把一个包拖在另一个包里面,可是项目树上两个包的位置并列,拖了几次结果都一样。我的目的是做一个多层级的包图,是不是(EA)不能在图上做? UMLChina潘加宇 确实是这样&a…

Python可视化数据分析-饼状图

一、前言 饼状图(Pie Chart)是一种常用的数据可视化图表,用于展示数据中各部分的占比关系。Python 中有多种库可以用于绘制饼状图,比较常用的包括 matplotlib、pyecharts和 plotly 等。 二、使用 matplotlib 绘制饼状图 import…

必应bing搜索国内广告投放开户价格?

搜索引擎广告作为精准引流的重要手段之一,受到了众多企业的青睐,其中微软旗下的必应搜索(Bing),以其独特的市场定位和用户群体,成为了不可忽视的广告投放平台。对于想要在中国市场利用必应搜索进行广告投放…

局域网无法连接怎么办?

局域网连接是我们日常生活和工作中常用的方式之一,但有时我们可能会遇到局域网无法连接的问题。这给我们的工作和生活带来了很大的困扰。本文将介绍局域网无法连接的常见原因,并推荐一款名为【天联】的组网产品,它能够解决不同地区间的局域网…

Google Earth Engine 洪水制图 - 使用 Sentinel-1 SAR GRD

Sentinel-1 提供从具有双极化功能的 C 波段合成孔径雷达 (SAR) 设备获得的信息。该数据包括地面范围检测 (GRD) 场景,这些场景已通过 Sentinel-1 工具箱进行处理,以创建经过校准和正射校正的产品。该集合每天都会更新,新获得的资产会在可用后两天内添加。 该集合包含所有 G…

Delphi Firemonkey使用TVertScrollbox自定义列表数据

界面布局设置如下 创建一个过程添加新项目 procedure TForm1.AddItem(name: string; age: Integer); varlayout: TLayout; begin// 设置姓名标签的文本Label3.Text : name;// 设置年龄标签的文本Label4.Text : IntToStr(age);// 克隆 Layout1,并将克隆得到的对象赋值…

FastJson2中FastJsonHttpMessageConverter找不到类问题

问题描述 如果你最近也在升级FastJson到FastJson2版本,而跟我一样也遇到了FastJsonHttpMessageConverter找不到类问题以及FastJsonConfig找不到问题,那么恭喜你,看完本文,安装完fastjson2、fastjson2-extension、fastjson2-exte…

STM32H743驱动SD卡(1)

本文内容参考: STM32——SDIO的学习(驱动SD卡)(理论篇)-CSDN博客 STM32个人笔记-SDIO接口-CSDN博客 STM32-(40):SD卡与SDIO-CSDN博客 【STM32】使用SDIO进行SD卡读写(一)-初步认…

使用python-can和cantools实现arxml报文解析、发送和接收的完整指南

文章目录 背景一、硬件支持二、环境准备1、python解释器安装2、python库安装 三、 收发案例四、 方法拓展1、canoe硬件调用2、回调函数介绍 结论 背景 在汽车行业中,CAN (Controller Area Network) 总线是用于车辆内部通信的关键技术。arxml文件是一种用于描述CAN消…

【数据结构】算法效率揭秘:时间与空间复杂度的较量

前言 在计算机科学中,时间复杂度和空间复杂度是衡量算法性能的两个重要指标。它们分别表示算法在执行过程中所需的时间和空间资源。了解这两个概念有助于我们评估和比较不同算法的优劣,从而选择更合适的算法解决问题~ 欢迎关注个人主页:逸狼 …

.github/workflows Actions为项目构建增加手动CI 构建按钮

在Github CI项目的时候, 一般是有push的时候才触发CI构建任务, 今天介绍一种通过 on workflow_dispatch 来增加手动CI构建按钮的方法。 CI构建任务代码示例 .github/workflows/ci.yml name: CIon:push:branches: [develop]pull_request:branches: [dev…