记录 Kafka → Flink → Kafka 端到端(end-to-end)exactly-once 语义的实现步骤与代码
步骤
- 开启checkpoint、stateBackend的设置和checkpoint配置
- 设置kafka source的配置
- 读取kafka source message
- 随意的transformation;并打印结果
- kafka sink端的配置
- 输出到kafka sink端
- 执行
代码
package com.javaye.demo.exactly;import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class ExactlyOnce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(1000L);if (SystemUtils.IS_OS_WINDOWS) {env.setStateBackend(new FsStateBackend("file:///D:/ckp"));} else {env.setStateBackend(new FsStateBackend("hdfs://only:9870/flink-checkpoints"));}CheckpointConfig config = env.getCheckpointConfig();
config.setMinPauseBetweenCheckpoints(500L);
config.setTolerableCheckpointFailureNumber(5);
config.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setCheckpointTimeout(60 * 1000);
config.setMaxConcurrentCheckpoints(1);env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));String kafkaServer = "only:9092";String sourceTopic = "flink_kafka_source";String groupId = "flink_kafka_source_exactly_once";String clientIdPrefix = "flink_exactly_once";Properties kafkaSourceProp = new Properties();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers(kafkaServer).setTopics(sourceTopic).setGroupId(groupId).setClientIdPrefix(clientIdPrefix).setStartingOffsets(OffsetsInitializer.latest()) .setProperty("partition.discovery.interval.ms", "50000") .setProperty("auto.offset.reset", "latest").setValueOnlyDeserializer(new SimpleStringSchema())
.setProperties(kafkaSourceProp).build();DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "flink_kafka_exactly_once", TypeInformation.of(String.class));SingleOutputStreamOperator<String> flatMapDS = kafkaDS.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(",");for (String word : words) {Random random = new Random();int i = random.nextInt(5);if (i > 3) {System.out.println("模拟出现bug...");throw new RuntimeException("模拟出现bug...");}System.out.println(word + "===" + i);out.collect(word + "===" + i);}}});flatMapDS.print();Properties kafkaSinkProp = new Properties();kafkaSinkProp.setProperty("transaction.timeout.ms", 1000 * 5 + ""); KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers(kafkaServer).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("flink_kafka_sink").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setKafkaProducerConfig(kafkaSinkProp).build();flatMapDS.sinkTo(kafkaSink);env.execute(ExactlyOnce.class.getName());}
}