Kafka消息流转的挑战与对策:消息丢失与重复消费问题

        消息丢失和重复消费时分布式系统重的常见问题,如果处理不好会对业务造成很大的影响。比如用户下单是通过消息队列处理的,对于用户的订单来说,消息丢失会造成用户下单丢失,影响售卖,如果重复消费,可能会生成多个订单,多卖给了用户货物,影响也很大。

        所以,消息丢失和重复消费问题对于保证系统健壮性和业务正确性至关重要。需要了解问题出现的原因及各种解决方案。

一、消息丢失

        首先来确定什么地方可能会出现消息丢失的问题。生产者将消息发送到kafka broker中,然后消费者在来消费相应的消息。在这些环节中似乎都有可能出现消息丢失,我们来逐一分析。

        1.1 生产者造成的消息丢失

        生产者配置不当

        生产者发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)、异步(async)。

        发后既往,即ack=0,生产者只管往kafka中发送消息,但是消息有没有正确到达生产者是不知道的,这种方式吞吐量最高,但是很容易造成消息丢失。

        网络故障

        生产者在将消息发往broker时,出现了网络故障,导致消息没有发送成功。这种方式通常会加入重试机制,可有效避免网络问题带来的影响。如果网络长时间不可用,那就不是消息丢失的问题了,甚至影响业务的正常运转。

        生产者意外关闭

        如果生产者进程崩溃或被非正常关闭,在未完成消息发送操作之前,部分缓存中的消息可能会丢失。

        还记得以前的文章(深入剖析Kafka生产者:揭秘消息从发送到落地的全过程-CSDN博客)提到的消息发送流程吗?

        生产者客户端有两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。

        主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。相当于将消息发送到了缓存中,如果生产者意外宕机,那缓存中的消息就会丢失,造成消息丢失。

        可以设置合理的缓存区大小,并且要尽量避免生产者服务器意外宕机的情况。

        1.2 broker端故障

        刷盘机制

        异步批量刷盘:Kafka Broker默认使用异步刷盘机制,先将消息存储在PageCache中再进行异步写入磁盘。如果在刷盘前Broker发生故障,这部分消息可能丢失。

        可以配置成同步刷盘策略,虽在会影响性能,但在极端情况下可以保证消息不丢失。需要根据自己的业务场景进行评估,确定一个合理的方案。

        副本不完整

        当ISR集合中的副本数量不足或发生同步问题时,可能导致消息无法正确复制到其他副本上。如果生产者ack=1,消息发到leader中就认为成功了,有可能会出现消息还没同步到其他副本,leader分区出现了问题,导致消息消息丢失。

        所以要根据业务场景设置合理的ack值,减少因副本导致的消息丢失。

        1.3 消费者端引起的消息丢失

        自动提交偏移量

        Kafka消费者默认使用自动提交偏移量的功能,当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期由客户端参数auto.commit.interval.ms配置,默认5S,此参数生效的前提是enable.auto.commit参数为true。

        自动提交虽然简单,但可能会造成消息丢失,比如消费者刚拉取了一批消息,然后刚好到达了提交位移的时间,刚才的消息位移就提交了,但是消费者此时出现了故障,消息还未来得及处理,这样消费者重启后就会出现消息丢失。

        在使用中尽量使用手动位移提交的方式。

        手动提交处理不当

        手动提交位移如果处理不当,也会造成消息丢失,比如消费者拉取消息后,手动提交了位移,然后将消息交由其他线程来异步处理,但是处理过程出现了一场,这种情况就会出现消息丢失。

        每次提交消息都要确保消息正常的处理完毕了,否则会造成消息丢失。

        消息积压造成的消息丢失

        如果生产者的发送速率远远大于消费者的出率速率,那Kafka就会产生积压,我们知道kafka是将消息保存到日志文件中的,是有留存时间的,默认是7天,倘若你的消费者程序足够慢,慢到它要消费的数据快被Kafka删除了,就会造成消费丢失。

        要监控消息积压情况并引入预警机制,如果产生了积压要及时处理,通过处理积压来解决此种原因引起的消息积压。

        位移管理策略

        如果消费者设置的位移管理策略为“latest”,那么在消费者启动或者从故障中恢复时,它会直接从最新的消息开始消费,这将丢失从上次消费点到最新消息之间的所有消息。

        我们要根据业务需求,选择合理的位移管理策略。

