Flink 基础 -- 应用开发(项目配置)

1、概述

本节中的指南将向您展示如何通过流行的构建工具(Maven, Gradle)配置项目,添加必要的依赖项(即连接器和格式,测试),并涵盖一些高级配置主题。

每个Flink应用程序都依赖于一组Flink库。至少,应用程序依赖于Flink api,此外,还依赖于某些连接器库(如Kafka, Cassandra)和第三方依赖,用户需要开发自定义函数来处理数据。

1.1 开始进行

要开始使用Flink应用程序,请使用以下命令、脚本和模板来创建Flink项目。

Maven

您可以使用下面的Maven命令基于 Archetype创建一个项目,或者使用提供的快速入门bash脚本。

所有Flink Scala api都已弃用,并将在未来的Flink版本中删除。您仍然可以在Scala中构建应用程序,但是应该使用Java版本的DataStream和/或Table API。
See FLIP-265 Deprecate and remove Scala API support

Maven command

$ mvn archetype:generate                \-DarchetypeGroupId=org.apache.flink   \-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.18.0

这允许您命名新创建的项目,并将交互地询问您的groupId、artifactId和包名。

Quickstart script

$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.18.0

1.2 您需要哪些依赖项?

要开始处理Flink作业,通常需要以下依赖项:

  • Flink APIs,以开发您的工作
  • 连接器和格式,以便将您的作业与外部系统集成
  • 测试实用程序,以测试您的工作

除此之外,您可能还需要添加开发自定义功能所需的第三方依赖项。

1.3 Flink APIs

Flink提供了两个主要的API:数据流API和表API & SQL。它们可以单独使用,也可以混合使用,这取决于你的用例:

想要使用的APIs需要添加的依赖项
DataStreamflink-streaming-java
DataStream with Scalaflink-streaming-scala_2.12
Table APIflink-table-api-java
Table API with Scalaflink-table-api-scala_2.12
Table API + DataStreamflink-table-api-java-bridge
Table API + DataStream with Scalaflink-table-api-scala-bridge_2.12

1.3 运行和打包

如果您希望通过简单地执行主类来运行作业,则需要在类路径中使用flink-clients。对于Table API程序,您还需要flink-table-runtimeflink-table-planner-loader

根据经验,我们建议将应用程序代码及其所需的所有依赖项打包到一个fat/uber JAR中。这包括作业的打包连接器、格式和第三方依赖项。此规则不适用于 Java APIs、DataStream Scala api和前面提到的运行时模块,这些模块已经由Flink自己提供,不应该包含在job uber JAR中。这个作业JAR可以提交到已经运行的Flink集群,或者添加到Flink应用程序容器映像中,而无需修改发行版。

1.4 What’s next?

  • 要开始开发您的工作,请查看数据流API和表API & SQL。
  • 有关如何根据构建工具打包作业的详细信息,请查看以下特定指南
    Maven
    Gradle
  • 有关项目配置的更高级主题,请查看高级主题部分。

2、如何使用Maven来配置您的项目

本指南将向您展示如何使用Maven配置Flink作业项目(Flink job project),Maven是由Apache Software Foundation开发的开源构建自动化工具,使您能够构建、发布和部署项目。您可以使用它来管理软件项目的整个生命周期。

2.1 要求

  • Maven 3.8.6 (recommended or higher)
  • Java 8 (deprecated) or Java 11

2.2 将项目导入到IDE中

一旦创建了项目文件夹和文件,我们建议您将该项目导入到IDE中进行开发和测试。

IntelliJ IDEA支持开箱即用的Maven项目。Eclipse提供了m2e插件来导入Maven项目。

注意:对于Flink来说,Java的默认JVM堆大小可能太小,您必须手动增加它。在Eclipse中,选择“Run Configurations -> Arguments”,在“VM Arguments”中输入“-Xmx800m”。在IntelliJ IDEA中,建议从Help |编辑自定义虚拟机选项菜单中更改JVM选项。有关详细信息,请参阅本文。

关于IntelliJ的注意事项:要使应用程序在IntelliJ IDEA中运行,必须在运行配置中勾选Include dependencies with "Provided" scope框。如果这个选项不可用(可能是由于使用较旧的IntelliJ IDEA版本),那么一个变通方法是创建一个调用应用程序的main()方法的测试。

2.3 构建项目

如果你想构建/打包你的项目,导航到你的项目目录并运行mvn clean package’ 命令。您将在这里找到一个JAR文件,其中包含您的应用程序(以及您可能作为依赖项添加到应用程序中的连接器和库):target/<artifact-id>-<version>.jar

