【系统设计】如何确保消息不会丢失?

一、前言

对于大部分业务系统来说,丢消息意味着数据丢失,是完全无法接受的。其实,现在主流的消息队列产品都提供了非常完善的消息可靠性保证机制,完全可以做到在消息传递过程中,即使发生网络中断或者硬件故障,也能确保消息的可靠传递,不丢消息。

绝大部分丢消息的原因都是由于开发者不熟悉消息队列,没有正确使用和配置消息队列导致的。虽然不同的消息队列提供的 API 不一样,相关的配置项也不同,但是在保证消息可靠传递这块儿,它们的实现原理是一样的。

这节课我们就来讲一下,消息队列是怎么保证消息可靠传递的,这里面的实现原理是怎么样的。当你熟知原理以后,无论你使用任何一种消息队列,再简单看一下它的 API 和相关配置项,就能很快知道该如何配置消息队列,写出可靠的代码,避免消息丢失。

二、检测消息丢失的方法

我们说,用消息队列最尴尬的情况不是丢消息,而是消息丢了还不知道。一般而言,一个新的系统刚刚上线,各方面都不太稳定,需要一个磨合期,这个时候,特别需要监控到你的系统中是否有消息丢失的情况。

如果是 IT 基础设施比较完善的公司,一般都有分布式链路追踪系统,使用类似的追踪系统可以很方便地追踪每一条消息。如果没有这样的追踪系统,这里我提供一个比较简单的方法,来检查是否有消息丢失的情况。

我们可以利用消息队列的有序性来验证是否有消息丢失。原理非常简单,在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。

如果没有消息丢失,Consumer 收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 +1。如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因。

大多数消息队列的客户端都支持拦截器机制,你可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性,这样实现的好处是消息检测的代码不会侵入到你的业务代码中,待你的系统稳定后,也方便将这部分检测的逻辑关闭或者删除。

如果是在一个分布式系统中实现这个检测方法,有几个问题需要你注意。

首先,像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。

如果你的系统中 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。

Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。

三、确保消息可靠传递

讲完了检测消息丢失的方法,接下来我们一起来看一下,整个消息从生产到消费的过程中,哪些地方可能会导致丢消息,以及应该如何避免消息丢失。

  • 生产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。

  • 存储阶段: 在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。

  • 消费阶段: 在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

3.1、生产阶段

在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

你在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。以 Kafka 为例,我们看一下如何可靠地发送消息:

同步发送时,只要注意捕获异常即可。

try {RecordMetadata metadata = producer.send(record).get();System.out.println(" 消息发送成功。");
} catch (Throwable e) {System.out.println(" 消息发送失败!");System.out.println(e);
}

异步发送时,则需要在回调方法里进行检查。这个地方是需要特别注意的,很多丢消息的原因就是,我们使用了异步发送,却没有在回调中检查发送结果。

producer.send(record, (metadata, exception) -> {if (metadata != null) {System.out.println(" 消息发送成功。");} else {System.out.println(" 消息发送失败!");System.out.println(exception);}
});

如果超过一定超时时间还是失败,那就抛出异常,由发者自己在应用层面进行处理,手动重试发送 或者 记录失败消息后续补偿。

不过我们需要特别注意是,RocketMQ支持多种「消息类型」,但是并不是对所有「消息类型」 都会有「消息确认机制」和「失败重试机制」。

RocketMQ生产消息时,支持多种「消息类型」和「消息发送模式」。咱们白话为主,就不展开源码了,有兴趣同学可以参考

org.apache.rocketmq.client.producer.MQProducer这个接口定义即可。

消息类型:

  • 普通消息:发送普通消息,异常时默认重试

  • 普通有序消息:发送普通有序消息,通过指定「消息筛选器selector」,动态决定发送哪个队列。异常默认不重试,可以用户自己重试,并发送到其他队列

  • 严格有序消息:发送严格有序消息,通过指定队列,保证严格有序,异常默认不重试

消息发送模式:

  • 同步:调用发送消息方法后,同步阻塞,直到返回SendResult。配置retryTimesWhenSendFailed重试次数。

  • 异步:调用发送消息方法后,立即返回,发送结果会通过开发者自己注册的回调函数SendCallback进行处理。配置retryTimesWhenSendAsyncFailed重试次数。

  • 单向发送:这种方法完全不关心发送后的返回结果。显然,它具有最大吞吐量,但也存在消息丢失的潜在风险。

发送消息的模式和消息类型,可以通过 消息确认、mq-client自动「失败重试机制」、业务自定义重试 等方式,确保消息发送不丢失。

3.2、存储阶段

在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。

如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。

在Kafka中,可以设置以下参数:

  • replication.factor >= 3

