Flink 客户端操作命令及可视化工具

Flink提供了丰富的客户端操作来提交任务和与任务进行交互。下面主要从Flink命令行、Scala ShellSQL ClientRestful APIWeb五个方面进行整理。

Flink安装目录的bin目录下可以看到flinkstart-scala-shell.shsql-client.sh等文件,这些都是客户端操作的入口。
[点击并拖拽以移动] ​

flink 常见操作:可以通过 -help 查看帮助

run 运行任务

-d:以分离模式运行作业
-c:如果没有在jar包中指定入口类,则需要在这里通过这个参数指定;
-m:指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager,可以说是yarn集群名称;
-p:指定程序的并行度。可以覆盖配置文件中的默认值;
-s:保存点savepoint的路径以还原作业来自(例如hdfs:///flink/savepoint-1537);

[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID dce7b69ad15e8756766967c46122736f

就可以看到我们提交的JobManager,默认是一个并发。
[点击并拖拽以移动] ​

点进去就可以看到详细的信息
[点击并拖拽以移动] ​

点击左侧TaskManager —Stdout能看到具体输出的日志信息。
[点击并拖拽以移动] ​

或者查看TaskManager节点的log目录下的*.out文件,也能看到具体的输出信息。
[点击并拖拽以移动] ​

list 查看任务列表

-mjobmanager<arg>作业管理器(主)的地址连接。

[root@hadoop1 flink-1.10.1]# bin/flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
09.07.2020 16:44:09 : dce7b69ad15e8756766967c46122736f : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

Stop 停止任务

需要指定jobmanagerip:protjobId。如下报错可知,一个job能够被stop要求所有的source都是可以stoppable的,即实现了 StoppableFunction接口。

[root@hadoop1 flink-1.10.1]# bin/flink stop -m 127.0.0.1:8081 dce7b69ad15e8756766967c46122736f
Suspending job "dce7b69ad15e8756766967c46122736f" with a savepoint.------------------------------------------------------------The program finished with the following exception:org.apache.flink.util.FlinkException: Could not stop with a savepoint job "dce7b69ad15e8756766967c46122736f".at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:458)

StoppableFunction接口如下,属于优雅停止任务。

 /*** @Description 需要 stoppabel 的函数必须实现此接口,例如流式任务 source**               stop() 方法在任务收到 stop信号的时候调用*               source 在接收到这个信号后,必须停止发送新的数据优雅的停止。* @Date 2020/7/9 17:26*/@PublicEvolvingpublic interface StoppableFunction {/*** 停止 source,与 cancel() 不同的是,这是一个让 source优雅停止的请求。* 等待中的数据可以继续发送出去,不需要立即停止*/void stop();
}

Cancel 取消任务

如果在conf/flink-conf.yaml里面配置state.savepoints.dir,会保存savepoint,否则不会保存savepoint。(重启)

state.savepoints.dir: file:///tmp/savepoint

执行 Cancel命令 取消任务

[root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s e8ce0d111262c52bf8228d5722742d47
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job e8ce0d111262c52bf8228d5722742d47 with savepoint to default savepoint directory.
Cancelled job e8ce0d111262c52bf8228d5722742d47. Savepoint stored in file:/tmp/savepoint/savepoint-e8ce0d-f7fa96a085d8.

也可以在停止的时候显示指定savepoint目录

1 [root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint f58bb4c49ee5580ab5f27fdb24083353
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job f58bb4c49ee5580ab5f27fdb24083353 with savepoint to /tmp/savepoint.
Cancelled job f58bb4c49ee5580ab5f27fdb24083353. Savepoint stored in file:/tmp/savepoint/savepoint-f58bb4-127b7e84910e.

取消和停止(流作业)的区别如下:
cancel()调用, 立即调用作业算子的cancel()方法,以尽快取消它们。如果算子在接到cancel()调用后没有停止,Flink将开始定期中断算子线程的执行,直到所有算子停止为止。
stop()调用 ,是更优雅的停止正在运行流作业的方式。stop()仅适用于source实现了StoppableFunction接口的作业。当用户请求停止作业时,作业的所有source都将接收stop()方法调用。直到所有source正常关闭时,作业才会正常结束。这种方式,使 作业正常处理完所有作业。

触发 savepoint

当需要生成savepoint文件时,需要手动触发savepoint。如下,需要指定正在运行的 JobID 和生成文件的存放目录。同时,我们也可以看到它会返回给用户存放的savepoint的文件名称等信息。

 [root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar Executing TopSpeedWindowing example with default input data set.Use --input to specify file input.Printing result to stdout. Use --output to specify output path.Job has been submitted with JobID 216c427d63e3754eb757d2cc268a448d[root@hadoop1 flink-1.10.1]# bin/flink savepoint -m 127.0.0.1:8081 216c427d63e3754eb757d2cc268a448d /tmp/savepoint/Triggering savepoint for job 216c427d63e3754eb757d2cc268a448d.Waiting for response...Savepoint completed. Path: file:/tmp/savepoint/savepoint-216c42-154a34cf6bfdYou can resume your program from this savepoint with the run command.

savepointcheckpoint的区别:
checkpoint是增量做的,每次的时间较短,数据量较小,只要在程序里面启用后会自动触发,用户无须感知;savepoint是全量做的,每次的时间较长,数据量较大,需要用户主动去触发。
checkpoint是作业failover的时候自动使用,不需要用户指定。savepoint一般用于程序的版本更新,bug修复,A/B Test等场景,需要用户指定。

从指定 savepoint 中启动

[root@hadoop1 flink-1.10.1]# bin/flink run -d -s /tmp/savepoint/savepoint-f58bb4-127b7e84910e/ examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 1a5c5ce279e0e4bd8609f541b37652e2

查看JobManager的日志能够看到Reset the checkpoint ID为我们指定的savepoint文件中的ID
[点击并拖拽以移动] ​

modify 修改任务并行度

这里修改masterconf/flink-conf.yamltask slot数修改为4。并通过xsync分发到 两个slave节点上。

taskmanager.numberOfTaskSlots: 4

修改参数后需要重启集群生效:关闭/启动集群

[root@hadoop1 flink-1.10.1]# bin/stop-cluster.sh && bin/start-cluster.sh 
Stopping taskexecutor daemon (pid: 8236) on host hadoop2.
Stopping taskexecutor daemon (pid: 8141) on host hadoop3.
Stopping standalonesession daemon (pid: 22633) on host hadoop1.
Starting cluster.
Starting standalonesession daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop2.
Starting taskexecutor daemon on host hadoop3.

启动任务

[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 2e833a438da7d8052f14d5433910515a

从页面上能看到Task Slots总计变为了8,运行的Slot1,剩余Slot数量为7
[点击并拖拽以移动] ​

这时候默认的并行度是1
[点击并拖拽以移动] ​

Flink1.0版本命令行flink modify已经没有这个行为了,被移除了。。。Flink1.7上是可以运行的。

[root@hadoop1 flink-1.10.1]# bin/flink modify -p 4 cc22cc3d09f5d65651d637be6fb0a1c3
"modify" is not a valid action.

Info 显示程序的执行计划

[root@hadoop1 flink-1.10.1]# bin/flink info examples/streaming/TopSpeedWindowing.jar 
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------

拷贝输出的json内容,粘贴到这个网站:http://flink.apache.org/visualizer/可以生成类似如下的执行图。

[点击并拖拽以移动] ​

可以与实际运行的物理执行计划进行对比。
[点击并拖拽以移动] ​

SQL Client Beta

进入 Flink SQL

[root@hadoop1 flink-1.10.1]# bin/sql-client.sh embedded

Select查询,按Q退出如下界面;

Flink SQL> select 'hello word';SQL Query Result (Table)Table program finished.                                                                                       Page: Last of 1                                                                                         Updated: 16:37:04.649EXPR$0hello wordQ Quit                                         + Inc Refresh                                  G Goto Page                                    N Next Page                                    O Open Row
R Refresh                                      - Dec Refresh                                  L Last Page                                    P Prev Page

打开http://hadoop1:8081能看到这条select语句产生的查询任务已经结束了。这个查询采用的是读取固定数据集的Custom Source,输出用的是Stream Collect Sink,且只输出一条结果。
[点击并拖拽以移动] ​

[点击并拖拽以移动] ​

explain 查看 SQL 的执行计划。

Flink SQL> explain SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
== Abstract Syntax Tree ==         //抽象语法树
LogicalAggregate(group=[{0}], cnt=[COUNT()])
+- LogicalValues(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])== Optimized Logical Plan ==      //优化后的逻辑执行计划
GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
+- Exchange(distribution=[hash[name]])+- Values(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])== Physical Execution Plan ==    //物理执行计划
Stage 13 : Data Sourcecontent : Source: Values(tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])Stage 15 : Operatorcontent : GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])ship_strategy : HASH