注意:如果您使用与DataStreamJob不同的类作为应用程序的主类/入口点,我们建议您相应地更改pom.xml文件中的mainClass设置,以便Flink可以从JAR文件运行应用程序,而无需额外指定主类。

2.4 向项目添加依赖项

在项目目录中打开pom.xml文件,并在dependencies 选项卡之间添加依赖项

例如,你可以像这样添加Kafka连接器作为依赖项:

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.18.0</version></dependency></dependencies>

然后在命令行上执行mvn install
Java Project Template Scala Project Template或Gradle创建的项目被配置为在运行mvn clean package时自动将应用程序依赖项包含到应用程序JAR中。对于没有从这些模板中设置的项目,我们建议添加Maven Shade Plugin来构建包含所有必需依赖项的应用程序jar。

重要:请注意,所有这些核心API依赖项都应该将它们的作用域设置为 provided。这意味着需要对它们进行编译,但是不应该将它们打包到项目的最终应用程序JAR文件中。如果没有设置为provided,最好的情况是最终的JAR变得过大,因为它还包含所有Flink核心依赖项。最坏的情况是,添加到应用程序JAR文件中的Flink核心依赖项与您自己的一些依赖项版本发生冲突(通常可以通过反向类加载来避免这种情况)。

要正确地将依赖项打包到应用程序JAR中,必须将Flink API依赖项设置为compile 范围。

2.5 打包应用程序

根据您的用例,在将Flink应用程序部署到Flink环境之前,可能需要以不同的方式对其进行打包。

如果你想为Flink Job创建一个JAR,并且只使用Flink依赖关系而不使用任何第三方依赖关系(即使用JSON格式的文件系统连接器),你不需要创建一个uber/fat JAR或遮挡任何依赖关系。

如果您想为Flink Job创建一个JAR,并使用Flink发行版中没有内置的外部依赖项,您可以将它们添加到发行版的类路径中,或者将它们隐藏到您的uber/fat应用程序JAR中。

有了生成的uber/fat JAR,你可以通过以下命令将其提交到本地或远程集群:

bin/flink run -c org.example.MyJob myFatJar.jar

要了解有关如何部署Flink作业的详细信息,请查看部署指南。

2.6 用于创建带有依赖项的uber/fat JAR的模板

要构建一个包含声明的连接器和库所需的所有依赖项的应用程序JAR,您可以使用以下插件定义:

<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- Replace this with the main class of your job --><mainClass>my.programs.main.clazz</mainClass></transformer><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers></configuration></execution></executions></plugin></plugins>
</build>

默认情况下,Maven阴影插件将包括runtimecompile范围内的所有依赖项。

3、连接器和格式

Flink应用程序可以通过连接器读取和写入各种外部系统。它支持多种格式,以便编码和解码数据以匹配Flink的数据结构。

数据流和Table API/SQL的可用连接器和格式概述。

3.1 Available artifacts

为了使用连接器和格式,您需要确保Flink能够访问实现它们的构件。对于Flink社区支持的每个连接器,我们在Maven Central上发布两个工件:

  • flink-connector-<NAME> 它是一个瘦JAR,只包括连接器代码,但不包括最终的第三方依赖
  • flink-sql-connector-<NAME> 它是一个超级JAR,可以与所有连接器第三方依赖项一起使用

这同样适用于格式(formats)。注意,有些连接器可能没有相应的flink-sql-connector-<NAME>构件,因为它们不需要第三方依赖项。

uber/fat jar主要用于与SQL客户端一起使用,但您也可以在任何数据流/表应用程序中使用它们。

3.2 Using artifacts

为了使用连接器/格式模块,你可以:

  • 在您的作业JAR中为瘦JAR及其传递依赖项 Shade
  • 在你的工作JAR中添加超级JAR
  • 将uber JAR直接拷贝到Flink发行版的/lib文件夹中

对于shading 依赖,请查看特定的Maven和Gradle指南。有关Flink分布的参考,请查看Flink分布的解剖。

决定是shade uber JAR、瘦JAR还是仅仅在发行版中包含依赖取决于您和您的用例。如果您为依赖项添加阴影,您将对作业JAR中的依赖项版本有更多的控制。在对瘦JAR进行shade 的情况下,您将对传递依赖项有更多的控制,因为您可以在不更改连接器版本的情况下更改版本(允许二进制兼容性)。如果在Flink分发/lib文件夹中直接嵌入连接器uber JAR,您将能够在一个地方控制所有作业的连接器版本。

4、测试依赖

Flink提供了用于测试作业的实用程序,您可以将其添加为依赖项。

