RocketMQ从入门到精通

1.MQ概述


1.1 RocketMQ简介

 RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。

1.2 MQ用途

  • 限流削峰

MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

  •  异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。

  •  数据收集

分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。

1.3 常见MQ产品

  • RabbitMQ


RabbitMQ是使用ErLang语言开发的一款MQ产品。其吞吐量较Kafka与RocketMQ要低,且由于其不是Java语言开发,所以公司内部对其实现定制化开发难度较大。

  • Kafka


Kafka是使用Scala/Java语言开发的一款MQ产品。其最大的特点就是高吞吐量,常用于大数据领域的实时计算、日志采集等场景。其没有遵循任何常见的MQ协议,而是使用自研协议。

  • RocketMQ


RocketMQ是使用Java语言开发的一款MQ产品。经过数年阿里双11的考验,性能与稳定性非常高。其没有遵循任何常见的MQ协议,而是使用自研协议。

对比

2.RocketMQ 基本概念


2.1 消息


消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。单个消息所占空间不会很大。

RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询。不过需要注意的是,MessageId有两个:在生产者send()消息时会自动生成一个MessageId(msgId),当消息到达Broker后,Broker也会自动生成一个MessageId(offsetMsgId)。msgId、offsetMsgId与key都称为消息标识。 

msgId:由producer端生成,其生成规则为: producerIp + 进程pid + MessageClientIDSetter类的ClassLoader的hashCode + 当前时间 + AutomicInteger自增计数器 
offsetMsgId:由broker端生成,其生成规则为:brokerIp + 物理分区的offset(Queue中的偏移量) 
key:由用户指定的业务相关的唯一标识
 

2.2 主题


Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。 一个生产者可以同时发送多种Topic的消息;而一个消费者只对某种特定的Topic感兴趣,即只可以订阅和消费一种Topic的消息。 

2.3 标签


标签为消息设置的标签,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。 标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。 Topic是消息的一级分类,Tag是消息的二级分类。Topic相当于货物,Tag相当于上海山东等地区。

2.4 队列


存储消息的物理实体。 一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。 一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。 一个Topic的Queue中的消息只能被一个消费者组中的一个消费者消费。 一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费。

分片不同于分区。在RocketMQ中,分片指的是存放相应Topic的Broker。每个分片中会创建出相应数量的分区,即Queue,每个Queue的大小都是相同的。

 2.5 Producer


消息生产者,负责生产消息。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。  例如:用户提交的请求写入到MQ的过程,就是消息生产的过程,在这里用户就是生产者 。


 RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。生产者组是同一类生产者的集合,这类Producer发送相同Topic类型的消息。一个生产者组可以同时发送多个主题的消息。如果主题中有多个队列,生产者组只有一个生产者,生产者会采取轮询的方式进行发送消息。

生产者代码如下:导入依赖

       <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version></dependency>

生产者代码

  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {DefaultMQProducer order = new DefaultMQProducer("order");order.setNamesrvAddr("localhost:9876");order.start();Message message = new Message("myTopic", "myTag", ("test").getBytes());SendResult result = order.send(message);System.out.println(result);order.shutdown();}


2.6 Consumer


消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理。  例如:系统从MQ中读取到请求,并对请求进行处理的过程就是消息消费的过程,在这里系统就是消费者。 
 RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息。 消费者组使得在消息消费方面,实现负载均衡(将一个Topic中的不同的Queue平均分配给同一个Consumer Group的不同的Consumer,注意,并不是将消息负载均衡)和容错(一个Consmer挂了,该Consumer Group中的其它Consumer可以接着消费原Consumer消费的Queue)的目标变得非常容易。

消费者代码

public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("myTopic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("收到的消息"+list);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}

负载均衡策略

queue 个数大于 Consumer个数, 那么 Consumer 会平均分配 queue,不够平均,会根据clientId排序来拿取余数
queue个数小于Consumer个数,那么会有Consumer闲置,就是浪费掉了,其余Consumer平均分配到queue

消费者组中Consumer的数量应该小于等于订阅Topic的Queue数量。如果超出Queue数量,则多出的Consumer将不能消费消息。

2.7 NameServer


 NameServer是一个Broker与Topic路由的注册中心,支持Broker的动态注册与发现。 
主要包括两个功能: 
Broker管理:接受Broker集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检查Broker是否还存活。

路由信息管理:每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。 


路由注册 
Name Server既然是注册中心,那么是如何完成注册的呢? NameServer通常也是以集群的方式部署,不过,NameServer是无状态的,即NameServer集群中的各个节点间是无差异的,各节点间相互不进行信息通讯。 那各节点中的数据是如何进行数据同步的呢?在Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。在NameServer内部维护着⼀个Broker列表,用来动态存储Broker的信息。 
 
Broker节点为了证明自己是活着的,为了维护与NameServer间的长连接,会将最新的信息以心跳包的方式上报给NameServer,每30秒发送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名称、Broker所属集群名称等等。NameServer在接收到心跳包后,会更新心跳时间戳,记录这个Broker的最新存活时间。 


路由剔除 
由于Broker关机、宕机或网络抖动等原因,NameServer没有收到Broker的心跳,NameServer可能会将其从Broker列表中剔除。 NameServer中有⼀个定时任务,每隔10秒就会扫描⼀次Broker表,查看每一个Broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定Broker失效,然后将其从Broker列表中剔除。 


路由发现 
RocketMQ的路由发现采用的是Pull模型。当Topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取Topic最新的路由。 默认客户端每30秒会拉取一次最新的路由。
 

2.8 Broker


Broker充当着消息中转角色,负责存储消息、转发消息。
Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker同时也存储着消息相关的元数据,包括消费者组消费进度偏移offset、主题、队列等。

模块如下图:

Remoting Module:整个Broker的实体,负责处理来自clients端的请求。而这个Broker实体则由以下模块构成。

Client Manager:客户端管理器。负责接收、解析客户端(Producer/Consumer)请求,管理客户端。例如,维护Consumer的Topic订阅信息

Store Service:存储服务。提供方便简单的API接口,处理消息存储到物理硬盘和消息查询功能。

HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。

Index Service:索引服务。根据特定的Message key,对投递到Broker的消息进行索引服务,同时也提供根据Message Key对消息进行快速查询的功能。

2.9 RocketMQ 工作流程


工作流程如下图:

1)启动NameServer,NameServer启动后开始监听端口,等待Broker、Producer、Consumer连接。


