flink sql设置并行度_《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍

d2cda2a1bdd9fefa6a787a048b78a0fc.png

前言

之所以写这个是因为前段时间自己的项目出现过这样的一个问题:

Caused by: akka.pattern.AskTimeoutException: 
Ask timed out on [Actor[akka://flink/user/taskmanager_0#15608456]] after [10000 ms]. 
Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalRpcInvocation".

3fe3eb71bf512e60440ea09f66de05b1.png

跟着这问题在 Flink 的 Issue 列表里看到了一个类似的问题:https://issues.apache.org/jira/browse/FLINK-9056 ,看下面的评论差不多就是 TaskManager 的 slot 数量不足的原因,导致 job 提交失败。在 Flink 1.63 中已经修复了变成抛出异常了。

1e896f5685370c2909671c6f47e886e6.png

竟然知道了是因为 slot 不足的原因了,那么我们就要先了解下 slot 是什么东东呢?不过文章这里先介绍下 parallelism。

什么是 parallelism?

338dcde8f8bfcb5bdfcd63d962a2a3fd.png

如翻译这样,parallelism 是并行的意思,在 Flink 里面代表每个任务的并行度,适当的提高并行度可以大大提高 job 的执行效率,比如你的 job 消费 kafka 数据过慢,适当调大可能就消费正常了。

那么在 Flink 中怎么设置并行度呢?

如何设置 parallelism?

f119ccc87c7616adbce85ce539f754d2.png

如上图,在 flink 配置文件中可以查看到默认并行度是 1,

cat flink-conf.yaml | grep parallelism# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1

所以你如何在你的 flink job 里面不设置任何的 parallelism 的话,那么他也会有一个默认的 parallelism = 1。那也意味着你可以修改这个配置文件的默认并行度。

如果你是用命令行启动你的 Flink job,那么你也可以这样设置并行度(使用 -p 并行度):

./bin/flink run -p 10 ../word-count.jar

你也可以通过这样来设置你整个程序的并行度:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);

注意:这样设置的并行度是你整个程序的并行度,那么后面如果你的每个算子不单独设置并行度覆盖的话,那么后面每个算子的并行度就都是这里设置的并行度的值了。

如何给每个算子单独设置并行度呢?

data

如上,就是在每个算子后面单独的设置并行度,这样的话,就算你前面设置了 env.setParallelism(10) 也是会被覆盖的。

这也说明优先级是:算子设置并行度 > env 设置并行度 > 配置文件默认并行度

并行度讲到这里应该都懂了,下面 zhisheng 就继续跟你讲讲 什么是 slot?

什么是 slot?

其实什么是 slot 这个问题之前在第一篇文章 《从0到1学习Flink》—— Apache Flink 介绍 中就介绍过了,这里再讲细一点。

3dc65c98aabb6fd33fbbecb1279ad075.png

图中 Task Manager 是从 Job Manager 处接收需要部署的 Task,任务的并行性由每个 Task Manager 上可用的 slot 决定。每个任务代表分配给任务槽的一组资源,slot 在 Flink 里面可以认为是资源组,Flink 将每个任务分成子任务并且将这些子任务分配到 slot 来并行执行程序。

例如,如果 Task Manager 有四个 slot,那么它将为每个 slot 分配 25% 的内存。 可以在一个 slot 中运行一个或多个线程。 同一 slot 中的线程共享相同的 JVM。 同一 JVM 中的任务共享 TCP 连接和心跳消息。Task Manager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 task 的 subtask,只要它们来自相同的 job。这种共享可以有更好的资源利用率。

文字说的比较干,zhisheng 这里我就拿下面的图片来讲解:

a40903f30bc5587d6eb5788f7fd125cd.png

上面图片中有两个 Task Manager,每个 Task Manager 有三个 slot,这样我们的算子最大并行度那么就可以达到 6 个,在同一个 slot 里面可以执行 1 至多个子任务。

那么再看上面的图片,source/map/keyby/window/apply 最大可以有 6 个并行度,sink 只用了 1 个并行。

每个 Flink TaskManager 在集群中提供 slot。 slot 的数量通常与每个 TaskManager 的可用 CPU 内核数成比例。一般情况下你的 slot 数是你每个 TaskManager 的 cpu 的核数。

但是 flink 配置文件中设置的 task manager 默认的 slot 是 1。

4b01539d6b8fc6b088b41e7d18496b69.png

slot 和 parallelism

下面给出官方的图片来更加深刻的理解下 slot:

1、slot 是指 taskmanager 的并发执行能力

48d38fe44a31b6aa14912e747acf1977.png

taskmanager.numberOfTaskSlots:3

每一个 taskmanager 中的分配 3 个 TaskSlot, 3 个 taskmanager 一共有 9 个 TaskSlot。

2、parallelism 是指 taskmanager 实际使用的并发能力

478ea5b40735a156ec7b1b2238a3211f.png

parallelism.default:1

运行程序默认的并行度为 1,9 个 TaskSlot 只用了 1 个,有 8 个空闲。设置合适的并行度才能提高效率。

3、parallelism 是可配置、可指定的

0ead1e071628d80b32619377673b4a8d.png

上图中 example2 每个算子设置的并行度是 2, example3 每个算子设置的并行度是 9。

60a6bf48b861c8c1f5381be9ce0be773.png

example4 除了 sink 是设置的并行度为 1,其他算子设置的并行度都是 9。

好了,既然并行度和 slot zhisheng 都带大家过了一遍了,那么再来看文章开头的问题:slot 资源不够。

问题原因

现在这个问题的答案其实就已经很明显了,就是我们设置的并行度 parallelism 超过了 Task Manager 能提供的最大 slot 数量,所以才会报这个错误。

再来拿我的代码来看吧,当时我就是只设置了整个项目的并行度:

env.setParallelism(15);

为什么要设置 15 呢,因为我项目消费的 Kafka topic 有 15 个 parttion,就想着让一个并行去消费一个 parttion,没曾想到 Flink 资源的不够,稍微降低下 并行度为 10 后就没出现这个错误了。

总结

本文由自己项目生产环境的一个问题来讲解了自己对 Flink parallelism 和 slot 的理解,并告诉大家如何去设置这两个参数,最后也指出了问题的原因所在。

关注我

转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2019/01/14/Flink-parallelism-slot/ , 未经允许禁止转载。

微信公众号:zhisheng

另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以加我的微信:zhisheng_tian,然后回复关键字:Flink 即可无条件获取到。

2bce53890b9bce2cc7234b1e9e7a9a9b.png

更多私密资料请加入知识星球!

1b6609937827329b34dc2c4f045c2250.png

Github 代码仓库

https://github.com/zhisheng17/flink-learning/

以后这个项目的所有代码都将放在这个仓库里,包含了自己学习 flink 的一些 demo 和博客

相关文章

1、《从0到1学习Flink》—— Apache Flink 介绍

2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

3、《从0到1学习Flink》—— Flink 配置文件详解

4、《从0到1学习Flink》—— Data Source 介绍

5、《从0到1学习Flink》—— 如何自定义 Data Source ?

6、《从0到1学习Flink》—— Data Sink 介绍

7、《从0到1学习Flink》—— 如何自定义 Data Sink ?

8、《从0到1学习Flink》—— Flink Data transformation(转换)

9、《从0到1学习Flink》—— 介绍Flink中的Stream Windows

10、《从0到1学习Flink》—— Flink 中的几种 Time 详解

11、《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch

12、《从0到1学习Flink》—— Flink 项目如何运行?

13、《从0到1学习Flink》—— Flink 写入数据到 Kafka

14、《从0到1学习Flink》—— Flink JobManager 高可用性配置

15、《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍

16、《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

17、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/355497.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

zabbix3.2监控

自动化运维框架 运维标准流程监控管理容量管理、关联关系、任务管理、自动部署、分布式集群、传统集群、机器管理安全控制灾难管理 自动化监控 监控评估数据采集主动式数据采集: client、公共插件、自定义脚本被动式服务状态: 服务状态、程序状态、用户访问质量第三方信息 公…

linux使用创建es用户,linux用户权限设置(安装elasticsearch7.x)

前言今天下载了elasticsearch的7.x版本,使用bin/elasticsearch -d 启动后,报出如下错误:java.lang.RuntimeException: can not run elasticsearch as rootat org.elasticsearch.bootstrap.Bootstrap.initializeNatives(Bootstrap.java:105)at…

fasttext 文本分类_一文综述经典的深度文本分类方法

作者 | 何从庆转载自AI算法之心(ID:AIHeartForYou)笔者整理最近几年比较经典的深度文本分类方法,希望帮助小伙伴们了解深度学习在文本分类中的应用。Convolutional Neural Networks for Sentence Classification (EMNLP 2014)Kim在EMNLP2014提出的TextCNN方法&…

Java 8 Streams API:对流进行分组和分区

这篇文章展示了如何使用Streams API中可用的Collectors将具有groupingBy的流元素和具有partitioningBy的流元素进行groupingBy 。 考虑一系列Employee对象,每个对象都有名称,城市和销售数量,如下表所示: ----------------------…

vi/vim 编辑器详解

vi/vim : 强大的编辑器 进入vi的命令 vi filename :打开或新建文件,并将光标置于第一行首 vi n filename :打开文件,并将光标置于第n行首 vi filename :打开文件,并将光标置于最后一行首 vi /pattern …

linux 定义快捷命令,Linux系统自定义快捷命令的详细说明

Linux系统用户可以自定义喜欢的快捷键命令。下面由学习啦小编为大家整理了Linux系统自定义快捷键命令的详细说明,希望对大家有帮助!Linux系统自定义快捷命令的详细说明目前总结到的有两种方式,一种是临时快捷键,一种是永久快捷键。Linux系统自…

python求最小公倍数_python求最大公约数和最小公倍数的简单方法

python怎么求最大公约数和最小公倍数 一、求最大公约数 用辗转相除法求最大公约数的算法如下: 两个正整数a和b(a>b),它们的最大公约数等于a除以b的余数c和b之间的最大公约数。比如10和25,25除以10商2余5&#xff0c…

linux如何获取raw中的文件路径,如何使用Linux获取Touchscreen Rawdata的坐标

我们有一个3米的微触摸显示屏.它通过usb连接到我的debian系统,并被识别为人机界面(hid).我正在尝试访问并推送实时信息…如果它被触及我想知道哪里(x,y)并通过netcat管道到另一台主机.不幸的是,我只能使用原始数据cat /dev/input/event2 | hexdump要么evtest你得到的hexcode似乎…

实验三+067+冯艳芳

一、实验目的 掌握黑盒测试用例设计方法 二、实验要求 (1)对被测程序进行黑盒测试用例设计 (2)运用等价类、边界值、决策表、状态图法等进行测试用例设计。 (3)对手机上任意一款音乐软件进行黑盒测试实践。…

python越来越慢_为什么Python中的串联速度越来越慢?

为什么在某些情况下,Python 3中的连接似乎比Python 2中的连接慢? 影响最大的串联方法似乎是字节对象的连续串联,从O(n)到O(n?)操作. 我的分析代码大部分在这里: #!/usr/bin/env python from operator import concat from sys import version, version_i…

jvm gc策略_IBM JVM调整– gencon GC策略

jvm gc策略本文将向您详细介绍从Java虚拟机(例如HotSpot或JRockit)迁移到IBM JVM时重要的Java堆空间调整注意事项。 此调整建议基于我为我的一个IT客户端执行的最新故障排除和调整任务。 IBM JVM概述 正如您可能从其他文章中看到的那样,IBM …

linux etc 服务启动脚本,linux 服务脚本启动问题

对于使用了 systemd 的系统,所有的 service 服务都会默认转为 systemd 服务之后再由 systemd 来执行,转换之后,你也可以直接使用 systemd 来执行了(它的用户工具就是你用的 systemctl),除非是一些非 service 标准的命令&#xff0…

芬兰高性能图表控件-免费试用并提供技术支持

图表控件对于很多技术研发人员、工程设计师来说肯定不陌生,但市面上已有的图表控件产品大多功能单一、性能也不稳定,很难满足不同人群在不同场合的使用需求。为此,专注于开发高性能和最先进的数据可视化工具公司Arction则给出了完美的解决方案…

linux open函数_Linux驱动开发 / 字符设备驱动内幕 (1)

哈喽,我是老吴,继续记录我的学习心得。一、保持专注的几个技巧将最重要的事放在早上做。待在无干扰环境下,比如图书馆。意识到刚坐下开始投入工作前,有点负面小情绪是特别正常的现象。让“开心一刻”成为计划的一部分。拥有合情合…

xftp 无法连接linux 22端口,解决Xshell不从22端口连接服务器

PL/SQL Developer主数据库连接和窗口连接切换Oracle开发者估计对PL/SQL Developer都非常熟悉了,里面有些小的功能点大概还有些初学者没发现.PL/SQL Developer支持多连接多窗口,下面详细说说. 主连接的概念 打开PL ...4 多表代替密码之Hill 密码 2实现该解密方法的KEY 不…

bzoj4484[JSOI2015]最小表示

题意 给出一张DAG,要求删除尽量多的边使得连通性不变.(即:若删边前u到v有路径,则删边后仍有路径).点数30000,边数100000. 分析 如果从u到v有(u,v)这条边,且从u到v只有这一条路径,那么这条边必须保留.否则这条边一定可以删除.因为如果有不止一条路径从u到v,必然存在点x(x!u,x!v)…

Enterprise Spring示例和集成测试

我的博客中的空白更长,因为我正在忙着写《 Pivotal认证的Spring企业集成专家考试–学习指南》 。 这本书是沉重的例子。 幸运的是,Apress同意开源所有这些示例。 因此, 在此GitHub存储库中有大量可用的Spring示例 。 总共146个带有集成测试的…

github删除文件_github 仓库中删除历史大文件

问题如果git中提交了大文件,而且保存到了版本库中,那在下载或者克隆git包的时候,速度会非常慢。再加上github在国内访问本来就很慢,可能会导致包无法下载(克隆)。为了提升下载(克隆)速度,可以永久的删除这些文件(包括该…

linux plc编程软件,基于Linux平台的可编程控制器软PLC设计

实例下面以一个简单的对3并口通道循环控制为例,说明软PLC 的工作流程。(1)梯形图编程。从软PLC 主界面进入后,启动梯形图编程,调用梯形图编程的主程序。梯形图编程共需要调用梯形图界面模块、关闭模块、IO 模块,这些均在配置文件中…

django的ModelForm

一、ModelForm 二、Ajax 原生 jQuery 伪Ajax操作 三、文件上传(预览) - Form提交(会刷新页面) - Ajax上传文件(推荐) 四、图片验证码(跟session配合) 五、CKEditor,UEEditor,TinyEdi…