4.1 DataStream API Testing

如果要为使用DataStream API构建的作业开发测试,则需要添加以下依赖项:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-test-utils</artifactId><version>1.18.0</version><scope>test</scope>
</dependency>

在各种测试实用程序中,该模块提供了MiniCluster,这是一个轻量级的可配置Flink集群,可在JUnit测试中运行,可以直接执行作业。

有关如何使用这些实用程序的更多信息,请参阅DataStream API测试一节

4.2 Table API Testing

如果你想在IDE中本地测试Table API & SQL程序,除了前面提到的flink-test-utils之外,你可以添加以下依赖项:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-test-utils</artifactId><version>1.18.0</version><scope>test</scope>
</dependency>

这将自动引入查询规划器和运行时,它们分别用于规划和执行查询。

Flink -table-test-utils模块已在Flink 1.15中引入,被认为是实验性的。

5、高级配置主题

5.1 Flink分布的解剖

Flink本身由一组类和依赖项组成,这些类和依赖项构成了Flink运行时的核心,并且在启动Flink应用程序时必须出现。运行系统所需的类和依赖关系处理诸如协调、网络、检查点、故障转移、api、操作符(如窗口)、资源管理等领域。

这些核心类和依赖项被打包在flink-dist.jar中,它可以在下载的发行版的/lib文件夹中获得,并且是基本Flink容器映像的一部分。您可以将这些依赖关系看作类似于Java的核心库,其中包含StringList之类的类。

为了保持核心依赖尽可能小并避免依赖冲突,Flink核心依赖不包含任何连接器或库(即CEP, SQL, ML),以避免在类路径中有过多的默认类和依赖。

Flink发行版的/lib目录还包含各种jar,其中包括常用模块,例如执行Table作业所需的所有模块以及一组连接器和格式。这些是默认加载的,可以通过从/lib文件夹中删除它们来从类路径中删除。

Flink还在/opt文件夹下提供了额外的可选依赖项,这可以通过移动/lib文件夹中的jar来启用。

有关类加载的更多信息,请参阅Flink中的类加载一节。

5.2 Scala版本

不同的Scala版本之间不是二进制兼容的。所有(传递地)依赖于Scala的Flink依赖都以其构建的Scala版本为后缀(例如:flink-streaming-scala_2.12)。

如果你只使用Flink的Java api,你可以使用任何Scala版本。如果您正在使用Flink的Scala api,则需要选择与应用程序的Scala版本匹配的Scala版本。

有关如何为特定的Scala版本构建Flink的详细信息,请参阅构建指南。

2.12.8之后的Scala版本与之前的2.12.x 版本不兼容。这将阻止Flink项目升级其2.12.X版本在2.12.8之后。您可以按照构建指南在本地为以后的Scala版本构建Flink。为此,您需要添加-Djapicmp.skip在构建时跳过二进制兼容性检查。

5.3 表依赖剖析

Flink发行版默认包含执行Flink SQL作业所需的jar(在/lib文件夹中),特别是:

  • flink-table-api-java-uber-1.18.0.jar → contains all the Java APIs
  • flink-table-runtime-1.18.0.jar → contains the table runtime
  • flink-table-planner-loader-1.18.0.jar → contains the query planner

以前,这些jar都打包到flink-table.jar中。从Flink 1.15开始,为了允许用户将flink-table-planner-loader-1.18.0.jar flink-table-planner_2.12-1.18.0.jar交换,这个文件现在被分成三个jar。

虽然 Table Java API构件内置于发行版中,但默认情况下不包括表Scala API构件。当使用Flink Scala API的格式和连接器时,您需要手动下载并将这些JAR包含在distribution /lib文件夹中(推荐),或者将它们打包为Flink SQL作业的uber/fat JAR中的依赖项。

有关更多详细信息,请查看如何连接到外部系统。

Table Planner and Table Planner Loader

从Flink 1.15开始,该分布包含两个规划器:

  • flink-table-planner_2.12-1.18.0.jar, in /opt, contains the query planner
  • flink-table-planner-loader-1.18.0.jar, loaded by default in /lib, contains the query planner hidden behind an isolated classpath (you won’t be able to address any io.apache.flink.table.planner directly)

这两个规划器jar包含相同的代码,但是它们的打包不同。在第一种情况下,必须使用相同的Scala版本的JAR。在第二种情况下,您不需要考虑Scala,因为它隐藏在JAR中。

默认情况下,发行版使用flink-table-planner-loader。如果需要访问和使用查询规划器的内部,可以交换jar(在distribution /lib文件夹中复制并粘贴flink-table-planner_2.12.jar)。请注意,您将被限制使用您正在使用的Flink发行版的Scala版本。

