剖析 Kafka 消息丢失的原因

文章目录

  • 前言
  • 一、生产者导致的消息丢失的场景
    • 场景1:消息太大
      • 解决方案 :
        • 1、减少生产者发送消息体体积
        • 2、调整参数max.request.size
    • 场景2:异步发送机制
      • 解决方案 :
        • 1、使用带回调函数的发送方法
    • 场景3:网络问题和配置不当
      • 解决方案 :
        • 1、设置`acks`参数设置为"all"
        • 2、设置重试参数
        • 3、设置 min.insync.replicas参数
  • 二、Broker服务端导致的消息丢失的场景
    • 场景1:Broker 宕机
      • 解决方案 :
        • 1、增加副本数量
    • 场景2:leader挂掉,follower未同步
      • 解决方案 :
        • 1、leader竞选资格
        • 2、增加副本数量
    • 场景3:持久化错误
      • 解决方案 :
        • 1、调整刷盘参数
        • 2、增加副本数量
  • 三、消费者导致的消息丢失
    • 场景1:提交偏移量后消息处理失败
      • 解决方案 :
    • 场景2:并发消费
      • 解决方案 :
    • 场景3:消息堆积
      • 解决方案 :
    • 场景4:消费者组rebalance
      • 解决方案 :
        • 1、提高消费能力
        • 2、调整参数避免不 必要的rebalance
  • 依然会丢消息的场景
    • 场景 1:
      • 场景 2:
    • 总结

前言

Kafka消息丢失的原因通常涉及多个方面,包括生产者、消费者和Kafka服务端(Broker)的配置和行为。下面将围绕这三个关键点,详细探讨Kafka消息丢失的常见原因,并提供相应的解决方案和最佳实践。具体分析如下:

一、生产者导致的消息丢失的场景

场景1:消息太大

消息大小超过Broker的message.max.bytes的值。此时Broker会直接返回错误。

解决方案 :

1、减少生产者发送消息体体积

可以通过压缩消息体、去除不必要的字段等方式减小消息大小。

2、调整参数max.request.size

max.request.size,表示生产者发送的单个消息的最大值,也可以指单个请求中所有消息的总和大小。默认值为1048576B,1MB。这个参数的值值必须小于Broker的message.max.bytes。

场景2:异步发送机制

Kafka生产者默认采用异步发送消息,如果未正确处理发送结果,可能导致消息丢失。

解决方案 :

1、使用带回调函数的发送方法

不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。带有回调通知的 send 方法可以针对发送失败的消息进行重试处理。

场景3:网络问题和配置不当

​ 生产者在发送消息时可能遇到网络抖动或完全中断,导致消息未能到达Broker。如果生产者的配置没有考虑这种情况,例如未设置恰当的重试机制(retries参数)和确认机制(acks参数),消息就可能在网络不稳定时丢失。

解决方案 :

1、设置acks参数设置为"all"

acks参数指定了必须要有多少个分区副本收到消息,生产者才认为该消息是写入成功的,这个参数对于消息是否丢失起着重要作用,该参数的配置具体如下:

  • acks = all/-1 : 表示kafka isr列表中所有的副本同步数据成功,才返回消息给客户端
  • acks = 0 :表示客户端只管发送数据,不管服务端接收数据的任何情况
  • acks = 1 :表示客户端发送数据后,需要在服务端 leader 副本写入数据成功后,返回响应

使用同步发送方式或确保acks参数设置为"all",以确保所有副本接收到消息。

2、设置重试参数

重试参数主要有retries和retry.backoff.ms两个参数。

(1)参数 retries是指生产者重试次数,该参数默认值为0。

消息在从生产者从发出到成功写入broker之前可能发生一些临时性异常,比如网络抖动、leader副本选举等,这些异常发生时客户端会进行重试,而重试的次数由retries参数指定。如果重试达到设定次数,生产者才会放弃重试并抛出异常。但是并不是所有的异常都可以通过重试来解决,比如消息过大,超过max.request.size参数配置的数值(默认值为1048576B,1MB)。如果设置retries大于0而没有设置参数max.in.flight.requests.per.connection(限制每个连接,也就是客户端与Node之间的连接最多缓存请求数)大于0则意味着放弃发送消息的顺序性。

使用retries的默认值交给使用方自己去控制,结果往往是不处理。所以通用设置建议设置如下:

retries = Integer.MAX_VALUE
max.in.flight.requests.per.connection = 1

