RocketMQ阅读源码前的准备

本文将讲解如何在IDEA中导入 RocketMQ 源码,并运行 Broker 和 NameServer,编写一个消息发送与消息消费的示例。

一. 源码导入及调试

1.1 导入源码

RocketMQ 原先是阿里巴巴集团内部的消息中间件,于2016年提交至Apache基金会孵化,并最终成为Apache顶级项目。

第一步:从GitHub上clone RocketMQ 源码:RocketMQ。点击 File->Open 导入项目。

后面关于RocketMQ的源码分析若无特殊说明,将默认使用 4.6.0 版本

第二步:构建项目

在项目根目录下执行下列命令

mvn clean install -Dmaven.test.skip=true

1.2 启动 NameServer

第一步:展开 namesrv 模块,鼠标右键选中 NamesrvStartup.java,将其拖拽到Debug As,选中 Debug ‘NamesrvStartup.java.main()’,弹出下图:

第二步:单击 Environment variables 后面的按钮,弹出配置界面,配置如下环境变量:

ROCKETMQ_HOME=E:\software-engineer\java\open-source\rocketmq-home

点击 OK->Apply 保存即可。

第三步:在刚刚指定的文件夹内创建conf、logs、store文件夹。

第四步:从 distribution 部署目录中将 broker.conflogback_broker.xmllogback_namesrv.xml 等文件复制到 conf 目录中,按需修改 broker.conf

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0#nameServer地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH#存储路径
storePathRootDir=E:\software-engineer\java\open-source\rocketmq-home\store
#CommitLog存储路径
storePatchCommitLog=E:\software-engineer\java\open-source\rocketmq-home\store\commitlog
#消费队列存储路径
storePathConsumeQueue=E:\software-engineer\java\open-source\rocketmq-home\store\consumequeue
#消息索引存储路径
storePathIndex=E:\software-engineer\java\open-source\rocketmq-home\store\index
#checkpoint 文件存储路径
storeCheckpoint=E:\software-engineer\java\open-source\rocketmq-home\store\checkpoint
#abort 文件存储路径
abortFile=E:\software-engineer\java\open-source\rocketmq-home\store\abort

第五步:点击 Debug 运行 NamesrvStartup.java,并输出“The Name Server boot success. serializeType=JSON”

1.3 启动 Broker

第一步:展开 broker 模块,启动 BrokerStartup.java ,会提示需要配置 ROCKETMQ_HOME 环境变量。与 NameServer启动时一样,配置环境变量即可:

第二步:配置-c参数,指定broker.conf 配置文件地址:

第三步:以 Debug 模式启动 BrokerStartup.java,查看${ROCKET_HOME}/logs/broker.log 文件,未报错则表示启动成功:

1.4 代码测试

1.4.1 生产者

第一步:修改org.apache.rocketmq.example.quickstart.Producer 示例程序,设置 RocketMQ NameServer地址:

public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();for (int i = 0; i < 1; i++) {try {Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) );SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}

第二步:运行该方法发送消息

1.4.2 消费者

第一步:修改 org.apache.rocketmq.example.quickstart.Consumer 示例程序,设置RocketMQ NameServer地址

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

第二步:运行该方法,消费消息

二. RocketMQ 源码结构

RocketMQ源码组织方式基于 Maven 模块组织,RocketMQ核心目录说明如下:

  • acl:权限控制模块
  • broker:broker模块(broker启动进程)
  • client:消息客户端,包含消息生产者和消费者
  • common:公共包
  • dev:开发者信息(非源码)
  • distribution:打包分发目录(非源码)
  • example:示例代码
  • filter:消息过滤相关基础类
  • logappender:日志实现相关类
  • logging:自主实现日志相关类
  • namesrv:NameServer实现相关类(NameServer启动进程)
  • openmessaging:消息开放标准,已发布
  • remoting:远程通信模块,基于Netty
  • srvutil:服务器工具类
  • store:消息存储相关实现类
  • style:checkstyle相关实现
  • test:测试相关类
  • tools:工具类,监控命令相关实现类。

三. Rocket设计理念和设计目标

3.1 设计理念

