RocketMQ快速实战以及集群架构原理详解

RocketMQ快速实战以及集群架构原理详解

组成部分

  • 启动Rocket服务之前要先启动NameServer

image.png

  • NameServer
    • 提供轻量级Broker路由服务,主要是提供服务注册
  • Broker
    • 实际处理消息存储、转发等服务的核心组件
  • Producer
    • 消息生产者集群,通常为业务系统中的一个功能模块
  • Consumer
    • 消息消费者集群,通常是业务系统中的一个功能模块
  • Topic
    • 区分消息的种类,生产端可以发送消息给一个或多个topic,消费端可以进行一个或多个消息进行消费

集群中的角色

  • Producer
    • 消息发送者(寄信人),在生产者中会把同一类生产者组成一个集合,称之为生产者组,这类生产者发送同一类消息且发送逻辑一致,如果发送的是事务消息是原始生产者在发送之后崩溃,则Broker服务会联系同一生产组的其它生产者来提交或回溯消费
  • Consumer
    • 消息接受者(收信人),消费者同样会把一类消费者组成一个集合,称之为消费者组,这类消费者消费同一类消息且消费逻辑一致,消费者组在消息消费方面,实现负载均衡和容错非常容易,消费组中的消费者必须订阅相同的topic
    • RocketMQ支持两种消息模式
      • 集群消费模式
        • 相同消费者组下的每个消费者平摊消息
      • 广播消费模式
        • 相同消费者组的每个消费者接受全量消息
  • Broker Server
    • 暂存和传输消息(邮局),也存储消息相关的元数据信息(包括消费者组、消费进度偏移、主题、队列消息等),Broker Server是RocketMQ真正的业务核心
      • 子模块
        • Remoting Module
          • 整个Broker的实体,负责处理来自Client端的请求
        • Client Manager
          • 负责管理客户端以及维护消费者订阅的topic信息
        • Store Service
          • 提供方便简单的API接口处理消息存储到物理硬盘的查询功能
        • HA Service
          • 高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能
        • Index Service
          • 根据特定的Message Key对投递到Broker的消息进行索引服务,以提供消息的快速查询
      • Broker架构模式
        • 普通集群
          • 每个节点分配一个固定的角色,master负责响应客户端的写以及存储消息,slave只负责对master的消息进行同步以及响应客户端的读
            • 消息同步方式分为同步和异步
        • Dledger高可用集群
          • 基于Raft协议随机选举出一个master,而master挂了之后,会从slage中自动选举出一个节点作为新master
          • Dledger的职责
            • 接管Broker的Commitlog的消息存储
            • 从集群中选举出master节点
            • 完成master节点往slave节点的消息同步
  • Name Server
    • 管理Broker(邮局的管理机构),Broker Server启动时会向所有的Name Server注册自己的服务信息,并且后续通过心跳来保证服务信息的实时性,生产者或消费者可以通过名称服务查找各个topic响应的Broker IP列表,多个Name Server实例组成集群(AP模式),但相互独立,没有信息互换,意味着Name Server中任意的节点挂了,只要有一台服务节点正常,整个路由服务不会受影响
  • Topic
    • 区分消息的种类,一个发送者可以发送消息给一个或多个topic,一个消息的接受者可以订阅一个或多个topic消息,同一个topic下的数据,会分片存储到不同的Broker上,而每一个分片单位MessageQueue(类似于Kafka中的Partition)
  • MessageQueue
    • 相当于Topic中的分区,用于并行发送和接收消息,每个Topic默认有4个MessageQueue

消息确认机制

  • 消息生产端采用消息确认多次重试的机制来保证消息能发送到MQ

    • 3种发送消息的方式

      • 同步发送

        • 必须等到Broker反馈之后才能继续发,安全性最高但发消息最慢
      • 单向发送

        • 不管消息是否发成功都能继续发,所以吞吐量最高,但是安全性低,容易丢消息
      • 异步发送

        • 发送消息的同时回注册一个回调去处理响应,安全性低,容易丢消息
  • 消息消费者端采⽤状态确认机制保证消费者⼀定能正常处理对应的消息

    • Broker会通过记录重试次数,为了不影响topic下其它正常的消息,会给每个消费组设计对应的重试topic,在消息重试时,会将原topic的消息移动到对应的重试topic中去,当重试达到一定阈值会将失败的消息推入到死信topic中
    • 消费者组由多个消费者实例组成,Broker只需要向某一个实例推送消息即可,保障消息重试机制正常运行,并且Broker只通过消费者返回的状态来判断是否处理成功,但是业务执行是否正确是无法知道的
  • 消费者也可以⾃⾏指定起始消费位点

    • Broker通过消费者返回的状态来推进消费者组对应的消息offset,虽然offset是Broker来维护,但是消费者可以自己指定offset进行消费

