RocketMQ从入门到精通

1.MQ概述


1.1 RocketMQ简介

 RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。

1.2 MQ用途

  • 限流削峰

MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

  •  异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。

  •  数据收集

分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。

1.3 常见MQ产品

  • RabbitMQ


RabbitMQ是使用ErLang语言开发的一款MQ产品。其吞吐量较Kafka与RocketMQ要低,且由于其不是Java语言开发,所以公司内部对其实现定制化开发难度较大。

  • Kafka


Kafka是使用Scala/Java语言开发的一款MQ产品。其最大的特点就是高吞吐量,常用于大数据领域的实时计算、日志采集等场景。其没有遵循任何常见的MQ协议,而是使用自研协议。

  • RocketMQ


RocketMQ是使用Java语言开发的一款MQ产品。经过数年阿里双11的考验,性能与稳定性非常高。其没有遵循任何常见的MQ协议,而是使用自研协议。

对比

2.RocketMQ 基本概念


2.1 消息


消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。单个消息所占空间不会很大。

RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询。不过需要注意的是,MessageId有两个:在生产者send()消息时会自动生成一个MessageId(msgId),当消息到达Broker后,Broker也会自动生成一个MessageId(offsetMsgId)。msgId、offsetMsgId与key都称为消息标识。 

msgId:由producer端生成,其生成规则为: producerIp + 进程pid + MessageClientIDSetter类的ClassLoader的hashCode + 当前时间 + AutomicInteger自增计数器 
offsetMsgId:由broker端生成,其生成规则为:brokerIp + 物理分区的offset(Queue中的偏移量) 
key:由用户指定的业务相关的唯一标识
 

2.2 主题


Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。 一个生产者可以同时发送多种Topic的消息;而一个消费者只对某种特定的Topic感兴趣,即只可以订阅和消费一种Topic的消息。 

2.3 标签


标签为消息设置的标签,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。 标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。 Topic是消息的一级分类,Tag是消息的二级分类。Topic相当于货物,Tag相当于上海山东等地区。

2.4 队列


存储消息的物理实体。 一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。 一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。 一个Topic的Queue中的消息只能被一个消费者组中的一个消费者消费。 一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费。

分片不同于分区。在RocketMQ中,分片指的是存放相应Topic的Broker。每个分片中会创建出相应数量的分区,即Queue,每个Queue的大小都是相同的。

 2.5 Producer


消息生产者,负责生产消息。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。  例如:用户提交的请求写入到MQ的过程,就是消息生产的过程,在这里用户就是生产者 。


 RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。生产者组是同一类生产者的集合,这类Producer发送相同Topic类型的消息。一个生产者组可以同时发送多个主题的消息。如果主题中有多个队列,生产者组只有一个生产者,生产者会采取轮询的方式进行发送消息。

生产者代码如下:导入依赖

       <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version></dependency>

生产者代码

  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {DefaultMQProducer order = new DefaultMQProducer("order");order.setNamesrvAddr("localhost:9876");order.start();Message message = new Message("myTopic", "myTag", ("test").getBytes());SendResult result = order.send(message);System.out.println(result);order.shutdown();}


2.6 Consumer


消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理。  例如:系统从MQ中读取到请求,并对请求进行处理的过程就是消息消费的过程,在这里系统就是消费者。 
 RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息。 消费者组使得在消息消费方面,实现负载均衡(将一个Topic中的不同的Queue平均分配给同一个Consumer Group的不同的Consumer,注意,并不是将消息负载均衡)和容错(一个Consmer挂了,该Consumer Group中的其它Consumer可以接着消费原Consumer消费的Queue)的目标变得非常容易。

消费者代码

