Akka(9): 分布式运算:Remoting-远程构建式

   上篇我们讨论了Akka-Remoting。我们说Akka-Remoting是一种点对点的通讯方式,能使两个不同JVM上Akka-ActorSystem上的两个Actor之间可以相互沟通。Akka-Remoting还没有实现完全的Actor位置透明(location transparency),因为一个Actor还必须在获得对方Actor确切地址信息后才能启动与之沟通过程。Akka-Remoting支持“远程查找”和“远程构建”两种沟通方式。由于篇幅所限,我们只介绍了“远程查找”。在这一篇里我们将会讨论“远程构建”方式。

同样,我们先通过项目结构来分析:

lazy val local = (project in file(".")).settings(commonSettings).settings(name := "remoteCreateDemo").aggregate(calculator,remote).dependsOn(calculator)lazy val calculator = (project in file("calculator")).settings(commonSettings).settings(name := "calculator")lazy val remote = (project in file("remote")).settings(commonSettings).settings(name := "remoteSystem").aggregate(calculator).dependsOn(calculator)

远程构建的过程大致是这样的:由local通知remote启动构建Actor;remote从本地库中查找Actor的类定义(class)并把它载入内存。由于驱动、使用远程Actor是在local进行的,所以local,remote项目还必须共享Calculator,包括Calculator的功能消息。这项要求我们在.sbt中用aggregate(calculator)来协同编译。

我们把Calculator的监管supervisor也包括在这个源码文件里。现在这个calculator是个包括监管、功能、消息的完整项目了。Calculator源代码如下:

package remoteCreation.calculatorimport akka.actor._
import scala.concurrent.duration._object Calcultor {sealed trait MathOpscase class Num(dnum: Double) extends MathOpscase class Add(dnum: Double) extends MathOpscase class Sub(dnum: Double) extends MathOpscase class Mul(dnum: Double) extends MathOpscase class Div(dnum: Double) extends MathOpssealed trait CalcOpscase object Clear extends CalcOpscase object GetResult extends CalcOpsdef props = Props(new Calcultor)def supervisorProps = Props(new SupervisorActor)
}class Calcultor extends Actor with ActorLogging {import Calcultor._var result: Double = 0.0   //internal stateoverride def receive: Receive = {case Num(d) => result = dcase Add(d) => result += dcase Sub(d) => result -= dcase Mul(d) => result *= dcase Div(d) =>val _ = result.toInt / d.toInt   //yield ArithmeticExceptionresult /= dcase Clear => result = 0.0case GetResult =>sender() ! s"Result of calculation is: $result"}override def preRestart(reason: Throwable, message: Option[Any]): Unit = {log.info(s"Restarting calculator: ${reason.getMessage}")super.preRestart(reason, message)}
}class SupervisorActor extends Actor {def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {case _: ArithmeticException => SupervisorStrategy.Resume}override def supervisorStrategy: SupervisorStrategy =OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){decider.orElse(SupervisorStrategy.defaultDecider)}val calcActor = context.actorOf(Calcultor.props,"calculator")override def receive: Receive = {case msg@ _ => calcActor.forward(msg)}}

与上一个例子的”远程查找式“相同,remote需要为Remoting公开一个端口。我们可以照搬.conf配置文件内容:remote/src/main/resources/application.conf

akka {actor {provider = remote}remote {enabled-transports = ["akka.remote.netty.tcp"]netty.tcp {hostname = "127.0.0.1"port = 2552}log-sent-messages = onlog-received-messages = on}
}

由于远程构建和使用是在local上进行的,在remote上我们只需要启动ActorSystem就行了:

import com.typesafe.config.ConfigFactory
import akka.actor._object CalculatorRunner extends App {val remoteSystem = ActorSystem("remoteSystem",ConfigFactory.load("application"))println("Remote system started.")scala.io.StdIn.readLine()remoteSystem.terminate()}

