Spark2.x RPC解析

1、概述

在Spark中很多地方都涉及网络通信,比如Spark各个组件间的消息互通、用户文件与Jar包的上传、节点间的Shuffle过程、Block数据的复制与备份等。Spark 2.0 之后,master 和worker 之间完全不使用akka 通信,改用netty实现。因为使用Akka要求message发送端和接收端有相同的版本,为了避免Akka造成的版本问题,给用户的应用更大灵活性,决定使用更通用的RPC实现。

spark 基于netty新的rpc框架借鉴了Akka的中的设计,它是基于Actor模型,如下图所示:

Spark通讯框架中各个组件(Client/Master/Worker)可以认为是一个个独立的实体,各个实体之间通过消息来进行通信。具体各个组件之间的关系图如下:

Endpoint(Client/Master/Worker)有1个InBox和N个OutBox(N>=1,N取决于当前Endpoint与多少其他的Endpoint进行通信,一个与其通讯的其他Endpoint对应一个OutBox),Endpoint接收到的消息被写入InBox,发送出去的消息写入OutBox并被发送到其他Endpoint的InBox中。

2、Spark通信架构

(1)RpcEndpoint:RPC端点,Spark针对每个节点(Client/Master/Worker)都称之为一个Rpc端点,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用Dispatcher;

(2)RpcEnv:RPC上下文环境,每个RPC端点运行时依赖的上下文环境称为RpcEnv;

(3)Dispatcher:消息分发器,针对于RPC端点需要发送消息或者从远程RPC接收到的消息,分发至对应的指令收件箱/发件箱。如果指令接收方是自己则存入收件箱,如果指令接收方不是自己,则放入发件箱;

(4)Inbox:指令消息收件箱,一个本地RpcEndpoint对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部ReceiverQueue中,另外Dispatcher创建时会启动一个单独线程进行轮询ReceiverQueue,进行收件箱消息消费;

(5)RpcEndpointRef:RpcEndpointRef是对远程RpcEndpoint的一个引用。当我们需要向一个具体的RpcEndpoint发送消息时,一般我们需要获取到该RpcEndpoint的引用,然后通过该应用发送消息。

(6)OutBox:指令消息发件箱,对于当前RpcEndpoint来说,一个目标RpcEndpoint对应一个发件箱,如果向多个目标RpcEndpoint发送信息,则有多个OutBox。当消息放入Outbox后,紧接着通过TransportClient将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行;

(7)RpcAddress:表示远程的RpcEndpointRef的地址,Host + Port。

(8)TransportClient:Netty通信客户端,一个OutBox对应一个TransportClient,TransportClient不断轮询OutBox,根据OutBox消息的receiver信息,请求对应的远程TransportServer;

(9)TransportServer:Netty通信服务端,一个RpcEndpoint对应一个TransportServer,接受远程消息后调用Dispatcher分发消息至对应收发件箱;

3、相关源码阅读

3.1、RpcEnv

RpcEnv是Rpc的环境(相当于Actor中的ActorSystem),所有的RPCEndPoint都需要注册给RpcEnv实例对象(注册的时候会指定注册的名称,这样客户端就可以通过名称查询到RPCEndPoint的RPCEndPointRef引用,进而进行通信(客户端通过操作RPCEndPointRef要给RpcEndPoint发信息,怎么发要RpcEnv去管理,RpcEnv在具体的实例看见发的信息,因为有Ref肯定有路由,就路由到远程的具体的RpcEndPoint实体内部的receive方法中)),如果不注册的话收不到消息。所有的RpcEndPoint其实都是属于RpcEnv的,只有属于他客户端发消息的时候才能把信息路由给RpcEndPoint。

也就是RpcEnv 是一个RPC 环境。 RpcEndpoint需要使用RpcEnv的名称来注册自己,以接收消息。RpcEnv将处理从RpcEndpointRef或远程节点发送的消息,并将它们传递到相应的RpcEndpoint。