结果展示

SQL Client支持两种模式来维护并展示查询结果:

table mode

在内存中物化查询结果,并以分页table形式展示。用户可以通过以下命令启用table mode:例如如下案例;

Flink SQL> SET execution.result-mode=table;
[INFO] Session property has been set.Flink SQL>  SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;SQL Query Result (Table)Table program finished.                                                                                       Page: Last of 1                                                                                         Updated: 16:55:08.589name                       cntAlice                         1Greg                         1Bob                         2Q Quit                                         + Inc Refresh                                  G Goto Page                                    N Next Page                                    O Open Row
R Refresh                                      - Dec Refresh                                  L Last Page                                    P Prev Page

​ [点击并拖拽以移动] ​​

​ [点击并拖拽以移动] ​​

changelog mode

不会物化查询结果,而是直接对continuous query产生的添加和撤回retractions结果进行展示:如下案例中的-表示撤回消息

Flink SQL> SET execution.result-mode=changelog;
[INFO] Session property has been set.Flink SQL>  SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;SQL Query Result (Changelog)Table program finished.                                                                                                                                                                                               Updated: 16:58:05.777+/-                      name                       cnt+                       Bob                         1+                     Alice                         1+                      Greg                         1-                       Bob                         1+                       Bob                         2Q Quit                                                                        + Inc Refresh                                                                 O Open Row
R Refresh                                                                     - Dec Refresh

