Spark源码(一)-SparkRPC示例

一、何为SparkRPC

RPC全称为远程过程调用(Remote Procedure Call),它是一种计算机通信协议,允许一个计算机程序调用另一个计算机上的子程序,而无需了解底层网络细节。通过RPC,一个计算机程序可以像调用本地程序一样调用远程程序,使得分布式应用程序的开发更加简单和高效。

二、SparkRPC示意图

三、SparkRPC代码示例

1、基本流程

①启动Master和Worker

②Worker向Master发送注册信息(封装成一个类:RegisterWorker)

case class RegisterWorker(rpcEndpointRef:RpcEndpointRef,workerId:String,workerMemory:Int,workerCores:Int)

③Master收到Worker的注册信息,并将其存放到一个HashMap,其中Key为WorkerId,Value为WorkerInfo,其结构如下:

class WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long = _
}

其中lastHearBeatTime是该Worker最后一次心跳时间。

④Master中启动一个定时任务(设定为每15s执行一次),定时从HashMap中获取各个Worker信息,并将其中的lastHearBeatTime与当前时间进行比较,如果大于10s,就认为该Worker已经与Master失联,将其从HashMap中剔除

⑤)(与④其实同步进行) Worker同样开启了一个定时任务(设定为每10s执行一次), 定时给Master发送心跳HeartBeat; Master收到该心跳后,根据WorkerId从HashMap中取出对应的Worker信息,并将其lastHearBeatTime修改为当前时间,从而不断更新与Master保持通信的Worker的最后心跳时间。

case class HeartBeat(WorkerId:String)

2、完整代码 

(1)Master

