Kafka生产常见问题分析与总结

Kafka生产常见问题分析与总结

消息丢失

  • 生产者
    • acks = 0
      • 不需要等待任何Broker确认收到消息的回复就可以继续发消息
        • 性能最高,但是最容易丢消息,对于数据丢失不敏感的场景可以使用,如大数据统计报表
    • acks = 1
      • 只要等待Broker中的leader成功写入数据成功就可以继续发消息
        • 如果follower没有成功备份数据而此时leader刚好挂了,就会丢消息
    • acks = -1 或 all
      • 等待Broker中的leader、follower都写入成功才可以继续发消息
        • 只要保证有一个副本存活就不会丢消息,一般使用在金融场景,当然如果配置副本只有一个也可能会丢消息跟acks=1情况类似
  • 消费者
    • 如果消费者配置的是自动提交,恰好此时消费服务挂了,没有处理完的所有数据,这样就导致了数据丢失,下次也消费不到了

重复消费

  • 生产者
    • 发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际Broker可能已经接受到消息,但发送方会重发消息
  • 消费者
    • 如果消费消息配置了自动提交,刚拉取了一批处理了一部分,但是尚未提交,服务挂了,下次重启时又会拉取到相同的一批数据重复处理,一般情况下消费端会进行幂等性处理

消费乱序

  • 如果发送端配置重试机制,Kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现发送了1、2、3三条消息,第1条消息超时了,后面2条消息发送成功,然后再重试发送第1条消息,这时Broker端分区存入的消息顺序为2、3、1,所以是否需要配置重试机制得根据业务去定,当然也可以用同步发送的模式去发送并且acks≠0,这样也能保证消息从发送到消费是全链路有序的
    • 发送端的消息发送模式
      • 发后即忘
        • 不关心消息是否成功到达,对返回结果不做任何判断处理,这种方式注重吞吐量,但是无法保证消息的可靠性
      • 同步
        • 消息发送出去之后,关心消费端是否成功接受,只有成功了才能继续下一条
      • 异步
        • 在发送消息的同时通过指定的回调函数去进行消费端的响应处理
  • 注意: Kafka保证全链路消息顺序消费需要从生产端开始
    • 方案一: 将所有有序消息发送到同一个分区,然后使用一个消费者去消费,但是这种性能较低
    • 方案二: 可以在消费者端接受到消息后将需要保证顺序消费的几条消息发送到内存队列(可以整多个),一个内存队列安排一个线程去顺序处理

消息积压

  • 线上有时因为生产端发送消息速度过快或者消费端消费过慢,可能会导致Broker积压大量未消费的消息

    • 一般情况下可以通过增加当前topic的分区将消息拆分到更多的分区中去,同时增加对应的消费者去进行消费
      • 消费者数 = 分区数
    • 如果积压了百万级消息需要进行紧急处理,可以修改消费端程序,将其收到的消息快速转发到其他topic(可以设置多个分区),然后再启动多个消费者去同时消费新增topic多个分区下消息
  • 由于消息数据格式变动或消费端程序存在问题,导致消费端消费失败,可能会导致Broker积压大量未消费的消息

    • 可以将这些消费失败的消息转发到其他队列(类似死信队列),后面再慢慢分析死信队列中的消息去进行问题处理

延迟消息

  • 延时队列存储的对象是延时消息,所谓的延时消息就是发送出去之后,消费端需要等待某个特定的时间才能进行获取到该消息进行消费
    • 应用场景
      • 超时订单
      • 订单完成多长时间后通知进行评价
    • 实现思路
      • 发送延时消息先把消息按照不同的延迟时间段发送到指定的topic中,然后通过定时任务进行轮询消费这些topic,查看消息是否到期,如果时间到了就把这些消息发送到具体业务的topic中
        • 注意
          • 如果用定时任务执行,对项目性能也是一种考验,而且会有一定的延迟,如果要保证时间偏差在2min左右,这样会导致执行过于频繁,所以比建议使用Kafka去实现延迟消息,建议使用RocketMQ、RabbtMQ

