【基础篇】七、Flink核心概念

文章目录

  • 1、并行度
  • 2、并行度的设置
  • 3、算子链
  • 4、禁用算子链
  • 5、任务槽
  • 6、任务槽和并行度的关系

1、并行度

要处理的数据量很多时,可以把一个算子的操作(比如前面demo里的flatMap、sum),"复制"多份到多个节点,数据来了以后可以到任意一个节点执行。即将一个算子任务拆分成多个并行的子任务,再分发到不同的节点上执行,实现真正的并行计算。(好绕口,就是把一个活儿让好几个Task节点共同去做)

在Flink执行过程中,每一个算子(operator)可以包含一个或多个子任务(operator subtask),这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行

在这里插入图片描述

某一个算子的子任务的个数被称之为其并行度(parallelism)。 一条流水线上,几个人在同时干着打螺丝,几个人在同时处理着焊电路板。同一个程序,不同的算子,可以有不同的并行度。一个流程序的并行度,可以认为就是其所有算子中最大的并行度。如上图,source、map、window、sink四个算子,sink为1,其余为2,则这个流处理程序的并行度为2。

2、并行度的设置

方式一:代码中设置

算子后跟着调用setParallelism()方法为某一个算子设置并行度

stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);   //map算子并行度为2

执行环境对象后面调setParallelism()方法设置全局并行度,对所有算子生效

env.setParallelism(2);

一般不设全局,会导致无法动态扩容。

方式二:提交应用时指令中设置

-p参数来指定当前应用程序执行的并行度,类似上面的全局设置

bin/flink run –p 2 –c com.plat.SocketStreamWordCount  ./FlinkDemo-1.0-SNAPSHOT.jar

这种和Web控制台设置一个意思:

在这里插入图片描述

方式三:配置文件中设置

在集群的配置文件flink-conf.yaml中直接更改默认并行度:

parallelism.default: 2

这个设置对于整个集群上提交的所有作业有效,初始值为1。当代码中没设置、提交时没指定,就用这个配置文件的。在开发环境中,没有配置文件,默认并行度就是当前机器的CPU核心数

在这里插入图片描述

最后,本地调试想看控制台界面,可创建本地环境执行对象,用于本地调试:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createlocalEnvironmentWithwebuI(new Configuration());

访问localhosr:8081,socket是特殊的,只能是1,改不了,其余算子均为4。

在这里插入图片描述
最后,这几种方式的优先级为:代码中为某算子单独设定 > 代码中执行环境对象全局设置 > 提交时指定 > 配置文件

3、算子链

一个数据流,数据在各种算子之间传输的形式可能是一对一(one-to-one)的直通(forwarding),也可能是打乱的重分区(redistributing)。

在这里插入图片描述

一对一(One-to-one,forwarding)

如上图,source算子读完数据后,可以直接发给map算子接着处理。map 算子的子任务,看到的元素个数和顺序跟source 算子的子任务产生的完全一样,即一对一,一个算子的task和一个算子的task数据一样。特点是:

  • 数据不需要重新分区
  • 数据不需要调整顺序

重分区(Redistributing)

和一对一的直流相反,此时数据的分区会发生改变,如图中,map完数据后,直接keyBy/window(注意keyBy自身不是算子),按key分组了。也就是每一个算子的子任务task,会根据某些规则,把数据发送到不同的下游task,从而引起了数据重分区。

合并算子链

在Flink中,并行度相同一对一(one to one)算子操作,可以直接连接在一起形成一个大的任务(task),每个task又会被一个线程执行,即算子链。合并的条件:

  • 两算子并行度相等(子任务数量一样)
  • 两算子为one to one的直流关系

在这里插入图片描述

上图中Source和map之间满足了算子链的要求,所以可以直接合并在一起,形成了一个任务;因为并行度为2,所以合并后的任务也有两个并行子任务。这样,这个数据流图所表示的作业最终会有5个任务,由5个线程并行执行合并算子链的机制,可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。

4、禁用算子链

Flink默认会按照算子链的原则进行链接合并,但有的场景不适合合并,比如:

  • 两个算子串在一起,它们的子任务task搭配形成n组(n为并行度),每组共用一个线程,但如果两个算子本身计算任务都很重,那就不适合串一起,就像两个脾气都差的人合租,此时应该断开算子链
  • 当出现错误,需要定位问题是哪个算子时,就要禁用算子链

全局禁用算子链:

//env为执行环境对象
env.disableOperatorChaining();

