Kafka 中的幂等机制

Kafka 中的 幂等性(Idempotence) 是生产者端的重要机制,旨在确保即使在网络抖动、重试、Broker 重启等情况下,同一条消息不会被重复写入到 Topic 中。这是实现可靠消息传递、避免重复消费的关键手段之一。

✅ 什么是幂等性?

简单说:无论一个操作执行多少次,结果都是一样的。

在 Kafka 中,幂等性意味着:

相同的消息,即使发送多次,也只会被写入一次,且不会重复出现在日志中。

✅ Kafka 幂等性的作用场景

生产者可能会因为以下情况 重试发送 消息:

  • 网络超时,未收到 Broker 的 ack;
  • Kafka Broker 重启;
  • 客户端主动重试(retries > 0);
  • Leader 重新选举。

这些重试可能会导致:同一条消息写入多次,从而带来“重复消费”的问题。

Kafka 的幂等性功能可以自动解决这个问题,不用你在应用层手动做去重。

✅ 如何开启幂等性?

从 Kafka 0.11 版本开始支持幂等性。

✔ 开启方式

Kafka 2.0 版本之后,幂等性可以通过如下方式开启:

Properties props = new Properties();
props.put("enable.idempotence", "true");  // ✅ 显式开启
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

注意

  • acks=all 是开启幂等性的默认要求。
  • Kafka 2.5+ 中,enable.idempotence 默认就是 true。

✅ Kafka 是如何实现幂等性的?

Kafka 利用了以下几个机制:

1. Producer ID(PID)

  • 每个生产者初始化时,Kafka 分配一个唯一的 PID(Producer ID)。
  • Kafka 会记住这个 PID 发给哪个 Partition 了哪些消息。

2. Sequence Number(序列号)

  • Kafka 给每个消息分配一个自增的 Sequence Number每个 Partition 单独维护
  • Broker 在每个 Partition 中,记录下最近收到的 PID 和对应的序号。

✨ Kafka 判断是否是重复消息的规则:

如果某个 PID + Partition 下,收到一条消息,其 Sequence Number 是重复的或小于上一次的,说明是重试的重复消息,Kafka 会自动丢弃它

✅ 幂等性 vs 事务,有什么区别?

特性幂等性(Idempotence)事务(Transaction)
作用避免消息重复写入保证多条消息的原子提交
粒度单条消息一组消息
范围单个 partition、单个 producer多 partition、消费者偏移、多个 Topic
是否有回滚❌ 无✅ 有
消费者是否感知❌ 不感知✅ read_committed 下感知

可以理解为:

幂等性是事务的基础。Kafka 启用事务时,会自动启用幂等性,但单独开启幂等性不等于开启事务。

✅ 使用幂等性的推荐配置

enable.idempotence=true      ✅ 开启幂等性
acks=all                     ✅ 所有副本都要确认
retries=Integer.MAX_VALUE    ✅ 无限重试,确保最终写入成功
max.in.flight.requests.per.connection=1(旧版本)✅ 限制同时请求数,确保顺序(Kafka 2.4+ 可放宽为5)

⚠️ 若你设置 max.in.flight.requests.per.connection > 1,在旧版本 Kafka(<2.4)中可能会造成乱序+重复写入,不再幂等

✅ 总结一句话

Kafka 幂等性 = 在网络失败或客户端重试时,确保消息只被写入一次,自动去重,避免重复消费问题。

它是 实现可靠消息系统的第一步,在开启事务或处理金融、支付等关键数据时非常重要。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/77199.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用c语言写一个linux进程之间通信(聊天)的简单程序

使用talk 用户在同一台机器上talk指令格式如下&#xff1a; ​ talk 用户名ip地址 [用户终端号] 如果用户只登录了一个终端&#xff0c;那么可以不写用户终端号&#xff0c;如&#xff1a; talk userlocalhost可以使用who指令来查看当前有哪些用户登录&#xff0c;他的终端号…

深入探索Scala:从基础到进阶的全面总结

在大数据技术领域&#xff0c;Scala语言凭借其独特优势占据重要地位。它与Spark紧密相连&#xff0c;为大数据计算提供强大支持。今天&#xff0c;让我们一同深入回顾Scala从基础到进阶的关键知识点。 Scala开发环境搭建是入门的第一步&#xff0c;需确保JDK安装成功&#xff0…

【每日一个知识点】分布式数据湖与实时计算

在现代数据架构中&#xff0c;分布式数据湖&#xff08;Distributed Data Lake&#xff09; 结合 实时计算&#xff08;Real-time Computing&#xff09; 已成为大数据处理的核心模式。数据湖用于存储海量的结构化和非结构化数据&#xff0c;而实时计算则确保数据能够被迅速处理…

GPT-5、o3和o4-mini即将到来

原计划有所变更: 关于我们应有何期待的一些零散想法。 深度研究(Deep Research)确实强大但成本高昂且速度较慢(当前使用o3模型)。即将推出的o4-mini在性能上可能与o3相近,但将突破这些限制,让全球用户——甚至免费用户(尽管会有速率限制)——都能用上世界顶级AI研究助…

Spring Cloud LoadBalancer负载均衡+算法切换

目录 介绍核心功能负载均衡启动两个支付服务订单模块引入依赖LoadBalanced 注解启动订单服务测试结果 负载均衡算法切换总结 介绍 Spring Cloud LoadBalancer 是 Spring Cloud 提供的客户端负载均衡解决方案&#xff0c;提供更现代化的 API 和更好的 Spring 生态系统集成。它支…

Chrome 浏览器插件收录