该参数的设置已经在kafka 2.4版本中默认设置为Integer.MAX_VALUE;同时增加了delivery.timeout.ms的参数设置。

(2)参数retry.backoff.ms,用来设定两次重试之间的时间间隔,默认值为100。

避免无效的频繁重试。在配置retries和retry.backoff.ms之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间要大于异常恢复时间,避免生产者过早的放弃重试。

3、设置 min.insync.replicas参数

参数min.insync.replicas, 该参数控制的是消息至少被写入到多少个副本才算是 “真正写入”,该值默认值为 1,不建议使用默认值 1, 建议设置min.insync.replicas至少为2。 因为如果同步副本的数量低于该配置值,则生产者会收到错误响应,从而确保消息不丢失。

二、Broker服务端导致的消息丢失的场景

场景1:Broker 宕机

为了提升性能,Kafka 使用 Page Cache,先将消息写入 Page Cache,采用了异步刷盘机制去把消息保存到磁盘。如果刷盘之前,Broker Leader 节点宕机了,并且没有 Follower 节点可以切换成 Leader,则 Leader 重启后这部分未刷盘的消息就会丢失。

如果Broker的副本因子(replication.factor)设置过低,或者同步副本的数量(min.insync.replicas)设置不当,一旦Leader Broker宕机,选举出的新的Leader可能不包含全部消息,导致消息丢失。

解决方案 :

1、增加副本数量

这种场景下多设置副本数是一个好的选择,通常的做法是设置 replication.factor >= 3,这样每个 Partition 就会有 3个以上 Broker 副本来保存消息,同时宕机的概率很低。

同时配合设置上文提到的参数 min.insync.replicas至少为2(不建议使用默认值 1),表示消息至少要被成功写入到 2 个 Broker 副本才算是发送成功。

场景2:leader挂掉,follower未同步

假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但 leader 的数据还有一些没有被 follower 副本同步的话,就会造成消息丢失。

解决方案 :

1、leader竞选资格

参数unclean.leader.election.enable 参数值说明如下:

  • true:允许 ISR 列表之外的节点参与竞选 Leader;
  • false:不允许 ISR 列表之外的节点参与竞选 Leader。

该参数默认值为false。但如果为true的话,意味着非ISR集合中的副本也可以参加选举成为leader,由于不同步副本的消息较为滞后,此时成为leader的话可能出现消息不一致的情况。所以unclean.leader.election.enable 这个参数值要设置为 false。

2、增加副本数量

同上文。

场景3:持久化错误

为了提高性能,减少刷盘次数, Kafka的Broker数据持久化时,会先存储到页缓存(Page cache)中,

按照一定的消息量和时间间隔进行进行批量刷盘的做法。数据在page cache时,如果系统挂掉,消息未能及时写入磁盘,数据就会丢失。Kafka没有提供同步刷盘的方式,所以只能通过增加副本或者修改刷盘参数提高刷盘频率来来减少这一情况。

解决方案 :

1、调整刷盘参数

kafka提供3个参数来优化刷盘机制

log.flush.interval.messages 多少条消息刷盘1次,默认Long.MaxValue
log.flush.interval.ms 隔多长时间刷盘1次 默认null
log.flush.scheduler.interval.ms 周期性的刷盘。默认Long.MaxValue

官方不建议通过上述的刷盘3个参数来强制写盘。其认为数据的可靠性通过replica来保证,而强制flush数据到磁盘会对整体性能产生影响。

2、增加副本数量

同上文。

三、消费者导致的消息丢失

场景1:提交偏移量后消息处理失败

参数 enable.auto.commit 是否自动提交offset,默认是true。代表消息会自动提交偏移量。但是提交偏移量后,消息处理失败了,则该消息丢失。

解决方案 :

可以把 enable.auto.commit 设置为 false,这样相当于每次消费完后手动更新 Offset。不过这又会带来提交偏移量失败时,该消息复消费问题,因此消费端需要做好幂等处理。

场景2:并发消费

如果消费端采用多线程并发消费,很容易因为并发更新 Offset 导致消费失败。

解决方案 :

如果对消息丢失很敏感,最好使用单线程来进行消费。如果需要采用多线程,可以把 enable.auto.commit 设置为 false,这样相当于每次消费完后手动更新 Offset。

场景3:消息堆积

消费者如果处理消息的速度跟不上消息产生的速度,可能会导致消息堆积,进而触发消费者客户端的流控机制,从而遗失部分消息。

解决方案 :

一般问题都出在消费端,尽量提高客户端的消费速度,消费逻辑另起线程进行处理。

