RocketMQ快速入门:如何保证消息不丢失|保证消息可靠性(九)

0. 引言

在金融、电商等对数据完整性要求极高的行业,消息的丢失可能会导致数据不一致,严重影响业务逻辑和数据统计,也影响客户体验,所以在很多业务场景下,我们都要求数据不能丢失。而rocketmq中,如何对消息防丢失进行处理的呢?

1. 原理

1.2 产生消息丢失的场景

首先我们要理解消息的传递过程,在哪些阶段会导致消息丢失,才能知道如何进行防控。

我们之前分析过rabbitmq如何保证消息不丢失, rabbitmq内部有交换机这一转发步骤,所以相对比rocketmq更加复杂,但是两者的分析方法是一致的。
在这里插入图片描述

rocketmq的消息传递分为3个阶段:
(1)生产者发送消息到broker的队列中
(2)broker存储消息
(3)消费者到队列获取消息进行消费
在这里插入图片描述
而这三个阶段可能会导致消息丢失的场景是什么呢?其实由rabbitmq的分析我们可以得到启发。
(1)生产者发送消息到broker的队列中

生产者在发出消息后,可能因为网络异常、broker宕机,导致发出的消息实际并没有到达broker

(2)broker存储消息

broker的存储机制是将消息先存储到内存,存储完成后再发送回执给生产者,然后再异步将数据刷到磁盘,但如果在这个刷盘这个过程中broker宕机了,也会导致消息丢失

(3)消费者到队列获取消息进行消费

broker在将消息发出后,同样可能因为网络异常、消费者宕机或者消息者消费到一半产生错误等因素,导致消息实际并没有被消费者消费,但broker又扣除了这条消息,就会导致消息丢失

1.2 防丢失措施

阶段一:生产者发送消息到broker的队列中

1、因为发送到broker期间网络因素我们很难干预,也很难百分比保证。第一点我们能做的,如果其中一个broker宕机,那能有备用节点顶上,保证可用性。于是第一项就是多节点部署broker

2、但万一节点都挂了呢,或者整个机房网络瘫痪了,如何保证消息不丢失,我们只要从上游控制,如果下游不通时,就不要发了,待会再发。于是也衍生出消息发送失败时的重试机制

3、但如果一直重发不成功怎么办呢,那就需要下游告知上游,这次发送没成功,你记录好状态,这就是broker要有返回状态告知,否则生产者也不知道到底发送成功没有。broker中提供了3种发送方式:同步、异步、单向(详见之前的文章: RocketMQ快速入门:集成java客户端实现各类消息发送|异步、同步、顺序、单向、延迟、事务(五)附带源码)。

这三种方式中单向肯定不行,他是不管返回结果的,最容易丢失消息。而异步需要设置回调函数,在回调函数中处理发送失败时的逻辑,如果对于一些场景回调里很能补救,最常见的就是回调里进行重发,所以最优先保证消息可靠的就是同步发送的方式,一旦获取到发送失败,就进行补救处理,或者不再继续后续的业务逻辑,整个流程直接报错打回

总结一下,生产阶段保证消息可靠的手段包括多节点部署broker, 消息重发、同步发送。这几种方式实际上是可以配合使用的, 比如多节点部署,通过同步发送,发送失败时进行3次重发,都重发失败则记录状态。
在这里插入图片描述

阶段二:broker存储消息

存储阶段导致丢失的原因就是因为broker默认的是异步刷盘机制,如果改成同步刷盘呢,先存储到内存,然后刷新到磁盘,刷新成功后才给生产者返回成功收到的回执,以此保证消息可靠。rocketmq中也是支持同步刷盘的。

但如果只有一个节点的话,即使同步刷盘,当broker宕机后,没有备用的,还是会导致服务不可用,相对可靠性就没有保障了。所以同步刷盘,也可以配合着多节点部署使用

当然如果你的场景对可用性要求不高,即使宕机,只要报错会生产者就行,那同步刷盘也足够了

总结一下,存储阶段主要依赖同步刷盘和多节点部署来保障可靠性,当然多节点部署可以根据业务情况和成本预算选择。
在这里插入图片描述

阶段三:消费者到队列获取消息进行消费

