Kafka【八】如何保证消息发送的可靠性、重复性、有序性

【1】消息发送的可靠性保证

对于生产者发送的数据,我们有的时候是不关心数据是否已经发送成功的,我们只要发送就可以了。在这种场景中,消息可能会因为某些故障或问题导致丢失,我们将这种情况称之为消息不可靠。虽然消息数据可能会丢失,但是在某些需要高吞吐,低可靠的系统场景中,这种方式也是可以接受的,甚至是必须的。

但是在更多的场景中,我们是需要确定数据是否已经发送成功了且Kafka正确接收到数据的,也就是要保证数据不丢失,这就是所谓的消息可靠性保证。

而这个确定的过程一般是通过Kafka给我们返回的响应确认结果(Acknowledgement)来决定的,这里的响应确认结果我们也可以简称为ACK应答。根据场景,Kafka提供了3种应答处理,可以通过配置对象进行配置。

在 Apache Kafka 中,ACK(Acknowledgment)指的是生产者在发送消息后,从 Kafka Broker 接收到的确认信号。这种确认机制是用来保证消息发送的可靠性的。Kafka 支持不同的 ACK 策略,这些策略允许生产者根据自己的需求来配置不同的确认级别。以下是 Kafka 中关于 ACK 的几个选项:

  1. No Acknowledgment (acks = 0)

    • 在这种模式下,生产者在发送消息后不会等待任何确认就认为消息已经被成功发送。这意味着如果 Broker 在写入消息之前崩溃,消息可能会丢失。这种方式提供了最高的吞吐量,但是没有可靠性保障。
  2. Leader Acknowledgment (acks = 1)

    • 在这种模式下,生产者在发送消息后会等待 Leader 副本确认消息已被写入。如果在确认之后 Leader 崩溃,那么消息仍然可能会丢失,因为还没有同步到 Follower 副本。这种方式提供了较好的吞吐量,但仍然存在一定的数据丢失风险。
  3. All In-Sync Replicas Acknowledgment (acks = all 或 acks = -1)

    • 这是最严格的确认策略,生产者在发送消息后会等待所有 ISR(In-Sync Replicas)的确认。这意味着消息不仅写入了 Leader,还同步到了所有的 Follower 副本。这种方式虽然降低了吞吐量,但是提供了最强的数据持久性和可靠性保障。

选择哪种 ACK 策略取决于应用的具体需求。如果对数据丢失有严格的要求,那么通常会选择 acks=all,以确保消息的持久性和可靠性;如果对性能要求较高,并且可以接受一定程度的数据丢失风险,那么可以选择较低级别的确认策略。

需要注意的是,使用 acks=all 时,如果 ISR 中的任何一个副本无法同步消息,那么生产者将无法发送新的消息,直到问题解决。因此,在配置 ACK 时,也需要考虑集群的健康状况和副本的数量。

假设我们的分区有5个follower副本,编号为1,2,3,4,5:

在这里插入图片描述

但是此时只有3个副本处于和Leader副本之间处于数据同步状态,那么此时分区就存在一个同步副本列表,我们称之为In Syn Replica,简称为ISR。此时,Kafka只要保证ISR中所有的4个副本接收到了数据,就可以对数据请求进行响应了。无需5个副本全部收到数据。

【2】消息发送的重复性

kafka为了提高数据可靠性提供了重试机制用来解决消息丢失问题。如果禁用重试机制,那么一旦数据发送失败,数据就丢失了。而数据重复,恰恰是因为开启重试机制后,如果因为网络阻塞或不稳定,导致数据重新发送。那么数据就有可能是重复的。

kafka提供了幂等性操作解决数据重复,并且幂等性操作要求必须开启重试功能和ACK取值为-1。

在 Apache Kafka 中,解决消息重复发送的问题通常涉及以下几个方面:

1. 幂等性生产者

Kafka 0.10.1 版本引入了幂等性生产者(Idempotent Producers)。启用幂等性后,生产者可以保证消息不会被重复发送。幂等性生产者依赖于事务日志来跟踪已发送的消息,并确保即使生产者崩溃,消息也只会被发送一次。

  • 实现原理
    • 生产者为每条消息附加一个序列号。
    • Broker 使用序列号来检查消息是否已经被发送过。
    • 如果 Broker 发现序列号冲突,则拒绝该消息。