RpcEnv是个抽象类,作为Rpc通信肯定要传入SparkConf,因为是分布式的,在spark2.x版本中,使用的具体的实现类是NettyRpcEnv。RpcEnv的结构如下:

在RpcEnv的伴生对象中,重要的功能是创建一个RpcEnv的实例,这个实例就是用于管理endpoint。

在 RpcEnv中,有一个重要且常用的方法setupEndpoint方法。该方法就是利用上面创建的RpcEnv实例,来初始化(注册)一个Endpoint,并返回该endpoint的RpcEndpointRef(代理对象)。当我们调用RpcEnv中的setupEndpoint来注册一个endpoint到rpcEnv的时候,在NettyRpcEnv内部,会将该endpoint的名称与其本省的映射关系,rpcEndpoint与rpcEndpointRef之间映射关系保存在dispatcher对应的成员变量中。我们拿到Endpoint的代理对象后就能向该endpoint发送消息

还有一个setupEndpointRef 方法来获取到指定endpoint的引用对象

下面看看master端的代码,master在启动的时候在其伴生对象中会有一个rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)方法创建一个RpcEnv的实例,这个实例就是用于管理endpoint。然后masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))获得一个具体一个具体的endpoint的实例

3.2、RpcEndpointRef

RpcEndpointRef是RpcEndpoint的一个引用,简称代理。RpcEndpointRef是线程安全的。

要想向一个RpcEndpoint发送消息,必须先持有其Ref代理,通过该代理才能发送消息。RpcEnv结构如下

常用的发送消息的方法是send,ask,askWithRetry,他们之间的区别:send发送的消息,没有返回值,用receive接收即可;ask和askWithRetry,发送的消息有返回值,用receiveAndReply来接收。

下面看看worker端的代码:worker获取到master的ref引用对象,然后发送注册消息

3.3、在standalone模式中worker向master注册案例

1在worker启动的时候有onStart方法,这里面调用了registerWithMaster,这里面用了tryRegisterAllMaster方法在具体注册的时候向所有的master提交,是用线程池的中一个线程来提交。然后就获得了masterEndpoint。获得了masterEndpoint之后,将其作为参数传入registerWithMaster方法。然后通过ask发送消息。

(2)当调用ask将消息发送出去。其实是调用NettyRpcEndpointRef中ask,在方法中当前发送地址(nettyEnv.address),目标的master地址(this)和发送的消息message被封装成了RequestMessage消息。

(3)在NettyRpcEnv.ask中如果是远程rpc调用的话,最终ask将调用postToOutbox函数,并且此时消息会被序列化成Byte流。实现如下:

(4)在postToOutbox函数中,消息将经过OutboxMessage中的sendWith方法(client.sendRpc(content)),最终通过TransportClient的sendRpc方法(client.sendRpc(content)),而在TransportClient中将消息进一步封装,然后发送给master。

(5)在master端TransportRequestHandler的handle方法中,由于信息在worker端被分装成了RpcRequest,所以在该handle方法中,将调用processRpcRequest进行处理。

(6)processRpcRequest函数将调用rpcHandler的实现类NettyRpcHandler中的receive方法。在该方法中,首先通过internalRecieve将消息解包成RequestMessage。然后该消息通过dispatcher的分发给对应的endpoint

(7)在Dispatcher的postMessage方法中,可以看到,首先根据对应的endpoint的EndpointData信息(主要是该endpoint及其应用以及其信箱(inbox)),然后将消息塞到给endpoint(此例中的master)的信箱中,最后将消息塞到recievers的阻塞队列中。

(8)在Dispatcher中有一个线程池threadpool在MessageLoop类的run方法中,将receivers中的对象取出来,交由信箱的process方法去处理。如果没有收到任何消息,将会阻塞在take处

(9)在inbox的proces方法中,首先取出消息,然后根据消息的类型,最终将调用endpoint的receiver方法进行处理(也就是master中的receive方法)。至此,整个一次rpc调用的流程结束。

