Kafka怎么保证消息发送不丢失

前言

Kafka发送消息是异步发送的,所以我们不知道消息是否发送成功,所以会可能造成消息丢失。而且Kafka架构是由生产者-服务器端-消费者三种组成部分构成的。要保证消息不丢失,那么主要有三种解决方法:

  1. 生产者(producer)保持同步发送消息
  2. 服务器端(broker)持久化设置为同步刷盘
  3. 消费者(consumer)设置为手动提交偏移量(offset)

1.生产者(producer)端 处理

生产者默认发送消息代码如下:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class KafkaMessageProducer {public static void main(String[] args) {// 配置Kafka生产者Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);String topic = "my-topic"; // Kafka主题try {// 发送消息到Kafkafor (int i = 0; i < 10; i++) {String message = "Message " + i;ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
                producer.send(record);System.out.println("Sent message: " + message);}} catch (Exception e) {
            e.printStackTrace();} finally {// 关闭Kafka生产者
            producer.close();}}
}

请确保在运行代码之前已经设置好正确的Kafka集群地址、主题名称以及依赖的Kafka客户端库。该示例代码创建了一个Kafka生产者实例,使用字符串作为键和值的序列化器,并循环发送10条消息到指定的Kafka主题。

生产者端要保证消息发送成功,可以有两个方法:

  1. 把异步发送改成同步发送,这样producer就能实时知道消息的发送结果。

要将 Kafka 发送方法改为同步发送,可以使用 `send()` 方法的返回值`Future<RecordMetadata>`, 并调用 `get()` 方法来等待发送完成。

以下是将 Kafka 发送方法改为同步发送的示例代码:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.RecordMetadata;public class KafkaMessageProducer {public static void main(String[] args) {// 配置 Kafka 生产者Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka 集群地址
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器// 创建 Kafka 生产者实例Producer<String, String> producer = new KafkaProducer<>(props);String topic = "my-topic"; // Kafka 主题try {// 发送消息到 Kafkafor (int i = 0; i < 10; i++) {String message = "Message " + i;ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);RecordMetadata metadata = producer.send(record).get(); // 同步发送并等待发送完成System.out.println("Sent message: " + message + ", offset: " + metadata.offset());}} catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();} finally {// 关闭 Kafka 生产者
            producer.close();}}
}

在这个示例代码中,通过调用 send(record).get() 实现了同步发送,其中 get() 方法会阻塞当前线程,直到发送完成并返回消息的元数据。

  1. 添加异步回调函数来监听消息发送的结果,如果发送失败,可以在回调函数里重新发送。

要保持发送消息成功并添加回调函数,你可以在发送消息的时候指定一个回调函数作为参数。回调 函数将在消息发送完成后被调用,以便你可以在回调函数中处理发送结果。

以下是使用回调函数进行消息发送的示例代码:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;public class KafkaMessageProducer {public static void main(String[] args) {// 配置 Kafka 生产者Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka 集群地址
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器// 创建 Kafka 生产者实例Producer<String, String> producer = new KafkaProducer<>(props);String topic = "my-topic"; // Kafka 主题try {// 发送消息到 Kafkafor (int i = 0; i < 10; i++) {String message = "Message " + i;ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);// 发送消息并指定回调函数
                producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("Sent message: " + message + ", offset: " + metadata.offset());} else {// 这里重新发送消息
                            producer.send(record);
                            exception.printStackTrace();}}});}} finally {// 关闭 Kafka 生产者
            producer.close();}}
}

在这个示例代码中,我们使用了 send(record, callback) 方法来发送消息,并传递了一个实现了 Callback 接口的匿名内部类作为回调函数。当消息发送完成后,回调函数的 onCompletion() 方法会被调用。你可以根据 RecordMetadata 和 Exception 参数来处理发送结果。

另外producer还提供了一个重试参数,这个参数叫retries,如果因为网络问题或者Broker故障导致producer发送消息失败,那么producer会根据这个参数的值进行重试发送消息。

