Task执行流程

1、源码走读

(1)当Driver中的SchedulerBackend(Standalone模式为CoarseGrainedSchedulerBackend)给ExecutorBackend(Standalone模式为CoarseGrainedExecutorBackend)发送LaunchTask之后,CoarseGrainedExecutorBackend在收到LaunchTask消息后,Executor会通过TaskRunner在ThreadPool来运行具体的Task,TaskRunner内部会做一些准备工作:例如反序列化Task,然后通过网络来获取需要的文件,jar等。

CoarseGrainedExecutorBackend在收到LaunchTask消息后首先看下Executor存不存在,不存在则系统直接退出,然后反序列化我们的任务,反序列的是TaskDescription,反序列化之后就launchTask。

注意:在执行具体Task的业务逻辑前会进行4次反序列化:①TaskDescription的反序列化②Task的反序列化③RDD的反序列化④反序列化task的依赖

(2)进入到executor的launchTask方法中,在这内部有个TaskRunner

TaskRunner本身是个Runnable接口,在这个内部要运行核心肯定是run方法,在run方法里面定义一些属性:taskMemoryManager内存管理,deserializeStartTime反序列开始时间等。因为要加载具体的类所以要ClassLoaderThread.currentThread. setContextClassLoader(replClassLoader)。调用execBackend.statusUpdate(taskId,  TaskState.RUNNING,  EMPTY_BYTE_BUFFER),他是ExecutorBackend的方法,ExecutorBackend通过statusUpdate方法其实是给driver发信息汇报自己状态的。有了(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)这些状态,告诉driver说现在任务开始运行了。在然后这个反序列化得到的tasktuple包括任务运行的文件,jars,Bytes等等都会被反序列过来

进入到updateDependencies方法中。这里下载我们task运行需要的所有依赖文件,获得hadoop的Configuration。这里下载文件用了synchronized关键字,因为我们每一个taskrunner运行在线程中,这个方法会被多个线程调用,方法在全局中,所以需要加锁。下载文件,所有的依赖都有之后就反序列化任务即task本身。

回到run方法中,接下来判断任务是否被killed

判断任务是否被killed

再然后,调用反序列化后的task.run获得执行结果

task.run调用的时候转过来会调用runTask方法,runTask是task里面的抽象方法,task主要有两种类型:ShuffleMapTask和ResultTask。

①下面的ShuffleMapTask的runTask,实际内部执行的时候会通过调用rdd的iterator针对我们Partition进行计算。我们的ShuffleMapTask在计算具体的Partition之后会通过ShuffleManager获得ShuffleWrite把当前task计算的结果根据具体的ShuffleManager的实现来写入到具体的文件,操作完成后会把MapStatus发送给DAGScheduler。把MapStatus向DAGScheduler里面的MapOutputTracker汇报。

在rdd的iterator方法中,ShuffleMapTask先看一下cache memory中有没有曾经的数据

最终计算的时候就是调用这个rdd的compute,这里有个TaskContext类型的参数这里面维持了很多上下文信息

看个具体的RDD的compute实现,任何的RDD的compute返回的都是Iterator

这个f是个函数一般是自己写的,对他分配处理的业务逻辑,因为有些RDD是系统自动生成的所以可能是系统调用的逻辑。这个就是自己写的业务逻辑了,只不过一个Stage从后往前推他会把所有的RDD合并最后变成一个,函数链条也会展开成一个很大的函数

回到runtask中,这个writer要看不同的Shuffle

②ResultTask是根据前面Stage的执行结果进行Shuffle产生整个job最后的结果。这个是ResultTask,在反序列化RDD的时候直接去调func。

(3)回到run方法中,这个序列化的value是前面task.run获得的执行结果,之所以记录这么多时间是为了在web控制台可以看到这些信息

(4)在Executor的run方法的task执行完之后会调用CoarseGrainedExecutorBackend的statusUpdate。其实就是给我们的driver发一个信息。

(5)CoarseGrainedSchedulerBackend收到statusUpdate消息后,它会调用Scheduler.statusUpdate,会释放相关的资源,如果没有什么问题的话空闲资源中就加上曾经想消耗的东西,再次进行资源调度

(6)回到launchtask方法中,把他交给runningTasks这样一个数据结构中,放入taskid以及业务逻辑,然后交给ThreadPool。

