flink1.12.0学习笔记(1)-部署与入门
1-1-Flink概述
Flink诞生
Flink 诞生于欧洲的一个大数据研究项目 StratoSphere。该项目是柏林工业大学的一个研究性项目。早期, Flink 是做 Batch 计算的,但在 2014 年, StratoSphere 里面的核心成员孵化出 Flink,同年将 Flink 捐赠 Apache,并在后来成为 Apache 的顶级大数据项目,同时 Flink 计算的主流方向被定位为 Streaming, 即用流式计算来做所有大数据的计算,这就是 Flink 技术诞生的背景。
2014 年 Flink 作为主攻流计算的大数据引擎开始在开源大数据行业内崭露头角。区别于 Storm、Spark Streaming 以及其他流式计算引擎的是:它不仅是一个高吞吐、低延迟的计算引擎,同时还提供很多高级的功能。比如它提供了有状态的计算,支持状态管理,支持强一致性的数据语义以及支持 基于Event Time的WaterMark对延迟或乱序的数据进行处理等。
什么是Flink
flink是一个分布式,高性能,随时可用的以及准确的流处理计算框架,flink可以对无界数据(流处理)和有界数据(批处理)进行有状态计算(flink天生支持状态计算)的分布式,高性能的计算框架。
阿里收购Flink
随着人工智能时代的降临,数据量的爆发,在典型的大数据的业务场景下数据业务最通用的做法是:选用批处理的技术处理全量数据,采用流式计算处理实时增量数据。在绝大多数的业务场景之下,用户的业务逻辑在批处理和流处理之中往往是相同的。但是,用户用于批处理和流处理的两套计算引擎是不同的。因此,用户通常需要写两套代码。毫无疑问,这带来了一些额外的负担和成本。
阿里巴巴的商品数据处理就经常需要面对增量和全量两套不同的业务流程问题,所以阿里就在想能不能有一套统一的大数据引擎技术,用户只需要根据自己的业务逻辑开发一套代码。这样在各种不同的场景下,不管是全量数据还是增量数据,亦或者实时处理,一套方案即可全部支持,这就是阿里选择 Flink 的背景和初衷。
2015年阿里巴巴开始使用 Flink 并持续贡献社区(阿里内部还基于Flink做了一套Blink),2019年1月8日,阿里巴巴以 9000 万欧元(7亿元人民币)收购了创业公司 Data Artisans。从此Flink开始了新一轮的乘风破浪!
Flink流处理特性
- 支持高吞吐、低延迟、高性能的流处理
- 支持带有事件时间的窗口(window)操作
- 支持有状态计算的Exactly-once语义
- 支持高度灵活的窗口(window)操作,支持基于time、count、session,以及data-driven的窗口操作
- 支持具有背压功能的持续流模型
- 支持基于轻量级分布式快照(Snapshot)实现容错
- 一个运行时同时支持Batch On Streaming处理和Streaming处理
- Flink在JVM内部实现了自己的内存管理
- 支持迭代计算
- 支持程序自动优化:避免特定情况下Shuffle、排序等高消耗操作,中间结果有必要进行缓存
Flink官网介绍
官网地址:
https://flink.apache.org/
官方中文地址:
https://flink.apache.org/zh/
大数据技术框架发展阶段
总共有四代,mr–>DAG框架(tez)—>Spark流批处理框架,内存计算(伪实时)–>flink流批处理,内存计算(真正的实时计算)
Flink相比spark的差异简单对比
框架 | SparkStreaming | Flink |
---|---|---|
定义 | 弹性的分布式数据集,并非真正的实时计算 | 真正的流计算,就像storm一样;但flink同时支持有限的数据流批处理计算 |
高容错 | 基于RDD和checkpoint,比较重 | checkpoint(快照),非常轻量级 |
内存管理 | JVM相关操作暴露给用户 | Flink在JVM中实现的是自己的内存管理 |
延时 | 中等100ms | 低10ms |
Flink组件栈
一个计算框架要有长远的发展,必须打造一个完整的 Stack。只有上层有了具体的应用,并能很好的发挥计算框架本身的优势,那么这个计算框架才能吸引更多的资源,才会更快的进步。所以 Flink 也在努力构建自己的 Stack。
Flink分层的组件栈如下图所示:每一层所包含的组件都提供了特定的抽象,用来服务于上层组件。
各层详细介绍:
- 物理部署层:Flink 支持本地运行、能在独立集群或者在被 YARN 管理的集群上运行, 也能部署在云上,该层主要涉及Flink的部署模式,目前Flink支持多种部署模式:本地、集群(Standalone、YARN)、云(GCE/EC2)、Kubenetes。Flink能够通过该层能够支持不同平台的部署,用户可以根据需要选择使用对应的部署模式。
- Runtime核心层:Runtime层提供了支持Flink计算的全部核心实现,为上层API层提供基础服务,该层主要负责对上层不同接口提供基础服务,也是Flink分布式计算框架的核心实现层,支持分布式Stream作业的执行、JobGraph到ExecutionGraph的映射转换、任务调度等。将DataSteam和DataSet转成统一的可执行的Task Operator,达到在流式引擎下同时处理批量计算和流式计算的目的。
- API&Libraries层:Flink 首先支持了 Scala 和 Java 的 API,Python。DataStream、DataSet、Table、SQL API,作为分布式数据处理框架,Flink同时提供了支撑计算和批计算的接口,两者都提供给用户丰富的数据处理高级API,例如Map、FlatMap操作等,也提供比较低级的Process Function API,用户可以直接操作状态和时间等底层数据。
- 扩展库:Flink 还包括用于复杂事件处理的CEP,机器学习库FlinkML,图处理库Gelly等。Table 是一种接口化的 SQL 支持,也就是 API 支持(DSL),而不是文本化的SQL 解析和执行。
Flink基石
Flink之所以能这么流行,离不开它最重要的四大基石:
-
Checkpoint
-
这是Flink最重要的一个特性。
Flink基于Chandy-Lamport算法实现了一个分布式的一致性的快照,从而提供了一致性的语义。
Chandy-Lamport算法实际上在1985年的时候已经被提出来,但并没有被很广泛的应用,而Flink则把这个算法发扬光大了。
Spark在实现Continue streaming,Continue streaming目的是为了降低处理的延时,其也需要提供这种一致性的语义,最终也采用了Chandy-Lamport这个算法,说明Chandy-Lamport算法在业界得到了一定的肯定。
-
-
State
- 提供了一致性的语义之后,Flink为了让用户在编程时能够更轻松、更容易地去管理状态,还提供了一套非常简单明了的State API,包括里面的有ValueState、ListState、MapState,近期添加了BroadcastState,使用State API能够自动享受到这种一致性的语义。
-
Time
- 除此之外,Flink还实现了Watermark的机制,能够支持基于事件的时间的处理,能够容忍迟到/乱序的数据。
-
Window
- 另外流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算。Flink提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。
Flink应用场景
事件驱动场景[Event-driven Applications]
典型实例:
- 欺诈检测(Fraud detection)
- 异常检测(Anomaly detection)
- 基于规则的告警(Rule-based alerting)
- 业务流程监控(Business process monitoring)
- Web应用程序(社交网络)
事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。
事件驱动型应用是在计算存储分离的传统应用基础上进化而来。在传统架构中,应用需要读写远程事务型数据库。
相反,事件驱动型应用是基于状态化流处理来完成。在该设计中,数据和计算不会分离,应用只需访问本地(内存或磁盘)即可获取数据。
系统容错性的实现依赖于定期向远程持久化存储写入 checkpoint。下图描述了传统应用和事件驱动型应用架构的区别。
从某种程度上来说,所有的实时的数据处理或者是流式数据处理都应该是属于Data Driven,流计算本质上是Data Driven 计算。应用较多的如风控系统,当风控系统需要处理各种各样复杂的规则时,Data Driven 就会把处理的规则和逻辑写入到Datastream 的API 或者是ProcessFunction 的API 中,然后将逻辑抽象到整个Flink 引擎,当外面的数据流或者是事件进入就会触发相应的规则,这就是Data Driven 的原理。在触发某些规则后,Data Driven 会进行处理或者是进行预警,这些预警会发到下游产生业务通知,这是Data Driven 的应用场景,Data Driven 在应用上更多应用于复杂事件的处理。
数据分析场景[Data Analytics Applications]
典型实例
- 电信网络质量监控
- 移动应用中的产品更新及实验评估分析
- 消费者技术中的实时数据即席分析
- 大规模图分析
数据分析任务需要从原始数据中提取有价值的信息和指标
Data Analytics Applications包含Batch analytics(批处理分析)和Streaming analytics(流处理分析)
Batch analytics可以理解为周期性查询:Batch Analytics 就是传统意义上使用类似于Map Reduce、Hive、Spark Batch 等,对作业进行分析、处理、生成离线报表。比如Flink应用凌晨从Recorded Events中读取昨天的数据,然后做周期查询运算,最后将数据写入Database或者HDFS,或者直接将数据生成报表供公司上层领导决策使用。
Streaming analytics可以理解为连续性查询:比如实时展示双十一天猫销售GMV(Gross Merchandise Volume成交总额),用户下单数据需要实时写入消息队列,Flink 应用源源不断读取数据做实时计算,然后不断的将数据更新至Database或者K-VStore,最后做大屏实时展示。
数据管道场景[Data Pipeline Applications]
典型实例
- 电子商务中的持续 ETL(实时数仓)
- 当下游要构建实时数仓时,上游则可能需要实时的Stream ETL。这个过程会进行实时清洗或扩展数据,清洗完成后写入到下游的实时数仓的整个链路中,可保证数据查询的时效性,形成实时数据采集、实时数据处理以及下游的实时Query。
- 电子商务中的实时查询索引构建(搜索引擎推荐)
- 搜索引擎这块以淘宝为例,当卖家上线新商品时,后台会实时产生消息流,该消息流经过Flink 系统时会进行数据的处理、扩展。然后将处理及扩展后的数据生成实时索引,写入到搜索引擎中。这样当淘宝卖家上线新商品时,能在秒级或者分钟级实现搜索引擎的搜索。
提取-转换-加载(ETL)是一种在存储系统之间进行数据转换和迁移的常用方法。ETL 作业通常会周期性地触发,将数据从事务型数据库拷贝到分析型数据库或数据仓库。数据管道和 ETL 作业的用途相似,都可以转换、丰富数据,并将其从某个存储系统移动到另一个。但数据管道是以持续流模式运行,而非周期性触发。因此数据管道支持从一个不断生成数据的源头读取记录,并将它们以低延迟移动到终点。
和周期性 ETL 作业相比,持续数据管道可以明显降低将数据移动到目的端的延迟。此外,由于它能够持续消费和发送数据,因此用途更广,支持用例更多,下图描述了周期性ETL作业和持续数据管道的差异。
Periodic ETL:比如每天凌晨周期性的启动一个Flink ETL Job,读取传统数据库中的数据,然后做ETL,最后写入数据库和文件系统。
Data Pipeline:比如启动一个Flink 实时应用,数据源(比如数据库、Kafka)中的数据不断的通过Flink Data Pipeline流入或者追加到数据仓库(数据库或者文件系统),或者Kafka消息队列。
Data Pipeline 的核心场景类似于数据搬运并在搬运的过程中进行部分数据清洗或者处理,而整个业务架构图的左边是Periodic ETL,它提供了流式ETL 或者实时ETL,能够订阅消息队列的消息并进行处理,清洗完成后实时写入到下游的Database或File system 中。
1-2-Flink安装部署
快速搭建hadoop实验环境
部署操作文档:https://blog.csdn.net/wt334502157/article/details/114916871
Flink支持多种安装模式
- Local—本地单机模式,学习测试时使用
- Standalone—独立集群模式,Flink自带集群,开发测试环境使用
- StandaloneHA—独立集群高可用模式,Flink自带集群,开发测试环境使用
- On Yarn—计算资源统一由Hadoop YARN管理,生产环境使用
Local本地模式
Local本地模式原理
- Flink程序由JobClient进行提交
- JobClient将作业提交给JobManager
- JobManager负责协调资源分配和作业执行。资源分配完成后,任务将提交给相应的TaskManager
- TaskManager启动一个线程以开始执行。TaskManager会向JobManager报告状态更改,如开始执行,正在进行或已完成。
- 作业执行完成后,结果将发送回客户端(JobClient)
Local本地模式安装部署
# 本地模式准备一台机器即可
[root@zuoli ~]# cd /opt/
# 下载安装包
[root@zuoli opt]# wget http://archive.apache.org/dist/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.12.tgz
[root@zuoli opt]# tar -xf flink-1.12.0-bin-scala_2.12.tgz
[root@zuoli opt]# chown -R root:root /opt/flink-1.12.0
# 改名或创建软链接,软链接好处在于不同版本部署只要删除软连接重建指向即可
[root@zuoli opt]# ln -s flink-1.12.0 flink
[root@zuoli opt]# cd flink
[root@zuoli flink]# bin/start-cluster.sh
# 没有报错则服务已经正常启动
Starting cluster.
Starting standalonesession daemon on host zuoli.
Starting taskexecutor daemon on host zuoli.# 使用jps可以查看到下面两个进程
[root@zuoli flink]# jps -l | grep flink
112245 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
112509 org.apache.flink.runtime.taskexecutor.TaskManagerRunner[root@zuoli flink]# netstat -tnlpu | grep -E "112245|112509"
tcp6 0 0 :::6123 :::* LISTEN 112245/java
tcp6 0 0 :::46733 :::* LISTEN 112245/java
tcp6 0 0 :::8081 :::* LISTEN 112245/java
tcp6 0 0 :::42739 :::* LISTEN 112509/java
tcp6 0 0 :::40181 :::* LISTEN 112509/java
tcp6 0 0 :::38783 :::* LISTEN 112509/java
tcp6 0 0 :::34751 :::* LISTEN 112245/java
# 8081端口为Flink的Web UI页面
Local本地模式使用测试
[root@zuoli flink]# pwd
/opt/flink
[root@zuoli flink]# mkdir input
[root@zuoli flink]# vim input/words.txt
zuoli
zuoli wow tbc wlk
wow tbc wlk
tac wlk
zuoli_666
wangt fs ms
zs dz
zuoli zuoli[root@zuoli flink]# mkdir output
[root@zuoli flink]# bin/flink run examples/batch/WordCount.jar --input input/words.txt --output output/words_result.txt
Job has been submitted with JobID be5567890a32040df02a3af7eae4b323
Program execution finished
Job with JobID be5567890a32040df02a3af7eae4b323 has finished.
Job Runtime: 941 ms
[root@zuoli flink]# cat output/words_result.txt
dz 1
fs 1
ms 1
tac 1
tbc 2
wangt 1
zuoli 4
zuoli_666 1
wlk 3
wow 2
zs 1
文档中统计了源编辑文件中,每个关键词出现的次数统计
通过页面可以看到执行过任务的状态信息
本地模式仅作流程体验,实验完成关闭服务
[root@zuoli ~]# cd /opt/flink
[root@zuoli flink]# bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 112509) on host zuoli.
Stopping standalonesession daemon (pid: 112245) on host zuoli.
本地模式可以部署了解熟悉过程以及原理,应用使用场景相对较低,了解即可
Standalone独立集群模式
Standalone独立集群原理
- client客户端提交任务给JobManager
- JobManager负责申请任务运行所需要的资源并管理任务和资源,
- JobManager分发任务给TaskManager执行
- TaskManager定期向JobManager汇报状态
Standalone独立集群安装部署
集群规划 :
- 服务器: ops01 ( Master + Slave ): JobManager + TaskManager
- 服务器: ops02 ( Slave ): TaskManager
- 服务器: ops03 ( Slave ): TaskManager
ops01,ops02,ops03是3台服务器的hostsname
zuoli@ops01:/home/zuoli >cd /opt/
zuoli@ops01:/opt >wget http://archive.apache.org/dist/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.12.tgz
zuoli@ops01:/opt >tar -xf flink-1.12.0-bin-scala_2.12.tgz -C /opt/module/
zuoli@ops01:/opt >cd /opt/module/
zuoli@ops01:/opt/module >ln -s flink-1.12.0 flink
zuoli@ops01:/opt/module >cd flink
zuoli@ops01:/opt/module/flink >vim conf/flink-conf.yaml
jobmanager.rpc.address: ops01
taskmanager.numberOfTaskSlots: 2
web.submit.enable: true
jobmanager.archive.fs.dir: hdfs://ops01:8020/flink/completed-jobs/
historyserver.web.address: ops01
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://ops01:8020/flink/completed-jobs/
zuoli@ops01:/opt/module/flink >vim conf/masters
ops01:8081zuoli@ops01:/opt/module/flink >vim conf/workers
ops01
ops02
ops03# 添加HADOOP_CONF_DIR环境变量
zuoli@ops01:/opt/module/flink >sudo vim /etc/profile
# HADOOP_CONF_DIR
export HADOOP_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop/zuoli@ops01:/opt/module/flink >source /etc/profile
zuoli@ops01:/home/zuoli >scp -r /opt/module/flink-1.12.0 ops02:/opt/module/
zuoli@ops01:/home/zuoli >scp -r /opt/module/flink-1.12.0 ops02:/opt/module/# 在ops02上操作
zuoli@ops02:/home/zuoli >cd /opt/module/
zuoli@ops02:/opt/module >ln -s flink-1.12.0 flink
zuoli@ops02:/opt/module >sudo vim /etc/profile
# HADOOP_CONF_DIR
export HADOOP_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop/
zuoli@ops02:/opt/module >
zuoli@ops02:/opt/module >source /etc/profile# 在ops03上操作
zuoli@ops03:/home/zuoli >cd /opt/module/
zuoli@ops03:/opt/module >ln -s flink-1.12.0 flink
zuoli@ops03:/opt/module >sudo vim /etc/profile
# HADOOP_CONF_DIR
export HADOOP_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop/
zuoli@ops03:/opt/module >
zuoli@ops03:/opt/module >source /etc/profile# 启动集群,在ops01上执行如下命令zuoli@ops01:/home/zuoli >/opt/module/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host ops01.
Starting taskexecutor daemon on host ops01.
Starting taskexecutor daemon on host ops02.
Starting taskexecutor daemon on host ops03.# 启动历史服务器
zuoli@ops01:/home/zuoli >/opt/module/flink/bin/historyserver.sh start
Starting historyserver daemon on host ops01.zuoli@ops01:/home/zuoli >for i in ops01 ops02 ops03 ;do echo $i && ssh $i "jps -l | grep flink";done
ops01
6638 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
6964 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
ops02
45014 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
ops03
5325 org.apache.flink.runtime.taskexecutor.TaskManagerRunnerzuoli@ops01:/home/zuoli >sudo ssh ops01 "netstat -tnlpu | grep -E '6638|6964'"
tcp6 0 0 :::6123 :::* LISTEN 6638/java
tcp6 0 0 :::33645 :::* LISTEN 6964/java
tcp6 0 0 :::8081 :::* LISTEN 6638/java
tcp6 0 0 :::39516 :::* LISTEN 6638/java
tcp6 0 0 :::35677 :::* LISTEN 6964/java
tcp6 0 0 :::41729 :::* LISTEN 6964/java
tcp6 0 0 :::36809 :::* LISTEN 6638/java
zuoli@ops01:/home/zuoli >sudo ssh ops02 " netstat -tnlpu | grep 45014 "
tcp6 0 0 :::33334 :::* LISTEN 45014/java
tcp6 0 0 :::33305 :::* LISTEN 45014/java
tcp6 0 0 :::44328 :::* LISTEN 45014/java
zuoli@ops01:/home/zuoli >sudo ssh ops03 " netstat -tnlpu | grep 5325 "
tcp6 0 0 :::33974 :::* LISTEN 5325/java
tcp6 0 0 :::37444 :::* LISTEN 5325/java
tcp6 0 0 :::43306 :::* LISTEN 5325/java
Standalone独立集群使用测试
zuoli@ops01:/home/zuoli >/opt/module/flink/bin/flink run /opt/module/flink/examples/batch/WordCount.jarExecuting WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 81555215ae9b4067868ada1e53896aa7
Program execution finished
Job with JobID 81555215ae9b4067868ada1e53896aa7 has finished.
Job Runtime: 543 ms
Accumulator Results:
- 96038ddc744b14b0cfedd3b3c0150b57 (java.util.ArrayList) [170 elements](a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
(awry,1)
(ay,1)
(bare,1)
(be,4)
...
...
# 验证完关闭服务
zuoli@ops01:/opt/module/flink >bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 6964) on host ops01.
Stopping taskexecutor daemon (pid: 45014) on host ops02.
Stopping taskexecutor daemon (pid: 5325) on host ops03.
Stopping standalonesession daemon (pid: 6638) on host ops01.
zuoli@ops01:/home/zuoli >/opt/module/flink/bin/historyserver.sh stop
Standalone独立集群模式可以部署了解熟悉过程以及原理,应用使用场景相对较低,了解即可
StandaloneHA高可用模式
StandaloneHA高可用原理
利用独立集群安装部署升级成HA高可用
从之前的架构中我们可以很明显的发现 JobManager 有明显的单点问题(SPOF,single point of failure)。JobManager 肩负着任务调度以及资源分配,一旦 JobManager 出现意外,其后果可想而知。
在 Zookeeper 的帮助下,一个 Standalone的Flink集群会同时有多个活着的 JobManager,其中只有一个处于工作状态,其他处于 Standby 状态。当工作中的 JobManager 失去连接后(如宕机或 Crash),Zookeeper 会从 Standby 中选一个新的 JobManager 来接管 Flink 集群。
StandaloneHA高可用安装部署
集群规划 :
- 服务器: ops01 ( Master + Slave ): JobManager + TaskManager
- 服务器: ops02 ( Slave ): JobManager + TaskManager
- 服务器: ops03 ( Slave ): TaskManager
ops01,ops02,ops03是3台服务器的hostsname
注意和独立集群的角色区别,ops02多了JobManager身份
环境需提前部署ZooKeeper服务
建议预设好Hadoop集群,很大部分实验和代码需要和hadoop集群组件hdfs、yarn等调用
zookeeper信息:
zuoli@ops01:/opt/module/flink/log >zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: followerzuoli@ops02:/home/zuoli >zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: leaderzuoli@ops03:/home/zuoli >zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: follower
hadoop信息:
zuoli@ops01:/opt/module/flink/log >hadoop version Hadoop 3.1.3 Source code repository https://gitbox.apache.org/repos/asf/hadoop.git -r ba631c436b806728f8ec2f54ab1e289526c90579 Compiled by ztang on 2019-09-12T02:47Z Compiled with protoc 2.5.0 From source with checksum ec785077c385118ac91aadde5ec9799 This command was run using /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-common-3.1.3.jar
注意:利用Standalone独立集群进行改动升级
修改配置文件
zuoli@ops01:/opt/module/flink >pwd
/opt/module/flinkzuoli@ops01:/opt/module/flink >vim conf/flink-conf.yaml# 开启HA,使用文件系统作为快照存储
state.backend: filesystem# 启用检查点,可以将快照保存到HDFS
state.checkpoints.dir: hdfs://ops01:8020/flink-checkpoints# 使用zookeeper搭建高可用
high-availability: zookeeper# 存储JobManager的元数据到HDFS
high-availability.storageDir: hdfs://ops01:8020/flink/ha/# 配置ZK集群地址
high-availability.zookeeper.quorum: ops01:2181,ops02:2181,ops03:2181
修改master信息:
zuoli@ops01:/opt/module/flink >vim conf/masters
ops01:8081
ops02:8081
分发配置文件:
zuoli@ops01:/opt/module/flink >scp -r conf/flink-conf.yaml ops02:/opt/module/flink/conf/
zuoli@ops01:/opt/module/flink >scp -r conf/flink-conf.yaml ops03:/opt/module/flink/conf/
zuoli@ops01:/opt/module/flink >scp -r conf/masters ops02:/opt/module/flink/conf/
zuoli@ops01:/opt/module/flink >scp -r conf/masters ops03:/opt/module/flink/conf/
zuoli@ops01:/opt/module/flink >
在另外一台ops02上操作:
zuoli@ops02:/home/zuoli >cd /opt/module/flink/conf/
zuoli@ops02:/opt/module/flink/conf >vim flink-conf.yaml
jobmanager.rpc.address: ops02
注意之前的flink服务没有关闭则先关闭
/opt/module/flink/bin/stop-cluster.sh
启动flink集群:
zuoli@ops01:/home/zuoli >/opt/module/flink/bin/start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host ops01.
Starting standalonesession daemon on host ops02.
Starting taskexecutor daemon on host ops01.
Starting taskexecutor daemon on host ops02.
Starting taskexecutor daemon on host ops03.zuoli@ops01:/home/zuoli >for i in ops01 ops02 ops03 ;do echo $i && ssh $i "jps -l | grep flink";done
ops01
ops02
ops03
# 通过for循环发现并没有flink相关服务成功启动,没有jps进程信息
问题排查:
查看日志
2022-09-20 16:38:36,343 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Could not start cluster entrypoint StandaloneSessionClusterEntrypoint. org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.at
网上查找报错得知:flink 自1.12版本开始是不带 hadoop 支持包的,在 CLASS_PATH 下缺少了 HDFS 相关的 jar 实现
相关包为:
flink-shaded-hadoop-3
commons-cli
可以通过maven的官方包库去查找下载:https://mvnrepository.com/
zuoli@ops01:/home/zuoli >wget https://repo1.maven.org/maven2/commons-cli/commons-cli/1.4/commons-cli-1.4.jar zuoli@ops01:/home/zuoli >wget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3/3.1.1.7.2.1.0-327-9.0/flink-shaded-hadoop-3-3.1.1.7.2.1.0-327-9.0.jar
把包复制到flink的库lib目录下:
zuoli@ops01:/home/zuoli >cp commons-cli-1.4.jar flink-shaded-hadoop-3-3.1.1.7.2.1.0-327-9.0.jar /opt/module/flink
把包分发到其它节点flink的库lib目录下:
zuoli@ops01:/home/zuoli >scp commons-cli-1.4.jar flink-shaded-hadoop-3-3.1.1.7.2.1.0-327-9.0.jar ops02:/opt/module/flink/lib/ zuoli@ops01:/home/zuoli >scp commons-cli-1.4.jar flink-shaded-hadoop-3-3.1.1.7.2.1.0-327-9.0.jar ops03:/opt/module/flink/lib/
启动flink集群,发现仍有问题,再次查看日志:
从日志中查看到
Caused by: java.io.IOException: Could not create FileSystem for highly available storage path (hdfs:/ops01:8020/flink/ha/default)at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103) ~[flink-dist_2.12-1.12.0.jar:1.12.0]at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
初始化还是失败,Could not create FileSystem看上去和flink无关,已经到hadoop的组件级别报错,似乎引用层面上获取不到信息,处理方案:
zuoli@ops01:/opt/module/flink/log >vim /etc/profile export HADOOP_CLASSPATH=`hadoop classpath` zuoli@ops01:/opt/module/flink/log >source /etc/profilezuoli@ops02:/opt/module/flink/log >vim /etc/profile export HADOOP_CLASSPATH=`hadoop classpath` zuoli@ops02:/opt/module/flink/log >source /etc/profilezuoli@ops03:/opt/module/flink/log >vim /etc/profile export HADOOP_CLASSPATH=`hadoop classpath` zuoli@ops03:/opt/module/flink/log >source /etc/profile
再次尝试启动flink集群:
zuoli@ops01:/opt/module/flink/log >/opt/module/flink/bin/start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host ops01.
Starting standalonesession daemon on host ops02.
Starting taskexecutor daemon on host ops01.
Starting taskexecutor daemon on host ops02.
Starting taskexecutor daemon on host ops03.zuoli@ops01:/opt/module/flink/log >for i in ops01 ops02 ops03 ;do echo $i && ssh $i "jps -l | grep flink";done
ops01
17125 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
17516 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
ops02
37520 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
37875 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
ops03
58337 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
从页面查看master
尝试查看ops02:
StandaloneHA高可用使用测试
# 运行wordcount测试jar
zuoli@ops01:/opt/module/flink/log >/opt/module/flink/bin/flink run /opt/module/flink/examples/batch/WordCount.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
...
...
将ops01的服务终止,进行验证flink集群服务是否还可以正常执行任务运行
zuoli@ops01:/opt/module/flink/log >jps -l | grep flink
17125 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
17516 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
zuoli@ops01:/opt/module/flink/log >kill -9 17125
zuoli@ops01:/opt/module/flink/log >jps -l | grep flink
17516 org.apache.flink.runtime.taskexecutor.TaskManagerRunnerzuoli@ops01:/opt/module/flink/log >/opt/module/flink/bin/flink run /opt/module/flink/examples/batch/WordCount.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
(awry,1)
(ay,1)
...
...
# 服务依然可以正常执行出运行结果
虽然ops01服务中断,页面也已经打不开,但是ops02存活,集群整体还是可以正常的对外提供服务
停止服务:
zuoli@ops01:/opt/module/flink/log >/opt/module/flink/bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 17516) on host ops01.
Stopping taskexecutor daemon (pid: 37875) on host ops02.
Stopping taskexecutor daemon (pid: 58337) on host ops03.
No standalonesession daemon (pid: 17125) is running anymore on ops01.
Stopping standalonesession daemon (pid: 37520) on host ops02.
zuoli@ops01:/opt/module/flink/log >for i in ops01 ops02 ops03 ;do echo $i && ssh $i "jps -l | grep flink";done
ops01
ops02
ops03
Flink On Yarn模式
Flink On Yarn模式原理
在实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式
- 优势1.Yarn的资源可以按需使用,提高集群的资源利用率
- 优势2.Yarn的任务有优先级,根据优先级运行作业
- 优势3.基于Yarn调度系统,能够自动化地处理各个角色的 Failover(容错)
- JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控
- 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器
- Client上传jar包和配置文件到HDFS集群上
- Client向Yarn ResourceManager提交任务并申请资源
- ResourceManager分配Container资源并启动ApplicationMaster,然后AppMaster加载Flink的Jar包和配置构建环境,启动JobManager
- ApplicationMaster向ResourceManager申请工作资源,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager
- TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务
提交作业两种模式
Session模式介绍
- 特点
- 需要事先申请资源,启动JobManager和TaskManger
- 优点
- 不需要每次递交作业申请资源,而是使用已经申请好的资源,从而提高执行效率
- 缺点
- 作业执行完成以后,资源不会被释放,因此一直会占用系统资源
- 应用场景
- 适合作业递交比较频繁的场景,小作业比较多的场景
Per-Job模式介绍
- 特点
- 每次递交作业都需要申请一次资源
- 优点
- 作业运行完成,资源会立刻被释放,不会一直占用系统资源
- 缺点
- 每次递交作业都需要申请资源,会影响执行效率,因为申请资源需要消耗时间
- 应用场景
- 适合作业比较少的场景、大作业的场景
Flink On Yarn模式安装部署
在StandaloneHA高可用集群基础之上修改
# 各节点的yarn-site.xml配置文件增加如下配置
zuoli@ops01:/home/zuoli >vim /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml<!-- 关闭yarn内存检查 --><property><name>yarn.nodemanager.pmem-check-enabled</name><value>false</value></property><!-- 关闭虚拟内存检查 --><property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value></property>
说明:
- 是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。
- 在这里面我们需要关闭,因为对于flink使用yarn模式下,很容易内存超标,这个时候yarn会自动杀掉job
重启hadoop集群组件
zuoli@ops01:/home/zuoli >stop-all.sh
zuoli@ops01:/home/zuoli >start-all.sh
也可以单独重启yarn
重启yarn
/opt/module/hadoop/sbin/stop-yarn.sh
/opt/module/hadoop/sbin/start-yarn.sh
启动flink集群
zuoli@ops01:/home/zuoli >/opt/module/flink/bin/start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host ops01.
Starting standalonesession daemon on host ops02.
Starting taskexecutor daemon on host ops01.
Starting taskexecutor daemon on host ops02.
Starting taskexecutor daemon on host ops03.zuoli@ops01:/home/zuoli >for i in ops01 ops02 ops03 ;do echo $i && ssh $i "jps -l | grep flink";done
ops01
72233 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
72656 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
ops02
72836 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
72467 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
ops03
83138 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
Flink On Yarn模式使用测试
Session模式测试
启动flink的Session模式
zuoli@ops01:/home/zuoli >ll /opt/module/flink/bin/yarn-session.sh
-rwxr-xr-x 1 zuoli zuoli 1723 Jun 18 2020 /opt/module/flink/bin/yarn-session.shzuoli@ops01:/home/zuoli >/opt/module/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
...
...
2022-09-21 11:57:36,655 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Session establishment complete on server ops03/11.8.36.76:2181, sessionid = 0x30b2acded4d0005, negotiated timeout = 40000
2022-09-21 11:57:36,657 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager [] - State change: CONNECTED
2022-09-21 11:57:36,942 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/rest_server_lock'}.
JobManager Web Interface: http://ops01:46734
yarn-session.sh -n 2 -tm 800 -s 1 -d 说明:
申请2个CPU、1600M内存
-n 表示申请2个容器,这里指的就是多少个taskmanager
-tm 表示每个TaskManager的内存大小
-s 表示每个TaskManager的slots数量
-d 表示以后台程序方式运行
yarn界面
保持当前控制台运行,复制会话窗口ops01
# 第1次执行WordCount任务
zuoli@ops01:/home/zuoli > /opt/module/flink/bin/flink run /opt/module/flink/examples/batch/WordCount.jar
# 第2次执行WordCount任务
zuoli@ops01:/home/zuoli > /opt/module/flink/bin/flink run /opt/module/flink/examples/batch/WordCount.jar
# 第3次执行WordCount任务
zuoli@ops01:/home/zuoli > /opt/module/flink/bin/flink run /opt/module/flink/examples/batch/WordCount.jar
执行3个wordcount任务,flink任务界面UI可以看到jobs记录
从yarn的任务页面可以看到始终是一个ID为:application_1615531413182_10311的挂起任务处于running
也就相当于3个wordcount任务均为application_1615531413182_10311处理
停止任务:
zuoli@ops01:/home/zuoli >yarn application -kill application_1615531413182_10311
# 或者直接将保持当前控制台运行的任务ctrl+c断开即可
Per-Job分离模式测试
# 直接提交job任务即为Per-Job分离模式
# 第1次执行WordCount任务
zuoli@ops01:/home/zuoli >/opt/module/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /opt/module/flink/examples/batch/WordCount.jar
# 第1次执行WordCount任务
zuoli@ops01:/home/zuoli >/opt/module/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /opt/module/flink/examples/batch/WordCount.jar
# 第1次执行WordCount任务
zuoli@ops01:/home/zuoli >/opt/module/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /opt/module/flink/examples/batch/WordCount.jar
参数说明:
-m jobmanager的地址
-yjm 1024 指定jobmanager的内存信息
-ytm 1024 指定taskmanager的内存信息
1-3-Flink入门开发示例
背景知识介绍与说明
Flink API
Flink提供了多个层次的API供开发者使用,越往上抽象程度越高,使用起来越方便;越往下越底层,使用起来难度越大
Flink编程模型
Flink 应用程序结构主要包含三部分
- Source
- Transformation
- Sink
代码的数据流结构:
准备工作
创建个maven工程
GroupId
- cn.zuoli
ArtifactId
- flink_test
工程项目pom文件:https://osszuoli.oss-cn-shanghai.aliyuncs.com/flink/pom.xml
log4j配置文件(放置项目resource目录下):
https://osszuoli.oss-cn-shanghai.aliyuncs.com/flink/log4j.properties
在main.java下创建cn.zuoli的包,包下创建一个test类调通开发环境
Flink开发初体验
基于DataSet代码开发
创建WordCount类
WordCount代码:
package cn.zuoli;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class WordCount {public static void main(String[] args) throws Exception {//TODO 0.envExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//TODO 1.sourceDataSet<String> lines = env.fromElements("zuoli hadoop spark", "zuoli hadoop spark", "zuoli hadoop", "zuoli");//TODO 2.transformationDataSet<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {//value表示每一行数据String[] arr = value.split(" ");for (String word : arr) {out.collect(word);}}});DataSet<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {//value就是每一个单词return Tuple2.of(value, 1);}});//分组UnsortedGrouping<Tuple2<String, Integer>> grouped = wordAndOne.groupBy(0);//聚合AggregateOperator<Tuple2<String, Integer>> result = grouped.sum(1);//TODO 3.sinkresult.print();}
}
执行效果:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/Administrator/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/Administrator/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
(hadoop,3)
(zuoli,4)
(spark,2)
Process finished with exit code 0
执行结果中有:
(hadoop,3)
(zuoli,4)
(spark,2)提示有:ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
实际不影响实际操作,问题处理:
在resource目录下创建:Log4j2.xml
内容 如下:
<?xml version="1.0" encoding="UTF-8"?> <Configuration status="ERROR"><Appenders><Console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n" /></Console><RollingFile name="RollingFile" filename="log/test.log"filepattern="${logPath}/%d{YYYYMMddHHmmss}-fargo.log"><PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n" /><Policies><SizeBasedTriggeringPolicy size="10 MB" /></Policies><DefaultRolloverStrategy max="20" /></RollingFile></Appenders><Loggers><Root level="ERROR"><AppenderRef ref="Console" /><AppenderRef ref="RollingFile" /></Root></Loggers> </Configuration>
再次执行任务则不再有报错
基于DataStream代码开发
匿名内部类-处理批
创建类WordCount2
package cn.zuoli;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCount2 {public static void main(String[] args) throws Exception {//TODO 0.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//TODO 1.sourceDataStream<String> lines = env.fromElements("zuoli hadoop spark", "zuoli hadoop spark", "zuoli zuoli", "zuoli");//TODO 2.transformationDataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {//value就是每一行数据String[] arr = value.split(" ");for (String word : arr) {out.collect(word);}}});DataStream<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {//value就是一个个单词return Tuple2.of(value, 1);}});KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);//聚合SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);//TODO 3.sinkresult.print();//TODO 4.execute/启动并等待程序结束env.execute();}
}
匿名内部类-处理流
创建类WordCount3
package cn.zuoli;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCount3 {public static void main(String[] args) throws Exception {//TODO 0.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//TODO 1.sourceDataStream<String> lines = env.socketTextStream("ops01", 9999);//TODO 2.transformationDataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {//value就是每一行数据String[] arr = value.split(" ");for (String word : arr) {out.collect(word);}}});DataStream<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {//value就是一个个单词return Tuple2.of(value, 1);}});KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);//聚合SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);//TODO 3.sinkresult.print();//TODO 4.execute/启动并等待程序结束env.execute();}
}
DataStream-Lambda
创建类WordCount4
package cn.zuoli;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;public class WordCount4 {public static void main(String[] args) throws Exception {//TODO 0.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//注意:使用DataStream根据数据源自动选择使用流还是批//TODO 1.sourceDataStream<String> lines = env.fromElements("zuoli zuoli zuoli", "zuoli hadoop zuoli", "zuoli hadoop", "zuoli");//TODO 2.transformationSingleOutputStreamOperator<String> words = lines.flatMap((String value, Collector<String> out) -> Arrays.stream(value.split(" ")).forEach(out::collect)).returns(Types.STRING);DataStream<Tuple2<String, Integer>> wordAndOne = words.map((String value) -> Tuple2.of(value, 1)).returns(Types.TUPLE(Types.STRING,Types.INT));KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);//聚合SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);//TODO 3.sinkresult.print();//TODO 4.execute/启动并等待程序结束env.execute();}
}
On-Yarn
创建类WordCount5_Yarn
package cn.zuoli;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.Arrays;public class WordCount5_Yarn {public static void main(String[] args) throws Exception {ParameterTool parameterTool = ParameterTool.fromArgs(args);String output = "";if (parameterTool.has("output")) {output = parameterTool.get("output");System.out.println("指定了输出路径使用:" + output);} else {output = "hdfs://ops01:8020/wordcount/output47_";System.out.println("可以指定输出路径使用 --output ,没有指定使用默认的:" + output);}//TODO 0.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//TODO 1.sourceDataStream<String> lines = env.fromElements("zuoli hadoop zuoli", "zuoli hadoop zuoli", "zuoli hadoop", "zuoli");//TODO 2.transformationSingleOutputStreamOperator<String> words = lines.flatMap((String value, Collector<String> out) -> Arrays.stream(value.split(" ")).forEach(out::collect)).returns(Types.STRING);DataStream<Tuple2<String, Integer>> wordAndOne = words.map((String value) -> Tuple2.of(value, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);//聚合SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);//TODO 3.sinkSystem.setProperty("HADOOP_USER_NAME", "root");//设置用户名result.writeAsText(output + System.currentTimeMillis()).setParallelism(1);//TODO 4.execute/启动并等待程序结束env.execute();}
}
代码打成jar包上传至服务器执行
服务器上执行验证:
[root@ops01 ~]# /opt/module/flink/bin/flink run -Dexecution.runtime-mode=BATCH -m yarn-cluster -yjm 1024 -ytm 1024 -c cn.zuoli.WordCount5_Yarn /root/original-flink_test-1.0-SNAPSHOT.jar --output hdfs://ops01:8020/wordcount/output_20220922
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
指定了输出路径使用:hdfs://ops01:8020/wordcount/output_20220922
2022-09-22 17:35:27,081 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/opt/module/flink-1.12.0/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2022-09-22 17:35:27,131 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at ops02/11.8.36.63:8032
2022-09-22 17:35:27,308 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2022-09-22 17:35:27,463 INFO org.apache.hadoop.conf.Configuration [] - resource-types.xml not found
2022-09-22 17:35:27,464 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils [] - Unable to find 'resource-types.xml'.
2022-09-22 17:35:27,511 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - The configured JobManager memory is 1024 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (2048 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 1024 MB may not be used by Flink.
2022-09-22 17:35:27,511 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - The configured TaskManager memory is 1024 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (2048 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 1024 MB may not be used by Flink.
2022-09-22 17:35:27,511 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cluster specification: ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=1024, slotsPerTaskManager=2}
2022-09-22 17:35:27,593 INFO org.apache.hadoop.conf.Configuration.deprecation [] - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2022-09-22 17:35:30,091 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1615531413182_10336
2022-09-22 17:35:30,127 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1615531413182_10336
2022-09-22 17:35:30,127 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated
2022-09-22 17:35:30,129 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED
2022-09-22 17:35:35,919 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2022-09-22 17:35:35,920 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface ops01:43333 of application 'application_1615531413182_10336'.
Job has been submitted with JobID 30770a4ad3862ac3154c7b667426e7ad
Program execution finished
Job with JobID 30770a4ad3862ac3154c7b667426e7ad has finished.
Job Runtime: 11073 ms[root@ops01 ~]# hdfs dfs -ls /wordcount
Found 3 items
drwxr-xr-x - zuoli supergroup 0 2022-09-20 15:01 /wordcount/input
drwxr-xr-x - zuoli supergroup 0 2022-09-20 15:01 /wordcount/output
-rw-r--r-- 3 zuoli supergroup 111 2022-09-22 17:35 /wordcount/output_202209221663839326937
[root@ops01 ~]# hdfs dfs -cat /wordcount/output_202209221663839326937
2022-09-22 17:37:04,788 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
(zuoli,1)
(hadoop,1)
(zuoli,2)
(zuoli,3)
(hadoop,2)
(zuoli,4)
(zuoli,5)
(hadoop,3)
(zuoli,6)
1-4-Flink原理
Flink角色
在实际生产中,Flink 都是以集群在运行,在运行的过程中包含了两类进程。
- JobManager:
- 它扮演的是集群管理者的角色,负责调度任务、协调 checkpoints、协调故障恢复、收集 Job 的状态信息,并管理 Flink 集群中的从节点 TaskManager。
- TaskManager:
- 实际负责执行计算的 Worker,在其上执行 Flink Job 的一组 Task;TaskManager 还是所在节点的管理员,它负责把该节点上的服务器信息比如内存、磁盘、任务运行情况等向 JobManager 汇报。
- Client:
- 用户在提交编写好的 Flink 工程时,会先创建一个客户端再进行提交,这个客户端就是 Client
Flink任务执行流程
- Client向HDFS上传Flink的Jar包和配置
- Client向Yarn ResourceManager提交任务并申请资源
- ResourceManager分配Container资源并启动ApplicationMaster,然后AppMaster加载Flink的Jar包和配置构建环境,启动JobManager
- ApplicationMaster向ResourceManager申请工作资源,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager
- TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务
Flink Streaming Dataflow
- Dataflow:Flink程序在执行的时候会被映射成一个数据流模型
- Operator:数据流模型中的每一个操作被称作Operator,Operator分为:Source/Transform/Sink
- Partition:数据流模型是分布式的和并行的,执行中会形成1~n个分区
- Subtask:多个分区任务可以并行,每一个都是独立运行在一个线程中的,也就是一个Subtask子任务
- Parallelism:并行度,就是可以同时真正执行的子任务数/分区数
Operator传递模式
数据在两个operator(算子)之间传递的时候有两种模式:
- One to One模式:
- 两个operator用此模式传递的时候,会保持数据的分区数和数据的排序;保留Source的分区特性,以及分区元素处理的有序性。–类似于Spark中的窄依赖
- Redistributing 模式:
- 这种模式会改变数据的分区数;每个一个operator subtask会根据选择transformation把数据发送到不同的目标subtasks,比如keyBy()会通过hashcode重新分区,broadcast()和rebalance()方法会随机重新分区。–类似于Spark中的宽依赖
Operator Chain
客户端在提交任务的时候会对Operator进行优化操作,能进行合并的Operator会被合并为一个Operator,
合并后的Operator称为Operator chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行–就是SubTask。
TaskSlot And Slot Sharing
TaskSlot(任务槽)
每个TaskManager是一个JVM的进程, 为了控制一个TaskManager(worker)能接收多少个task,Flink通过Task Slot来进行控制。TaskSlot数量是用来限制一个TaskManager工作进程中可以同时运行多少个工作线程,TaskSlot 是一个 TaskManager 中的最小资源分配单位,一个 TaskManager 中有多少个 TaskSlot 就意味着能支持多少并发的Task处理。
Flink将进程的内存进行了划分到多个slot中,内存被划分到不同的slot之后可以获得如下好处:
- TaskManager最多能同时并发执行的子任务数是可以通过TaskSolt数量来控制的
- TaskSolt有独占的内存空间,这样在一个TaskManager中可以运行多个不同的作业,作业之间不受影响。
Slot Sharing(槽共享)
Flink允许子任务共享插槽,即使它们是不同任务(阶段)的子任务(subTask),只要它们来自同一个作业。
比如图左下角中的map和keyBy和sink 在一个 TaskSlot 里执行以达到资源共享的目的。
允许插槽共享有两个主要好处:
- 资源分配更加公平,如果有比较空闲的slot可以将更多的任务分配给它。
- 有了任务槽共享,可以提高资源的利用率。
注意:
slot是静态的概念,是指taskmanager具有的并发执行能力
parallelism是动态的概念,是指程序运行时实际使用的并发能力
Flink运行组件
Flink运行时架构主要包括四个不同的组件,它们会在运行流处理应用程序时协同工作:
- 作业管理器(JobManager):分配任务、调度checkpoint做快照
- 控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager 所控制执行。
- JobManager 会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。
- JobManager 会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。
- JobManager 会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
- 任务管理器(TaskManager):主要干活的
- Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。
- 启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
- 在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。
- 资源管理器(ResourceManager):管理分配资源
- 主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger 插槽是Flink中定义的处理资源单元。
- Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及standalone部署。
- 当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。
- 分发器(Dispatcher):方便递交任务的接口,WebUI
- 可以跨作业运行,它为应用提交提供了REST接口。
- 当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。
- Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。
- Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。
- 可以跨作业运行,它为应用提交提供了REST接口。
Flink执行图
由Flink程序直接映射成的数据流图是StreamGraph,也被称为逻辑流图,因为它们表示的是计算逻辑的高级视图。为了执行一个流处理程序,Flink需要将逻辑流图转换为物理数据流图(也叫执行图),详细说明程序的执行方式。
Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。
- Flink执行executor会自动根据程序代码生成DAG数据流图
- Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。
- StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。表示程序的拓扑结构。
- JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
- ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
- 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
- StreamGraph
- 最初的程序执行逻辑流程,也就是算子之间的前后顺序–在Client上生成
- JobGraph
- 将OneToOne的Operator合并为OperatorChain–在Client上生成
- ExecutionGraph
- 将JobGraph根据代码中设置的并行度和请求的资源进行并行化规划!–在JobManager上生成
- 物理执行图
- 将ExecutionGraph的并行计划,落实到具体的TaskManager上,将具体的SubTask落实到具体的TaskSlot内进行运行。