RocketMQ mqadmin java springboot python 调用笔记

命令

mqadmin命令列表

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin
The most commonly used mqadmin commands are:updateTopic               Update or create topicdeleteTopic               Delete topic from broker and NameServer.updateSubGroup            Update or create subscription groupsetConsumeMode            Set consume message mode. pull/pop etc.deleteSubGroup            Delete subscription group from broker.updateBrokerConfig        Update broker's configupdateTopicPerm           Update topic permtopicRoute                Examine topic route infotopicStatus               Examine topic Status infotopicClusterList          Get cluster info for topicaddBroker                 Add a broker to specified containerremoveBroker              Remove a broker from specified containerresetMasterFlushOffset    Reset master flush offset in slavebrokerStatus              Fetch broker runtime status dataqueryMsgById              Query Message by IdqueryMsgByKey             Query Message by KeyqueryMsgByUniqueKey       Query Message by Unique keyqueryMsgByOffset          Query Message by offsetqueryMsgTraceById         Query a message traceprintMsg                  Print Message DetailprintMsgByQueue           Print Message DetailsendMsgStatus             Send msg to broker.brokerConsumeStats        Fetch broker consume stats dataproducerConnection        Query producer's socket connection and client versionconsumerConnection        Query consumer's socket connection, client version and subscriptionconsumerProgress          Query consumers's progress, speedconsumerStatus            Query consumer's internal data structurecloneGroupOffset          Clone offset from other group.producer                  Query producer's instances, connection, status, etc.clusterList               List cluster infostopicList                 Fetch all topic list from name serverupdateKvConfig            Create or update KV config.deleteKvConfig            Delete KV config.wipeWritePerm             Wipe write perm of broker in all name server you defined in the -n paramaddWritePerm              Add write perm of broker in all name server you defined in the -n paramresetOffsetByTime         Reset consumer offset by timestamp(without client restart).skipAccumulatedMessage    Skip all messages that are accumulated (not consumed) currentlyupdateOrderConf           Create or update or delete order confcleanExpiredCQ            Clean expired ConsumeQueue on broker.deleteExpiredCommitLog    Delete expired CommitLog filescleanUnusedTopic          Clean unused topic on broker.startMonitoring           Start MonitoringstatsAll                  Topic and Consumer tps statsallocateMQ                Allocate MQcheckMsgSendRT            Check message send response timeclusterRT                 List All clusters Message Send RTgetNamesrvConfig          Get configs of name server.updateNamesrvConfig       Update configs of name server.getBrokerConfig           Get broker config by cluster or special brokergetConsumerConfig         Get consumer config by subscription group namequeryCq                   Query cq command.sendMessage               Send a messageconsumeMessage            Consume messageupdateAclConfig           Update acl config yaml file in brokerdeleteAclConfig           Delete Acl Config Account in brokerclusterAclConfigVersion   List all of acl config version information in clusterupdateGlobalWhiteAddr     Update global white address for acl Config File in brokergetAclConfig              List all of acl config information in clusterupdateStaticTopic         Update or create static topic, which has fixed number of queuesremappingStaticTopic      Update or create static topic, which has fixed number of queuesexportMetadata            Export metadataexportConfigs             Export configsexportMetrics             Export metricshaStatus                  Fetch ha runtime status datagetSyncStateSet           Fetch syncStateSet for target brokersgetBrokerEpoch            Fetch broker epoch entriesgetControllerMetaData     Get controller cluster's metadatagetControllerConfig       Get controller config.updateControllerConfig    Update controller config.electMaster               Re-elect the specified broker as mastercleanBrokerMetadata       Clean metadata of broker on controllerdumpCompactionLog         parse compaction log to messagegetColdDataFlowCtrInfo    get cold data flow ctr infoupdateColdDataFlowCtrGroupConfig addOrUpdate cold data flow ctr group configremoveColdDataFlowCtrGroupConfig remove consumer from cold ctr configsetCommitLogReadAheadMode set read ahead mode for all commitlog files

