RocketMQ消息队列(一)—— 基本概念和消息类型

  RocketMQ是一个来自阿里巴巴的分布式消息中间件,于2012年开源,并在2017年正式成为Apache顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在RocketMQ上,并且最近几年的双十一大促中,RocketMQ都有十分优秀的表现。Apache RocketMQ 4.3以后得版本正式支持事务消息,为分布式事务实现提供便利性支持。

一、RocketMQ的基本概念

  RocketMQ是一个分布式的消息中间件架构,其主要角色的关系如下图所示:
在这里插入图片描述
角色介绍如下:

  • Producer:消息的发送者
  • Consumer:消息接收者
  • Broker:暂存和传输消息,RocketMQ中实现消息暂存和发送消息的主要单元
  • NameServer:管理Broker的模块
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic,一个消息的接受者可以接收一个或者多个Topic
  • Message Queue:相当与topic的分区;用于并发发送和接收消息。

二、RocketMQ的消息类型

1、按照发送的特点分类

1)同步发送

  • 同步发送,现成阻塞,投递completes阻塞结束
  • 如果发送失败,会在默认的超时时间3s内进行重试,最多重试2次
  • 投递completes不代表投递成功,要check SendResult.sendStatus来判断是否投递成功
  • SendResult里面有发送状态的枚举;SendStatus,同步的消息投递有一个状态返回的值
public enum SendStatus {SEND_OK,FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE,
}
  • retry的实现原理:只有ack的SendStatus=SEND_OK才会停止retry
  • 发送同步消息且Ack为SEND_OK,只代表该消息成功的写入了MQ中,并不代表该消息成功被Consumer消费了

2)异步发送

  • 异步调用的话,当前线程一定要等待异步线程回调结束再关闭producer,以为是异步的,不会阻塞,提前关闭producer会导致未回调连接就断开了。
  • 异步消息不回retry,投递失败回调onException()方法,只有同步消息才会retry
  • 异步发送一般用于链路耗时较长,对RT响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

3)单向发送

  • 消息不可靠,性能高,只负责往服务器发送一条消息,不会重试也不关心是否发送成功
  • 此方式发送消息的过程耗时非常短,一般在微妙级

三种发送方式的主要区别

发送方式发送TPS发送结果反馈可靠性
同步发送不丢失
异步发送不丢失
单向发送最快可能丢失

2、按照使用功能特点分

1)普通消息(订阅)

  普通消息就是我们业务开发中用到的最多的消息类型,生产者需要关注消息发送成功即可,消费者消费到消息即可,不需要保证消息的顺序,所以消息可以大规模并发的发送和消费,吞吐量很高,适合大部分场景。

2)顺序消息

  顺序消息分为分区顺序消息和全局顺序消息,全局顺序消息比较容易理解,也就是哪条消息先进入,哪条消息就会先被消费,符合我们的FIFO,很多时候全局顺序消息的实现代价太大,所以就出现了分区顺序消息。分区顺序消息就是在一个Broker中有序,其概念概念如下图所示:
在这里插入图片描述
我们通过对消息的key进行hash,想通hash的消息会被发送到同一个分区里面,当然如果要做全局顺序消息,我们的分区只需要一个即可,所以全局顺序消息的代价是比较大的,牺牲了扩展性和高并发性。

3)延时消息 —— 适用于订单超时库存归还等等业务

  延迟的机制是在服务端实现的,也就是Broker收到消息后,经过一段时间才发送给消费者,RocketMQ按照1-N定义了如下级别:1s,5s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,1h,2h;若要发送定时消息,在应用层初始化Message消息对象之后,调用Message.setDelayTimeLevel(int level)方法来设置延迟级别,按照序列取响应的延迟级别,例如level=2,则延迟为5s,其实现原理如下:

  • 1、发送消息的时候如果消息设置了DelayTimeLevel,那么消息就会被丢到ScheduleMessageService.SCHEDULE_TOPIC这个topic里面,
  • 2、根据DelayTimeLevel选择对应的queue
  • 3、再把真是的topic和queue信息封装起来,set到msg里面
  • 4、然后每个SCHEDULE_TOPIC_XXXXX的每个DelayTimeLevelQueue,有定时任务取刷新是否有待投递的消息。
  • 每隔10s定时持久化发送进度

4)事务消息

  云消息队列 RocketMQ 版提供的分布式事务消息适用于所有对数据最终一致性有强需求的场景,详细的原理可以参考《分布式事务(五)——基于本地消息和可靠消息的解决方案》中的基于可靠消息的一致性的部分,这里我们主要介绍事务消息的相关内容。