2)启动Broker时,Broker会与所有的NameServer建立并保持长连接,然后每50秒向NameServer定时发送心跳包。


3)发送消息前,可以先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,当然,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。不过,这步是可选的,也可以在发送消息时自动创建Topic。


4) Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取路由信息,即当前发送的Topic消息的Queue与Broker的地址(IP+Port)的映射关系。然后根据算法策略从队选择一个Queue,与队列所在的Broker建立长连接从而向Broker发消息。当然,在获取到路由信息后,Producer会首先将路由信息缓存到本地,再每30秒从NameServer更新一次路由信息。


5)Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取其所订阅Topic的路由信息,然后根据算法策略从路由信息中获取到其所要消费的Queue,然后直接跟Broker建立长连接,开始消费其中的消息。Consumer在获取到路由信息后,同样也会每30秒从NameServer更新一次路由信息。不过不同于Producer的是,Consumer还会向Broker发送心跳,以确保Broker的存活状态。
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/234182.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java第二十章课堂总结

如果一次只完成一件事情&#xff0c;很容易实现。但现实生活中&#xff0c;很多事情都是同时进行的。Java中为了模拟这种状态&#xff0c;引入了线程机制。简单地说&#xff0c;当程序同时完成多件事情时&#xff0c;就是所谓的多线程。多线程应用相当广泛&#xff0c;使用多线…

【bug日记】如何切换jdk版本,如何解决java和javac版本不一致

背景 今天在安装jenkins后&#xff0c;使用java运行war包的时候&#xff0c;提示jdk1.8版本太低&#xff0c;需要提高版本&#xff0c;所以就需要切换jdk版本 解决 在用户变量中&#xff0c;首先更改了JAVA_HOME的地址为17的目录&#xff0c;发现javac的版本改为17了&#x…

Jmeter的接口测试详细步骤并实现业务闭环

一、首先是了解Jmeter接口测试用到的组件 1、测试计划&#xff1a;Jmeter的起点和容器2、线程组&#xff1a;代表一定的虚拟用户3、取样器&#xff1a;发送请求的最小单元4、逻辑控制器&#xff1a;控制组件的执行顺序5、前置处理器&#xff1a;在请求之前的操作6、后置处理器…

服务宕机、线上环境内存溢出OOM分析思路

前言 平时工作中&#xff0c;肯定会遇到哪个产品经理突然来找&#xff0c;说服务器又挂了&#xff0c;怎么又用不了啦&#xff01;类似的紧急情况&#xff0c;遇到这种情况不要慌&#xff0c;我提供以下几点紧急补救思路。 1&#xff09;重启大法保命 2&#xff09;确认是否新…

5分钟上手浏览器插件测试——Eolink Apikit

Eolink Apikit 研发管理和自动化测试产品中&#xff0c;提供了多种发起 API 测试的方式&#xff1a; 服务器测试&#xff1a;通过 Eolink Apikit 官方远程服务器发送请求&#xff0c;不需要安装任何插件&#xff0c;但是无法访问本地服务器(localhost)、内网、局域网。插件测试…

年终汇报这么写,升值加薪必有你!

#01 你这么能干&#xff0c; 老板知道吗&#xff1f; — 打工人最怕什么&#xff1f; 最怕你忙前忙后&#xff0c;干活一大堆&#xff0c;气出一身结节&#xff0c;锅还没少背&#xff0c;最后升职加薪没有你&#xff0c;出国旅游不带你&#xff1b;更怕你日常996&#xf…

同义词替换器降低论文重复率的最新技术动态

大家好&#xff0c;今天来聊聊同义词替换器降低论文重复率的最新技术动态&#xff0c;希望能给大家提供一点参考。 以下是针对论文重复率高的情况&#xff0c;提供一些修改建议和技巧&#xff0c;可以借助此类工具&#xff1a; 标题&#xff1a;同义词替换器降低论文重复率的最…