消费阶段的丢失可能性主要来源于消费者没处理好之前就宕机或则异常了,首先一点能想到的那就多个消费者呢,但实际上多点部署在消费阶段并不能解决问题,因为rocketmq消费模式有广播模式和集群模式,广播模式下每个节点都会收到消息,这个模式下的天然就是多节点部署。而集群模式本身也是基于多消费者的情况,但两者都无法保障当消息发送给某一节点后,这个节点拉去了消息,但没实际处理完就异常的场景。

所以就考虑当消费者消费完成后,再给broker发送成功消费的回执,这时broker才更新消息偏移量,将消息标识为被消费。如此才能保障消息的可靠性。

在这里插入图片描述

2. 实现

2.1 生产阶段

1、多节点部署,可以部署主从或集群模式,这里不讲解详细的搭建流程,后续单独讲解

2、同步发送,主要通过producer.send来实现同步发送

public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {// 声明groupDefaultMQProducer producer = new DefaultMQProducer("group_test");// 声明namesrv地址producer.setNamesrvAddr("localhost:9876");// 设置重试次数producer.setRetryTimesWhenSendFailed(2);// 启动实例producer.start();// 设置消息的topic,tag以及消息体Message msg = new Message("topic_test", "tag_test", "消息内容1".getBytes(StandardCharsets.UTF_8));// 发送消息,并设置10s连接超时SendResult send = producer.send(msg, 10000);System.out.println("发送结果:"+send);// 关闭实例producer.shutdown();}

如果是springboot集成的,可以通过SendResult sendResult = rocketMQTemplate.syncSend("topic_test:tag_test", message);来实现。通过发送结果SendResult对象,来判断发送失败后的处理逻辑

3、发送重试
(1)首先手动重发的实现很简单,只需要根据send.getSendStatus()状态来判断,如果需要重发多次的,可以结合guava-retry 等重发组件来更方便的实现

// 发送消息,并设置10s连接超时SendResult send = producer.send(msg, 10000);System.out.println("发送结果:"+send);if(!send.getSendStatus().equals(SendStatus.SEND_OK)){// 发送失败,手动重发send = producer.send(msg, 10000);}

(2)当然rocketmq也封装好了重试机制给我们使用,其重试机制采用衰减的形式,也就是重试间隔时间会逐渐增加
在这里插入图片描述
我们只需要通过producer.setRetryTimesWhenSendFailed(2);方法即可设置,会在发送失败时自动触发重新发送,同时如果超过设置的超时时间还未接收到成功的结果也会触发重发机制,就不需要我们手动书写重发逻辑了,更加推荐这种方式。

        // 声明groupDefaultMQProducer producer = new DefaultMQProducer("group_test");// 声明namesrv地址producer.setNamesrvAddr("localhost:9876");// 设置重试次数producer.setRetryTimesWhenSendFailed(2);// 启动实例producer.start();

2.2 存储阶段

1、主要将broker的刷盘策略设置为同步刷盘,需要修改broker.conf配置文件

# 设置为同步刷盘模式
flushDiskType = SYNC_FLUSH

2、如果配置的是多节点,一般是主从模式,为了防止主节点有数据,从节点没刷到数据的情况,就需要开启从节点刷盘后再返回ACK回执给生产者,需要修改从节点broker配置文件

# 默认为 ASYNC_MASTER
brokerRole=SYNC_MASTER

broker提供了两种主从同步模式:ASYNC_MASTER异步 和 SYNC_MASTER同步

  • ASYNC_MASTER:

消息发送到master节点后,开启一个异步线程更新给从节点,这个过程中有消息同步丢失的风险,优点是性能高

  • SYNC_MASTER:

消息发送到master节点后,同步更新到从节点,当从节点更新完再返回成功的ACK回执给生产者,表示消息发送成功,可靠性高,但性能会有所下降

3、关于多节点部署,我们在后续单独讲解

2.3 消费阶段

1、其实现就是消费后返回成功状态即可

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");consumer.setNamesrvAddr("127.0.0.1:9876");// 集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);// 设置topicconsumer.subscribe("topic_test", "*");// 设置重试次数consumer.setMaxReconsumeTimes(2);// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {for (MessageExt msg : list) {String topic = msg.getTopic();try {String messageBody = new String(msg.getBody(), "utf-8");System.out.println(topic+":"+messageBody);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();

2、如果返回的是ConsumeConcurrentlyStatus.RECONSUME_LATER状态,消费者就会触发稍后重试机制进行重新消费,同样的可以通过consumer.setMaxReconsumeTimes设置最大重试次数

// 设置重试次数
consumer.setMaxReconsumeTimes(2);

其重试次数和时延等级与生产重试是一致的
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/30805.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

当游戏遭遇安全问题,我们应该怎么做?

在游戏安全领域&#xff0c;专业性最差、但最常见的案例类型是DDoS攻击&#xff08;分布式拒绝服务攻击&#xff09;。出于它的特性&#xff0c;中小厂商、独立开发者较容易遭受这类攻击。 例如&#xff0c;今年2月29日上线的手游《雷索纳斯》就遭受了名为ACCN组织发起的DDoS攻…

【内含优惠码】重磅发售!《2023年度中国量化投资白皮书》(纸质版)

这是可以公开了解量化行业图景的&#xff0c;为数不多资料。 简介 《2023年度中国量化投资白皮书》由宽邦科技、华泰证券、金融阶、华锐技术、AMD、阿里云、英迈中国等多家机构联合发起编写&#xff0c;并于2024年6月15日正式发布&#xff0c;全书公17万字6大章节勾勒最新量化…

Studying-代码随想录训练营day15| 222.完全二叉树的节点个数、110.平衡二叉树、257.二叉树的所有路径、404.左叶子之和

第十五天&#xff0c;二叉树part03&#x1f4aa;&#xff0c;编程语言&#xff1a;C 目录 257.完全二叉树的节点个数 110.平衡二叉树 257.二叉树的所有路径 404.左叶子之和 总结 257.完全二叉树的节点个数 文档讲解&#xff1a;代码随想录完全二叉树的节点个数 视频讲解…

Arduino平台软硬件原理及使用——无源蜂鸣器模块的使用

文章目录 一、蜂鸣器发声原理 二、无源蜂鸣器与有源蜂鸣器的区分 三、无源蜂鸣器模块在Arduino中的使用 一、蜂鸣器发声原理 上图为常见的不同封装及规格的蜂鸣器。 同蜜蜂、知了等昆虫发声原理一样&#xff0c;蜂鸣器同样靠振动来发出声音&#xff1b; 如上图为无源蜂鸣器的内…

【总结】ui自动化selenium知识点总结

1. 大致原理 首页安装第三方库selenium库&#xff0c; 其次要下载好浏览器驱动文件&#xff0c;比如谷歌的 chromedriver.exe&#xff0c;配置上环境变量。 使用selenium的webdriver类去创建一个浏览器驱动对象赋值叫driver&#xff0c;一个浏览器驱动对象就可以 实现 对浏…

【vue3|第11期】Vue3中的ref属性:让元素引用变得简单

日期&#xff1a;2024年6月19日 作者&#xff1a;Commas 签名&#xff1a;(ง •_•)ง 积跬步以致千里,积小流以成江海…… 注释&#xff1a;如果您觉得有所帮助&#xff0c;帮忙点个赞&#xff0c;也可以关注我&#xff0c;我们一起成长&#xff1b;如果有不对的地方&#xf…

招聘主播?小心是大陷阱!!!

高薪招聘主播的骗局通常涉及一系列精心设计的步骤&#xff0c;旨在引诱求职者上钩并从中获利。以下是这种骗局常见的几个关键环节&#xff1a; 首先&#xff0c;骗子会通过各种渠道发布诱人的招聘信息&#xff0c;声称正在寻找有潜力的主播&#xff0c;并承诺提供高额的底薪和…

虚拟3D沉浸式展会编辑平台降低了线上办展的门槛

在数字化浪潮的引领下&#xff0c;VR虚拟网上展会正逐渐成为企业展示品牌实力、吸引潜在客户的首选平台。我们与广交会携手走过三年多的时光&#xff0c;凭借优质的服务和丰富的经验&#xff0c;赢得了客户的广泛赞誉。 面对传统展会活动繁多、企业运营繁忙的挑战&#xff0c;许…

【绝对有用】刚刚开通的GPT-4o计算这种数学题目出现问题了

欢迎关注如何解决以上问题的方法&#xff1a;查看个人简介中的链接的具体解决方案

[Qt的学习日常]--窗口

前言 作者&#xff1a;小蜗牛向前冲 名言&#xff1a;我可以接受失败&#xff0c;但我不能接受放弃 如果觉的博主的文章还不错的话&#xff0c;还请点赞&#xff0c;收藏&#xff0c;关注&#x1f440;支持博主。如果发现有问题的地方欢迎❀大家在评论区指正 目录 一、窗口的分…

全面了解虚拟线上会议室的核心功能和优势,助力企业高效协作

在现代办公环境中&#xff0c;虚拟线上会议室日益普及&#xff0c;成为企业沟通协作的重要工具。虚拟会议室的出现不仅简化了传统会议的复杂流程&#xff0c;还提供了一系列优势功能&#xff0c;提升了用户的会议体验。 一、虚拟线上会议室的优势功能 1、实时音视频会议 虚拟线…

全网最易懂,开源时序数据库influxDB,实际应用评测

前言&#xff1a; 当今是信息爆炸的时代&#xff0c;在处理高频数据时&#xff0c;关系型数据库oracle/mysql明显表现出乏力&#xff0c;因秒级、毫秒级高频数据&#xff0c;分分钟可以把关系型数据库的表塞爆。在日常生活工作中&#xff0c;我们经常会遇到哪些需要高频分析的场…

[自动驾驶 SoC]-3 英伟达Orin

NVIDIA Jetson AGX OrinTM series (资料来源&#xff1a;nvidia-jetson-agx-orin-technical-brief.pdf) 1 整体介绍 1) Orin SoC结构 Orin SoC&#xff0c;如下图所示&#xff0c;由一个NVIDIA Ampere architecture GPU, Arm Cortex-A78AE CPU, 下一代深度学习核视觉处理加速…

企业防盗版,如何保障上网安全

信息化的发展企业日常办公越来越依赖互联网。然而&#xff0c;终端及普通PC在访问互联网过程中&#xff0c;会面临各种不容忽视的风险。这些风险包括&#xff1a; 员工主动故意的数据泄漏&#xff1a;员工可能故意泄露敏感信息。后台应用程序外发信息&#xff1a;一些应用程序…

代码覆盖率:衡量测试的有效性

在软件开发领域&#xff0c;确保代码的可靠性和稳健性至关重要。实现这一目标的关键实践之一是通过测试。但是&#xff0c;测试本身需要进行测量和评估&#xff0c;以确保其有效性。这就是代码覆盖率发挥作用的地方。代码覆盖率是一种指标&#xff0c;它量化了测试期间程序源代…

遥感图像地物覆盖分类,数据集制作-分类模型对比-分类保姆级教程

遥感图像地物覆盖分类,数据集制作-分类模型对比-分类保姆级教程 在遥感影像上人工制作分类数据集采用python+gdal库制作数据集挑选分类模型(RF、KNN、SVM、逻辑回归)选择随机森林模型建模分类遥感图像预测在遥感影像上人工制作分类数据集 1.新建shp文件 地理坐标系保持和影像…

本地大模型服务 Ollama:从安装到使用

文章目录 前言一、下载安装1.1 官网安装1.2 压缩包安装1.3 docker 安装二、命令行使用2.1 常用命令2.2 模型列表2.3 使用三、Open-WebUI3.1 安装3.2 修改语言3.3 使用参考前言 Ollama 是专为在本地机器上便捷部署和运行大型语言模型(LLM)而设计的开源框架,它有如下几个特点…

如何应对 Android 面试官 -> MVVM 实战一个新闻客户端 (上)

前言 本章我们基于重构的方式进行一个 MVVM 的实战&#xff0c;我们将一个新闻列表的普通实现&#xff0c;一步一步的改造成 MVVM 的架构模式&#xff0c;一共分为上中下三个章节&#xff1b; 传统方式实现 首先咱们来看具体实现的最终效果&#xff0c;就是一个新闻列表页面&a…

connect-caption-and-trace——用于共同建模图像、文本和人类凝视轨迹预测

介绍 论文地址&#xff1a;https://arxiv.org/abs/2105.05964 源码地址&#xff1a;https://github.com/facebookresearch/connect-caption-and-trace 在过去&#xff0c;计算机视觉和自然语言处理领域的模型和算法的发展只有偶尔的重叠&#xff0c;但近年来&#xff0c;这两…

python-04

str.spilt() str.spilt(str" ", num string.count(str)); str&#xff1a;分隔符&#xff0c;默认为所有的空字符&#xff0c;包括空格、换行符"\n"、制表符"\t"等。 num&#xff1a;分隔次数 str "小时候 总有他们在耳边叮咛嘱咐 小…