RocketMQ 如何保证消息正常【投递】和【消费】

消息整体处理过程,这里我们将消息的整体处理阶段分为3个阶段进行分析:1Producer发送消息阶段。
2Broker处理消息阶段。
3Consumer消费消息阶段。

一、Producer发送消息阶段

1、安全机制保障1,发送方式。

1、同步发送
2、异步发送
3Oneway发送:Oneway 方式只负责发送请求,不等待应答

2、安全机制保障2

如果发送消息失败或者超时,则重新发送。
发送重试源码如下,本质其实就是一个for循环,当发送消息发生异常的时候重新循环发送。默认重试3次,重试次数修改。

3、安全机制保障3

broker提供多master模式,即使某台broker宕机了,保证消息可以投递到另外一台正常的broker上

二、Broker处理消息阶段

1、安全机制1:同步/异步 【刷盘】的策略

当消息投递到broker之后,会先存到page cache,然后根据broker设置的刷盘策略是否立即刷盘,
也就是如果刷盘策略为异步,broker并不会等待消息落盘成功就会返回【producer成功】,还只是保存到了page cache, 
也就是说当broker所在的服务器突然宕机,则会丢失部分页的消息。【这就是异步刷盘带来了的问题】

安全机制2:提供主从模式,同时主从支持同步双写

即使broker设置了【同步刷盘】,如果主broker磁盘损坏,也是会导致消息丢失。
因此可以给broker指定slave,然后将slave设置为同步刷盘策略。此模式下,producer每发送一条消息,都会等消息投递到【master】和【slave】都落盘成功了,
broker才会当作消息投递成功,保证休息不丢失。缺点:比较慢,而且如果单边失败,引发其他问题。

三、Consumer消费消息阶段

consumer默认提供的是【At least Once】机制
何为【At least Once】:就是Consumer先pull消息到本地,消费完成后,才向服务器返回ack。通常消费消息的ack机制一般分为两种思路:
(1)先提交后消费;(可以解决重复消费的问题但是会丢失消息)
(2)先消费,消费成功后再提交;因此Rocketmq默认实现的是思路二,由各自consumer业务方保证幂等来解决重复消费问题。

消费消息重试机制,RocketMQ本身提供了重新消费消息的能力。但是会有重复消费的问题。

重复消费的问题出现原因
RocketMQ是以【consumer group+queue】来确认消息消费进度,通过【gruop+offset】来标记【queue】消费进度,
消费成功之后都会返回一个ack消息告之broker更新offset,但是RocketMQ并不是按一条一条消息来做ack,
而是根据一次拉取批量来做消息ack
如一次从broker拉去10条消息,就按照10条消息整体做offset,为方便理解下面先按照10条来分析
如上一次的offset为101,本次拉取了10调消息,偏移量从101-110
每一条消息消费成功会按照当前消息最小的offset来更新本地的消费进度,怎么理解这句话,例如:103消息先消费完成,但是101还没有消费完成(消费失败也算作消费完成),这时候更新还是按照101的偏移量来更新本地偏移量;直到所有的消息都消费完成,110这条消息消费完成的时候才会把偏移量更新为110,再通过定时任务将本地偏移量更新到broker(假设恰好更新偏移量等定时任务触发)。RocketMQ按批次更新进度好处是不需要每一条消息都需要做ack操作,提升了效率,但是随之产生了2个问题:
1、某一条失败,导致整体失败,然后又重行全量消费一次。
2、但是实际是失败的消息,如果处理。问题1:
如果这一批消息中的101消息由于一些原因一直没有消费完成,即使其它的9条消息都消费完成了,
broker的消费进度依然偏移到101,如果此时该consumer宕机或者实例被kill,该queue通过负载均衡策略会重新被分配给
其它的consumer,这个时候从broker拉去的偏移量为101开始消费,但是实际102-1099条消息已经消费完成,
造成102-1099条消息重复消费解决方案:
3.6版本之前RocketMQ没有给出解决方案,官方强调业务方【需要自己实现消息幂等】逻辑,但是为了避免大量的出现消息重
复消费的问题,RocketMQ也做了一些限制,如果本地的消息量达到2000之后,不会在拉取新的消息,也就是即使出现上面的
极端情况,也只会造成最多1999条消息重复消费。3.6之后的版本RocketMQ给出了一个解决方案(治标不治本),在消费端设置了一个消费超时时间
【consumeTimeout = 15min】 原理是,RocketMQ启动了一个定时任务来检查所有的消息的消费情况,在消费开始的时
候会记录消息【消费开始时间】,每隔consumeTimeout时间去检查所有消息是不是消费完成了,如果还没有消费完
成并且时间超过了consumeTimeout配置的时间,就当作【消费成功,但是处理失败】(也算作消费完成),既然消费完成了,
自然会把本地消费进度更新到上例中的110,再通过定时同步机制将本地进度同步到broker,达成本地和broker端一致的效果consumeTimeout支持业务自己配置,为什么说治标不治本,因为始终还是出现2*consumeTimeout时间(比如第一次任务在120分,101消息从121分开始消费,到1230分才会发现超时,如果这个时候宕机)的消息会出现无法完成确认造成消息重复消费。问题2:
既然是按批量来更新消费进度,但是那些虽然消费完成但是实际是【处理失败】的消息(主动返回【RECONSUME_LATER】和
【抛出异常】的)的消息是如何处理的?rocketmq在消息消费失败的消息会单独把该消息的msgid、偏移量等信息通过rpc调用通知给broker,那broker会把该消息做重新的投递,从而做到了消息的重置机制,消息的重试后面在分析