disableChaining方法可只给某个算子设置禁用算子链,那它和它前后的算子就都不能再组成算子链(控制台上UI会显示Forward,表明本来是一对一的算子链关系)

.map(word -> Tuple2.of(word, 1L)).disableChaining();

在这里插入图片描述

startNewChain方法,从当前算子开始新链,即只和前面的算子断开,和后面的算子能串一起的话还是会串

// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain()

5、任务槽

Flink中每一个TaskManager都是一个JVM进程,它可以启动多个独立的线程,来并行执行子任务。但每个TaskManager总的计算资源有限,并行任务越多,每个线程能分到的可用资源就越少,为了限制TaskManager能并行处理的最大任务数,提出任务槽(task slots)的概念,对TaskManager上对每个任务运行所占用的资源做出明确的划分。一锅饭,能盛6碗,谁来都夹一筷子,谁都吃不饱,因此,锅前放6个碗,也就是分为6碗饭,来一个人,就端走一碗,端没了别人就去其他锅,分到饭的六个人,也不用和别人抢,且能吃饱。这个碗就是任务槽。

在这里插入图片描述

比如一个TaskManager上有三个slot,那就把这个TaskManager的内存资源分为三份,一个插槽一份。如此,在插槽上执行一个子任务时,就相当于划定了一块内存给这个子任务专款专用,不需要和其他子任务去竞争内存资源。前面提到的合并成算子链后的5个子任务,两个TaskManager就可实现,如上图。

任务槽数量的设置

在flink安装目录的conf/flink-conf.yaml配置文件中,可以设置每个TaskManager的slot数量,默认是1个slot。

taskmanager.numberOfTaskSlots: 8

slot目前仅仅用来隔离内存,不会涉及CPU的隔离在具体应用时,可以将slot数量配置为机器的CPU核心数,尽量避免不同任务之间对CPU的竞争。这也是开发环境默认并行度设为机器CPU数量的原因。(这就像合租,内存就像卧室,厕所就像CPU,三个人,三间房,但一个厕所也够用,类比CPU时间片和线程切换)

子任务task对任务槽的共享

上面讲到,一人一碗饭,一个子任务一个插槽。而插槽的共享,就是放宽了政策,不同类型的算子,它们的并行子任务允许放到同一个插槽上并行执行(注意,依旧并行)。如下图,两个TaskManager,6个插槽,每个插槽上的子任务对应的算子种类都不一样。

在这里插入图片描述

如上图所示,只要属于同一个作业,那么对于不同任务节点(算子)的并行子任务,就可以放到同一个slot上执行。所以对于第一个任务节点source→map,它的6个并行子任务必须分到不同的slot上,而第二个任务节点keyBy/window/apply的并行子任务却可以和第一个任务节点共享slot。slot共享的好处在于:

  • 活儿大致平均分配到了所有的TaskManager
  • slot有好几种算子的子任务,组合起来就是一个完整的作业管道或者流。此时,即使某个TaskManager宕机,其他节点也不受影响,作业继续执行

如果不希望默认的slot共享,比如需要让某个算子的task独享一个slot,就可以设置slot共享组

.map(word -> Tuple2.of(word, 1L)).slotSharingGroup("taskTest");

只有属于同一个slot共享组的子任务,才会开启slot共享,这个组默认是default,不同slot共享组之间的任务是完全隔离的,必须分配到不同的slot上。

6、任务槽和并行度的关系

  • 任务槽slot是一个静态概念,表示最大的并发上限。假设一共有3个TaskManager,每一个TaskManager中的slot数量设置为3个,那么一共有9个task slot,表示集群最多能并行执行同一算子的9个子任务。
  • 并行度是一个动态概念,表示实际运行占用了几个。比如并行度为4,即这个算子有4个子任务task,需要放在4个插槽上。此时,并行度为4,slot为9。

Job运行时,必须插槽slot的数量必须大于等于并行度,否则任务运行失败:NoResourceAvailableException:could not acquire the minimun required resources 。注意Yarn等模式部署时,会动态申请TaskManager,申请的TM的数量 = job并行度 /每个TM的slot的数量,向上取整。

比如,某算子并行度为10,即它有10个task要放在不同的插槽上,此时插槽有9个,那就不能运行,而不是9个跑完再让第十个执行。再比如,一个Flink程序中定义了4个算子:

source→ flatmap→ reduce→ sink

前提: flink-conf.yaml中taskmanager.numberOfTaskSlots数量为3(建议为CPU核心数),假设TaskManager数量也为3,即插槽有3*3=9个