消息回溯

  • 如果觉得某段时间对已消费消息的结果存在质疑,比如代码存在问题,当修复之后,可以指定offset将过去的消息重新消费一次

消息传递保障

  • at most once(最多收到一次)
    • 生产端使用 acks = 0
  • at least lonce(至少收到一次)
    • 生产端使用 acks = -1 || all
  • exactly once(收到一次)
    • at least once 加上消费端增加幂等性处理,也可以使用Kafka生产者的幂等性来实现
      • Kafka生产者的幂等性
        • 因为生产端重试导致消息重复发送,Kafka的幂等性可以保证重复发送的消息只接受一次,只需要在生产端参数开启即可

Kafka的事务

  • Kafka的事务不同于RocketMQ,RocketMQ是保障本地事务(比如数据库)与MQ消息发送的事务一致性,Kafka的事务主要保障一次发送多条消息的事务一致性(要么同时成功,要么同时失败),一般在Kafka流式计算场景较多

生产问题场景汇总

如何保证消息不丢失

  • 生产端发送消息到Broker不丢失
    • 生产端配置
      • acks = 0
        • 不需要等待任何Broker确认收到消息的回复就可以继续发消息
        • 性能最高,但是最容易丢消息,对于数据丢失不敏感的场景可以使用,如大数据统计报表
      • acks = 1
        • 只要等待Broker中的leader成功写入数据成功就可以继续发消息
          • 如果follower没有成功备份数据而此时leader刚好挂了,就会丢消息
      • acks = -1 或 all
        • 等待Broker中的leader、follower都写入成功才可以继续发消息
          • 只要保证有一个副本存活就不会丢消息,一般使用在金融场景,当然如果配置副本只有一个也可能会丢消息跟acks = 1情况类似
    • 对于生产端只要使用acks = 1 || all 即可,生产端发送消息后可以拿到Broker的反馈去进行判断是否发送成功,再根据是否需要重发
  • Broker端保存消息不丢失
    • 合理优化刷盘频率,防止服务异常崩溃造成消息未刷盘
      • Kafka的消息是先写入操作系统的页缓存中,然后再刷盘写入硬盘,页缓存中的消息断电即丢失,Kafka不支持写一条刷一次盘的同步机制,只能通过调整刷盘频率提升消息安全,另外需要配置多备份因子,避免单点消息丢失,配置好备份因子之后,Kafka会给每个分区分配多个备份分区,这些分区会尽量平均分配到多个Broker上,当出现故障时也能进行选举,继续向外提供服务
  • 消费端防止异步处理丢失消息
    • 消费者端由于有消息重试机制,正常情况下不会丢消息,每次消费处理一批消息,需要在处理完之后给Brocker进行应答,提交当前消息offset,Broker进行应答后,会推进本地日志的offset记录,如果Broker没有接到应答,Broker会重新向一个消费者组的消费者推送消息,最终保证消息不丢失,消费端采用手动提交offset的方式,相比自动提交更容易掌握提交offset的时机
    • 消费端唯一要注意的是,不能进行异步处理业务逻辑,因为如果业务逻辑异步进行,而消费者已经同步提交了offset,如果业务逻辑出现异常失败了,此时Broker已经收到的消费者应答,后续不会再重新推送消息,造成业务层面的消息丢失

消息积压如何处理

  • 业务运行正常的情况下
    • 如果只是因为消费端处理消息过慢造成积压,可以增加对应topic的分区数,将消息拆分到更多的分区中,然后增加同比例的消费者数,另外再发送消息的时候,尽量要保证各个分区之间的数据分布均衡,可以调整生产端的分区策略,让后续更多的消息分配到新增的分区里,或新开一个topic,配置更多的分区以及对应的消费者数,然后启动一批消费者(充当搬运工),将消息从旧topic转发到新topic中去
    • 分区数 = 消费者数
  • 业务运行异常的情况下
    • 如果是因为消费端业务问题导致积压,影响了程序正常运行,比如消费者序列化失败、业务处理异常,可以采用一种降级的方案,先启动一个消费者将topic下的消息转发到其它队列里(类似于死信队列),然后后续再进行分析以及问题处理

