40、Flink 的Apache Kafka connector(kafka sink的介绍及使用示例)-2

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明
21、Flink 的table API与DataStream API 集成(2)- 批处理模式和inser-only流处理
21、Flink 的table API与DataStream API 集成(3)- changelog流处理、管道示例、类型转换和老版本转换示例
21、Flink 的table API与DataStream API 集成(完整版)
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
25、Flink 的table api与sql之函数(自定义函数示例)
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
31、Flink的SQL Gateway介绍及示例
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
33、Flink 的Table API 和 SQL 中的时区
35、Flink 的 Formats 之CSV 和 JSON Format
36、Flink 的 Formats 之Parquet 和 Orc Format
41、Flink之Hive 方言介绍及详细示例
40、Flink 的Apache Kafka connector(kafka source的介绍及使用示例)-1
40、Flink 的Apache Kafka connector(kafka sink的介绍及使用示例)-2
40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的


文章目录

  • Flink 系列文章
  • 一、Apache Kafka 连接器
    • 3、kafka sourcefunction
    • 4、kafka sink
      • 1)、使用示例
        • 1、Flink 1.13版本实现
        • 2、Flink 1.17版本实现
        • 3、说明
      • 2)、序列化器
      • 3)、容错
      • 4)、监控
    • 5、kafka producer
    • 6、kafka 连接器指标
    • 7、启用 Kerberos 身份验证
    • 8、升级到最近的连接器版本
    • 9、问题排查
      • 1)、数据丢失
      • 2)、UnknownTopicOrPartitionException
      • 3)、ProducerFencedException


本文介绍了kafka的版本功能更能换、作为sink的使用、连接器的指标、身份认证、版本升级和问题排查几个主要 方面,关于常用的功能均以可运行的示例进行展示并提供完整的验证步骤。
本专题为了便于阅读以及整体查阅分为三个部分:
40、Flink 的Apache Kafka connector(kafka source的介绍及使用示例)-1
40、Flink 的Apache Kafka connector(kafka sink的介绍及使用示例)-2
40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版
本文依赖kafka集群能正常使用。
本文分为9个部分,即sink、producer/source(Flink版本升级)、连接器指标、身份认证、版本升级及问题排查。
本文的示例是在Flink 1.17版本中运行。

一、Apache Kafka 连接器

Flink 提供了 Apache Kafka 连接器使用精确一次(Exactly-once)的语义在 Kafka topic 中读取和写入数据。

3、kafka sourcefunction

FlinkKafkaConsumer 已被弃用并将在 Flink 1.17 中移除,请改用 KafkaSource。
1.13版本的实现参考本文开头的示例。

4、kafka sink

KafkaSink 可将数据流写入一个或多个 Kafka topic。

1)、使用示例

Kafka sink 提供了构建类来创建 KafkaSink 的实例。

以下代码片段展示了如何将字符串数据按照至少一次(at lease once)的语义保证写入 Kafka topic:

1、Flink 1.13版本实现
  • 实现代码