二、重复消费

        重复消费可能会引起数据一致性问题,浪费系统计算、存储资源,甚至会影响用户体验,造成业务出错,所以要尽量避免重复消费。

        2.1 生产者造成的重复消费

        生产者造成的重复消费的问题并不直观,如果生产者发送消息时存在失败重试等场景,可能会造成生成者发送重复的消息,这样就会间接的造成重复消费的问题。

        针对这种情况,消费者要做好消息的幂等处理,重复的消息只消费一次。

        2.2 消费者造成的重复消费

        消费者故障与重启

        当消费者在处理完消息后尚未提交偏移量就崩溃或被人为关闭,重启后会从上一次已知的偏移量继续消费,这可能导致已经处理过的消息再次被消费。

        消费组再均衡

        当消费者群组中的某个消费者离开或者新加入时,整个消费者群体会触发重新平衡操作,分配新的分区给各个消费者。在此过程中,如果旧的消费者正在处理的消息未提交偏移量,则这些消息可能在新的消费者接管分区后被重新消费。

        自动位移提交

        如上文所述,消费者拉取消息后,如果已经消费了一部分消息,这时位移还未提交,但是此时消费者出现了故障,消费者重启后会重新拉取消息,造成重复消费。

        网络问题

        在网络不稳定的情况下,可能会发生偏移量提交请求丢失的情况,使得消费者认为某条消息未处理而进行重复消费。

        以上问题不管如何处理,都需要引入幂等机制,已经处理过的消息不在重复处理,这时终极解决方案,否则消息就有可能重复消费,对业务造成影响。

三、总结

        这一节关于kafka消息中间件出现重复消费和消息丢失的场景和原因进行了分析,你学会了吗?

        

        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/651935.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ps:创建基于饱和度的蒙版

能够区分图像上哪些区域的饱和度高,哪些区域的饱和度低,在调色过程中是相当有用的。 比如,使得饱和度高的区域更加饱和,可增加图像色彩反差,让画面更引人注目。 或者,使得饱和度区域趋于饱和,让…

技术书评和笔记【01】脑机接口-电路与系统 【2020版】

前言: 荷兰作者,Amir Zjajo博士,毕业于荷兰代尔夫特理工大学,方向 面向移动健康的低功耗混合型号电路与系统,以及,面向认知的神经形态电路。 ,脑机接口 - 电路与系统一书,系统介绍了,脑机接口电路与系统的实现技术,尤其,提到了量产和设计的问题,难能可贵,摘录如…

JVM篇----第九篇

系列文章目录 文章目录 系列文章目录前言一、分代收集算法二、新生代与复制算法三、老年代与标记复制算法前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。 一、分代…

MySQL-删除重复数据

在实际应用中,遇到一个这样的问题,MySQL中存储的数据为资讯类数据,在页面展示时会出现多个平台的新闻报导相同的内容,导致页面会出现重复数据。因为数据是每天定期更新,所以最快捷有效的方式是在更新完数据后增加一个去…

3、创建特性(Creating Features)

使用Pandas转换特性以适合您的模型。 文章目录 1、简介2、数学变换3、计数4、构建和分解特征5、分组转换1、简介 一旦你确定了一组有潜力的特性,就可以开始开发它们了。在这节课中,你将学习如何在Pandas中进行一些常见的转换。如果你对Pandas不熟练, 请参考《从零开始的Pand…

YOLOv8融合改进 更换检测头同时改进C2f模块

一、Detect_DyHead检测头和C2f-EMSC,C2f-EMSCP模块 详细介绍和代码在往期的博客里: Detect_DyHead: (YOLOv8改进检测头Detect为Detect_Dyhead-CSDN博客) C2f-EMSC和C2f-EMSCP: (YOLOv8改进之多尺度转换模块C2f-EMSC和C2f-EMSCP-CSDN博客) 二、算法实现 1、将检测…

QT之 QDebug 调试(一)

在QT中&#xff0c;进行调试&#xff0c;则需要在头文件地方加上 #include <QDebug> 加上之后&#xff0c;在编译之后则其输出的信息则在应用程序输出那里显示信息。 其QDebug 信息调试则如&#xff1a; qDebug() << " 需要插入的信息 "…

RPC教程 7.服务发现与注册中心