该参数表示分区副本的个数(之前已经介绍过了副本的概念)。

replication.factor =3

建议设置大于1,至少有一个副本,随着业务的重要性提升可以增加副本数量。如果 Leader 副本出现不可用,Follower 副本会被选举为新的 Leader 副本继续提供服务。冗余存储数据,做备份本身就是一种提升消息可靠性的办法。

  • unclean.leader.election.enable = false

该参数表示有哪些 Follower 可以有资格被选举为 Leader 。

unclean.leader.election.enable = false

如果一个 Follower 的数据落后 Leader 太多,那么一旦它被选举为新的 Leader, 数据就会丢失,因此我们要将其设置为false,防止此类情况发生。

  • min.insync.replicas > 1

该参数表示消息至少要被写入成功到 ISR 多少个副本才算"已提交",默认值是1,建议设置 min.insync.replicas > 1,这样才可以提升消息持久性,保证数据不丢失。

min.insync.replicas = 2

另外我们还需要确保一下replication.factor > min.insync.replicas,如果相等,只要有一个副本不可以,整个 partition 将无法正常提供服务。

为了保证消息持久性的同时还要保证可用性,推荐设置成:replication.factor =min.insync.replicas +1, 最大限度保证系统可用性。

3.3、消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

你在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

同样,我们以用 Python 语言消费 RabbitMQ 消息为例,来看一下如何实现一段可靠的消费代码:

def callback(ch, method, properties, body):print(" [x] 收到消息 %r" % body)# 在这儿处理收到的消息database.save(body)print(" [x] 消费完成 ")# 完成消费业务逻辑后发送消费确认响应ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_consume(queue='hello', on_message_callback=callback)

你可以看到,在消费的回调方法 callback 中,正确的顺序是,先是把消息保存到数据库中,然后再发送消费确认响应。这样如果保存消息到数据库失败了,就不会执行消费确认的代码,下次拉到的还是这条消息,直到消费成功。

如果在尝试消费的过程中达到了最大重试次数(通常为16次),仍然无法成功消费,则消息将被发送到死信队列,以确保消息存储的可靠性。后续业务可以根据死信队列,来做相关补偿措施。

四、参考

  • Kafka 是如何做到消息不丢或不重复的?

  • 3分钟白话RocketMQ系列—— 如何保证消息不丢失

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/225386.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决kernel32.dll丢失的修复方式,kernel32.dll预防错误的方法

kernel32.dll文件是电脑中的一个重要文件,如果电脑出现kernel32.dll丢失的错误提示,那么电脑中的一些程序将不能正常使用,那么出现这样的问题有什么解决办法呢?那么今天就和大家说说解决kernel32.dll丢失的修复方式。 一.kernel32…

[Knowledge Distillation]论文分析:Distilling the Knowledge in a Neural Network

文章目录 一、完整代码二、论文解读2.1 介绍2.2 Distillation2.3 结果 三、整体总结 论文:Distilling the Knowledge in a Neural Network 作者:Geoffrey Hinton, Oriol Vinyals, Jeff Dean 时间:2015 一、完整代码 这里我们使用python代码进…

探索Linux服务器配置信息的命令

目录 前言1 uname2 lscpu3 free4 df5 lspci6 lsusb7 lshw结语 前言 Linux系统提供了许多命令,用于获取和查看服务器的软硬件配置信息。这些命令可以帮助管理员和用户了解系统的状态、资源使用情况以及硬件设备的相关信息。以下是一些常用的命令以及它们的作用、使用…

线程池ThreadPoolExecutor详解

线程池ThreadPoolExecutor详解 大家好,我是微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!今天,让我们深入研究Java中线程池的强大工具——ThreadPoolExecutor,解析它的工作原理、配置参数…

[Vulnhub靶机] DC-1

[Vulnhub靶机] DC-1靶机渗透思路及方法(个人分享) 靶机下载地址: https://download.vulnhub.com/dc/DC-1.zip 靶机地址:192.168.67.25 攻击机地址:192.168.67.3 一、信息收集 1.使用 arp-scan 命令扫描网段内存活的…

html的学习笔记

开发工具:vscode 文字标签 h1:一级标题,h2:二级标题h6 p:段落标签 hr:分隔线 br:换行 strong/b:文字加粗 ins/u:下划线 em/i:倾斜 del/s:删除线 媒体标签 图片…

vue中使用ailwind css

官网地址: 安装 - Tailwind CSS 中文网 推荐一个网站,里面可以查询所有的TailWindCSS的class样式: Tailwind CSS Cheat Sheet npm安装: 注意:1、这里要用npm,不要用cnpm。2、最好用install,不要…

泛型的相关内容

