kafka如何保证消息不丢?

概述

我们知道Kafka架构如下,主要由 Producer、Broker、Consumer 三部分组成。一条消息从生产到消费完成这个过程,可以划分三个阶段,生产阶段、存储阶段、消费阶段。

图片

  • 产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。

  • 存储阶段: 在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。

  • 消费阶段: 在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到Consumer上。

那么如何保证消息不丢我们可以从这三部分来分析。

消息传递语义

在深度剖析消息丢失场景之前,我们先来聊聊「消息传递语义」到底是个什么玩意?

所谓的消息传递语义是 Kafka 提供的 Producer 和 Consumer 之间的消息传递过程中消息传递的保证性。主要分为三种, 如下图所示:

  1. 1. 首先当 Producer 向 Broker 发送数据后,会进行 commit,如果 commit 成功,由于 Replica 副本机制的存在,则意味着消息不会丢失,但是 Producer 发送数据给 Broker 后,遇到网络问题而造成通信中断,那么 Producer 就无法准确判断该消息是否已经被提交(commit),这就可能造成 at least once 语义。

  2. 2. 在 Kafka 0.11.0.0 之前, 如果 Producer 没有收到消息 commit 的响应结果,它只能重新发送消息,确保消息已经被正确的传输到 Broker,重新发送的时候会将消息再次写入日志中;而在 0.11.0.0 版本之后, Producer 支持幂等传递选项,保证重新发送不会导致消息在日志出现重复。为了实现这个, Broker 为 Producer 分配了一个ID,并通过每条消息的序列号进行去重。也支持了类似事务语义来保证将消息发送到多个 Topic 分区中,保证所有消息要么都写入成功,要么都失败,这个主要用在 Topic 之间的 exactly once 语义。 其中启用幂等传递的方法配置enable.idempotence = true。 启用事务支持的方法配置:设置属性 transcational.id = "指定值"

  3. 3. 从 Consumer 角度来剖析, 我们知道 Offset 是由 Consumer 自己来维护的, 如果 Consumer 收到消息后更新 Offset, 这时 Consumer 异常 crash 掉, 那么新的 Consumer 接管后再次重启消费,就会造成 at most once 语义(消息会丢,但不重复)。

  4. 4. 如果 Consumer 消费消息完成后, 再更新 Offset,如果这时 Consumer crash 掉,那么新的 Consumer 接管后重新用这个 Offset 拉取消息, 这时就会造成 at least once 语义(消息不丢,但被多次重复处理)。

总结: 默认 Kafka 提供「at least once」语义的消息传递,允许用户通过在处理消息之前保存 Offset的方式提供 「at mostonce」 语义。如果我们可以自己实现消费幂等,理想情况下这个系统的消息传递就是严格的「exactly once」, 也就是保证不丢失、且只会被精确的处理一次,但是这样是很难做到的。

接下来我们从生产阶段、存储阶段、消费阶段这几方面看下kafka如何保证消息不丢失。

生产阶段

通过深入解析Kafka消息发送过程我们知道Kafka生产者异步发送消息并返回一个Future,代表发送结果。首先需要我们获取返回结果判断是否发送成功。

// 异步发送消息,并设置回调函数 
producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) { System.err.println("消息发送失败: " + exception.getMessage()); } else { System.out.println("消息发送成功到主题: " + metadata.topic() + ", 分区: " + metadata.partition() + ", 偏移量: " + metadata.offset()); } } 
});

消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

Producer(生产者)保证消息不丢失的方法:

  1. 1. 发送确认机制:Producer可以使用Kafka的acks参数来配置发送确认机制。通过设置合适的acks参数值,Producer可以在消息发送后等待Broker的确认。确认机制提供了不同级别的可靠性保证,包括:

    • • acks=0:Producer在发送消息后不会等待Broker的确认,这可能导致消息丢失风险。

    • • acks=1:Producer在发送消息后等待Broker的确认,确保至少将消息写入到Leader副本中。

    • • acks=all或acks=-1:Producer在发送消息后等待Broker的确认,确保将消息写入到所有ISR(In-Sync Replicas)副本中。这提供了最高的可靠性保证。

  2. 2. 消息重试机制:Producer可以实现消息的重试机制来应对发送失败或异常情况。如果发送失败,Producer可以重新发送消息,直到成功或达到最大重试次数。重试机制可以保证消息不会因为临时的网络问题或Broker故障而丢失。

Broker存储阶段

正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

在kafka高性能设计原理中我们了解到kafka为了提高性能用到了 Page Cache 技术.在读写磁盘日志文件时,其实操作的都是内存,然后由操作系统决定什么时候将 Page Cache 里的数据真正刷入磁盘。如果内存中数据还未刷入磁盘服务宕机了,这个时候还是会丢消息的。