这两个计划器不能同时存在于类路径中。如果将它们都加载到/lib中,则表作业将失败。

在即将到来的Flink版本中,我们将停止在Flink发行版中发布flink-table-planner_2.12工件。我们强烈建议迁移作业和自定义连接器/格式以使用API模块,而不依赖于规划器内部。如果您需要来自计划器的一些功能,这些功能目前没有通过API模块公开,请打开一个票证以便与社区讨论。

5.4 Hadoop的依赖性

一般规则:没有必要将Hadoop依赖项直接添加到应用程序中。如果你想在Hadoop中使用Flink,你需要有一个包含Hadoop依赖项的Flink设置,而不是将Hadoop作为一个应用程序依赖项添加。换句话说,Hadoop必须依赖于Flink系统本身,而不是包含应用程序的用户代码。Flink将使用由HADOOP_CLASSPATH环境变量指定的Hadoop依赖项,可以这样设置:

export HADOOP_CLASSPATH=`hadoop classpath`

这种设计有两个主要原因:

  • 一些Hadoop交互发生在Flink的核心,可能在用户应用程序启动之前。其中包括为检查点设置HDFS,通过Hadoop的Kerberos令牌进行身份验证,或者部署在YARN上。
  • Flink的反向类加载方法从核心依赖项中隐藏了许多传递依赖项。这不仅适用于Flink自己的核心依赖,也适用于安装过程中出现的Hadoop依赖。这样,应用程序就可以使用相同依赖项的不同版本,而不会遇到依赖冲突。当依赖树变得非常大时,这非常有用。

如果你在IDE中开发或测试时需要Hadoop依赖项(例如,HDFS访问),你应该将这些依赖项配置为类似于依赖项的范围(例如,test provided)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/136801.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何在时间循环里最优决策——时间旅行者的最优决策

文章目录 每日一句正能量前言时间旅行和平行宇宙强化学习策略梯度算法代码案例推荐阅读赠书活动 每日一句正能量 做一个决定&#xff0c;并不难&#xff0c;难的是付诸行动&#xff0c;并且坚持到底。 前言 时间循环是一类热门的影视题材&#xff0c;其设定常常如下&#xff1…

Pycharm加载项目时异常,看不到自己的项目文件

最近看到一个朋友问&#xff0c;他把项目导入pycharm为什么项目里的包不在项目里显示&#xff0c;只在projects file里显示&#xff1f;问题截图如下&#xff1a; Project里看不到自己的项目文件 只能在Project Files里看到自己的项目文件 问题解答 我也是偶然发现的这个方案…

jQuery中显示与隐藏

在我们jQuery当中&#xff0c;有多个显示隐藏的方法&#xff0c;本篇介绍一下hide()、show()、toggle() 在我们JS当中&#xff0c;或是CSS当中&#xff0c;我们常用到display:none或block; 在我们jQuery当中&#xff0c;我们该如何实现显示隐藏 在我们jQuery当中&#xff0c;我…

vscode基于cmake结果调试运行

Linux环境使用VSCode调试简单C代码_linux vscode编译c代码_果壳中的robot的博客-CSDN博客 Linux环境下使用VScode调试CMake工程 - 知乎 1 vscode实现cmakemake指令 我们都知道&#xff0c;对于cmake构建的工程&#xff0c;编译需要以下步骤: cd build cmake .. make 那如…

Linux CentOS 8(HTTPS的配置与管理)

Linux CentOS 8&#xff08;HTTPS的配置与管理&#xff09; 目录 一、HTTPS 介绍二、SSL 证书的介绍三、实验配置 一、HTTPS 介绍 HTTPS 在 HTTP 的基础下加入 SSL&#xff0c;SSL 是“Secure Sockets Layer”的缩写&#xff0c;中文为“安全套接层”。因此 HTTPS 是以安全为目…

MATLAB算法实战应用案例精讲-【人工智能】人工智能杂谈

目录 人工智能的发展历程 人工智能涉及的内容 一、数据挖掘 二、机器学习

【Unity ShaderGraph】| 物体靠近时局部溶解,根据坐标控制溶解的位置【文末送书】

前言 【Unity ShaderGraph】| 物体靠近时局部溶解&#xff0c;根据坐标控制溶解的位置一、效果展示二、根据坐标控制溶解的位置&#xff0c;物体靠近局部溶解三、应用实例&#x1f451;评论区抽奖送书 前言 本文将使用ShaderGraph制作一个根据坐标控制溶解的位置&#xff0c;物…