public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("myTopic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("收到的消息"+list);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}

负载均衡策略

queue 个数大于 Consumer个数, 那么 Consumer 会平均分配 queue,不够平均,会根据clientId排序来拿取余数
queue个数小于Consumer个数,那么会有Consumer闲置,就是浪费掉了,其余Consumer平均分配到queue

消费者组中Consumer的数量应该小于等于订阅Topic的Queue数量。如果超出Queue数量,则多出的Consumer将不能消费消息。

2.7 NameServer


 NameServer是一个Broker与Topic路由的注册中心,支持Broker的动态注册与发现。 
主要包括两个功能: 
Broker管理:接受Broker集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检查Broker是否还存活。

路由信息管理:每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。 


路由注册 
Name Server既然是注册中心,那么是如何完成注册的呢? NameServer通常也是以集群的方式部署,不过,NameServer是无状态的,即NameServer集群中的各个节点间是无差异的,各节点间相互不进行信息通讯。 那各节点中的数据是如何进行数据同步的呢?在Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。在NameServer内部维护着⼀个Broker列表,用来动态存储Broker的信息。 
 
Broker节点为了证明自己是活着的,为了维护与NameServer间的长连接,会将最新的信息以心跳包的方式上报给NameServer,每30秒发送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名称、Broker所属集群名称等等。NameServer在接收到心跳包后,会更新心跳时间戳,记录这个Broker的最新存活时间。 


路由剔除 
由于Broker关机、宕机或网络抖动等原因,NameServer没有收到Broker的心跳,NameServer可能会将其从Broker列表中剔除。 NameServer中有⼀个定时任务,每隔10秒就会扫描⼀次Broker表,查看每一个Broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定Broker失效,然后将其从Broker列表中剔除。 


路由发现 
RocketMQ的路由发现采用的是Pull模型。当Topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取Topic最新的路由。 默认客户端每30秒会拉取一次最新的路由。
 

2.8 Broker


Broker充当着消息中转角色,负责存储消息、转发消息。
Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker同时也存储着消息相关的元数据,包括消费者组消费进度偏移offset、主题、队列等。

模块如下图:

Remoting Module:整个Broker的实体,负责处理来自clients端的请求。而这个Broker实体则由以下模块构成。

Client Manager:客户端管理器。负责接收、解析客户端(Producer/Consumer)请求,管理客户端。例如,维护Consumer的Topic订阅信息

Store Service:存储服务。提供方便简单的API接口,处理消息存储到物理硬盘和消息查询功能。

HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。

Index Service:索引服务。根据特定的Message key,对投递到Broker的消息进行索引服务,同时也提供根据Message Key对消息进行快速查询的功能。

2.9 RocketMQ 工作流程


工作流程如下图:

1)启动NameServer,NameServer启动后开始监听端口,等待Broker、Producer、Consumer连接。


2)启动Broker时,Broker会与所有的NameServer建立并保持长连接,然后每50秒向NameServer定时发送心跳包。


3)发送消息前,可以先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,当然,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。不过,这步是可选的,也可以在发送消息时自动创建Topic。


4) Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取路由信息,即当前发送的Topic消息的Queue与Broker的地址(IP+Port)的映射关系。然后根据算法策略从队选择一个Queue,与队列所在的Broker建立长连接从而向Broker发消息。当然,在获取到路由信息后,Producer会首先将路由信息缓存到本地,再每30秒从NameServer更新一次路由信息。


5)Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取其所订阅Topic的路由信息,然后根据算法策略从路由信息中获取到其所要消费的Queue,然后直接跟Broker建立长连接,开始消费其中的消息。Consumer在获取到路由信息后,同样也会每30秒从NameServer更新一次路由信息。不过不同于Producer的是,Consumer还会向Broker发送心跳,以确保Broker的存活状态。
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/234182.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java第二十章课堂总结

如果一次只完成一件事情&#xff0c;很容易实现。但现实生活中&#xff0c;很多事情都是同时进行的。Java中为了模拟这种状态&#xff0c;引入了线程机制。简单地说&#xff0c;当程序同时完成多件事情时&#xff0c;就是所谓的多线程。多线程应用相当广泛&#xff0c;使用多线…

【bug日记】如何切换jdk版本,如何解决java和javac版本不一致

背景 今天在安装jenkins后&#xff0c;使用java运行war包的时候&#xff0c;提示jdk1.8版本太低&#xff0c;需要提高版本&#xff0c;所以就需要切换jdk版本 解决 在用户变量中&#xff0c;首先更改了JAVA_HOME的地址为17的目录&#xff0c;发现javac的版本改为17了&#x…

Jmeter的接口测试详细步骤并实现业务闭环

一、首先是了解Jmeter接口测试用到的组件 1、测试计划&#xff1a;Jmeter的起点和容器2、线程组&#xff1a;代表一定的虚拟用户3、取样器&#xff1a;发送请求的最小单元4、逻辑控制器&#xff1a;控制组件的执行顺序5、前置处理器&#xff1a;在请求之前的操作6、后置处理器…

用sqlite制作对局记录管理

1. sqlite简介 sqlite是一款非常轻便小巧的数据库&#xff0c;以C语言开发&#xff0c;已流行了数十年&#xff0c;据说是世界上部署最多的数据库。为什么是部署最多的呢&#xff1f;因为它根本不需要数据库服务器&#xff0c;且可以在任意设备、任意操作系统上部署。因此&…

服务宕机、线上环境内存溢出OOM分析思路

前言 平时工作中&#xff0c;肯定会遇到哪个产品经理突然来找&#xff0c;说服务器又挂了&#xff0c;怎么又用不了啦&#xff01;类似的紧急情况&#xff0c;遇到这种情况不要慌&#xff0c;我提供以下几点紧急补救思路。 1&#xff09;重启大法保命 2&#xff09;确认是否新…

5分钟上手浏览器插件测试——Eolink Apikit

Eolink Apikit 研发管理和自动化测试产品中&#xff0c;提供了多种发起 API 测试的方式&#xff1a; 服务器测试&#xff1a;通过 Eolink Apikit 官方远程服务器发送请求&#xff0c;不需要安装任何插件&#xff0c;但是无法访问本地服务器(localhost)、内网、局域网。插件测试…

Qt中,将一个结构体转为qbytearray后,如何将这个qpqbytearray重新恢复为之前的结构体