总结:①当调用ask将消息发送出去。其实是调用NettyRpcEndpointRef中的ask等方法,并将消息封装②NettyRpcEndpointRef中的ask方法调用了NettyRpcEnv.ask如果是远程rpc调用的话,最终ask将调用postToOutbox函数,并且此时消息会被序列化成Byte流。③在postToOutbox函数中调用OutboxMessage中的sendWith方法中调用TransportClient的sendRpc方法,在TransportClient中将消息进一步封装,然后发送给master④在master端TransportRequestHandler的handle方法中进行消息类型判断,调用processRpcRequest函数⑤processRpcRequest函数将调用rpcHandler的实现类NettyRpcHandler中的receive方法,然后该消息通过dispatcher的分发给对应的endpoint⑥在Dispatcher的postMessage方法中,可以看到,首先根据对应的endpoint的EndpointData信息放到inbox信箱中,最后将消息塞到recievers的阻塞队列中⑦在Dispatcher中有一个线程池threadpool在MessageLoop类的run方法中,将receivers中的对象取出来,交由信箱的process方法去处理。如果没有收到任何消息,将会阻塞在take处⑧在inbox的proces方法中,首先取出消息,然后根据消息的类型,最终将调用endpoint的receiver或receiveAndReply方法进行处理(也就是master中的receive方法)。

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473660.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 1629. 按键持续时间最长的键

文章目录1. 题目2. 解题1. 题目 LeetCode 设计了一款新式键盘,正在测试其可用性。测试人员将会点击一系列键(总计 n 个),每次一个。 给你一个长度为 n 的字符串 keysPressed ,其中 keysPressed[i] 表示测试序列中第 …

数据结构中的栈

整理衣服时,先放冬天的衣服,后放夏天的衣服,这样夏天的衣服就在上面,方便夏季取用。 栈(stack),有些地方称为堆栈,是一种容器,可存入数据元素、访问元素、删除元素&…

数据结构中的队列

生活中很多时候需要排队来维持秩序,如等公交、取票、办理银行业务等。 队列(queue)是只允许在一端进行插入操作,而在另一端进行删除操作的线性表。 队列是一种先进先出的(First In First Out)的线性表&am…

SparkContext解析

1、SparkContext概述 Spark的程序编写是基于SparkContext的,体现在2方面:①Spark编程的核心基础(RDD),第一个RDD是由SparkContext创建的;②Spark程序的调度优化也是基于SparkContext,RDD在一开…

LeetCode 1630. 等差子数组

