kafka enable.auto.commit和auto.offset.reset使用说明

enable.auto.commit

是否自动提交offset,默认是true。

auto.offset.reset

表示自动重置 offset。

auto.offset.reset 参数定义了当无法获取消费分区的位移时从何处开始消费。例如:当 Broker 端没有 offset(如第一次消费或 offset 超过7天过期)时如何初始化 offset,当收到 OFFSET_OUT_OF_RANGE 错误时如何重置 Offset。

earliest:自动重置到 partition 的最小 offset。
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
latest:默认为 latest,表示自动重置到 partition 的最大 offset。
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。
none:不自动进行 offset 重置,抛出 OffsetOutOfRangeException 异常。
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。

auto.offset.reset=none 使用说明

使用背景

不希望发生 offset 自动重置的情况,因为业务不允许发生大规模的重复消费。

注意:

此时消费组在第一次消费的时候就会找不到 offset 而报错,这时就需要在 catch 里手动设置 offset。

使用说明

auto.offset.reset 设置为 None 以后,可以避免 offset 自动重置的问题,但是当增加分区的时候,因为关闭了自动重置机制,客户端不知道新的分区要从哪里开始消费,则会产生异常,此时需要人工去设置消费分组 offset 并消费。

使用方式

消费者在消费时,当 consumer 设置 auto.offset.reset=none, 捕获到 NoOffsetForPartitionException 异常,在 catch 里自己设置 offset。您可以根据自身业务情况选择以下方式中的其中一种。

指定 offset,这里需要自己维护 offset,方便重试。

指定从头开始消费。

指定 offset 为最近可用的 offset。

根据时间戳获取 offset,设置 offset。

总结:

package com.tencent.tcb.operation.ckafka.plain;import com.google.common.collect.Lists;
import com.tencent.tcb.operation.ckafka.JavaKafkaConfigurer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;public class KafkaPlainConsumerDemo {public static void main(String args[]) {//设置JAAS配置文件的路径。JavaKafkaConfigurer.configureSaslPlain();//加载kafka.properties。Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();Properties props = new Properties();//设置接入点,请通过控制台获取对应Topic的接入点。props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));//接入协议。props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");//Plain方式。props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");//两次Poll之间的最大允许间隔。//消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);//每次Poll的最大数量。//注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);//消息的反序列化方式。props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//当前消费实例所属的消费组,请在控制台申请之后填写。//属于同一个组的消费实例,会负载消费消息。props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));//消费offset的位置。注意!如果auto.offset.reset=none这样设置,消费组在第一次消费的时候 就会报错找不到offset,第一次这时候就需要在catch里手动设置offset。props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");//构造消费对象,也即生成一个消费实例。KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);//设置消费组订阅的Topic,可以订阅多个。//如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样。List<String> subscribedTopics = new ArrayList<String>();//如果需要订阅多个Topic,则在这里添加进去即可。//每个Topic需要先在控制台进行创建。String topicStr = kafkaProperties.getProperty("topic");String[] topics = topicStr.split(",");for (String topic : topics) {subscribedTopics.add(topic.trim());}consumer.subscribe(subscribedTopics);//循环消费消息。while (true) {try {ConsumerRecords<String, String> records = consumer.poll(1000);//必须在下次Poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。 建议开一个单独的线程池来消费消息,然后异步返回结果。for (ConsumerRecord<String, String> record : records) {System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));}} catch (NoOffsetForPartitionException e) {System.out.println(e.getMessage());//当auto.offset.reset设置为 none时,需要捕获异常 自己设置offset。您可以根据自身业务情况选择以下方式中的其中一种。//e.g 1 :指定offset, 这里需要自己维护offset,方便重试。Map<Integer, Long> partitionBeginOffsetMap = getPartitionOffset(consumer, topicStr, true);Map<Integer, Long> partitionEndOffsetMap = getPartitionOffset(consumer, topicStr, false);consumer.seek(new TopicPartition(topicStr, 0), 0);//e.g 2:从头开始消费consumer.seekToBeginning(Lists.newArrayList(new TopicPartition(topicStr, 0)));//e.g 3:指定offset为最近可用的offset。consumer.seekToEnd(Lists.newArrayList(new TopicPartition(topicStr, 0)));//e.g 4: 根据时间戳获取offset,就是根据时间戳去设置offset。例如重置到10分钟前的offsetMap<TopicPartition, Long> timestampsToSearch = new HashMap<>();Long value = Instant.now().minus(300, ChronoUnit.SECONDS).toEpochMilli();timestampsToSearch.put(new TopicPartition(topicStr, 0), value);Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(timestampsToSearch);for (Entry<TopicPartition, OffsetAndTimestamp> entry : topicPartitionOffsetAndTimestampMap.entrySet()) {TopicPartition topicPartition = entry.getKey();OffsetAndTimestamp entryValue = entry.getValue();consumer.seek(topicPartition, entryValue.offset()); // 指定offset, 这里需要自己维护offset,方便重试。}}}}/*** 获取topic的最早、最近的offset* @param consumer* @param topicStr* @param beginOrEnd true begin; false end* @return*/private static Map<Integer, Long> getPartitionOffset(KafkaConsumer<String, String> consumer, String topicStr,boolean beginOrEnd) {Collection<PartitionInfo> partitionInfos = consumer.partitionsFor(topicStr);List<TopicPartition> tp = new ArrayList<>();Map<Integer, Long> map = new HashMap<>();partitionInfos.forEach(str -> tp.add(new TopicPartition(topicStr, str.partition())));Map<TopicPartition, Long> topicPartitionLongMap;if (beginOrEnd) {topicPartitionLongMap = consumer.beginningOffsets(tp);} else {topicPartitionLongMap = consumer.endOffsets(tp);}topicPartitionLongMap.forEach((key, beginOffset) -> {int partition = key.partition();map.put(partition, beginOffset);});return map;}}

springboot项目下

 /*** enable-auto-commit: false 由spring提交* enable-auto-commit: true  由kafka提交*//*** enable-auto-commit: true  相同组下  (换组 会重置数据)* 如果这个topic某个分区有已经提交的offset,那么无论是把auto.offset.reset=earliest还是latest,都将失效,消费者会从已经提交的offset开始消费.*//*** enable-auto-commit: fasle 相同组下 (换组 会重置数据)* 如果这个topic某个分区没有提交的offset,那么把auto.offset.reset=latest,将没消费的设置为提交消费,然后从最后开始消费* 如果这个topic某个分区没有提交的offset,那么把auto.offset.reset=earliest,从没开始消费的offset开始消费*/

非springboot项目下

enable.auto.commit falseauto.offset.reset earliest 第一次消费, 重启后消费  都会从第一条开始重新消费全部数据
enable.auto.commit trueauto.offset.reset earliest 第一次消费全部数据,重启后从提交处开始消费enable.auto.commit falseauto.offset.reset latest  第一次,重启后会从最后一条开始消费,但没有提交,换成earliest 重新消费全部数据
enable.auto.commit trueauto.offset.reset latest   第一次从最后一条开始消费,重启后从提交处开始消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/808929.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu (Linux系统) 下载安装 Qt 环境

在官网http://download.qt.io/archive/qt/ 下载安装包&#xff0c;默认linux平台下提供的安装包以run后缀结尾 也可以选择其它地址下载 Qt官网下载地址&#xff1a;https://download.qt.io&#xff1b; 国内镜像下载地址&#xff1a;https://mirrors.cloud.tencent.com/qt/ 。建…

量子城域网系列(二):量子密钥与通信系统中各层协议融合应用

写在前面。国家标准中对量子保密通信的定义&#xff1a;量子保密通信是利用QKD与其他密码技术结合形成的保密通信技术。 经过这段时间的讨论&#xff0c;我们基本上明白了量子保密通信的内涵、基础协议、技术原理等。我们知道了当前语境下的量子密钥分发网络核心是实现两点之间…

windows下pycharm中配置conda虚拟环境

目录 一&#xff1a;背景 二&#xff1a;安装conda环境 三&#xff1a;pycharm配置环境 四&#xff1a;注意问题 一&#xff1a;背景 在使用python的过程中&#xff0c;我们可能需要在一个windows环境中创建多个版本的python和安装不同的库去做一些开发任务。 使用conda&a…

文献学习-33-一个用于生成手术视频摘要的python库

VideoSum: A Python Library for Surgical Video Summarization Authors: Luis C. Garcia-Peraza-Herrera, Sebastien Ourselin, and Tom Vercauteren Source: https://arxiv.org/pdf/2303.10173.pdf 这篇文章主要关注的是如何通过视频摘要来简化和可视化手术视频&#xff0c…

【linux】set ff=unix、linux设置文件格式

文章目录 一、文件格式二、如何查看文件格式三、设置文件格式、set ffunix四、查看unix与dos的区别 一、文件格式 当我们打开sh脚本时发现有时候格式是unix(LF) ,有时候是windows(CR LF) 。如下图&#xff1a; 文件格式影响了文件中的换行符 linux中sh类型的文件一般要设置为…

文献速递:深度学习肝脏肿瘤诊断---动态对比增强 MRI 上的自动肝脏肿瘤分割使用 4D 信息:基于 3D 卷积和卷积 LSTM 的深度学习模型

Title 题目 Automatic Liver Tumor Segmentation on Dynamic Contrast Enhanced MRI Using 4D Information: Deep Learning Model Based on 3D Convolution and Convolutional LSTM 动态对比增强 MRI 上的自动肝脏肿瘤分割使用 4D 信息&#xff1a;基于 3D 卷积和卷积 LSTM …

基于主链路规划策略实现微服务升级改造

原创作者&#xff1a;田超凡&#xff08;程序员田宝宝&#xff09; 版权所有&#xff0c;引用请注明原作者&#xff0c;严禁复制转载 最近项目上架构升级改造比较忙&#xff0c;更新频率放缓&#xff0c;敬请谅解&#xff01; 主链路规划基本概念 主链路指的就是保证业务可用…

x265中量化函数neon汇编实现分析

// uint32_t quant_c(const int16_t* coef, const int32_t quantScale, int32_t* deltaU, int16_t* qCoef, int qBits, int add, int numCoeff) function x265_quant_neon mov w9, #1 //x9的低32位 1 lsl w9, w9, w4 //w9 1 << qBits…

程序员的心智与成长

程序员的心智与成长 工作思考 有效控制情绪&#xff0c;在沟通时使用适当的表情包以传达善意。无论线上还是线下&#xff0c;都应避免争吵。只有和气相处&#xff0c;我们才能推动工作的进展。在讨论具体问题之前&#xff0c;先进行一些预备性的交流。情绪应放在第一位&#…

C#利用BufferedStream缓冲功能来增加IO操作性能

BufferedStream是C#中用于提供缓冲功能的流之一&#xff0c;它可以增加IO操作的性能&#xff0c;特别是在处理大量小型数据时。以下是BufferedStream的一些重要特性和用法&#xff1a; 1. **构造函数**&#xff1a;BufferedStream有多个重载的构造函数&#xff0c;其中最常用的…

K8s拉取habor镜像

目录 在daemon.json中添加仓库地址 重新加载daemon.json并重启docker 在目标node节点添加域名 验证目标node是否能正常登录镜像仓库 创建pod资源 加载yml文件 验证 查看pod的ip与端口号 在daemon.json中添加仓库地址 此处需要在创建资源对象所在的节点进行添加 路径&a…

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之十 简单视频浮雕画效果

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之十 简单视频浮雕画效果 目录 Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之十 简单视频浮雕画效果 一、简单介绍 二、简单视频浮雕画效果实现原理 三、简单视频浮雕画效果…

HistoricActivityInstance和HistoricProcessInstance区别

1、HistoricActvityInstance和HistoricProcessInstance区别 1.act_hi_actinst表保存每个流程实例processInstance经历的所有活动&#xff0c;即走过的审批节点历程&#xff0c; //查询该"流程定义"下所有流程实例经历的所有流程活动//结果是listList<HistoricAct…

避免使用第三方工具完成电脑环境检测

0. 简介 在之前配置各种深度学习环境的时候经常需要先检测一下电脑的软硬件环境&#xff0c;其实整个过程比较重复和固定&#xff0c;所以我们是否有可能一键检测Python版本、PIP版本、Conda版本、CUDA版本、电脑系统、CPU核数、CPU频率、内存、硬盘等内容这是很多Deepper苦恼…

废品回收小程序推动回收行业的发展趋势

回收在全球都是一个重要行业&#xff0c;它为全球的环保作出了重要贡献。 随着科技的不断发展创新&#xff0c;废品回收的方式也逐渐多样&#xff0c;全新的线上回收小程序也逐渐出现在大众的生活中&#xff0c;在当下的手机时代&#xff0c;线上回收也为大众提供了更加便利的…

35-4 fastjson漏洞复现

环境准备:35-2 fastjson反序列化漏洞介绍 及漏洞环境搭建-CSDN博客 fastjson_tool.jar下载:fastjson_rce_tool: fastjson命令执行自动化利用工具, remote code execute,JNDI服务利用工具 RMI/LDAP (gitee.com) 一、攻击机kali开启nc监听6666端口(或其他端口也行,只要不…

如何使用pgvector为RDS PostgreSQL构建专属ChatBot?

背景 越来越多的企业和个人希望能够利用LLM和生成式人工智能来构建专注于其特定领域的具备AI能力的产品。目前&#xff0c;大语言模型在处理通用问题方面表现较好&#xff0c;但由于训练语料和大模型的生成限制&#xff0c;对于专业知识和时效性方面存在一些局限。在信息时代&…

TCP_NODELAY在延迟敏感的场景下适合设置

结论先行 在TCP发送报文有时比较短&#xff0c;但又对延迟比较敏感的场景&#xff0c;例如&#xff0c;应用控制信令&#xff0c;非常适合启用TCP_NODELAY套接字选项。 现象 发送者在TCP链路上连续发送两条请求&#xff0c;第一条请求立即发送出去了&#xff0c;而第二条要等…

Redis(三) String字符串

文章目录 前言常见命令SETGETMSETMGETINCRINCRBYDECRDECRBYINCRBYFLOATAPPENDGETRANGESETRANGESTRLEN命令小结 前言 Redis 的数据有很多种数据类型&#xff0c;包括字符串类型、列表类型、哈希类型、集合类型、有序集合类型等。这几种数据类型是针对于 value 来说的&#xff0…

学习java第四十天

类图中各个类的作用&#xff1a; AliasRegistry&#xff1a;定义对alias的简单增删改等操作 SimpleAliasRegistry&#xff1a;主要使用map作为alias的缓存&#xff0c;并对接口AliasRegistry进行实现 SingletonBeanRegistry&#xff1a;定义对单例的注册及获取 BeanFactory&…