1. Responsive Viewer 可以在同个窗口内&#xff0c;针对同一网站&#xff0c;添加多个不同设备屏幕显示。 在前端开发&#xff0c;需要多端适配&#xff0c;尤其是移动端响应式适配的网站开发中&#xff0c;可以同时测试多个不同屏幕的适配效果。 2. VisBug 提供工具栏&#x…

SQL 函数概述

SQL 函数概述 SQL 函数可以分为几大类&#xff0c;不同数据库系统可能有略微不同的实现。以下是主要的 SQL 函数分类&#xff1a; 1. 聚合函数 (Aggregate Functions) COUNT() - 计算行数 SUM() - 计算总和 AVG() - 计算平均值 MIN() - 找最小值 MAX() - 找最大值 GROUP…

MySQL学习笔记九

第十一章使用数据处理函数 11.1函数 SQL支持函数来处理数据但是函数的可移植性没有SQL强。 11.2使用函数 11.2.1文本处理函数 输入&#xff1a; SELECT vend_name,UPPER(vend_name) AS vend_name_upcase FROM vendors ORDER BY vend_name; 输出&#xff1a; 说明&#…

认识vue中的install和使用场景

写在前面 install 在实际开发中如果你只是一个简单的业务实现者&#xff0c;那么大部分时间你是用不到install的&#xff0c;因为你用到的基本上都是别人封装好的插件、组件、方法、指令等等&#xff0c;但是如果你需要给公司的架构做建设&#xff0c;install就是你避不开的一个…

【SpringCloud】构建分布式系统的利器

一、引言 在当今数字化时代&#xff0c;随着业务规模的不断扩大和用户量的急剧增长&#xff0c;单体应用逐渐暴露出诸多局限性&#xff0c;如可扩展性差、维护困难等。分布式系统应运而生&#xff0c;而 Spring Cloud 则成为了构建分布式系统的热门框架之一。它提供了一系列丰…

mkdir通配符详解

在 mkdir 命令中使用通配符可以简化批量创建目录的操作。通配符如 {} 和 * 可以用来生成多个目录名称&#xff0c;从而减少重复输入。以下是一些常见的使用方法和示例。 使用 {} 通配符 {} 通配符可以用来生成一系列的目录名称&#xff0c;语法如下&#xff1a; mkdir dir_{…

Transformer的Word Embedding

一、Transformer 中的词嵌入是什么&#xff1f; 1. 定义与作用 • 词嵌入&#xff08;Word Embedding&#xff09;&#xff1a;将离散的词语映射为低维连续向量&#xff0c;捕捉语义和语法信息。 • 在 Transformer 中的位置&#xff1a; • 输入层&#xff1a;每个词通过嵌入…

Linux 进程间通信:信号机制

Linux 进程间通信&#xff1a;信号机制 在多进程操作系统中&#xff0c;进程之间的通信至关重要&#xff0c;尤其是在Linux系统中&#xff0c;信号&#xff08;Signal&#xff09;作为一种特殊的进程间通信方式&#xff0c;广泛用于进程之间的协调和控制。信号可以看作是操作系…

基于TRIZ创新方法论的九屏法分析系统

1. 文件头与库导入 # -*- coding: utf-8 -*- import streamlit as st import pandas as pd import numpy as np import plotly.graph_objects as go from datetime import datetime from sklearn.ensemble import RandomForestRegressor ​​作用​​&#xff1a;设置文件编码…

【LangChain框架组成】 LangChain 技术栈的模块化架构解析

目录 整体架构概述 整体架构层级划分 模块详细解析 1. 部署与服务层&#xff08;LangServe & Deployments&#xff09; 2. 应用模板层&#xff08;Templates & Committee Architectures&#xff09; 3. 核心功能层&#xff08;LangChain&#xff09; 4. 社区扩展…

自定义数据结构的QVariant序列化 ASSERT failure in QVariant::save: “invalid type to save“

自定义数据结构放入QVariant&#xff0c;在序列化时抛出异常 ASSERT failure in QVariant::save: “invalid type to save” 自定义数据结构如struct MyData&#xff0c;除了要在结构体后面加 struct MyData { ... } Q_DECLARE_METATYPE(MyData)如果需要用到流的输入输出&…

vxe-table 启用 checkbox-config.reserve 实现分页复选框选择功能、获取已选数据的用法

vxe-table 启用 checkbox-config.reserve 实现分页复选框选择功能、获取已选数据的用法 查看官网&#xff1a;https://vxetable.cn gitbub&#xff1a;https://github.com/x-extends/vxe-table gitee&#xff1a;https://gitee.com/x-extends/vxe-table 效果 代码 获取已选择…

蓝桥杯-门牌制作

题目描述 本题为填空题&#xff0c;只需要算出结果后&#xff0c;在代码中使用输出语句将所填结果输出即可。 小蓝要为一条街的住户制作门牌号。 这条街一共有 20202020 位住户&#xff0c;门牌号从 11 到 20202020 编号。 小蓝制作门牌的方法是先制作 00 到 99 这几个数字…

C#调用Lua方法1+C#调用Lua方法2,3

xLua中Lua调用C#代码 原因&#xff1a;C#实现的系统&#xff0c;因为Lua可以调用&#xff0c;所以完全可以换成Lua实现&#xff0c;因为Lua可以即时更改&#xff0c;即时运行&#xff0c;所以游戏的代码逻辑就可以随时更改。 实现和C#相同效果的系统&#xff0c;如何实现&#…

macOS Chrome - 打开开发者工具,设置 Local storage

文章目录 macOS Chrome - 打开开发者工具设置 Local storage macOS Chrome - 打开开发者工具 方式2&#xff1a;右键点击网页&#xff0c;选择 检查 设置 Local storage 选择要设置的 url&#xff0c;显示右侧面板 双击面板&#xff0c;输入要添加的内容 2025-04-08&#xff…