​ [点击并拖拽以移动] ​​

​ [点击并拖拽以移动] ​​

Environment Files

CREATE TABLE 创建表DDL语句:

Flink SQL> CREATE TABLE pvuv_sink (
>     dt VARCHAR,
>     pv BIGINT,
>     uv BIGINT
> ) ;
[INFO] Table has been created.

SHOW TABLES 查看所有表名

Flink SQL>  show tables;
pvuv_sink

DESCRIBE 表名 查看表的详细信息;

Flink SQL>  describe pvuv_sink;
root|-- dt: STRING|-- pv: BIGINT|-- uv: BIGINT

插入等操作均与关系型数据库操作语句一样,省略N个操作

Restful API

接下来我们演示如何通过rest api来提交jar包和执行任务。
[点击并拖拽以移动] ​

通过Show Plan可以看到执行图
[点击并拖拽以移动] ​

提交之后的操作,取消的话点击页面的Cancel Job

​ [点击并拖拽以移动] ​​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/382567.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ySQL挑战搭建一个简易的成绩管理系统的数据库

文章为自己搜索网上资源&#xff0c;再在这里进行整理&#xff0c;所以标注为转载 [实验步骤](https://www.shiyanlou.com/courses/reports/1347700) 总结做实验注意事项&#xff1a; 1.添加主键 2.主键和外键的关系 3.注意自增的书写添加 mysql 如何修改、添加、删除表主键…

网络之DNS协议图解

DNS是计算机域名系统 (Domain Name System) 域名系统采用类似目录树的等级结构。 域名服务器是指保存有该网络中所有主机的域名和对应IP地址&#xff0c;并具有将域名转换为IP地址功能的服务器。 域名服务器为客户机/服务器模式中的服务器方&#xff0c;它主要有两种形式&am…

网络之ICMP协议

ICMP 主要功能&#xff1a; 确认IP包是否成功送达目标地址通知在发送过程当中IP包被废弃的具体原因改善网络设置等 在IP通信中如果某个IP包因为某种原因未到达目标地址&#xff0c;那么这个原因由ICMP通知。 过程&#xff08;图解TCP/IP&#xff09; ICMP类型 常见的&am…

网络之NAT协议

由来&#xff1a; 2011年2月3日中国农历新年&#xff0c; IANA对外宣布&#xff1a;IPv4地址空间最后5个地址块已经被分配给下属的5个地区委员会。2011年4月15日&#xff0c;亚太区委员会APNIC对外宣布&#xff0c;除了个别保留地址外&#xff0c;本区域所有的IPv4地址基本耗尽…

排序稳定性的意义

首先&#xff0c;为什么会有排序算法稳定性的说法&#xff1f;只要能排好不就可以了吗&#xff1f; 看例子 第1行是数字2 记作 1 2 第2行是数字4 记作 2 4 第3行是数字2 记作 3 2 排序后的结果&#xff08;如果看不懂命令的意思&#xff0c;参照这个博客&#xff09; 那么引入…

本能富可敌国,最后却选择拯救世界!Bram的Vim和乌干达儿童

他本能富可敌国&#xff0c;最后却选择拯救世界 在命令行界面输入vim会出现一堆文件&#xff0c;但是一直有这么一句话 Help poor children in Uganda! “帮助可怜的乌干达儿童” 查询了一下这里面相关的历史背景和知识 在Vim许可证文件结束后的部分翻译 &#xff0d;如果…

