Spark源码(一)-SparkRPC示例

一、何为SparkRPC

RPC全称为远程过程调用(Remote Procedure Call),它是一种计算机通信协议,允许一个计算机程序调用另一个计算机上的子程序,而无需了解底层网络细节。通过RPC,一个计算机程序可以像调用本地程序一样调用远程程序,使得分布式应用程序的开发更加简单和高效。

二、SparkRPC示意图

三、SparkRPC代码示例

1、基本流程

①启动Master和Worker

②Worker向Master发送注册信息(封装成一个类:RegisterWorker)

case class RegisterWorker(rpcEndpointRef:RpcEndpointRef,workerId:String,workerMemory:Int,workerCores:Int)

③Master收到Worker的注册信息,并将其存放到一个HashMap,其中Key为WorkerId,Value为WorkerInfo,其结构如下:

class WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long = _
}

其中lastHearBeatTime是该Worker最后一次心跳时间。

④Master中启动一个定时任务(设定为每15s执行一次),定时从HashMap中获取各个Worker信息,并将其中的lastHearBeatTime与当前时间进行比较,如果大于10s,就认为该Worker已经与Master失联,将其从HashMap中剔除

⑤)(与④其实同步进行) Worker同样开启了一个定时任务(设定为每10s执行一次), 定时给Master发送心跳HeartBeat; Master收到该心跳后,根据WorkerId从HashMap中取出对应的Worker信息,并将其lastHearBeatTime修改为当前时间,从而不断更新与Master保持通信的Worker的最后心跳时间。

case class HeartBeat(WorkerId:String)

2、完整代码 

(1)Master