安全性保障:跳转

重复消费:跳转

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/140495.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JPA Buddy快速创建update、find、count、delete、exists方法

JPA Buddy快速创建update、find、count、delete、exists方法,JPA默认提供的CrudRepository\JpaRepository提供的方法比较少,一般我们会手写一些方法,这里我们选择通过JPA Buddy快速生成,之前文章中讲到了JPA Buddy原本是IDEA收费插…

《QT从基础到进阶·二十一》QGraphicsView、QGraphicsScene和QGraphicsItem坐标关系和应用

前言: 我们需要先由一个 QGraphicsView,这个是UI显示的地方,也就是装满可见原色的Scene,然后需要一个QGraphicsScene 用来管理所有可见的界面元素,要实现UI功能,我们需要用各种从QGraphicsItem拼装成UI控件…

scDrug:从scRNA-seq到药物反应预测

scRNA-seq技术允许在转录组水平上对数千个细胞进行测量。scRNA-seq正在成为研究肿瘤微环境中细胞成分及其相互作用的重要工具。scRNA-seq也被用于揭示肿瘤微环境模式与临床结果之间的关联,并在复杂组织中剖析药物治疗的细胞特异性效应。scRNA-seq的最新进展推动了疾…

广告业展示服务预约小程序的效果如何

虽然不少人不会与广告业直接接触,但各种形式的广告却是充斥在人们生活中,线下的传单展板、线上的视频、音频、图文等都是广告很好的传播通道,同时广告业能扩展的客户属性也非常广,下到超市小摊,上到企业公司都有大小相…

从0到0.01入门React | 006.精选 React 面试题

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…

地面沉降监测站可以监测什么?

随着城市化的飞速发展,地面沉降问题日益凸显。为了及时掌握土地沉降情况,确保人们安全,就需要借助地面沉降监测站的力量。 一、实时监测土地沉降 地面沉降监测站的核心功能是实时监测土地沉降。通过高精度GNSS位移监测站和先进的数据分析技术…

【rl-agents代码学习】01——总体框架

文章目录 rl-agent Get startInstallationUsageMonitoring 具体代码 学习一下rl-agents的项目结构以及代码实现思路。 source: https://github.com/eleurent/rl-agents rl-agent Get start Installation pip install --user githttps://github.com/eleurent/rl-agentsUsage…

性能压测工具:Locust详解