如何用Java实现一个基于机器学习的情感分析系统,用于分析文本中的情感倾向

背景&#xff1a;练习两年半&#xff08;其实是两周半&#xff09;&#xff0c;利用工作闲余时间入门一下机器学习&#xff0c;本文没有完整的可实施的案例&#xff0c;由于知识体系不全面&#xff0c;目前代码只能运行&#xff0c;不能准确的预测 卡点&#xff1a; 1 由于过…

技术分享 | app自动化测试(Android)--触屏操作自动化

导入TouchAction Python 版本 from appium.webdriver.common.touch_action import TouchActionJava 版本 import io.appium.java_client.TouchAction;常用的手势操作 press 按下 TouchAction 提供的常用的手势操作有如下操作&#xff1a; press 按下 release 释放 move_…

CentOS Linux 系统镜像

CentOS Linux具有以下特点&#xff1a; 稳定性&#xff1a;CentOS Linux旨在提供一个稳定、可靠的服务器环境&#xff0c;适合用于关键业务应用和生产环境。高效性&#xff1a;CentOS Linux经过优化和调整&#xff0c;可以充分发挥硬件的性能&#xff0c;提高系统的整体效率。…

Rust和isahc库编写代码示例

Rust和isahc库编写的图像爬虫程序的代码&#xff1a; rust use isahc::{Client, Response}; fn main() { let client Client::new() .with_proxy("") .finish(); let url ""; let response client.get(url) .send() …

无线测温系统在电厂的必要性,保障电力系统稳定运行

安科瑞电气股份有限公司 上海嘉定 201801 摘要&#xff1a;采集关键电力设备接电的实时温度&#xff0c;克服有线温度监测系统存在的诸如线路多&#xff0c;布线复杂&#xff0c;维护困难等不足&#xff0c;将无线无源传感器与Zigbee无线通信技术相结合&#xff0c;将物联网技…

assimp中如何判断矩阵是否是单位矩阵

对于一个矩阵元素为浮点型的矩阵&#xff0c;你是否还在使每个元素跟1.0f或0.0f进行比较&#xff0c;如果这样&#xff0c;只能说你的结果不一定正确&#xff0c;那我们看看assimp中是如何做的。 template <typename TReal> AI_FORCE_INLINE bool aiMatrix4x4t<TReal…

K8S基础服务(apiserver、controller、scheduler、etcd)时区设置

K8S基础服务&#xff08;apiserver、controller、scheduler、etcd&#xff09;时区设置 一、PodPreset 使用PodPreset可以修改所有容器的时区&#xff08;在pod 创建时,用户可以使用 podpreset 对象将特定信息注入 pod 中,这些信息可以包括 secret、 卷、 卷挂载和环境变量&a…

关于VUE启动内存溢出

安装node v10.14.2 后 启动公司的VUE项目 使用命令npm run dev 命令 报错&#xff1a; <--- Last few GCs --->[20940:00000244699848E0] 215872 ms: Scavenge 1690.2 (1836.4) -> 1679.6 (1836.4) MB, 5.4 / 0.7 ms (average mu 0.266, current mu 0.253) a…

jQuery中淡入与淡出

在我们jQuery中为我们封装了很多好玩的方法&#xff0c;我为大家介绍一下淡入与淡出&#xff01; 我们需要配合事件来玩淡入淡出 淡出语法&#xff1a;fadeOut([speed,[easing],[fn]) (1)参数都可以省略 (2)speed:三种预定速度之一的字符串(“slow”“normal”or “fast”)或…

阿里云 :推出通义大模型编码助手产品【通义灵码】

本心、输入输出、结果 文章目录 阿里云 &#xff1a;推出通义大模型编码助手产品【通义灵码】前言通义灵码简介主要功能主要功能点 支持的语言和 IDEjetbrains IDEA 安装计费相关弘扬爱国精神 阿里云 &#xff1a;推出通义大模型编码助手产品【通义灵码】 编辑&#xff1a;简简…

ARMday2(环境创建+工程配置+创建文件+单步调试)

目录 一、汇编环境的创建 二、为工程配置链接脚本&#xff08;map.lds&#xff09; 三、为工程创建汇编文件 start.s 编程调试 接下来我们需要建立一个 start.s 汇编文件添加到我们的工程中去 四、对汇编代码进行单步调试&#xff08;仿真&#xff09; 五、汇编工程的编译 …

Node.js中的文件系统(file system)模块

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

Kafka与Flink的整合 -- sink、source

1、首先导入依赖&#xff1a; <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.15.2</version></dependency> 2、 source&#xff1a;Flink从Kafka中读取数据 p…