Kafka如何解决消息丢失的问题

在 Kafka 的整个架构中可以总结出消息有三次传递的过程:

  1. Producer 端发送消息给 Broker 端
  2. Broker 将消息进行并持久化数据
  3. Consumer 端从 Broker 将消息拉取并进行消费

在以上这三步中每一步都可能会出现丢失数据的情况, 那么 Kafka 到底在什么情况下才能保证消息不丢失呢?

Producer 端丢失

Producer 端为了提升发送效率,减少 IO 操作,发送消息的时候是将多个请求异步发送出去,所以 Producer 端消息丢失更多是因为消息根本就没有发送到 Broker 端。

导致 Producer 端没有发送消息成功的有以下原因:

  • 网络原因:由于网络抖动导致数据没发到 Broker 端
  • 数据原因:消息体太大超出 Broker 承受范围导致 Broker 拒收消息

解决方案

Producer 端数据丢失是因为通过异步的方式进行发送的,所以如果此时使用发后即焚的方式发送,即调用 Producer.send(msg) 会立即返回,由于没有回调,可能因网络原因导致 Broker 并没有收到消息,此时就丢失了。

因此可以从以下几方面进行解决 Producer 端消息丢失问题:

  • 使用带回调通知函数的方法进行发送消息
  • ACK 确认机制
  • 重试次数

Producer 端通过 ACK 配置来确认消息是否生产成功,配置参数如下:

  • 0:由于发送后就自认为发送成功,这时如果发生网络抖动,会造成数据丢失
  • 1:消息发送 Leader 分区并接收成功就表示发送成功,只要 Leader 分区不挂掉,就可以保证数据不丢数据,但是如果 Leader 分区挂掉了,Follower 分区还未同步完数据且没有 ACK,这时就会丢数据
  • -1 或者 all: 消息发送需要等待 ISR 中 Leader 分区和所有的 Follower 分区都确认收到消息才算发送成功, 可靠性最高,但也不能保证不丢数据,比如:当 ISR 中只有 Leader 分区, 这样就变成 acks = 1 的情况了

Broker 端丢失

Broker 接收到数据后会将消息进行持久化到磁盘存储,为了提高吞吐量和性能,采用的是异步批量刷盘的策略,也就是说按照一定的消息量和间隔时间进行刷盘。

首先会将数据存储到 PageCache 中,至于什么时候将 Cache 中的数据刷盘是由操作系统根据自己的策略决定或者调用 fsync 命令进行强制刷盘。如果在同步到 Follower 分区前 Broker 宕机掉,且选举了一个新的 Leader 分区,那么落后的消息数据就会丢失。

既然 Broker 端消息存储是通过异步批量刷盘的,那么就有可能会丢数据。由于 Kafka 中并没有提供同步刷盘的方式,所以单个 Broker 还是很有可能丢失数据的。

kafka 通过多分区多副本机制已经可以最大限度的保证数据不丢失,如果数据已经写入 PageCache 中但是还没来得及刷写到磁盘,此时如果所在 Broker 突然宕机挂掉或者停电,极端情况还是会造成数据丢失。

解决方案

Broker 端丢失消息是因为通过异步批量刷盘的策略,先将数据存储到 PageCache,再进行异步刷盘。

因此 Kafka 是通过多分区多副本的方式来最大限度的保证数据不丢失。可以通过以下参数配合来保证:

  • unclean.leader.election.enable:该参数表示有哪些 Follower 可以有资格被选举为 Leader , 如果一个 Follower 的数据落后 Leader 太多,那么一旦它被选举为新的 Leader, 数据就会丢失,因此我们要将其设置为false,防止此类情况发生。
  • replication.factor:该参数表示分区副本的个数。建议设置 replication.factor >=3, 这样如果 Leader 副本挂掉,Follower 副本会被选举为新的 Leader 副本继续提供服务。
  • min.insync.replicas:该参数表示消息至少要被写入成功到 ISR 多少个副本才算”已提交”,建议设置min.insync.replicas > 1, 这样才可以提升消息持久性,保证数据不丢失。

另外还需要确保一下 replication.factor > min.insync.replicas,如果相等,只要有一个副本挂掉,整个分区就无法正常工作了,因此推荐设置成: replication.factor = min.insync.replicas +1, 最大限度保证系统可用性。

