大数据面试题:Kafka怎么保证数据不丢失,不重复?

面试题来源:

《大数据面试题 V4.0》

大数据面试题V3.0,523道题,679页,46w字

可回答:Kafka如何保证生产者不丢失数据,消费者不丢失数据?

参考答案:

存在数据丢失的几种情况

  • 使用同步模式的时候,有3种状态保证消息被安全生产,在配置为1(只保证写入leader成功)的话,如果刚好leader partition挂了,数据就会丢失。

  • 还有一种情况可能会丢失消息,就是使用异步模式的时候,当缓冲区满了,如果配置为0(还没有收到确认的情况下,缓冲池一满,就清空缓冲池里的消息),数据就会被立即丢弃掉。

避免方法的一些概述

1、在数据生产时避免数据丢失的方法

只要能避免上述两种情况,那么就可以保证消息不会被丢失。

1)在同步模式的时候,确认机制设置为-1,也就是让消息写入leader和所有的副本。

2)在异步模式下,如果消息发出去了,但还没有收到确认的时候,缓冲池满了,在配置文件中设置成不限制阻塞超时的时间,也就说让生产端一直阻塞,这样也能保证数据不会丢失。

在数据消费时,避免数据丢失的方法:确认数据被完成处理之后,再更新offset值。低级API中需要手动控制offset值。

消息队列的问题都要从源头找问题,就是生产者是否有问题。

讨论一种情况,如果数据发送成功,但是接受response的时候丢失了,机器重启之后就会重发。

重发很好解决,消费端增加去重表就能解决,但是如果生产者丢失了数据,问题就很麻烦了。

2、数据重复消费的情况,处理情况如下

1)去重:将消息的唯一标识保存到外部介质中,每次消费处理时判断是否处理过;

2)不管:大数据场景中,报表系统或者日志信息丢失几条都无所谓,不会影响最终的统计分析结。

Kafka到底会不会丢数据(data loss)? 通常不会,但有些情况下的确有可能会发生。下面的参数配置及Best practice列表可以较好地保证数据的持久性(当然是trade-off,牺牲了吞吐量)。

如果想要高吞吐量就要能容忍偶尔的失败(重发漏发无顺序保证)。

 block.on.buffer.full = trueacks = allretries = MAX_VALUEmax.in.flight.requests.per.connection = 1使用KafkaProducer.send(record, callback)callback逻辑中显式关闭producer:close(0) unclean.leader.election.enable=falsereplication.factor = 3 min.insync.replicas = 2replication.factor > min.insync.replicasenable.auto.commit=false

消息处理完成之后再提交位移

给出列表之后,我们从两个方面来探讨一下数据为什么会丢失:

Producer端

新版本的Kafka替换了Scala版本的old producer,使用了由Java重写的producer。新版本的producer采用异步发送机制。KafkaProducer.send(ProducerRecord)方法仅仅是把这条消息放入一个缓存中(即RecordAccumulator,本质上使用了队列来缓存记录),同时后台的IO线程会不断扫描该缓存区,将满足条件的消息封装到某个batch中然后发送出去。显然,这个过程中就有一个数据丢失的窗口:若IO线程发送之前client端挂掉了,累积在accumulator中的数据的确有可能会丢失。

Producer的另一个问题是消息的乱序问题。假设客户端代码依次执行下面的语句将两条消息发到相同的分区

 producer.send(record1);producer.send(record2);

如果此时由于某些原因(比如瞬时的网络抖动)导致record1没有成功发送,同时Kafka又配置了重试机制和max.in.flight.requests.per.connection大于1(默认值是5,本来就是大于1的),那么重试record1成功后,record1在分区中就在record2之后,从而造成消息的乱序。很多某些要求强顺序保证的场景是不允许出现这种情况的。发送之后重发就会丢失顺序。

鉴于producer的这两个问题,我们应该如何规避呢??对于消息丢失的问题,很容易想到的一个方案就是:既然异步发送有可能丢失数据, 我改成同步发送总可以吧?比如这样:

 producer.send(record).get();

这样当然是可以的,但是性能会很差,不建议这样使用。以下的配置清单应该能够比较好地规避producer端数据丢失情况的发生:(特此说明一下,软件配置的很多决策都是trade-off,下面的配置也不例外:应用了这些配置,你可能会发现你的producer/consumer 吞吐量会下降,这是正常的,因为你换取了更高的数据安全性)。

