实现
package cn.itcast.akkaimport akka.actor.{Actor, ActorSystem, Props} import akka.actor.Actor.Receive import com.typesafe.config.ConfigFactoryimport scala.collection.mutableimport scala.concurrent.duration._ class Master(val host: String, val port: Int) extends Actor {//保存WorkerID 到 WorkerInfo的映射val idToWorker = new mutable.HashMap[String, WorkerInfo]()//保存所的WorkerInfo信息val workers = new mutable.HashSet[WorkerInfo]()val CHECK_INTERVAL = 15000override def preStart(): Unit = {//导入隐式转换import context.dispatchercontext.system.scheduler.schedule(0 millis, CHECK_INTERVAL millis, self, CheckTimeOutWorker)}override def receive: Receive = {//Worker发送个Mater的注册消息case RegisterWorker(workerId, cores, memory) => {if (!idToWorker.contains(workerId)) {//封装worker发送的信息val workerInfo = new WorkerInfo(workerId, cores, memory)//保存workerInfoidToWorker(workerId) = workerInfoworkers += workerInfo//Master向Worker反馈注册成功的消息sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}@$host:$port/user/${Master.MASTER_NAME}")}}//Worker发送给Master的心跳信息case Heartbeat(workerId) => {if (idToWorker.contains(workerId)) {val workerInfo = idToWorker(workerId)val currentTime = System.currentTimeMillis()//更新上一次心跳时间workerInfo.lastHeartbeatTime = currentTime}}//检测超时的Workercase CheckTimeOutWorker => {val currentTime = System.currentTimeMillis()val deadWorkers: mutable.HashSet[WorkerInfo] = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)// for(w <- deadWorkers) {// idToWorker -= w.id// workers -= w// }deadWorkers.foreach(w => {idToWorker -= w.idworkers -= w})println("alive worker size : " + workers.size)}} }object Master {val MASTER_SYSTEM = "MaterActorSystem"val MASTER_NAME = "Master"def main(args: Array[String]) {// val host = args(0)// val port = args(1).toIntval host = "127.0.0.1"val port = 8888val confStr =s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval conf = ConfigFactory.parseString(confStr)//ActorSystem是单例的,用于创建Acotor并监控actorval actorSystem = ActorSystem(MASTER_SYSTEM, conf)//通过ActorSystem创建ActoractorSystem.actorOf(Props(new Master(host, port)), MASTER_NAME)actorSystem.awaitTermination()} }
package cn.itcast.akka trait Message extends Serializable//Worker -> Master case class RegisterWorker(id: String, cores: Int, memory: Int) extends Message//Master -> Worker case class RegisteredWorker(masterUrl: String) extends Message//Worker -> Master case class Heartbeat(id: String) extends Message//Worker internal message case object SendHeartbeat//Master internal message case object CheckTimeOutWorker
package cn.itcast.akkaimport java.util.UUID import akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactoryimport scala.concurrent.duration._ class Worker(val cores: Int, val memory: Int, val masterHost: String, val masterPort: Int) extends Actor {//Master的引用var master: ActorSelection = _//Worker的IDval workerId = UUID.randomUUID().toString//masterUrlvar masterUrl: String = _val HEARTBEAT_INTERVAL = 10000//preStart在构造器之后receive之前执行override def preStart(): Unit = {//首先跟Master建立连接master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}@$masterHost:$masterPort/user/${Master.MASTER_NAME}")//通过master的引用向Master发送注册消息master ! RegisterWorker(workerId, cores, memory)}override def receive: Receive = {//Master发送给Worker注册成功的消息case RegisteredWorker(masterUrl) => {this.masterUrl = masterUrl//启动定时任务,向Master发送心跳//导入隐式转换import context.dispatchercontext.system.scheduler.schedule(0 millis, HEARTBEAT_INTERVAL millis, self, SendHeartbeat)}case SendHeartbeat => {//向Master发送心跳master ! Heartbeat(workerId)}} }object Worker {def main(args: Array[String]) {//Worker的地址和端口// val host = args(0)// val port = args(1).toInt// val cores = args(2).toInt// val memory = args(3).toIntval host = "127.0.0.1"val port = 9999val cores = 8val memory = 1024//Master的地址和端口// val masterHost = args(4)// val masterPort = args(5).toIntval masterHost = "127.0.0.1"val masterPort = 8888val confStr =s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval conf = ConfigFactory.parseString(confStr)//单例的ActorSystemval actorSystem = ActorSystem("WorkerActorSystem", conf)//通过actorSystem来创建Actorval worker = actorSystem.actorOf(Props(new Worker(cores, memory, masterHost, masterPort)), "Worker")actorSystem.awaitTermination()} }
package cn.itcast.akka class WorkerInfo(val id: String, val cores: Int, val memory: Int) {//TODOvar lastHeartbeatTime: Long = _}