Case1:并行度parallelism.default=1

在这里插入图片描述

分析:4种算子,并行度为1 ⇒ 其中两个形成算子链算一个 ⇒ 三个子任务 ⇒ 同一作业的不同种类的算子的任务,共享任务槽 ⇒ 总共占用一个插槽,剩8个可用

Case2:全局并行度为2

在这里插入图片描述

分析:三种算子,并行度为2 ⇒ 其中两个形成算子链算一个 ⇒六个子任务 ⇒ 插槽共享 ⇒ 总共占用2个插槽,剩7个可用 ⇒ 计算机资源利用不充分,设置合适的并行度才能提高效率

Case3:全局并行度为9

分析: 并行度为9 ⇒ 一种算子有9个子任务 ⇒ 插槽共享 ⇒ 占九个

在这里插入图片描述

Case4:全局set为9,但sink算子set为1

在这里插入图片描述

分析: 并行度为9 ⇒ 一种算子有9个子任务 ⇒ 29 + 11 = 19个子任务 ⇒ 插槽共享

最后,可以看到,整个流处理程序的并行度,就是所有算子并行度的最大值,因为这代表了程序运行所需要的插槽slot的数量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/106136.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【LeetCode热题100】--121.买卖股票的最佳时机

121.买卖股票的最佳时机 class Solution {public int maxProfit(int[] prices) {int minprice Integer.MAX_VALUE;int maxprofit 0;for(int i 0;i<prices.length;i){if(prices[i] < minprice){minprice prices[i]; //找到最小值}else if(prices[i] - minprice > ma…

Linux系统编程:文件描述符以及IO多路复用

书接上回&#xff0c;我们之前学习的文件系统编程都是在内存空间中的文件流&#xff08;用户态文件缓冲区&#xff09;内进行操作的&#xff0c;比如使用的fopen、fclose、fread和fwrite等等都是库函数&#xff0c;并没有用到内核态的功能&#xff08;实际上库函数中调用的是内…

python特别篇—github基本操作手册

一、开始使用 1.1 “Hello world” 1.1.1 github介绍 GitHub是一个基于Git版本控制系统的代码托管平台。它提供了一个在线的代码仓库&#xff0c;使开发者可以将自己的代码存储在云端&#xff0c;并与其他开发者进行协作。GitHub不仅仅是一个代码托管平台&#xff0c;还提供了…

HSN:微调预训练ViT用于目标检测和语义分割,华南理工和阿里巴巴联合提出

今天跟大家分享华南理工大学和阿里巴巴联合提出的将ViT模型用于下游任务的高效微调方法HSN&#xff0c;该方法在迁移学习、目标检测、实例分割、语义分割等多个下游任务中表现优秀&#xff0c;性能接近甚至在某些任务上超越全参数微调。 论文标题&#xff1a;Hierarchical Side…

uniapp 微信小程序 vue3.0+TS手写自定义封装步骤条(setup)

uniapp手写自定义步骤条&#xff08;setup&#xff09; 话不多说 先上效果图&#xff1a; setup.vue组件代码&#xff1a; <template><view class"stepBox"><viewclass"stepitem"v-for"(item, index) in stepList":key"i…

Sprint framework Day07:注解结合 xml 配置

前言 Spring注解结合XML配置是指在Spring应用中&#xff0c;使用注解和XML配置的方式来进行Bean的定义、依赖注入和其他配置。这种方式可以充分利用Spring框架的注解和XML配置两种不同的配置方式的特点。 在Spring框架中&#xff0c;我们可以使用注解来定义Bean&#xff0c;如…

《动手学深度学习 Pytorch版》 8.5 循环神经网络的从零开始实现

%matplotlib inline import math import torch from torch import nn from torch.nn import functional as F from d2l import torch as d2lbatch_size, num_steps 32, 35 train_iter, vocab d2l.load_data_time_machine(batch_size, num_steps) # 仍然使用时间机器数据集8.…

VSCode自定义代码块详解

第一步&#xff1a;点击文件-首选项-用户代码片段 第二步&#xff1a;选择代码块作用域的文件类型 类型一&#xff1a;全局作用域 这种类型的代码块是创建在vscode软件内部的文件。是跟随这当前安装的vscode这个软件的&#xff0c;不会随着项目的关闭而失效&#xff0c;会一直存…

Gpt-4多模态功能强势上线,景联文科技多模态数据采集标注服务等您来体验!

就在上个月&#xff0c;OpenAI 宣布对ChatGPT 进行重大更新&#xff0c;该模型不仅能够通过文字输入进行识别和分析&#xff0c;还能够通过语音、图像甚至视频等多种模态的输入来获取、识别、分析和输出信息。这一重要技术突破&#xff0c;将促进多模态自然语言处理的发展&…

Android位置服务和应用权限

Github:https://github.com/MADMAX110/Odometer 一、使用位置服务 之前的Odometer应用是显示一个随机数&#xff0c;现在要使用Android的位置服务返回走过的距离。 修改getDiatance方法使其返回走过的距离&#xff0c;为此要用Android的位置服务。这些服务允许你得到用户的当…

巧用正则表达式

文章目录 题目巧用正则表达式&#xff0c;题目将十进制转为16进制&#xff0c;可以采用Java的语法来表示 题目 巧用正则表达式&#xff0c;题目将十进制转为16进制&#xff0c;可以采用Java的语法来表示 String nInteger.toString(num,16); 那如何确定是否都是字母呢a-f呢&…

车载多源融合定位

终端硬件由两部分组成&#xff0c;组合导航处理板和地磁导航处理板。 组合导航处理板负责采集加速度计、陀螺、GNSS和轮速计等数据进行组合导航解算&#xff0c;差分数据通过6Q主板获取到后通过串口发送至组合导航处理板。地磁导航处理板负责地磁数据采集&#xff0c;保存至数…

Rxjava3 全新详解及常用操作符

简介 RxJava 是一个基于 Java 的响应式编程库&#xff0c;用于处理异步事件流和数据流。它是由 Netflix 开发并开源&#xff0c;现在广泛用于 Android 和 Java 后端开发。RxJava 提供了一种用于组合和处理异步数据的丰富工具集&#xff0c;它的核心思想是将数据流视为一系列事…

微信发红包(各种红包类型)-测试用例设计

微信发红包&#xff08;各种红包类型&#xff09;

总结10.15

项目进展 登陆注册&#xff0c;连接了数据库&#xff0c;找回密码写到了通过给邮箱发送验证码&#xff0c;然后重新输入密码 项目看法 之后俩天加紧把这个登陆注册这些搞完&#xff0c;注册用到的随机生成一个账号且不重复&#xff0c;且设置一个邮箱作为之后找回密码时候的…

CVPR 2023 | 数据驱动的解释对分布外数据具有鲁棒性吗?

论文链接&#xff1a; https://arxiv.org/abs/2303.16390 代码链接&#xff1a; https://github.com/tangli-udel/DRE 01. 研究背景&#xff1a;数据驱动的解释对分布外数据具有鲁棒性吗&#xff1f; 近年来&#xff0c;将黑盒机器学习&#xff08;ML&#xff09;模型用于高风…

CentOS 7 编译安装Boost

1、前提条件 linux平台/CentOS 7 下要编译安装Boost除gcc和gcc-c之外&#xff0c;还需要两个开发库&#xff1a;bzip2-devel 和python-devel &#xff0c;因此在安装前应该先保证这两个库已经安装。 安装指令: yum install bzip2 bzip2-devel bzip2-libs python-devel Cent…

zookeeper源码学习笔记(一)

一、缘起 1、CP还是AP 作为一个在大数据行业工作了7&#xff5e;8年的老兵&#xff0c;在被问到zookeeper和CAP时&#xff0c;竟然有些恍惚&#xff0c;AP还是CP&#xff1f; 看了一些博文&#xff0c;答案几乎都是CP&#xff1f; zookeeper的实现中&#xff0c;P是一定的&…

低代码提速应用开发

低代码介绍 低代码平台是指一种能够帮助企业快速交付业务应用的平台。自2000年以来&#xff0c;低代码市场一直充斥着40大大小小的各种玩家&#xff0c;比如国外的Appian、K2、Pega Systems、Salesforce和Ultimus&#xff0c;国内的H3 BPM。 2015年以后&#xff0c;这个市场更是…

2023年厦门市高等职业院校技能竞赛软件测试竞赛规程

2023年厦门市高等职业院校技能竞赛 软件测试竞赛规程 一、赛项名称 赛项名称&#xff1a;软件测试 竞赛形式&#xff1a;团体赛 赛项专业大类&#xff1a;电子信息 二、竞赛目的 &#xff08;一&#xff09;检验教学成效 本赛项竞赛内容以《国家职业教育改革实施方案》为设计方…