Flink编程入门(二)

Flink 有三种部署模式,分别是 LocalStandalone Cluster Yarn Cluster

 

1.1. Local模式

 

对于 Local 模式来说,JobManager TaskManager 会公用一个 JVM 来完成 Workload。如果要验证一个简单的应用,Local 模式是最方便的。实际应用中大多使用 Standalone 或者 Yarn Cluster,而local模式只是将安装包解压启动(./bin/start-local.sh)即可,在这里不在演示。

1.2. Standalone 模式 

1.2.1. 下载

 

安装包下载地址:http://flink.apache.org/downloads.html

 

快速入门教程地址:

 

https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html

 

 

1.2.2. 上传安装包到linux系统

 

使用rz命令

 

1.2.3. 解压

 

tar –zxvf flink-1.3.2-bin-hadoop26-scala_2.10.tgz

 

1.2.4. 重命名

 

mv flink-1.3.2 flink

 

1.2.5. 修改环境变量

 

切换到root用户配置

 

export FLINK_HOME=/home/hadoop/flink

 

export PATH=$PATH:$FLINK_HOME/bin

 

配置结束后切换会普通用户

 

source /etc/profile

 

1.2.6. 修改配置文件

 

修改flink/conf/masters

 

master1:8081

 

修改flink/conf/slaves

 

master1ha

master2

master2ha

 

修改flink/conf/flink-conf.yaml

taskmanager.numberOfTaskSlots: 2

 jobmanager.rpc.address: master1

 

1.2.7. 启动flink

 

/home/Hadoop/flink/bin/start-cluster.sh

 

1.2.8. Flink Rest API

Flink 和其他大多开源的框架一样,提供了很多有用的 Rest API。不过 Flink RestAPI,目前还不是很强大,只能支持一些 Monitor 的功能。Flink Dashboard 本身也是通过其 Rest 来查询各项的结果数据。在 Flink RestAPI 基础上,可以比较容易的将 Flink Monitor 功能和其他第三方工具相集成,这也是其设计的初衷。

 

Flink 的进程中,是由 JobManager 来提供 Rest API 的服务。因此在调用 Rest 之前,要确定 JobManager 是否处于正常的状态。正常情况下,在发送一个 Rest 请求给 JobManager 之后,Client 就会收到一个 JSON 格式的返回结果。由于目前 Rest 提供的功能还不多,需要增强这块功能的读者可以在子项目 flink-runtime-web 中找到对应的代码。其中最关键一个类 WebRuntimeMonitor,就是用来对所有的 Rest 请求做分流的,如果需要添加一个新类型的请求,就需要在这里增加对应的处理代码。下面我例举几个常用 Rest API

 

1.查询 Flink 集群的基本信息: /overview。示例命令行格式以及返回结果如下:

 

$ curl http://localhost:8081/overview

 

{"taskmanagers":1,"slots-total":16,

 

"slots-available":16,"jobs-running":0,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0}

 

2.查询当前 Flink 集群中的 Job 信息:/jobs。示例命令行格式以及返回结果如下:

 

$ curl http://localhost:8081/jobs

 

{"jobs-running":[],"jobs-finished":

 

["f91d4dd4fdf99313d849c9c4d29f8977"],"jobs-cancelled":[],"jobs-failed":[]}

 

3.查询一个指定的 Job 信息: /jobs/jobid。这个查询的结果会返回特别多的详细的内容,这是我在浏览器中进行的测试,如下图:

 

想要了解更多 Rest 请求内容的读者,可以去 Apache Flink 的页面中查找。

 

1.2.9. 运行测试任务

 

./bin/flink run -m master1:8082 ./examples/batch/WordCount.jar --input hdfs://master1:9000/words.txt --output hdfs://master1:9000/clinkout

 

1.3. Flink HA

 

首先,我们需要知道 Flink 有两种部署的模式,分别是 Standalone 以及 Yarn Cluster 模式。对于 Standalone 来说,Flink 必须依赖于 Zookeeper 来实现 JobManager HAZookeeper 已经成为了大部分开源框架 HA 必不可少的模块)。在 Zookeeper 的帮助下,一个 Standalone Flink 集群会同时有多个活着的 JobManager,其中只有一个处于工作状态,其他处于 Standby 状态。当工作中的 JobManager 失去连接后(如宕机或 Crash),Zookeeper 会从 Standby 中选举新的 JobManager 来接管 Flink 集群。

 