import java.util.Properties;
import java.util.Random;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestKafkaSinkDemo {public static void test1() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、source-主题:alan_source// 准备kafka连接参数Properties propSource = new Properties();propSource.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");// 集群地址propSource.setProperty("group.id", "flink_kafka");propSource.setProperty("auto.offset.reset", "latest");propSource.setProperty("flink.partition-discovery.interval-millis", "5000");propSource.setProperty("enable.auto.commit", "true");// 自动提交的时间间隔propSource.setProperty("auto.commit.interval.ms", "2000");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("alan_source", new SimpleStringSchema(), propSource);// 使用kafkaSourceDataStream<String> kafkaDS = env.addSource(kafkaSource);// 3、transformation-统计单词个数SingleOutputStreamOperator<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {private Random ran = new Random();@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, 1));}}}).keyBy(t -> t.f0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {System.out.println("输出:" + value.f0 + "->" + value.f1);return value.f0 + "->" + value.f1;}});// 4、sink-主题alan_sinkProperties propSink = new Properties();propSink.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");propSink.setProperty("transaction.timeout.ms", "5000");FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("alan_sink", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), propSink,FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-toleranceresult.addSink(kafkaSink);// 5、executeenv.execute();}public static void main(String[] args) throws Exception {test1();}}
  • 验证
    1、创建kafka 主题 alan_source 和 alan_sink
    2、驱动程序,观察运行控制台
    3、通过命令往alan_source 写入数据,同时消费 alan_sink 主题的数据
## kafka生产数据
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_source
>alan,alach,alanchan,hello
>alan_chan,hi,flink
>alan,flink,good
>alan,alach,alanchan,hello
>hello,123
>## kafka消费数据
[alanchan@server2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_sink --from-beginning
alanchan->1
hello->1
alan->1
alach->1
flink->1
alan_chan->1
hi->1
alan->2
flink->2
good->1
alanchan->2
hello->2
alan->3
alach->2
hello->3
123->1

4、应用程序控制台输出
在这里插入图片描述

2、Flink 1.17版本实现
  • 代码实现
import java.util.Properties;
import java.util.Random;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestKafkaSinkDemo {public static void test2() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("alan_nsource").setGroupId("flink_kafka").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 3、 transformationDataStream<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, 1));}}}).keyBy(t -> t.f0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {System.out.println("输出:" + value.f0 + "->" + value.f1);return value.f0 + "->" + value.f1;}});// 4、 sinkKafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("alan_nsink").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();result.sinkTo(kafkaSink);// 5、executeenv.execute();}public static void main(String[] args) throws Exception {
//		test1();test2();}}
  • 验证
    1、创建kafka 主题 alan_nsource 和 alan_nsink
    2、驱动程序,观察运行控制台
    3、通过命令往alan_nsource 写入数据,同时消费 alan_nsink 主题的数据
## kafka生产数据
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_nsource
>alan,alach,alanchan,hello
>alan_chan,hi,flink
>alan,flink,good
>alan,alach,alanchan,hello
>hello,123
>## kafka消费数据
[alanchan@server2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_nsink --from-beginning
alanchan->1
hello->1
alan->1
alach->1
flink->1
alan_chan->1
hi->1
alan->2
flink->2
good->1
alanchan->2
alach->2
alan->3
hello->2
hello->3
123->1

4、应用程序控制台输出
在这里插入图片描述

3、说明

以下属性在构建 KafkaSink 时是必须指定的:

  • Bootstrap servers, setBootstrapServers(String)
  • 消息序列化器(Serializer), setRecordSerializer(KafkaRecordSerializationSchema)
  • 如果使用DeliveryGuarantee.EXACTLY_ONCE 的语义保证,则需要使用 setTransactionalIdPrefix(String)

2)、序列化器

构建时需要提供 KafkaRecordSerializationSchema 来将输入数据转换为 Kafka 的 ProducerRecord。Flink 提供了 schema 构建器 以提供一些通用的组件,例如消息键(key)/消息体(value)序列化、topic 选择、消息分区,同样也可以通过实现对应的接口来进行更丰富的控制。

KafkaRecordSerializationSchema.builder().setTopicSelector(new TopicSelector() {@Overridepublic String apply(Object t) {//设置选择的 topic return "alan_nsink";}}).setValueSerializationSchema(new SimpleStringSchema()).setKeySerializationSchema(new SimpleStringSchema()).setPartitioner(new FlinkFixedPartitioner()).build()
  • 示例代码
import java.util.Properties;
import java.util.Random;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.TopicSelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestKafkaSinkDemo {public static void test3() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("alan_nsource").setGroupId("flink_kafka").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 3、 transformationDataStream<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, 1));}}}).keyBy(t -> t.f0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {System.out.println("输出:" + value.f0 + "->" + value.f1);return value.f0 + "->" + value.f1;}});// 4、 sinkKafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopicSelector(new TopicSelector() {@Overridepublic String apply(Object t) {//设置选择的 topic return "alan_nsink";}}).setValueSerializationSchema(new SimpleStringSchema()).setKeySerializationSchema(new SimpleStringSchema()).setPartitioner(new FlinkFixedPartitioner()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();result.sinkTo(kafkaSink);// 5、executeenv.execute();}public static void main(String[] args) throws Exception {test3();}}
  • 验证
    1、创建kafka 主题 alan_nsource 和 alan_nsink
    2、驱动程序,观察运行控制台
    3、通过命令往alan_nsource 写入数据,同时消费 alan_nsink 主题的数据