消息模型

顺序消息

  • 只能保证局部消息有序,不能保证全局有序,要保证全局有序需要从生产端、Broker、消费端三个角度同时考虑才行
    • 生产端
      • 默认情况下,生产端采用轮询将消息投递到不同的MessageQueue种,而消费端会从多个MessageQueue中拉取消息,所以这种情况下是无法保证顺序的,所以只有让一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这组消息有序
    • Broker
      • Broker中的一个MessageQueue是可以保证有序的
    • 消费端
      • 消费端会从多个MessageQueue上拉取消息,此时每个MessageQueue的消息是有序的,但是多个MessageQueue直接混合到一起却是乱序的,如果想要保证消费有序,可以通过锁MessageQueue的方式,消费完一个MessageQueue再去消费下一个来保证
        • MessageListenerOrderly会锁队列,取完一个才能下一个
        • MessageListenerConcurrently不会锁队列,每次从多个MessageQueue取出一批数据(默认不超过32条)
  • 实现思路简概
    • 生产者只有将一批有序的消息放到同一个MessageQueue上,Broker才有可能保持这一批消息的顺序
    • 消费者只有一次锁定一个MessageQueue,拿到MessageQueue上消息
  • 注意点
    • 大部分业务场景下只要保证局部有序,如果要保证全局有序,只能保留一个MessageQueue,性能较低
    • 生产者端尽可能将有序消息打散到不同的MessageQueue上,避免数据热点竞争
    • 消费者端只能使用同步方式处理消息,不要使用异步处理,更不能自行批量处理
    • 消费者端只进行有限次数的重试,如果一条消息处理失败,RocketMQ会将后续消息阻塞,让消费者进行重试,但是如果消费者一直处理失败,超过最大重试次数,RocketMQ会跳过这条消息,直接处理后面的消息,导致消费乱序
    • 消费者端如果处理逻辑中出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代

广播消息

  • 广播消息并没有特定的消费者,因为这涉及到消费者的集群消费模式,默认是集群模式
  • 实现思路简概
    • 集群模式
      • 一个消息只会被一个消费组中的多个消费者共同处理一次
        • Broker端会给每个消费者组维护一个统一的offset来保证同一个消费组内只会被消费一次
    • 广播模式
      • 一个消息会被推送给所有消费者消费,不再关心消费组
        • Broker端只管推消息,消费端自己维护offset
  • 注意点
    • Broker端不维护消费进度,如果消费者处理消息失败了,将⽆法进⾏消息重试
    • 消费端自己维护offset可以在服务重启后继续之前的进度,消息丢了也不影响服务稳定性

延迟消息

  • RocketMQ给消息定制了18个默认的延迟级别,延迟消息的难点其实是性能,需要不断进⾏定时轮询,全部扫描所有消息是不可能的
  • 实现思路简概
    • RocketMQ预设一个系统topic(SCHEDULE_TOPIC_XXXX),在这个topic下有18个延迟队列,每次只针对这些队列里的消息进行延迟操作
  • 注意点
    • 预设延迟时间导致不灵活,后续版本已经支持预设一个具体的时间戳,不建议调整延迟级别对应的延迟时间

批量消息

  • 生产者端发送的消息过多时,可以将多条消息合并进行批量发送,减少网络IO,提升消息发送的吞吐量

  • 注意点

    • 只能对同一topic下的消息进行批量发送,不支持延迟消息,以及批量消息的大小不超过1MB(超过了自行拆分)

过滤消息

  • 同一topic下不同的消息,消费者只关注某一类消息,有简单过滤和SQL过滤方式
  • 实现思路简概
    • 简单过滤
      • ⽣产者端需要在发送消息时,增加Tag属性,消费者端/Broker端就可以通过这个Tag属性过滤出需要的消息
    • SQL过滤
      • ⽣产者端需要在发送消息时,增加Tag属性以及自定义的属性,消费者端/Broker端可以指定一个SQL进行过滤
  • 注意点
    • 消息过滤在消费者端和Broker端都可以做,消费者端进行过滤可以保障消息过滤的可控性,而Broker端过滤可以减少不必要数数据网络IO(只把消费者端需要的消息发送出去就行)
    • 在实际生产中,被过滤的消息并不会直接丢弃,会交给其它需要的消费者进行消费,如果一直没有消费者进行消费,Broker端会继续推进offset