0.前言 这一节的内容只能解决只有一个服务的情况。要是有多个服务(即是多个结构体&#xff09;这种就解决不了&#xff0c;也即是没有服务ip地址和服务实例的映射关系。 1.为什么需要注册中心 在上一节中&#xff0c;客户端想要找到服务实例的ip,需要硬编码把ip写到代码中。…

猿媛员的专属春联来咯

我们“因程序汇聚&#xff0c;因猿份相识”&#xff0c;今天来给辛苦了一年的“猿媛员”们送上几幅恶搞对联&#xff0c;为图一笑 &#x1f604; 闲言少叙&#xff0c;上对联 龙行多福 上联&#xff1a;龙龙龙龙龙龙龙 下联&#xff1a;福福福福福福福 形象版 上联&#…

centos 7安装MySQl

本文参考借鉴&#xff1a;https://cloud.tencent.com/developer/article/2353312&#xff0c;非常赞&#xff01; 为了避免权限不足的问题&#xff0c;建议切换至root用户进行安装 1.MySQL的清理与安装 查看是否存在MySQL服务 安装mysql之前&#xff0c;需要先看看要安装系…

【极数系列】Flink搭建入门项目Demo 秒懂Flink开发运行原理(05)

文章目录 引言1.创建mavenx项目2.包结构3.引入pom依赖4.增加log4j2.properties配置5.创建主启动类6.构建打jar包7.flinkUI页面部署 引言 gitee地址&#xff1a;https://gitee.com/shawsongyue/aurora.git 源码直接下载可运行&#xff0c;模块&#xff1a;aurora_flink Flink 版…

phar反序列化漏洞

基础&#xff1a; Phar是一种PHP文件归档格式&#xff0c;它类似于ZIP或JAR文件格式&#xff0c;可以将多个PHP文件打包成一个单独的文件&#xff08;即Phar文件&#xff09;。 打包后的Phar文件可以像普通的PHP文件一样执行&#xff0c;可以包含PHP代码、文本文件、图像等各…

剖析线程池ThreadPoolExecutor

文章目录 线程池一、线程池概述二、ThreadPoolExecutor类详解三、线程池参数配置与优化四、线程池监控与调优五、线程池与其他并发工具比较六、线程池在实际应用中的案例分析案例背景线程池的配置配置线程池参数。处理用户请求 监控与调优 七、线程池的扩展与自定义实现八、线程…

Python爬虫---Scrapy框架---CrawlSpider

CrawlSpider 1. CrawlSpider继承自scrapy.Spider 2. CrawlSpider可以定义规则&#xff0c;再解析html内容的时候&#xff0c;可以根据链接规则提取出指定的链接&#xff0c;然后再向这些链接发送请求&#xff0c;所以&#xff0c;如果有需要跟进链接的需求&#xff0c;意思就是…

Redis实现多种限流算法

一 常见限流算法 1 固定窗口限流 每一个时间段计数器&#xff0c;当计数器达到阈值后拒绝&#xff0c;每过完这个时间段&#xff0c;计数器重置0&#xff0c;重新计数。 优点&#xff1a;实现简单&#xff0c;性能高&#xff1b; 缺点&#xff1a;明显的临界问题&#xff0c…

有手就行!阿里云上3分钟搞定幻兽帕鲁联机服务器搭建

幻兽帕鲁最近在社区呈现了爆火的趋势&#xff0c;在线人数已突破百万级别&#xff0c;官方服务器也开始出现不稳定&#xff0c;卡人闪退的情况。对于有一定财力的小伙伴&#xff0c;搭建一个私人服务器是一个最稳定而舒服的解决方案。 本文萝卜哥将讲解一下如何快速搭建 palwo…

看图说话:Git图谱解读

很多新加入公司的同学在使用Git各类客户端管理代码的过程中对于Git图谱解读不太理解&#xff0c;我们常用的Git客户端是SourceTree&#xff0c;配合P4Merge进行冲突解决基本可以满足日常工作大部分需要。不同的Git客户端工具对图谱展示会有些许差异&#xff0c;以下是SourceTre…

查看 Avro 格式的 Kafka 消息(启用了 Confluent Schema Registry )

使用 Avro 格式传递 Kafka 消息要比 Json 更加高效,因为它是二进制格式,在启用了 Confluent Schema Registry 的情况下,会进一步地提升传输效率,因为 Avro 中的 Schema 信息将不再出现在消息中,消息体积会进一步压缩,同时,还可以利用到 Schema Registry 的其他好处,例如…

Java如何对OSS存储引擎的Bucket进行创建【OSS学习】

在前面学会了如何开通OSS&#xff0c;对OSS的一些基本操作&#xff0c;接下来记录一下如何通过Java代码通过SDK对OSS存储引擎里面的Bucket存储空间进行创建。 目录 1、先看看OSS&#xff1a; 2、代码编写&#xff1a; 3、运行效果&#xff1a; 1、先看看OSS&#xff1a; 此…

跟着cherno手搓游戏引擎【12】渲染context和首个三角形

渲染上下文&#xff1a; 目的&#xff1a;修改WindowsWindow的结构&#xff0c;把glad抽离出来 WindowsWindow.h:新建m_Context #pragma once #include "YOTO/Window.h" #include <YOTO/Renderer/GraphicsContext.h> #include<GLFW/glfw3.h> #include…