Consumer 端丢失

消息消费流程主要分为两个阶段:

  • 从 Broker 上拉取数据
  • 处理消息,并提交 Offset 记录

Consumer 拉取后消息后需要提交 Offset, 那么这里就可能会丢数据的。丢失原因如下:

  • 可能使用的自动提交 Offset 方式
  • 拉取消息后先提交 Offset,后处理消息,如果此时处理消息的时候异常宕机,由于 Offset 已经提交了, 待 Consumer 重启后,会从之前已提交的 Offset 下一个位置重新开始消费, 之前未处理完成的消息不会被再次处理,对于该 Consumer 来说消息就丢失了。
  • 拉取消息后先处理消息,在进行提交 Offset, 如果此时在提交之前发生异常宕机,由于没有提交成功 Offset, 待下次 Consumer 重启后还会从上次的 Offset 重新拉取消息,不会出现消息丢失的情况, 但是会出现重复消费的情况,这里只能业务自己保证幂等性。

解决方案

Consumer 端丢失消息是因为在拉取完消息后提交 Offset 造成的,因此为了不丢数据,正确的做法是:拉取数据、业务逻辑处理、提交消费 Offset 位移信息。

同时还需要设置参数 enable.auto.commit = false,采用手动提交位移的方式。另外对于消费消息重复的情况,业务自己保证幂等性, 保证只成功消费一次即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/45488.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python rtsp 硬件解码 二

上次使用了python的opencv模块 述说了使用PyNvCodec 模块,这个模块本身并没有rtsp的读写,那么读写rtsp是可以使用很多方法的,我们为了输出到pytorch直接使用AI程序,简化rtsp 输入,可以直接使用ffmpeg的子进程 方法一 …

STM8遇坑[EEPROM读取debug不正常release正常][ STVP下载成功单运行不成功][定时器消抖莫名其妙的跑不通流程]

EEPROM读取debug不正常release正常 这个超级无语,研究和半天,突然发现调到release就正常了,表现为写入看起来正常读取不正常,这个无语了,不想研究了 STVP下载不能够成功运行 本文摘录于:https://blog.csdn.net/qlexcel/article/details/71270780只是做学习备份之…

GEEMAP 中如何拉伸图像

图像拉伸是最基础的图像增强显示处理方法,主要用来改善图像显示的对比度,地物提取流程中往往首先要对图像进行拉伸处理。图像拉伸主要有三种方式:线性拉伸、直方图均衡化拉伸和直方图归一化拉伸。 GEE 中使用 .sldStyle() 的方法来进行图像的…

【深入浅出C#】章节 9: C#高级主题:LINQ查询和表达式

C#高级主题涉及到更复杂、更灵活的编程概念和技术,能够让开发者更好地应对现代软件开发中的挑战。其中,LINQ查询和表达式是C#高级主题中的一项关键内容,具有以下重要性和优势: 数据处理和操作: 在现代软件中&#xff…

8.5.tensorRT高级(3)封装系列-基于生产者消费者实现的yolov5封装

目录 前言1. yolov5封装总结 前言 杜老师推出的 tensorRT从零起步高性能部署 课程,之前有看过一遍,但是没有做笔记,很多东西也忘了。这次重新撸一遍,顺便记记笔记。 本次课程学习 tensorRT 高级-基于生产者消费者实现的yolov5封装…

postgresql中基础sql查询