(1)概念介绍
  • 事务消息:云消息队列 RocketMQ 版提供类似XA或Open XA的分布式事务功能,通过云消息队列 RocketMQ 版事务消息能达到分布式事务的最终一致。
  • 半事务消息:暂不能投递的消息,生产者已经成功地将消息发送到了云消息队列 RocketMQ 版服务端,但是云消息队列 RocketMQ 版服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,云消息队列 RocketMQ 版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。
(2)分布式事务消息的优势

  云消息队列 RocketMQ 版分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。
在这里插入图片描述

(3)典型场景

  在淘宝购物车下单时,涉及到购物车系统和交易系统,这两个系统之间的数据最终一致性可以通过分布式事务消息的异步处理实现。在这种场景下,交易系统是最为核心的系统,需要最大限度地保证下单成功。而购物车系统只需要订阅云消息队列 RocketMQ 版的交易订单消息,做相应的业务处理,即可保证最终的数据一致性。

(4)交互流程

事务消息交互流程如下图所示。
在这里插入图片描述
事务消息发送步骤如下:

  1. 生产者将半事务消息发送至云消息队列 RocketMQ 版服务端。
  2. 云消息队列 RocketMQ 版服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

事务消息回查步骤如下:

  1. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  2. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
(5)使用规则

生产消息的规则

  • 事务消息发送完成本地事务后,可在execute方法中返回以下三种状态:
    • TransactionStatus.CommitTransaction:提交事务,允许消费者消费该消息。
    • TransactionStatus.RollbackTransaction:回滚事务,消息将被丢弃不允许消费。
    • TransactionStatus.Unknow:暂时无法判断状态,等待固定时间以后云消息队列 RocketMQ 版服务端根据回查规则向生产者进行消息回查。
  • 通过ONSFactory.createTransactionProducer创建事务消息的Producer时必须指定LocalTransactionChecker的实现类,处理异常情况下事务消息的回查。
  • 回查规则:本地事务执行完成后,若云消息队列 RocketMQ 版服务端收到的本地事务返回状态为TransactionStatus.Unknow,或生产者应用退出导致本地事务未提交任何状态。则云消息队列 RocketMQ 版服务端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。
    • 回查间隔时间:系统默认每隔30秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。
    • 第一次消息回查最快时间:该参数支持自定义设置。若指定消息未达到设置的最快回查时间前,系统默认每隔30秒一次的回查任务不会检查该消息。
      以Java为例,以下设置表示:第一次回查的最快时间为60秒。
Message message = new Message();
message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"60");

消费消息的规则

  • 事务消息的Group ID不能与其他类型消息的Group ID共用。与其他类型的消息不同,事务消息有回查机制,回查时云消息队列 RocketMQ 版服务端会根据Group ID去查询生产者客户端。

后记
  个人总结,欢迎转载、评论、批评指正

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/662972.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mybatis的一级缓存和二级缓存

一、介绍 1、mybatis缓存: mybatis包含一个非常强大的查询缓存特性,可以非常方便的定制和配置缓存,通过缓存减少Java Application与数据库的交互次数,从而提升程序的运行效率。 2、mybatis一二级缓存 mybatis的缓存分为一级缓存…

Docker中配置MySql环境

目录 一、简单安装 1. 首先从Docker Hub中拉取镜像 2. 启动尝试创建MySQL容器,并设置挂载卷。 3. 查看mysql8这个容器是否启动成功 4. 如果已经成功启动,进入容器中简单测试 4.1 进入容器 4.2 登录mysql中 4.3 进行简单添加查找测试 二、主从复…

MySQL-----初识

一 SQL的基本概述 基本概述 ▶SQL全称: Structured Query Language,是结构化查询语言,用于访问和处理数据库的标准的计算机语言。SQL语言1974年由Boyce和Chamberlin提出,并首先在IBM公司研制的关系数据库系统SystemR上实现。 ▶美国国家标…

MySQL亿级数据的查询优化-历史表该如何建

前端时间在知乎上看到一个问题,今天有空整理并测试了一下: 这个问题很具体,所以还是可以去尝试优化一下,我们基于InnoDB并使用自增主键来讲。 比较简单的做法是将历史数据存放到另一个表中,与最近的数据分开。那是不是…

如何使用Linux Archcraft结合内网穿透实现SSH远程连接

📑前言 本文主要是使用Linux Archcraft结合内网穿透实现SSH远程连接的文章,如果有什么需要改进的地方还请大佬指出⛺️ 🎬作者简介:大家好,我是青衿🥇 ☁️博客首页:CSDN主页放风讲故事 &#…

go gin 响应数据