package org.apache.spark.wakedataimport org.apache.spark.SparkConf
import org.apache.spark.SecurityManager
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutableclass Master(val rpcEnv:RpcEnv) extends ThreadSafeRpcEndpoint{val idToWorker = new mutable.HashMap[String, WorkerInfo]()override def onStart(): Unit = {//启动一个定时器val service = Executors.newScheduledThreadPool(1)service.scheduleAtFixedRate(new Runnable {override def run(): Unit = {// 如果Worker最后一次心跳时间距离当前时间 大于10s,就需要移除该Workerval deadWorkers = idToWorker.values.filter(w => System.currentTimeMillis() - w.lastHearBeatTime > 10000)deadWorkers.foreach(w => {idToWorker -= w.workerId})println(s"当前活跃的Worker数量:${idToWorker.size}")}},0,15,TimeUnit.SECONDS)}override def receive: PartialFunction[Any, Unit] = {
//    case "test" => println("接收到了测试消息");//      println(s"Master收到了来自Worker的信息workerId:$workerId,workerMemory:$workerMemory,workerCores:$workerCores")//给Worker发送异步消息//      rpcEndpointRef.send("response")// 接收到Worker发送过来的注册消息case RegisterWorker(rpcEndpointRef,workerId,workerMemory,workerCores) => {//封装Worker传递过来的信息val workerInfo = new WorkerInfo(workerId, workerMemory, workerCores)idToWorker(workerId) = workerInfo//向Worker返回一个注册成功的消息rpcEndpointRef.send(RegisteredWorker)}//接收到Worker发送过来的心跳信息case HeartBeat(workerId) =>{val workerInfo = idToWorker(workerId)//更新最后一次访问时间workerInfo.lastHearBeatTime = System.currentTimeMillis()}
}//接收同步消息override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {case "ask-msg" => {println("接收到来自Worker的同步消息")//Master响应Worker的请求:给Worker返回消息context.reply("reply-msg")}}
}object Master {def main(args: Array[String]): Unit = {val conf = new SparkConf()//创建 SecurityManager(安全管理器,对系统资源的访问进行检查和限制)val securityMgr  = new SecurityManager(conf)//创建rpcEnv,并指定名称、IP地址和端口等val rpcEnv = RpcEnv.create("SparkMaster", "localhost", 8888, conf, securityMgr)//创建Master RpcEndpointval master = new Master(rpcEnv)//将Master的RpcEndpoint传入到setupEndpoint,并指定名称,返回一个RpcEndpoint的引用,val masterEndpoint = rpcEnv.setupEndpoint("master", master)//通过RpcEndpoint的引用发送消息
//    masterEndpoint.send("test")//将程序挂起,等待退出rpcEnv.awaitTermination()}}

(2)Worker

package org.apache.spark.wakedataimport org.apache.spark.SparkConf
import org.apache.spark.SecurityManager
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.ExecutionContext.Implicits.globalclass Worker(val rpcEnv:RpcEnv) extends ThreadSafeRpcEndpoint{var masterEndpointRef:RpcEndpointRef = _val WORKER_ID =  "worker02"override def onStart(): Unit = {//向Master发送请求连接(本质上是连接master的endPoint)masterEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 8888), "master")//向Master发送注册Worker的请求(注意这里发送的是同步消息,底层使用的是ask方法.为何是同步发送? 因为必须首先建立好连接,然后才能发送消息)//其中self是worker RpcEndpoint的引用masterEndpointRef.send(RegisterWorker(self,WORKER_ID,1000,8))}//接收异步消息override def receive: PartialFunction[Any, Unit] = {
//    case "response" => {
//      println("Worker接收到Master返回的消息")
//      //向Master发送同步消息
//      val future = masterEndpointRef.ask[String]("ask-msg")
//      //接收Master返回的消息
//      future.map(res => println(s"Worker接收到Master返回的响应请求消息:$res"))
//    }//Worker接收到Master发送过来的异步消息case RegisteredWorker => {//启动一个定时器val service = Executors.newScheduledThreadPool(1)service.scheduleAtFixedRate(new Runnable {override def run(): Unit = {masterEndpointRef.send(HeartBeat(WORKER_ID))}},0,10,TimeUnit.SECONDS)}}
}object Worker {def main(args: Array[String]): Unit = {val conf = new SparkConf()val SecurityMgr = new SecurityManager(conf)//创建rpcEnvval rpcEnv = RpcEnv.create("SparkWorker", "localhost", 9998, conf,SecurityMgr)//创建rpcendpointval worker = new Worker(rpcEnv)//返回一个RpcEndpoint的引用val workerEndpoint = rpcEnv.setupEndpoint("Worker", worker)rpcEnv.awaitTermination()}}

(3)RegisterWorker-样例类和伴生对象

   此类封装Worker注册信息

package org.apache.spark.wakedataimport org.apache.spark.rpc.RpcEndpointRefcase class RegisterWorker(rpcEndpointRef:RpcEndpointRef,workerId:String,workerMemory:Int,workerCores:Int)
package org.apache.spark.wakedatacase object RegisteredWorker

(4)WorkerInfo

package org.apache.spark.wakedataclass WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long = _
}
package org.apache.spark.wakedataclass WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long = _
}
package org.apache.spark.wakedataclass WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long = _
}

3、关于SparkRPC的一些细节 

1、接收异步消息是在receive方法

2、接收同步消息是在receiveAndReply

 //接收同步消息override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {case "ask-msg" => {println("接收到来自Worker的同步消息")//Master响应Worker的请求:给Worker返回消息context.reply("reply-msg")}}
}

3、发送同步消息使用ask

//      //向Master发送同步消息
//      val future = masterEndpointRef.ask[String]("ask-msg")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/759030.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

谷歌Gemma大模型部署记录

谷歌Gemma大模型部署记录 配置信息 1.系统:Ubuntu20 2.显卡:RTX3060 6G 一、安装Ollama 官网地址:https://ollama.com/download/linux 按照指令安装 curl -fsSL https://ollama.com/install.sh | sh二、运行模型 输入指令:…

【Java】:类和对象

1.面向对象的初步认知 1.1 什么是面向对象 Java是一门面向对象的语言,在面向对象的世界里,一切皆为对象。面向对象是解决问题的一种思想,主要依靠对象之间的交互完成一件事情。用面向对象的思想来涉及程序,更符合人们对事物的认知…

【LeetCode-114.二叉树展开为链表】

题目详情: 给你二叉树的根结点 root ,请你将它展开为一个单链表: 展开后的单链表应该同样使用 TreeNode ,其中 right 子指针指向链表中下一个结点,而左子指针始终为 null 。展开后的单链表应该与二叉树 先序遍历 顺序…

seleniumUI自动化实例(CSDN发布文章)

1.CSDN登陆成功后,点击发布 源码: #点击首页中的发布按钮 CSDNconf.driver.find_element(By.LINK_TEXT,"发布").click() time.sleep(15) 2.输入标题 #输入文章标题,标题格式“selenium UI自动化测试实例今天的日期” CSDNconf.d…

POI和EasyExcel区别和操作Excel

POI和EasyExcel操作Excel 常用场景 1、将用户信息导出为excel表格(导出数据… ) 2、将Excel表中的信息录入到网站数据库(文件数据上传… ) 开发中经常会设计到excel的处理,如导出Excel,导入Excel到数据库…

springboot+itextpdf+thymeleaf+ognl根据静态模版文件实现动态生成pdf文件并导出demo

第一步&#xff1a;导入maven依赖 <!-- 导出为PDF依赖包 --><dependency><groupId>com.itextpdf</groupId><artifactId>itextpdf</artifactId></dependency><dependency><groupId>com.itextpdf</groupId><art…

HarmonyOS(鸿蒙)应用开发——(一)

目录 1 创建hellopro项目 2 了解ArkTS 3 了解ArkTS的组件 4 组件介绍 4.1 常用基础组件&#xff1a; 4.1.1 Text 4.1.2 Button 4.1.3 TextInput 4.2 容器组件 4.2.1 Column 4.2.2 Row 5 案例——实现一个简易登录页面 5.1 在实现预览效果之前&#xff0c;我们…

【机器学习】基于果蝇算法优化的BP神经网络分类预测(FOA-BP)

目录 1.原理与思路2.设计与实现3.结果预测4.代码获取 1.原理与思路 【智能算法应用】智能算法优化BP神经网络思路【智能算法】果蝇算法&#xff08;FOA&#xff09;原理及实现 2.设计与实现 数据集&#xff1a; 多输入多输出&#xff1a;样本特征24&#xff0c;标签类别4。…

【计算机视觉】三、图像处理——实验:图像去模糊和去噪、提取边缘特征

文章目录 0. 实验环境1. 理论基础1.1 滤波器&#xff08;卷积核&#xff09;1.2 PyTorch:卷积操作 2. 图像处理2.1 图像读取2.2 查看通道2.3 图像处理 3. 图像去模糊4. 图像去噪4.1 添加随机噪点4.2 图像去噪 0. 实验环境 本实验使用了PyTorch深度学习框架&#xff0c;相关操作…

bezier曲线拟合椭圆弧线

椭圆弧线用bezier曲线拟合 。 先计算出 椭圆中心 起始角度 旋转角度 S t e p 1 : C o m p u t e ( x 1 ′ , y 1 ′ ) Step 1: Compute(x_1, y_1) Step1:Compute(x1′​,y1′​) ( x 1 ′ y 1 ′ ) ( cos ⁡ φ sin ⁡ φ − sin ⁡ φ cos ⁡ φ ) ⋅ ( x 1 − x 2 2 y 1 −…

some/ip CAN CANFD

关于SOME/IP的理解 在CAN总线的车载网络中&#xff0c;通信过程是面向信号的 当ECU的信号的值发生了改变&#xff0c;或者发送周期到了&#xff0c;就会发送消息&#xff0c;而不考虑接收者是否需要&#xff0c;这样就会造成总线上出现不必要的信息&#xff0c;占用了带宽 …

RabbitMQ详细讲解

目录 4.0 AMQP协议的回顾 4.1 RabbitMQ支持的消息模型 4.2 引入依赖 4.3 第一种模型(直连) 1. 开发生产者 2. 开发消费者 3. 参数的说明 4.4 第二种模型(work quene) 1. 开发生产者 2.开发消费者-1 3.开发消费者-2 4.测试结果 5.消息自动确认机制 4.5 第三种模型(…

开源表单设计器vue-form-design自动化校验实现原理

表单校验可以改善用户体验和减轻服务器的压力, 而动态配置表单校验能极大的提高动态表单的扩展性、灵活性, 满足多样性、差异化需求 目标 &#x1f44c;&#xff0c;首先我们简要说下要实现的目标功能&#xff1a; 具有基础的表单验证功能提供一些内置验证规则提供对外开放的…

用OceanBase binlog service 轻松进行数据回滚

背景 在日常的数据库运维过程中&#xff0c;难免会遭遇数据误操作的情形&#xff0c;比如因疏忽而执行了非预期的delete或update操作&#xff0c;这时就需要进行数据回滚。如果在OceanBase中启用了回收站功能&#xff0c;并设置了合适的undo_retention&#xff0c;那么我们可以…

jmx_prometheus_javaagent-0.19.0.jar+Prometheus+Grafana 监控Tongweb嵌入式(by lqw)

文章目录 1.思路2.部署准备3.应用jar包修改配置和导入tw嵌入式的依赖&#xff08;参考&#xff09;4.Prometheus部署5.Prometheus配置6.安装和配置Grafana 1.思路 Tongweb嵌入式最终是把依赖打入到java应用&#xff08;也就是jar包里&#xff09;&#xff0c;然后启动jar包进行…

单片机LED灯闪烁

延时函数计算&#xff08;相关代码生成&#xff09;&#xff1a; #include "reg52.h" #include <INTRINS.H> void Delay500ms() //11.0592MHz {unsigned char i, j, k;_nop_();_nop_();i 22;j 3;k 227;do{do{while (--k);} while (--j);} while (--i); }vo…

让扣你代码的人电脑关机-js反爬

文案 让扣你代码的人电脑关机&#xff0c;赶紧学起来。众所周知。浏览器中无法导入模块&#xff0c;会报错。nodejs中可以导入模块。那么我们可以在导入语句后加入整蛊代码。在捕获异常后执行正常的代码。那么代码在浏览器中就会正常执行&#xff0c;而当你在本地环境中执行的…

Docker常用命令!!!

一、docker基础命令 1、启动docker systemctl start docker 2、关闭docker systemctl stop docker 3、重启docker systemctl restart docker 4、docker设置随服务启动而自启动 systemctl enable docker 5、查看docker 运行状态 systemctl status docker 6、查看docker 版本号信…

Microsoft Edge浏览器Internal Server Error问题解决

网页无法在Microsoft Edge浏览器&#xff0c;尝试Google浏览器可以&#xff0c;排除服务器问题&#xff0c;应该是浏览器本身的问题。 一般这种都是和cookie有关&#xff0c;尝试删除记录 解决&#xff01;

【MQTT】Vue中使用mqtt

MQTT&#xff08;Message Queuing Telemetry Transport&#xff09;作为一种轻量级、开放、灵活、简单、易于实现的通信协议。它基于发布/订阅&#xff08;Publish/Subscribe&#xff09;模式的消息传输协议&#xff0c;在上位机和硬件设备间通信时经常用到。虽然在嵌入式软件一…