一、何为SparkRPC
RPC全称为远程过程调用(Remote Procedure Call),它是一种计算机通信协议,允许一个计算机程序调用另一个计算机上的子程序,而无需了解底层网络细节。通过RPC,一个计算机程序可以像调用本地程序一样调用远程程序,使得分布式应用程序的开发更加简单和高效。
二、SparkRPC示意图
三、SparkRPC代码示例
1、基本流程
①启动Master和Worker
②Worker向Master发送注册信息(封装成一个类:RegisterWorker)
case class RegisterWorker(rpcEndpointRef:RpcEndpointRef,workerId:String,workerMemory:Int,workerCores:Int)
③Master收到Worker的注册信息,并将其存放到一个HashMap,其中Key为WorkerId,Value为WorkerInfo,其结构如下:
class WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long = _
}
其中lastHearBeatTime是该Worker最后一次心跳时间。
④Master中启动一个定时任务(设定为每15s执行一次),定时从HashMap中获取各个Worker信息,并将其中的lastHearBeatTime与当前时间进行比较,如果大于10s,就认为该Worker已经与Master失联,将其从HashMap中剔除
⑤)(与④其实同步进行) Worker同样开启了一个定时任务(设定为每10s执行一次), 定时给Master发送心跳HeartBeat; Master收到该心跳后,根据WorkerId从HashMap中取出对应的Worker信息,并将其lastHearBeatTime修改为当前时间,从而不断更新与Master保持通信的Worker的最后心跳时间。
case class HeartBeat(WorkerId:String)
2、完整代码
(1)Master
package org.apache.spark.wakedataimport org.apache.spark.SparkConf
import org.apache.spark.SecurityManager
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutableclass Master(val rpcEnv:RpcEnv) extends ThreadSafeRpcEndpoint{val idToWorker = new mutable.HashMap[String, WorkerInfo]()override def onStart(): Unit = {//启动一个定时器val service = Executors.newScheduledThreadPool(1)service.scheduleAtFixedRate(new Runnable {override def run(): Unit = {// 如果Worker最后一次心跳时间距离当前时间 大于10s,就需要移除该Workerval deadWorkers = idToWorker.values.filter(w => System.currentTimeMillis() - w.lastHearBeatTime > 10000)deadWorkers.foreach(w => {idToWorker -= w.workerId})println(s"当前活跃的Worker数量:${idToWorker.size}")}},0,15,TimeUnit.SECONDS)}override def receive: PartialFunction[Any, Unit] = {
// case "test" => println("接收到了测试消息");// println(s"Master收到了来自Worker的信息workerId:$workerId,workerMemory:$workerMemory,workerCores:$workerCores")//给Worker发送异步消息// rpcEndpointRef.send("response")// 接收到Worker发送过来的注册消息case RegisterWorker(rpcEndpointRef,workerId,workerMemory,workerCores) => {//封装Worker传递过来的信息val workerInfo = new WorkerInfo(workerId, workerMemory, workerCores)idToWorker(workerId) = workerInfo//向Worker返回一个注册成功的消息rpcEndpointRef.send(RegisteredWorker)}//接收到Worker发送过来的心跳信息case HeartBeat(workerId) =>{val workerInfo = idToWorker(workerId)//更新最后一次访问时间workerInfo.lastHearBeatTime = System.currentTimeMillis()}
}//接收同步消息override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {case "ask-msg" => {println("接收到来自Worker的同步消息")//Master响应Worker的请求:给Worker返回消息context.reply("reply-msg")}}
}object Master {def main(args: Array[String]): Unit = {val conf = new SparkConf()//创建 SecurityManager(安全管理器,对系统资源的访问进行检查和限制)val securityMgr = new SecurityManager(conf)//创建rpcEnv,并指定名称、IP地址和端口等val rpcEnv = RpcEnv.create("SparkMaster", "localhost", 8888, conf, securityMgr)//创建Master RpcEndpointval master = new Master(rpcEnv)//将Master的RpcEndpoint传入到setupEndpoint,并指定名称,返回一个RpcEndpoint的引用,val masterEndpoint = rpcEnv.setupEndpoint("master", master)//通过RpcEndpoint的引用发送消息
// masterEndpoint.send("test")//将程序挂起,等待退出rpcEnv.awaitTermination()}}
(2)Worker
package org.apache.spark.wakedataimport org.apache.spark.SparkConf
import org.apache.spark.SecurityManager
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.ExecutionContext.Implicits.globalclass Worker(val rpcEnv:RpcEnv) extends ThreadSafeRpcEndpoint{var masterEndpointRef:RpcEndpointRef = _val WORKER_ID = "worker02"override def onStart(): Unit = {//向Master发送请求连接(本质上是连接master的endPoint)masterEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 8888), "master")//向Master发送注册Worker的请求(注意这里发送的是同步消息,底层使用的是ask方法.为何是同步发送? 因为必须首先建立好连接,然后才能发送消息)//其中self是worker RpcEndpoint的引用masterEndpointRef.send(RegisterWorker(self,WORKER_ID,1000,8))}//接收异步消息override def receive: PartialFunction[Any, Unit] = {
// case "response" => {
// println("Worker接收到Master返回的消息")
// //向Master发送同步消息
// val future = masterEndpointRef.ask[String]("ask-msg")
// //接收Master返回的消息
// future.map(res => println(s"Worker接收到Master返回的响应请求消息:$res"))
// }//Worker接收到Master发送过来的异步消息case RegisteredWorker => {//启动一个定时器val service = Executors.newScheduledThreadPool(1)service.scheduleAtFixedRate(new Runnable {override def run(): Unit = {masterEndpointRef.send(HeartBeat(WORKER_ID))}},0,10,TimeUnit.SECONDS)}}
}object Worker {def main(args: Array[String]): Unit = {val conf = new SparkConf()val SecurityMgr = new SecurityManager(conf)//创建rpcEnvval rpcEnv = RpcEnv.create("SparkWorker", "localhost", 9998, conf,SecurityMgr)//创建rpcendpointval worker = new Worker(rpcEnv)//返回一个RpcEndpoint的引用val workerEndpoint = rpcEnv.setupEndpoint("Worker", worker)rpcEnv.awaitTermination()}}
(3)RegisterWorker-样例类和伴生对象
此类封装Worker注册信息
package org.apache.spark.wakedataimport org.apache.spark.rpc.RpcEndpointRefcase class RegisterWorker(rpcEndpointRef:RpcEndpointRef,workerId:String,workerMemory:Int,workerCores:Int)
package org.apache.spark.wakedatacase object RegisteredWorker
(4)WorkerInfo
package org.apache.spark.wakedataclass WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long = _
}
package org.apache.spark.wakedataclass WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long = _
}
package org.apache.spark.wakedataclass WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long = _
}
3、关于SparkRPC的一些细节
1、接收异步消息是在receive方法
2、接收同步消息是在receiveAndReply
//接收同步消息override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {case "ask-msg" => {println("接收到来自Worker的同步消息")//Master响应Worker的请求:给Worker返回消息context.reply("reply-msg")}}
}
3、发送同步消息使用ask
// //向Master发送同步消息
// val future = masterEndpointRef.ask[String]("ask-msg")