topicList

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicList  -n localhost:9876
%RETRY%please_rename_unique_group_name
RMQ_SYS_TRANS_HALF_TOPIC
stringRequestTopic
%RETRY%objectRequestConsumer
%RETRY%please_rename_unique_group_name_4
TRANS_CHECK_MAX_TIME_TOPIC
BenchmarkTest
%RETRY%genericRequestConsumer
string-topic
TBW102
rmq_sys_REVIVE_LOG_DefaultCluster
SELF_TEST_TOPIC
%RETRY%string_consumer_newns
SCHEDULE_TOPIC_XXXX
DefaultCluster_REPLY_TOPIC
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23
RMQ_SYS_TRANS_OP_HALF_TOPIC
TopicTest
localhost.localdomain
order-paid-topic
%RETRY%my-group1
user-topic
%RETRY%string_trans_consumer
message-ext-topic
OFFSET_MOVED_EVENT
%RETRY%user_consumer
%RETRY%order-paid-consumer
yeqiang-MS-7B23
DefaultCluster
spring-transaction-topic
%RETRY%stringRequestConsumer
bytesRequestTopic
%RETRY%string_consumer
%RETRY%bytesRequestConsumer
%RETRY%rocketmq-consume-demo-message-ext-consumer

statsAll

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin statsAll  -n localhost:9876 
#Topic                                                            #Consumer Group                                                  #Accumulation      #InTPS     #OutTPS   #InMsg24Hour  #OutMsg24Hour
RMQ_SYS_TRANS_HALF_TOPIC                                          CID_RMQ_SYS_TRANS                                                           0        0.00        0.00              0              0
stringRequestTopic                                                stringRequestConsumer                                                       1        0.00        0.00              0              0
TRANS_CHECK_MAX_TIME_TOPIC                                                                                                                    0        0.00                          0    NO_CONSUMER
BenchmarkTest                                                                                                                                 0        0.00                          0    NO_CONSUMER
string-topic                                                      string_consumer                                                           106        0.00        0.00              0              0
string-topic                                                      string_consumer_newns                                                      63        0.00        0.00              0              0
TBW102                                                                                                                                        0        0.00                          0    NO_CONSUMER
rmq_sys_REVIVE_LOG_DefaultCluster                                                                                                             0        0.00                          0    NO_CONSUMER
SELF_TEST_TOPIC                                                                                                                               0        0.00                          0    NO_CONSUMER
SCHEDULE_TOPIC_XXXX                                                                                                                           0        0.00                          0    NO_CONSUMER
DefaultCluster_REPLY_TOPIC                                                                                                                    0        0.00                          0    NO_CONSUMER
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23                                                                                                    0        0.00                          0    NO_CONSUMER
RMQ_SYS_TRANS_OP_HALF_TOPIC                                       CID_RMQ_SYS_TRANS                                                           0        0.00        0.00              0              0
TopicTest                                                         please_rename_unique_group_name                                           252        0.00        0.00              0              0
TopicTest                                                         please_rename_unique_group_name_4                                           0        0.00        0.00              0              0
localhost.localdomain                                                                                                                         0        0.00                          0    NO_CONSUMER
order-paid-topic                                                  order-paid-consumer                                                         1        0.00        0.00              0              0
user-topic                                                        user_consumer                                                               2        0.00        0.00              0              0
message-ext-topic                                                 rocketmq-consume-demo-message-ext-consumer                                  2        0.00        0.00              0              0
OFFSET_MOVED_EVENT                                                                                                                            0        0.00                          0    NO_CONSUMER
yeqiang-MS-7B23                                                                                                                               0        0.00                          0    NO_CONSUMER
DefaultCluster                                                                                                                                0        0.00                          0    NO_CONSUMER
spring-transaction-topic                                          string_trans_consumer                                                      15        0.00        0.00              0              0
bytesRequestTopic                                                 bytesRequestConsumer                                                        0        0.00        0.00              0              0