Calculator的构建是在localSystem上启动的,我们需要在配置文件中描述远程构建标的(还是未能实现位置透明):local/src/main/resources/application.conf 

akka {actor {provider = remote,deployment {"/calculator" {remote = "akka.tcp://remoteSystem@127.0.0.1:2552"}}}remote {netty.tcp {hostname = "127.0.0.1",port=2554}}
}

注意:上面这个/calculator设置实际上指的是SupervisorActor。

现在我们可以在local上开始构建calculator,然后使用它来运算了:

import akka.actor._
import remoteCreation.calculator.Calcultor._
import scala.concurrent.duration._
import akka.pattern._object RemotingCreate extends App {val localSystem = ActorSystem("localSystem")val calcActor = localSystem.actorOf(props,name = "calculator")   //created SupervisorActorimport localSystem.dispatchercalcActor ! ClearcalcActor ! Num(13.0)calcActor ! Mul(1.5)implicit val timeout = akka.util.Timeout(1 second)((calcActor ? GetResult).mapTo[String]) foreach printlnscala.io.StdIn.readLine()calcActor ! Div(0.0)calcActor ! Div(1.5)calcActor ! Add(100.0)((calcActor ? GetResult).mapTo[String]) foreach printlnscala.io.StdIn.readLine()localSystem.terminate()}

从代码上看构建calculator(SupervisorActor)过程与普通的Actor构建没分别,所有细节都放在配置文件里了。但是,要注意actorOf的name必须与配置文档中的设置匹配。

试运行结果与上一个例子相同。值得注意的是实际远程构建的是一个SupervisorActor。Calculator的构建是SupervisorActor构建的其中一部分。从运算结果看:这个SupervisorActor也实现了它的功能。

下面是这次示范的源代码:

 local/build.sbt

azy val commonSettings = seq (name := "RemoteCreateDemo",version := "1.0",scalaVersion := "2.11.8",libraryDependencies := Seq("com.typesafe.akka" %% "akka-actor" % "2.5.2","com.typesafe.akka" %% "akka-remote" % "2.5.2")
)lazy val local = (project in file(".")).settings(commonSettings).settings(name := "remoteCreateDemo").aggregate(calculator).dependsOn(calculator)lazy val calculator = (project in file("calculator")).settings(commonSettings).settings(name := "calculator")lazy val remote = (project in file("remote")).settings(commonSettings).settings(name := "remoteSystem").aggregate(calculator).dependsOn(calculator)

calculator/calculator.scala

package remoteCreation.calculatorimport akka.actor._
import scala.concurrent.duration._object Calcultor {sealed trait MathOpscase class Num(dnum: Double) extends MathOpscase class Add(dnum: Double) extends MathOpscase class Sub(dnum: Double) extends MathOpscase class Mul(dnum: Double) extends MathOpscase class Div(dnum: Double) extends MathOpssealed trait CalcOpscase object Clear extends CalcOpscase object GetResult extends CalcOpsdef props = Props(new Calcultor)def supervisorProps = Props(new SupervisorActor)
}class Calcultor extends Actor with ActorLogging {import Calcultor._var result: Double = 0.0   //internal stateoverride def receive: Receive = {case Num(d) => result = dcase Add(d) => result += dcase Sub(d) => result -= dcase Mul(d) => result *= dcase Div(d) =>val _ = result.toInt / d.toInt   //yield ArithmeticExceptionresult /= dcase Clear => result = 0.0case GetResult =>sender() ! s"Result of calculation is: $result"}override def preRestart(reason: Throwable, message: Option[Any]): Unit = {log.info(s"Restarting calculator: ${reason.getMessage}")super.preRestart(reason, message)}
}class SupervisorActor extends Actor {def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {case _: ArithmeticException => SupervisorStrategy.Resume}override def supervisorStrategy: SupervisorStrategy =OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){decider.orElse(SupervisorStrategy.defaultDecider)}val calcActor = context.actorOf(Calcultor.props,"calculator")override def receive: Receive = {case msg@ _ => calcActor.forward(msg)}}

