Kafka核心参数与使用02

一、从基础的客户端说起

Kafka 提供了非常简单的生产者(Producer)和消费者(Consumer)API。通过引入相应依赖后,可以快速上手编写生产者和消费者的示例。

1. 消息发送者主流程

一个最基础的 Producer 发送消息的步骤如下:

  1. 设置 Producer 核心属性

    • 例如:bootstrap.servers(集群地址)、key.serializervalue.serializer 等。
    • 大多数核心配置在 ProducerConfig 中都有对应的注释说明。
  2. 构建消息

    • Kafka 消息是一个 Key-Value 结构,Key 常用于分区路由,Value 则是业务真正要传递的内容。
  3. 发送消息

    • 单向发送producer.send(record); 仅发出消息,不关心服务端响应。
    • 同步发送producer.send(record).get(); 获取服务端响应前会阻塞。
    • 异步发送producer.send(record, callback); 服务端响应时会回调。

示例核心代码示意:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());Producer<String,String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =new ProducerRecord<>(TOPIC, key, value);
// 单向发送
producer.send(record);
// 同步发送
producer.send(record).get();
// 异步发送
producer.send(record, new Callback() { ... });

2. 消息消费者主流程

Consumer 侧,同样有三步:

  1. 设置 Consumer 核心属性

    • 例如:bootstrap.serversgroup.idkey.deserializervalue.deserializer 等。
  2. 拉取消息

    • Kafka 采用 Pull 模式:消费者主动调用 poll() 拉取消息。
  3. 处理消息,提交位点

    • 手动提交:consumer.commitSync()consumer.commitAsync()
    • 自动提交:设置 enable.auto.commit = true 及相应提交周期参数。

示例核心代码示意:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 业务处理}// 手动提交 offsetconsumer.commitSync();
}

二、从客户端属性来梳理客户端工作机制

Kafka 的核心特色在于高并发、高吞吐以及在网络不稳定、服务随时会崩溃等复杂场景下仍能保证消息安全性。以下属性与机制能帮助我们从客户端视角理解 Kafka。

1. 消费者分组消费机制

  • Group 机制
    每个 Consumer 都会指定一个 group.id。同一个 Consumer Group 内,Topic 的每个 Partition 只会被同组里的一个 Consumer 消费。不同组之间则是互不影响、各自消费。
  • offset 提交
    • offset 保存在 Broker 端,但由 Consumer “主导”提交。
    • 提交方式有:
      • 同步:commitSync(),安全但速度慢;
      • 异步:commitAsync(),快但可能丢失或重复消费。
    • auto.offset.reset
      • 当 Broker 端没有找到该 Group 相应的 offset 时,可以根据配置(earliest, latest, none)决定从何处开始消费。

提示:Offset 提交与消息处理之间并非完全同步,一旦无法保证强一致性,可能出现消息重复与消息丢失。可根据业务需求与场景选择手动提交或自动提交,也可将 offset 存入外部存储(如 Redis)自行管理。

2. 生产者拦截器机制

  • 通过配置 interceptor.classes 可以指定一个或多个实现了 ProducerInterceptor 接口的拦截器。
  • 典型功能:在发送前统一添加/修改消息内容,或者在发送后做监控/统计等操作。

3. 消息序列化机制

  • Producer 端:
    • key.serializer / value.serializer:将对象序列化为 byte[]
    • 内置如 StringSerializerIntegerSerializer 等;可自定义自定义序列化类。
  • Consumer 端:
    • key.deserializer / value.deserializer:将 byte[] 反序列化为业务对象。
  • 如果使用自定义类型(POJO)进行传输,则需要编写自定义 Serializer/Deserializer。
    • 核心思想:定长字段不定长字段的序列化与反序列化。

4. 消息分区路由机制

  • Producer 侧:
    • 通过 partitioner.class 指定自定义的分区器(Partitioner 接口)。Kafka 内置默认逻辑:
      • 若无 key,则采用黏性分区策略(Sticky Partition);
      • 若指定 key,则对 key 进行哈希得到分区;
      • 也可改为轮询策略(RoundRobinPartitioner)。
  • Consumer 侧:
    • 通过 partition.assignment.strategy 指定分区分配策略,内置 RangeAssignor, RoundRobinAssignor, StickyAssignor, CooperativeStickyAssignor 等。
      • Range:按顺序将分区切块分配。
      • RoundRobin:轮询分配。
      • Sticky:尽可能保持现有分配不变,同时保证分配均匀。