topicStatus

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus  -n localhost:9876 -t string-topic
#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated
yeqiang-MS-7B23                   0     0                     35                      2023-08-25 16:21:35,786
yeqiang-MS-7B23                   1     0                     52                      2023-08-25 14:55:57,152
yeqiang-MS-7B23                   2     0                     33                      2023-08-25 16:21:35,646
yeqiang-MS-7B23                   3     0                     42                      2023-08-25 14:55:57,172
yeqiang-MS-7B23                   4     0                     1                       2023-08-25 16:21:34,355
yeqiang-MS-7B23                   5     0                     1                       2023-08-25 14:55:57,105
yeqiang-MS-7B23                   6     0                     4                       2023-08-25 16:23:01,489
yeqiang-MS-7B23                   7     0                     1                       2023-08-25 16:21:36,186

Python 生产者:producer.py

from rocketmq.client import Producer, MessagegroupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
TAGS = "tag-my-group1"
KEYS = "key-my-group1-0"
# 初始化生产者,并设置生产组信息,组名称使用全称,例:rocketmq-xxx|namespace_python%group1
producer = Producer(groupName)
# 设置服务地址
producer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# producer.set_session_credentials(
#     accessKey,  # 角色密钥
#     secretKey,  # 角色名称
#     ''
# )
# 启动生产者
producer.start()# 组装消息   topic名称,在控制台 topic 页面复制。
msg = Message(topicName)
# 设置keys
msg.set_keys(TAGS)
# 设置tags
msg.set_tags(KEYS)
# 消息内容
msg.set_body('This is a new message.')# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
# 资源释放
producer.shutdown()

运行

yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ source /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/activate
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012857767267388CFD61230000 35
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ 

mqadmin查询topic状态

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus  -n localhost:9876 -t string-topic
#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated
yeqiang-MS-7B23                   0     0                     36                      2023-08-28 09:03:35,722
yeqiang-MS-7B23                   1     0                     52                      2023-08-25 14:55:57,152
yeqiang-MS-7B23                   2     0                     33                      2023-08-25 16:21:35,646
yeqiang-MS-7B23                   3     0                     42                      2023-08-25 14:55:57,172
yeqiang-MS-7B23                   4     0                     1                       2023-08-25 16:21:34,355
yeqiang-MS-7B23                   5     0                     1                       2023-08-25 14:55:57,105
yeqiang-MS-7B23                   6     0                     4                       2023-08-25 16:23:01,489
yeqiang-MS-7B23                   7     0                     1                       2023-08-25 16:21:36,186

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicRoute  -n localhost:9876 -t string-topic
{"brokerDatas":[{"brokerAddrs":{0:"10.47.76.67:10911"},"brokerName":"yeqiang-MS-7B23","cluster":"DefaultCluster","enableActingMaster":false}],"filterServerTable":{},"queueDatas":[{"brokerName":"yeqiang-MS-7B23","perm":6,"readQueueNums":8,"topicSysFlag":0,"writeQueueNums":8}]
}

图形工具rocketmq-dashborad

https://github.com/apache/rocketmq-dashboard

自行编译

mvn clean package -Dmaven.test.skip=true

启动

java -Drocketmq.namesrv.addr=127.0.0.1:9876 -jar target/rocketmq-dashboard-1.0.0.jar

 

 

