flink sql设置并行度_《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍

d2cda2a1bdd9fefa6a787a048b78a0fc.png

前言

之所以写这个是因为前段时间自己的项目出现过这样的一个问题:

Caused by: akka.pattern.AskTimeoutException: 
Ask timed out on [Actor[akka://flink/user/taskmanager_0#15608456]] after [10000 ms]. 
Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalRpcInvocation".

3fe3eb71bf512e60440ea09f66de05b1.png

跟着这问题在 Flink 的 Issue 列表里看到了一个类似的问题:https://issues.apache.org/jira/browse/FLINK-9056 ,看下面的评论差不多就是 TaskManager 的 slot 数量不足的原因,导致 job 提交失败。在 Flink 1.63 中已经修复了变成抛出异常了。

1e896f5685370c2909671c6f47e886e6.png

竟然知道了是因为 slot 不足的原因了,那么我们就要先了解下 slot 是什么东东呢?不过文章这里先介绍下 parallelism。

什么是 parallelism?

338dcde8f8bfcb5bdfcd63d962a2a3fd.png

如翻译这样,parallelism 是并行的意思,在 Flink 里面代表每个任务的并行度,适当的提高并行度可以大大提高 job 的执行效率,比如你的 job 消费 kafka 数据过慢,适当调大可能就消费正常了。

那么在 Flink 中怎么设置并行度呢?

如何设置 parallelism?

f119ccc87c7616adbce85ce539f754d2.png

如上图,在 flink 配置文件中可以查看到默认并行度是 1,

cat flink-conf.yaml | grep parallelism# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1

所以你如何在你的 flink job 里面不设置任何的 parallelism 的话,那么他也会有一个默认的 parallelism = 1。那也意味着你可以修改这个配置文件的默认并行度。

如果你是用命令行启动你的 Flink job,那么你也可以这样设置并行度(使用 -p 并行度):

./bin/flink run -p 10 ../word-count.jar

你也可以通过这样来设置你整个程序的并行度:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);

注意:这样设置的并行度是你整个程序的并行度,那么后面如果你的每个算子不单独设置并行度覆盖的话,那么后面每个算子的并行度就都是这里设置的并行度的值了。

如何给每个算子单独设置并行度呢?

data

如上,就是在每个算子后面单独的设置并行度,这样的话,就算你前面设置了 env.setParallelism(10) 也是会被覆盖的。

这也说明优先级是:算子设置并行度 > env 设置并行度 > 配置文件默认并行度

并行度讲到这里应该都懂了,下面 zhisheng 就继续跟你讲讲 什么是 slot?

什么是 slot?

其实什么是 slot 这个问题之前在第一篇文章 《从0到1学习Flink》—— Apache Flink 介绍 中就介绍过了,这里再讲细一点。

3dc65c98aabb6fd33fbbecb1279ad075.png

图中 Task Manager 是从 Job Manager 处接收需要部署的 Task,任务的并行性由每个 Task Manager 上可用的 slot 决定。每个任务代表分配给任务槽的一组资源,slot 在 Flink 里面可以认为是资源组,Flink 将每个任务分成子任务并且将这些子任务分配到 slot 来并行执行程序。

例如,如果 Task Manager 有四个 slot,那么它将为每个 slot 分配 25% 的内存。 可以在一个 slot 中运行一个或多个线程。 同一 slot 中的线程共享相同的 JVM。 同一 JVM 中的任务共享 TCP 连接和心跳消息。Task Manager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 task 的 subtask,只要它们来自相同的 job。这种共享可以有更好的资源利用率。

文字说的比较干,zhisheng 这里我就拿下面的图片来讲解:

a40903f30bc5587d6eb5788f7fd125cd.png

上面图片中有两个 Task Manager,每个 Task Manager 有三个 slot,这样我们的算子最大并行度那么就可以达到 6 个,在同一个 slot 里面可以执行 1 至多个子任务。

那么再看上面的图片,source/map/keyby/window/apply 最大可以有 6 个并行度,sink 只用了 1 个并行。

每个 Flink TaskManager 在集群中提供 slot。 slot 的数量通常与每个 TaskManager 的可用 CPU 内核数成比例。一般情况下你的 slot 数是你每个 TaskManager 的 cpu 的核数。

但是 flink 配置文件中设置的 task manager 默认的 slot 是 1。

4b01539d6b8fc6b088b41e7d18496b69.png

slot 和 parallelism

下面给出官方的图片来更加深刻的理解下 slot:

1、slot 是指 taskmanager 的并发执行能力

48d38fe44a31b6aa14912e747acf1977.png

taskmanager.numberOfTaskSlots:3

每一个 taskmanager 中的分配 3 个 TaskSlot, 3 个 taskmanager 一共有 9 个 TaskSlot。

2、parallelism 是指 taskmanager 实际使用的并发能力

478ea5b40735a156ec7b1b2238a3211f.png

parallelism.default:1

运行程序默认的并行度为 1,9 个 TaskSlot 只用了 1 个,有 8 个空闲。设置合适的并行度才能提高效率。

3、parallelism 是可配置、可指定的

0ead1e071628d80b32619377673b4a8d.png

上图中 example2 每个算子设置的并行度是 2, example3 每个算子设置的并行度是 9。

60a6bf48b861c8c1f5381be9ce0be773.png

example4 除了 sink 是设置的并行度为 1,其他算子设置的并行度都是 9。

好了,既然并行度和 slot zhisheng 都带大家过了一遍了,那么再来看文章开头的问题:slot 资源不够。

问题原因

现在这个问题的答案其实就已经很明显了,就是我们设置的并行度 parallelism 超过了 Task Manager 能提供的最大 slot 数量,所以才会报这个错误。

再来拿我的代码来看吧,当时我就是只设置了整个项目的并行度:

env.setParallelism(15);

为什么要设置 15 呢,因为我项目消费的 Kafka topic 有 15 个 parttion,就想着让一个并行去消费一个 parttion,没曾想到 Flink 资源的不够,稍微降低下 并行度为 10 后就没出现这个错误了。

总结

本文由自己项目生产环境的一个问题来讲解了自己对 Flink parallelism 和 slot 的理解,并告诉大家如何去设置这两个参数,最后也指出了问题的原因所在。

关注我

转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2019/01/14/Flink-parallelism-slot/ , 未经允许禁止转载。

微信公众号:zhisheng

另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以加我的微信:zhisheng_tian,然后回复关键字:Flink 即可无条件获取到。

2bce53890b9bce2cc7234b1e9e7a9a9b.png

更多私密资料请加入知识星球!

1b6609937827329b34dc2c4f045c2250.png

Github 代码仓库

https://github.com/zhisheng17/flink-learning/

以后这个项目的所有代码都将放在这个仓库里,包含了自己学习 flink 的一些 demo 和博客

相关文章

1、《从0到1学习Flink》—— Apache Flink 介绍

2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

3、《从0到1学习Flink》—— Flink 配置文件详解

4、《从0到1学习Flink》—— Data Source 介绍

5、《从0到1学习Flink》—— 如何自定义 Data Source ?

6、《从0到1学习Flink》—— Data Sink 介绍

7、《从0到1学习Flink》—— 如何自定义 Data Sink ?

8、《从0到1学习Flink》—— Flink Data transformation(转换)

9、《从0到1学习Flink》—— 介绍Flink中的Stream Windows

10、《从0到1学习Flink》—— Flink 中的几种 Time 详解

11、《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch

12、《从0到1学习Flink》—— Flink 项目如何运行?

13、《从0到1学习Flink》—— Flink 写入数据到 Kafka

14、《从0到1学习Flink》—— Flink JobManager 高可用性配置

15、《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍

16、《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

17、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/355497.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

zabbix3.2监控

自动化运维框架 运维标准流程监控管理容量管理、关联关系、任务管理、自动部署、分布式集群、传统集群、机器管理安全控制灾难管理 自动化监控 监控评估数据采集主动式数据采集: client、公共插件、自定义脚本被动式服务状态: 服务状态、程序状态、用户访问质量第三方信息 公…

fasttext 文本分类_一文综述经典的深度文本分类方法

作者 | 何从庆转载自AI算法之心(ID:AIHeartForYou)笔者整理最近几年比较经典的深度文本分类方法,希望帮助小伙伴们了解深度学习在文本分类中的应用。Convolutional Neural Networks for Sentence Classification (EMNLP 2014)Kim在EMNLP2014提出的TextCNN方法&…

vi/vim 编辑器详解

vi/vim : 强大的编辑器 进入vi的命令 vi filename :打开或新建文件,并将光标置于第一行首 vi n filename :打开文件,并将光标置于第n行首 vi filename :打开文件,并将光标置于最后一行首 vi /pattern …

实验三+067+冯艳芳

一、实验目的 掌握黑盒测试用例设计方法 二、实验要求 (1)对被测程序进行黑盒测试用例设计 (2)运用等价类、边界值、决策表、状态图法等进行测试用例设计。 (3)对手机上任意一款音乐软件进行黑盒测试实践。…

python越来越慢_为什么Python中的串联速度越来越慢?

为什么在某些情况下,Python 3中的连接似乎比Python 2中的连接慢? 影响最大的串联方法似乎是字节对象的连续串联,从O(n)到O(n?)操作. 我的分析代码大部分在这里: #!/usr/bin/env python from operator import concat from sys import version, version_i…

jvm gc策略_IBM JVM调整– gencon GC策略

jvm gc策略本文将向您详细介绍从Java虚拟机(例如HotSpot或JRockit)迁移到IBM JVM时重要的Java堆空间调整注意事项。 此调整建议基于我为我的一个IT客户端执行的最新故障排除和调整任务。 IBM JVM概述 正如您可能从其他文章中看到的那样,IBM …

芬兰高性能图表控件-免费试用并提供技术支持

图表控件对于很多技术研发人员、工程设计师来说肯定不陌生,但市面上已有的图表控件产品大多功能单一、性能也不稳定,很难满足不同人群在不同场合的使用需求。为此,专注于开发高性能和最先进的数据可视化工具公司Arction则给出了完美的解决方案…

linux open函数_Linux驱动开发 / 字符设备驱动内幕 (1)

哈喽,我是老吴,继续记录我的学习心得。一、保持专注的几个技巧将最重要的事放在早上做。待在无干扰环境下,比如图书馆。意识到刚坐下开始投入工作前,有点负面小情绪是特别正常的现象。让“开心一刻”成为计划的一部分。拥有合情合…

github删除文件_github 仓库中删除历史大文件

问题如果git中提交了大文件,而且保存到了版本库中,那在下载或者克隆git包的时候,速度会非常慢。再加上github在国内访问本来就很慢,可能会导致包无法下载(克隆)。为了提升下载(克隆)速度,可以永久的删除这些文件(包括该…

linux plc编程软件,基于Linux平台的可编程控制器软PLC设计

实例下面以一个简单的对3并口通道循环控制为例,说明软PLC 的工作流程。(1)梯形图编程。从软PLC 主界面进入后,启动梯形图编程,调用梯形图编程的主程序。梯形图编程共需要调用梯形图界面模块、关闭模块、IO 模块,这些均在配置文件中…

insert into语句_入门MySQL——DML语句篇

前言:在上篇文章中,主要为大家介绍的是DDL语句的用法,可能细心的同学已经发现了。本篇文章将主要聚焦于DML语句,为大家讲解表数据相关操作。这里说明下DDL与DML语句的分类,可能有的同学还不太清楚。DDL(Data Definitio…

linux不重启换root密码是什么原因,在Linux下修改和重置root密码的方法(超简单)

刚开始接触linux的人,忘记了root密码可能会不知所措。想找回自己的root密码,但是又不知道方法。其实,只需要简单的几步就可以重置自己的root密码了(找回密码我也不会)1.开机HcQBEm上敲击e,然后编辑选项2.在linux16这一行&#xff…

SCP-bzoj-1019

项目编号:bzoj-1019 项目等级:Safe 项目描述: 戳这里 特殊收容措施: 对于一个hanoi,知道了各种移动操作的优先级,也就确定了方案。可以证明对于盘子数为N的hanoi,任意移动方案都等价于将数目为N…

一键分享手机代码_通过广告路由器指定手机浏览器自动认证WIFI上网 附代码

说说应用过程,下面用手机QQ浏览器为例。在路由器搭建免费WIFI,用户连接免费WIFI后,使用手机QQ浏览器点击打开任意网页即可自动通过认证并上网,有的手机会自动打开认证网页,如果使用其他手机浏览器则自动跳转到引导认证…

netbeans7.4_NetBeans 7.1:创建自定义提示

netbeans7.4我已经在帖子中对我最喜欢的NetBeans提示进行了讨论,这些帖子中包含用于现代化Java代码的七个NetBeans提示和七个不可或缺的NetBeans Java提示 。 这两个帖子中涉及的十四个提示仅占NetBeans支持的“即开即用”提示总数的一小部分。 但是,由于…

linux uboot启动流程分析,uboot启动流程分析

uboot版本为NXP维护的2016.03版本下载地址为http://git.freescale.com/git/...分析uboot的启动流程,需要编译一下uboot,然后打开链接脚本u-boot.lds在u-boot.lds中1 OUTPUT_FORMAT("elf32-littlearm", "elf32-littlearm", "elf…

JavaOne 2015 –第二十版十大收获

我们刚刚在旧金山有了JavaOne的第二十版。 这将是我自2004年以来第十二次参加不间断的系列活动。最大的教训是什么,可以揭示Java的未来。 模块化斗争 自从Java 2007首次提到模块以来,已经花费了将近9年的时间,或者说,直到2016年9…

IOS--文件管理NSFileManager

iOS的沙盒机制。应用仅仅能訪问自己应用文件夹下的文件。iOS不像android。没有SD 卡概念。不能直接訪问图像、视频等内容。iOS应用产生的内容,如图像、文件、缓存内容等都必须存储在自己的沙盒内。默认情况下,每一个沙盒含有3个文件 夹:Docum…

linux修改文件内容_详解5种实用方法---Linux系统清空或删除大文件内容

概述有时我们在处理Linux终端中的文件时,可能要去清除文件的内容,而无需使用任何Linux命令行编辑器打开它。怎么才能实现呢?下面通过几种不同的方式教大家清空文件内容。1.通过重定向到空来清空文件内容使用shell重定向null(不存在的对象)清空…

凯撒密码c语言小写字母,凯撒密码c(c语言编程凯撒密码)

凯撒密码c(c语言编程凯撒密码)2020-05-15 13:09:51共10个回答#include#includeintmain(){charsave[10][30];inta,b,i,j;scanf("%d",&a);for(i0;i能不能说清楚一点,是加密吗?#include#include#defineMAXSIZE81intmain(){charstr[MAXSIZE];inti;intof…