block.on.buffer.full = true 尽管该参数在0.9.0.0已经被标记为“deprecated”,但鉴于它的含义非常直观,所以这里还是显式设置它为true,使得producer将一直等待缓冲区直至其变为可用。否则如果producer生产速度过快耗尽了缓冲区,producer将抛出异常。缓冲区满了就阻塞在那,不要抛异常,也不要丢失数据。

acks=all 很好理解,所有follower都响应了才认为消息提交成功,即"committed"。

retries = MAX 无限重试,直到你意识到出现了问题。

max.in.flight.requests.per.connection = 1 限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。注意:设置此参数是为了避免消息乱序。

使用KafkaProducer.send(record, callback)而不是send(record)方法 自定义回调逻辑处理消息发送失败,比如记录在日志中,用定时脚本扫描重处理。

callback逻辑中最好显式关闭producer:close(0) 注意:设置此参数是为了避免消息乱序(仅仅因为一条消息发送没收到反馈就关闭生产者,感觉代价很大)。

unclean.leader.election.enable=false 关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失。 replication.factor >= 3 参考了Hadoop及业界通用的三备份原则。

min.insync.replicas > 1 消息至少要被写入到这么多副本才算成功,也是提升数据持久性的一个参数。与acks配合使用保证replication.factor > min.insync.replicas 如果两者相等,当一个副本挂掉了分区也就没法正常工作了。通常设置replication.factor = min.insync.replicas + 1即可。

Consumer端

consumer端丢失消息的情形比较简单:如果在消息处理完成前就提交了offset,那么就有可能造成数据的丢失。由于Kafka consumer默认是自动提交位移的,所以在后台提交位移前一定要保证消息被正常处理了,因此不建议采用很重的处理逻辑,如果处理耗时很长,则建议把逻辑放到另一个线程中去做。为了避免数据丢失,现给出两点建议:

  • enable.auto.commit=false,关闭自动提交位移

  • 在消息被完整处理之后再手动提交位移

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/12819.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux下在终端输入密码隐藏方法

Linux系统中,如何将在终端输入密码时将密码隐藏? 最近做简单的登录界面时,不做任何操作的话,在终端输入密码的同时也会显示输入的密码是什么,这样对于隐蔽性和使用都有不好的体验。那么我就想到将密码用字符*隐藏起来…

STM32 Flash学习(三)

硬件设计 开机的时候先显示一些提示信息,然后在主循环里面检测两个按键。 其中1个按键WK_UP用来执行写入FLASH的操作,另一个按照KEY0用来执行读出操作。 软件设计 添加了两个文件stmflash.c和stmflash.h。 #include "stmflash.h" #include…

freeswitch的mod_xml_curl模块

概述 freeswitch是一款简单好用的VOIP开源软交换平台。 随着fs服务的增多,每一台fs都需要在后台单独配置,耗时耗力,心力憔悴。 如果有一个集中管理配置的配置中心,统一管理所有fs的配置,并可以实现动态的修改配置就…

自动驾驶地面车辆的雷达里程计:方法与数据集综述

在不同复杂环境中,各种传感器的性能会有所不同。它们各自具有优势和劣势,因此融合多模态数据提供了一种解决方案,可以克服各个传感器单独使用时的限制[90]–[92]。此外,许多讨论的传感器已经广泛应用于自动驾驶领域,因…

PHP 支付宝支付、订阅支付(周期扣款)整理汇总