场景4:消费者组rebalance

消费者组 rebalance导致导致消息丢失的场景有两种:
1、某个客户端心跳超时,触发 Rebalance被踢出消费组。如果只有这一个客户端,那消息就不会被消费了。
2、Rebalance时没有及时提交偏移量,因为 Rebalance重新分配分区给消费者,所以如果在 Rebalance 过程中,消费者没有及时提交偏移量,可能会导致消息丢失。

解决方案 :

1、提高消费能力

提高单条消息的处理速度,例如对消息处理中比 较耗时的步骤可通过异步的方式进行处理、利用多线程处理等。

2、调整参数避免不 必要的rebalance

参数max.poll.interval.ms用于指定consumer两次poll的最大时间间隔(默认5分钟),如果超过了该间隔consumer client会主动向coordinator发起LeaveGroup请求,触发rebalance。根据实际场景可将max.poll.interval.ms值设置大一点,避免不 必要的rebalance。

此外可适当减小max.poll.records的值,max.poll.records用于指每次调用poll()时取到的records的最大数,默认值是500,可根 据实际消息速率适当调小。这种思路可解决因消费时间过长导致的重复消费问题, 对代码改动较小,但无法绝对避免重复消费问题。

依然会丢消息的场景

即使把参数都设置的很完善也会丢失消息的两种场景

场景 1:

当把数据写到足够多的PageCache的时候就会告知生产者现在数据已经写入成功,但如果还没有把PageCache的数据写到硬盘上,这时候PageCache所在的操作系统都挂了,此时就会丢失数据。

场景 2:

副本所在的服务器硬盘都坏了,也会丢数据。

总结

总的来说,Kafka消息丢失是一个涉及多个环节的问题,需要从生产者、Broker和消费者三个层面综合考虑。通过合理的配置和策略,结合监控和及时的应对措施,可以大幅降低消息丢失的风险,确保数据在分布式系统中的可靠传递。

最后
请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/855288.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python爬虫实战案例之——MySql数据入库

Hello大家好,我是你们的南枫学长,咱们今天来学——爬虫之MySql数据入库。 话不多说,导入咱们的老朋友: Pymysql就是我们Python里面的mysql库,主要功能就是用来连接MySql数据库,那么下载还是一样的操作去进…

自动驾驶规划-RTT* 算法 【免费获取Matlab代码】

目录 1.算法原理3.结果展示4.参考文献5.代码获取 1.算法原理 RRT(Rapidly-Exploring Random Trees) 快速随机扩展树,是一种单一查询路径规划算法。RRT 将根节点作为搜索的起点,然后通过随机撒点采样增加叶子节点的方式,生成一个随机扩展树&a…

STM32开发过程中碰到的问题总结 - 3

文章目录 前言1. keil5升级到最新版本使用armV6编译工具链编译不通过2. 最新的keil用Jlink调试失败3. 移动了目录后跑不起来了4. 串口兼容了GNU 和arm只会,编译出来的成果物,串口输出不正常5.STM32下哪些IO口可以作为中断触发去使用6. 触发GPIO10的外部中…

【Go语言】面向对象编程(二):通过组合实现类的继承和方法重写

通过组合实现类的继承和方法重写 要实现面向对象的编程,就必须实现面向对象编程的三大特性:封装、继承和多态。 1 封装 类的定义及其内部数据的定义可以看作是类的属性,基于类定义的函数方法则是类的成员方法。 2 继承 Go 语言中&#x…

数据库系统概述选择简答概念复习

目录 一、组成数据库的三要素 二、关系数据库特点 三、三级模式、二级映像 四、视图和审计提供的安全性 审计(Auditing) 视图(Views) 五、grant、revoke GRANT REVOKE 六、三种完整性 实体完整性 参照完整性 自定义完整性 七、事务的特性ACDI 原子性(Atomicity)…

大模型系列:提示词管理