如何保证消息顺序

  • 如何保证生产端发送到分区消息有序
    • 第一种
      • 一个topic配置一个分区,这样牺牲吞吐量保证全局有序
    • 第二种
      • 通过定制生产端的分区器,将消息分配到同一个分区
        • 可以满足一些要求局部有序的场景,比如订单相关的多条消息但是不要求所有消息有序,就可以通过自定义分区器处理
  • 分区中的消息有序后,如何保证消费端消费顺序有序
    • 基于分区中消息的局部有序性,由于Kafka消费端拉取消息都是并行拉取多个批次的消息进行处理,所以无法保证串行消费,如果非要实现此功能,可以将消息按照业务独立性收集到对应的内存队列中,进行特定的排序进行处理
      • 对于RocketMQ中提供了顺序消息,实现原理是先锁定一个MesageQueue(类似分区),消费完这个队列之后再锁定下一个队列进行消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/699407.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

入侵检测系统的设计与实现

入侵检测系统(Intrusion Detection System,简称IDS)是一种能够监视网络或计算机系统活动的安全工具,旨在识别并响应可能的恶意行为或安全事件。这些事件可能包括未经授权的访问、恶意软件、拒绝服务攻击等。入侵检测系统通过不同的…

高并发Server的基石:reactor反应堆模式

业务开发同学只关心业务处理流程。但是我们开发的程序都是运行服务端server上,服务端server接收到IO请求后,是如何处理请求并最终进入业务流程的呢?这里不得不提到reactor反应堆模型。nginx tomcat redis nodejs dubbo等软件的网络处理模型都…

JS进阶——一些常用的字符串方法

charAt(index): 返回在指定位置的字符。 const str "Hello"; console.log(str.charAt(1)); // 输出 "e" concat(string2, string3, ..., stringX): 连接两个或更多字符串,并返回新的字符串。 const str1 "Hello"; const str2 …

SwiftUI 支持拖放功能的集合视图(Grid)如何捕获手指按下并抬起这一操作

功能需求 假设我们开发了一款 SwiftUI 应用,其中用户可以通过拖放 Grid 中的 Cell 来完成一些操作。现在,我们希望用户在某个 Cell 被按下并随后抬起手指时得到通知,这能够实现吗? 如上图所示,我们准确地捕获到了手指在 Grid 的 Cell 上按下再抬起这一操作!那么它是如何…

R语言【BIEN】——BIEN_occurrence_species():从BIEN中提取指定物种的观察数据

