Flink Kafka[输入/输出] Connector

本章重点介绍生产环境中最常用到的Flink kafka connector。使用Flink的同学,一定会很熟悉kafka,它是一个分布式的、分区的、多副本的、 支持高吞吐的、发布订阅消息系统。生产环境环境中也经常会跟kafka进行一些数据的交换,比如利用kafka consumer读取数据,然后进行一系列的处理之后,再将结果写出到kafka中。这里会主要分两个部分进行介绍,一是Flink kafka Consumer,一个是Flink kafka Producer

Flink 输入输出至 Kafka案例

首先看一个例子来串联下Flink kafka connector。代码逻辑里主要是从 kafka里读数据,然后做简单的处理,再写回到kafka中。首先需要引入 flink-kafka相关的pom.xml依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.12</artifactId><version>1.10.0</version>
</dependency>

分别从如何构造一个Source sinkFunctionFlink提供了现成的构造FlinkKafkaConsumerProducer的接口,可以直接使用。这里需要注意,因为kafka有多个版本,多个版本之间的接口协议会不同。Flink针对不同版本的kafka有相应的版本的ConsumerProducer。例如:针对 08091011版本,Flink对应的consumer分别是FlinkKafkaConsumer 0809010011producer也是。

 package com.zzx.flink;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import scala.Tuple2;
import scala.tools.nsc.transform.patmat.Logic;import java.util.Properties;/*** @description: Flink 从kafka 中读取数据并写入kafka* @author: zzx* @createDate: 2020/7/22* @version: 1.0*/
public class FlinkKafkaExample {public static void main(String[] args) throws Exception{//ParameterTool 从参数中读取数据final ParameterTool params = ParameterTool.fromArgs(args);//设置执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使参数在web界面中可用env.getConfig().setGlobalJobParameters(params);/**  TimeCharacteristic 中包含三种时间类型* @PublicEvolving* public enum TimeCharacteristic {* ​    //以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间*     ProcessingTime,* ​    //以数据进入flink streaming data flow的时间为准*     IngestionTime,* ​    //以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段*     EventTime* }*/env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);/*** CheckpointingMode:    EXACTLY_ONCE(执行一次)  AT_LEAST_ONCE(至少一次)*/env.enableCheckpointing(60*1000, CheckpointingMode.EXACTLY_ONCE);//------------------------------------------source start -----------------------------------String sourceTopic = "sensor";String bootstrapServers = "hadoop1:9092";// kafkaConsumer 需要的配置参数Properties props = new Properties();// 定义kakfa 服务的地址,不需要将所有broker指定上props.put("bootstrap.servers", bootstrapServers);// 制定consumer groupprops.put("group.id", "test");// 是否自动确认offsetprops.put("enable.auto.commit", "true");// 自动确认offset的时间间隔props.put("auto.commit.interval.ms", "1000");// key的序列化类props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// value的序列化类props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//从kafka读取数据,需要实现 SourceFunction 他给我们提供了一个FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<String>(sourceTopic, new SimpleStringSchema(), props);//------------------------------------------source end -----------------------------------------//------------------------------------------sink start -----------------------------------String sinkTopic = "topic";Properties properties = new Properties();properties.put("bootstrap.servers", bootstrapServers);properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<String>(sinkTopic, new SimpleStringSchema(), properties);//------------------------------------------sink end --------------------------------------//FlinkKafkaConsumer011 继承自 RichParallelSourceFunctionenv.addSource(consumer).map(new MapFunction<String, Tuple2<Long,String>>(){@Overridepublic Tuple2<Long, String> map(String s) throws Exception {return new Tuple2<>(1L,s);}}).filter(k -> k != null).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, String>>(Time.seconds(5)) {@Overridepublic long extractTimestamp(Tuple2<Long, String> element) {return element._1;}}).map(k ->k.toString()).addSink(producer);//执行env.execute("FlinkKafkaExample");}
}

如下创建代码中涉及的"sensor" Topic

[root@hadoop1 kafka_2.11-2.2.2]# bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --topic sensor --replication-factor 2 --partitions 4

Flink kafka Consumer

反序列化数据: 因为kafka中数据都是以二进制byte形式存储的。读到Flink系统中之后,需要将二进制数据转化为具体的javascala对象。具体需要实现一个schema类定义如何序列化和反序列数据。反序列化时需要实现DeserializationSchema
口,并重写deserialize(byte[] message)函数,如果是反序列化kafkakv的数据时,需要实现KeyedDeserializationSchema接口,并重写 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)函数。