既然大模型应用的编程范式是面向提示词的编程,需要建立一个全面且结构化的提示词库, 对提示词进行持续优化也是必不可少的,那么如何在大模型应用中更好的管理提示词呢? 1. 提示词回顾 提示词在本质上是向大型语言模型(…

基于Spring Boot的工具迭代

1. 申请git权限 2. git项目中点击我的-Settings-SSH Keys添加公钥 3. 公钥生成步骤 ssh-keygen -o -t rsa -b 4096 -C "your email" cd ~/.ssh/ cat id_rsa.pub 把公钥内容粘贴到SSH Keys 4. 创建本地分支git checkout -b branchname git远程仓库创建远程分支 …

stm32f103 HAL库 HC-SR04测距

目录 一、实现测距二、添加TIM3控制LED根据距离以不同频率闪烁三、观察时序Modebus协议12路超声波雷达设计方案1. 系统架构设计2. 硬件设计3. 软件设计4. 通信协议设计5. 用户接口6. 安全和冗余7. 测试和验证8. 电源和物理封装9. 文档和支持 一、实现测距 配置时钟 配置定时器…

vue部署宝塔nginx配置(获取用户ip地址、反代理访问api接口、websocket转发)

以下配置为我自己的需求,因人而异,如果只是单纯的前端非交互页面,可以不用修改配置。 代码及注释,如下: #解决vue-router设置mode为history,去掉路由地址上的/#/后nginx显示404的问题location / {proxy_htt…

多模态大模型通用模式

MM-LLMs(多模态大模型)是目前比较新的和实用价值越发显著的方向。其指的是基于LLM的模型,具有接收、推理和输出多模态信息的能力。这里主要指图文的多模态。 代表模型:GPT-4o、Gemini-1.5-Pro、GPT-4v、Qwen-VL、CogVLM2、GLM4V、…

Ptrade和QMT的区别,怎么获取合适的量化交易软件?

​Ptrade和QMT的适用人群 交易活跃用户 量化爱好者已经专业量化投资者 高净值个人或机构 Ptrade和QMT的区别 回测和交易频率 Ptrade回测和交易只支持分钟级和日线级别的频率,而QMT支持tick级、分钟级、5分钟级、10分钟级、日线、周线、月线等。 使用QMT进行回…

Docker overlay磁盘使用100%处理方法overlay 100%

一、问题描述 服务器上运行了几个docker容器,运行个一周就会出现overlay 100%的情况,经查找,是容器里生成了很多core.xxx的文件导致的。 二、解决方法 首先通过以下命令查看: df -h 可以看的overlay已经100%了,进入到/var/lib/d…

计算机网络实验(9):路由器的基本配置和单臂路由配置

一、 实验名称 路由器的基本配置和单臂路由配置 二、实验目的: (1)路由器的基本配置: 掌握路由器几种常用配置方法; 掌握采用Console线缆配置路由器的方法; 掌握采用Telnet方式配置路由器的方法&#…

Java | Leetcode Java题解之第148题排序链表

题目: 题解: class Solution {public ListNode sortList(ListNode head) {if (head null) {return head;}int length 0;ListNode node head;while (node ! null) {length;node node.next;}ListNode dummyHead new ListNode(0, head);for (int subL…

26 种 prompt 套路,驯服大模型

节前,我们组织了一场算法岗技术&面试讨论会,邀请了一些互联网大厂朋友、今年参加社招和校招面试的同学。 针对大模型技术趋势、算法项目落地经验分享、新手如何入门算法岗、该如何准备面试攻略、面试常考点等热门话题进行了深入的讨论。 总结链接如…

封装音视频编解码和渲染的动态链接库编译和测试

1.动态链接库的编译 生成了以下几个文件 我们把生成的lib文件复制到lib文件夹中 其余三个文件不变动 2.进行测试看是否可以用生成的xcodec.lib库文件里的接口函数 以上是重新创建的新项目,导入了xcodec.lib,其他配置同以前项目 库测试结果 运行显示我们…

qt登录和闹钟实现

qt实现登录 #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);// 去掉头部this->setWindowFlag(Qt::FramelessWindowHint);// 去掉空白部分this->s…

栈(Stack)

目录 一.栈(Stack) 1.概念 2.栈的使用 3.栈的模拟实现 二.栈相关习题 1.逆波兰表达式求值 (1)链接 (2)解析 (3)题解 2.括号匹配 (1)链接 &#xff…

计算机网络——传输层重要协议(TCP、UDP)

一、常见名词解释 IP地址:IP地址主要用于标识网络主机、其他网络设备(如路由器)的网络地址,即IP地址用于定位主机的网络地址; IP地址是一个32位的二进制数,通常被分割为4个 8位⼆进制数(也就是…

构建基于 LlamaIndex 的RAG AI Agent

I built a custom AI agent that thinks and then acts. I didnt invent it though, these agents are known as ReAct Agents and Ill show you how to build one yourself using LlamaIndex in this tutorial. 我构建了一个自定义的AI智能体,它能够思考然后行动。…