全面解剖 消息中间件 RocketMQ-(3)
一、RocketMQ – mqadmin 命令介绍
1、mqadmin 管理工具 使用方式
进入 RocketMQ 安装位置,在 bin 目录下执行 ./mqadmin {command} {args}
# 进入 RocketMQ 安装目录的 bin 目录下:
cd /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/bin/# mqadmin 管理工具 使用方式
./mqadmin {command} {args}
2、mqadmin 命令介绍
1)Topic 相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
updateTopic | 创建更新 Topic 配置 | -b | Broker 地址,表示 topic 所在 Broker,只支持单台 Broker,地址为 ip:port |
-c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList查询) | ||
-h- | 打印帮助 | ||
-n | NameServer服务地址,格式 ip:port | ||
-p | 指定新topic的读写权限(W=2 | ||
-r | 可读队列数(默认为8) | ||
-w | 可写队列数(默认为8) | ||
-t | topic 名称(名称只能使用字符 【^ [a-zA-Z0-9_-]+$) 】 | ||
deleteTopic | 删除 Topic | -h | 打印帮助 |
-n | NameServer服务地址,格式 ip:port | ||
-t | topic 名称(名称只能使用字符 【 ^ [a-zA-Z0-9_-]+$) 】 | ||
topicList | 查看 Topic 列表信息 | -c | 不配置-c只返回 topic 列表,增加-c返回 clusterName, topic,consumerGroup 信息,即 topic 的所属集群和订阅关系,没有参数 |
-n | NameServer服务地址,格式 ip:port | ||
-t | topic 名称 | ||
topicRoute | 查看 Topic 路由信息 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic 名称 | ||
topicStatus | 查看 Topic 消息队列 offset | -h | 打印帮助 |
-n | NameServer服务地址,格式 ip:port | ||
topicClusterList | 查看 Topic 所在集群列表 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
updateTopicPerm | 更新 Topic 读写权限 | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer服务地址,格式 ip:port | ||
-b | Broker 地址,表示 topic 所在 Broker,只支持单台 Broker,地址为:ip:port | ||
-p | 指定新 topic 的读写权限(W=2 | ||
-c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询,-b 优先,如果没有 -b,则对集群中所有 Broker 执行命令 | ||
updateOrderConf | 从 NameServer 上创建,删除,获取特定命名空间的 kv 配置,目前还未启用 | -t | topic 键 |
-h | 打印帮助 | ||
-n | NameServer服务地址,格式 ip:port | ||
-v | orderConf,值 | ||
-m | method,可选 get,put,delete | ||
allocateMQ | -t | topic 名称 | |
-h | 打印帮助 | ||
-n | NameServer服务地址,格式 ip:port | ||
-v | Broker 地址,表示 topic 所在 Broker,只支持单台 Broker,地址为:ip:port | ||
2)集群相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
clusterList | 查看集群信息,集群,BrokerName,Brokerld, TPS 等信息 | -m | #OutTotalYest,#InTotalToday,#OutTotalToday |
-h | 打印帮助 | ||
-n | NameServer服务地址,格式 ip:port | ||
-i | 打印间隔,单位秒 | ||
-a | amount,每次探测总数,RT=总时间/amount | ||
-s | 消息大小,单位 B | ||
-c | 探测那个集群 | ||
-p | 是否打印格式化日志,以 | ||
-i | 打印间隔,单位秒 | ||
clusterRT | 发送消息检测集群各 BrokerRT,消息发往 ${BrokerName}Topic | -a | amount,每次探测总数,RT=总时间/amount |
-s | 消息大小,单位 B | ||
-c | 探测那个集群 | ||
-p | 是否打印格式化日志,以 | ||
-h | 打印帮助 | ||
-m | 所属机房,打印使用 | ||
-i | 打印间隔,单位秒 | ||
-n | NameServer服务地址,格式 ip:port | ||
3)Broker 相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
updateBrokerConfig | 更新 Broker 配置文件,会修改 Broker.conf | -b | Broker 地址,格式 ip:port |
-c | cluster 名称 | ||
-k | key 值 | ||
-v | value 值 | ||
-h | 打印帮助 | ||
4)消息相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
queryMsgById | 根据 offsetMsgId 查询 msg,如果使用开源控制台,应使用,此命令还有其他参数,具体作用请阅读QueryMsgByIdSubCommand | -i | msgId |
-h | 打印帮助 | ||
-n | NameServer服务地址,格式 ip:port | ||
queryMsgByKey | 根据消息 Key 查询消息 | -k | msgKey |
-t | Topic 名称 | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-b | Broker 名称,(这里需要注意填写的是 Broker 的名称,不是 Broker 的地址,Broker 名称可以在 clusterList 查到 | ||
queryMsgByOffset | 根据Offset 查询消息 | -i | query 队列 id |
-o | offset 值 | ||
-t | Topic 名称 | ||
-h | 打印帮助 | ||
-n | NameServer服务地址,格式 ip:port | ||
queryMsgByUniqueKey | 根据 msgId 查询,msgId 不同于 offsetMsgId,区别详见常见运维问题,-g,-d 配合使用,查到消息后尝试让特定的消费者消费消息并返回消费结果 | -h | 打印帮助 |
-n | NameServer服务地址,格式 ip:port | ||
-i | uniqe msg id | ||
-g | consumerGroup | ||
-d | clientId | ||
-t | Topic 名称 | ||
checkMsgSendRT | 检测向 topic 发消息的 RT,功能类似 clusterRT | -h | 打印帮助 |
-n | NameServer服务地址,格式 ip:port | ||
-o | offset 值 | ||
-t | Topic 名称 | ||
-a | 探测次数 | ||
sendMessage | 发送一条消息,可以根据配置发往特定 MessageQueue 或普通发送 | -s | 消息大小 |
-h | 打印帮助 | ||
-n | NameServer服务地址,格式 ip:port | ||
-t | Topic 名称 | ||
-p | body 消息体 | ||
-k | keys | ||
-c | tags | ||
-b | BrokerName | ||
-i | queueId | ||
-h | 打印帮助 | ||
-n | NameServer服务地址,格式 ip:port | ||
-t | Topic 名称 | ||
5)消费者、消费组相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
consumerProgress | 查看订阅组消费状态,可以查看具体的 client IP 的消息积累量 | -g | 消费者所属组名 |
-s | 是否打印 client IP | ||
-h | 打印帮助 | ||
-n | NameServer服务地址,格式 ip:port | ||
consumerStatus | 查看消费者状态,包括同一个分组中是否是相同的订阅,分析 ProcessQueue 是否堆积,返回消费者 jstack 结果,内容较多,使用者参见 ConsumerStatusSubCommand | -h | 打印帮助 |
-n | NameServer服务地址,格式 ip:port | ||
-g | consumer group | ||
-i | clientId | ||
-s | 是否执行 jstack | ||
getConsumerStatus | 获取 Consumer 消费进度 | -g | 消费者所属组名 |
-t | 查询主题 | ||
-i | Consumer 客户端 IP | ||
-h | 打印帮助 | ||
-n | NameServer服务地址,格式 ip:port | ||
updateSubGroup | 更新或创建订阅关系 | -h | 打印帮助 |
-n | NameServer服务地址,格式 ip:port | ||
-b | Broker 地址 | ||
-c | 集群名称 | ||
-g | 消费者分组名称 | ||
-s | 分组是否允许消费 | ||
-m | 是否从最小 offset 开始消费 | ||
-d | 是否是广播模式 | ||
-q | 重试队列数量 | ||
-r | 最大重试次数 | ||
-i | 当 slaveReadEnable 开启时有效,且还未达到从 slave 消费时建议从哪个 Brokerld 消费,可以配置备机 id,主动从备机消费 | ||
-w | 如果 Broker 建议从 slave 消费,配置决定从哪个 slave 消费,配置 Brokerld,例如 | ||
6)连接相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
consumerConnection | 查询 Consumer 的网络连接 | -g | 消费者所属组名 |
-n | NameServer服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
-g | 生产者所属组名 | ||
-t | 主题名称 | ||
producerConnection | 查询 Producer 的网络连 | -n | NameServer服务地址,格式 ip:port |
-h | 打印帮助 | ||
7)NameServer 相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
updateKvConfig | 更新 NameServer 的 kv 配置,目前还未使用 | -s | 命名空间 |
-k | key | ||
-v | value | ||
-n | NameServer服务地址,格式 ip:port | ||
8)其它
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
startMonitoring | 开启监控进程,监控消息误删,重试队列消息数等 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 |
3、使用 mqadmin 命令 注意事项:
- 几乎所有命令都需要配置-n表示NameServer地址,格式为 ip:port。
- 几乎所有命令都可以通过-h获取帮助。
- 如果既有 Broker 地址(-b)配置项又有 clusterName(-c) 配置项,则优先以 Broker 地址执行命令;如果不配置 Broker 地址,则对集群中所有主机执行命令。
二、RocketMQ – console 集群监控平台搭建
1、下载安装 RocketMQ – console 监控平台
RocketMQ 有一个对其扩展的开源项目 incubator-rocketmg-externals,这个项目中有一个子模块叫 rocketmq-console,这个便是管
理控制台项目了,先将 incubator-rocketmq-externals 拉到本地,因为我们需要自己对 rocketmq-console 进行编译打包运行。
下载地址:https://github.com/apache/rocketmq-externals
rocketmq-externals-master.zip
git clone https://github.com/apache/rocketmq-externals
cd rocketmq-console
mvn clean package -Dmaven.test.skip=true
2、解压即安装
/rocketmq-externals-master/rocketmq-console/
3、修改配置文件:在 rocketmq-console 中配置 namesrv 集群地址:
rocketmq.config.namesrvAddr=192.168.25.135:9876;192.168.25.138:9876
4、打包: 在项目的 pom.xml 所在的目录,打开 cmd 命令提示符
# 进入项目目录
cd /rocketmq-externals-master/rocketmq-console/# 打包
mvn clean package -Dmaven.test.skip=true
5、打包完成: 在项目的 pom.xml 所在的目录,会在 target 文件夹下,生成 rocketmq-console-ng-1.0.1.jar
将此 jar 包上传到两个虚拟机(192.168.25.135:9876;192.168.25.138:9876)的 /usr/local/rocketmq/ 下(/usr/soft/)。
6、启动 rocketmq-console 控制台
java -jar rocketmq-console-ng-1.0.0.jar
7、启动成功后,通过浏览器访问:http://localhost:8080 进入 控制台界面
http://192.168.25.135:8080
http://192.168.25.138:8080
三、RocketMQ – 消息发送样例介绍和步骤分析
1、消息发送样例
1.1 导入 MQ 客户端依赖
<dependency><groupid>org.apache.rocketmq</groupId><artifactid>rocketmq-client</artifactid><version>4.4.0</version>
</dependency>
1.2 消息发送者步骤分析
1.创建消息生产者 producer,并制定生产者组名2.指定 Nameserver 地址3.启动 producer4.创建消息对象,指定主题 Topic、Tag 和消息体5.发送消息6.关闭生产者 producer。
1.3 消息消费者步骤分析
1.创建消费者 Consumer,制定消费者组名2.指定 Nameserver 地址3.订阅主题 Topic 和 Tag4.设置回调函数,处理消息5.启动消费者 consumer。
2、打开 idea,创建 rocketmq_demo 的 maven 工程。
--> idea --> File --> New --> Project --> Maven Project SDK: ( 1.8(java version "1.8.0_131" ) --> Next --> Groupld : ( djh.it )Artifactld : ( rocketmq_demo )Version : 1.0-SNAPSHOT--> Name: ( rocketmq_demo )Location: ( \rocketmq_demo\ ) --> Finish
3、在工程 rocketmq_demo (模块)中的 pom.xml 中导入依赖
<dependency><groupid>org.apache.rocketmq</groupId><artifactid>rocketmq-client</artifactid><version>4.4.0</version>
</dependency>
4、在工程 rocketmq_demo (模块)中,创建包结构:
src/main/java/djh/it/mq/rocketmq/base/
src/main/java/djh/it/mq/rocketmq/batch/
src/main/java/djh/it/mq/rocketmq/delay/
src/main/java/djh/it/mq/rocketmq/filter/
src/main/java/djh/it/mq/rocketmq/order/
src/main/java/djh/it/mq/rocketmq/transaction/
四、RocketMQ – 发送同步消息
1、RocketMQ – 发送同步消息
发送同步消息,这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知、短信通知等。
2、在工程 rocketmq_demo (模块)中,创建 发送同步消息类 SyncProducer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\base\producer\SyncProducer.java** 2024-5-24 创建 发送同步消息类 SyncProducer.java*/
package djh.it.mq.rocketmq.base.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer;public class SyncProducer {public static void main(String[] args) throws Exception {//1.创建消息生产者 producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定 Nameserver 地址producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.启动 producerproducer.start();for(int i=0; i<10; i++){//4.创建消息对象,指定主题 Topic、Tag 和消息体//参数一:消息主题 Topic, 参数二:消息 Tag, 参数三:消息内容Message msg = new Message("base", "Tag1", ("Hello World"+i).getBytes());//5.发送消息SendResult result = producer.send(msg);//发送状态SendStatus status = result.getSendStatus();//消息 ID //String msgId = result.getMsgId();//消息接受队列 ID //int queueId = result.getMessageQueue().getQueueId();//System.out.println("发送状态:"+result+", 消息ID"+msgId+", 队列"+queueId);System.out.println("发送结果:"+result);TimeUnit.SECONDS.sleep(1); //线程睡1秒}//6.关闭生产者 producer。producer.shutdown(); }
}
3、运行测试类,进行测试。查看发送结果。
五、RocketMQ – 发送异步消息
1、RocketMQ – 发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间的等待 Broker 的响应。
2、在工程 rocketmq_demo (模块)中,创建 发送异步消息类 AsyncProducer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\base\producer\AsyncProducer.java** 2024-5-24 创建 发送异步消息类 AsyncProducer.java*/
package djh.it.mq.rocketmq.base.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer;public class AsyncProducer {public static void main(String[] args) throws Exception {//1.创建消息生产者 producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定 Nameserver 地址producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.启动 producerproducer.start();for(int i=0; i<10; i++){//4.创建消息对象,指定主题 Topic、Tag 和消息体//参数一:消息主题 Topic, 参数二:消息 Tag, 参数三:消息内容Message msg = new Message("base", "Tag2", ("Hello World"+i).getBytes());//5.发送 异步 消息producer.send(msg, new SendCallback(){//发送成功的回调函数public void onSuccess(SendResult sendResult){System.out.println("发送结果:"+sendResult);}//发送失败的回调函数public void onException(Throwable e){System.out.println("发送异常:"+e);}});System.out.println("发送结果:"+result);TimeUnit.SECONDS.sleep(1); //线程睡1秒}//6.关闭生产者 producer。producer.shutdown(); }
}
3、运行测试类,进行测试。查看发送结果。
六、RocketMQ – 发送单向消息
1、RocketMQ – 发送单向消息
单向消息这种方式主要用在不特别关心发送结果的场景,例如日志发送。
2、在工程 rocketmq_demo (模块)中,创建 发送单向消息类 OneWayProducer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\base\producer\OneWayProducer.java** 2024-5-24 创建 发送单向消息类 OneWayProducer.java*/
package djh.it.mq.rocketmq.base.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import java.util.concurrent.TimeUnit;public class OneWayProducer {public static void main(String[] args) throws Exception {//1.创建消息生产者 producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定 Nameserver 地址producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.启动 producerproducer.start();for(int i=0; i<3; i++){//4.创建消息对象,指定主题 Topic、Tag 和消息体//参数一:消息主题 Topic, 参数二:消息 Tag, 参数三:消息内容Message msg = new Message("base", "Tag3", ("Hello World 单向消息"+i).getBytes());//5.发送 单向 消息(无返回值)producer.sendOneway(msg);TimeUnit.SECONDS.sleep(5); //线程睡5秒}//6.关闭生产者 producer。producer.shutdown(); }
}
3、运行测试类,进行测试。
七、RocketMQ – 消息消费基本流程
1、消息消费者步骤分析
1.创建消费者 Consumer,制定消费者组名2.指定 Nameserver 地址3.订阅主题 Topic 和 Tag4.设置回调函数,处理消息5.启动消费者 consumer。
2、在工程 rocketmq_demo (模块)中,创建 消息消费类 Consumer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\base\consumer\Consumer.java** 2024-5-24 创建 消息消费类 Consumer.java*/
package djh.it.mq.rocketmq.base.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import java.util.concurrent.TimeUnit;public class Consumer {public static void main(String[] args) throws Exception {//1.创建消费者 Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定 Nameserver 地址consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.订阅主题 Topic 和 Tag//consumer.subscribe("base", "Tag1"); //接收同步消息consumer.subscribe("base", "Tag2"); //接收异步消息前,可以让先发送异步消息。//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently(){//接受消息内容public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){//System.out.println(msgs); //接收到的消息是未转换的字节码for(MessageExt msg : msgs){System.out.println(new String(msg.getBody())); //转换为字符串消息}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者 consumer。consumer.start();}
}
3、运行测试类,进行测试。
八、RocketMQ – 消费者广播模式和负载均衡模式
1、RocketMQ – 消费者 广播模式
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的。
2、RocketMQ – 消费者 负载均衡模式
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。
3、在工程 rocketmq_demo (模块)中,修改 消息消费测试类 Consumer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\base\consumer\Consumer.java** 2024-5-24 修改 消息消费类 Consumer.java 添加消费模式。*/
package djh.it.mq.rocketmq.base.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import java.util.concurrent.TimeUnit;public class Consumer {public static void main(String[] args) throws Exception {//1.创建消费者 Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定 Nameserver 地址consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.订阅主题 Topic 和 Tag//consumer.subscribe("base", "Tag1"); //接收同步消息//consumer.subscribe("base", "Tag1 | Tag2"); //接收同步消息 和 异步消息//consumer.subscribe("base", "*"); //接收所有消息consumer.subscribe("base", "Tag1"); //接收同步消息前,可以让先发送同步消息。//添加消费模式 //consumer.setMessageModel(MessageModel.CLUSTERING); //默认是负载均衡模式消费consumer.setMessageModel(MessageModel.BROADCASTING); //广播模式消费//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently(){//接受消息内容public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){//System.out.println(msgs); //接收到的消息是未转换的字节码for(MessageExt msg : msgs){System.out.println(new String(msg.getBody())); //转换为字符串消息}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者 consumer。consumer.start();}
}
4、运行测试类,进行测试。
上一节关联链接请点击:
# 全面解剖 消息中间件 RocketMQ-(2)