一、Locust介绍 开源性能测试工具https://www.locust.io/,基于Python的性能压测工具,使用Python代码来定义用户行为,模拟百万计的并发用户访问。每个测试用户的行为由您定义,并且通过Web UI实时监控聚集过程。 压力发生器作为性…

【面试经典150 | 】颠倒二进制位

文章目录 写在前面Tag题目来源题目解读解题思路方法一:逐位颠倒方法二:分治 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法,两到三天更新一篇文章,欢迎催更…… 专栏内容以分析题目为主,并附带一些对于…

javaSE的发展历史以及openjdk和oracleJdk

1 JavaSE 的发展历史 1.1 Java 语言的介绍 SUN 公司在 1991 年成立了一个称为绿色计划(Green Project)的项目,由 James Gosling(高斯林)博士领导,绿色计划的目的是开发一种能够在各种消费性电子产品&…

【可解释AI】Alibi explain: 解释机器学习模型的算法

Alibi explain: 解释机器学习模型的算法 可解释人工智能简介Alibi特点算法Library设计展望参考资料 今天介绍Alibi Explain,一个开源Python库,用于解释机器学习模型的预测(https://github.com/SeldonIO/alibi)。该库具有最先进的分类和回归模型可解释性算…

Java --- JVM的执行引擎

目录 一、执行引擎概述 1.1、执行引擎的工作过程 二、Java代码编译和执行的过程 三、解释器 3.1、解释器工作机制 3.2、解释器分类 3.3、解释器现状 四、JIT编译器 五、热点代码及探测方式 六、方法调用计数器 6.1、热点衰减 七、回边计数器 八、HotSpot VM设置程序…

axios1.5取消请求,中断请求的方法

给input的onchange绑定事件 引入axios,使用axios.CancelToken.source()创建标记 实例中,把cancelToken的值填上

CSS常用示例100+ 【目录】

目前已有文章 11 篇 本专栏记录的是经常使用的CSS示例与技巧,主要包含CSS布局,CSS特效,CSS花边信息三部分内容。其中CSS布局主要是列出一些常用的CSS布局信息点,CSS特效主要是一些动画示例,CSS花边是描述了一些CSS相关…

Python参数传递,从入门到精通

Python是一种非常灵活的编程语言,以多种方式定义和调用函数。其中一个关键方面是参数传递的灵活性。在Python中,可以通过位置、关键字、默认值和可变长度参数等多种方式来传递参数。 1. 位置参数 位置参数是最常见的参数传递方式。当调用一个函数时&am…

node插件express(路由)的插件使用(二)——cookie 和 session的基本使用区别

文章目录 前言一、express 框架中的 cookie0、cookie的介绍和作用1. 设置cookie2.删除cookie3.获取cookie(1)安装cookie-parser(2)导入cookie-parser(3)注册中间件(4)获取cookie&…

使用Python分析时序数据集中的缺失数据

大家好,时间序列数据几乎每秒都会从多种来源收集,因此经常会出现一些数据质量问题,其中之一是缺失数据。 在序列数据的背景下,缺失信息可能由多种原因引起,包括采集系统的错误(例如传感器故障)…

Intel® DevCloud for oneAPI SYCL编程项目实践

问题陈述 实验所用的硬件环境和软件环境 本次实验使用oneAPI中支持SYCL编程模型的C编译器,使用英特尔oneAPI Developer Cloud服务,可以免安装额外环境,利用CPU作为主机(Host),同时利用GPU作为设备&#xf…

LCA

定义 最近公共祖先简称 LCA(Lowest Common Ancestor)。两个节点的最近公共祖先,就是这两个点的公共祖先里面,离根最远的那个。 性质 如果 不为 的祖先并且 不为 的祖先,那么 分别处于 的两棵不同子树中&#…

【机试题】LazyIterator迭代器懒加载问题

将下面这个未完成的Java工具类补充完成,实现懒加载的功能,该类需要实现Iterable接口,能够遍历所有数据。具体要求如下: 工具类提供了一个ValueLoader接口,用于获取数据,其中ValueLoader的接口定义为&#x…