RocketMQ 设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储和消息消费,整体设计追求简单和性能高效,主要体现在一下三个方面:

  1. 首先,NameServer设计极其简单,摒弃了业界常用的使用Zookeeper充当信息管理的“注册中心”,而是自研NameServer来实现元数据的管理(Topic路由信息等)。从实际需求出发,因为Topic路由信息无须在集群之间保持强一致,追求最终一致性,并且能容忍分钟级的不一致。正是基于此种情况,RocketMQ的NameServer集群之间互不通信,极大地降低了NameServer实现的复杂程度,对网络的要求也降低了不少,但是性能相比较Zookeeper有了极大的提升。
  2. 其次是高效的IO存储机制。RocketMQ追求消息发送的高吞吐量,RocketMQ的消息存储文件设计成文件组的概念,组内单个文件大小固定,方便引入内存映射机制,所有主题的消息存储基于顺序写,极大地提供了消息写性能,同时为了兼顾消息消费与消息查找,引入了消息消费队列文件与索引文件。
  3. 最后是容忍存在设计缺陷,适当将某些工作下放给RocketMQ使用者。消息中间件的实现者经常会遇到一个难题:如何保证消息一定能被消息消费者消费,并且保证只消费一次。RocketMQ的设计者给出的解决办法是不解决这个难题,而是退而求其次,只保证消息被消费者消费,但设计上允许消息被重复消费,这样极大地简化了消息中间件的内核,使得实现消息发送高可用变得非常简单与高效,消息重复问题由消费者在消息消费时实现幂等。

3.2 设计目标

RocketMQ作为一款消息中间件,需要解决如下问题:

  1. 架构模式 RocketMQ与大部分消息中间件一样,采用发布订阅模式,基本的参与组件主要包括:消息发送者、消息服务器(消息存储)、消息消费、路由发现。

  2. 顺序消息 所谓顺序消息,就是消息消费者按照消息达到消息存储服务器的顺序消费。RocketMQ可以严格保证消息有序。

  3. 消息过滤 消息过滤是指在消息消费时,消息消费者可以对同一主题下的消息按照规则只消费自己感兴趣的消息。RocketMQ消息过滤支持在服务端与消费端的消息过滤机制。

    1. 消息在Broker端过滤。Broker只将消息消费者感兴趣的消息发送给消息消费者。
    2. 消息在消息消费端过滤,消息过滤方式完全由消息消费者自定义,但缺点是有很多无用的消息会从Broker传输到消费端。
  4. 消息存储 消息中间件的一个核心实现是消息的存储,对消息存储一般有如下两个维度的考量:消息堆积能力和消息存储性能。RocketMQ追求消息存储的高性能,引入内存映射机制,所有主题的消息顺序存储在同一个文件中。同时为了避免消息无限在消息存储服务器中累积,引入了消息文件过期机制与文件存储空间报警机制。

  5. 消息高可用性 通常影响消息可靠性的有以下几种情况。

    1. Broker正常关机。
    2. Broker异常Crash。
    3. OS Crash。
    4. 机器断电,但是能立即恢复供电情况。
    5. 机器无法开机(可能是CPU、主板、内存等关键设备损坏)。
    6. 磁盘设备损坏。

    针对上述情况,情况14的RocketMQ在同步刷盘机制下可以确保不丢失消息,在异步刷盘模式下,会丢失少量消息。情况56属于单点故障,一旦发生,该节点上的消息全部丢失,如果开启了异步复制机制,RoketMQ能保证只丢失少量消息,RocketMQ在后续版本中将引入双写机制,以满足消息可靠性要求极高的场合。

  6. 消息到达(消费)低延迟 RocketMQ在消息不发生消息堆积时,以长轮询模式实现准实时的消息推送模式。

  7. 确保消息必须被消费一次 RocketMQ 通过消息消费确认机制(ACK)来确保消息至少被消费一次(least once),但由于ACK消息有可能丢失等其他原因,RocketMQ无法做到消息只被消费一次,有重复消费的可能。

  8. 回溯消息 回溯消息是指消息消费端已经消费成功的消息,由于业务要求需要重新消费消息。RocketMQ支持按时间回溯消息,时间维度可精确到毫秒,可以向前或向后回溯。

  9. 消息堆积 消息中间件的主要功能是异步解耦,必须具备应对前端的数据洪峰,提高后端系统的可用性,必然要求消息中间件具备一定的消息堆积能力。RocketMQ消息存储使用磁盘文件(内存映射机制),并且在物理布局上为多个大小相等的文件组成逻辑文件组,可以无限循环使用。RocketMQ消息存储文件并不是永久存储在消息服务器端,而是提供了过期机制,默认保留3天

  10. 定时消息 定时消息是指消息发送到Broker后,不能被消息消费端立即消费,要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意精度的定时消息消费,必须在消息服务端对消息进行排序,势必带来很大的性能损耗,故RocketMQ不支持任意进度的定时消息,而只支持特定延迟级别。

  11. 消息重试机制 消息重试是指消息在消费时,如果发送异常,消息中间件需要支持消息重新投递,RocketMQ支持消息重试机制。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/192126.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录算法训练营第三十四天|62.不同路径,63. 不同路径 II

