指定开始_Flink-Kafka指定offset的五种方式

默认:从topic中指定的group上次消费的位置开始消费。

所以必须配置group.id参数从消费者组提交的偏移量开始读取分区(kafka或zookeeper中)。如果找不到分区的偏移量,auto.offset.reset将使用属性中的设置。如果是默认行为(setStartFromGroupOffsets),那么任务从检查点重启,按照重启前的offset进行消费,如果直接重启不从检查点重启并且group.id不变,程序会按照上次提交的offset的位置继续消费。如果group.id改变了,则程序按照auto.offset.reset设置的属性进行消费。但是如果程序带有状态的算子,还是建议使用检查点重启。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);Properties props = new Properties();
props.setProperty("bootstrap.servers",KAFKA_BROKER);
props.setProperty("zookeeper.connect", ZK_HOST);
props.setProperty("group.id",GROUP_ID);
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(TOPIC, new SimpleStringSchema(), props);consumer.setStartFromGroupOffsets();

注意:以下五种方式运行时优先级都比KafkaProperties中配置的auto.offset.reset优先级高。

方式一 : 指定topic, 指定partition的offset位置

Map<KafkaTopicPartition, Long> offsets = new HashedMap();
offsets.put(new KafkaTopicPartition("topic_name", 0), 11L);
offsets.put(new KafkaTopicPartition("topic_name", 1), 22L);
offsets.put(new KafkaTopicPartition("topic_name", 2), 33L);
consumer.setStartFromSpecificOffsets(offsets);

Map<KafkaTopicPartition, Long> Long参数指定的offset位置

KafkaTopicPartition构造函数有两个参数,第一个为topic名字,第二个为分区数.

  • 如果使用者需要读取在提供的偏移量映射中没有指定偏移量的分区,则它将回退到setStartFromGroupOffsets()该特定分区的默认组偏移行为。
  • 当作业从故障中自动恢复或使用保存点手动恢复时,这些起始位置配置方法不会影响起始位置。在恢复时,每个Kafka分区的起始位置由存储在保存点或检查点中的偏移量确定。

consumer.setStartFromSpecificOffsets(offsets);

方式二: 从topic中最初的数据开始消费

consumer.setStartFromEarliest();

方式三: 从指定的时间戳开始

consumer.setStartFromTimestamp(1559801580000l);

对于每个分区,时间戳大于或等于指定时间戳的记录将用作起始位置。如果分区的最新记录早于时间戳,则只会从最新记录中读取分区。在此模式下,Kafka中的已提交偏移将被忽略,不会用作起始位置。时间戳指的是kafka中消息自带的时间戳。

方式四: 从最新的数据开始消费

consumer.setStartFromLatest();


方式五(同一默认)

参见: https://mp.weixin.qq.com/s?__biz=MzU5Mzk3MDA3Mw==&mid=2247483866&idx=2&sn=6a3b458caf5bebf0171f9fbd834b7517&chksm=fe09172cc97e9e3a590f5ea2978d078b1b46d94f86bd344173fa69c1d63790b09d2fe173bffb&token=1856795336&lang=zh_CN#rd

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/416511.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

谈谈java中遍历Map的几种方法

java中的map遍历有多种方法&#xff0c;从最早的Iterator&#xff0c;到java5支持的foreach,再到java8 Lambda&#xff0c;让我们一起来看下具体的用法以及各自的优缺点 先初始化一个map public class TestMap {public static Map<Integer, Integer> map new HashMap<…

自动打包_全自动打包机行业发展如何?全自动打包机行业发展现状分析

全自动打包机行业发展如何&#xff1f;全自动打包机行业发展现状分析随着经济的快速发展&#xff0c;现在很多产品的生产过程都变得简单起来&#xff0c;更新换代也非常快&#xff0c;这不仅促进了企业的发展&#xff0c;也提高了人们生活质量。全自动打包机作为企业生产中常见…

小可爱

转载于:https://www.cnblogs.com/lrf9606/p/7077434.html

13新功能_再聊聊灵感盒 -Marginnote 3.6.12/13新功能

我是夜雨&#xff0c;水群最多的一类人本文主要BB了我对灵感盒的理解Marginnote 3.6.12/13个人之前对灵感盒的理解在此强调开发者的一句话不要对灵感盒做太多高大上的引申灵感盒只不过是新的脑图结构关于灵感盒的延伸Zattelkasten/slip box/卡片盒笔记法该内容很早之前就在Marg…

Git之第三方托管oschina

一.git 简介 1.Git是一款免费、开源的分布式版本控制系统&#xff0c;用于敏捷高效地处理任何或小或大的项目。2.Git是一个开源的分布式版本控制系统&#xff0c;用以有效、高速的处理从很小到非常大的项目版本管理。3.Git 是由“Linux之父” Linus Torvalds 创建的。因为他发现…

局域网抢答器_基于童芯派的抢答器V1.0

[童心制物Makeblock]的新产品"童芯派"发布已两月有余&#xff0c;刚一发布就第一时间入手三个含扩展板的套装&#xff0c;Makeblock的产品我还是很认可的&#xff0c;从mbot&#xff0c;ranger到程小奔&#xff0c;从神经元&#xff0c;光环版到童芯派&#xff0c;东…

Spring-boot(一)

1.1 spring介绍 spring Boot使开发独立的&#xff0c;产品级别的基于Spring的应用变得非常简单&#xff0c;你只需"just run"。 我们为Spring平台及第三方库提供开箱即用的设置&#xff0c;这样你就可以有条不紊地开始。多数Spring Boot应用需要很少的Spring配置。 你…

下拉框_教你封装 Element Tree 树状下拉框

在日常项目开发中&#xff0c;树状下拉框的需求还是比较常见的&#xff0c;但是element并没有这种组件以供使用。在这里&#xff0c;小编就基于element如何封装一个树状下拉框做个详细的介绍。通过这篇文章&#xff0c;你可以了解学习到一个树状下拉框组件是如何一步一步封装成…

java异常处理,需要考虑的流程

异常处理&#xff0c;我们需要主意的事儿 问3个问题&#xff1a; 什么出了错&#xff1f;——异常类型 在哪出的错&#xff1f;——堆栈跟踪 为什么出错&#xff1f;——异常信息 把上面的3个问题回答好&#xff0c;在异常抛出后能快速对问题进行 定性、定位、定义。 要想让我们…

字体单独设置样式_Glyphs 官方教程 | 字体命名

​​字体名称是很重要的&#xff0c;它决定了字体菜单中的分组和顺序&#xff0c;而这直接影响你的字体将如何呈现给用户。在一款字体中&#xff0c;字体名称被存储在六个不同的地方&#xff0c;这一点已经相当困难&#xff1b;或者实际上还会有更多的地方&#xff0c;这就更复…

protractor端到端测试简介

安装依赖 protractornpm install -g protractor karma-jasminenpm install --save-dev karma-jasmine javaJDK http://blog.csdn.net/bingiser/article/details/53375282webdriver-managerwebdriver-manager update 文件 test.js 测试文件protractor_conf.js protractor配置文件…

arm linux 开机电路_ARM Linux启动过程分析

摘要&#xff1a;嵌入式Linux的可移植性使得我们可以在各种电子产品上看到它的身影。对于不同体系结构的处理器来说Linux的启动过程也有所不同。本文以S3C2410 ARM处理器为例&#xff0c;详细分析了系统上电后bootloader的执行流程及ARM Linux的启动过程。关键词&#xff1a;AR…