文章目录1. 题目2. 解题1. 题目 如果一个数列由至少两个元素组成,且每两个连续元素之间的差值都相同,那么这个序列就是 等差数列 。更正式地,数列 s 是等差数列,只需要满足:对于每个有效的 i , s[i1] - s[…

LeetCode 1631. 最小体力消耗路径(DFS + 二分查找)

文章目录1. 题目2. 解题1. 题目 你准备参加一场远足活动。给你一个二维 rows x columns 的地图 heights ,其中 heights[row][col] 表示格子 (row, col) 的高度。 一开始你在最左上角的格子 (0, 0) ,且你希望去最右下角的格子 (rows-1, columns-1) &…

Spark资源调度分配

1、任务调度与资源调度 任务调度:是指通过DAGScheduler,TaskScheduler,SchedulerBackend等进行的作业调度。 资源调度:是指应用程序获取资源。 任务调度是在资源调度的基础上,没有资源调度,那么任务调度…

两个栈实现队列与两个队列实现栈

1. 两个栈实现队列 实现一 思路 s1是入栈的,s2是出栈的。 入队列,直接压到s1是就行了出队列,先把s1中的元素全部出栈压入到s2中,弹出s2中的栈顶元素;再把s2的所有元素全部压回s1中 实现二 思路 s1是入栈的&#xff0c…

ACwing 5. 多重背包问题 II(二进制拆分+DP)

文章目录1. 题目2. 解题1. 题目 有 N 种物品和一个容量是 V 的背包。 第 i 种物品最多有 si 件,每件体积是 vi,价值是 wi。 求解将哪些物品装入背包,可使物品体积总和不超过背包容量,且价值总和最大。 输出最大价值。 输入格式…

排序:冒泡排序与选择排序

冒泡排序 冒泡排序(英语:Bubble Sort)是一种简单的排序算法。它重复地遍历要排序的数列,一次比较两个元素,如果他们的顺序错误就把他们交换过来。遍历数列的工作是重复地进行直到没有再需要交换,也就是说该…

Spark Master的注册机制与状态管理

目录 1、Master接收注册的主要对象 2、Master接收Worker的注册 3、Master接收Driver的注册 4、Master处理Driver状态变化 5、Master接收Application的注册 6、Master处理Executor状态变化 1、Master接收注册的主要对象 Master主要接受注册的对象是:Applicatio…

排序:插入排序与希尔排序

插入排序 插入排序(英语:Insertion Sort)是一种简单直观的排序算法。它的工作原理是通过构建有序序列,对于未排序数据,在已排序序列中从后向前扫描,找到相应位置并插入。插入排序在实现上,在从…

jieba分词提取小说人名

文章目录1. 读入文本2. 分词3. 计数4. 排序5. 添加用户字典以《神雕侠侣》为例: 使用 jieba.posseg获取词性,人名的词性为 nr 1. 读入文本 import jieba.posseg as psg with open(shendiaoxialv.txt,encodingutf-8) as f:text f.readlines()print(te…

Spark Worker源码

目录 1、概述 2、LaunchDriver 3、LaunchDriver 4、总结 1、概述 worker肯定是实现RPC通信的,否则别人没法给你发消息。他继承的是ThreadSafeRpcEndpoint,ThreadSafeRpcEndpoint是线程安全的,意味着处理一条消息完成后再处理下一个消息。换…

排序:快速排序与归并排序

快速排序 快速排序(英语:Quicksort),又称划分交换排序(partition-exchange sort),通过一趟排序将要排序的数据分割成独立的两部分,其中一部分的所有数据都比另外一部分的所有数据都…

LeetCode 764. 最大加号标志(DP)

文章目录1. 题目2. 解题1. 题目 在一个大小在 (0, 0) 到 (N-1, N-1) 的2D网格 grid 中,除了在 mines 中给出的单元为 0,其他每个单元都是 1。网格中包含 1 的最大的轴对齐加号标志是多少阶?返回加号标志的阶数。如果未找到加号标志&#xff…

机器学习基础—Kaggle泰坦尼克预测(完整分析)

1.引言 我们先找个简单的实际例子,来看看,所谓的数据挖掘或者机器学习实际应用到底是怎么样一个过程。 2.背景 2.1 关于Kaggle Kaggle是一个数据分析建模的应用竞赛平台,有点类似KDD-CUP(国际知识发现和数据挖掘竞赛)&…

Spark Executor解析

目录 1、Spark Executor如何工作 2、Spark Executor工作源码 1、Spark Executor如何工作 当Driver发送过来Task的时候,其实是发送给CoarseGrainedExecutorBackend这个RPCEndpoint,而不是直接发送给Executor(Executor由于不是消息循环体永远…

LeetCode 381. O(1) 时间插入、删除和获取随机元素 - 允许重复(vector + 哈希)

文章目录1. 题目2. 解题1. 题目 设计一个支持在平均 时间复杂度 O(1) 下, 执行以下操作的数据结构。 注意: 允许出现重复元素。 insert(val):向集合中插入元素 val。remove(val):当 val 存在时,从集合中移除一个 val。getRando…

Stage划分和Task最佳位置

目录 1、Job Stage划分 2、Task最佳位置 3、总结 3.1 Stage划分总结: 3.2 Task最佳位置总结: 1、Job Stage划分 Spark Application中因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是由是…