## kafka生产数据
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_nsource
>alan,alach,alanchan,hello
>alan_chan,hi,flink
>alan,flink,good
>alan,alach,alanchan,hello
>hello,123
>## kafka消费数据
[alanchan@server2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_nsink --from-beginning
alanchan->1
hello->1
alan->1
alach->1
flink->1
alan_chan->1
hi->1
alan->2
flink->2
good->1
alanchan->2
alach->2
alan->3
hello->2
hello->3
123->1

4、应用程序控制台输出
在这里插入图片描述

其中消息体(value)序列化方法和 topic 的选择方法是必须指定的,此外也可以通过 setKafkaKeySerializer(Serializer) 或 setKafkaValueSerializer(Serializer) 来使用 Kafka 提供而非 Flink 提供的序列化器。

3)、容错

KafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee)。对于 DeliveryGuarantee.AT_LEAST_ONCE 和 DeliveryGuarantee.EXACTLY_ONCE,Flink checkpoint 必须启用。

默认情况下 KafkaSink 使用 DeliveryGuarantee.NONE。

以下是对不同语义保证的解释:

  • DeliveryGuarantee.NONE 不提供任何保证:消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复。
  • DeliveryGuarantee.AT_LEAST_ONCE: sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
  • DeliveryGuarantee.EXACTLY_ONCE: 该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置 isolation.level),在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。请确认事务 ID 的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务 不会互相影响!此外,强烈建议将 Kafka 的事务超时时间调整至远大于 checkpoint 最大间隔 + 最大重启时间,否则 Kafka 对未提交事务的过期处理会导致数据丢失。

关于容错,请参考文章:9、Flink四大基石之Checkpoint容错机制详解及示例(checkpoint配置、重启策略、手动恢复checkpoint和savepoint)

4)、监控

Kafka sink 会在不同的范围(Scope)中汇报下列指标。
在这里插入图片描述

5、kafka producer

FlinkKafkaProducer 已被弃用并将在 Flink 1.15 中移除,请改用 KafkaSink。

关于Flink 1.13版本的实现,请参考上文中的示例。

6、kafka 连接器指标

Flink 的 Kafka 连接器通过 Flink 的指标系统提供一些指标来帮助分析 connector 的行为。 各个版本的 Kafka producer 和 consumer 会通过 Flink 的指标系统汇报 Kafka 内部的指标。 该 Kafka 文档列出了所有汇报的指标。

同样也可通过将 Kafka source 在该章节描述的 register.consumer.metrics,或 Kafka sink 的 register.producer.metrics 配置设置为 false 来关闭 Kafka 指标的注册。

7、启用 Kerberos 身份验证

Flink 通过 Kafka 连接器提供了一流的支持,可以对 Kerberos 配置的 Kafka 安装进行身份验证。只需在 flink-conf.yaml 中配置 Flink。
像这样为 Kafka 启用 Kerberos 身份验证:

1、通过设置以下内容配置 Kerberos 票据

  • security.kerberos.login.use-ticket-cache:默认情况下,这个值是 true,Flink 将尝试在 kinit 管理的票据缓存中使用 Kerberos 票据。注意!在 YARN 上部署的 Flink jobs 中使用 Kafka 连接器时,使用票据缓存的 Kerberos 授权将不起作用。
  • security.kerberos.login.keytab 和 security.kerberos.login.principal:要使用 Kerberos keytabs,需为这两个属性设置值。