首先我们来了解一下什么是泛型&#xff0c;泛型的作用又是什么。 泛型的形式是 ArrayList<Object> objects new ArrayList<>(); 这里的<Object>这个就是泛型&#xff0c;添加泛型的作用又是什么呢&#xff0c;它可以限制添加对象的类型&#xff0c;比如A…

黑马点评05分布式锁 1互斥锁和过期时间

实战篇-09.分布式锁-基本原理和不同实现方式对比_哔哩哔哩_bilibili 1.分布式锁 因为jvm内部的sychonized锁无法在不同jvm之间共享锁监视器&#xff0c;所以需要一个jvm外部的锁来共享。 2.redis setnx互斥锁 加锁解锁即可 2.1不释放锁可能死锁 redis 的setnx不会自动释放锁…

Crow:黑魔法 添加路由2 new_rule_tagged实现模板参数的绑定

Crow添加路由的定义: #define CROW_ROUTE(app, url) app.template route<crow::black_magic::get_parameter_tag(url)>(url) Crow:黑魔法get_parameter_tag-CSDN博客 介绍了get_parameter_tag的作用 route的定义为: auto route(std::string&& rule) -> …

微信小程序生成二维码海报并分享

背景&#xff1a;点击图标&#xff0c;生成海报后&#xff0c;点击保存相册&#xff0c;可以保存 生成海报&#xff1a;插件wxa-plugin-canvas&#xff0c;此处使用页面异步生成组件方式&#xff0c;官网地址&#xff1a;wxa-plugin-canvas - npm 二维码&#xff1a;调用后端…

用CC三维建模建出的OSGB格式,用模方打不开,显示该路径包含OSGB瓦块数量0,是什么原因?

答&#xff1a;模方只识别tile命名的模型文件&#xff0c;此模型是不分块输出&#xff0c;要平面切块重新跑。 模方是一款针对实景三维模型的冗余碎片、水面残缺、道路不平、标牌破损、纹理拉伸模糊等共性问题研发的实景三维模型修复编辑软件。模方4.1新增自动单体化建模功能&…

巴贝拉葡萄酒是单一品种还是混合品种制成的?

大多数巴贝拉葡萄酒都是由单一的巴贝拉葡萄品种制成的&#xff0c;许多意大利葡萄酒商开始尝试在巴贝拉葡萄酒中加入其它葡萄品种&#xff0c;其中两个最受欢迎的意大利品种是皮埃蒙特的巴贝拉德阿尔巴和达斯蒂。和朋友在一家意大利餐厅吃饭&#xff0c;被酒单吓到了&#xff1…

24. 常用shell之 du - 显示目录空间使用情况 的用法和衍生用法

du&#xff08;disk usage&#xff09;是 Unix 和类 Unix 系统&#xff08;如 Linux 和 macOS&#xff09;中的一个命令&#xff0c;用于显示文件和目录的磁盘空间使用情况。与 df 命令不同&#xff0c;du 更注重于单个文件和目录的空间使用&#xff0c;而不是整个文件系统的使…

10.1Linux输入子系统介绍

输入设备介绍 鼠标、键盘、按键、触摸屏等提供输入支持的设备都属于输入设备&#xff0c;在Linux也提供了一套驱动框架“input 子系统”与之对应&#xff0c;用于抽象输入设备&#xff0c;并提供管理输入设备驱动和输入事件处理程序的功能 input 子系统 input 子系统用于管理…

GPT 魔力涌现

GPT 二、Prompt 的典型构成 角色&#xff1a;给 AI 定义一个最匹配任务的角色&#xff0c;比如&#xff1a;「你是一位软件工程师」「你是一位小学老师」指示&#xff1a;对任务进行描述上下文&#xff1a;给出与任务相关的其它背景信息&#xff08;尤其在多轮交互中&#xff…

分布式文件存储系统minio了解下

什么是minio minio 是一个基于 Apache License v2.0 开源协议的对象存储服务。非常适合于存储大容量非结构化的数据&#xff0c;例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等&#xff0c;而一个对象文件可以是任意大小。 是一种海量、安全、低成本、高可靠的云存储…

机器学习笔记 - 用于自动化检测服饰的YOLOs-Fashionpedia模型

一、安装环境 使用预训练模型和 PyTorch Lightning 来自动化产品标记过程,将大幅度提高耗时的任务的效率。 # 安装软件包 pip install torch== 2.0 .0 pip install pytorch-lightning== 2.0 .1 pip install datasets== 2.11 .0 pip install Transformers== 4.30

Map.entry用法详解

Map.entry用法详解 大家好&#xff0c;我是免费搭建查券返利机器人赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;在Java编程的旅途中&#xff0c;Map.Entry是一位不可或缺的伙伴&#xff0c;为我们提供了在Map中…