另外Flink中也提供了一些常用的序列化反序列化的schema类。例如,SimpleStringSchema,按字符串方式进行序列化、反序列化。TypeInformationSerializationSchema,它可根据FlinkTypeInformation信息来推断出需要选择的schemaJsonDeserializationSchema使用 jackson反序列化 json格式消息,并返回ObjectNode,可以使用get(“property”)方法来访问相应字段。
[点击并拖拽以移动] ​

消费起始位置设置

如何设置作业消费kafka起始位置的数据,这一部分Flink也提供了非常好的封装。在构造好的FlinkKafkaConsumer类后面调用如下相应函数,设置合适的起始位置。
【1】setStartFromGroupOffsets,也是默认的策略,从group offset位置读取数据,group offset指的是kafka broker端记录的某个group的最后一次的消费位置。但是kafka broker端没有该group信息,会根据kafka的参数auto.offset.reset的设置来决定从哪个位置开始消费。
setStartFromEarliest,从kafka最早的位置开始读取。
setStartFromLatest,从kafka最新的位置开始读取。
setStartFromTimestamp(long),从时间戳大于或等于指定时间戳的位置开始读取。Kafka时间戳,是指kafka为每条消息增加另一个时戳。该时戳可以表示消息在proudcer端生成时的时间、或进入到kafka broker时的时间。
setStartFromSpecificOffsets,从指定分区的offset位置开始读取,如指定的offsets中不存某个分区,该分区从group offset位置开始读取。此时需要用户给定一个具体的分区、offset的集合。

一些具体的使用方法可以参考下图。需要注意的是,因为Flink框架有容错机制,如果作业故障,如果作业开启checkpoint,会从上一次 checkpoint状态开始恢复。或者在停止作业的时候主动做savepoint,启动作业时从savepoint开始恢复。这两种情况下恢复作业时,作业消费起始位置是从之前保存的状态中恢复,与上面提到跟kafka这些单独的配置无关。
[点击并拖拽以移动] ​

topic 和 partition 动态发现

实际的生产环境中可能有这样一些需求:
场景一,有一个Flink作业需要将五份数据聚合到一起,五份数据对应五个kafka topic,随着业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的情况下作业自动感知新的topic
场景二,作业从一个固定的kafka topic读数据,开始该topic10partition,但随着业务的增长数据量变大,需要对kafka partition个数进行扩容,由10个扩容到20。该情况下如何在不重启作业情况下动态感知新扩容的partition
针对上面的两种场景,首先需要在构建FlinkKafkaConsumer时的properties中设置flink.partition-discovery.interval-millis参数为非负值,表示开启动态发现的开关,以及设置的时间间隔。此时FlinkKafkaConsumer内部会启动一个单独的线程定期去kafka获取最新的meta信息。针对场景一,还需在构建FlinkKafkaConsumer时,topic的描述可以传一个正则表达式(如下图所示)描述的pattern。每次获取最新kafka meta时获取正则匹配的最新topic列表。针对场景二,设置前面的动态发现参数,在定期获取kafka最新meta信息时会匹配新的partition。为了保证数据的正确性,新发现的partition从最早的位置开始读取。
[点击并拖拽以移动] ​

commit offset 方式

Flink kafka consumer commit offset方式需要区分是否开启了checkpoint。如果checkpoint关闭,commit offset要依赖于kafka客户端的auto commit。 需设置enable.auto.commitauto.commit.interval.ms参数到consumer properties,就会按固定的时间间隔定期auto commit offsetkafka如果开启checkpoint,这个时候作业消费的offsetFlink会在state中自己管理和容错。此时提交offsetkafka,一般都是作为外部进度的监控,想实时知道作业消费的位置和lag情况。此时需要setCommitOffsetsOnCheckpointstrue来设置当checkpoint成功时提交offsetkafka。此时commit offset的间隔就取决于checkpoint的间隔,所以此时从kafka一侧看到的lag可能并非完全实时,如果checkpoint间隔比较长lag曲线可能会是一个锯齿状。
[点击并拖拽以移动] ​