对于 Yarn Cluaster 模式来说,Flink 就要依靠 Yarn 本身来对 JobManager HA 了。其实这里完全是 Yarn 的机制。对于 Yarn Cluster 模式来说,JobManager TaskManager 都是被 Yarn 启动在 Yarn Container 中。此时的 JobManager,其实应该称之为 Flink Application Master。也就说它的故障恢复,就完全依靠着 Yarn 中的 ResourceManager(和 MapReduce AppMaster 一样)。由于完全依赖了 Yarn,因此不同版本的 Yarn 可能会有细微的差异。这里不再做深究。

 

1.3.1. 修改配置文件

 

修改flink-conf.yaml

 

state.backend: filesystem

 

state.backend.fs.checkpointdir: hdfs://master1:9000/flink-checkpoints

 

high-availability: zookeeper

 

high-availability.storageDir: hdfs://master1:9000/flink/ha/

high-availability.zookeeper.quorum: master1ha:2181,master2:2181,master2ha:2181

high-availability.zookeeper.client.acl: open

修改conf

server.1=master1ha:2888:3888

server.2=master2:2888:3888 

server.3=master2ha:2888:3888

 

修改masters

 

master1:8082

 

master1ha:8082

 

修改slaves

 

master1ha

 

master2

 

master2ha

 

1.3.2. 启动

 

/home/Hadoop/flink/bin/start-cluster.sh

 

1.4. Yarn Cluster 模式

 

1.4.1. 引入

 

在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的 Workload。因此 Flink 也支持在 Yarn 上面运行。首先,让我们通过下图了解下 Yarn Flink 的关系。

 

在图中可以看出,Flink Yarn 的关系与 MapReduce Yarn 的关系是一样的。Flink 通过 Yarn 的接口实现了自己的 App Master。当在 Yarn 中部署了 FlinkYarn 就会用自己的 Container 来启动 Flink JobManager(也就是 App Master)和 TaskManager

 

1.4.2. 修改环境变量

 

export HADOOP_CONF_DIR= /home/hadoop/hadoop/etc/hadoop

 

1.4.3. 部署启动

 

yarn-session.sh -d -s 2 -tm 800 -n 2

 

上面的命令的意思是,同时向Yarn申请3container,其中 2 Container 启动 TaskManager-n 2),每个 TaskManager 拥有两个 Task Slot-s 2),并且向每个 TaskManager Container 申请 800M 的内存,以及一个ApplicationMasterJob Manager)。

 

 

 

Flink部署到Yarn Cluster后,会显示Job Manager的连接细节信息。

 

Flink on Yarn会覆盖下面几个参数,如果不希望改变配置文件中的参数,可以动态的通过-D选项指定,如 -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368

 

jobmanager.rpc.address:因为JobManager会经常分配到不同的机器上

 

taskmanager.tmp.dirs:使用Yarn提供的tmp目录

 

parallelism.default:如果有指定slot个数的情况下

 

yarn-session.sh会挂起进程,所以可以通过在终端使用CTRL+C或输入stop停止yarn-session

 

如果不希望Flink Yarn client长期运行,Flink提供了一种detached YARN session,启动时候加上参数-d—detached

 

 

 

在上面的命令成功后,我们就可以在 Yarn Application 页面看到 Flink 的纪录。如下图。

 

如果在虚拟机中测试,可能会遇到错误。这里需要注意内存的大小,Flink Yarn 会申请多个 Container,但是 Yarn 的配置可能限制了 Container 所能申请的内存大小,甚至 Yarn 本身所管理的内存就很小。这样很可能无法正常启动 TaskManager,尤其当指定多个 TaskManager 的时候。因此,在启动 Flink 之后,需要去 Flink 的页面中检查下 Flink 的状态。这里可以从 RM 的页面中,直接跳转(点击 Tracking UI)。这时候 Flink 的页面如图

 

 

yarn-session.sh启动命令参数如下:

 

Usage:  

 

   Required  

 

     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)  

 

   Optional  

 

     -D <arg>                        Dynamic properties  

 

     -d,--detached                   Start detached  

 

     -jm,--jobManagerMemory <arg>    Memory for JobManager Container [in MB]  

 

     -nm,--name                      Set a custom name for the application on YARN  

 

     -q,--query                      Display available YARN resources (memory, cores)  

 

     -qu,--queue <arg>               Specify YARN queue.  

 

     -s,--slots <arg>                Number of slots per TaskManager  

 

     -st,--streaming                 Start Flink in streaming mode  

 

     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container [in MB]  

 

