RocketMQ源码分析 - 环境搭建

RocketMQ源码分析 - 环境搭建

    • 环境搭建
      • 源码拉取
      • 导入IDEA
      • 调试
        • 1) 启动NameServer
        • 2) 启动Broker
        • 3) 发送消息
        • 4) 消费消息

环境搭建

依赖工具

  • JDK:1.8+
  • Maven
  • Intellij IDEA

源码拉取

从官方仓库 https://github.com/apache/rocketmq clone或者download源码。
在这里插入图片描述
源码目录结构:

  • broker:broker模块(broker启动进程)
  • client:消息客户端,包含消息生产者、消息消费者相关类
  • common:公共包
  • dev:开发者信息(非源代码)
  • distribution:部署实例文件夹(非源代码)
  • example:RocketMQ例代码
  • filter:消息过滤相关基础类
  • filtersrv:消息过滤服务器实现相关类(Filter启动进程)
  • logappender:日志实现相关类
  • namesrv:NameServer实现相关类(NameServer启动进程)
  • openmessageing:消息开放标准
  • remoting:远程通信模块,给予Netty
  • srcutil:服务工具类
  • store:消息存储实现相关类
  • style:checkstyle相关实现
  • test:测试相关类
  • tools:工具类,监控命令相关实现类

导入IDEA

在这里插入图片描述
执行安装

clean install -Dmaven.test.skip=true

在这里插入图片描述

。。。。。。
[INFO] 
[INFO] Apache RocketMQ 4.5.1 .............................. SUCCESS [ 24.872 s]
[INFO] rocketmq-logging 4.5.1 ............................. SUCCESS [  3.511 s]
[INFO] rocketmq-remoting 4.5.1 ............................ SUCCESS [  4.462 s]
[INFO] rocketmq-common 4.5.1 .............................. SUCCESS [  5.444 s]
[INFO] rocketmq-client 4.5.1 .............................. SUCCESS [  4.268 s]
[INFO] rocketmq-store 4.5.1 ............................... SUCCESS [  3.219 s]
[INFO] rocketmq-srvutil 4.5.1 ............................. SUCCESS [  1.431 s]
[INFO] rocketmq-filter 4.5.1 .............................. SUCCESS [  1.321 s]
[INFO] rocketmq-acl 4.5.1 ................................. SUCCESS [  1.082 s]
[INFO] rocketmq-broker 4.5.1 .............................. SUCCESS [  3.667 s]
[INFO] rocketmq-tools 4.5.1 ............................... SUCCESS [  2.806 s]
[INFO] rocketmq-namesrv 4.5.1 ............................. SUCCESS [  1.228 s]
[INFO] rocketmq-logappender 4.5.1 ......................... SUCCESS [  1.394 s]
[INFO] rocketmq-openmessaging 4.5.1 ....................... SUCCESS [  1.122 s]
[INFO] rocketmq-example 4.5.1 ............................. SUCCESS [  1.282 s]
[INFO] rocketmq-test 4.5.1 ................................ SUCCESS [  1.439 s]
[INFO] rocketmq-distribution 4.5.1 ........................ SUCCESS [  0.147 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:02 min
[INFO] Finished at: 2024-07-19T08:46:25+08:00
[INFO] Final Memory: 57M/913M
[INFO] ------------------------------------------------------------------------Process finished with exit code 0

调试

创建conf配置文件夹,从distribution拷贝broker.conf和logback_broker.xml和logback_namesrv.xml
在这里插入图片描述

1) 启动NameServer
  • 展开namesrv模块,右键NamesrvStartup.java
    在这里插入图片描述

  • 配置ROCKETMO_HOME
    在这里插入图片描述
    在这里插入图片描述

  • 重新启动
    控制台打印结果

Connected to the target VM, address: '127.0.0.1:65350', transport: 'socket'
The Name Server boot success. serializeType=JSON

在这里插入图片描述