2、将 KafkaClient 追加到 security.kerberos.login.contexts:这告诉 Flink 将配置的 Kerberos 票据提供给 Kafka 登录上下文以用于 Kafka 身份验证。

一旦启用了基于 Kerberos 的 Flink 安全性后,只需在提供的属性配置中包含以下两个设置(通过传递给内部 Kafka 客户端),即可使用 Flink Kafka Consumer 或 Producer 向 Kafk a进行身份验证:

  • 将 security.protocol 设置为 SASL_PLAINTEXT(默认为 NONE):用于与 Kafka broker 进行通信的协议。使用独立 Flink 部署时,也可以使用 SASL_SSL;请在此处查看如何为 SSL 配置 Kafka 客户端。
  • 将 sasl.kerberos.service.name 设置为 kafka(默认为 kafka):此值应与用于 Kafka broker 配置的 sasl.kerberos.service.name 相匹配。客户端和服务器配置之间的服务名称不匹配将导致身份验证失败。

有关 Kerberos 安全性 Flink 配置的更多信息,请参见这里。你也可以在这里进一步了解 Flink 如何在内部设置基于 kerberos 的安全性。

该部分由于没有环境,未做验证,内容来至于官网。将来拟计划以专栏的形式介绍该部分内容。

8、升级到最近的连接器版本

通用的升级步骤概述见 升级 Jobs 和 Flink 版本指南。对于 Kafka,你还需要遵循这些步骤:

  • 不要同时升级 Flink 和 Kafka 连接器
  • 确保你对 Consumer 设置了 group.id
  • 在 Consumer 上设置 setCommitOffsetsOnCheckpoints(true),以便读 offset 提交到 Kafka。务必在停止和恢复 savepoint 前执行此操作。你可能需要在旧的连接器版本上进行停止/重启循环来启用此设置。
  • 在 Consumer 上设置 setStartFromGroupOffsets(true),以便我们从 Kafka 获取读 offset。这只会在 Flink 状态中没有读 offset 时生效,这也是为什么下一步非要重要的原因。
  • 修改 source/sink 分配到的 uid。这会确保新的 source/sink 不会从旧的 sink/source 算子中读取状态。
  • 使用 --allow-non-restored-state 参数启动新 job,因为我们在 savepoint 中仍然有先前连接器版本的状态。

9、问题排查

如果在使用 Flink 时对 Kafka 有问题,Flink 只封装 KafkaConsumer 或 KafkaProducer,你的问题可能独立于 Flink,有时可以通过升级 Kafka broker 程序、重新配置 Kafka broker 程序或在 Flink 中重新配置 KafkaConsumer 或 KafkaProducer 来解决。

一句话,大概是kafka的问题或配置的kafka的问题,和flink关系不大。

下面列出了一些常见问题的示例。

1)、数据丢失

根据你的 Kafka 配置,即使在 Kafka 确认写入后,你仍然可能会遇到数据丢失。特别要记住在 Kafka 的配置中设置以下属性:

  • acks
  • log.flush.interval.messages
  • log.flush.interval.ms
  • log.flush.*

上述选项的默认值是很容易导致数据丢失的。请参考 Kafka 文档以获得更多的解释。

2)、UnknownTopicOrPartitionException

导致此错误的一个可能原因是正在进行新的 leader 选举,例如在重新启动 Kafka broker 之后或期间。这是一个可重试的异常,因此 Flink job 应该能够重启并恢复正常运行。也可以通过更改 producer 设置中的 retries 属性来规避。但是,这可能会导致重新排序消息,反过来可以通过将 max.in.flight.requests.per.connection 设置为 1 来避免不需要的消息。

3)、ProducerFencedException

这个错误是由于 FlinkKafkaProducer 所生成的 transactional.id 与其他应用所使用的的产生了冲突。多数情况下,由于 FlinkKafkaProducer 产生的 ID 都是以 taskName + “-” + operatorUid 为前缀的,这些产生冲突的应用也是使用了相同 Job Graph 的 Flink Job。 我们可以使用 setTransactionalIdPrefix() 方法来覆盖默认的行为,为每个不同的 Job 分配不同的 transactional.id 前缀来解决这个问题。