1.4.4. 提交任务

 

之后,我们可以通过这种方式提交我们的任务

 

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

 

以上命令在参数前加上y前缀,-yn表示TaskManager个数。

 

在这个模式下,同样可以使用-m yarn-cluster提交一个"运行后即焚"detached yarn-yd)作业到yarn cluster

 

 

 

1.4.5. 停止yarn cluster

 

yarn application -kill application_1507603745315_0001

 

转载于:https://www.cnblogs.com/advise09/p/10194917.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/484463.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

7-4 银行排队问题之单队列多窗口加VIP服务 (30 分)

7-4 银行排队问题之单队列多窗口加VIP服务 (30 分) 说实话这道题挺恶心 有意思的&#xff0c;大模拟&#xff0c;主要的思路就是模拟时间轴。 题目描述 假设银行有K个窗口提供服务&#xff0c;窗口前设一条黄线&#xff0c;所有顾客按到达时间在黄线后排成一条长龙。当有窗口…

线程同步-互锁函数

互锁函数&#xff1a; LONG InterlockedIncrement&#xff08;LONG volatile* Addend&#xff09;&#xff1a;递增指定32位变量&#xff0c;增量位1&#xff0c;可以阻止其他线程同时使用此变量&#xff1b;LONG InterlockedDecrement&#xff08;LONG volatile* Addend&#…

李德毅院士:探索新一代人工智能产业发展

来源&#xff1a;主线科技2021年5月14日&#xff0c;主线科技联合中国人工智能学会智驾专委会正式发起成立“新一代人工智能物流创新中心”&#xff0c;并携手福佑卡车开启中国首个干线物流自动驾驶商业项目。在此次活动上&#xff0c;中国工程院院士、CAAI名誉理事长、主线科技…

1.修理牧场

农夫要修理牧场的一段栅栏&#xff0c;他测量了栅栏&#xff0c;发现需要N块木头&#xff0c;每块木头长度为整数L​i个长度单位&#xff0c;于是他购买了一条很长的、能锯成N块的木头&#xff0c;即该木头的长度是Li​​ 的总和。 但是农夫自己没有锯子&#xff0c;请人锯木的…

线程同步-事件内核对象

事件内核对象&#xff1a;两种状态&#xff0c;受信状态与未受信状态&#xff0c;3个成员&#xff1b; nUsageCount&#xff08;使用计数&#xff09;&#xff1b;bManualReset&#xff08;是否人工重置&#xff09;&#xff1b;bSignagled&#xff08;是否受信&#xff09;。基…

yii2.0AR两表联查

首先&#xff0c;建两个关联表。 表一 -- Table structure for article -- ---------------------------- DROP TABLE IF EXISTS article; CREATE TABLE article ( id int(11) NOT NULL AUTO_INCREMENT COMMENT 自增id, new text, t_id int(11) DEFAULT NULL COMMENT 关联id, P…

AI产业链全景图!【物联网智商精选】

来源&#xff1a;ittbankAI产业链全景图最后给大家介绍一款英伟达高性能AI边缘计算超级计算机模块&#xff1a;NVIDIA JETSON TX2Jetson TX2是NIVDIA瞄准人工智能在Jetson TK1和TX1推出后的升级TX2的GPU和CPU都进行了升级&#xff0c;内存增加到了8GB、存储增加到了32GB&#…

7-2 哈夫曼编码 (30 分)

2019.12.15更正Best函数样本数据初始化问题并且对代码添加了注释; 2020.11.17更正&#xff1a;题目说明&#xff1b; 原贴发于2019.11.22 注意&#xff1a;本题不是哈夫曼编码裸题&#xff0c;学习哈夫曼编码的同学不要过度依赖本题算法&#xff0c;只有参考价值&#xff1b; 给…

通用AI咋发展?向大脑学习是条路子

来源&#xff1a;中国科学报作者&#xff1a;郑金武编辑&#xff1a;赵路排版&#xff1a;志海作为代表自然界拥有最通用智能的生物大脑&#xff0c;可以借助低功耗和少量后天数据&#xff0c;实现在复杂环境下执行复杂任务的智能行为。因此&#xff0c;探索生物大脑智能认知的…