(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012DF4226307248D16C3250000 36

 

 consoumer.py

import time
from rocketmq.client import PushConsumer, ConsumeStatus
# 消息处理回调groupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
KEYS = "key-my-group1-0"def callback(msg):# 模拟业务print('Received message. messageId: ', msg.id, ' body: ', msg.body)# 消费成功回复CONSUME_SUCCESSreturn ConsumeStatus.CONSUME_SUCCESS# 消费成功回复消息状态# return ConsumeStatus.RECONSUME_LATER# 初始化消费者,并设置消费者组信息
consumer = PushConsumer(groupName)
# 设置服务地址
consumer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# consumer.set_session_credentials(
#     accessKey,	 # 角色密钥
#     secretKey,   # 角色名称
#     ''
# )
# 订阅topic
consumer.subscribe(topicName, callback, "*")
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()while True:time.sleep(3600)
# 资源释放
consumer.shutdown()

启动python消费者

(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/consumer.py[Consumer] Waiting for messages.
Received message. messageId:  7F0001012DF4226307248D16C3250000  body:  b'This is a new message.'

可以看到my-group1已被消费 

再启动一个consumer.py,产生一次消息

可以看到,只有一个consumer消费到了消息,说明默认情况下,消息非广播模式。

Java生产一个消息:

training: Java SpringBoot SpringCloud k8s等练习程序 - Gitee.com

 

 

 

python rocketmq依赖

Release rocketmq-client-cpp-2.1.0 · apache/rocketmq-client-cpp · GitHub

python完整程序

python-rocketmq-demo: python3 rocketmq5 的一个例子

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/58333.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java集合sort排序报错UnsupportedOperationException处理

文章目录 报错场景排查解决UnmodifiableList类介绍 报错场景 我们使用的是PostgreSQL数据库,存储业务数据,业务代码使用的是Spring JPA我们做的是智慧交通信控平台,有个功能是查询展示区域的交通态势,需要按照不同维度排序展示区…

执行jmeter端口不够用报错(Address not available)

执行jmeter端口不够用报错(Address not available) linux解决方案 // 增加本地端口范围 echo 1024 65000 > /proc/sys/net/ipv4/ip_local_port_range// 启用快速回收TIME_WAIT套接字 sudo sysctl -w net.ipv4.tcp_tw_recycle1// 启用套接字的重用 sudo sysctl -w net.ipv4.t…

循环结构(个人学习笔记黑马学习)

while循环语句 在屏幕中打印0~9这十个数字 #include <iostream> using namespace std;int main() {int i 0;while (i < 10) {cout << i << endl;i;}system("pause");return 0; } 练习案例: 猜数字 案例描述:系统随机生成一个1到100之间的数字&…

远程教育:别催了,在线巡课真爽啊

随着远程教育和在线学习的兴起&#xff0c;教学活动的场景正逐渐从传统的实体教室转向虚拟的线上平台&#xff0c;这也催生了对教学质量监管的新需求。 在线巡课系统在这一背景下应运而生&#xff0c;它不仅能够实时观察教师的教学过程&#xff0c;还能够量化评估教学效果&…

ADS 错误 1808可能原因 ADSError 1808

​ 调试问题记录&#xff1a; 背景&#xff1a; Ads调试时遇到错误&#xff0c;返回码是 1808&#xff0c;查询倍福官网 得出1808错误原因是 symbol not found 原因&#xff1a; ADSError: symbol not found (1808). Possible incorrect runtime port selected 可能是ads的地…

极氪汽车的云资源治理细探

作者&#xff1a;极氪汽车吴超 前言 2021 年&#xff0c;极氪 001 迅速崭露头角&#xff0c;仅用 110 天便创下了首款车型交付量“最快破万”的纪录。2022 年 11 月&#xff0c;极氪 009 在短短 76 天内便率先完成了首批交付&#xff0c;刷新了中国豪华纯电品牌交付速度的纪录…

内网实战1

1、信息收集&#xff1a; 使用nmap做端口扫描&#xff1a; nmap -sV -Pn -T4 192.168.26.174重要端口&#xff1a;80、445、139、135、3306 目录扫描&#xff1a; 访问80端口&#xff1a;发现一个网站是phpstudy搭建的&#xff1b; 发现一个mysql数据库&#xff0c;那我们…

11-Manager 和 模型Model

准备工作: 一. Manager 库: Manager: 用于管理相关操作端命令和使用相关操作端命令 (1). 安装flask-script: pip install flask-script2.0.3 (2). 在app.py中 包装 app from apps import create_app# Manager类用于管理相关操作端命令和使用相关操作端命令 from flask_scrip…

Unity实现UI图片面板滚动播放效果第二弹

效果&#xff1a; 场景结构&#xff1a; 特殊物体&#xff1a;panel下面用排列组件horizent layout group放置多个需要显示的面板&#xff0c;用mask遮罩好。 主要思路&#xff1a; 这次是要在最后一个toggle的地方&#xff0c;依然向左滚动回1&#xff0c;这是难点。因此实际…

CTFhub-sqli注入-报错注入

用到的函数 updatexml(1&#xff0c; &#xff0c;1) concat(0x7e, ,0x7e) group_concat(目标值) right(&#xff0c;32) 1 1 1 union select updatexml(1,concat(0x7e,database(),0x7e),1) 1 union select updatexml(1,concat(0x7e,(select(group_concat(ta…

ZIP压缩文件如何设置密码保护?

想要保护ZIP压缩文件&#xff0c;我们可以设置打开密码&#xff0c;下面来说说设置ZIP文件密码的两种方法。 方法一&#xff1a;单次设置打开密码 如果只需要对当前的ZIP压缩包进行加密&#xff0c;我们可以单独设置这个压缩包的密码。 使用WinRAR压缩文件的时候可以同时设置…

Redis下载与安装

文章目录 Redis简介下载&#xff0c;安装和配置&#xff08;cmd&#xff09;图形化工具 Redis 简介 下载&#xff0c;安装和配置&#xff08;cmd&#xff09; 开启redis服务 1.在解压出来的文件夹中打开cmd 2.输入 redis-server.exe redis.windows.conf即可开启服务 可以看到…

ElasticSearch总结

ES是什么 ES是一个天生支持分布式的搜索、聚合分析的存储引擎 基于Java开发 基于Lucene的开源分布式搜索引擎 ELK &#xff1a; elasticSearch Logstah Kibana 加入 Beats 后 ELK 改为 &#xff1a;Elastic stack ES解决了什么问题 ES解决的核心问题 &#xff1a; 1.海量数…

Linux Day11---mbash项目(二)

观看本文之前请先阅读Linux Day10的相关内容 1.touch 1.1 open系统调用 int open(const char*path,int oflags,mode_t mode); oflags参数&#xff1a; O_APPEND:把写入数据追加在文件的末尾 O_TRUNC:把文件长度设置为0&#xff0c;丢弃已有的内容 O_CREAT:如果需要&#…

如何自己实现一个丝滑的流程图绘制工具(一)vue如何使用

背景 项目需求突然叫我实现一个类似processOn一样的在线流程图绘制工具。 这可难倒我了&#xff0c;立马去做调研&#xff0c;在github上找了很多个开源的流程图绘制工具&#xff0c; 对比下来我还是选择了 bpmn-js 原因&#xff1a; 1、他的流程图是涉及到业务的&#xff0c…

idea上利用JDBC连接MySQL数据库(8.1.0版)

1.了解jdbc概念 JDBC(Java DataBase Connectivity,java数据库连接)是一种用于执行SQL语句的Java API&#xff0c;可以为多种 关系数据库提供统一访问&#xff0c;它由一组用Java语言编写的类和接口组成。JDBC提供了一种基准&#xff0c;据此可以构建 更高级的工具和接口&#…

顺序表链表OJ题(3)——【数据结构】

W...Y的主页 &#x1f60a; 代码仓库分享 &#x1f495; 前言&#xff1a; 今天是链表顺序表OJ练习题最后一次分享&#xff0c;每一次的分享题目的难度也再有所提高&#xff0c;但是我相信大家都是非常机智的&#xff0c;希望看到博主文章能学到东西的可以一键三连关注一下博主…

Leetcode刷题:395. 至少有 K 个重复字符的最长子串、823. 带因子的二叉树

Leetcode刷题:395. 至少有 K 个重复字符的最长子串、823. 带因子的二叉树 1. 395. 至少有 K 个重复字符的最长子串算法思路参考代码和运行结果 2. 823. 带因子的二叉树算法思路参考代码和运行结果 1. 395. 至少有 K 个重复字符的最长子串 题目难度&#xff1a;中等 标签&#…

ubuntu 22.04 LTS openai triton 安装

第一种方法&#xff1a; pip install triton 第二种方法&#xff0c;安装最新的版本&#xff1a; pip install -U --index-url https://aiinfra.pkgs.visualstudio.com/PublicPackages/_packaging/Triton-Nightly/pypi/simple/ triton-nightly 第三种方法&#xff1a; git c…

大模型+学习机,是概念游戏还是双向奔赴?

众所周知&#xff0c;2023年上半年大模型概念炙手可热。各大科技公司纷纷卷入&#xff0c;或宣称布局相关领域&#xff0c;或率先官宣自研大模型。而随着资本市场对大模型概念的热情有所消退&#xff0c;属于这片战场的新一轮角逐慢慢聚焦在了技术的落地应用上。 8月15日&#…