package org.apache.spark.wakedataimport org.apache.spark.SparkConf
import org.apache.spark.SecurityManager
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutableclass Master(val rpcEnv:RpcEnv) extends ThreadSafeRpcEndpoint{val idToWorker = new mutable.HashMap[String, WorkerInfo]()override def onStart(): Unit = {//启动一个定时器val service = Executors.newScheduledThreadPool(1)service.scheduleAtFixedRate(new Runnable {override def run(): Unit = {// 如果Worker最后一次心跳时间距离当前时间 大于10s,就需要移除该Workerval deadWorkers = idToWorker.values.filter(w => System.currentTimeMillis() - w.lastHearBeatTime > 10000)deadWorkers.foreach(w => {idToWorker -= w.workerId})println(s"当前活跃的Worker数量:${idToWorker.size}")}},0,15,TimeUnit.SECONDS)}override def receive: PartialFunction[Any, Unit] = {
//    case "test" => println("接收到了测试消息");//      println(s"Master收到了来自Worker的信息workerId:$workerId,workerMemory:$workerMemory,workerCores:$workerCores")//给Worker发送异步消息//      rpcEndpointRef.send("response")// 接收到Worker发送过来的注册消息case RegisterWorker(rpcEndpointRef,workerId,workerMemory,workerCores) => {//封装Worker传递过来的信息val workerInfo = new WorkerInfo(workerId, workerMemory, workerCores)idToWorker(workerId) = workerInfo//向Worker返回一个注册成功的消息rpcEndpointRef.send(RegisteredWorker)}//接收到Worker发送过来的心跳信息case HeartBeat(workerId) =>{val workerInfo = idToWorker(workerId)//更新最后一次访问时间workerInfo.lastHearBeatTime = System.currentTimeMillis()}
}//接收同步消息override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {case "ask-msg" => {println("接收到来自Worker的同步消息")//Master响应Worker的请求:给Worker返回消息context.reply("reply-msg")}}
}object Master {def main(args: Array[String]): Unit = {val conf = new SparkConf()//创建 SecurityManager(安全管理器,对系统资源的访问进行检查和限制)val securityMgr  = new SecurityManager(conf)//创建rpcEnv,并指定名称、IP地址和端口等val rpcEnv = RpcEnv.create("SparkMaster", "localhost", 8888, conf, securityMgr)//创建Master RpcEndpointval master = new Master(rpcEnv)//将Master的RpcEndpoint传入到setupEndpoint,并指定名称,返回一个RpcEndpoint的引用,val masterEndpoint = rpcEnv.setupEndpoint("master", master)//通过RpcEndpoint的引用发送消息
//    masterEndpoint.send("test")//将程序挂起,等待退出rpcEnv.awaitTermination()}}

(2)Worker

package org.apache.spark.wakedataimport org.apache.spark.SparkConf
import org.apache.spark.SecurityManager
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.ExecutionContext.Implicits.globalclass Worker(val rpcEnv:RpcEnv) extends ThreadSafeRpcEndpoint{var masterEndpointRef:RpcEndpointRef = _val WORKER_ID =  "worker02"override def onStart(): Unit = {//向Master发送请求连接(本质上是连接master的endPoint)masterEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 8888), "master")//向Master发送注册Worker的请求(注意这里发送的是同步消息,底层使用的是ask方法.为何是同步发送? 因为必须首先建立好连接,然后才能发送消息)//其中self是worker RpcEndpoint的引用masterEndpointRef.send(RegisterWorker(self,WORKER_ID,1000,8))}//接收异步消息override def receive: PartialFunction[Any, Unit] = {
//    case "response" => {
//      println("Worker接收到Master返回的消息")
//      //向Master发送同步消息
//      val future = masterEndpointRef.ask[String]("ask-msg")
//      //接收Master返回的消息
//      future.map(res => println(s"Worker接收到Master返回的响应请求消息:$res"))
//    }//Worker接收到Master发送过来的异步消息case RegisteredWorker => {//启动一个定时器val service = Executors.newScheduledThreadPool(1)service.scheduleAtFixedRate(new Runnable {override def run(): Unit = {masterEndpointRef.send(HeartBeat(WORKER_ID))}},0,10,TimeUnit.SECONDS)}}
}object Worker {def main(args: Array[String]): Unit = {val conf = new SparkConf()val SecurityMgr = new SecurityManager(conf)//创建rpcEnvval rpcEnv = RpcEnv.create("SparkWorker", "localhost", 9998, conf,SecurityMgr)//创建rpcendpointval worker = new Worker(rpcEnv)//返回一个RpcEndpoint的引用val workerEndpoint = rpcEnv.setupEndpoint("Worker", worker)rpcEnv.awaitTermination()}}

(3)RegisterWorker-样例类和伴生对象

   此类封装Worker注册信息

package org.apache.spark.wakedataimport org.apache.spark.rpc.RpcEndpointRefcase class RegisterWorker(rpcEndpointRef:RpcEndpointRef,workerId:String,workerMemory:Int,workerCores:Int)
package org.apache.spark.wakedatacase object RegisteredWorker

(4)WorkerInfo

package org.apache.spark.wakedataclass WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long = _
}
package org.apache.spark.wakedataclass WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long = _
}
package org.apache.spark.wakedataclass WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long = _
}

3、关于SparkRPC的一些细节 

1、接收异步消息是在receive方法

2、接收同步消息是在receiveAndReply

 //接收同步消息override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {case "ask-msg" => {println("接收到来自Worker的同步消息")//Master响应Worker的请求:给Worker返回消息context.reply("reply-msg")}}
}

3、发送同步消息使用ask

//      //向Master发送同步消息
//      val future = masterEndpointRef.ask[String]("ask-msg")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/759030.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

谷歌Gemma大模型部署记录

谷歌Gemma大模型部署记录 配置信息 1.系统:Ubuntu20 2.显卡:RTX3060 6G 一、安装Ollama 官网地址:https://ollama.com/download/linux 按照指令安装 curl -fsSL https://ollama.com/install.sh | sh二、运行模型 输入指令:…

【Java】:类和对象

1.面向对象的初步认知 1.1 什么是面向对象 Java是一门面向对象的语言,在面向对象的世界里,一切皆为对象。面向对象是解决问题的一种思想,主要依靠对象之间的交互完成一件事情。用面向对象的思想来涉及程序,更符合人们对事物的认知…

【LeetCode-114.二叉树展开为链表】

题目详情: 给你二叉树的根结点 root ,请你将它展开为一个单链表: 展开后的单链表应该同样使用 TreeNode ,其中 right 子指针指向链表中下一个结点,而左子指针始终为 null 。展开后的单链表应该与二叉树 先序遍历 顺序…

某政务项目驻场全栈Java开发招聘要求-MD主数据子域-招2人

原创作者:田超凡(程序员田宝宝) 版权所有,引用请注明原作者,严禁复制转载 写在前面:临时的核心保密项目、周期3-4个月。要求能接受封闭式开发,Base昆明,项目是内网物理服务器集群下…

seleniumUI自动化实例(CSDN发布文章)

1.CSDN登陆成功后,点击发布 源码: #点击首页中的发布按钮 CSDNconf.driver.find_element(By.LINK_TEXT,"发布").click() time.sleep(15) 2.输入标题 #输入文章标题,标题格式“selenium UI自动化测试实例今天的日期” CSDNconf.d…

POI和EasyExcel区别和操作Excel

POI和EasyExcel操作Excel 常用场景 1、将用户信息导出为excel表格(导出数据… ) 2、将Excel表中的信息录入到网站数据库(文件数据上传… ) 开发中经常会设计到excel的处理,如导出Excel,导入Excel到数据库…

springboot+itextpdf+thymeleaf+ognl根据静态模版文件实现动态生成pdf文件并导出demo

第一步&#xff1a;导入maven依赖 <!-- 导出为PDF依赖包 --><dependency><groupId>com.itextpdf</groupId><artifactId>itextpdf</artifactId></dependency><dependency><groupId>com.itextpdf</groupId><art…

HarmonyOS(鸿蒙)应用开发——(一)

目录 1 创建hellopro项目 2 了解ArkTS 3 了解ArkTS的组件 4 组件介绍 4.1 常用基础组件&#xff1a; 4.1.1 Text 4.1.2 Button 4.1.3 TextInput 4.2 容器组件 4.2.1 Column 4.2.2 Row 5 案例——实现一个简易登录页面 5.1 在实现预览效果之前&#xff0c;我们…

【机器学习】基于果蝇算法优化的BP神经网络分类预测(FOA-BP)

目录 1.原理与思路2.设计与实现3.结果预测4.代码获取 1.原理与思路 【智能算法应用】智能算法优化BP神经网络思路【智能算法】果蝇算法&#xff08;FOA&#xff09;原理及实现 2.设计与实现 数据集&#xff1a; 多输入多输出&#xff1a;样本特征24&#xff0c;标签类别4。…

【计算机视觉】三、图像处理——实验:图像去模糊和去噪、提取边缘特征

文章目录 0. 实验环境1. 理论基础1.1 滤波器&#xff08;卷积核&#xff09;1.2 PyTorch:卷积操作 2. 图像处理2.1 图像读取2.2 查看通道2.3 图像处理 3. 图像去模糊4. 图像去噪4.1 添加随机噪点4.2 图像去噪 0. 实验环境 本实验使用了PyTorch深度学习框架&#xff0c;相关操作…

bezier曲线拟合椭圆弧线

椭圆弧线用bezier曲线拟合 。 先计算出 椭圆中心 起始角度 旋转角度 S t e p 1 : C o m p u t e ( x 1 ′ , y 1 ′ ) Step 1: Compute(x_1, y_1) Step1:Compute(x1′​,y1′​) ( x 1 ′ y 1 ′ ) ( cos ⁡ φ sin ⁡ φ − sin ⁡ φ cos ⁡ φ ) ⋅ ( x 1 − x 2 2 y 1 −…

some/ip CAN CANFD

关于SOME/IP的理解 在CAN总线的车载网络中&#xff0c;通信过程是面向信号的 当ECU的信号的值发生了改变&#xff0c;或者发送周期到了&#xff0c;就会发送消息&#xff0c;而不考虑接收者是否需要&#xff0c;这样就会造成总线上出现不必要的信息&#xff0c;占用了带宽 …

RabbitMQ详细讲解

目录 4.0 AMQP协议的回顾 4.1 RabbitMQ支持的消息模型 4.2 引入依赖 4.3 第一种模型(直连) 1. 开发生产者 2. 开发消费者 3. 参数的说明 4.4 第二种模型(work quene) 1. 开发生产者 2.开发消费者-1 3.开发消费者-2 4.测试结果 5.消息自动确认机制 4.5 第三种模型(…

React——props children (插槽平替)

React当中不存在v-slot插槽这种概念&#xff0c;而当我们又需要实现这个种功能时&#xff0c;该怎么办呢&#xff1f; 我们可以通过props children属性去实现。 props children属性&#xff1a; children属性&#xff1a;表示该组件的子节点&#xff0c;自动放在props的chil…

开源表单设计器vue-form-design自动化校验实现原理

表单校验可以改善用户体验和减轻服务器的压力, 而动态配置表单校验能极大的提高动态表单的扩展性、灵活性, 满足多样性、差异化需求 目标 &#x1f44c;&#xff0c;首先我们简要说下要实现的目标功能&#xff1a; 具有基础的表单验证功能提供一些内置验证规则提供对外开放的…

ORACLE:VARCHAR2(4000)太小怎么办?

目录 数据备份&#xff1a; 1. 创建新列&#xff1a; 2. 迁移数据&#xff1a; 3. 验证数据完整性&#xff1a; 4.删除旧列&#xff1a; 5. 重命名新列&#xff08;如果需要保持原列名&#xff09;&#xff1a; 在Oracle数据库中&#xff0c;你不能直接通过ALTER TABLE语…

用OceanBase binlog service 轻松进行数据回滚

背景 在日常的数据库运维过程中&#xff0c;难免会遭遇数据误操作的情形&#xff0c;比如因疏忽而执行了非预期的delete或update操作&#xff0c;这时就需要进行数据回滚。如果在OceanBase中启用了回收站功能&#xff0c;并设置了合适的undo_retention&#xff0c;那么我们可以…

如何在ubuntu 18.04中升级python 3.6到3.7

在ubuntu下安装python 3.7有两种方法: 1,通过使用Deadsnakes PPA中的标准apt工具(本文暂时只介绍这种方法) 2,从源代码进行构建。 前提条件&#xff1a; 需要以root用户或具有sudo访问权限的用户身份登录才能在Ubuntu系统上安装软件包。 方法一&#xff1a;使用apt工具安装…

jmx_prometheus_javaagent-0.19.0.jar+Prometheus+Grafana 监控Tongweb嵌入式(by lqw)

文章目录 1.思路2.部署准备3.应用jar包修改配置和导入tw嵌入式的依赖&#xff08;参考&#xff09;4.Prometheus部署5.Prometheus配置6.安装和配置Grafana 1.思路 Tongweb嵌入式最终是把依赖打入到java应用&#xff08;也就是jar包里&#xff09;&#xff0c;然后启动jar包进行…

Spring Boot 配置中心与应用属性完美匹配 | 深入探究@ConfigurationProperties与@NacosPropertySource

ConfigurationProperties ConfigurationProperties 注解是 Spring Boot 中用于将外部配置文件&#xff08;如 YAML 或 properties 文件&#xff09;中的属性映射到 Java Bean 类属性的强大工具。 以下是关于 ConfigurationProperties 注解与 YAML 配置文件属性匹配规则的详细…