remote/src/main/resources/application.conf

akka {actor {provider = remote}remote {enabled-transports = ["akka.remote.netty.tcp"]netty.tcp {hostname = "127.0.0.1"port = 2552}log-sent-messages = onlog-received-messages = on}
}

remote/CalculatorRunner.scala

package remoteCreation.remote
import com.typesafe.config.ConfigFactory
import akka.actor._object CalculatorRunner extends App {val remoteSystem = ActorSystem("remoteSystem",ConfigFactory.load("application"))println("Remote system started.")scala.io.StdIn.readLine()remoteSystem.terminate()}

local/src/main/resources/application.conf

akka {actor {provider = remote,deployment {"/calculator" {remote = "akka.tcp://remoteSystem@127.0.0.1:2552"}}}remote {netty.tcp {hostname = "127.0.0.1",port=2554}}
}

local/RemotingCreation.scala

import akka.actor._
import remoteCreation.calculator.Calcultor._
import scala.concurrent.duration._
import akka.pattern._object RemotingCreate extends App {val localSystem = ActorSystem("localSystem")val calcActor = localSystem.actorOf(props,name = "calculator")  //created SupervisorActor
import localSystem.dispatchercalcActor ! ClearcalcActor ! Num(13.0)calcActor ! Mul(1.5)implicit val timeout = akka.util.Timeout(1 second)((calcActor ? GetResult).mapTo[String]) foreach printlnscala.io.StdIn.readLine()calcActor ! Div(0.0)calcActor ! Div(1.5)calcActor ! Add(100.0)((calcActor ? GetResult).mapTo[String]) foreach printlnscala.io.StdIn.readLine()localSystem.terminate()}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

转载于:https://www.cnblogs.com/tiger-xc/p/7063301.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/269118.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

福禄克FI-3000光纤监测显微仪使用MPO检查摄像头?

福禄克网络FI-3000 光纤检测显微仪是行业内较好且可以完整的光纤检查仪器,它有单光纤和MPO的测摄像机;自动或手动检查,还可与福禄克网络Versiv布线认证系统、Linkware或智能手机配合使用。下面福禄克网络指定经销商—明辰智航的工程师给大家讲…

Cisco Catalyst 4500

Q. 思科正在推出哪些Cisco Catalyst 4500 E系列线路卡?A.思科正在推出三款全新的E系列线路卡,性能为每线路卡插槽2.4万兆。它们包括: 48端口10/100/1000 PoE (2:1过多订购) 48端口10/100/1000增强型PoE线路卡&#xff…

rails 5 action cable 服务器部署

config/environments/development.rb config.action_cable.url ws://10.129.56.223:28080 // 浏览器访问的websoket 服务地址 ActionCable.server.config.disable_request_forgery_protection true // 允许远程请求访问转载于:https://www.cnblo…

通过福禄克FI-7000光纤显微摄像机进行光纤端面清洁

无论任何光纤类型、应用或数据速率,光的传输都需要干净的链接路径,包括通过路线上的任何无源连接或接头,虽然我们谈论了很多关于光纤清洁度的话题,但不管我们在这个话题上说了多少,当我们面对它时,光纤端面…

XSL学习笔记6 XSLT内置模板规则

XSL学习笔记6 XSLT内置模板规则定义正确的模板规则来匹配XML树中的节点是XSLT应用的关键。为了让源文档树的节点在没有明确匹配规则的情况下,能够被递归处理,XSLT定义了几个内置的模板规则,可以将其看作是XSL式样表文档的默认模板规则。内置模…

单元测试mock当前时间

在实际项目中很多地方用到DateTime.Now&#xff0c;这个时间是时时变化的。如果要进行单元测试对比预期结果时&#xff0c;这个时间无法预测&#xff0c;可以添加如下两个时间类 namespace Common.Helper {/// <summary>/// 获取当前时间/// However, when unit testing …