从旺店通·企业版到金蝶云星空通过接口配置打通数据

从旺店通企业版到金蝶云星空通过接口配置打通数据 对接系统&#xff1a;旺店通企业版 旺店通是北京掌上先机网络科技有限公司旗下品牌&#xff0c;国内的零售云服务提供商&#xff0c;基于云计算SaaS服务模式&#xff0c;以体系化解决方案&#xff0c;助力零售企业数字化智能化…

基于SSM的视康眼镜网店销售系统

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

高级算法设计与分析(二) -- 递归与分治策略

系列文章目录 高级算法设计与分析&#xff08;一&#xff09; -- 算法引论 高级算法设计与分析&#xff08;二&#xff09; -- 递归与分治策略 高级算法设计与分析&#xff08;三&#xff09; -- 动态规划 未完待续【 高级算法设计与分析&#xff08;四&#xff09; -- 贪…

iEnglish:家校协同培养学生自主阅读习惯

近日,2023年最新一期365天和1000天“iEnglish学习成长营”顺利结营。据悉,截至今年12月,在家庭场景中完成365天和1000天不间断阅读人数分别突破15万人和2万人。 近年来,全民终身学习的学习型社会不断深入推进,基础教育中对于学生的综合阅读能力素养新要求不断更新。提升孩子的…

【Git】Git基本操作

文章目录 Git 是什么Git 的优点Git 安装Linux UbuntuLinux CentOsWindows Git 基本操作1. 创建 Git 本地仓库2. 配置 Git3. Git工作区、暂存区和版本库4. 添加文件5. 查看 .git 文件6. 修改文件7. 版本回退 Git 是什么 Git是一个免费的、开源的分布式版本控制系统&#xff0c;…

【Linux】ip命令使用

ip命令 用于管理与配置网络接口和路由表。 ip命令的安装 ip 命令来自 iproute2 软件包&#xff0c;在 CentOS 7 中默认已安装。 yum install -y iproute 语法 ip [ OPTIONS ] OBJECT { COMMAND | help }ip [ -force ] -batch filename选项及作用 执行令 &#xff1a; ip …

TCP的拥塞控制_基础知识_四种拥塞控制方法

TCP的拥塞控制 一.拥塞控制的基本概念 在某段时间&#xff0c;若对网络中某一资源的需求超过了该资源所能提供的可用部分&#xff0c;网络性能就要变坏&#xff0c;这种情况就叫作拥塞 。 计算机网络中的链路容量(带宽)、交换节点中的缓存和处理机等都是网络的资源 若出现拥塞…

spring boot 配置多数据源 踩坑 BindingException: Invalid bound statement (not found)

在上一篇&#xff1a;《【已解决】Spring Boot多数据源的时候&#xff0c;mybatis报错提示&#xff1a;Invalid bound statement (not found)》 凯哥(凯哥Java) 已经接受了&#xff0c;在Spring Boot配置多数据源时候&#xff0c;因为自己马虎&#xff0c;导致的一个坑。下面&a…

Notepad++:多行数据操作

1&#xff09;删除关键字之后&#xff08;或之前&#xff09;的所有字符 删除s之后&#xff08;包含s&#xff09;的所有内容&#xff1b;快捷键&#xff1a;s.*$ 替换成功 删除s之前&#xff08;包含s&#xff09;的所有内容&#xff1b;快捷键&#xff1a;^.*s 2&#xff09…

深度学习中的张量维度

1 深度学习中的张量 在深度学习框架中&#xff0c;Tensor&#xff08;张量&#xff09;是一种数据结构&#xff0c;用于存储和操作多维数组。张量可以被视为一种扩展的矩阵&#xff0c;它可以具有任意数量的维度。 在深度学习中&#xff0c;张量通常被用来表示神经网络的输入…

PowerDesigner生成数据字典

这里写自定义目录标题 1&#xff0c;创建物理模型2&#xff0c;创建数据源连接3&#xff0c;获取表4&#xff0c;创建报告 先看下最终效果 1&#xff0c;创建物理模型 2&#xff0c;创建数据源连接 填写数据源连接信息 测试连接是否成功 3&#xff0c;获取表 连接刚创建的数…

Springboot数据校验与异常篇

一、异常处理 1.1Http状态码 HTTP状态码是指在HTTP通信过程中&#xff0c;服务器向客户端返回的响应状态。它通过3位数字构成&#xff0c;第一个数字定义了响应的类别&#xff0c;后两位数字没有具体分类作用。以下是常见的HTTP状态码及其含义&#xff1a; - 1xx&#xff08;信…

[PyTorch][chapter 8][李宏毅深度学习][Back propagation]

前言&#xff1a; 反向传播算法(英:Backpropagation algorithm&#xff0c;简称:BP算法)是一种监督学习算法&#xff0c;常被用来训练多层感知机。 它用于计算梯度计算中&#xff0c;降低误差。 目录&#xff1a; 链式法则 模型简介&#xff08;Model&#xff09; 损失函…