最近项目中需要使用支付宝的周期扣款,整理一下各种封装方法 APP支付(服务端) /******************************************************* 调用方法******************************************************/function test_pay(){$isSubscri…

mybatis日志工厂

前言: 如果一个数据库操作,出现异常,我们需要排错,日志就是最好的助手 官方给我们提供了logImpl:指定 MyBatis 所用日志的具体实现,未指定时将自动查找。 默认工厂: 在配置文件里添加&#xf…

深度剖析APP开发中的UI/UX设计

作为一个 UI/UX设计师,除了要关注 UI/UX设计之外,还要掌握移动开发知识,同时在日常工作中也需要对用户体验有一定的认知,在本次分享中,笔者就针对自己在工作中积累的一些经验来进行一个总结,希望能够帮助到…

cartographer发布畸变矫正后的scan数据

实现方式: 模仿源代码,在cartographer_ros写一个函数,以函数指针的方式传入cartographer后端,然后接收矫正后的scan数据,然后按照话题laserScan发布出来。 需要同时发布点云强度信息的,还要自己添加含有强度…

如何连接远程服务器?快解析内内网穿透可以吗?

如今我们迎来了数字化转型的时代,众多企业来为了更好地推动业务的发展,常常需要在公司内部搭建一个远程服务器。然而,对于企业员工来说,在工作过程中经常需要与这个服务器进行互动,而服务器位于公司的局域网中&#xf…

Go重写Redis中间件 - Go实现Redis协议解析器

Go实现Redis协议解析器 Redis网络协议详解 在解决完通信后,下一步就是搞清楚 Redis 的协议-RESP协议,其实就是一套类似JSON、Protocol Buffers的序列化协议,也就是我们的客户端和服务端通信的协议 RESP定义了5种格式 简单字符串(Simple String) : 服务器用来返回简单的结…

简述IO(BIO NIO IO多路复用)

在unix网络变成中的五种IO模型: Blocking IO(阻塞IO) NoneBlocking IO (非阻塞IO) IO mulitplexing(IO多路复用) signal driven IO (信号驱动IO) asynchronous IO (异步IO) BIO BIO(Blocking IO)是一种阻塞IO模型,也是传统的IO操作模型之一…

Windows上安装和使用git到gitoschina和github上_亲测

Windows上安装和使用git到gitoschina和github上_亲测 git介绍与在windows上安装创建SSHkey在gitoschina使用 【git介绍与在windows上安装】 Git是一款免费、开源的分布式版本控制系统&#xff0c;用于敏捷高效地处理任何或小或大的项目。 相关介绍可以参考 <百度百科>…

【Vue3 + Element Plus】纯前端实现本地数据分页

先附上效果图 Vue3 Element Plus 实现本地分页 首页弹窗代码 <el-table :data"tableData" style"width: 100%" border stripe><el-table-column v-for"{ id, prop, label } in tableColumn" :prop"prop" :key"id"…

RocketMQ概论

目录 前言&#xff1a; 1.概述 2.下载安装、集群搭建 3.消息模型 4.如何保证吞吐量 4.1.消息存储 4.1.1顺序读写 4.1.2.异步刷盘 4.1.3.零拷贝 4.2.网络传输 前言&#xff1a; RocketMQ的代码示例在安装目录下有全套详细demo&#xff0c;所以本文不侧重于讲API这种死…

【Rust 基础篇】Rust默认泛型参数:简化泛型使用

导言 Rust是一种以安全性和高效性著称的系统级编程语言&#xff0c;其设计哲学是在不损失性能的前提下&#xff0c;保障代码的内存安全和线程安全。在Rust中&#xff0c;泛型是一种非常重要的特性&#xff0c;它允许我们编写一种可以在多种数据类型上进行抽象的代码。然而&…

tcp keepalive

tcp keepalive用于检查两者之间的链路是否正常&#xff0c;或防止链路断开。 一旦建立了TCP连接&#xff0c;该连接被定义为有效&#xff0c;直到一方关闭它。一旦连接进入连接状态&#xff0c;它将无限期地保持连接状态。但实际上&#xff0c;这种联系不会无限期地持续下去。如…

数据结构:快速的Redis有哪些慢操作?

redis 为什么要这莫快&#xff1f;一个就是他是基于内存的&#xff0c;另外一个就是他是他的数据结构 说到这儿&#xff0c;你肯定会说&#xff1a;“这个我知道&#xff0c;不就是 String&#xff08;字符串&#xff09;、List&#xff08;列表&#xff09;、 Hash&#xff08…

1.Ansible

文章目录 Ansible概念作用特性总结 部署AnsibleAnsible模块commandshellcronusergroupcopyfilehostnamepingyumserice/systemdscriptmountarchiveunarchivereplacesetup inventory主机清单主机变量组变量组嵌套 Ansible 概念 Ansible是一个基于Python开发的配置管理和应用部署…

【Redis】面试题

1. 为什么要用缓存 1. 提高系统的读写性能。 2. 减轻数据库的压力&#xff0c;防止大量的请求到达数据库&#xff0c;让数据库压力剧增&#xff0c;拖垮数据库。redis数据存储在内存中&#xff0c;高效的数据结构&#xff0c;读写数据比数据库快。 将热点数据存储在redis当中&…

#P1004. [NOIP1998普及组] 三连击

题目描述 将 1, 2, \ldots , 91,2,…,9 共 99 个数分成 33 组&#xff0c;分别组成 33 个三位数&#xff0c;且使这 33 个三位数构成 1 : 2 : 31:2:3 的比例&#xff0c;试求出所有满足条件的 33 个三位数。 输入格式 无 输出格式 若干行&#xff0c;每行 33 个数字。按照…