2) 启动Broker
  • broker.conf配置文件内容

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# namesrvAddr地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 开启客户端创建主题功能
autoCreateTopicEnable=true# 存储路径
storePathRootDir=D:\\work\\mq\\rocketmq-master\\dataDir
# commitLog路径
storePathCommitLog=D:\\work\\mq\\rocketmq-master\\dataDir\\commitLog
# 消息队列存储路径
storePathConsumeQueue=D:\\work\\mq\\rocketmq-master\\dataDir\\consumequeue
# 消息索引存储路径
storePathIndex=D:\\work\\mq\\rocketmq-master\\dataDir\\index
# checkpoint文件路径
storeCheckpoint=D:\\work\\mq\\rocketmq-master\\dataDir\\checkpoint
# abort文件存储路径
abortFile=D:\\work\\mq\\rocketmq-master\\dataDir\\abort
  • 创建数据文件夹dataDir
  • 启动BrokerStartup,配置broker.conf和ROCKETMQ_HOME
    在这里插入图片描述
    在这里插入图片描述
3) 发送消息
  • 进入example模块的org.apache.rocketmq.example.quickstart
  • 指定Namesrv地址
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
  • 运行main方法,发送消息

4) 消费消息
  • 进入example模块的org.apache.rocketmq.example.quickstart
  • 指定Namesrv地址
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");```
- 运行main方法,发送消息
```javascript
D:\install\jdk\jdk-8u131-windows-x64\bin\java.exe -javaagent:D:\install\idea\ideaIU-2018.3.5.win\lib\idea_rt.jar=57007:D:\install\idea\ideaIU-2018.3.5.win\bin -Dfile.encoding=UTF-8 -classpath D:\install\jdk\jdk-8u131-windows-x64\jre\lib\charsets.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\deploy.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\access-bridge-64.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\cldrdata.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\dnsns.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\jaccess.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\jfxrt.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\localedata.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\nashorn.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunec.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunjce_provider.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunmscapi.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunpkcs11.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\zipfs.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\javaws.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jce.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jfr.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jfxswt.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jsse.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\management-agent.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\plugin.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\resources.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\rt.jar;D:\work\mq\rocketmq-master\example\target\classes;D:\work\mq\rocketmq-master\client\target\classes;D:\work\mq\rocketmq-master\common\target\classes;C:\Users\muxu\.m2\repository\org\apache\commons\commons-lang3\3.4\commons-lang3-3.4.jar;D:\work\mq\rocketmq-master\srvutil\target\classes;D:\work\mq\rocketmq-master\remoting\target\classes;C:\Users\muxu\.m2\repository\com\alibaba\fastjson\1.2.51\fastjson-1.2.51.jar;C:\Users\muxu\.m2\repository\io\netty\netty-all\4.0.42.Final\netty-all-4.0.42.Final.jar;C:\Users\muxu\.m2\repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;C:\Users\muxu\.m2\repository\commons-cli\commons-cli\1.2\commons-cli-1.2.jar;C:\Users\muxu\.m2\repository\com\google\guava\guava\19.0\guava-19.0.jar;C:\Users\muxu\.m2\repository\ch\qos\logback\logback-classic\1.0.13\logback-classic-1.0.13.jar;C:\Users\muxu\.m2\repository\ch\qos\logback\logback-core\1.0.13\logback-core-1.0.13.jar;C:\Users\muxu\.m2\repository\org\slf4j\slf4j-api\1.7.7\slf4j-api-1.7.7.jar;C:\Users\muxu\.m2\repository\org\javassist\javassist\3.20.0-GA\javassist-3.20.0-GA.jar;C:\Users\muxu\.m2\repository\io\openmessaging\openmessaging-api\0.3.1-alpha\openmessaging-api-0.3.1-alpha.jar;D:\work\mq\rocketmq-master\openmessaging\target\classes;D:\work\mq\rocketmq-master\acl\target\classes;D:\work\mq\rocketmq-master\logging\target\classes;C:\Users\muxu\.m2\repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;C:\Users\muxu\.m2\repository\commons-codec\commons-codec\1.9\commons-codec-1.9.jar org.apache.rocketmq.example.quickstart.Consumer
22:16:16.493 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=2, sysFlag=0, bornTimestamp=1721571319594, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319598, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000000642, commitLogOffset=1602, bodyCRC=193412630, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382412, UNIQ_KEY=0200000111C818B4AAC26BC5B72A000A, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 48], transactionId='null'}]] 
ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=11, sysFlag=0, bornTimestamp=1721571319797, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319798, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000001F6E, commitLogOffset=8046, bodyCRC=529756006, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382418, UNIQ_KEY=0200000111C818B4AAC26BC5B7F5002E, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52, 54], transactionId='null'}]] 
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=3, sysFlag=0, bornTimestamp=1721571319624, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319627, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F000000000000090E, commitLogOffset=2318, bodyCRC=216726031, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382419, UNIQ_KEY=0200000111C818B4AAC26BC5B748000E, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 52], transactionId='null'}]] 
ConsumeMessageThread_11 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=6, sysFlag=0, bornTimestamp=1721571319698, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319702, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000001172, commitLogOffset=4466, bodyCRC=1237960928, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382419, UNIQ_KEY=0200000111C818B4AAC26BC5B792001A, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50, 54], transactionId='null'}]] 
ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=18, sysFlag=0, bornTimestamp=1721571319880, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319881, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000003302, commitLogOffset=13058, bodyCRC=1521507721, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382425, UNIQ_KEY=0200000111C818B4AAC26BC5B848004A, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 52], transactionId='null'}]] 
。。。。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/52026.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PCIe学习笔记(26)

Error Forwarding(错误转发) 错误转发(也称为数据中毒),通过设置EP位表示。下面是一些使用错误转发的例子: •例#1:从主存读取遇到不可纠正的错误 •例#2:PCI写到主存的奇偶校验错误 •例#3:内部数据缓冲区或缓存上的数据完整性错误 错误…

【题目/训练】:双指针

引言 我们已经在这篇博客【算法/学习】双指针-CSDN博客里面讲了双指针、二分等的相关知识。 现在我们来做一些训练吧 经典例题 1. 移动零 思路: 使用 0 当做这个中间点,把不等于 0(注意题目没说不能有负数)的放到中间点的左边,等于 0 的…

在Ubuntu16.04里安装ROS Kinetic

1.设置apt的source list sudo sh -c echo "deb http://packages.ros.org/ros/ubuntu$(lsb_release -sc) main" > /etc/apt/sources.list.d/ros-latest.list 2.设置gpd keys sudo apt-key adv --keyserver hkp://ha.pool.sks-keyservers.net:80 --recv-key 421C365…

基于java的酒店管理系统设计与实现

系统分析与设计 需求分析 1.系统概要 根据餐饮系统的流程,完成从用户登录到开台点菜,到结账收银,到统计一条线的信息化管理,因此整个餐饮管理信息系统的研发内容就是开发一整套餐饮管理信息系统,实现餐饮业务的计算…

【Vue3】集成 Element Plus

【Vue3】集成 Element Plus 背景简介开发环境开发步骤及源码总结 背景 随着年龄的增长,很多曾经烂熟于心的技术原理已被岁月摩擦得愈发模糊起来,技术出身的人总是很难放下一些执念,遂将这些知识整理成文,以纪念曾经努力学习奋斗的…

后端开发刷题 | 合并两个排序的链表

描述 输入两个递增的链表,单个链表的长度为n,合并这两个链表并使新链表中的节点仍然是递增排序的。 数据范围: 0≤n≤1000,−1000≤节点值≤1000 如输入{1,3,5},{2,4,6}时,合并后的链表为{1,2,3,4,5,6},…

MySQL各个版本root账号没有最高权限的解决方法

一、详细报错 ERROR 1045 (28000): Access denied for user ‘root’‘localhost’ (using password: YES) 报错原因(分析过程): rootlocalhost用户密码修改导致 解决方法: 跳过权限验证启动数据库,并修改密码。如下…

怎么快速定位bug?如何编写测试用例?

🍅 点击文末小卡片,免费获取软件测试全套资料,资料在手,涨薪更快 作为一名测试人员如果连常见的系统问题都不知道如何分析,频繁将前端人员问题指派给后端人员,后端人员问题指派给前端人员,那么在…

Windows系统命令

Windows系统命令 Windows 系统中的命令行工具是指令式编程语言,可以用来执行各种任务、管理文件和目录、监控系统状态等。下面是一个 Windows 命令应用实例: 1. 文件操作 cd:用于改变当前目录。例如,cd Documents 将当前目录更…

Ubuntu下通过Docker部署Synapse服务器技术博客

今天,我在阿贝云这个不错的免费云服务器上进行Synapse部署测试。这家免费云服务商太棒了,1核CPU、1G内存、10G硬盘、5M带宽,阿贝云的免费服务器性能超乎想象。 作为一个资深的IT技术爱好者,我简直爱不释手Docker这个神器。它可以轻松地帮我部署各种应用程序,包括今…

独立站PrestaShop安装

独立站PrestaShop安装 独立站PrestaShop安装系统需求下载PrestaShop浏览器下载命令行下载 解压PrestaShop创建数据库移动PrestaShop源码到web目录composer安装依赖包nginx配置访问域名进入安装页面选择语言许可协议系统兼容性店铺信息Content of your store系统配置数据库店铺安…

数据结构day03(栈 Stack 顺序栈、链式栈 )内含具体详细代码实现

目录 【1】栈 Stack 1》栈的定义 2》顺序栈 2》链式栈 4》顺序栈的链式栈的区别 【1】栈 Stack 1》栈的定义 栈:是只允许在一端进行插入或删除的线性表,首先栈是一种线性表,但限定这种线性表只能在某一端进行插入和删除操作。 栈顶&…

MySQL的查询排序、模糊查询及通配符

1. 查询排序 在MySQL中,使用ORDER BY子句可以对查询结果进行排序。以下是ORDER BY子句的基本语法: SELECT 列名 FROM 表名 ORDER BY 列名 ASC/DESC; 其中,ASC表示升序排序,DESC表示降序排序。例如,我们可以使用以下…

仿照ContentLoadingProgressBar 的特点在Android项目中自定义Loading对话框

ContentLoadingProgressBar 是 Android 中的一个控件,继承自 ProgressBar。它在 ProgressBar 的基础上添加了一些特殊功能,主要用于在加载内容时显示进度。它的一些主要特点如下: 自动隐藏和显示:ContentLoadingProgressBar 会在…

JavaScript_7_练习:随机抽奖案例

效果图 代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>练习&#xff1a;随机抽奖案例</tit…

vue项目配置基础路由vue-router

1、运行以下命令安装vue-router yarn add vue-router 2、在src目录下的components中新建两个vue页面 3、在src目录下新建router文件夹&#xff0c;在router文件夹下面新建index.js文件 4、配置main.js文件 //引入Vue import Vue from "vue"; //引入App import App…

全新分支版本!微软推出Windows 11 Canary Build 27686版

已经很久没有看到 Windows 11 全新的分支版本了&#xff0c;今天微软发布 Windows 11 Canary 新版本&#xff0c;此次版本号已经转移到 Build 27xxx&#xff0c;首发版本为 Build 27686 版。 此次更新带来了多项改进&#xff0c;包括 Windows Sandbox 沙盒功能切换到 Microsof…

LearnOpenGL——SSAO学习笔记

LearnOpenGL——SSAO学习笔记 SSAO一、基本概念二、样本缓冲三、法向半球四、随机核心转动五、SSAO着色器六、环境遮蔽模糊七、应用SSAO遮蔽因子 SSAO 一、基本概念 环境光照是我们加入场景总体光照中的一个固定光照常量&#xff0c;它被用来模拟光的散射(Scattering)。散射应…

QT事件机制理解

事件和信号 从硬件层来看: 事件就是一种中断&#xff0c; 中断的产生形式: 1.用户操控硬件所产生的中断。 2.由系统自身所产生的中断&#xff0c;比如说定时器。 这种中断由系统内核监控&#xff0c;由系统内核接收到中断并向CPU发出的执行请求就叫信号。所以说事件是信号产生…

C++,std::bind 详解

文章目录 1. 概述2. 基本用法2.1 使用占位符2.2 示例 3. 总结 1. 概述 std::bind 是 C11 引入的一个功能&#xff0c;它允许你将函数&#xff08;或成员函数、函数对象&#xff09;与其参数绑定&#xff0c;生成一个新的可调用对象。这个功能在需要将函数及其参数一起传递给其…