大数据学习——akka自定义RPC

 

 

实现

package cn.itcast.akkaimport akka.actor.{Actor, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactoryimport scala.collection.mutableimport scala.concurrent.duration._

class Master(val host: String, val port: Int) extends Actor {//保存WorkerID 到 WorkerInfo的映射val idToWorker = new mutable.HashMap[String, WorkerInfo]()//保存所的WorkerInfo信息val workers = new mutable.HashSet[WorkerInfo]()val CHECK_INTERVAL = 15000override def preStart(): Unit = {//导入隐式转换import context.dispatchercontext.system.scheduler.schedule(0 millis, CHECK_INTERVAL millis, self, CheckTimeOutWorker)}override def receive: Receive = {//Worker发送个Mater的注册消息case RegisterWorker(workerId, cores, memory) => {if (!idToWorker.contains(workerId)) {//封装worker发送的信息val workerInfo = new WorkerInfo(workerId, cores, memory)//保存workerInfoidToWorker(workerId) = workerInfoworkers += workerInfo//Master向Worker反馈注册成功的消息sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}@$host:$port/user/${Master.MASTER_NAME}")}}//Worker发送给Master的心跳信息case Heartbeat(workerId) => {if (idToWorker.contains(workerId)) {val workerInfo = idToWorker(workerId)val currentTime = System.currentTimeMillis()//更新上一次心跳时间workerInfo.lastHeartbeatTime = currentTime}}//检测超时的Workercase CheckTimeOutWorker => {val currentTime = System.currentTimeMillis()val deadWorkers: mutable.HashSet[WorkerInfo] = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)//      for(w <- deadWorkers) {//        idToWorker -= w.id//        workers -= w//      }deadWorkers.foreach(w => {idToWorker -= w.idworkers -= w})println("alive worker size : " + workers.size)}}
}object Master {val MASTER_SYSTEM = "MaterActorSystem"val MASTER_NAME = "Master"def main(args: Array[String]) {//    val host = args(0)//    val port = args(1).toIntval host = "127.0.0.1"val port = 8888val confStr =s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval conf = ConfigFactory.parseString(confStr)//ActorSystem是单例的,用于创建Acotor并监控actorval actorSystem = ActorSystem(MASTER_SYSTEM, conf)//通过ActorSystem创建ActoractorSystem.actorOf(Props(new Master(host, port)), MASTER_NAME)actorSystem.awaitTermination()}
}
package cn.itcast.akka

trait Message extends Serializable//Worker -> Master
case class RegisterWorker(id: String, cores: Int, memory: Int) extends Message//Master -> Worker
case class RegisteredWorker(masterUrl: String) extends Message//Worker -> Master
case class Heartbeat(id: String) extends Message//Worker internal message
case object SendHeartbeat//Master internal message
case object CheckTimeOutWorker
package cn.itcast.akkaimport java.util.UUID
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactoryimport scala.concurrent.duration._

class Worker(val cores: Int, val memory: Int, val masterHost: String, val masterPort: Int) extends Actor {//Master的引用var master: ActorSelection = _//Worker的IDval workerId = UUID.randomUUID().toString//masterUrlvar masterUrl: String = _val HEARTBEAT_INTERVAL = 10000//preStart在构造器之后receive之前执行override def preStart(): Unit = {//首先跟Master建立连接master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}@$masterHost:$masterPort/user/${Master.MASTER_NAME}")//通过master的引用向Master发送注册消息master ! RegisterWorker(workerId, cores, memory)}override def receive: Receive = {//Master发送给Worker注册成功的消息case RegisteredWorker(masterUrl) => {this.masterUrl = masterUrl//启动定时任务,向Master发送心跳//导入隐式转换import context.dispatchercontext.system.scheduler.schedule(0 millis, HEARTBEAT_INTERVAL millis, self, SendHeartbeat)}case SendHeartbeat => {//向Master发送心跳master ! Heartbeat(workerId)}}
}object Worker {def main(args: Array[String]) {//Worker的地址和端口//    val host = args(0)//    val port = args(1).toInt//    val cores = args(2).toInt//    val memory = args(3).toIntval host = "127.0.0.1"val port = 9999val cores = 8val memory = 1024//Master的地址和端口//    val masterHost = args(4)//    val masterPort = args(5).toIntval masterHost = "127.0.0.1"val masterPort = 8888val confStr =s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval conf = ConfigFactory.parseString(confStr)//单例的ActorSystemval actorSystem = ActorSystem("WorkerActorSystem", conf)//通过actorSystem来创建Actorval worker = actorSystem.actorOf(Props(new Worker(cores, memory, masterHost, masterPort)), "Worker")actorSystem.awaitTermination()}
}
package cn.itcast.akka
class WorkerInfo(val id: String, val cores: Int, val memory: Int) {//TODOvar lastHeartbeatTime: Long = _}

 

 

转载于:https://www.cnblogs.com/feifeicui/p/10996077.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/248765.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从Client应用场景介绍IdentityServer4(一)

从Client应用场景介绍IdentityServer4&#xff08;一&#xff09; 原文:从Client应用场景介绍IdentityServer4&#xff08;一&#xff09;一、背景 IdentityServer4的介绍将不再叙述&#xff0c;百度下可以找到&#xff0c;且官网的快速入门例子也有翻译的版本。这里主要从Clie…

开发常用代码笔记

Vue 使用moment插件对时间进行格式化&#xff08;全局设置&#xff09; 下载插件 npm install moment --save 在main.js中引入插件 import moment from ‘moment’ 在main.js中定义全局过滤器 Vue.filter(dataFilter,function (dataStr,patten YYYY-MM-DD HH:mm:ss) {retur…

微信小程序——账号及开发工具

1. 注册微信小程序账号 点击我进入微信公众平台 进入后点击立即注册 注册成功且登录后进入小程序管理后台 2. 安装开发者工具 点击进入开发文档 进入安装开发工具&#xff08;稳定版本&#xff09; 一路默认下一步进行安装 3. 开发者工具的使用 使用注册微信小程序的微信号…

CSS注意的地方

content-box和border-box的区别 2018年02月27日 22:20:16 sulingliang 阅读数&#xff1a;8011盒子模型 盒子宽度&#xff1a;paddingbordercontent-width 盒子高度&#xff1a;paddingbordercontent-height 如图所示 盒子模型content-box 说明&#xff1a;在内容宽度和高度之…

机器学习笔记(6) 线性回归

先从最简单的例子开始,假设我们有一组样本(如下图的一个个黑色的圆点),只有一个特征,如下图,横轴是特征值,纵轴是label。比如横轴是房屋面积,纵轴是房屋价格. 现在我们要做什么呢&#xff1f;我们试图找到一条直线yaxb,可以尽量好的拟合这些点. 你可能要问了,为啥是直线,不是曲…

仿微信朋友圈项目梳理

项目功能简介&#xff1a; 用户通过手机号验证码进行登录和注册 可以浏览动态列表中的所有动态 登录成功后用户可以发表自己的动态 也可以对自己认可欣赏的动态进行点赞和评论 也可以通过动态结识志同道合的朋友 进行聊天和探讨 前端&#xff1a;采用Vue框架搭建 weui进行页面…

echarts鼠标事件以及自定义数据获取

事件添加方法&#xff1a; 对应官网位置&#xff1a;https://www.echartsjs.com/api.html#events 鼠标事件包括 click、dblclick、mousedown、mousemove、mouseup、mouseover、mouseout、globalout、contextmenu。 myChart.on(click, function (params) {console.log(params); …

[数学]点、线、面分割问题

平面分割问题 p条直线相交于一点时&#xff0c;分割的图形有 2*(n-1) 个&#xff0c;此时再加一条直线&#xff0c;在 2*(n-1) 的基础上再加 n条&#xff0c;此时为2*n n条曲线&#xff0c;其中有m条相交于一点&#xff0c;每两个曲线都交于两点 平面上有n条直线&#xff0c;且…

移动开发

1.移动端基础 1.1 浏览器现状 PC端浏览器 360浏览器、谷歌浏览器、火狐浏览器、QQ浏览器、百度浏览器&#xff08;停止服务&#xff09;、搜狗浏览器、IE浏览器 移动端浏览器 UC、QQ浏览器、欧朋浏览器、百度手机浏览器、360、搜狗、猎豹、谷歌等其他手机自带的浏览器 国…

Django之路由系统

Django的路由系统 Django 1.11版本 URLConf官方文档 URL配置(URLconf)就像Django 所支撑网站的目录。它的本质是URL与要为该URL调用的视图函数之间的映射表。 你就是以这种方式告诉Django&#xff0c;对于这个URL调用这段代码&#xff0c;对于那个URL调用那段代码。 URLconf配置…

微信小程序——操作数据库

案例一&#xff1a;统计用户的访问次数 业务需求&#xff1a; 统计每个用户对程序的访问次数将访问次数存储到数据库中访问次数应该与用户进行关联 业务逻辑&#xff1a; 如果用户是第一次访问此程序&#xff0c;向数据库添加一条记录&#xff1a;{openid&#xff1a;45454…

shop--12.阿里云部署以及域名绑定

一、申请阿里云服务器&#xff08;1&#xff09;PC访问阿里云https://www.aliyun.com/&#xff0c;申请阿里云帐号&#xff08;可以用您的支付宝帐号登录&#xff0c;因为支付宝帐号已经进行了实名认证&#xff0c;使用起来更方便&#xff09;并登录&#xff08;2&#xff09;找…

微信小程序——获取用户的运动步数

程序获取用户信息步骤 点击参考微信文档中的授权首先程序先向用户申请访问哪些权限用户做出选择后返回给程序程序携带权限访问服务器如果用户允许则返回信息如果用户为允许则不返回 自定义函数getUserRun 为获取用户的微信运动数据 页面加载调用此函数函数中执行下面操作 1…

第一次个人作业

该作业所属课程&#xff1a;https://edu.cnblogs.com/campus/xnsy/SoftwareEngineeringClass2作业要求地址&#xff1a;https://edu.cnblogs.com/campus/xnsy/SoftwareEngineeringClass2/homework/3340团队名称&#xff1a;脑壳痛 作业的目标 1.通过测试其他组的软件项目学习其…

微信小程序——解决上传并部署云函数时报错ResourceNotFound.Function, 未找到函数版本,请创建后再试。 (7f2d9d2d-5eac-4575-9n57-acd66cfa587g

1. 上传部署我们的云函数 2. 报错 错误信息为&#xff1a;Error: ResourceNotFound.Function, 未找到函数版本&#xff0c;请创建后再试。 (7f2d9d2d-5eac-4575-9b57-acd66cfa587e) 3. 原因 原因是可能我们在调试的时候不小心将我们开发控制台中的云函数删除了 4. 解决办法…

【IT界的厨子】酱香鲈鱼

食材: 前世曾经回眸的鲈鱼一条(主要选刺少的鱼&#xff0c;适合孩子吃&#xff0c;大人吃随意&#xff0c;草鱼比较大) 五花肉少许(肥一些的) 豆腐 辅料: 葱姜 蒜(选) 大料 香菜 调味: 啤酒(两罐) 黄豆酱或豆瓣酱(选) 老抽 生抽 料酒 盐 步骤: 1、鱼肉划开&#xff0c;方便炖的…

for each....in、for in、for of

一、一般的遍历数组的方法: var array [1,2,3,4,5,6,7]; for (var i 0; i < array.length; i) { console.log(i,array[i]); } 二、用for in的方遍历数组 for(let index in array) { console.log(index,array[index]); }; 三、forEach array.forEach(v>{ cons…

Vue cli3.0创建Vue项目

创建Vue项目 在要创建项目的文件夹下面打开Powershell窗口 输入命令 vue create 项目名称 选择第二项 回车后 选择是否使用历史路由 no 回车 选择 Less 回车 选择第三个 回车 选择第一个 回车 选择第一个 回车 是否保存模板 选择no 完成啦 完成

Remote desktop manager共享账号

因为多个远程机器&#xff0c;是会用了域账号进行登录的。而域账号的密码&#xff0c;三个月之后&#xff0c;密码强制过期 添加一个新的entry&#xff0c;类型是Credential Entry&#xff0c;然后选择用户名/密码 在remote desktop编辑的页面&#xff0c;Credentials选择Crede…

Mui常用的方法

中对话框 语法&#xff1a;mui.confirm 用法 mui.confirm("确认要切换角色&#xff1f;", "提示", btnArray, function(e) {if(e.index 1) {} else {}});组件名作用alert警告框confirm确认框prompt输入对话框toast消息提示框&#xff08;自动消失&#x…