Package BIEN version 1.2.6 Description BIEN_occurrence_species()从BIEN数据库下载特定物种的观察记录。 Usage BIEN_occurrence_species(species,cultivated FALSE,new.world NULL,all.taxonomy FALSE,native.status FALSE,natives.only TRUE,observation.type FAL…

Linux之ACL访问控制列表

一、ACL权限的介绍 1.1 什么是ACL 访问控制列表(ACL)是一种网络安全技术,它通过在网络设备(如路由器、交换机和防火墙)上定义一系列规则,对进出接口的数据包进行控制。这些规则可以包含“允许”&…

123 Linux C++ 系统编程2 Linux 上安装卸载程序三种方法,linux 下解压缩命令 tar介绍。kill命令,top命令,umask 命令

一 通过命令和网络直接安装 sudo apt-get update sudo apt-get update 的工作就是将自己本地 ubutun的软件列表和 aliyun 的软件列表对比,如不一样,则更新。 sudo apt-get install 软件名 真正的安装 那么这里就有一个问题了, 怎么从aliy…

【初始RabbitMQ】死信队列的实现

死信的概念 死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致…

认识HarmonyOS

1.认识HarmonyOS 1.1.HarmonyOS简介 在中美贸易战的刺激下,国产操作系统HarmonyOS(鸿蒙操作系统)开始进入到大众的视野。 鸿蒙寓意为“万物起源”,发展至今已经经过了好几个迭代版本。 1.1.1.早期鸿蒙雏形 LiteOS 2015 年 5 月 …

STM32F103x 的时钟源

AHB (Advanced High-performance Bus) 高速总线,用来接高速外设的。 APB (Advanced Peripheral Bus) 低速总线,用来接低速外设的,包含APB1 和 APB2。 APB1:上面连接的是低速外设,包括电源接口、备份接口、 CAN 、 US…

k8s中基于alpine的pod无法解析域名问题

现象 在pod内无法解析指定域名 # 执行ping bash-4.4# ping xx-xx-svc-0.xxx-fcp.svc.cluster.local ping: bad address xx-xx-svc-0.xxx-fcp.svc.cluster.local排查经过 # 执行nslookup bash-4.4# nslookup xx-xx-svc-0.xxx-fcp.svc.cluster.local Server: 172.43.0…

【Linux网络】网络编程套接字(TCP)

目录 地址转换函数 字符串IP转整数IP 整数IP转字符串IP 关于inet_ntoa 简单的单执行流TCP网络程序 TCP socket API 详解及封装TCP socket 服务端创建套接字 服务端绑定 服务端监听 服务端获取连接 服务端处理请求 客户端创建套接字 客户端连接服务器 客户端…

基于MPI的并行计算

代码实现的是基于MPI的并行计算&#xff0c;代码如下&#xff1a; #include <stdio.h> // needed for printing #include <math.h> // needed for tanh, used in init function #include "params.h" // m…

QT-串口工具

一、演示效果 二、关键程序 &#xff1a; #include "mainwindow.h" #include "ui_mainwindow.h"#include <QMessageBox>MainWindow::MainWindow(QWidget *parent) :QMainWindow(parent),ui(new Ui::MainWindow),listPlugins(QList<TabPluginInt…

动态规划--持续更新篇

将数字变成0的操作次数 1.题目 2.思路 在numberOfSteps函数中&#xff0c;首先设置f[0]为0&#xff0c;因为0已经是0了&#xff0c;不需要任何步骤。然后&#xff0c;使用一个for循环从1迭代到输入的整数num。对于每个整数i&#xff0c;如果i是奇数&#xff0c;则将f[i]设置为…

静态时序分析:SDC约束命令set_driving_cell详解

相关阅读 静态时序分析https://blog.csdn.net/weixin_45791458/category_12567571.html?spm1001.2014.3001.5482 在上文中&#xff0c;我们不建议使用set_drive命令而是使用set_driving_cell命令&#xff0c;这是一个描述输入端口驱动能力更精确的方法。因为大多数情况下&…

SpringBoot实现缓存预热的几种常用方案

&#x1f3f7;️个人主页&#xff1a;牵着猫散步的鼠鼠 &#x1f3f7;️系列专栏&#xff1a;Java全栈-专栏 &#x1f3f7;️个人学习笔记&#xff0c;若有缺误&#xff0c;欢迎评论区指正 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&…

如何使用1688.item_search_shop API获取阿里巴巴店铺商品信息

要使用1688的item_search_shop API获取阿里巴巴店铺的商品信息&#xff0c;你通常需要遵循以下步骤&#xff1a; 1. 注册并获取API密钥 首先&#xff0c;你需要在阿里巴巴开放平台&#xff08;如1688开放平台&#xff09;上注册一个开发者账号&#xff0c;并创建一个应用。创…

QEMU开发入门

1. 简介 QEMU&#xff08;Quick EMUlator&#xff09;是一个开源的虚拟化软件&#xff0c;它能够模拟多种硬件平台&#xff0c;并在这些平台上运行各种操作系统。QEMU可以在不同的主机架构之间进行虚拟化&#xff0c;例如x86、ARM、PowerPC、Risc-V等。QEMU是一个功能强大且灵…

大数据面试总结三

1、hdfs作为分布式存储系统&#xff0c;底层的实现的方式&#xff08;可能不正确&#xff09; 1、底层是一个分布式存储的&#xff0c;底层会将数据进行切分多个block块&#xff08;128M&#xff09;&#xff0c;并存储在不同的节点上面&#xff0c;这种分布式方式有助于提高数…