62. 不同路径 - 力扣&#xff08;LeetCode&#xff09; 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish” &#…

oj赛氪练习题

数组调整 import java.util.Scanner;public class Main {public static void main(String[] args) {Scanner scanner new Scanner(System.in);int n scanner.nextInt();int k scanner.nextInt();int[] arr new int[n];for (int i 0; i < n; i) {arr[i] scanner.nextIn…

python+Qt5+sqllite 个性化单词记忆软件设计

问题描述&#xff1a; 设计一款背诵英语单词的软件。用户可以根据自己的需求导入需背诵的词库&#xff0c;并可以编辑自己的词库。背单词时有两种模式供选择&#xff1a;系统可以给出中文提示&#xff0c;用户输入对应的单词&#xff0c;也可输出单词让用户输入中文意思。系统判…

【pytest】执行环境切换的两种解决方案

一、痛点分析 在实际企业的项目中&#xff0c;自动化测试的代码往往需要在不同的环境中进行切换&#xff0c;比如多套测试环境、预上线环境、UAT环境、线上环境等等&#xff0c;并且在DevOps理念中&#xff0c;往往自动化都会与Jenkins进行CI/CD&#xff0c;不论是定时执行策略…

【数据中台】开源项目(5)-Amoro

介绍 Amoro is a Lakehouse management system built on open data lake formats. Working with compute engines including Flink, Spark, and Trino, Amoro brings pluggable and self-managed features for Lakehouse to provide out-of-the-box data warehouse experience,…

指针常量和常量指针的区别

文章目录 指针常量常量指针即是指针常量又是常量指针 指针常量 指针常量的本质是常量&#xff0c;表示的是 这个指针所指向的地址不能发生改变。即指针变量的值&#xff08;即地址值&#xff09;不能发生修改。但是指针所指向的那块内存里的值是可以修改的。 注意&#xff1a;…

canvas基础:绘制圆弧、圆形

canvas实例应用100 专栏提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。 canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重要的帮助。 文章目录 arc…

【大数据】HBase 中的列和列族

&#x1f60a; 如果您觉得这篇文章有用 ✔️ 的话&#xff0c;请给博主一个一键三连 &#x1f680;&#x1f680;&#x1f680; 吧 &#xff08;点赞 &#x1f9e1;、关注 &#x1f49b;、收藏 &#x1f49a;&#xff09;&#xff01;&#xff01;&#xff01;您的支持 &#x…

K8S客户端二 使用Rancher部署服务

Rancher容器云管理平台 本博客中使用了四台服务器&#xff0c;如下 rancher服务器k8s-masterk8s-worker01k8s-worker02 一、主机硬件说明 序号硬件操作及内核1CPU 4 Memory 4G Disk 100GCentOS72CPU 4 Memory 4G Disk 100GCentOS73CPU 4 Memory 4G Disk 100GCentOS74CPU 4 …

Linux的基本指令(五)

目录 前言 tar指令(重要) 再次思考&#xff0c;为什么要打包和压缩呢&#xff1f; 实例&#xff1a;基于xshell进行压缩包在Windows与Linux之间的互传 实例&#xff1a;实现两个Linux系统之间的文件互传 bc指令 uname -r指令 重要的热键 关机与开机 扩展命令 shell及…

国际语音群呼系统的产品优势有哪些?为什么要使用国际语音群呼系统?

一、国际语音群呼系统的产品优势&#xff1a; 1.巨量群呼 支持大容量并发群呼&#xff0c;呼叫不受限制&#xff0c;充裕的线路保障造就百万级平台容量&#xff0c;可以短时间内同时拨打大量电话&#xff0c;让语音快速到达&#xff0c;大大提高发送效率&#xff1b; 2.自主…

Android12蓝牙框架

参考&#xff1a; https://evilpan.com/2021/07/11/android-bt/ https://source.android.com/docs/core/connect/bluetooth?hlzh-cn https://developer.android.com/guide/topics/connectivity/bluetooth?hlzh-cn https://developer.android.com/guide/components/intents-fi…

FL Studio Producer Edition21.0.3中文版安装详解(附下载链接)

fl studio 21中文版是Image-Line公司继20版本之后更新的水果音乐制作软件&#xff0c;很多用户不太理解&#xff0c;为什么新版本不叫fl studio 21或fl studio2024&#xff0c;非得直接跳到21.2版本&#xff0c;其实该版本是为了纪念该公司22周年&#xff0c;所以该版本也是推出…

使用gcloud SDK 管理和部署 Cloud run service

查看cloud run 上的service 列表&#xff1a; gcloud run services list > gcloud run services listSERVICE REGION URL LAST DEPLOYED BY LAST DEPL…

小米秒享3--非小米电脑

小米妙享中心是小米最新推出的一款功能&#xff0c;能够为用户们提供更加舒适便利的操作体验。简单的说可以让你的笔记本和你的小米手机联动&#xff0c;比如你在手机的文档&#xff0c;连接小米共享后&#xff0c;可以通过电脑进行操作。 对于非小米电脑想要体验终版秒享AIOT…

使用java批量生成Xshell session(*.xsh)文件

背景 工作中需要管理多套环境, 有时需要同时登陆多个节点, 且每个环境用户名密码都一样, 因此需要一个方案来解决动态的批量登录问题. XShell Xshell有session管理功能: 提供了包括记住登录主机、用户名、密码及登录时执行命令或脚本(js,py,vbs)的功能 session被存储在xsh文…

计算机组成学习-数据的表示和运算总结

1、进制与编码 1.1 进位计数法 常用的进位计数法有十进制、二进制、八进制、十六进制等。十六进制每个 数位可取0〜9、A、B、C、D、E、F中的任意一个&#xff0c;其中A、B、C、D、E、F分别表示 10〜15。 八进制数字通常以前缀 "0"&#xff08;零&#xff09;加上数…

GoLong的学习之路,进阶,RabbitMQ (消息队列)

快有一周没有写博客了。前面几天正在做项目。正好&#xff0c;项目中需要MQ&#xff08;消息队列&#xff09;&#xff0c;这里我就补充一下我对mq的理解。其实在学习java中的时候&#xff0c;自己也仿照RabbitMQ自己实现了一个单机的mq&#xff0c;但是mq其中一个特点也就是&a…

基于ResNet18网络完成图像分类任务

目录 1 数据处理 1.1 数据集介绍 1.2 数据读取 1.3 构造Dataset类 2 模型构建 3 模型训练 4 模型评价 5 模型预测 6 什么是预训练模型和迁移学习 7 比较“使用预训练模型”和“不使用预训练模型”的效果。 总结 在本实践中&#xff0c;我们实践一个更通用的图像分类任务…

rust-flexi_logger

flexi_logger 是字节开源的rust日志库。目前有log4rs、env_log 等库&#xff0c;综合比较下来&#xff0c;还是flexi_logger简单容易上手&#xff0c;而且自定义很方便&#xff0c;以及在效率方面感觉也会高&#xff0c;下篇文章我们来测试下。 下面来看下怎么使用 关注 vx gol…