事务消息

  • 通过RocketMQ的事务机制,来保障本地事务(比如数据库)与MQ消息发送的事务一致性(上下游的数据一致性)

  • 实现思路简概

    • 生产者端将消息发送到MQ服务端
    • MQ服务端将消息持久化成功之后,向生产者端反馈已发送成功,此时消息处于半事务消息状态(暂不能投递)
    • 生产者端开始执行本地事务逻辑
    • 生产者端根据本地事务执行结果向MQ服务端提交二次确认结果来判断是否提交或回滚
      • 提交
        • 服务端将半事务消息标记为可投递,将半事务消息转交给消费端
      • 回滚
        • 服务端将回滚事务,放弃将半事务消息转交给消费端
    • 当出现网络故障或生产者端重启时,若果MQ服务端未收到二次确认消息结果或收到的结果为未知状态,经过一定时间后,MQ服务端将对生产者组的任一生产者发送消息回查,生产者收到消息回查后,需要检查对应消息的本地事务执行最终结果,然后生产者端根据检查到的最终结果再次提交二次确认来判断是否提交或回滚
  • 注意点

    • 半消息是对消费者不可⻅的⼀种消息,RocketMQ的做法是将消息转到了⼀个系统Topic(RMQ_SYS_TRANS_HALF_TOPIC)
    • 事务消息中,本地事务默认回查次数15次,本地事务回查的默认间隔60秒,超过回查次数后,消息将会被丢弃
    • 事务消息不支持延迟消息和批量消息

最佳实战注意点

  • 合理分配Topic、Tag
    • ⼀个应⽤尽可能⽤⼀个Topic,⽽消息⼦类型则可以⽤tags来标识,tags可以由应⽤⾃由设置,只有⽣产者在发送消息设置了tags,消费⽅在订阅消息时才可以利⽤tags通过broker做消息过滤
  • 使⽤Key加快消息索引
    • 分配好Topic和Tag之后,⾃然就需要优化Key属性了,因为Key也可以参与消息过滤,通常建议每个消息要分配 ⼀个在业务层⾯的唯⼀标识码,设置到Key属性中
      • 作用
        • 可以配合Tag进⾏更精确的消息过滤
        • Broker端会为每个消息创建⼀个hash索引,应⽤可以通过topic、key来查询某⼀条历史的消息内容,以及消息在集群内的处理情况,为了避免哈希冲突问题,客户端要尽量保证key的唯⼀性
  • 关注错误消息重试
    • RocketMQ消费者端,如果处理消息失败了,Broker是会将消息重新进⾏投送,⽽在重试时,每个消费者组创建⼀个对应的重试队列(“%RETRY%”+ConsumeGroup),多关注重试队列,可以及时了解消费者端的运⾏情况,如果这个队列中出现了⼤量的消息,就意味着消费者的运⾏出现了问题,要及时跟踪进⾏⼲预
    • RocketMQ默认允许每条消息最多重试16次(可以定制),如果消息重试16次后仍然失败,消息将不再投递。转为进⼊死信队列
  • ⼿动处理死信队列
    • 当⼀条消息消费失败,RocketMQ就会⾃动进⾏消息重试。⽽如果消息超过最⼤重试次数,RocketMQ就会认为这个消息有问题,RocketMQ不会⽴刻将这个有问题的消息丢弃,⽽会将其发送到这个消费者组对应的死信队列,此时需要人工去查看死信队列(%DLQ%+ConsumGroup)中的消息,对错误原因进行排查以及对死信进行处理(转发到正常的tipic进行重新消费或者丢弃)
    • 死信队列的特征
      • ⼀个死信队列对应⼀个ConsumGroup,⽽不是对应某个消费者实例
      • 如果⼀个ConsumeGroup没有产⽣死信,RocketMQ就不会为其创建相应的死信队列
      • 死信队列中的消息不会再被消费者正常消费
      • 死信队列的有效期跟正常消息相同,默认3天(可配置),超过这个最⻓时间的消息都会被删除,⽽不管消息是否消费过
  • 消费者端进⾏幂等控制
    • 在MQ系统中,对于消息幂等有三种实现语义
      • at most once 最多⼀次:每条消息最多只会被消费⼀次
        • 可以⽤异步发送、sendOneWay等⽅式就可以保证
      • at least once ⾄少⼀次:每条消息⾄少会被消费⼀次
        • 可以⽤同步发送、事务消息等很多⽅式能够保证
      • exactly once 刚刚好⼀次:每条消息都只会确定的消费⼀次
        • RocketMQ只能保证at least once,保证不了exactly once
          • 云上版本支持
    • 消息幂等的必要性
      • 出现重复的情况
        • 发送时消息重复
          • 当⼀条消息已被成功发送到服务端并完成持久化,此时出现了⽹络闪断或者客户端宕机,导致服务端对客户端
            应答失败, 如果此时⽣产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且消息ID也相同的消息
        • 投递时消息重复
          • 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候⽹络闪断,为
            了保证消息⾄少被消费⼀次,Broker端将在⽹络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且消息ID也相同的消息
        • 负载均衡时消息重复(不限于⽹络抖动、Broker 重启以及订阅⽅应⽤重启)
          • 当 Broke端 或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息
      • 处理方式
        • 在RocketMQ中,是⽆法保证每个消息只被投递⼀次的,所以要在业务上⾃⾏来保证消息消费的幂等性,RocketMQ的每条消息都有⼀个唯⼀的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以⽤这个MessageId来作为判断幂等的关键依据,但是最好使用分布式ID来避免出现冲突

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/702385.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