2、总结

(1)CoarseGrainedExecutorBackend在收到 CoarseGrainedSchedulerBackend发送的LaunchTask消息后反序列化TaskDescription

(2)通过executor的launchTask方法中执行Task,在launchTask内部会创建TaskRunner,在TaskRunner内部会做一些准备工作,如反序列化task,task依赖,获取jar等

(3)TaskRunner在ThreadPool具体运行Task;

(4)TaskRunner中会调用反序列化的Task.run方法执行并获得执行结果。在调用run方法的时候会调用Task的抽象方法runTask。在runTask内部会调用RDD的iterator()方法,在处理的内部会迭代Partition的元素并给我们自定义函数进行处理。对于ShuffleMapTask,首先要对RDD以及其依赖关系进行反序列化,最终会调用RDD的compute方法

(5)把执行结果序列化,并根据大小判断不同的姐夫哦传回给Driver的方式

(6)CoarseGrainedExecutorBackend给DriverEndPoint发送StatusUpdate传输执行结果,DriverEndPoint把执行结果传给TaskSchedulerImpl处理然后交给TaskResultGetter内部通过线程去分别处理Task执行成功和失败的不同情况,然后返回DAGScheduler任务处理结束的状况。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473626.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 1638. 统计只差一个字符的子串数目(DP)

文章目录1. 题目2. 解题2.1 暴力枚举2.2 DP1. 题目 给你两个字符串 s 和 t ,请你找出 s 中的非空子串的数目,这些子串满足替换 一个不同字符 以后,是 t 串的子串。 换言之,请你找到 s 和 t 串中 恰好 只有一个字符不同的子字符串…

Airflow简介

1、什么是Airflow Airflow 是一个 Airbnb 的 Workflow 开源项目,使用Python编写实现的任务管理、调度、监控工作流平台。Airflow 是基于DAG(有向无环图)的任务管理系统,可以简单理解为是高级版的crontab,但是它解决了crontab无法解决的任务…

Flask简介与简单项目操作流程

