默认:从topic中指定的group上次消费的位置开始消费。
所以必须配置group.id参数从消费者组提交的偏移量开始读取分区(kafka或zookeeper中)。如果找不到分区的偏移量,auto.offset.reset将使用属性中的设置。如果是默认行为(setStartFromGroupOffsets),那么任务从检查点重启,按照重启前的offset进行消费,如果直接重启不从检查点重启并且group.id不变,程序会按照上次提交的offset的位置继续消费。如果group.id改变了,则程序按照auto.offset.reset设置的属性进行消费。但是如果程序带有状态的算子,还是建议使用检查点重启。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);Properties props = new Properties();
props.setProperty("bootstrap.servers",KAFKA_BROKER);
props.setProperty("zookeeper.connect", ZK_HOST);
props.setProperty("group.id",GROUP_ID);
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(TOPIC, new SimpleStringSchema(), props);consumer.setStartFromGroupOffsets();
注意:以下五种方式运行时优先级都比KafkaProperties中配置的auto.offset.reset优先级高。
方式一 : 指定topic, 指定partition的offset位置
Map<KafkaTopicPartition, Long> offsets = new HashedMap();
offsets.put(new KafkaTopicPartition("topic_name", 0), 11L);
offsets.put(new KafkaTopicPartition("topic_name", 1), 22L);
offsets.put(new KafkaTopicPartition("topic_name", 2), 33L);
consumer.setStartFromSpecificOffsets(offsets);
Map<KafkaTopicPartition, Long> Long参数指定的offset位置
KafkaTopicPartition构造函数有两个参数,第一个为topic名字,第二个为分区数.
- 如果使用者需要读取在提供的偏移量映射中没有指定偏移量的分区,则它将回退到setStartFromGroupOffsets()该特定分区的默认组偏移行为。
- 当作业从故障中自动恢复或使用保存点手动恢复时,这些起始位置配置方法不会影响起始位置。在恢复时,每个Kafka分区的起始位置由存储在保存点或检查点中的偏移量确定。
consumer.setStartFromSpecificOffsets(offsets);
方式二: 从topic中最初的数据开始消费
consumer.setStartFromEarliest();
方式三: 从指定的时间戳开始
consumer.setStartFromTimestamp(1559801580000l);
对于每个分区,时间戳大于或等于指定时间戳的记录将用作起始位置。如果分区的最新记录早于时间戳,则只会从最新记录中读取分区。在此模式下,Kafka中的已提交偏移将被忽略,不会用作起始位置。时间戳指的是kafka中消息自带的时间戳。
方式四: 从最新的数据开始消费
consumer.setStartFromLatest();
方式五(同一默认)
参见: https://mp.weixin.qq.com/s?__biz=MzU5Mzk3MDA3Mw==&mid=2247483866&idx=2&sn=6a3b458caf5bebf0171f9fbd834b7517&chksm=fe09172cc97e9e3a590f5ea2978d078b1b46d94f86bd344173fa69c1d63790b09d2fe173bffb&token=1856795336&lang=zh_CN#rd