为了最大程度地降低数据丢失的可能性,我们可以考虑以下方法:

  1.  持久化配置优化:可以通过调整 Kafka 的持久化配置参数来控制数据刷盘的频率,从而减少数据丢失的可能性。例如,可以降低 flush.messages 和 flush.ms 参数的值,以更频繁地刷写数据到磁盘。

  2.  副本因子增加:在 Kafka 中,可以为每个分区设置多个副本,以提高数据的可靠性。当某个 broker 发生故障时,其他副本仍然可用,可以避免数据丢失。

  3. 使用acks=all:在生产者配置中,设置 acks=all 可以确保消息在所有ISR(In-Sync Replicas)中都得到确认后才被视为发送成功。这样可以确保消息被复制到多个副本中,降低数据丢失的风险。

  4. 备份数据:定期备份 Kafka 的数据,以便在发生灾难性故障时可以进行数据恢复。

消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

  1. 自动提交位移:Consumer可以选择启用自动提交位移的功能。当Consumer成功处理一批消息后,它会自动提交当前位移,标记为已消费。这样即使Consumer发生故障,它可以使用已提交的位移来恢复并继续消费之前未处理的消息。

  2. 手动提交位移:Consumer还可以选择手动提交位移的方式。在消费一批消息后,Consumer可以显式地提交位移,以确保处理的消息被正确记录。这样可以避免重复消费和位移丢失的问题。

下面是手动提交位移的例子:

// 创建消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题
consumer.subscribe(Collections.singletonList(topic));try {while (true) {// 消费消息ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {// 处理消息逻辑System.out.println("消费消息:Topic = " + record.topic() +", Partition = " + record.partition() +", Offset = " + record.offset() +", Key = " + record.key() +", Value = " + record.value());// 手动提交位移TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());OffsetAndMetadata offsetMetadata = new OffsetAndMetadata(record.offset() + 1);consumer.commitSync(Collections.singletonMap(topicPartition, offsetMetadata));}}
} catch (Exception e) {e.printStackTrace();
} finally {consumer.close();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/683891.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c++阶梯之类与对象(下)

前文&#xff1a; c阶梯之类与对象&#xff08;上&#xff09;-CSDN博客 c阶梯之类与对象&#xff08;中&#xff09;-CSDN博客 c阶梯之类与对象&#xff08;中&#xff09;&#xff1c; 续集 &#xff1e;-CSDN博客 1. 再谈构造函数 1.1 构造函数体赋值 在创建对象时&a…

【机器学习笔记】3 逻辑回归

分类问题 分类问题监督学习最主要的类型&#xff0c;主要特征是标签离散&#xff0c;逻辑回归是解决分类问题的常见算法&#xff0c;输入变量可以是离散的也可以是连续的 二分类 先从用蓝色圆形数据定义为类型1&#xff0c;其余数据为类型2&#xff1b;只需要分类1次&#x…

Java并发基础:SynchronousQueue全面解析!

内容概要 SynchronousQueue的优点在于其直接性和高效性&#xff0c;它实现了线程间的即时数据交换&#xff0c;无需中间缓存&#xff0c;确保了数据传输的实时性和准确性&#xff0c;同时&#xff0c;其灵活的阻塞机制使得线程同步变得简单而直观&#xff0c;适用于需要精确协…

相机图像质量研究(13)常见问题总结:光学结构对成像的影响--鬼影

系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结&#xff1a;光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结&#xff1a;光学结构对成…

TiDB 在医疗保障信息平台的应用实践

文章介绍了 TiDB 在医疗保障信息平台中的应用。东软医保云应用管理平台通过与 TiDB 联合&#xff0c;成功满足了医疗保障业务中高并发、实时性和复杂查询的要求。在某地市医疗保障信息平台的实践中&#xff0c;TiDB 分布式数据库有效实现了在线交易和实时分析服务&#xff0c;日…

C语言学习day14:数组定义和使用

定义变量&#xff1a; 数据类型 变量 值 数组定义&#xff1a; 数据类型 数组名[元素个数]{值1,值2,值3} 代码&#xff1a; int main() {//定义变量//数据类型 变量 值//数组定义//数据类型 数组名[元素个数]{值1,值2,值3}//数组下标 数组名[小标]//数组下标是…

.NET Core WebAPI中封装Swagger配置

一、创建相关文件 创建一个Utility/SwaggerExt文件夹&#xff0c;添加一个类 二、在Program中找到Swagger相关配置信息 三、添加方法&#xff0c;在Program中调用 在SwaggerExt类中添加方法&#xff0c;将相关配置添写入 /// <summary> /// swagger配置 /// </sum…

初识Qt | 从安装到编写Hello World程序

文章目录 1.前端开发简单分类2.Qt的简单介绍3.Qt的安装和环境配置4.创建简单的Qt项目 1.前端开发简单分类 前端开发&#xff0c;这里是一个广义的概念&#xff0c;不单指网页开发&#xff0c;它的常见分类 网页开发&#xff1a;前端开发的主要领域&#xff0c;使用HTML、CSS …

[经验] 欧阳修唐宋八大家之首是谁 #微信#知识分享#学习方法

欧阳修唐宋八大家之首是谁 1、唐宋八大家之首是谁 唐宋八大家是中国文学史上最具代表性的八位大文豪&#xff0c;他们的文学成就在中国文学史上占有重要地位&#xff0c;被誉为文学史上的“巨人”。 唐宋八大家之首&#xff0c;无疑是唐代著名诗人杜甫。他出生在一个贫苦的家…

牛客——IncDec Sequence(差分)

链接&#xff1a;登录—专业IT笔试面试备考平台_牛客网 来源&#xff1a;牛客网 题目描述 给定一个长度为 n(n≤105)(n \leq 10^5 )(n≤105) 的数列a1,a2,…,an{a_1,a_2,…,a_n}a1​,a2​,…,an​&#xff0c;每次可以选择一个区间 [l,r]&#xff0c;使下标在这个区间内的数…

每日一题 力扣107 二叉树的层序遍历Ⅱ

107. 二叉树的层序遍历 II 题目描述&#xff1a; 给你二叉树的根节点 root &#xff0c;返回其节点值 自底向上的层序遍历 。 &#xff08;即按从叶子节点所在层到根节点所在的层&#xff0c;逐层从左向右遍历&#xff09; 示例 1&#xff1a; 输入&#xff1a;root [3,9,20…

如何用 ChatGPT 做项目管理?

ChatGPT 可以通过创建和维护跨团队项目协作计划&#xff0c;让员工更容易理解他们的角色和职责。 这个协作计划里面会包括每个团队或个人要执行的具体任务&#xff0c;每个任务最后期限和任何事情之 间的依赖关系。 该场景对应的关键词库:(24 个) 项目管理、项目协作计划、跨…

操作 Docker 存储卷的常用指令汇总

1. 什么是存储卷&#xff1f; 存储卷就是将宿主机的本地文件系统中存在的某个目录直接与容器内部的文件系统上的某一目录建立绑定关系。使得可以在宿主机和容器内共享数据库内容&#xff0c;让容器直接访问宿主机中的内容&#xff0c;也可以宿主机向容器写入内容&#xff0c;容…

(通信)驻波

驻波是一种物理现象&#xff0c;它发生在频率相同、传输方向相反的两种波&#xff08;不一定是电波&#xff09;沿传输线形成的一种分布状态。 在这种状态下&#xff0c;一个波通常是另一个波的反射波。 在驻波中&#xff0c;波节和波腹的位置始终保持不变&#xff0c;给人一种…

了解Ping、Wget、端口、Netstat和Curl命令

1. 端口 1.1 什么是端口&#xff1f; 端口是一种用于标识不同应用程序或服务的逻辑通道。它是一个数字&#xff0c;取值范围从0到65535。常见的端口有一些已经被标准化&#xff0c;比如HTTP使用的80端口&#xff0c;HTTPS使用的443端口。 1.2 了解端口状态 使用netstat -an…

【深度学习】Pytorch 系列教程(二):PyTorch数据结构:1、Tensor(张量): GPU加速(GPU Acceleration)

文章目录 一、前言二、实验环境三、PyTorch数据结构0、分类1、Tensor&#xff08;张量&#xff09;1. 维度&#xff08;Dimensions&#xff09;2. 数据类型&#xff08;Data Types&#xff09;3. GPU加速&#xff08;GPU Acceleration&#xff09;查看可用gpu张量移动经典语句d…

边缘计算第二版施巍松——第8章边缘计算系统实例

8.1边缘计算系统概述 1.Cloudlet 架构&#xff1a;移动设备-Cloudlet-云 cloudlet也可以像云一样为用户提供服务&#xff0c;Cloudlet离移动设备只有一跳的距离&#xff0c;具有物理距离的临近性&#xff0c;可以保证实时反馈时延低&#xff0c;又可以利用局域网的高带宽优势&…

(五)【Jmeter】使用代理录制HTTP脚本操作步骤及注意事项

前置信息 软件版本Jmeter5.6.3 服务网址备注drupalhttp://192.168.88.88:18080/&#xff08;二&#xff09;【Jmeter】专栏实战项目靶场drupal部署 用户名密码test1test1test2test2 实操记录 1、启动jmeter&#xff0c;操作顺序见下图 2、在视图面板添加如下信息&#x…

[office] Excel 数据库函数条件区域怎样设置 #笔记#笔记

Excel 数据库函数条件区域怎样设置 以下面的数据表格为例&#xff0c;对于条件区域的设置&#xff0c;有几方面需要注意的内容&#xff0c;下面就一起看看如何对Excel 数据库函数条件区域设置的吧。希望会大家有所帮助 以下面的数据表格为例&#xff0c;对于条件区域的设置&am…

计算机设计大赛 深度学习OCR中文识别 - opencv python

文章目录 0 前言1 课题背景2 实现效果3 文本区域检测网络-CTPN4 文本识别网络-CRNN5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于深度学习OCR中文识别系统 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;…