板块二 JSP和JSTL:第四节 EL表达式 来自【汤米尼克的JAVAEE全套教程专栏】

板块二 JSP和JSTL:第四节 EL表达式 一、什么是表达式语言二、表达式取值(1)访问JSP四大作用域(2)访问List和Map(3)访问JavaBean 三、 EL的各种运算符(1).和[ ]运算符&…

《The Art of InnoDB》第二部分|第4章:深入结构-磁盘结构-redo log

4.3 redo log 目录 4.3 redo log 4.3.1 redo log 介绍 4.3.2 redo log 的作用 4.3.3 redo log file 结构 4.3.4 redo log 提交逻辑 4.3.5 redo log 持久化逻辑 4.3.6 redo log 检查点 4.3.7 小结

汇编语言与接口技术实践——秒表

1. 设计要求 基于 51 开发板,利用键盘作为按键输入,将数码管作为显示输出,实现电子秒表。 功能要求: (1)计时精度达到百分之一秒; (2)能按键记录下5次时间并通过按键回看 (3)设置时间,实现倒计时,时间到,数码管闪烁 10 次,并激发蜂鸣器,可通过按键解除。 2. 设计思…

抖音数据抓取工具|短视频下载工具|视频内容提取软件

一、开发背景: 随着抖音平台的流行,越来越多的人希望能够下载抖音视频以进行个人收藏或分享。然而,目前在网上找到的抖音视频下载工具功能单一,操作繁琐,无法满足用户的需求。因此,我们决定开发一款功能强大…

java面试题之mysql篇

1、数据库索引 ​​​​​​​ 索引是对数据库表中一列或多列的值进行排序的一种结构,使用索引可快速访问数据库表中的特定信息。如果想按特定职员的姓来查找他或她,则与在表中搜索所有的行相比,索引有助于更快地获取信息。 索引的一个主要…

编程笔记 Golang基础 030 接口

编程笔记 Golang基础 030 接口 一、接口的定义:二、接口的实现:三、接收者类型四、应用示例五、接口的意义 在Go语言中,接口是一种类型定义,它描述了一组方法签名,任何实现了这些方法的类型都隐式地实现了这个接口。这…

k8s-创建命名空间的方法

使用命令式创建namespace kubectl create namespace test-namespace查看命名空间 kubectl get namespace使用声明式创建命名空间 a. 编写dev-namespace.yaml文件 apiVersion: v1 kind: Namespace metadata:name: dev-namespaceb. 使用dev-namespace.yaml,yaml文件创…

音视频开发之旅(69)-SD图生图

目录 1. 效果展示 2. ControlNet介绍 3. 图生图流程浅析 4. SDWebui图生图代码流程 5. 参考资料 一、效果展示 图生图的应用场景非常多,比较典型的应用场景有风格转化(真人与二次元)、线稿上色、换装和对图片进行扩图等,下面…