5. 生产者消息缓存机制

生产者为了提高吞吐量,会将消息先写入一个本地缓存(RecordAccumulator),然后 sender 线程批量发送到 Broker:

  • buffer.memory:缓存总大小,默认 32MB。
  • batch.size:每个分区发送批次大小,默认 16KB。
  • linger.ms:即便 batch.size 未填满,等待 linger.ms 毫秒后也会将消息批量发送。
  • max.in.flight.requests.per.connection:同一连接上未收到响应的请求数上限。

6. 发送应答机制

  • acks 用于控制生产者发送完消息后何时认为消息“成功”:
    • acks=0:不等待 Broker 确认,吞吐量高,安全性低。
    • acks=1:只等待 Leader 分区写入,常见设置。
    • acks=all-1:等待所有副本写入,安全性最高,吞吐量相对低。
  • 还需配合 Broker 端 min.insync.replicas 参数,控制副本个数不足时直接返回错误。

7. 生产者消息幂等性

  • 为保证 Exactly-once 语义,需要开启 enable.idempotence(幂等性)。
  • 幂等性主要依赖 PID + SequenceNumber 机制:
    • Producer 向同一分区发送消息时,每条消息都有一个单调递增的序列号。
    • Broker 针对 <PID, Partition> 维护序列号,只接收递增消息,防止消息重复写入。
  • 幂等性要求:
    • acks=all
    • retries>0
    • max.in.flight.requests.per.connection<=5

8. 生产者消息事务

  • 幂等性只能保证单个分区的 Exactly-once,如果涉及 多个分区/Topic 则需要“事务”来保证一批消息的一致性。
  • 主要 API:
    • initTransactions()
    • beginTransaction()
    • commitTransaction()
    • abortTransaction()
  • 事务依赖于 transaction.id 来区分不同的 Producer 实例,以便在崩溃重启后继续补偿或回滚先前未完成的事务,保证多分区的一致写入。

三、客户端流程总结

  1. Producer:

    • 属性配置(序列化、分区器、拦截器、幂等性/事务等) → 将消息提交到 RecordAccumulatorSender 线程批量发送到 Broker → 按 acks 等待 Broker 响应 → 提交或重试。
  2. Consumer:

    • 属性配置(反序列化、消费组、分区分配策略等) → poll() 拉取消息 → 业务处理 → 提交 offset(手动或自动),与 Broker 同步消费进度。
  3. 重点:

    • 消息在 Producer 端的缓存发送机制消息在 Consumer 端的主动拉取、分组消费、offset 提交 是理解 Kafka 高并发、高吞吐、高可用的关键。
    • 其他如 幂等性(保证单分区 Exactly-once)和 事务(保证多分区一致性)是针对数据安全性和业务需求的更深入扩展。

四、Spring Boot 集成 Kafka

Spring Boot 中集成 Kafka 本质也是对上述 Producer/Consumer 的封装。

  1. 引入依赖

    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
    </dependency>
    
  2. application.properties 中配置 Kafka 相关参数

    • 和原生 Kafka 参数名称基本一致,如 spring.kafka.producer.*spring.kafka.consumer.* 等。
    • 典型参数:bootstrap-servers, acks, batch-size, enable-auto-commit, auto-offset-reset 等。
  3. 使用 KafkaTemplate 发送消息

    @RestController
    public class KafkaProducerController {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/send/{message}")public void sendMessage(@PathVariable("message") String msg) {kafkaTemplate.send("topic1", msg);}
    }
    
  4. 使用 @KafkaListener 声明消息消费者

    @Component
    public class KafkaConsumerListener {@KafkaListener(topics = {"topic1"})public void onMessage(ConsumerRecord<?, ?> record) {System.out.println("消费内容:" + record.value());}
    }
    

 


