12、Kafka中位移提交那些事儿

Kafka中位移提交那些事儿

  • 1、自动提交位移
  • 2、手动提交位移
    • 2.1、同步提交位移
    • 2.2、异步提交位移
    • 2.3、更精细化的位移管理

Consumer 端有个位移的概念,它和消息在分区中的位移不是一回事儿,虽然它们的英文都是 Offset。今天我们要聊的位移是 Consumer 的消费位移,它记录了 Consumer 要消费的下一条消息的位移。这可能和你以前了解的有些出入,不过切记是下一条消息的位移,而不是目前最新消费消息的位移。

举个例子。假设一个分区中有10条消息,位移分别是0到9。某个Consumer应用已消费了5条消息,这就说明该Consumer消费了位移为0到4的5条消息,此时Consumer的位移是5,指向了下一条消息的位移。

Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据

提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X,那么 Kafka 会认为所有位移值小于X的消息你都已经成功消费了。

这一点特别关键。因为位移提交非常灵活,你完全可以提交任何位移值,但由此产生的后果你也要一并承担。

位移提交的语义保障是由你来负责的,Kafka只会“无脑”地接受你提交的位移。

鉴于位移提交甚至是位移管理对Consumer端的巨大影响,Kafka,特别是KafkaConsumer API,提供了多种提交位移的方法。从用户的角度来说,位移提交分为自动提交和手动提交;从Consumer端的角度来说,位移提交分为同步提交和异步提交。

1、自动提交位移

把参数 enable.auto.commit 设置为 true。

一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始拉取消息时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费

在默认情况下,Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。

2、手动提交位移

首先设置 enable.auto.commit 为 false,然后调用相应的 API 手动提交位移。

2.1、同步提交位移

最简单的 API 就是 KafkaConsumer#commitSync()。该方法会提交 KafkaConsumer#poll() 返回的最新位移。从名字上来看,它是一个同步操作,即该方法会一直等待,直到位移被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。下面这段代码展示了 commitSync() 的使用方法:

while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));// 处理消息process(records);try {consumer.commitSync();} catch (CommitFailedException e) {// 处理提交失败异常handle(e);}
}

手动提交位移,它的好处就在于更加灵活,你完全能够把控位移提交的时机和频率。但是,它也有一个缺陷,就是在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。当然,你可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。

2.2、异步提交位移

鉴于同步提交位移的阻塞问题,Kafka 社区为手动提交位移提供了另一个 API 方法: KafkaConsumer#commitAsync()。从名字上来看它就不是同步的,而是一个异步操作。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。

while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息consumer.commitAsync((offsets, exception) -> {if (exception != null)handle(exception);});
}

2.3、更精细化的位移管理

commitAsync 是否能够替代 commitSync 呢?答案是不能。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

显然,如果是手动提交,我们需要将 commitSync 和 commitAsync 组合使用才能达到最理想的效果,原因有两个:

  1. 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
  2. 我们不希望程序总处于阻塞状态,影响 TPS。

我们来看一下下面这段代码,它展示的是如何将两个API方法结合使用进行手动提交。