2.服务器(Broker)端 处理

Kafka Broker(服务器端)通过以下方式来确保生产者端消息发送的成功和不丢失:

1. 消息持久化(异步刷盘:Kafka Broker将接收到的消息持久化到磁盘上的日志文件中。这样即使在消息发送后发生故障,Broker能够恢复并确保消息不会丢失。(注意:持久化是由操作系统调度的,如果持久化之前系统崩溃了,那么就因为不能持久化导致数据丢失,但是Kafka没提供同步刷盘策略

2. 复制与高可用性:Kafka支持分布式部署,可以将消息分布到多个Broker上形成一个Broker集群。在集群中,消息被复制到多个副本中,以提供冗余和高可用性。生产者发送消息时,它可以将消息发送到任何一个Broker,然后Broker将确保消息在集群中的所有副本中都被复制成功。

3. 消息提交确认:当生产者发送消息后,在收到Broker的确认响应之前,生产者会等待。如果消息成功写入并复制到了指定的副本中,Broker会发送确认响应给生产者。如果生产者在指定的时间内没有收到确认响应,它将会尝试重新发送消息,以确保消息不会丢失。

4. 可靠性设置(同步刷盘:生产者可以配置一些参数来提高消息发送的可靠性。例如,可以设置`acks`参数来指定需要收到多少个Broker的确认响应才认为消息发送成功。可以将`acks`设置为`"all"`,表示需要收到所有副本的确认响应才算发送成功。

总之,Kafka Broker通过持久化和复制机制,以及消息确认和可靠性设置,确保生产者端的消息发送成功且不丢失。同时,应注意及时处理可能的错误情况,并根据生产者端需求和场景合理配置相应的参数。

另外,

参数 `acks` 是用来设置生产者在发送消息后等待确认响应的方式,可以设置以下三个值之一:

1. `acks=0`:生产者不会等待任何来自服务器的确认响应。消息被立即认为已发送成功,但这也意味着如果服务器没有成功接收消息,生产者将无法得知。这种设置下存在消息丢失的风险,因此并不推荐在关键业务中使用。

2. `acks=1`:生产者在消息被写入服务器的leader副本后会收到一个确认响应。这意味着leader副本已收到消息并写入磁盘,但其他副本尚未必需收到消息。这种设置下,生产者可以获得基本的消息可靠性,因为只要leader副本可达并写入成功,生产者就会收到一个确认。

  1. `acks=all`(或`acks=-1`):生产者在消息被写入服务器的所有leader副本后才会收到一个确认响应。这意味着所有副本都已成功接收并写入消息。这种设置下,生产者可以获得最高级别的消息可靠性,但会降低生产者的吞吐量,因为需要等待更多的确认

对于使用YAML文件进行Kafka配置的情况,你可以按照以下格式设置acks参数:

# Kafka生产者配置
producer:
  bootstrap.servers: your-kafka-server:9092
  acks: all        # 设置acks参数为"all"
  key.serializer: org.apache.kafka.common.serialization.StringSerializer
  value.serializer: org.apache.kafka.common.serialization.StringSerializer

需要根据具体的业务需求来选择适当的`acks`值。对于关键业务,建议使用`acks=all`以确保消息的完全可靠性。对于一些非关键的应用,轻微的消息丢失可能是可以接受的,可以使用`acks=1`来平衡可靠性和吞吐量。

3.消费者(Concumer)端 处理

Kafka Consumer 默认会确保消息的至少一次传递(at least once delivery)。这意味着当 Consumer 完成对一条消息的处理后,会向 Kafka 提交消息的偏移量(offset),告知 Kafka 这条消息已被成功处理。如果 Consumer 在处理消息时发生错误,可以通过回滚偏移量来重试处理之前的消息。

以下是一些确保消息消费成功的方法:

1. 使用自动提交偏移量(Auto Commit Offsets):默认情况下,Kafka Consumer 在消费消息后会自动提交偏移量。你可以通过设置 `enable.auto.commit` 属性为 `false` 来关闭自动提交,然后在成功处理消息后手动提交偏移量。这样可以确保只有在消息成功处理后才提交偏移量,以避免消息丢失。

2. 手动提交偏移量(Manual Commit Offsets):使用手动提交偏移量的方式可以更加精确地控制偏移量的提交时机。在成功处理消息后,通过调用 `commitSync()` 或 `commitAsync()` 方法来手动提交偏移量。你可以针对每个分区或每批消息进行偏移量的提交,以便在发生错误时能精确到达到处理过的最后一条消息。

3. 设置消费者的最大重试次数:你可以在消费消息的处理逻辑中实现重试机制,当处理失败时进行重试。可以使用一个计数器来限制重试次数,以防止无限重试导致循环消费消息。

4. 设置适当的消费者参数:根据你的需求,你可以根据消息量、处理能力等因素来调整消费者的配置参数,以确保消费者的性能和可靠性。例如,可以适当增加消费者的并行度(设置更多的线程或消费者实例)来提高吞吐量和容错性。

记住,尽管 Kafka 提供了可靠的消息传递机制,但仍然需要在消费者端实现适当的错误处理和重试逻辑,以处理可能发生的错误情况。

4.延申

  1. Kafka 写入磁盘的日志文件主要用于持久化消息数据以确保数据的可靠性和持久性。下面是一些作用:
  • 1. 数据持久化:Kafka使用日志文件来保存消息数据,确保即使在发生故障或重启后,数据也能够持久存储在磁盘上。这样可以有效地避免数据丢失。

  • 2. 数据复制:Kafka允许在不同的服务器之间进行数据复制,以提高容错能力和可用性。写入磁盘的日志文件可以被复制到其他副本中,以实现数据的冗余存储和故障恢复。

  • 3. 数据回放:Kafka的日志文件可以按顺序存储消息数据,使得可以根据偏移量(offset)进行可靠的数据回放操作。消费者可以根据需要重新读取并处理存储在日志文件中的消息。

  • 4. 顺序写入:Kafka通过将消息追加到日志文件末尾的方式进行写入,这种顺序写入的方式对于磁盘IO操作更为友好,可以提高写入性能和吞吐量。

总之,Kafka写入磁盘的日志文件可以确保消息数据的持久化、可靠性和顺序性,提供高性能的消息传递和数据处理能力。📝📁💾

什么是ACK?

ACK (Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误。在TCP/IP协议中,如果接收方成功的接收到数据,那么会回复一个ACK数据。通常ACK信号有自己固定的格式,长度大小,由接收方回复给发送方。

什么是ISR?

ISR全称是“In-Sync Replicas”,也就是保持同步的副本,他的含义就是,跟Leader始终保持同步的Follower有哪些。所以每个Partition都有一个ISR,这个ISR里一定会有Leader自己,因为Leader肯定数据是最新的,然后就是那些跟Leader保持同步的Follower,也会在ISR里。Leader负责跟踪与维护ISR列表。如果一个 Follower 宕机,或者落后太多(落后多少,由参数replica.lag.time.max.ms控制),Leader 将把它从 ISR 中移除。如果Leader发生故障或挂掉,一个新Leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为Leader,新的Leader继续服务客户端的读写请求。

什么是HW?

HW俗称高水位,是HighWatermark的缩写。它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。

什么是LEO?

LEO (Log End Offset),标识当前日志文件中下一条待写入的消息的offset。LEO 的大小相当于当前日志分区中最后一条消息的offset值加1.分区 ISR 集合中的每个副本都会维护自身的 LEO ,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/238703.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙软考题目总结

试题及答案1 【习题】运行Hello World工程 判断题 1.DevEco Studio是开发HarmonyOS应用的一站式集成开发环境。 &#xff08;正确&#xff09; 2.main_pages.json存放页面page路径配置信息。&#xff08;正确&#xff09; 单选题 1.在stage模型中&#xff0c;下列配置文件…

TCP/IP:从数据包到网络的演变

引言 TCP/IP协议的起源可以追溯到20世纪60年代末和70年代初&#xff0c;美国国防部高级研究计划局&#xff08;ARPA&#xff09;研究开发一种可靠的通信协议&#xff0c;用于连接分散在不同地点的计算机和资源。 在当时&#xff0c;计算机之间的连接并不像现在这样普遍和便捷…

RocketMQ系统性学习-RocketMQ高级特性之消息大量堆积处理、部署架构和高可用机制

&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308; 【11来了】文章导读地址&#xff1a;点击查看文章导读&#xff01; &#x1f341;&#x1f341;&#x1f341;&#x1f341;&#x1f341;&#x1f341;&#x1f3…

算法通关村-番外篇排序算法

大家好我是苏麟 , 今天带来番外篇 . 冒泡排序 BubbleSort 最基本的排序算法&#xff0c;最常用的排序算法 . 我们以关键字序列{26,53,48,11,13,48,32,15}看一下排序过程: 代码如下 : (基础版) class Solution {public int[] sortArray(int[] nums) {for(int i 0;i < n…

简单了解一下当前火热的大数据 -- Kylin

神兽麒麟 一、Apache Kylin 是什么&#xff1f;二、Kylin架构结语 一、Apache Kylin 是什么&#xff1f; 由eBay公司中国团队研发&#xff0c;是一个免费开源的OLAP多维数据分析引擎优点 超快的响应速度&#xff0c;亚秒级支持超大数据集&#xff08;PB以上&#xff0c;千亿记…

力扣 | 75. 颜色分类

给定一个包含红色、白色和蓝色、共 n 个元素的数组 nums &#xff0c;原地对它们进行排序&#xff0c;使得相同颜色的元素相邻&#xff0c;并按照红色、白色、蓝色顺序排列。 我们使用整数 0、 1 和 2 分别表示红色、白色和蓝色。 必须在不使用库内置的 sort 函数的情况下解决…

天津web前端就业培训班,Web机构选择重点

Web前端培训是目前非常热门的培训领域之一。很多领域都会涉及到web前端开发&#xff0c;比如传统互联网、房地产、金融、游戏、影视传媒等行业都需要web前端技术的支持。越来越多的企业和个人也需要建立自己的网站和移动应用程序&#xff0c;因此市场对web前端工程师的需求是非…

Dubbo秘密传递:让你的代码行云流水

在Dubbo使用过程中&#xff0c;有一些通用参数如果显式的在接口参数中传递&#xff0c;使用起来会有一些不方便。 比如&#xff1a;经常使用的用户信息、token等上下文信息&#xff0c;如果都放入参数中传递&#xff0c;这样会耦合到业务逻辑中了&#xff0c;这时隐式传参就是…

学习鸿蒙开发需要报培训班吗?

学习鸿蒙开发是否需要报培训班&#xff0c;取决于个人的学习需求和实际情况。 对于初学者来说&#xff0c;参加培训班可以提供系统的学习资源&#xff0c;有专业的老师指导&#xff0c;能够帮助快速入门&#xff0c;掌握基本知识和技能。同时&#xff0c;培训班通常会提供实践…

Linux 磁盘空间占满故障解决方法

故障排查&#xff1a; 使用命令查看磁盘使用量 # 使用人类可读的格式(预设值是不加这个选项的...) df -h # --inodes 列出 inode 资讯&#xff0c;不列出已使用 block df -i # 查看当前目录下各个文件及目录占用空间大小 du -sh / 情况一&#xff1a;一般磁盘空间满了&a…

vue3 使用addRoute动态添加路由,页面刷新就白屏解决办法

问题&#xff0c;通过接口动态添加路由&#xff0c;第一次从登录页跳转还是正常的&#xff0c;说明路由添加成功了&#xff0c;但是刷新后就白屏了&#xff0c;且控制台报错路由匹配不到&#xff0c;在项目的main.js&#xff0c;router和路由拦截器中添加了一大堆打印后发现&am…

【前缀和】【单调栈】LeetCode2281:巫师的总力量和

作者推荐 map|动态规划|单调栈|LeetCode975:奇偶跳 涉及知识点 单调栈 C算法&#xff1a;前缀和、前缀乘积、前缀异或的原理、源码及测试用例 包括课程视频 题目 作为国王的统治者&#xff0c;你有一支巫师军队听你指挥。 给你一个下标从 0 开始的整数数组 strength &…

isp代理/双isp代理/数据中心代理的区别?如何选择?

本文我们来详细科普一下几种不同的代理类型&#xff1a;isp代理/双isp代理/数据中心代理&#xff0c;了解他们的区别&#xff0c;选择更适合自己的代理类型。 在讲述这几种代理类型之前&#xff0c;我们先复习一下代理大类有哪几种。 一、机房代理和非机房代理 在做代理ip选…

一文弄懂kubernetes之Service

目录 ServiceService工作流程kube-proxyuserspaceiptablesIPVS EndpointsService负载分发策略Service属性Service定义多端口Service外部服务ServiceHeadless Services Service 在 kubernetes 中&#xff0c;Pod 是有生命周期的&#xff0c;如果 Pod 重启 IP 很有可能会发生变化…

【玩转TableAgent数据智能分析】借助全球高校数据多维度分析案例,体验TableAgent如何助力用户轻松洞察数据,赋能企业高效数智化转型

目录 前言 一、TableAgent介绍及其优势&#xff1f; 1、会话式数据分析&#xff0c;所需即所得 2、私有化部署&#xff0c;数据安全 3、支持企业级数据分析,大规模&#xff0c;高性能 4、支持领域微调&#xff0c;专业化 5、透明化过程&#xff0c;审计部署 二、使用Ta…

一起免费玩XG24-EK2703A板卡开发板,还有额外奖励等你拿!

hello大家好&#xff0c;我是硬核王同学&#xff0c;今天又看到了一个适合嵌入式初学者的免费参加的活动&#xff0c;迫不及待地就来跟大家分享&#xff01; Funpack活动是硬禾学堂联合DigiKey发起的“玩成功就全额退”活动。第一季和第二季已圆满结束&#xff0c;现在是第三季…

用radis扩展websockets服务

Redis可以存储会话数据&#xff0c;这使得不同的服务器可以共享WebSocket连接的状态。这意味着如果用户连接到服务器 A 然后重新连接到服务器 B&#xff0c;服务器 B 将知道现有连接。 此外&#xff0c;Redis 提供发布/订阅功能&#xff0c;使其成为向所有连接的客户端广播消息…

【Vue】动态合并行

前言&#xff1a; 开发中会经常使用到表格例如el-table&#xff0c;还会经常用到合并行或合并列&#xff0c;el-table提供了对应的方法&#xff0c;但是官方文档中的方法是固定的行数或列数&#xff0c;如果我们想要根据接口获取到的动态数据去合并行或合并列应该怎么实现呢&am…

Vue中的插槽和自定义指令

目录 一、插槽 1.默认插槽 2.具名插槽 3.作用域插槽 二、自定义指令 全局注册自定义指令 执行一次性初始化设置 组件vnode更新触发 局部注册自定义指令 一、插槽 父组件传递模板给子组件&#xff0c;子组件使用插槽声明slot元素承载分发内容出口。 1.默认插槽 父组件…

maven限制内存使用峰值/最大内存

前言 通过设置虚拟机的内存大小&#xff0c;达到限制maven内存使用峰值的效果 方法1&#xff1a;修改mvn脚本 找到mvn脚本在MAVEN_OPTS参数值添加-Xms、-Xmx参数&#xff1a;MAVEN_OPTS"$MAVEN_OPTS -Xms512m -Xmx512m"效果图 windows系统下修改MAVEN_OPTS参数 …