结语

  • 想要真正掌握 Kafka,重点在于建立整体的数据流转模型
    • Producer 端如何将消息分区、缓存、发送、应答、重试、保证幂等与事务;
    • Consumer 端如何分组消费、订阅分区、拉取消息、提交 offset。
  • 熟悉这些机制后,再去看各种客户端配置就会轻松许多,能够结合实际业务场景做灵活配置与调优。
  • Spring Boot 也只是对原生 Kafka 客户端的进一步封装,一旦理解 Kafka 底层机制与各项参数原理,使用 Spring Boot 时只需“对号入座”地进行配置即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/891983.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

性能测试工具的原理与架构解析

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 在软件开发与运维领域&#xff0c;性能测试是确保系统稳定、高效运行的关键环节。性能测试工具作为实现这一目标的重要工具&#xff0c;通过模拟真实用户行为和负载…

【insert 插入数据语法合集】.NET开源ORM框架 SqlSugar 系列

系列文章目录 &#x1f380;&#x1f380;&#x1f380; .NET开源 ORM 框架 SqlSugar 系列 &#x1f380;&#x1f380;&#x1f380; 文章目录 系列文章目录一、前言 &#x1f343;二、插入方式 &#x1f4af;2.1 单条插入实体2.2 批量 插入实体2.3 根据字典插入2.4 根据 Dat…

记一次k8s下容器启动失败,容器无日志问题排查

问题 背景 本地开发时&#xff0c;某应用增加logback-spring.xml配置文件&#xff0c;加入必要的依赖&#xff1a; <dependency><groupId>net.logstash.logback</groupId><artifactId>logstash-logback-encoder</artifactId><version>8…

【PPTist】批注、选择窗格

前言&#xff1a;本篇文章研究批注和选择窗格两个小功能 一、批注 批注功能就是介个小图标 点击可以为当前页的幻灯片添加批注&#xff0c;还能删除之前的批注 如果我们增加了登录功能&#xff0c;还可以在批注上显示当前的用户名和头像&#xff0c;不过现在是写死的。 左侧…

使用Paddledetection进行模型训练【Part1:环境配置】

目录 写作目的 安装文档 环境要求 版本依赖关系 安装说明 写作目的 方便大家进行模型训练前的环境配置。 安装文档 环境要求 PaddlePaddle &#xff1e;&#xff1d;2.3.2OS 64位操作系统Python 3(3.5.1/3.6/3.7/3.8/3.9/3.10)&#xff0c;64位版本pip/pip3(9.0.1)&am…

C++ scanf

1.scanf概念解释&#xff1a; C语言兼容C语言中的基本语句语法,scanf语句是C语言中的输入语句,在C语言环境中也可以使用。对于大数据的输入使用scanf比C的输入cin效率高、速度快。 scanf称为格式输入函数,其关键字最末一个字母f即为是格式"(format)之意",其意义是按指…

数学建模入门——描述性统计分析

摘要&#xff1a;本篇博客主要讲解了数学建模入门的描述性统计分析&#xff0c;包括基本统计量的计算、数据的分布形态、数据可视化和相关性分析。 往期回顾&#xff1a; 数学建模入门——建模流程-CSDN博客 数学建模入门——数据预处理&#xff08;全&#xff09;-CSDN博客 …

30、论文阅读:基于小波的傅里叶信息交互与频率扩散调整的水下图像恢复

Wavelet-based Fourier Information Interaction with Frequency Diffusion Adjustment for Underwater Image Restoration 摘要介绍相关工作水下图像增强扩散模型 论文方法整体架构离散小波变换与傅里叶变换频率初步增强Wide Transformer BlockSpatial-Frequency Fusion Block…

Zero to JupyterHub with Kubernetes 下篇 - Jupyterhub on k8s

前言&#xff1a;纯个人记录使用。 搭建 Zero to JupyterHub with Kubernetes 上篇 - Kubernetes 离线二进制部署。搭建 Zero to JupyterHub with Kubernetes 中篇 - Kubernetes 常规使用记录。搭建 Zero to JupyterHub with Kubernetes 下篇 - Jupyterhub on k8s。 官方文档…

Matlab回归预测大合集(不定期更新)-188

