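RocketMQ Source Code Analysis - Environment Setup
- Environment Setup
- Fetching the Source Code
- Importing into IDEA
- Debugging
- 1) Start the NameServer
- 2) Start the Broker
- 3) Send Messages
- 4) Consume Messages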
Environment Setup
Required Tools
- JDK: 1.8+
- Maven
- IntelliJ IDEA
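Before building, it can be worth confirming which JDK is actually in use. The following is a minimal sketch, not part of RocketMQ; the class name JavaVersionCheck and its helper method are hypothetical. It reads the standard java.specification.version system property and reduces it to a major version, treating the legacy "1.x" scheme and the newer single-number scheme alike.

```java
final class JavaVersionCheck {
    /** Extracts the major version from a java.specification.version value ("1.8" -> 8, "11" -> 11). */
    static int majorVersion(String specVersion) {
        String[] parts = specVersion.split("\\.");
        // Versions up to Java 8 use the legacy "1.x" scheme; later ones start with the major number.
        String major = parts[0].equals("1") && parts.length > 1 ? parts[1] : parts[0];
        return Integer.parseInt(major);
    }

    public static void main(String[] args) {
        String spec = System.getProperty("java.specification.version");
        int major = majorVersion(spec);
        System.out.println("Java major version: " + major);
        if (major < 8) {
            System.out.println("JDK 1.8 or newer is required by this walkthrough.");
        }
    }
}
```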
Fetching the Source Code
Clone the source from the official repository, https://github.com/apache/rocketmq, or download it as an archive.
Source directory structure:
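- broker: the broker module (Broker startup process)
- client: the message client, including the message producer and consumer classes
- common: shared common classes
- dev: developer information (not source code)
- distribution: deployment files (not source code)
- example: RocketMQ sample code
- filter: base classes for message filtering
- filtersrv: message filter server implementation (Filter startup process)
- logappender: logging implementation classes
- namesrv: NameServer implementation (NameServer startup process)
- openmessaging: the OpenMessaging open messaging standard
- remoting: remote communication module, based on Netty
- srvutil: server utility classes
- store: message storage implementation
- style: checkstyle configuration
- test: test classes
- tools: tool classes, including the implementation of the monitoring commands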
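Importing into IDEA
Run the Maven install with the following goals (prefix them with mvn when running from a command line):
clean install -Dmaven.test.skip=true
......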
[INFO]
[INFO] Apache RocketMQ 4.5.1 .............................. SUCCESS [ 24.872 s]
[INFO] rocketmq-logging 4.5.1 ............................. SUCCESS [ 3.511 s]
[INFO] rocketmq-remoting 4.5.1 ............................ SUCCESS [ 4.462 s]
[INFO] rocketmq-common 4.5.1 .............................. SUCCESS [ 5.444 s]
[INFO] rocketmq-client 4.5.1 .............................. SUCCESS [ 4.268 s]
[INFO] rocketmq-store 4.5.1 ............................... SUCCESS [ 3.219 s]
[INFO] rocketmq-srvutil 4.5.1 ............................. SUCCESS [ 1.431 s]
[INFO] rocketmq-filter 4.5.1 .............................. SUCCESS [ 1.321 s]
[INFO] rocketmq-acl 4.5.1 ................................. SUCCESS [ 1.082 s]
[INFO] rocketmq-broker 4.5.1 .............................. SUCCESS [ 3.667 s]
[INFO] rocketmq-tools 4.5.1 ............................... SUCCESS [ 2.806 s]
[INFO] rocketmq-namesrv 4.5.1 ............................. SUCCESS [ 1.228 s]
[INFO] rocketmq-logappender 4.5.1 ......................... SUCCESS [ 1.394 s]
[INFO] rocketmq-openmessaging 4.5.1 ....................... SUCCESS [ 1.122 s]
[INFO] rocketmq-example 4.5.1 ............................. SUCCESS [ 1.282 s]
[INFO] rocketmq-test 4.5.1 ................................ SUCCESS [ 1.439 s]
[INFO] rocketmq-distribution 4.5.1 ........................ SUCCESS [ 0.147 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:02 min
[INFO] Finished at: 2024-07-19T08:46:25+08:00
[INFO] Final Memory: 57M/913M
[INFO] ------------------------------------------------------------------------
Process finished with exit code 0
Debugging
Create a conf configuration folder and copy broker.conf, logback_broker.xml, and logback_namesrv.xml into it from the distribution module.
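The copy step can also be scripted. The sketch below makes two assumptions that the text does not spell out: the three files are taken from distribution/conf, and the new conf folder is created directly under the project root. The class name ConfCopySketch and its methods are hypothetical helpers, not part of RocketMQ.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.LinkedHashMap;
import java.util.Map;

final class ConfCopySketch {
    // The three files named in the step above.
    static final String[] FILES = {"broker.conf", "logback_broker.xml", "logback_namesrv.xml"};

    /** Maps each source file to its target. Assumes distribution/conf as the source directory. */
    static Map<Path, Path> plan(Path projectRoot) {
        Path source = projectRoot.resolve("distribution").resolve("conf");
        Path target = projectRoot.resolve("conf");
        Map<Path, Path> plan = new LinkedHashMap<>();
        for (String name : FILES) {
            plan.put(source.resolve(name), target.resolve(name));
        }
        return plan;
    }

    /** Creates the conf folder if needed and copies the files, overwriting earlier copies. */
    static void copyAll(Path projectRoot) {
        try {
            Files.createDirectories(projectRoot.resolve("conf"));
            for (Map.Entry<Path, Path> entry : plan(projectRoot).entrySet()) {
                Files.copy(entry.getKey(), entry.getValue(), StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        // Pass the project root as the first argument; defaults to the current directory.
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        copyAll(root);
        System.out.println("Copied " + FILES.length + " files into " + root.resolve("conf"));
    }
}
```

Copying by hand in the IDE works just as well; the sketch mainly documents which files end up where.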
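1) Start the NameServer
- Expand the namesrv module, right-click NamesrvStartup.java, and run it
- Configure ROCKETMQ_HOME as an environment variable in the run configuration
- Restart
Console output: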
Connected to the target VM, address: '127.0.0.1:65350', transport: 'socket'
The Name Server boot success. serializeType=JSON
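If the NameServer does not reach the "boot success" line, the ROCKETMQ_HOME setting is a likely place to look. The following sketch is a small pre-flight check under one assumption: ROCKETMQ_HOME points at the directory that contains the conf folder created earlier. The class RocketmqHomeCheck and its method are hypothetical and not part of RocketMQ.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RocketmqHomeCheck {
    /** Returns the names from the required list that are not regular files under <home>/conf. */
    static List<String> missingConfFiles(Path home, List<String> required) {
        List<String> missing = new ArrayList<>();
        for (String name : required) {
            if (!Files.isRegularFile(home.resolve("conf").resolve(name))) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String home = System.getenv("ROCKETMQ_HOME");
        if (home == null || home.isEmpty()) {
            System.out.println("ROCKETMQ_HOME is not set");
            return;
        }
        List<String> missing = missingConfFiles(Paths.get(home),
                Arrays.asList("broker.conf", "logback_broker.xml", "logback_namesrv.xml"));
        System.out.println(missing.isEmpty()
                ? "conf folder looks complete"
                : "Missing in conf: " + missing);
    }
}
```

Run it with the same environment variables as the NamesrvStartup run configuration so that it sees the same ROCKETMQ_HOME value.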
2) Start the Broker
- Contents of the broker.conf configuration file
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# NameServer address
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# Enable automatic topic creation for clients
autoCreateTopicEnable=true
# Storage root path
storePathRootDir=D:\\work\\mq\\rocketmq-master\\dataDir
# commitLog path
storePathCommitLog=D:\\work\\mq\\rocketmq-master\\dataDir\\commitLog
# Consume queue storage path
storePathConsumeQueue=D:\\work\\mq\\rocketmq-master\\dataDir\\consumequeue
# Message index storage path
storePathIndex=D:\\work\\mq\\rocketmq-master\\dataDir\\index
# checkpoint file path
storeCheckpoint=D:\\work\\mq\\rocketmq-master\\dataDir\\checkpoint
# abort file path
abortFile=D:\\work\\mq\\rocketmq-master\\dataDir\\abort
- Create the data folder dataDir
- Start BrokerStartup, passing the broker.conf path with the -c program argument and setting ROCKETMQ_HOME
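The doubled backslashes in the Windows paths of broker.conf are easy to get wrong. The sketch below assumes the file is read with java.util.Properties rules, where a backslash starts an escape sequence, and parses an in-memory excerpt instead of the real file. The class name BrokerConfEscapes is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class BrokerConfEscapes {
    /** Parses configuration text with java.util.Properties, as a stand-in for reading broker.conf. */
    static Properties parse(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Java source doubles each backslash once more: "\\\\" here is "\\" in the file text.
        String conf = "brokerName = broker-a\n"
                + "storePathRootDir=D:\\\\work\\\\mq\\\\rocketmq-master\\\\dataDir\n";
        Properties props = parse(conf);
        System.out.println(props.getProperty("brokerName"));       // prints broker-a
        System.out.println(props.getProperty("storePathRootDir")); // prints D:\work\mq\rocketmq-master\dataDir
    }
}
```

With a single backslash the path would be damaged under these rules: a backslash before an ordinary letter is silently dropped, and sequences such as \r or \t are read as control characters. Forward slashes are another way to avoid the problem.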
3) Send Messages
- Open Producer in the org.apache.rocketmq.example.quickstart package of the example module
- Set the NameServer address
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
- Run the main method to send messages
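If sending fails, a first thing to rule out is that nothing is listening at the address passed to setNamesrvAddr. The sketch below is a plain TCP probe using only the JDK; it does not speak the RocketMQ protocol, so a successful connection only shows that some process accepts connections on that port. The class NamesrvProbe and its methods are hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class NamesrvProbe {
    /** Splits a "host:port" string such as "127.0.0.1:9876" into a socket address. */
    static InetSocketAddress parse(String hostPort) {
        int colon = hostPort.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("expected host:port, got " + hostPort);
        }
        String host = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        return new InetSocketAddress(host, port);
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    static boolean isReachable(String hostPort, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(parse(hostPort), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String addr = "127.0.0.1:9876"; // same value as passed to setNamesrvAddr
        System.out.println(addr + " reachable: " + isReachable(addr, 1000));
    }
}
```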
4) Consume Messages
- Open Consumer in the org.apache.rocketmq.example.quickstart package of the example module
- Set the NameServer address
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");
- Run the main method to consume messages
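In the console output of the Consumer run, each message body appears as a byte array, for example body=[72, 101, 108, ...]. The sketch below turns such a printed array back into text, assuming the body was encoded as UTF-8. The class BodyDecodeSketch is a hypothetical helper and not part of the example module.

```java
import java.nio.charset.StandardCharsets;

final class BodyDecodeSketch {
    /** Turns a printed byte array such as "[72, 101, 108]" back into text, assuming UTF-8. */
    static String decodePrinted(String printed) {
        String inner = printed.trim();
        inner = inner.substring(1, inner.length() - 1); // drop the surrounding brackets
        if (inner.trim().isEmpty()) {
            return "";
        }
        String[] tokens = inner.split(",");
        byte[] bytes = new byte[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            bytes[i] = Byte.parseByte(tokens[i].trim());
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Body copied from the first "Receive New Messages" line of the console output.
        String body = "[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 48]";
        System.out.println(decodePrinted(body)); // prints Hello RocketMQ 10
    }
}
```

The raw console output of the quickstart Consumer run follows.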
```text
D:\install\jdk\jdk-8u131-windows-x64\bin\java.exe -javaagent:D:\install\idea\ideaIU-2018.3.5.win\lib\idea_rt.jar=57007:D:\install\idea\ideaIU-2018.3.5.win\bin -Dfile.encoding=UTF-8 -classpath D:\install\jdk\jdk-8u131-windows-x64\jre\lib\charsets.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\deploy.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\access-bridge-64.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\cldrdata.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\dnsns.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\jaccess.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\jfxrt.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\localedata.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\nashorn.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunec.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunjce_provider.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunmscapi.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunpkcs11.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\zipfs.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\javaws.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jce.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jfr.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jfxswt.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jsse.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\management-agent.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\plugin.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\resources.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\rt.jar;D:\work\mq\rocketmq-master\example\target\classes;D:\work\mq\rocketmq-master\client\target\classes;D:\work\mq\rocketmq-master\common\target\classes;C:\Users\muxu\.m2\repository\org\apache\commons\commons-lang3\3.4\commons-lang3-3.4.jar;D:\work\mq\rocketmq-master\srvutil\target\classes;D:\work\mq\rocketmq-master\remoting\target\classes;C:\Users\muxu\.m2\repository\com\alibaba\fastjson\1.2.51\fastjson-1.2.51.jar;C:\Users\muxu\.m2\repository\io\netty\netty-all\4.0.42.Final\netty-all-4.0.42.Final.jar;C:\Users\muxu\.m2\repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;C:\Users\muxu\.m2\repository\commons-cli\commons-cli\1.2\commons-cli-1.2.jar;C:\Users\muxu\.m2\repository\com\google\guava\guava\19.0\guava-19.0.jar;C:\Users\muxu\.m2\repository\ch\qos\logback\logback-classic\1.0.13\logback-classic-1.0.13.jar;C:\Users\muxu\.m2\repository\ch\qos\logback\logback-core\1.0.13\logback-core-1.0.13.jar;C:\Users\muxu\.m2\repository\org\slf4j\slf4j-api\1.7.7\slf4j-api-1.7.7.jar;C:\Users\muxu\.m2\repository\org\javassist\javassist\3.20.0-GA\javassist-3.20.0-GA.jar;C:\Users\muxu\.m2\repository\io\openmessaging\openmessaging-api\0.3.1-alpha\openmessaging-api-0.3.1-alpha.jar;D:\work\mq\rocketmq-master\openmessaging\target\classes;D:\work\mq\rocketmq-master\acl\target\classes;D:\work\mq\rocketmq-master\logging\target\classes;C:\Users\muxu\.m2\repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;C:\Users\muxu\.m2\repository\commons-codec\commons-codec\1.9\commons-codec-1.9.jar org.apache.rocketmq.example.quickstart.Consumer
22:16:16.493 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=2, sysFlag=0, bornTimestamp=1721571319594, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319598, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000000642, commitLogOffset=1602, bodyCRC=193412630, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382412, UNIQ_KEY=0200000111C818B4AAC26BC5B72A000A, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 48], transactionId='null'}]]
ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=11, sysFlag=0, bornTimestamp=1721571319797, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319798, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000001F6E, commitLogOffset=8046, bodyCRC=529756006, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382418, UNIQ_KEY=0200000111C818B4AAC26BC5B7F5002E, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52, 54], transactionId='null'}]]
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=3, sysFlag=0, bornTimestamp=1721571319624, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319627, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F000000000000090E, commitLogOffset=2318, bodyCRC=216726031, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382419, UNIQ_KEY=0200000111C818B4AAC26BC5B748000E, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 52], transactionId='null'}]]
ConsumeMessageThread_11 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=6, sysFlag=0, bornTimestamp=1721571319698, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319702, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000001172, commitLogOffset=4466, bodyCRC=1237960928, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382419, UNIQ_KEY=0200000111C818B4AAC26BC5B792001A, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50, 54], transactionId='null'}]]
ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=18, sysFlag=0, bornTimestamp=1721571319880, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319881, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000003302, commitLogOffset=13058, bodyCRC=1521507721, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382425, UNIQ_KEY=0200000111C818B4AAC26BC5B848004A, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 52], transactionId='null'}]]
......
```