以上,本文介绍了kafka的版本功能更能换、作为sink的使用、连接器的指标、身份认证、版本升级和问题排查几个主要 方面,关于常用的功能均以可运行的示例进行展示并提供完整的验证步骤。
本专题为了便于阅读以及整体查阅分为三个部分:
40、Flink 的Apache Kafka connector(kafka source的介绍及使用示例)-1
40、Flink 的Apache Kafka connector(kafka sink的介绍及使用示例)-2
40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/162192.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

geemap学习笔记012:如何搜索Earth Engine Python脚本

前言 本节主要是介绍如何查询Earth Engine中已经集成好的Python脚本案例。 1 导入库 !pip install geemap #安装geemap库 import ee import geemap2 搜索Earth Engine Python脚本 很简单&#xff0c;只需要一行代码。 geemap.ee_search()使用方法 后记 大家如果有问题需…

如何提高图片转excel的效果?(软件选择篇)

在日常的工作中&#xff0c;我们常常会遇到一些财务报表类的图片需要转换成可编辑的excel&#xff0c;但是&#xff0c;受各种条件的限制&#xff0c;常常只能通过手工录入这种原始的方式来实现&#xff0c;随着人工智能、深度学习以及网络技术的发展&#xff0c;这种原始的录入…

SpringBoot集成七牛云OSS详细介绍

&#x1f4d1;前言 本文主要SpringBoot集成七牛云OSS详细介绍的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;CSDN主页放风讲故事 &#x1f304;每日一句&a…

【Java工具篇】Java反编译工具Bytecode Viewer

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

【C++高阶(四)】红黑树深度剖析--手撕红黑树!

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:C从入门到精通⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习C   &#x1f51d;&#x1f51d; 红黑树 1. 前言2. 红黑树的概念以及性质3. 红黑…

计算机网络之数据链路层

一、概述 1.1概述 物理层发出去的信号需要通过数据链路层才知道是否到达目的地&#xff1b;才知道比特流的分界线 链路(Link)&#xff1a;从一个结点到相邻结点的一段物理线路&#xff0c;中间没有任何其他交换结点数据链路(Data Link)&#xff1a;把实现通信协议的硬件和软件…

电商API接口|电商数据接入|拼多多平台根据商品ID查商品详情SKU和商品价格参数

随着科技的不断进步&#xff0c;API开发领域也逐渐呈现出蓬勃发展的势头。今天我将向大家介绍API接口&#xff0c;电商API接口具备独特的特点&#xff0c;使得数据获取变得更加高效便捷。 快速获取API数据——优化数据访问速度 传统的数据获取方式可能需要经过多个中介环节&…

华大基因认知障碍基因检测服务,助力认知障碍疾病防控

认知障碍是一种严重的神经系统疾病&#xff0c;对人类的脑健康产生了重大影响。据报告显示&#xff0c;在我国65岁以上的人群中&#xff0c;存在轻度认知障碍的患者约为3,800万&#xff0c;而中重度痴呆患者则约为1,500万&#xff0c;患病人口数量庞大。这种疾病不仅会对患者的…

免费多域名SSL证书

顾名思义&#xff0c;免费多域名SSL证书就是一种能够为多个域名或子域提供HTTPS安全保护的证书。这意味着&#xff0c;如果您有三个域名——例如example.com、example.cn和company.com&#xff0c;您可以使用一个免费的多域名SSL证书为所有这些域名提供安全保障&#xff0c;而无…

TransFusionNet:JetsonTX2下肝肿瘤和血管分割的语义和空间特征融合框架

TransFusionNet: Semantic and Spatial Features Fusion Framework for Liver Tumor and Vessel Segmentation Under JetsonTX2 TransFusionNet&#xff1a;JetsonTX2下肝肿瘤和血管分割的语义和空间特征融合框架背景贡献实验方法Transformer-Based Semantic Feature Extractio…