Map<String, Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// TODO 对生产的数据K, V进行序列化的操作
configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configMap.put(ProducerConfig.ACKS_CONFIG, "-1");//ACK应答
configMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);//开启幂等性
configMap.put(ProducerConfig.RETRIES_CONFIG, 5);
configMap.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
configMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configMap);

kafka提供的幂等性操作只能保证同一个生产者会话中同一个分区中的数据不会重复,一旦数据发送过程中,生产者对象重启,那么幂等性操作就会失效。那么此时就需要使用Kafka的事务功能来解决跨会话的幂等性操作。但是跨分区的幂等性操作是无法实现的。

2. 事务支持

Kafka 从 0.11 版本开始支持事务,这使得生产者可以在事务上下文中发送消息。事务可以确保消息要么全部发送成功,要么全部不发送,从而避免部分消息丢失或重复发送的问题。

  • 实现原理

    • 生产者开启事务,并在一个事务中发送一系列消息。
    • 生产者在消息发送完成后提交事务。
    • 如果生产者崩溃或出现其他异常,则可以回滚事务,取消未完成的消息发送。

    事务支持可以用于确保消息的一致性和完整性。

【3】幂等性与事务支持

幂等性生产者(Idempotent Producers)和事务支持(Transactional Support)是两种不同的机制,它们各自解决了不同的问题,但在实际应用中可以结合起来使用。

幂等性生产者(Idempotent Producers)

幂等性生产者的设计目的是为了确保即使生产者崩溃或重试消息发送,消息也只被写入一次,从而避免重复消息。幂等性生产者不需要开启事务,而是通过以下机制来实现这一目标:

  • 消息序列化:生产者为每个分区的消息生成一个唯一的序列号。
  • 校验重复:Broker 会在接收到消息时检查序列号,如果发现序列号已经存在,则会拒绝这条消息。

幂等性生产者适用于那些希望避免重复消息,但又不需要事务一致性的情况。也就是说,它保证了即使生产者崩溃或重试发送,消息依然只被写入一次,但它并不保证消息的全局顺序或跨分区的一致性。

事务支持(Transactional Support)

事务支持则是为了实现更高级别的消息一致性和原子性,确保消息要么全部发送成功,要么全部不发送。事务支持可以用来处理跨多个分区甚至跨不同系统的复杂操作,确保这些操作作为一个整体成功或失败。

  • 事务上下文:生产者在事务上下文中发送消息,确保消息的发送是原子性的。
  • 提交或回滚:生产者可以在消息处理成功后提交事务,或者在处理失败时回滚事务。

事务支持适用于需要确保消息处理的原子性和一致性的场景,特别是在涉及到跨多个分区或多系统协调的情况下。

幂等性与事务的结合

在一些场景中,你可能会结合使用幂等性生产和事务支持,以达到更高的可靠性和一致性。例如:

  • 幂等性生产者 可以用来防止消息的重复发送。
  • 事务支持 可以用来确保跨多个分区或系统的操作的一致性。

在这种情况下,幂等性生产者确保单个消息不会重复写入,而事务支持则确保整个操作的原子性。

总结

  • 幂等性生产者:防止消息重复发送,适用于单个消息级别的去重。
  • 事务支持:确保操作的原子性和一致性,适用于需要跨分区或系统的一致性操作。

因此,幂等性生产者并不是必须与事务隔离使用才能保证消息的唯一性。相反,幂等性生产者本身就是为了解决消息重复发送问题而设计的。事务支持则是为了实现更高级别的数据一致性和操作的原子性。两者可以独立使用,也可以结合使用以满足不同的需求。

【4】消息发送的有序性保证

在 Apache Kafka 中,保证消息发送的有序性主要依赖于以下几种机制和策略:

1. 单一分区内的消息有序

Kafka 默认保证在一个主题(Topic)的单个分区(Partition)内部的消息是有序的。这是因为消息是按顺序追加到分区的日志文件中的。因此,如果你需要确保消息在主题内的顺序,可以将所有相关消息都发送到同一个分区。

如何实现单一分区内消息有序:

  • 固定分区器:你可以通过设置固定的分区器(Partitioner),使所有具有相同键的消息都被发送到同一个分区。例如,使用相同的键(Key)可以使消息被发送到同一分区,从而在该分区内保持顺序。

2. 使用幂等性生产者

尽管幂等性生产者的主要目的是防止消息重复发送,但如果你使用相同的键发送消息,并且启用了幂等性生产者,那么所有具有相同键的消息将被发送到同一个分区,并且在这个分区内保持顺序。

示例配置

