1 运行时相关的组件
Flink运行时架构主要包括四个不同的组件:作业管理器(JobManager)、资源管理器(ResourceManager)、任务管理器(TaskManager),以及分发器(Dispatcher)
(1)资源管理器(ResourceManager)
主要负责TaskManager的slot(插槽),slot是Flink中处理资源的单元。Flink为不同的环境和资源管理工具提供了不同资源管理器。当JobManager申请插槽资源的时候,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果没有足够的插槽,可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。还负责终止空闲的TaskManager,释放计算资源。
(2)JobManager
协调分布式计算,负责调度任务、协调 checkpoints、协调故障恢复等。每个 Job 至少会有一个 JobManager。高可用部署下会有多个 JobManagers,其中一个作为 leader,其余处于 standby 状态。
每个应用程序都会被一个不同的JobManager所控制执行,是控制每一个应用程序执行的主进程。JobManager接收要执行的应用程序,包括作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包等。JobManager将JobGraph转换成物理层面的数据流图也叫执行图(ExecutionGraph),执行图包含了所有可以并行进行执行的任务。当JobManager向资源管理器请求完执行任务需要的资源(TaskManager上的slot)时就会将执行图分发到真正运行它们的TaskManager上,JobManager还需要负责所有需要中央协调的操作。
(3)TaskManager
用来执行 dataflow 中的 tasks(准确来说是 subtasks ),并且缓存和交换数据 streams。每个 Job 至少会有一个 TaskManager。
每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。每当TaskManager启动后都会向资源管理器注册它的插槽。当资源管理器向它发出提供slot指令后TaskManager就会将一个或者多个插槽提供给JobManager调用,JobManager就可以向插槽分配任务(tasks)来执行了。在执行过程中,同一应用程序的TaskManager之间可以交换数据。
(4)Dispatcher
Dispatcher为应用提交提供了REST接口。当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager,可以跨作业运行。Dispatcher是REST接口,所以可以作为集群的一个HTTP接入点,这样就能够不受防火墙阻挡。Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。
2 任务提交流程
当一个任务提交时,较高层次的各运行时组件的交互如下:
(1)客户端提交应用
(2)分发器就会启动并将应用移交给一个JobManager
(3)JobManager向ResourceManager申请slots
(4)ResourceManager启动TaskManager
(5)TaskManager启动后向ResourceManager注册slots
(6)ResourceManager向TaskManager发出提供slot的指令
(7)TaskManager向JobManager提供slots
(8)JobManager在TaskManager提供的slots中提交要执行的任务
(9)在执行任务过程中TaskManager之间交互数据
当部署的集群环境不同(YARN,Mesos,Kubernetes,standalone等),上述步骤会有所不同,如果我们将Flink集群部署到YARN上,提交流程如下:
(1)Client首先把jar包和配置上传到hdfs里
(2)Client提交job到ResourceManager
(3)ResourceManager分配container资源,通知对应的NodeManager启动ApplicationMaster
(4)ApplicationMaster启动后加载jar包和配置构建环境,启动JobManager
(5)ApplicationMaster向ResourceManager申请启动TaskManager
(6)ResourceManager分配container之后,ApplicationMaster通知资源所在节点的NodeManager启动TaskManager
(7)NodeManager加载jar包和配置构建环境,并启动TaskManager
(8)TaskManager启动后向jobManager发送心跳包,并等待JobManager向其分配任务
3 任务调度
3.1 Job Managers、Task Managers、Clients
一个Flink代码首先生成的是一个数据流图DataFlow graph,然后在Client客户端经过一些处理之后把它提交给JobManager;JobManager上就会把它结合并行度生成一个执行图,然后就知道了要多少个TaskManager,要多少个TaskSlots;申请到足够的资源后就把对应的任务分配到相应的TaskSlots。(注意:每个TaskManager里面可以包含多个TaskSlot,TaskSlot里面到底执行什么Task就看JobManager生成的执行图是什么样的,这就涉及TaskManager和Slots的概念
client:是提交job的客户端,用于准备并发送dataflow(JobGraph)给Master(JobManager),可以运行在任何机器上,只要能与JobManager环境相连即可,提交job后,client可以结束进程也可以维持连接以等待接收计算结果。
JobManager:负责Job的调度,并协调task做checkpoint,获得client提交的的job和jar包等资源后,会生成优化后的执行计划,以task单元调度到各个TaskManager去执行。
TaskManager:在启动的时候就已经设置好槽位数Slot,每个slot能启动一个task,task为线程,从JobManager接收需要部署的task,部署启动后为上游建立Netty 连接,接收数据并处理。
3.2 TaskManager、Slots
TaskManager是一个进程,在Slots上执行的task是一个线程。也就是一个TaskManager是一个JVM进程,可以在里面启动多个线程执行任务。每个任务要在固定的集合资源中运行,这个资源就是slots。
所以Slot就是我们执行每个任务线程的资源,而且这个线程相当于是直接划分好给定的资源,所以每个TaskSlot是表示每个TaskManager拥有的固定大小的子集。如果一个TaskManager有3个TaskSlot就要把自己的内存分成3份给slot,所以Slot之间内存是独享的。所以某个线程挂了不影响其他的,但是CPU是不独享的,这也就是为什么建议把Slot数量配置成CPU核心数的原因。这样在4核的CPU上跑4个独立的线程,默认每个线程占用一个核心做处理,不会出现cpu轮转竞争资源,所以slot数量最好把他配成CPU核心数。
上图是先把source,map合成一个大任务,然后后面keyby一个窗口做聚合,最后是sink,这是3个任务。这里前面2个任务并行度都是2,总共是5个任务,那么是不是这5个任务不是都占用一个独立的TaskSlot。假如一个任务对cpu占用比较少,内存消耗也比较少,另外一个任务cpu占用比较多,导致有些任务很快执行完,有些很慢,这样资源利用率是不高的。我们可以把不同的任务共享一个slot,效果如下
假设现在并行度是6,总共有13个任务,不需要有13个slot依次排开,因为slot里面可以不同的任务去共享slot,这种共享的方式可以提高资源的利用率的。资源共享还有一个效果:整个处理流程里面相当于,假设所有的slot并行度都是6,每一个slot里面都能包含所有完整的操作步骤,这相当于只要留着一个slot就可以把整个数据操作管道全保存下来,完整的数据流程还是可以留下来的。
可以发现这里面有数据要传输到别的slot上,甚至要跨TaskManager传输的话,这要做序列化反序列化,这个过程会降低效率。这就是为什么后面有些任务要合并在一起,本来是不同的操作,如果合并在一起相当于他们之间的数据传输就变成一个本地调用了,不需要再去跨slot传输,没有序列化与反序列化,这就是合并的过程,算子链。
注意:(1)对于流处理程序而言,需要占用的slot数量就是整个处理流程中,最大的那个并行度(3)Task Slot与parallelis的区别:Task Slot是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置;而并行度parallelism是动态概念,即TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置。(3)并行的概念。①数据并行:同样的一个任务,不同的并行子任务,同时处理不同的数据②任务并行:同一时间不同的slot在执行不同的任务。
3.3 数据传输形式与并行度
在执行过程中,一个流(stream)包含一个或多个分区(stream partition),而每一个算子(operator)可以包含一个或多个子任务(operator subtask),这些子任务在不同的线程、不同的物理机或不同的容器中彼此互不依赖地执行。
一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。不同的算子可能具有不同的并行度,所以算子之间传输数据的形式也不一样,可以分为one-to-one和redistributing,具体是哪一种形式取决于算子的种类。
(1)one-to-one:stream维护着分区以及元素的顺序,如source和map之间,这意味着map算子的子任务看到的元素的个数和顺序与source算子的子任务生产的元素的个数,顺序相同。像map,fliter,flatMap等算子都是one-to-one的形式,类似于spark中的窄依赖
(2)redistributing:stream的分区会发生改变,每一个算子的子任务根据所选择的transformation发送到不同的目标任务。如keyBy是基于hashCode重分区,而broadcast和rebalance会随机重分区,这些算子都会引起redistributed,其实就类似于Spark中的Shuffle过程,类似于spark中的宽依赖
3.4 任务和算子链
分布式计算中,Flink 将算子(operator)的 subtask *链接(chain)*成 task。每个 task 由一个线程执行。相同并行度的one to one操作,Flink这样相连的算子链接在一起形成一个task,原来的算子成为里面的一部分。把算子链接成 tasks 能够减少线程间切换和缓冲的开销,在降低延迟的同时提高了整体吞吐量。
下图的 dataflow 由五个 subtasks 执行,因此具有五个并行线程。
source读取数据源,在map后面就hashcode重分区,keyBy做聚合(keyBy本身并不是一个操作,只是定义重分区的模式),然后就想窗口操作,最后sink输出。
这里map到后面的窗口操作是要重分区的。假设在代码里面设置如上并行度,本来的3步操作就分成7个任务,这7个任务因为source和map是one-to-one操作,所以连接在一起,就变成5个子任务。不同的子任务可以共享一个slot,所以其实有2个slot就可以用。整个处理过程中,最大的并行度就是当前需要的slot数量。
任务之间数据传输看操作,source到map本身是窄依赖是one-to-one的操作,并且并行度相同,那么他们可以合并。map到window操作本身是宽依赖,并行度相同也不能合并。window到sink不仅并行度不相同还是窄依赖所以不可以合并
注意:只有并行度相同,并且是one-to-one类型的数据传输,才可以把多个算子合并成一个任务。
3.5 数据流(DataFlow)
所有的Flink程序都是由三部分组成的: Source(读取数据源) 、Transformation(利用各种算子进行处理加工)和Sink(输出)
程序运行时,Flink上运行的程序会被映射成逻辑数据流(dataflows),每一个dataflow以一个或多个sources开始以一个或多个sinks结束。dataflow类似于任意的有向无环图DAG。在程序中的transformations跟dataflow中的算子(operator)可以是一一对应的关系,也可以是一个transformation可能对应多个operator。
3.6 执行图(ExecutionGraph)
Flink程序直接映射成的数据流图是StreamGraph,也被称为逻辑流图,因为它们表示的是计算逻辑的高级视图。为了执行一个流处理程序,Flink需要将逻辑流图转换为物理数据流图(也叫执行图),详细说明程序的执行方式。
整个业务的完成,其实就是执行图的逐渐优化的过程,Flink的执行图可分为4层:StreamGraph->JobGraph->ExecutionGraph->物理执行图
StreamGraph:根据用户编写的Stream API编写的代码生成的最初的图,也就是上面的dataflow,用来表示程序的拓扑结构。
JobGraph:StreamGraph经过优化后就生成了JobGraph,是提交给JobManager的数据结构,主要的优化是:将多个符合条件的节点chain在一起作为一个节点,就可以减少数据在节点之间的流动所需的序列化/反序列化/传输消耗。
ExecutionGraph:JobManager根据JobGraph生成ExecutionGraph,ExecutionGraph是JobGraph的并行化版本,是调度层核心的数据结构。
物理执行图:ExecutionGraph已经是可以执行的了,JobManager根据ExecutionGraph对Job进行调度后,在各个TaskManager上会把这个图转换成最终在每个slot上要执行的代码,不是具体的数据结构。
Flink的执行图流程如下:①最初按照代码生成的streamGraph(dataflowGraph),对应每一个算子每一步操作都是一个任务;②接下来在Client上会直接生成JobGraph,这步是把符合要求的任务合并在一起,串成一个任务;③JobGraph会提交给JobManager,JobManager会按照当前的并行度把他拆开,这里并行度不一样就涉及怎样传输,生成真正可以执行的ExecutorGraph;③ExecutorGraph传给TaskManager去执行生成真正的物理执行图。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1naR3qDR-1595864632942)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\ksohtml\wps6090.tmp.jpg)]
3.7 状态后端(State Backends)
key/values 索引存储的数据结构取决于 state backend的选择。一类 state backend 将数据存储在内存的哈希映射中,另一类 state backend 使用 RocksDB作为键/值存储。除了定义保存状态(state)的数据结构之外, state backend 还实现了获取键/值状态的时间点快照的逻辑,并将该快照存储为 checkpoint 的一部分。