postgresql中基础sql查询 创建表插入数据创建索引删除表postgresql命令速查简单查询计算查询结果 利用查询条件过滤数据模糊查询 创建表 -- 部门信息表 CREATE TABLE departments( department_id INTEGER NOT NULL -- 部门编号,主键, department_name CHARACTE…

迁移学习系列--半监督

目前追踪的代码库如下: Transfer-Learning-Library Semi-supervised-learning TorchSSL 包含算法: (2015 NeurIPS ) PiModel [1] (2017 NeurIPS ) MeanTeacher[2] (2013 ICML ) PseudoLabel [3] (2018TPAMI ) VAT (Virtual adversarial training)[4] (2019 NeurIPS) M…

数据结构(1)

数据结构其实就是将数据按照一定的关系组织起来的集合,用于组织和存储数据。 1.数据结构分类 1.逻辑结构 逻辑结构是从具体问题中抽象出来的模型,是抽象意义的结构,按照对象中数据的相互关系进行分类。 1>集合结构:集合结构中…

CentOS6.8图形界面安装Oracle11.2.0.1.0

Oracle11下载地址 https://edelivery.oracle.com/osdc/faces/SoftwareDelivery 一、环境 CentOS release 6.8 (Final),测试环境:内存2G,硬盘20G,SWAP空间4G Oracle版本:Release 11.2.0.1.0 安装包:V175…

Lookup Singularity

1. 引言 Lookup Singularity概念 由Barry WhiteHat在2022年11月在zkResearch论坛 Lookup Singularity中首次提出: 其主要目的是:让SNARK前端生成仅需做lookup的电路。Barry预测这样有很多好处,特别是对于可审计性 以及 形式化验证&#xff…

【学习FreeRTOS】第8章——FreeRTOS列表和列表项

1.列表和列表项的简介 列表是 FreeRTOS 中的一个数据结构,概念上和链表有点类似,列表被用来跟踪 FreeRTOS中的任务。列表项就是存放在列表中的项目。 列表相当于链表,列表项相当于节点,FreeRTOS 中的列表是一个双向环形链表列表的…

微软Win11 Dev预览版Build23526发布

近日,微软Win11 Dev预览版Build23526发布,修复了不少问题。牛比如斯Microsoft,也有这么多bug,所以你写再多bug也不作为奇啊。 主要更新问题 [开始菜单] 修复了在高对比度主题下,打开开始菜单中的“所有应…

ZooKeeper实现分布式锁

Lock public interface Lock {public void lock();public void unlock(); }ZkLock /*** 使用ZooKeeper实现分布式锁* */ public class ZkLock implements Lock{private CuratorFramework client;private final String zkPath;private Integer count; // 记录获取锁的次数&…

Spring Boot通过企业邮箱发件被Gmail退回的解决方法

这两天给我们开发的Chrome插件:Youtube中文配音 增加了账户注册和登录功能,其中有一步是邮箱验证,所以这边会在Spring Boot后台给用户的邮箱发个验证信息。如何发邮件在之前的文章教程里就有,这里就不说了,着重说说这两…

Lucene教程_编程入门自学教程_菜鸟教程-免费教程分享

教程简介 Lucene是apache软件基金会 jakarta项目组的一个子项目,是一个开放源代码的全文检索引擎工具包,但它不是一个完整的全文检索引擎,而是一个全文检索引擎的架构,提供了完整的查询引擎和索引引擎,部分文本分析引…

通过 kk 创建 k8s 集群和 kubesphere

官方文档:多节点安装 确保从正确的区域下载 KubeKey export KKZONEcn下载 KubeKey curl -sfL https://get-kk.kubesphere.io | VERSIONv3.0.7 sh -为 kk 添加可执行权限: chmod x kk创建 config 文件 KubeSphere 版本:v3.3 支持的 Kuber…

Linux 安全技术和防火墙

目录 1 安全技术 2 防火墙 2.1 防火墙的分类 2.1.1 包过滤防火墙 2.1.2 应用层防火墙 3 Linux 防火墙的基本认识 3.1 iptables & netfilter 3.2 四表五链 4 iptables 4.2 数据包的常见控制类型 4.3 实际操作 4.3.1 加新的防火墙规则 4.3.2 查看规则表 4.3.…

vue3 基于element plus对el-pagination进行二次封装

vue3 基于element plus对el-pagination进行二次封装 1、前言2、在components文件夹中新建pagination.vue文件3、在组件内使用分页 1、前言 在vue3项目中,如果每个列表页都敲一遍分页方法,显然是不合理的,那么,下面我将基于elemen…

企事业数字培训及知识库平台

前言 随着信息化的进一步推进,目前各行各业都在进行数字化转型,本人从事过医疗、政务等系统的研发,和客户深入交流过日常办公中“知识”的重要性,再加上现在倡导的互联互通、数据安全、无纸化办公等概念,所以无论是企业…

打家劫舍 II——力扣213

动规 int robrange(vector<int>& nums, int start, int end){int first=nums[start]