Timestamp Extraction/Watermark 生成

我们知道当Flink作业内使用EventTime属性时,需要指定从消息中提取时间戳和生成水位的函数。FlinkKakfaConsumer构造的source后直接调用assignTimestampsAndWatermarks函数设置水位生成器的好处是此时是每个partition一个watermark assigner,如下图。source生成的时戳为多个partition时戳对齐后的最小时戳。此时在一个source读取多个partition,并且partition之间数据时戳有一定差距的情况下,因为在 sourcewatermarkpartition级别有对齐,不会导致数据读取较慢partition数据丢失。
[点击并拖拽以移动] ​

Flink kafka Producer

【1】Producer分区: 使用FlinkKafkaProducerkafka中写数据时,如果不单独设置partition策略,会默认使用FlinkFixedPartitioner,该 partitioner分区的方式是task所在的并发idtopicpartition数取余:parallelInstanceId % partitions.length
○ 此时如果sink4paritition1,则4task往同一个partition中写数据。但当sink task < partition个数时会有部分partition没有数据写入,例如sink task2partition总数为4,则后面两个partition将没有数据写入。
○ 如果构建FlinkKafkaProducer时,partition设置为null,此时会使用kafka producer默认分区方式,非key写入的情况下,使用round-robin的方式进行分区,每个task都会轮循的写下游的所有partition。该方式下游的partition数据会比较均衡,但是缺点是partition个数过多的情况下需要维持过多的网络连接,即每个task都会维持跟所有partition所在broker的连接。
[点击并拖拽以移动] ​

容错

Flink kafka 09010版本下,通过setLogFailuresOnlyfalsesetFlushOnCheckpointtrue, 能达到at-least-once语义。setLogFailuresOnly默认为false,是控制写kafka失败时,是否只打印失败的log不抛异常让作业停止。setFlushOnCheckpoint,默认为true,是控制是否在 checkpointfluse数据到kafka,保证数据已经写到kafka。否则数据有可能还缓存在kafka客户端的buffer中,并没有真正写出到kafka,此时作业挂掉数据即丢失,不能做到至少一次的语义。
Flink kafka 011版本下,通过两阶段提交的sink结合kafka事务的功能,可以保证端到端精准一次。
[点击并拖拽以移动] ​

疑问与解答

【问题一】:Flink consumer的并行度的设置:是对应topicpartitions个数吗?要是有多个主题数据源,并行度是设置成总体的 partitions数吗?
【解答】: 这个并不是绝对的,跟topic的数据量也有关,如果数据量不大,也可以设置小于partitions个数的并发数。但不要设置并发数大于partitions总数,因为这种情况下某些并发因为分配不到partition导致没有数据处理。
【问题二】: 如果partitionernull的时候是round-robin发到每一个partition ?如果有key的时候行为是kafka那种按照key分布到具体分区的行为吗?
【解答】: 如果在构造FlinkKafkaProducer时,如果没有设置单独的partitioner,则默认使用FlinkFixedPartitioner,此时无论是带key的数据,还是不带key。如果主动设置partitionernull时,不带key的数据会round-robin轮询的方式写出到partition,带key的数据会根据key,相同key数据分区的相同的partition
【问题三】: 如果checkpoint时间过长,offset未提交到kafka,此时节点宕机了,重启之后的重复消费如何保证呢?
【解答】: 首先开启checkpointoffsetFlink通过状态state管理和恢复的,并不是从kafkaoffset位置恢复。在checkpoint机制下,作业从最近一次checkpoint恢复,本身是会回放部分历史数据,导致部分数据重复消费,Flink引擎仅保证计算状态的精准一次,要想做到端到端精准一次需要依赖一些幂等的存储系统或者事务操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/582547.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