利用福禄克DSX2-5000 CH解决双绞线布线中常见的故障

双绞线布线中最常见的故障原因包括安装错误、在劣质组件&#xff08;电缆、连接器、跳线等&#xff09;中发现的缺陷、已安装电缆的损坏以及测试仪设置不正确&#xff0c;但有时技术人员想知道链路故障的具体原因 那么基于性能参数测试失败的原因都有哪些呢&#xff1f; 当涉…

Windows 2003性能监视器中的计数器名称变成数字的解决方法

前些天在给Exchange安装IMF后&#xff0c;看到IMF指南里面说&#xff0c;利用Performance也就是性能监视器可以查看IMF的性能&#xff0c;于是打开Performance, 但是却看到了下面这一幕&#xff0c;所有的计数器都变成数字。 还好&#xff0c;在微软SUPPORT网站很容易找到了下面…

Oracle Schema Objects——Tables——TableType

Oracle Schema Objects Object Tables object type An Oracle object type is a user-defined type with a name, attributes, and methods. Oracle 对象类型是具有名称、 属性、和方法的用户定义类型。 Object types make it possible to model real-world entities such as …

利用局域网性能测试仪保障企业网络环境

现代社会网络的普及&#xff0c;已经致使人民已经离不开网络&#xff0c;无论是从工作、生活、基础设施等等网络应用到各行各业。而普及了网络&#xff0c;网络的质量就是最重要的一环。试想&#xff0c;网络的快与慢小到影响一个人的心情好坏&#xff0c;大的可以影响到企业业…

MySQL存储过程编程

http://www.drekey.cn/blog/archives/category/stored

Kotlin基础-扩展

/** 扩展&#xff1a;对既有的类增加新功能而无需继承该类&#xff0c;即无法获取其源代码* 主要作用&#xff1a;“立即”为相关类整体上添加“工具类”方法或属性* Kotlin支持:扩展函数&#xff0c;扩展属性**与接受者类中参数&#xff0c;名字都一样的扩展是无效的* 尽量避免…

电缆的验证、鉴定和认证应该选择什么测试工具

在电缆的测试安装时&#xff0c;有以下几种测试工具选择&#xff1a;验证、鉴定及认证。当您要进行电缆认证时&#xff0c;不同测试工具的部分功能会有重叠&#xff0c;但可以学习以下几个问题&#xff0c;以帮助您选择正确的测试工具。 &#xff08;1&#xff09;电缆验证测试…

项目管理改进实践

昨天我下载了一个专业的敏捷项目管理的软件——Mingle&#xff0c;研究了一天&#xff0c;基本会用了。 应为“专业”&#xff0c;所以不是免费的&#xff0d; &#xff0d;&#xff01;不过还好。。。。6人以下可以使用免费的lisence。。。。 这是一次重要的项目管理改进实践…

Java:使用split方法时忽略中英文的符号区别

Java&#xff1a;使用split方法时忽略中英文的符号区别 split(",|&#xff0c;")

linux ubunt 安装软件的前期准备——更新源的更换

如果是高手&#xff0c;请翻到页面最下方&#xff0c;更换更新源的总结&#xff0c;直接操作即可 可能会优点啰嗦&#xff0c;但是认真看&#xff0c;一定能解决问题~~希望对大家有帮助~ 最近在熟悉linux环境&#xff0c;自己安装了一个ubuntu虚拟机。 很多朋友问装了ubuntu怎么…

以太网性能测试分析仪

如今&#xff0c;面对无数未上市的和已上市的应用&#xff0c;网络环境与设备的不同&#xff0c;企业网络的性能如何得到保障&#xff1f;企业网络如果不稳定&#xff0c;业务的中断、用户的流失都会给企业带来巨大的损失。一名合格的网络管理人员应该如何利用自己的专业知识还…