Linux基础查漏补缺

文章目录第二遍重新回顾Linux基础查看主机名修改主机名查看IP地址Linux的 “--”和“-”根目录文件的意义和作用alias直接在命令行界面输入firefox数组越界发生什么命令行光标移动的几个操作重定向第二遍重新回顾Linux基础 1.查找忽略的知识点 2.再次记忆一些基础知识 3.巩固基…

linux 常用命令02--文件属性 以及软硬链接

文件属性和用户用户组 通过ls-l 显示文件详细信息 drwxrwxr-x 2 user usergroup 4096 10月 30 20:55 stu1drwxrwxr-x d代表目录文件&#xff0c; -代表普通文件 rwx rwx r-x 归属用户的权限 归属组的权限 其他用户的权限 权限位数字表示法(8进制数…

linux查漏补缺之常用命令

wc命令 -c, --bytes, --chars输出字节统计数。-l, --lines输出换行符统计数。-L, --max-line-length输出最长的行的长度。-w, --words输出单词统计数。grep命令 图解

思维导图:面试小结

文件&#xff1a;思维导图

蒙特卡洛法求圆周率100亿数据

代码 import time import random hits0 pi0 DARTS100000*100000 starttime.perf_counter() for i in range(DARTS):x,yrandom.random(),random.random()distpow(x ** 2y**2,0.5)if dist < 1.0:hits1 pi4*(hits/DARTS) print("圆周率的值是{:.10f}".format(pi)) p…

linux gcc 简单使用记录01

大体编译流程 gcc 参数&#xff1a; I 包含头文件路径 L 包含库文件路径 l 库名 比如libxxx.so 对应着 -lxxx(掐头去尾) O 优化选项 1&#xff0c;3 W 警告 all 显示更多的 c 编译成 .o 文件&#xff08;二进制&#xff09; E 输出到标准输出&#xff0c;宏替换&#xff0c…

Ubuntu 18的中文界面切换《图解教程》亲测成功

然后找到Chinese simple 把汉语挪到第一行

linux gcc 制作动态库

编译与位置无关的代码&#xff0c;生成.o&#xff0c;关键参数 -fPIC createlibso目录下 ├── cheng.c ├── chu.c ├── head │ └── test.h ├── jia.c └── jian.cgcc -fPIC -c *.c -I ./head在createlibso目录下生成 与位置无关的.o文件 ├── cheng.c ├…

Ubuntu的中文是哪种字体?python的词云分析和 三国演义人物出场统计

Ubuntu的默认中文是哪种呢&#xff1f; fc-list :langzh 用这个命令查看出来 NotoSerifCJK-Bold.ttc 为什么要知道这个呢&#xff1f; 来看一块python3代码 import jieba import wordcloudf open("threekingdom.txt","rb") t f.read() f.close() ls …

linux 系统课程-进程控制01

进程的状态转化 进程拥有四种状态&#xff08;切换&#xff09;&#xff0c;他们之间的关系如图 运行 挂起 终止 就绪内存管理单元 MMU &#xff08;Memory Management Unit&#xff09; mmu 负责 a. 虚拟内存与物理内存的映射 b. 设置内存的访问级别 pcb 进程控制块 环…

linux c++ 多进程初步01

fork函数 fork函数 ps ajx 这个命令可以查看进程与进程之间的血缘关系 kill 给进程发送一个信号SIGKILL 9号信号kill -SIGKILL pik 杀死进程进程共享 子进程会复制父进程的几乎所有信息&#xff1a;子进程复制父进程用户空间所有数据&#xff1b; 子进程复制父进程内核空间P…

Ubuntu怎么设置桌面快捷方式(图片详解)

然后找到你要的copy然后到桌面&#xff0c;点开然后允许权限&#xff01;就搞定了

linux C++ 多进程初步02

ps:疑惑的地方&#xff0c;1 进程pcb的概念&#xff0c; 还有 ulimit -a 显示的信息 是一个进程可以最大占用资源的上限吗&#xff1f; 还有 文件描述符的概念&#xff1f;&#xff1f; 这里不是很明白&#xff01;记录一下2还有WIFEXITED 孤儿进程 与僵尸进程 孤儿进程&#…

软件工程学习笔记《一》什么是软件工程

文章目录软件工程学习笔记目录软件工程过程软件工程方法软件质量软件质量如何评价软件的质量模型ISO9126模型易用性&#xff1a;效率可维护性可移植性为什么内存缓冲区是2048或4096软件工程学习笔记目录 [https://blog.csdn.net/csdn_kou/article/details/83754356] 单纯摆出一…