类。。。。

定义一个person类&#xff0c;包含私有成员&#xff0c;int *age,string &name,一个stu类&#xff0c;包含私有成员double *sore,person p1,写出person类和stu类的特殊成员函数&#xff0c;并写一个stu的函数&#xff0c;显示所有信息。 #include <iostream>using n…

Android下载gradle失败解决方法

1、在gradle-wrapper.properties文件中查看自己需要下载gradle什么版本的包和zip路径&#xff08;wrapper/dists&#xff09;。 2、在setting中查看Gradle的保存路径&#xff0c;如下图&#xff1a;C:/Users/Administrator/.gradle&#xff0c;加上第一步的zip路径得到下载grad…

15 Sequence-Driver-Sequencer communication in UVM

我们分别讨论了sequece_item、sequence、sequencer和driver。在本节中&#xff0c;我们将讨论他们如何相互talk&#xff0c;sequencer如何给driver提供从sequence里的sequence item。在开始阅读本节之前&#xff0c;请确保您了解sequencer和driver中使用的所有方法。&#xff0…

Ubuntu fcitx Install

ubuntu经常出现键盘失灵的问题 查询资料得知应该是Ibus框架的问题 于是需要安装fcitx框架和搜狗拼音 sudo apt update sudo apt install fcitx 设置fcitx开机自启动&#xff08;建议&#xff09; sudo cp /usr/share/applications/fcitx.desktop /etc/xdg/autostart/ 然后…

普中STM32-PZ6806L开发板(HAL库函数实现-TIM2实现us延时)

简介 使用TIM2实现1us延时其他知识 公式 时间&#xff08;s&#xff09;1/时钟频率&#xff08;Hz&#xff09;由导出 1us 1/1M(Hz)预分配设置 系统时钟是72MHz, 要1us的延时, 预分配得设置为72-1计数器重载设置 设置为最大值65535&#xff0c;这样延时的时间可以设置的最…

【Vue3】创建项目的方式

1. 基于 vue-cli 创建 ## 查看vue/cli版本&#xff0c;确保vue/cli版本在4.5.0以上 vue --version## 安装或者升级你的vue/cli npm install -g vue/cli## 执行创建命令 vue create vue_test本质上使用webpack&#xff0c;默认安装以下依赖&#xff1a; 2. 基于 vite 创建 官…

Buck电源设计常见的一些问题(五)MOS管振荡抑制方法(三)

MOS管振荡抑制方法(三)Rboot的选取 1.Rboot的选取2.总结1.Rboot的选取 同步 Buck 变换器一般采用自举电路供电,如图所示。开关节点上升沿的振荡与上管开通关系密切,上管开通时的驱动电流路径如图所示。因此,可以通过增大 Rboot来减缓上管开通的速度,从而抑制开关节点的振…

文献速递:人工智能医学影像分割---高效的MR引导CT网络训练,用于CT图像中前列腺分割

01 文献速递介绍 如今&#xff0c;根据国家癌症研究所的报告&#xff0c;美国约有9.9%的男性患有前列腺癌。1 此外&#xff0c;根据美国癌症协会的数据&#xff0c;预计2019年将有174,650个新病例被诊断出前列腺癌&#xff0c;与此同时大约有31,620名男性将死于前列腺癌。因此…

vue前端预览pdf并加水印、ofd文件,控制打印、下载、另存,vue-pdf的使用方法以及在开发中所踩过的坑合集

根据公司的实际项目需求&#xff0c;要求实现对pdf和ofd文件的预览&#xff0c;并且需要限制用户是否可以下载、打印、另存pdf、ofd文件&#xff0c;如果该用户可以打印、下载需要控制每个用户的下载次数以及可打印的次数。正常的预览pdf很简单&#xff0c;直接调用浏览器的预览…

计算机操作系统(OS)——P1操作系统概述

1、操作系统的概念(定义) 1.1、什么是操作系统 __操作系统&#xff08;Operating System&#xff0c;OS&#xff09;&#xff1a;__是指控制和管理整个计算机系统的__硬件和软件__资源&#xff0c;并合理的组织调度计算机的工作和资源的分配&#xff1b;以__提供给用户和其它…