假设你需要确保所有消息在一个主题内是有序的,你可以这样配置生产者:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 保证消息发送顺序
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 启用幂等性// 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

这里的关键配置点是 max.in.flight.requests.per.connection 设置为 1,这可以确保在单个连接上一次只发送一条消息,从而在单一分区内保持消息的顺序。

注意事项

  • 单一分区限制:虽然单一分区内部可以保证消息有序,但这意味着所有相关消息都需要发送到同一个分区,这可能会导致性能瓶颈。
  • 并发处理:如果你需要高并发处理,而不仅仅关注消息的顺序,那么可能需要在多个分区之间平衡负载,并且在客户端实现适当的逻辑来处理顺序问题。

通过上述方法,Kafka 可以在不同程度上保证消息的有序性,但通常需要在性能和有序性之间做出权衡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/53374.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring框架基础介绍2.0

目录 AOP概述 面向切面思想 优点&#xff1a; 核心原理&#xff1a; 使用案例: AOP 的基本概念 springAOP 实现 AspectJ 中常用的通知 Spring事物管理 数据库事务管理? spring 事务管理? Spring中的事物管理分为两种形式&#xff1a; 1、编程式事物管理 2、声明…

低空经济如此火爆,新手如何分一杯羹?

低空经济的火爆为新手提供了诸多参与和分一杯羹的机会。以下是一些具体的建议&#xff0c;帮助新手在这一领域找到切入点&#xff1a; 1. 了解行业概况与趋势 定义与范围&#xff1a;低空经济是指在3000米以下空域内进行各种有人和无人驾驶航空器活动的经济形态&#xff0c;涉…

dubbo的SPI机制

一.dubbo的SPI机制 SPI机制是一个服务发现机制&#xff0c;通过接口的全限定名找到指定目录下对应的文件&#xff0c;然后加载对应的实现类注册到系统中进行使用。 在Java原生跟mysql的驱动加载也使用了这个机制&#xff0c;但是他们只能进行全部实现类的加载&#xff08;遍历…

最新HTML5中的文件详解

第5章 HTML5中的文件 5.1选择文件 可以创建一个file类型的input,添加multiple属性为true,可以实现多个文件上传。 5.1.1 选择单个文件 1.功能描述 创建file类型input元素&#xff0c;页面中不再有文本框&#xff0c;而是 选择文件 按钮&#xff0c;右侧是上次文件的名称&a…

数据分析面试题:客户投保问题分析

目录 0 场景描述 1 数据准备 2 问题分析 2.1 计算小微公司的平均经营时长 2.2 计算小微公司且角色为投保人,保险起期在18年的总保费 2.3 假设,DWD_CUSTOMER_REL客户关联关系表中,存在部分客户保单数很多,部分客户保单数很少的情况,此时DWD_CUSTOMER_BASE表关联,程序…

百度智能云向量数据库创新和应用实践分享

本文整理自第 15 届中国数据库技术大会 DTCC 2024 演讲《百度智能云向量数据库创新和应用实践分享》 在 IT 行业&#xff0c;数据库有超过 70 年的历史了。对于快速发展的 IT 行业来说&#xff0c;一个超过 70 年历史的技术&#xff0c;感觉像恐龙一样&#xff0c;非常稀有和少…

Anaconda Prompt 安装paddle2.6报错

bug描述 python 3.11.9 通过 pip install paddlepaddle2.6.1 安装后&#xff0c;运行 paddle.utils.run_check() 则出现下面的错误&#xff1a; 解决办法 方法一&#xff1a;使用paddle 3的版本 这里要注意我的python版本 方法二&#xff1a;使用低版本的python python3.9…

Lombok jar包引入和用法

大家好&#xff0c;今天分享一个在编写代码时的快捷方法。 当我们在封装实体类时&#xff0c;会使用set、get等一些方法。如下图&#xff0c;不但费事还影响代码的美观。 那么如何才能减少代码的冗余呢&#xff0c;首先lib中导入lombok的jar包并添加库。 此处我已导入&#xf…

Jenkins+Svn+Vue自动化构建部署前端项目(保姆级图文教程)

目录 介绍 准备工作 配置jenkins 构建部署任务 常见问题 介绍 在平常开发前端vue项目时,我们通常需要将vue项目进行打包构建,将打包好的dist目录下的静态文件上传到服务器上,但是这种繁琐的操作是比较浪费时间的,可以使用jenkins进行自动化构建部署前端vue 准备工作 准备…