Oracle:select into 查询没有记录的解决办法

在数据库编程中&#xff0c;select into 语句可以将数据库的某些值赋值给程序的变量&#xff0c;使用起来非常方便。但很多时候也会遇到查询出来没有记录的情况&#xff0c;这时程序会出错。 可以使用 exception when NO_DATA_FOUND then 但是如merge into using 查不到时可以使…

MFC随机博弈黑白棋

随机博弈黑白棋 随机博弈黑白棋 TxyITxs | 随机博弈黑白棋 | 2019.04.21 摘要 通过随机落子&#xff0c;实现黑白棋的博弈。无任何落子规则&#xff0c;棋子死活与围棋中棋子的死活一致&#xff0c;即存在至少一口气。动态模拟双方博弈&#xff0c;但棋盘无落子位置时停止。…

7-3 喊山 (30 分)

搞了好几天心态的一道题&#xff0c;还是菜&#xff0c;原因就是想复杂了&#xff0c;第一次遇到了内存超限的问题&#xff0c;把邻接矩阵由二维数组换成了vector才过的。 7-3 喊山 (30 分) 喊山&#xff0c;是人双手围在嘴边成喇叭状&#xff0c;对着远方高山发出“喂—喂喂…

章鱼有9个大脑能编辑基因,智商高到无法理解,为何没发展出文明

来源&#xff1a;科学杂志按照进化论的观点&#xff0c;生物演化出脊椎&#xff0c;是发展出更高智商的敲门砖。因为脊椎让生物体内的神经高度集中&#xff0c;反应速度明显加快&#xff0c;并且还有利于大脑向更加高级的结构演化。事实也确实如此&#xff0c;如果盘点世界上最…

Python运算符与编码

while循环 1.基本循环while 条件: 循环体如果条件为真&#xff0c;那么循环则执行如果条件为假&#xff0c;那么循环不执行 while循环代码体现形式while 3>2:print(在人间) num 1  while num<101:  print(num)  num num 1 break 终止  continue 跳出本次循环,…

Windows坐标系统

坐标映射方式是设备环境中的一个重要属性&#xff0c;默认值为MM_TEXT&#xff0c;即左上角为原点&#xff0c;右方为x轴正方向&#xff0c;下方为y轴正方向&#xff0c;这种坐标系使用的单位是像素&#xff0c;其好处是窗口中的每一点的坐标不会因为窗口大小而改变。 映射方法…

7-1 公路村村通 (30 分)

现有村落间道路的统计数据表中&#xff0c;列出了有可能建设成标准公路的若干条道路的成本&#xff0c;求使每个村落都有公路连通所需要的最低成本。 输入格式: 输入数据包括城镇数目正整数N&#xff08;≤1000&#xff09;和候选道路数目M&#xff08;≤3N&#xff09;&#…

华为徐文伟:用数学和系统工程方法推进未来网络研究

来源&#xff1a;华为在2021第五届未来网络发展大会上&#xff0c;来自产业界、学术界、研究机构等领域的专家、行业领袖&#xff0c;围绕网络操作系统、6G通信、网络安全、工业互联网等热点话题&#xff0c;共同探讨新型网络技术的攻关与变革。华为董事、战略研究院院长徐文伟…

Windows框架

#include<Windows.h> #include<tchar.h> #include"resource.h" //全局变量 LPSTR g_MainFrame "主框架"; LPSTR g_ClientFrame "客户区框架"; LPSTR g_ChildFrame[] { "子框架1","子框架2" }; LRESULT CALL…

7-4 最短工期 (25 分)

参考链接&#xff1a;https://blog.csdn.net/tianwei0822/article/details/88642441 一个项目由若干个任务组成&#xff0c;任务之间有先后依赖顺序。项目经理需要设置一系列里程碑&#xff0c;在每个里程碑节点处检查任务的完成情况&#xff0c;并启动后续的任务。现给定一个…

对我国6G早期研究布局的几点建议

来源&#xff1a;赛迪智库众所周知&#xff0c;5G网络技术无法满足2030年及未来的移动通信需求。第六代无线移动通信网络&#xff08;6G&#xff09;将引入全球覆盖、高频谱效率和能源效率、高智能性和安全性等新的性能指标和用例等&#xff0c;以解决快速增长的通信需求。虽然…