pytest实现多进程与多线程运行超好用的插件

前言 如果想分布式执行用例&#xff0c;用例设计必须遵循以下原则&#xff1a; 1、用例之间都是独立的&#xff0c; 2、用例a不要去依赖用例b 3、用例执行没先后顺序&#xff0c; 4、随机都能执行每个用例都能独立运行成功每个用例都能重复运行&#xff0c;不影响其它用例 这…

深入浅出图解C#堆与栈 C# Heap(ing) VS Stack(ing) 第五节 引用类型复制问题及用克隆接口ICloneable修复

深入浅出图解C#堆与栈 C# Heaping VS Stacking 第五节 引用类型复制问题及用克隆接口ICloneable修复 [深入浅出图解C#堆与栈 C# Heap(ing) VS Stack(ing) 第一节 理解堆与栈](https://mp.csdn.net/mdeditor/101021023)[深入浅出图解C#堆与栈 C# Heap(ing) VS Stack(ing) 第二节…

九九乘法表c 语言 用于打印九九乘法表

以下是一个简单的C语言程序&#xff0c;用于打印九九乘法表&#xff1a; #include <stdio.h>int main() {int i, j;for (i 1; i < 9; i) {for (j 1; j < i; j) {printf("%d*%d%-2d ", j, i, i*j);}printf("\n");}return 0; }解释&#xff1…

JavaScript练习题第(四)部分

大家好关于JavaScript基础知识点已经发布&#xff1a;需要的大家可以去我的主要查看 &#xff08;当然了有任何不会的&#xff0c;可以私信我&#xff01;&#xff01;&#xff01;&#xff01;&#xff09; 为了巩固大家学习知识点给大家准备几道练习题&#xff1a; 当然&…

网络编程『简易TCP网络程序』

&#x1f52d;个人主页&#xff1a; 北 海 &#x1f6dc;所属专栏&#xff1a; Linux学习之旅、神奇的网络世界 &#x1f4bb;操作环境&#xff1a; CentOS 7.6 阿里云远程服务器 文章目录 &#x1f324;️前言&#x1f326;️正文TCP网络程序1.字符串回响1.1.核心功能1.2.程序…

CGAL的D维包围盒相交计算

包围盒相交测试是一种用于快速判断两个三维对象是否相交的方法&#xff0c;而AABB树则是一种数据结构&#xff0c;常用于加速场景中的射线检测和碰撞检测。 首先&#xff0c;让我们了解一下包围盒相交测试。这种测试的目的是为了快速判断两个三维对象是否相交&#xff0c;而不需…

同化的题解

时间限制: 1000ms 空间限制: 524288kB 题目描述 古人云&#xff1a;“近朱者赤近墨者黑”。这句话是很有道理的。这不鱼大大和一群苦命打工仔被安排进厂拧螺丝了。 进厂第一天&#xff0c;每个人拧螺丝的动力k都是不同且十分高涨的。但是当大家坐在一起后会聊天偷懒&#xf…

在微服务中如何实现全链路的金丝雀发布?

目录 1. 什么金丝雀发布&#xff1f;它有什么用&#xff1f; 2.如何实现全链路的金丝雀发布 2.1 负载均衡模块 2.2 网关模块 2.3 服务模块 2.3.1 注册为灰色服务实例 2.3.2 设置负载均衡器 2.3.3 传递灰度发布标签 2.4 其他代码 2.4.1 其他业务代码 2.4.2 pom.xml 关…

CSS 向上扩展动画

上干货 <template><!-- mouseenter"startAnimation" 表示在鼠标进入元素时触发 startAnimation 方法。mouseleave"stopAnimation" 表示在鼠标离开元素时触发 stopAnimation 方法。 --><!-- 容器元素 --><div class"container&q…

SAP VA01 创建带wbs号的销售订单包 CJ067的错误

接口错误提示如下 SAP官方 CJ067 124177 - VA01: CJ067 during WBS acct assgmt with a different business area S4的core 刚好能用上 实施 这个note后成功