try{while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息commitAysnc(); // 使用异步提交规避阻塞}
} catch(Exception e) {handle(e); // 处理异常
} finally {try {consumer.commitSync(); // 最后一次提交使用同步阻塞式提交} finally {consumer.close();}
}

这段代码同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性,所以,如果你需要自行编写代码开发一套 Kafka Consumer 应用,那么我推荐你使用上面的代码范例来实现手动的位移提交。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/228273.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

千亿露酒市场的未来之“露”

执笔 | 尼 奥 编辑 | 扬 灵 12月15日&#xff0c;以“以美为酿&#xff0c;品致未来”为主题的中国露酒产业发展大会暨露酒价值论坛在“中国酒都”宜宾举办。 近年来&#xff0c;露酒产业发展异军突起&#xff0c;市场销售规模超越黄酒、葡萄酒品类&#xff0c;成为中国酒…

正则表达式IP地址

正则表达式基础语法 正则表达式-字符类 [abc]&#xff1a;代表a或者b&#xff0c;或者c字符中的一个。 [^abc]&#xff1a;代表除a,b,c以外的任何字符。 [a-z]&#xff1a;代表a-z的所有小写字符中的一个。 [A-Z]&#xff1a;代表A-Z的所有大写字符中的一个。 [0-9]&#xff…

人工智能文本分类

在本文中&#xff0c;我们全面探讨了文本分类技术的发展历程、基本原理、关键技术、深度学习的应用&#xff0c;以及从RNN到Transformer的技术演进。文章详细介绍了各种模型的原理和实战应用&#xff0c;旨在提供对文本分类技术深入理解的全面视角。 一、引言 文本分类作为人工…

期末总复习(重点!!!)

一、第6章异常处理 1、什么是异常、什么是异常处理异常是指程序在运行过程中发生的错误事件&#xff0c;影响程序的正常执行。异常并不是一定会发生&#xff0c;默认情况下&#xff0c;程序运行中遇到异常时将会终止&#xff0c;并在控制台打印出异常出现的堆栈信息。异常处理…

在线客服系统定价因素解析:影响价格的关键因素

跨境电子商务公司必不可少的工具就是在线客服系统。企业选择在线客服系统的时候免不了要对不同产品的功能性、价格、服务等因素进行考量。今天这篇文章&#xff0c;我们就来探讨一下在线客服系统的定价因素有哪些&#xff1f;探究市面上的在线客服系统价格各异的影响因素。为大…

Lambda 的表达式作用域(Lambda Scopes)

文章目录 讲一下 Lambda 的表达式作用域&#xff08;Lambda Scopes&#xff09;。访问局部变量访问字段和静态变量访问默认接口方法 讲一下 Lambda 的表达式作用域&#xff08;Lambda Scopes&#xff09;。 访问局部变量 我们可以直接在 lambda 表达式中访问外部的局部变量&a…

c# bitmap压缩导致png不透明的问题解决

新建.net 6控制台项目 安装System.Drawing.Common包 代码如下 using System.Drawing; using System.Drawing.Imaging;namespace PngCompress02 {internal class Program{static void Main(string[] args){CompressPngImage("E:\Desktop\6.png", "E:\Desktop\6…

C++相关闲碎记录(14)

1、数值算法 &#xff08;1&#xff09;运算后产生结果accumulate() #include "algostuff.hpp"using namespace std;int main() {vector<int> coll;INSERT_ELEMENTS(coll, 1, 9);PRINT_ELEMENTS(coll);cout << "sum: " << accumulate(…

DPDK系列之三十九控制管理

一、基础介绍 通过前面的分析&#xff0c;对DPDK中对报文处理的过程有了一个初步的认知。从一个更高层次来看&#xff0c;传统的网络通信一般会通过上层应用、操作系统、网卡驱动和硬件四层。再往下&#xff0c;基本就不属于于计算机控制的系统了。 早期的应用&#xff0c;基本…

Python - coverage

coverage overage 是一个用于测量Python程序代码覆盖率的工具。它监视您的程序&#xff0c;注意代码的哪些部分已经执行&#xff0c;然后分析源代码&#xff0c;以确定哪些代码本可以执行&#xff0c;但没有执行。 覆盖率测量通常用于衡量测试的有效性。它可以显示代码的哪些…

整理了上百个开源中文大语言模型,涵盖模型、应用、数据集、微调、部署、评测

自ChatGPT为代表的大语言模型&#xff08;Large Language Model, LLM&#xff09;出现以后&#xff0c;由于其惊人的类通用人工智能&#xff08;AGI&#xff09;的能力&#xff0c;掀起了新一轮自然语言处理领域的研究和应用的浪潮。 尤其是以ChatGLM、LLaMA等平民玩家都能跑起…

抖音品牌力不足,如何开通抖音旗舰店?强开旗舰店全攻略来了!

随着直播的兴起&#xff0c;抖音电商在近年来的发展速度可谓是相当迅猛。越来越多的商家开始将重心投入到抖音电商。从开店、搭建直播间&#xff0c;起号&#xff0c;再到日常运营... 然而我们在第一步开店的时候&#xff0c;就遇到了不少麻烦。 1、选择开通抖音旗舰店&#x…

初识Flask

摆上中文版官方文档网站&#xff1a;https://flask.github.net.cn/quickstart.html 开启实验之路~~~~~~~~~~~~~ from flask import Flaskapp Flask(__name__) # 使用修饰器告诉flask触发函数的URL&#xff0c;绑定URL&#xff0c;后面的函数用于返回用户在浏览器上看到的内容…

Spring Cloud + Vue前后端分离-第5章 单表管理功能前后端开发

Spring Cloud Vue前后端分离-第5章 单表管理功能前后端开发 完成单表的增删改查 控台单表增删改查的前后端开发&#xff0c;重点学习前后端数据交互&#xff0c;vue ajax库axios的使用等 通用组件开发:分页、确认框、提示框、等待框等 常用的公共组件:确认框、提示框、等待…

系列九、事务

一、事务 1.1、概述 事务是一组操作的集合&#xff0c;它是一个不可分割的工作单位&#xff0c;事务会把所有的操作作为一个整体一起向系统提交或者撤销操作请求&#xff0c;即&#xff1a;这些操作要么同时成功&#xff0c;要么同时失败。 例如: 张三给李四转账1000块钱&…

使用邮件群发平台,轻松实现高效沟通的4大优势!

新媒体带动着众多线上平台的发展&#xff0c;使得流量为企业带来了可观的营收。但是&#xff0c;随着短视频市场的饱和&#xff0c;想要再次获得初始时的流量就变得越发困难。在这个时候&#xff0c;企业不妨将眼光往邮件群发这个传统的营销方式上倾斜&#xff0c;特别是出海、…

多进程间通信学习之共享内存

共享内存&#xff1a;1、在内核中创建共享内存&#xff1b;2、进程1和进程2都能够访问到&#xff0c;通过这段内存空间进行数据传递&#xff1b;3、共享内存是所有进程间通信方式中&#xff0c;效率最高&#xff0c;不需要在内核中往返进行拷贝&#xff1b;4、共享内存的内存空…

prometheus简介

什么是Prometheus Prometheus 是一个开源的服务监控系统和时序数据库&#xff0c;其提供了通用的数据模型和快捷数据采集、存储和查询接口。Prometheus的特点 多维数据模型&#xff1a;由度量名称和键值对标识的时间序列数据 时序数据&#xff0c;是在一段时间内通过重复测量&…

数据结构之---- 动态规划

数据结构之---- 动态规划 什么是动态规划&#xff1f; 动态规划是一个重要的算法范式&#xff0c;它将一个问题分解为一系列更小的子问题&#xff0c;并通过存储子问题的解来避免重复计算&#xff0c;从而大幅提升时间效率。 在本节中&#xff0c;我们从一个经典例题入手&am…