Flask框架简介Flask诞生于2010年,是Armin ronacher(人名)用Python语言基于Werkzeug工具箱编写的轻量级Web开发框架。它主要面向需求简单的小应用。Flask本身相当于一个内核,其他几乎所有的功能都要用到扩展(邮件扩展Fl…

LeetCode 1640. 能否连接形成数组(哈希)

文章目录1. 题目2. 解题1. 题目 给你一个整数数组 arr ,数组中的每个整数 互不相同 。 另有一个由整数数组构成的数组 pieces,其中的整数也 互不相同 。请你以 任意顺序 连接 pieces 中的数组以形成 arr 。但是,不允许 对每个数组 pieces[i]…

python基本知识、数据库、网络、编程等总结

Python语言特性 1 Python的函数参数传递 看两个例子: a 1 def fun(a):a 2 fun(a) print a # 1 a [] def fun(a):a.append(1) fun(a) print a # [1] 所有的变量都可以理解是内存中一个对象的“引用”,或者,也可以看似c中void*的感觉。 通过id来看引…

LeetCode 1641. 统计字典序元音字符串的数目(DP)

文章目录1. 题目2. 解题1. 题目 给你一个整数 n,请返回长度为 n 、仅由元音 (a, e, i, o, u) 组成且按 字典序排列 的字符串数量。 字符串 s 按 字典序排列 需要满足:对于所有有效的 i,s[i] 在字母表中的位置总是与 s[i1] 相同或在 s[i1] 之…

LeetCode 1642. 可以到达的最远建筑(二分查找 / 优先队列贪心)

文章目录1. 题目2. 解题2.1 二分查找2.2 优先队列贪心1. 题目 给你一个整数数组 heights ,表示建筑物的高度。另有一些砖块 bricks 和梯子 ladders 。 你从建筑物 0 开始旅程,不断向后面的建筑物移动,期间可能会用到砖块或梯子。 当从建筑…

ClickHouse高可用及副本测试

1 概述 ​ 对于默认的分布式表的配置,每个分片只有一份,这种多分片单副本集群,挂掉一个节点的话查询分布式表会报错。为了解决这个问题的话可以使用ClickHouse高可用集群,对于每个分片具有2个或2个以上的副本,当某个节…

阿里云Redis读写分离典型场景:如何轻松搭建电商秒杀系统

秒杀活动是绝大部分电商选择的低价促销,推广品牌的方式。不仅可以给平台带来用户量,还可以提高平台知名度。一个好的秒杀系统,可以提高平台系统的稳定性和公平性,获得更好的用户体验,提升平台的口碑,从而提…

LeetCode 756. 金字塔转换矩阵(回溯)

文章目录1. 题目2. 解题1. 题目 现在,我们用一些方块来堆砌一个金字塔。 每个方块用仅包含一个字母的字符串表示。 使用三元组表示金字塔的堆砌规则如下: 对于三元组(A, B, C) ,“C”为顶层方块,方块“A”、“B”分别作为方块“…

Flask框架项目实例:**租房网站(一)

Flask是一款MVC框架,主要是从模型、视图、模板三个方面对Flask框架有一个全面的认识,通过完成作者-读书功能,先来熟悉Flask框架的完整使用步骤。 操作步骤为: 1.创建项目2.配置数据库3.定义模型类4.定义视图并配置URL 5.定义模板…

LeetCode 316. 去除重复字母 / 1081. 不同字符的最小子序列(单调栈)

文章目录1. 题目2. 解题1. 题目 LC 316: 给你一个字符串 s ,请你去除字符串中重复的字母,使得每个字母只出现一次。需保证 返回结果的字典序最小(要求不能打乱其他字符的相对位置)。 示例 1: 输入&#…

LeetCode 809. 情感丰富的文字

文章目录1. 题目2. 解题1. 题目 有时候人们会用重复写一些字母来表示额外的感受,比如 "hello" -> "heeellooo", "hi" -> "hiii"。 我们将相邻字母都相同的一串字符定义为相同字母组,例如:&qu…

网站部署nginx--uwsgi

网站代码写完之后就是项目部署,主要包括两个方面: 1.nginx安装与配置: 1、Nginx 安装 系统平台:CentOS release 6.6 (Final) 64位。 一、安装编译工具及库文件 yum -y install make zlib zlib-devel gcc-c libtool openssl open…

天池 在线编程 滑动数独(滑动窗口)

文章目录1. 题目2. 解题1. 题目 描述 给定一个 3xn的矩阵 number,并且该矩阵只含有1到9的正整数。 考虑有一个大小为 3x3 滑动窗口,从左到右遍历该矩阵 number, 那么该滑动窗口在遍历整个矩阵的过程中会有n-2个。 现在你的任务是找出这些滑…

TIGK监控平台介绍

1 概述 众所周知监控平台对大数据平台是非常重要的,监控是故障诊断和分析的重要辅助利器,在发生事故之前就能预警,最大限度降低系统故障率。   监控系统我们可以分为业务层面,应用层面,系统层面 1.1 业务层面 业务系…

天池 在线编程 队列检查(排序)

文章目录1. 题目2. 解题1. 题目 描述 班上的学生根据他们的年级照片的身高升序排列&#xff0c;确定当前未站在正确位置的学生人数 数组长度 < 10^5示例 输入: heights [1,1,3,3,4,1]输出: 3解释: 经过排序后 heights变成了[1,1,1,3,3,4]&#xff0c;有三个学生不在应在…

celery异步执行任务在Django中的应用实例

1. 创建django项目celery_demo, 创建应用demo: django-admin startproject celery_demo python manage.py startapp demo2.在celery_demo模块中创建celery.py模块, 文件目录为: celery.py模块内容为: from celery import Celery from django.conf import settings import os#…

Spring自学教程-注解的使用(三)

一、java中的注解定义注解下面是一个定义注解的实例。Target(ElementType.TYPE)Retention(RetentionPolicy.RUNTIME)DocumentedInheritedpublic interface Description { String value();}其中的interface是一个关键字&#xff0c;在设计annotations的时候必须把一个类型定义为…

Django单元测试

一.前言/准备 测Django的东西仅限于在MTV模型。哪些可以测&#xff1f;哪些不可以。 1.html里的东西不能测。①Html里的HTML代码大部分都是写死的②嵌套在html中的Django模板语言也不能测&#xff0c;即使有部分逻辑。 但写测试用例时至少要调用一个类或者方法。模板语言没有出…