截至2025-1-2更新 1.BP神经网络多元回归预测&#xff08;多输入单输出&#xff09; 2.RBF神经网络多元回归预测&#xff08;多输入单输出&#xff09; 3.RF随机森林多元回归预测&#xff08;多输入单输出&#xff09; 4.CNN卷积神经网络多元回归预测&#xff08;多输入单输…

【读书与思考】历史是一个好东西

【AI论文解读】【AI知识点】【AI小项目】【AI战略思考】【AI日记】【读书与思考】 导言 以后《AI日记》专栏我想专注于 AI 相关的学习、成长和工作等。而与 AI 无关的一些读书、思考和闲聊&#xff0c;我打算写到这里&#xff0c;我会尽量控制自己少想和少写。 下图的一些感想…

Git使用mirror备份和恢复

Git使用mirror备份和恢复 使用到的命令总结备份1.进入指定代码仓库&#xff0c;拷贝地址2.进入要备份到的文件夹&#xff0c;右键打开git命令行&#xff0c;输入以下命令3.命令执行完成后会生成一个新文件夹 恢复1.在gitee上创建代码仓库![请添加图片描述](https://i-blog.csdn…

人工智能的可解释性:从黑箱到透明

✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连✨ ​​ ​ 人工智能&#xff08;AI&#xff09;的快速发展和广泛应用&#xff0c;带来了许多革新的成果&#xff0c;但也引发了对其透明性和可解释…

Nacos注册中心介绍及部署

文章目录 Nacos注册中心介绍及部署1. 注册中心简介2. 注册中心原理3. Nacos部署-基于Docker3.1 Nacos官网下载3.2 基础数据信息3.3 环境信息3.4 docker安装部署3.5 测试3.5 测试 Nacos注册中心介绍及部署 1. 注册中心简介 Spring Cloud注册中心是Spring Cloud微服务架构中的一…

Nginx与frp结合实现局域网和公网的双重https服务

背景&#xff1a; 因为局域网内架设了 tiddlywiki、 Nextcloud 等服务&#xff0c;同时也把公司的网站架设在了本地&#xff0c;为了实现局域网直接在局域网内访问&#xff0c;而外部访问通过frps服务器作为反向代理的目的&#xff0c;才有此内容。 实现的效果如下图琐事 不喜欢…

zephyr移植到STM32

Zephy如何移植到单片机 1. Window下搭建开发环境1.1 安装Choncolatey1.2 安装相关依赖1.3创建虚拟python环境1.4 安装west1.4.1 使用 pip 安装 west1.4.2 检查 west 安装路径1.4.3 将 Scripts路径添加到环境变量1.4.4 验证安装 1.5 获取zephyr源码和[安装python](https://so.cs…

【分糖果——DFS】

题目 代码1 #include <bits/stdc.h> using namespace std; set<string> s; void dfs(int num1, int num2, int u, string ans) {if (u 7){if (num1 num2 > 5)return;ans (char)((num1) * 17 num2);s.insert(ans);return;}for (int i 0; i < num1; i){f…

【HarmonyOS】鸿蒙应用实现屏幕录制详解和源码

【HarmonyOS】鸿蒙应用实现屏幕录制详解和源码 一、前言 官方文档关于屏幕录制的API和示例介绍获取简单和突兀。使用起来会让上手程度变高。所以特意开篇文章&#xff0c;讲解屏幕录制的使用。官方文档参见&#xff1a;使用AVScreenCaptureRecorder录屏写文件(ArkTS) 二、方…

解决在VS2019/2022中编译c++项目报错fatal error C1189: #error : “No Target Architecture“

解决在VS2019/2022中编译c项目报错fatal error C1189: #error : “No Target Architecture” 报错原因 在winnt.h中&#xff0c;不言而喻&#xff0c;一目了然&#xff1a; 代码节选&#xff1a; #if defined(_AMD64_) || defined(_X86_) #define PROBE_ALIGNMENT( _s ) TY…

Python教程丨Python环境搭建 (含IDE安装)——保姆级教程!

工欲善其事&#xff0c;必先利其器。 学习Python的第一步不要再加收藏夹了&#xff01;提高执行力&#xff0c;先给自己装好Python。 1. Python 下载 1.1. 下载安装包 既然要下载Python&#xff0c;我们直接进入python官网下载即可 Python 官网&#xff1a;Welcome to Pyt…