1 什么是Flink
Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
它的主要特性包括:批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。
2 Flink的特点
2.1 事件驱动
事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。
事件驱动型应用是在计算存储分离的传统应用基础上进化而来。在传统架构中,应用需要读写远程事务型数据库。相反,事件驱动型应用是基于状态化流处理来完成。在该设计中,数据和计算不会分离,应用只需访问本地(内存或磁盘)即可获取数据。系统容错性的实现依赖于定期向远程持久化存储写入 checkpoint。
像kafka和flume是事件驱动型的,事件驱动型就是以事件为单位的,来了一件事,会针对这件事进行处理,一件事一件事过来。sparkstreaming就不是事件驱动型的,是微批次,以事件为单位触发,不是以事件为单位触发。事件驱动型的优势:对数据更敏感,延迟更低。
2.2 流式处理
首先先了解批处理和流处理:
批处理:有界、持久、大量,适合需要访问全量数据才能完成的计算工作,一般用于离线统计。
流处理:无界、实时, 无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计,流处理最大好处就是具有极低的延迟。
在Spark中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。而在Flink中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流。
有界流:有明确定义的开始和结束,可以在计算之前获取所有的数据,不需要有序,因为可以进行排序,也就是批处理。
无界流:有开始但是没有结束,处理无界数据通常要求以特定顺序(例如事件发生的顺序)获取event,以便能够推断结果完整性。
2.3 状态保证
只有在每一个单独的事件上进行转换操作的应用才不需要状态,而每一个具有一定复杂度的流处理应用都是有状态的。任何运行基本业务逻辑的流处理应用都需要在一定时间内存储所接收的事件或中间结果,以供后续的某个时间点(例如收到下一个事件或者经过一段特定时间)进行访问并进行后续处理。
应用状态是 Flink 中的一等公民,Flink 提供了许多状态管理相关的特性支持,其中包括:①多种状态基础类型:Flink 为多种不同的数据结构提供了相对应的状态基础类型,例如原子值(value),列表(list)以及映射(map);②插件化的State Backend:State Backend 负责管理应用程序状态,并在需要的时候进行 checkpoint。Flink 支持多种 state backend,可以将状态存在内存或者 RocksDB。③精确一次语义:Flink 的 checkpoint 和故障恢复算法保证了故障发生后应用状态的一致性。④超大数据量状态:Flink 能够利用其异步以及增量式的 checkpoint 算法,存储数 TB 级别的应用状态⑤可弹性伸缩的应用:Flink 能够通过在更多或更少的工作节点上对状态进行重新分布,支持有状态应用的分布式的横向伸缩。
2.4 丰富的时间语义支持
时间是流处理应用另一个重要的组成部分,FLink中有3种时间语义:Event Time(事件自身的时间),Processing Time(事件被处理时机器的系统时间),Ingestion Time(事件进入 Flink 的时间)。
对于迟到数据的处理:Flink 还引入了 watermark 的概念,用以衡量事件时间进展。Watermark 也是一种平衡处理延时和完整性的灵活机制。当以带有 watermark 的事件时间模式处理数据流时,在计算完成之后仍会有相关数据到达。这样的事件被称为迟到事件。Flink 提供了多种处理迟到数据的选项,例如将这些数据重定向到旁路输出(side output)或者更新之前完成计算的结果。
2.5 分层API
Flink 根据抽象程度分层,提供了三种不同的 API。每一种 API 在简洁性和表达力上有着不同的侧重,并且针对不同的应用场景。
ProcessFunction是 Flink 所提供的最具表达力的接口。ProcessFunction 可以处理一或两条输入数据流中的单个事件或者归入一个特定窗口内的多个事件。它提供了对于时间和状态的细粒度控制。开发者可以在其中任意地修改状态,也能够注册定时器用以在未来的某一时刻触发回调函数。
DataStream API为许多通用的流处理操作提供了处理原语。这些操作包括窗口、逐条记录的转换操作,在处理事件时进行外部数据库查询等。DataStream API 支持 Java 和 Scala 语言,预先定义了例如map()、reduce()、aggregate()等函数。你可以通过扩展实现预定义接口或使用 Java、Scala 的 lambda 表达式实现自定义的函数。
Flink 支持两种关系型的 API,Table API 和 SQL。这两个 API 都是批处理和流处理统一的 API,这意味着在无边界的实时数据流和有边界的历史记录数据流上,关系型 API 会以相同的语义执行查询,并产生相同的结果。Table API 和 SQL 借助了 Apache Calcite 来进行查询的解析,校验以及优化。它们可以与 DataStream 和 DataSet API 无缝集成,并支持用户自定义的标量函数,聚合函数以及表值函数。
3 相关概念
(1)Flink Application Cluster 是一个专用的 Flink Cluster,它仅用于执行单个 Flink Job。的生命周期与 Flink Job的生命周期绑定在一起。
(2)Flink Cluster:一般情况下,Flink 集群是由一个 Flink Master和一个或多个 Flink TaskManager进程组成的分布式系统。
(3)Event: 是对应用程序建模的域的状态更改的声明。它可以同时为流或批处理应用程序的 input 和 output,也可以单独是 input 或者 output 中的一种。Event 是特殊类型的 Record。
(4)Function: 是由用户实现的,并封装了 Flink 程序的应用程序逻辑。大多数 Function 都由相应的 Operator:封装。
(5)Instance :常用于描述运行时的特定类型(通常是 Operator]或者 Function)的一个具体实例。
(6)Flink Job 代表运行时的 Flink 程序。Flink Job 可以提交到长时间运行的 Flink Session Cluster,也可以作为独立的 Flink Application Cluster 启动。
(7)JobManager :是在:Flink Master运行中的组件之一。JobManager 负责监督单个作业 Task]的执行。以前,整个 Flink Master都叫做 JobManager。
(8)Logical Graph 是一种描述流处理程序的高阶逻辑有向图。节点是Operator,边代表输入/输出关系、数据流和数据集中的之一。
(9)Managed State 描述了已在框架中注册的应用程序的托管状态。对于托管状态,Apache Flink 会负责持久化和重伸缩等事宜。
(10)Flink Master 是 Flink Cluster的主节点。它包含三个不同的组件:Flink Resource Manager、Flink Dispatcher、运行每个 Flink Job 的 Flink JobManager。
(11)Operator:Logical Graph的节点。算子执行某种操作,该操作通常由 Function执行。Source 和 Sink 是数据输入和数据输出的特殊算子。
(12)Operator Chain:算子链由两个或多个连续的 Operator组成,两者之间没有任何的重新分区。同一算子链内的算子可以彼此直接传递 record,而无需通过序列化或 Flink 的网络栈。
(13)Partition:分区是整个数据流或数据集的独立子集。通过将每个 Record分配给一个或多个分区,来把数据流或数据集划分为多个分区。在运行期间,Task 会消费数据流或数据集的分区。改变数据流或数据集分区方式的转换通常称为重分区。
(14)Physical Graph:Physical graph 是一个在分布式运行时,把 Logical Graph 转换为可执行的结果。节点是 Task,表示数据流或数据集的输入/输出关系或 partition。
(15)Record :是数据集或数据流的组成元素。Operator和 Function接收 record 作为输入,并将 record 作为输出发出。
(16)Flink Session Cluster:长时间运行的 Flink Cluster,它可以接受多个 Flink Job的执行。此 Flink Cluster的生命周期不受任何 Flink Job 生命周期的约束限制。
(17)State Backend:对于流处理程序,Flink Job的 State Backend 决定了其 state是如何存储在每个 TaskManager 上的( TaskManager 的 Java 堆栈或嵌入式 RocksDB),以及它在 checkpoint 时的写入位置( Flink Master的 Java 堆或者 Filesystem)。
(18)Sub-Task :是负责处理数据流 Partition的 Task。”Sub-Task”强调的是同一个 Operator或者 Operator Chain 具有多个并行的 Task 。
(19)Task: 是 Physical Graph 的节点。它是基本的工作单元,由 Flink 的 runtime 来执行。Task 正好封装了一个 Operator或者 Operator Chain的 parallel instance。
(20)TaskManager: 是 Flink Cluster:的工作进程。Task 被调度到 TaskManager 上执行。TaskManager 相互通信,只为在后续的 Task 之间交换数据。
(21)Transformation 应用于一个或多个数据流或数据集,并产生一个或多个输出数据流或数据集。Transformation 可能会在每个记录的基础上更改数据流或数据集,但也可以只更改其分区或执行聚合。虽然 Operator和 Function 是 Flink API 的“物理”部分,但 Transformation 只是一个 API 概念。具体来说,大多数(但不是全部)Transformation 是由某些 Operator实现的。
4 安装部署
4.1 安装
参考官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/cluster_setup.html
4.2 Flink on Yarn
Flink提供了两种在yarn上运行的模式,分别为Session-Cluster和Per-Job-Cluster模式。
(1)Session-Cluster模式:需要先启动集群,然后再提交作业,接着会向yarn申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作业共享Dispatcher和ResourceManager;共享资源;适合规模小执行时间短的作业。
(2)Per-Job-Cluster模式:一个Job会对应一个集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。
4.3 Flink启动参数
./flink <ACTION> [OPTIONS] [ARGUMENTS]可以使用以下操作:命令 "run" 编译并运行程序。Syntax: run [OPTIONS] <jar-file> <arguments>"run" action options:-c,--class <classname> 程序入口类("main" 方法 或 "getPlan()" 方法)仅在 JAR 文件没有在 manifest 中指定类的时候使用-C,--classpath <url> 在群集中的所有节点上向每个用户代码类加载器添加URL。路径必须指定协议(例如文件://),并且可以在所有节点上访问(例如,通过NFS共享)。您可以多次使用此选项来指定多个URL。该协议必须由 {@link java.net.URLClassLoader} 支持。-d,--detached 以独立模式运行任务-n,--allowNonRestoredState 允许跳过无法还原的保存点状态。当触发保存点的时候,你需要允许这个行为如果以从你的应用程序中移除一个算子 -p,--parallelism <parallelism> 运行程序的并行度。 可以选择覆盖配置中指定的默认值。-q,--sysoutLogging 将日志输出到标准输出-s,--fromSavepoint <savepointPath> 从保存点的路径中恢复作业 (例如hdfs:///flink/savepoint-1537)Options for yarn-cluster mode:-d,--detached 以独立模式运行任务-m,--jobmanager <arg> 连接 JobManager(主)的地址。使用此标志连接一个不同的 JobManager 在配置中指定的-yD <property=value> 使用给定属性的值-yd,--yarndetached 以独立模式运行任务(过期的;用 non-YARN 选项代替)-yh,--yarnhelp Yarn session CLI 的帮助信息-yid,--yarnapplicationId <arg> 用来运行 YARN Session 的 ID-yj,--yarnjar <arg> Flink jar 文件的路径-yjm,--yarnjobManagerMemory <arg> JobManager 容器的内存可选单元(默认值: MB)-yn,--yarncontainer <arg> 分配 YARN 容器的数量(=TaskManager 的数量)-ynm,--yarnname <arg> 给应用程序一个自定义的名字显示在 YARN 上-yq,--yarnquery 显示 YARN 的可用资源(内存,队列)-yqu,--yarnqueue <arg> 指定 YARN 队列-ys,--yarnslots <arg> 每个 TaskManager 的槽位数量-yst,--yarnstreaming 以流式处理方式启动 Flink-yt,--yarnship <arg> 在指定目录中传输文件(t for transfer)-ytm,--yarntaskManagerMemory <arg> 每个 TaskManager 容器的内存可选单元(默认值: MB)-yz,--yarnzookeeperNamespace <arg> 用来创建高可用模式的 Zookeeper 的子路径的命名空间。-ynl,--yarnnodeLabel <arg> 指定 YARN 应用程序 YARN 节点标签-z,--zookeeperNamespace <arg> 用来创建高可用模式的 Zookeeper 的子路径的命名空间。Options for default mode:-m,--jobmanager <arg> 连接 JobManager(主)的地址。使用此标志连接一个不同的 JobManager 在配置中指定的。-z,--zookeeperNamespace <arg> 用来创建高可用模式的 Zookeeper 的子路径的命名空间。Action "info" 显示程序的优化执行计划(JSON).Syntax: info [OPTIONS] <jar-file> <arguments>"info" action options:-c,--class <classname> 具有程序入口的类("main" 方法 或 "getPlan()" 方法)仅在如果 JAR 文件没有在 manifest 中指定类的时候使用-p,--parallelism <parallelism> 运行程序的并行度。 可以选择覆盖配置中指定的默认值。Action "list" 罗列出正在运行和调度的作业Syntax: list [OPTIONS]"list" action options:-r,--running 只显示运行中的程序和他们的 JobID-s,--scheduled 只显示调度的程序和他们的 JobIDOptions for yarn-cluster mode:-m,--jobmanager <arg> 连接 JobManager(主)的地址。使用此标志连接一个不同的 JobManager 在配置中指定的。-yid,--yarnapplicationId <arg> 用来运行 YARN Session 的 ID。-z,--zookeeperNamespace <arg> 用来创建高可用模式的 Zookeeper 的子路径的命名空间。Options for default mode:-m,--jobmanager <arg> 连接 JobManager(主)的地址。使用此标志连接一个不同的 JobManager 在配置中指定的。-z,--zookeeperNamespace <arg> 用来创建高可用模式的 Zookeeper 的子路径的命名空间。Action "stop" 停止正在运行的程序 (仅限流式处理作业)Syntax: stop [OPTIONS] <Job ID>"stop" action options:Options for yarn-cluster mode:-m,--jobmanager <arg> 连接 JobManager(主)的地址。使用此标志连接一个不同的 JobManager 在配置中指定的。-yid,--yarnapplicationId <arg> 用来运行 YARN Session 的 ID。-z,--zookeeperNamespace <arg> 用来创建高可用模式的 Zookeeper 的子路径的命名空间。Options for default mode:-m,--jobmanager <arg> 连接 JobManager(主)的地址。使用此标志连接一个不同的 JobManager 在配置中指定的。-z,--zookeeperNamespace <arg> 用来创建高可用模式的 Zookeeper 的子路径的命名空间。Action "cancel" 取消正在运行的程序。Syntax: cancel [OPTIONS] <Job ID>"cancel" action options:-s,--withSavepoint <targetDirectory> 触发保存点和取消作业。目标目录是可选的。如果没有指定目录,使用默认配置(state.savepoints.dir)。Options for yarn-cluster mode:-m,--jobmanager <arg> 连接 JobManager(主)的地址。使用此标志连接一个不同的 JobManager 在配置中指定的。-yid,--yarnapplicationId <arg> 用来运行 YARN Session 的 ID。-z,--zookeeperNamespace <arg> 用来创建高可用模式的 Zookeeper 的子路径的命名空间。Options for default mode:-m,--jobmanager <arg> 连接 JobManager(主)的地址。使用此标志连接一个不同的 JobManager 在配置中指定的。-z,--zookeeperNamespace <arg> 用来创建高可用模式的 Zookeeper 的子路径的命名空间。Action "savepoint" 触发运行作业的保存点,或处理现有作业。Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]"savepoint" action options:-d,--dispose <arg> 保存点的处理路径。-j,--jarfile <jarfile> Flink 程序的 JAR 文件。Options for yarn-cluster mode:-m,--jobmanager <arg> 连接 JobManager(主)的地址。使用此标志连接一个不同的 JobManager 在配置中指定的。-yid,--yarnapplicationId <arg> 用来运行 YARN Session 的 ID。-z,--zookeeperNamespace <arg> 用来创建高可用模式的 Zookeeper 的子路径的命名空间。Options for default mode:-m,--jobmanager <arg> 连接 JobManager(主)的地址。使用此标志连接一个不同的 JobManager 在配置中指定的。-z,--zookeeperNamespace <arg> 用来创建高可用模式的 Zookeeper 的子路径的命名空间。Action "modify" 修改正在运行的作业 (例如:修改并行度).Syntax: modify <Job ID> [OPTIONS]"modify" action options:-h,--help 用来显示命令行的帮助信息。-p,--parallelism <newParallelism> 指定作业新的并行度。-v,--verbose 这个选项过期了Options for yarn-cluster mode:-m,--jobmanager <arg> 连接 JobManager(主)的地址。使用此标志连接一个不同的 JobManager 在配置中指定的。-yid,--yarnapplicationId <arg> 用来运行 YARN Session 的 ID。-z,--zookeeperNamespace <arg> 用来创建高可用模式的 Zookeeper 的子路径的命名空间。Options for default mode:-m,--jobmanager <arg> 要连接的JobManager(主节点)的地址。使用此标志可连接到与配置中指定的不同的 JobManager。-z,--zookeeperNamespace <arg> 用来创建高可用模式的 Zookeeper 的子路径的命名空间。