Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明
21、Flink 的table API与DataStream API 集成(2)- 批处理模式和inser-only流处理
21、Flink 的table API与DataStream API 集成(3)- changelog流处理、管道示例、类型转换和老版本转换示例
21、Flink 的table API与DataStream API 集成(完整版)
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
25、Flink 的table api与sql之函数(自定义函数示例)
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
31、Flink的SQL Gateway介绍及示例
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
33、Flink 的Table API 和 SQL 中的时区
35、Flink 的 Formats 之CSV 和 JSON Format
36、Flink 的 Formats 之Parquet 和 Orc Format
41、Flink之Hive 方言介绍及详细示例
40、Flink 的Apache Kafka connector(kafka source的介绍及使用示例)-1
40、Flink 的Apache Kafka connector(kafka sink的介绍及使用示例)-2
40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的
文章目录
- Flink 系列文章
- 一、Apache Kafka 连接器
- 3、kafka sourcefunction
- 4、kafka sink
- 1)、使用示例
- 1、Flink 1.13版本实现
- 2、Flink 1.17版本实现
- 3、说明
- 2)、序列化器
- 3)、容错
- 4)、监控
- 5、kafka producer
- 6、kafka 连接器指标
- 7、启用 Kerberos 身份验证
- 8、升级到最近的连接器版本
- 9、问题排查
- 1)、数据丢失
- 2)、UnknownTopicOrPartitionException
- 3)、ProducerFencedException
本文介绍了kafka的版本功能更能换、作为sink的使用、连接器的指标、身份认证、版本升级和问题排查几个主要 方面,关于常用的功能均以可运行的示例进行展示并提供完整的验证步骤。
本专题为了便于阅读以及整体查阅分为三个部分:
40、Flink 的Apache Kafka connector(kafka source的介绍及使用示例)-1
40、Flink 的Apache Kafka connector(kafka sink的介绍及使用示例)-2
40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版
本文依赖kafka集群能正常使用。
本文分为9个部分,即sink、producer/source(Flink版本升级)、连接器指标、身份认证、版本升级及问题排查。
本文的示例是在Flink 1.17版本中运行。
一、Apache Kafka 连接器
Flink 提供了 Apache Kafka 连接器使用精确一次(Exactly-once)的语义在 Kafka topic 中读取和写入数据。
3、kafka sourcefunction
FlinkKafkaConsumer 已被弃用并将在 Flink 1.17 中移除,请改用 KafkaSource。
1.13版本的实现参考本文开头的示例。
4、kafka sink
KafkaSink 可将数据流写入一个或多个 Kafka topic。
1)、使用示例
Kafka sink 提供了构建类来创建 KafkaSink 的实例。
以下代码片段展示了如何将字符串数据按照至少一次(at lease once)的语义保证写入 Kafka topic:
1、Flink 1.13版本实现
- 实现代码
import java.util.Properties;
import java.util.Random;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestKafkaSinkDemo {public static void test1() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、source-主题:alan_source// 准备kafka连接参数Properties propSource = new Properties();propSource.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");// 集群地址propSource.setProperty("group.id", "flink_kafka");propSource.setProperty("auto.offset.reset", "latest");propSource.setProperty("flink.partition-discovery.interval-millis", "5000");propSource.setProperty("enable.auto.commit", "true");// 自动提交的时间间隔propSource.setProperty("auto.commit.interval.ms", "2000");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("alan_source", new SimpleStringSchema(), propSource);// 使用kafkaSourceDataStream<String> kafkaDS = env.addSource(kafkaSource);// 3、transformation-统计单词个数SingleOutputStreamOperator<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {private Random ran = new Random();@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, 1));}}}).keyBy(t -> t.f0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {System.out.println("输出:" + value.f0 + "->" + value.f1);return value.f0 + "->" + value.f1;}});// 4、sink-主题alan_sinkProperties propSink = new Properties();propSink.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");propSink.setProperty("transaction.timeout.ms", "5000");FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("alan_sink", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), propSink,FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-toleranceresult.addSink(kafkaSink);// 5、executeenv.execute();}public static void main(String[] args) throws Exception {test1();}}
- 验证
1、创建kafka 主题 alan_source 和 alan_sink
2、驱动程序,观察运行控制台
3、通过命令往alan_source 写入数据,同时消费 alan_sink 主题的数据
## kafka生产数据
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_source
>alan,alach,alanchan,hello
>alan_chan,hi,flink
>alan,flink,good
>alan,alach,alanchan,hello
>hello,123
>## kafka消费数据
[alanchan@server2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_sink --from-beginning
alanchan->1
hello->1
alan->1
alach->1
flink->1
alan_chan->1
hi->1
alan->2
flink->2
good->1
alanchan->2
hello->2
alan->3
alach->2
hello->3
123->1
4、应用程序控制台输出
2、Flink 1.17版本实现
- 代码实现
import java.util.Properties;
import java.util.Random;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestKafkaSinkDemo {public static void test2() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("alan_nsource").setGroupId("flink_kafka").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 3、 transformationDataStream<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, 1));}}}).keyBy(t -> t.f0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {System.out.println("输出:" + value.f0 + "->" + value.f1);return value.f0 + "->" + value.f1;}});// 4、 sinkKafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("alan_nsink").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();result.sinkTo(kafkaSink);// 5、executeenv.execute();}public static void main(String[] args) throws Exception {
// test1();test2();}}
- 验证
1、创建kafka 主题 alan_nsource 和 alan_nsink
2、驱动程序,观察运行控制台
3、通过命令往alan_nsource 写入数据,同时消费 alan_nsink 主题的数据
## kafka生产数据
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_nsource
>alan,alach,alanchan,hello
>alan_chan,hi,flink
>alan,flink,good
>alan,alach,alanchan,hello
>hello,123
>## kafka消费数据
[alanchan@server2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_nsink --from-beginning
alanchan->1
hello->1
alan->1
alach->1
flink->1
alan_chan->1
hi->1
alan->2
flink->2
good->1
alanchan->2
alach->2
alan->3
hello->2
hello->3
123->1
4、应用程序控制台输出
3、说明
以下属性在构建 KafkaSink 时是必须指定的:
- Bootstrap servers, setBootstrapServers(String)
- 消息序列化器(Serializer), setRecordSerializer(KafkaRecordSerializationSchema)
- 如果使用DeliveryGuarantee.EXACTLY_ONCE 的语义保证,则需要使用 setTransactionalIdPrefix(String)
2)、序列化器
构建时需要提供 KafkaRecordSerializationSchema 来将输入数据转换为 Kafka 的 ProducerRecord。Flink 提供了 schema 构建器 以提供一些通用的组件,例如消息键(key)/消息体(value)序列化、topic 选择、消息分区,同样也可以通过实现对应的接口来进行更丰富的控制。
KafkaRecordSerializationSchema.builder().setTopicSelector(new TopicSelector() {@Overridepublic String apply(Object t) {//设置选择的 topic return "alan_nsink";}}).setValueSerializationSchema(new SimpleStringSchema()).setKeySerializationSchema(new SimpleStringSchema()).setPartitioner(new FlinkFixedPartitioner()).build()
- 示例代码
import java.util.Properties;
import java.util.Random;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.TopicSelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestKafkaSinkDemo {public static void test3() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("alan_nsource").setGroupId("flink_kafka").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 3、 transformationDataStream<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, 1));}}}).keyBy(t -> t.f0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {System.out.println("输出:" + value.f0 + "->" + value.f1);return value.f0 + "->" + value.f1;}});// 4、 sinkKafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopicSelector(new TopicSelector() {@Overridepublic String apply(Object t) {//设置选择的 topic return "alan_nsink";}}).setValueSerializationSchema(new SimpleStringSchema()).setKeySerializationSchema(new SimpleStringSchema()).setPartitioner(new FlinkFixedPartitioner()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();result.sinkTo(kafkaSink);// 5、executeenv.execute();}public static void main(String[] args) throws Exception {test3();}}
- 验证
1、创建kafka 主题 alan_nsource 和 alan_nsink
2、驱动程序,观察运行控制台
3、通过命令往alan_nsource 写入数据,同时消费 alan_nsink 主题的数据
## kafka生产数据
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_nsource
>alan,alach,alanchan,hello
>alan_chan,hi,flink
>alan,flink,good
>alan,alach,alanchan,hello
>hello,123
>## kafka消费数据
[alanchan@server2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_nsink --from-beginning
alanchan->1
hello->1
alan->1
alach->1
flink->1
alan_chan->1
hi->1
alan->2
flink->2
good->1
alanchan->2
alach->2
alan->3
hello->2
hello->3
123->1
4、应用程序控制台输出
其中消息体(value)序列化方法和 topic 的选择方法是必须指定的,此外也可以通过 setKafkaKeySerializer(Serializer) 或 setKafkaValueSerializer(Serializer) 来使用 Kafka 提供而非 Flink 提供的序列化器。
3)、容错
KafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee)。对于 DeliveryGuarantee.AT_LEAST_ONCE 和 DeliveryGuarantee.EXACTLY_ONCE,Flink checkpoint 必须启用。
默认情况下 KafkaSink 使用 DeliveryGuarantee.NONE。
以下是对不同语义保证的解释:
- DeliveryGuarantee.NONE 不提供任何保证:消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复。
- DeliveryGuarantee.AT_LEAST_ONCE: sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
- DeliveryGuarantee.EXACTLY_ONCE: 该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置 isolation.level),在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。请确认事务 ID 的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务 不会互相影响!此外,强烈建议将 Kafka 的事务超时时间调整至远大于 checkpoint 最大间隔 + 最大重启时间,否则 Kafka 对未提交事务的过期处理会导致数据丢失。
关于容错,请参考文章:9、Flink四大基石之Checkpoint容错机制详解及示例(checkpoint配置、重启策略、手动恢复checkpoint和savepoint)
4)、监控
Kafka sink 会在不同的范围(Scope)中汇报下列指标。
5、kafka producer
FlinkKafkaProducer 已被弃用并将在 Flink 1.15 中移除,请改用 KafkaSink。
关于Flink 1.13版本的实现,请参考上文中的示例。
6、kafka 连接器指标
Flink 的 Kafka 连接器通过 Flink 的指标系统提供一些指标来帮助分析 connector 的行为。 各个版本的 Kafka producer 和 consumer 会通过 Flink 的指标系统汇报 Kafka 内部的指标。 该 Kafka 文档列出了所有汇报的指标。
同样也可通过将 Kafka source 在该章节描述的 register.consumer.metrics,或 Kafka sink 的 register.producer.metrics 配置设置为 false 来关闭 Kafka 指标的注册。
7、启用 Kerberos 身份验证
Flink 通过 Kafka 连接器提供了一流的支持,可以对 Kerberos 配置的 Kafka 安装进行身份验证。只需在 flink-conf.yaml 中配置 Flink。
像这样为 Kafka 启用 Kerberos 身份验证:
1、通过设置以下内容配置 Kerberos 票据
- security.kerberos.login.use-ticket-cache:默认情况下,这个值是 true,Flink 将尝试在 kinit 管理的票据缓存中使用 Kerberos 票据。注意!在 YARN 上部署的 Flink jobs 中使用 Kafka 连接器时,使用票据缓存的 Kerberos 授权将不起作用。
- security.kerberos.login.keytab 和 security.kerberos.login.principal:要使用 Kerberos keytabs,需为这两个属性设置值。
2、将 KafkaClient 追加到 security.kerberos.login.contexts:这告诉 Flink 将配置的 Kerberos 票据提供给 Kafka 登录上下文以用于 Kafka 身份验证。
一旦启用了基于 Kerberos 的 Flink 安全性后,只需在提供的属性配置中包含以下两个设置(通过传递给内部 Kafka 客户端),即可使用 Flink Kafka Consumer 或 Producer 向 Kafk a进行身份验证:
- 将 security.protocol 设置为 SASL_PLAINTEXT(默认为 NONE):用于与 Kafka broker 进行通信的协议。使用独立 Flink 部署时,也可以使用 SASL_SSL;请在此处查看如何为 SSL 配置 Kafka 客户端。
- 将 sasl.kerberos.service.name 设置为 kafka(默认为 kafka):此值应与用于 Kafka broker 配置的 sasl.kerberos.service.name 相匹配。客户端和服务器配置之间的服务名称不匹配将导致身份验证失败。
有关 Kerberos 安全性 Flink 配置的更多信息,请参见这里。你也可以在这里进一步了解 Flink 如何在内部设置基于 kerberos 的安全性。
该部分由于没有环境,未做验证,内容来至于官网。将来拟计划以专栏的形式介绍该部分内容。
8、升级到最近的连接器版本
通用的升级步骤概述见 升级 Jobs 和 Flink 版本指南。对于 Kafka,你还需要遵循这些步骤:
- 不要同时升级 Flink 和 Kafka 连接器
- 确保你对 Consumer 设置了 group.id
- 在 Consumer 上设置 setCommitOffsetsOnCheckpoints(true),以便读 offset 提交到 Kafka。务必在停止和恢复 savepoint 前执行此操作。你可能需要在旧的连接器版本上进行停止/重启循环来启用此设置。
- 在 Consumer 上设置 setStartFromGroupOffsets(true),以便我们从 Kafka 获取读 offset。这只会在 Flink 状态中没有读 offset 时生效,这也是为什么下一步非要重要的原因。
- 修改 source/sink 分配到的 uid。这会确保新的 source/sink 不会从旧的 sink/source 算子中读取状态。
- 使用 --allow-non-restored-state 参数启动新 job,因为我们在 savepoint 中仍然有先前连接器版本的状态。
9、问题排查
如果在使用 Flink 时对 Kafka 有问题,Flink 只封装 KafkaConsumer 或 KafkaProducer,你的问题可能独立于 Flink,有时可以通过升级 Kafka broker 程序、重新配置 Kafka broker 程序或在 Flink 中重新配置 KafkaConsumer 或 KafkaProducer 来解决。
一句话,大概是kafka的问题或配置的kafka的问题,和flink关系不大。
下面列出了一些常见问题的示例。
1)、数据丢失
根据你的 Kafka 配置,即使在 Kafka 确认写入后,你仍然可能会遇到数据丢失。特别要记住在 Kafka 的配置中设置以下属性:
- acks
- log.flush.interval.messages
- log.flush.interval.ms
- log.flush.*
上述选项的默认值是很容易导致数据丢失的。请参考 Kafka 文档以获得更多的解释。
2)、UnknownTopicOrPartitionException
导致此错误的一个可能原因是正在进行新的 leader 选举,例如在重新启动 Kafka broker 之后或期间。这是一个可重试的异常,因此 Flink job 应该能够重启并恢复正常运行。也可以通过更改 producer 设置中的 retries 属性来规避。但是,这可能会导致重新排序消息,反过来可以通过将 max.in.flight.requests.per.connection 设置为 1 来避免不需要的消息。
3)、ProducerFencedException
这个错误是由于 FlinkKafkaProducer 所生成的 transactional.id 与其他应用所使用的的产生了冲突。多数情况下,由于 FlinkKafkaProducer 产生的 ID 都是以 taskName + “-” + operatorUid 为前缀的,这些产生冲突的应用也是使用了相同 Job Graph 的 Flink Job。 我们可以使用 setTransactionalIdPrefix() 方法来覆盖默认的行为,为每个不同的 Job 分配不同的 transactional.id 前缀来解决这个问题。
以上,本文介绍了kafka的版本功能更能换、作为sink的使用、连接器的指标、身份认证、版本升级和问题排查几个主要 方面,关于常用的功能均以可运行的示例进行展示并提供完整的验证步骤。
本专题为了便于阅读以及整体查阅分为三个部分:
40、Flink 的Apache Kafka connector(kafka source的介绍及使用示例)-1
40、Flink 的Apache Kafka connector(kafka sink的介绍及使用示例)-2
40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版