《粮食科技与经济》是什么级别的期刊?是正规期刊吗?能评职称吗?

​问题解答 问&#xff1a;《粮食科技与经济》是不是核心期刊&#xff1f; 答&#xff1a;不是&#xff0c;是知网收录的第一批认定学术期刊。 问&#xff1a;《粮食科技与经济》级别&#xff1f; 答&#xff1a;省级。主管单位&#xff1a; 湖南省粮食和物资储备局 …

bat批处理实现从特定文件夹中提取文件内容并以父文件夹名存储

1、需求分析 标题是bat批处理实现从特定文件夹中提取文件内容并以父文件夹名存储。这里面我们要做的工作是&#xff1a; ①、批处理脚本使用的是bat文件&#xff1b; ②、文件夹下面有很多子文件夹&#xff0c;然后子文件夹下仍然有相同的文件结构&#xff0c;我们需要从三级…

halcon 自定义距离10的一阶导数幅图,摆脱sobel的3掩码困境

一&#xff0c;为什么要摆脱3的掩码 在处理图像的过程中&#xff0c;会用到平滑算子&#xff0c;很容易破坏边际&#xff0c;所谓的一阶导数sobel只计算掩码为3的差分&#xff0c;在幅度图分割中&#xff0c;往往是很难把握的。 举个例子-现在图像头平滑好了&#xff0c;缺陷…

模具要不要建设3D打印中心

随着3D打印技术的日益成熟与广泛应用&#xff0c;模具企业迎来了自建3D打印中心的热潮。这一举措不仅为企业带来了前所未有的发展机遇&#xff0c;同时也伴随着一系列需要克服的挑战&#xff0c;如何看待企业引进增材制造&#xff0c;小编为您全面分析。 机遇篇&#xff1a; 加…

Codeforces Round (Div.3) C.Sort (前缀和的应用)

原题&#xff1a; time limit per test&#xff1a;5 seconds memory limit per test&#xff1a;256 megabytes You are given two strings a and b of length n. Then, you are (forced against your will) to answer q queries. For each query, you are given a range …

FPGA开发:Verilog数字设计基础

EDA技术 EDA指Electronic Design Automation&#xff0c;翻译为&#xff1a;电子设计自动化&#xff0c;最早发源于美国的影像技术&#xff0c;主要应用于集成电路设计、FPGA应用、IC设计制造、PCB设计上面。 而EDA技术就是指以计算机为工具&#xff0c;设计者在EDA软件平台上…

240907-Gradio渲染装饰器Render-Decorator

A. 最终效果 B. 示例代码 import gradio as gr import gradio as grwith gr.Blocks() as demo:input_text gr.Textbox()gr.render(inputsinput_text)def show_split(text):if len(text) 0:gr.Markdown("## No Input Provided")else:# for letter in text:for lett…

代码随想录训练营day37|52. 携带研究材料,518.零钱兑换II,377. 组合总和 Ⅳ,70. 爬楼梯

52. 携带研究材料 这是一个完全背包问题&#xff0c;就是每个物品可以无限放。 在一维滚动数组的时候规定了遍历顺序是要从后往前的&#xff0c;就是因为不能多次放物体。 所以这里能多次放物体只需要把遍历顺序改改就好了 # include<iostream> # include<vector>…

2024/9/6黑马头条跟学笔记(四)

D4内容介绍 阿里三方安全审核 分布式主键 异步调用 feign 熔断降级 1.自媒体文章自动审核 1.1审核流程 查文章——调接口文本审核——minio下载图片图片审核——审核通过保存文章——发布 草稿1&#xff0c;失败2&#xff0c;人工3&#xff0c;发布9 1.2接口获取 注册阿…

How can I load the openai api configuration through js in html?

题意&#xff1a;怎样在HTML中通过JavaScript加载OpenAI API配置 问题背景&#xff1a; I am trying to send a request through js in my html so that openai analyzes it and sends a response, but if in the js I put the following: 我正在尝试通过HTML中的JavaScript发…

Qt/C++ 个人开源项目#串口助手(源码与发布链接)

一、项目概述 该串口助手工具基于Qt/C开发&#xff0c;专为简化串口通信调试与开发而设计&#xff0c;适合新手快速上手。工具具有直观的用户界面和丰富的功能&#xff0c;旨在帮助用户与串口设备建立可靠通信&#xff0c;便于调试、数据传输和分析。 二、主要功能 波特率&a…