AI写代码 可以代替人工吗?

近年AI技术非常火热&#xff0c;有人就说&#xff0c;用AI写代码程序员不就都得下岗吗&#xff1f;对此我的回答是否定的&#xff0c;因为AI虽然已经有了编写代码的能力&#xff0c;但它现在的水平大多还仅限于根据业务需求搭建框架&#xff0c;而具体的功能实现还尚且稚嫩&…

【愚公系列】保姆级教程带你实现HarmonyOS手语猜一猜元服务

&#x1f680;前言 最近HarmonyOS NEXT大火&#xff0c;这个纯血鸿蒙吸引力了大家的关注。虽然现在还没面向个人开发者开放&#xff0c;但我们可以基于最新的API9及开发工具来尝试开发鸿蒙新的应用形态——元服务。来体验下未来在HarmonyOS NEXT上实现的应用开发。 HarmonyOS…

什么是高防IP?有什么优势?怎么选择高防IP?

在当今的互联网环境中&#xff0c;分布式拒绝服务&#xff08;DDoS&#xff09;攻击已经成为一种常见的安全威胁。这种攻击通过向目标服务器发送大量的无效流量&#xff0c;使其无法处理正常的请求&#xff0c;从而达到迫使服务中断的目的。作为一个用户&#xff0c;你是否曾遇…

QGIS文章五——对遥感影像进行土地类型分类—监督分类(dzetsaka : classification tool)...

dzetsaka classification tool是QGIS的强大分类插件&#xff0c;目前主要提供了高斯混合模型分类器、Random Forest、KNN和SVM四种分类器模型&#xff0c;相比于SCP(Semi-Automatic Classification)&#xff0c;他的一个特点就是功能专一&#xff0c;操作简单。 从十一月开始一…

Linux基础命令3

移动&#xff0c;剪切文件 普通文件的移动剪切 现在在这儿 上图中&#xff0c;mv y.x ./tmp的意思&#xff0c;就是将当前路径下的y.x文件进行剪切&#xff0c;然后放到路径为当前路径下的tmp目录文件夹里面 操作完成后可以cd tmp&#xff0c;ls看到y.x文件已经在里面了 现在…

京东内部员工,爆料工资与公积金收入!

精彩回顾&#xff1a;进了央企&#xff0c;拿了户口&#xff0c;却感觉被困住了。 每个企业都有它的一套规则&#xff0c;哪些人适合加薪&#xff0c;哪些人适合拿奖金&#xff0c;哪些人适合给股票期权等等。但是说实话&#xff0c;很多人都只能拿底薪&#xff0c;这些福利啥的…

数据挖掘 K近邻

什么时候用K近邻&#xff1f; 交叉验证的时候。最常见的交叉验证方法是K折交叉验证&#xff0c;其中数据集被均匀分成K个子集&#xff0c;称为折&#xff0c;然后执行K次训练和测试&#xff0c;每次选择不同的折作为测试集&#xff0c;其余的作为训练集。最后&#xff0c;将K次…

JavaScript编程基础 – 对象

JavaScript编程基础 – 对象 JavaScript Programming Essentials – Object 本文简要介绍JavaScript面向对象编程&#xff0c;如何实现其中的对象以及实例演示&#xff0c;希望对大家学习JavaScript有所帮助。 1. 面向对象编程特点 面向对象编程(Object-Oriented Programmi…

浅谈JDK动态代理(上)

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 到目前为止&#xff0c…

力扣OJ题讲解——循环队列

今天我们一起来做一道关于队列的OJ题目&#xff0c;这是力扣题目622题&#xff0c;点击题目链接可以直接跳转&#xff0c;https://leetcode.cn/problems/design-circular-queue/ 首先&#xff0c;我们看到要求&#xff0c;需要我们实现哪些功能&#xff1f; 我们需要设置队列长…