在 Qt 中&#xff0c;如果你有一个自定义的结构体&#xff0c;并将其转换为 QByteArray&#xff0c;然后想要将 QByteArray 转换回原始的结构体&#xff0c;你可以使用 Qt 的 QDataStream 类来实现这个转换过程。 首先&#xff0c;假设你有一个自定义的线段结构体如下&#xf…

Git Status 中文乱码解决

中文的文件名&#xff0c;全是乱码 解决&#xff1a; git config --global core.quotepath false

年终汇报这么写,升值加薪必有你!

#01 你这么能干&#xff0c; 老板知道吗&#xff1f; — 打工人最怕什么&#xff1f; 最怕你忙前忙后&#xff0c;干活一大堆&#xff0c;气出一身结节&#xff0c;锅还没少背&#xff0c;最后升职加薪没有你&#xff0c;出国旅游不带你&#xff1b;更怕你日常996&#xf…

基于深度学习的图像分割

摘要 遥感图像分割是利用遥感技术获取的高分辨率图像进行像素级别的分类&#xff0c;将图像中的不同物体或不同地物提取出来的过程。这个过程对于遥感应用具有重要意义&#xff0c;因为它能够提取出地物和地表特征&#xff0c;如河流、道路、建筑、植被、水体等&#xff0c;并且…

同义词替换器降低论文重复率的最新技术动态

大家好&#xff0c;今天来聊聊同义词替换器降低论文重复率的最新技术动态&#xff0c;希望能给大家提供一点参考。 以下是针对论文重复率高的情况&#xff0c;提供一些修改建议和技巧&#xff0c;可以借助此类工具&#xff1a; 标题&#xff1a;同义词替换器降低论文重复率的最…

从旺店通·企业版到金蝶云星空通过接口配置打通数据

从旺店通企业版到金蝶云星空通过接口配置打通数据 对接系统&#xff1a;旺店通企业版 旺店通是北京掌上先机网络科技有限公司旗下品牌&#xff0c;国内的零售云服务提供商&#xff0c;基于云计算SaaS服务模式&#xff0c;以体系化解决方案&#xff0c;助力零售企业数字化智能化…

MySQL数据库知识点简易总结篇

SQL数据库简易版笔记 SQL&#xff08;结构化查询语句&#xff09;&#xff1a;可操作世界上所有的关系型数据 关系数据库 建立在关系模型基础上的数据库。简单来说&#xff0c;关系型数据库是多张能互相连接的二维表组成的数据库。其优势有&#xff1a; 都是表结构&#xf…

基于SSM的视康眼镜网店销售系统

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

HarmonyOS4.0从零开始的开发教程20三方库的基本使用

HarmonyOS&#xff08;十八&#xff09;三方库的基本使用 三方库概述 三方库是开发者在系统能力的基础上进行了一层具体功能的封装&#xff0c;对其能力进行拓展&#xff0c;提供更加方便的接口&#xff0c;提升开发效率的工具。如果是发布到开源社区&#xff0c;称为开源三方…

高级算法设计与分析(二) -- 递归与分治策略

系列文章目录 高级算法设计与分析&#xff08;一&#xff09; -- 算法引论 高级算法设计与分析&#xff08;二&#xff09; -- 递归与分治策略 高级算法设计与分析&#xff08;三&#xff09; -- 动态规划 未完待续【 高级算法设计与分析&#xff08;四&#xff09; -- 贪…

CCD相机和CMOS相机有什么区别

问题描述&#xff1a;CCD相机和CMOS相机有什么区别。在阅读一些论文时&#xff0c;常看到工业上的检测常用到CCD相机&#xff0c;和我们熟知的CMOS相机有什么区别呢。 问题解答&#xff1a; CCD相机&#xff1a; 原理&#xff1a; 光电转换&#xff1a; 光子被感光芯片上的…

面试经典150题(32-37)

leetcode 150道题 计划花两个月时候刷完&#xff0c;今天&#xff08;第十五天&#xff09;完成了6道(32-37)150&#xff1a; 今天刚好有点没精神的感觉&#xff0c;然后碰到的题也不难。。天意&#xff01;&#xff01;&#xff01; 32.&#xff08;289. 生命游戏&#xff0…

iEnglish:家校协同培养学生自主阅读习惯

近日,2023年最新一期365天和1000天“iEnglish学习成长营”顺利结营。据悉,截至今年12月,在家庭场景中完成365天和1000天不间断阅读人数分别突破15万人和2万人。 近年来,全民终身学习的学习型社会不断深入推进,基础教育中对于学生的综合阅读能力素养新要求不断更新。提升孩子的…

【Git】Git基本操作

文章目录 Git 是什么Git 的优点Git 安装Linux UbuntuLinux CentOsWindows Git 基本操作1. 创建 Git 本地仓库2. 配置 Git3. Git工作区、暂存区和版本库4. 添加文件5. 查看 .git 文件6. 修改文件7. 版本回退 Git 是什么 Git是一个免费的、开源的分布式版本控制系统&#xff0c;…