TCP/IP协议栈:模拟器实现基本的L2和L3功能

在C中实现的TCPI/IP网络堆栈模拟器。该模拟器实现基本的第2层(MAC地址,Arp)和第3层(路由,IP)功能。 TCP/IP协议栈是一个网络通信的基础架构,包含了多层次的协议和功能。在模拟实现基本的L2和L3…

神经网络2-卷积神经网络一文深度读懂

卷积神经网络(Convolutional Neural Network, CNN)是一类包含卷积计算且具有深度结构的前馈神经网络(Feedforward Neural Networks),主要用于图像识别、语音识别和自然语言处理等任务,是深度学习&#xff0…

使用决策树算法预测隐形眼镜类型

目录 谷歌笔记本(可选) 编写算法:决策树 准备数据:拆分数据集 测试算法:构造注解树 使用算法:预测隐形眼镜类型 谷歌笔记本(可选) from google.colab import drive drive.mount…

ubuntu20.04 tvm 安装教程

ubuntu20.04 tvm 安装教程: 参考: 1. https://tvm.hyper.ai/docs/install/from_source/ 2. https://blog.csdn.net/wenwen_2020/article/details/134856293 步骤: 1. 创建容器:docker run -itd --name tvm --gpusall --ipchost…

Springboot之压缩逻辑源码跟踪流程

背景 在项目开发过程中,前后端参数比较多,导致网络传输耗时比较多,因此想将数据压缩传输,以减少网络传输的耗时,从而减少接口的响应时间,可以自己实现,但是spring相关的框架已经内置了该功能&am…

堆排序、快速排序和归并排序

堆排序、快速排序和归并排序是所有排序中最重要的三个排序,也是难度最大的三个排序;所以本文单独拿这三个排序来讲解 目录 一、堆排序 1.建堆 2.堆排序 二、快速排序 1.思想解析 2.Hoare版找基准 3.挖坑法找基准 4.快速排序的优化 5.快速排序非…

C语言--左旋字符/右旋字符实现及其判断

1.题目解释 左旋就是把对应的左边的放到右边 例如ABCDEF左旋2个字符就是BCDEFAB&#xff0c;左旋3个字符就是DEFABC&#xff1b; 2.代码实现 void leftmove(char* str, int k) {int j 0;assert(str);for (j 0; j < k; j){char temp *str;int len strlen(str);int i …

单个文件实现cpu的信息检测:ruapu.h的学习笔记

https://github.com/nihui/ruapu是nihui大佬开发的用单文件检测CPU特性的项目 ruapu.h的使用 "ruapu.h"主要提供了两个函数 ruapu_init 和 ruapu_supports&#xff0c;分别用于初始化和检测指令集支持。 // 使用示例见&#xff1a;https://github1s.com/nihui/rua…

MyBatis核心配置文件

1、properties属性&#xff1a; 将变量提取出来变成全局变量 enable-default-value&#xff1a;启动默认值 数据库环境四要素 2、settings属性 &#xff1a; 开启二级缓存&#xff0c;开启延迟加载懒加载 消极懒加载积极懒加载 <setting name"cacheEnable" valu…

Spring数据脱敏实现

在当今的数字化时代&#xff0c;数据安全和个人隐私保护变得日益重要。为了遵守各种数据保护法规&#xff0c;如欧盟的GDPR&#xff08;通用数据保护条例&#xff09;&#xff0c;企业在处理敏感信息时需要格外小心。数据脱敏是一种常见的技术手段&#xff0c;用于隐藏敏感数据…

Servlet使用Cookie和Session

一、会话技术 当用户访问web应用时&#xff0c;在许多情况下&#xff0c;web服务器必须能够跟踪用户的状态。比如许多用户在购物网站上购物&#xff0c;Web服务器为每个用户配置了虚拟的购物车。当某个用户请求将一件商品放入购物车时&#xff0c;web服务器必须根据发出请求的…

windows实现ip1:port1转发至ip2:port2教程

第一步&#xff1a;创建虚拟网卡(如果ip1为本机127.0.0.1跳过此步骤)&#xff0c;虚拟网卡的IPV4属性设置ip1 1> 创建虚拟网卡参见 Windows系统如何添加虚拟网卡&#xff08;环回网络适配器&#xff09;_windows添加虚拟网卡-CSDN博客​​​​​​ 2> 设置虚拟网卡使用…