go gin 响应数据 package mainimport ("fmt""github.com/gin-gonic/gin" )type UserInfo struct {UserName string json:"user_name"Age int json:"age"Password string json:"-" }func JsonTest(ctx *gin.Context…

黑马Java——常见API

一、游戏打包exe 游戏打包exe要考虑的因素: 游戏打包exe核心步骤: 详见《打包exe文档》 二、Math (一) Math类的常用方法 1、代码实现 2、小结

JVM 笔记

JVM HotSpot Java二进制字节码的运行环境 好处: 一次编写,到处运行自动内存管理,具有垃圾回收功能数组下标越界检查多态(虚方法表) JVM组成 类加载子系统(Java代码转换为字节码)运行时数据…

【JavaEE进阶】 图书管理系统开发日记——贰

文章目录 🌲前言🎄设计数据库🍃引⼊MyBatis和MySQL驱动依赖🌳Model创建🎍约定前后端交互接口🍀服务器代码🚩控制层🚩业务层🚩数据层 🌴效果展示⭕总结 &#…

01- k8s基础网络知识 之 underlay与overlay网络

前言: 我们在学习k8s网络之前,必须要了解k8s网络相关的一些基础知识,比如什么是underlay网络、overlay网络等,只有把基础知识掌握之后,后续学习k8s网络的时候,一些知识点就不会再云里雾里了。 1 underlay与…

协作办公开源神器:ONLYOFFICE

目录 前言ONLYOFFICE为什么选择ONLYOFFICE强大的文档编辑功能多种协作方式多人在线协同支持跨端多平台连接器安全性极高本地部署 ONLYOFFICE 8.0版本震撼来袭可填写的 PDF 表单显示协作用户头像更新插件界面设计更快更强大 总结 前言 近几年来,随着互联网技术的不断…

如何解决 docker registry x509 证书不信任问题?

最近想尝试一下极狐GitLab(可以理解为 GitLab 在中国的发行版)内置的容器镜像仓库,这样就不用自己安装 Harbor 之类的了。于是找了个服务器安装了一个极狐GitLab 的私有化部署版本,安装过程可以参考过往的技术文章使用Omnibus 安装…

在Windows搭建gRPC C++开发环境

本文介绍在Windows下使用Visual Studio 2017编译gRPC 1.48.0并配置开发环境,以及开发、配置一个简单的c服务端以及.net客户端。 0、前置条件 1、下载gRPC源码 使用git命令行在预备存放grpc源码的目录下执行, 此处我们下载的是 grpc 1.48.0 git clone -b v1.48.0 …

Pycharm python用matplotlib 3D绘图显示空白解决办法

问题原因: matplotlib版本升级之后显示代码变了,修改为新的 # ax Axes3D(fig) # 原代码 ax fig.add_axes(Axes3D(fig)) # 新代码import numpy as np import matplotlib.pyplot as plt from matplotlib import cm from mpl_toolkits.mplot3d import Ax…

测试环境搭建整套大数据系统(一:基础配置,修改hostname,hosts,免密,时间同步)

一:使用服务器配置。 二:修改服务器名称hostname,hosts。 在 Linux 系统中,hostname 和 /etc/hosts 文件分别用于管理主机名和主机名解析。 在三台服务器上,分别执行以下命令。 vim /etc/hostnamexdso-hadoop-test-0…

༺༽༾ཊ—Unity之-04-原型模式—ཏ༿༼༻

首先创建一个项目, 在这个初始界面我们需要做一些准备工作, 建基础通用文件夹, 创建一个Plane 重置后 缩放100倍 加一个颜色, 任务1:使用 建造者模式 创建三种 金刚猿猴 零部件 拼接组合 首先资源商店下载 金刚猿猴 模…

刨析数据结构(二)

🌈个人主页:小田爱学编程 🔥 系列专栏:数据结构————"带你无脑刨析" 🏆🏆关注博主,随时获取更多关于数据结构的优质内容!🏆🏆 😀欢迎…

strlen函数详解

🎈个人主页:甜美的江 🎉欢迎 👍点赞✍评论⭐收藏 🤗收录专栏:c语言 🤝希望本文对您有所裨益,如有不足之处,欢迎在评论区提出指正,让我们共同学习、交流进步&a…

动态微信小程序码和开发者工具解析小程序码

一、动态生成微信小程序码 1、方式一 微信官方网站,对已发布的小程序,提供了一个快捷的入口,输入微信小程序的page页面即可。 page页面可以通过右侧开启入口获取 也可以通过开发者工具左下角的页面地址和参数地址那里获取到 二、生成的小…

【软件设计师笔记】计算机系统基础知识考点

【考证须知】IT行业高含金量的证书(传送门) 💖 【软件设计师笔记】程序语言设计考点(传送门) 💖 【软件设计师笔记】操作系统考点(传送门) 💖 🐓 计算机系统组成 计算机系统是由硬件和软件组成的,它们协同工作来运…