Akka型演员:探索接收器模式

在上一篇文章中,我们研究了Akka Typed提供的一些基本功能。 在本文和下一篇文章中,我们将更进一步地了解一些其他功能,并通过查看Akka Typed提供的两种不同模式来做到这一点:Receiver和Receptionist模式。 如果您是Akka Typed的新手,那么最好先阅读上一篇文章,因为这将使您对Akka Typed有所了解。 因此,对于本系列中的Akka型文章,我们将研究Receiver模式。

  • 与往常一样,您可以在Github Gist中找到此示例的代码: https : //gist.github.com/josdirksen/77e59d236c637d46ab32

接收方模式

在Akka Typed发行版中,有一个名为akka.typed.patterns的包。 在此程序包中,有两种不同的模式,即接收方模式和接收方模式。 坦白说,为什么这两种模式足够重要以增加发行版,但我确实不知道,但是它们确实为在Akka Typed之后引入更多概念和想法提供了一个很好的方法。

因此,让我们看一下Receiver模式,在下一篇文章中我们将做Receptionist模式。 要了解Receiver模式的功能,只需看一下我们可以发送给它的消息:

/*** Retrieve one message from the Receiver, waiting at most for the given duration.*/final case class GetOne[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetOneResult[T]]) extends Command[T]/*** Retrieve all messages from the Receiver that it has queued after the given* duration has elapsed.*/final case class GetAll[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetAllResult[T]]) extends Command[T]/*** Retrieve the external address of this Receiver (i.e. the side at which it* takes in the messages of type T.*/final case class ExternalAddress[T](replyTo: ActorRef[ActorRef[T]]) extends Command[T]

从这些消息中可以看到,Receiver的工作是将T类型的消息排队,并提供其他命令以在等待特定时间的同时获取这些消息中的一个或多个。 要使用接收器,我们需要获取ExternalAddress,以便我们可以向其发送类型为T的消息。 并且可以从其他参与者发送get GetOne和GetAll消息,以查看接收器中是否有任何消息在等待。

对于我们的示例,我们将创建以下参与者:

  • 生产者,它向接收者发送类型为T的消息。
  • 可以从此接收器检索类型T消息的使用者。
  • 根角色,运行此方案。

我们将从生产者开始,如下所示:

/*** Producer object containing the protocol and the behavior. This is a very simple* actor that produces messages using a schedule. To start producing messages* we need to send an initial message*/object Producer {// a simple protocol defining the messages that can be sentsealed trait ProducerMsgfinal case class registerReceiverMsgIn(msgIn: ActorRef[HelloMsg]) extends ProducerMsgfinal case class addHelloWorldMsg(msg: HelloMsg) extends ProducerMsg// the producer, which first waits for a registerReceiver message, after which// it changes behavior, to send messages.val producer = Full[ProducerMsg] {// if we receive a register message, we know where to send messages tocase Msg(ctx, registerReceiverMsgIn(msgConsumer)) =>println("Producer: Switching behavior")// simple helper function which sends a message to self.def scheduleMessage() = ctx.schedule(500 millisecond, ctx.self, addHelloWorldMsg(Hello(s"hello @ ${System.currentTimeMillis()}")))// schedule the first one, the rest will be triggered through the behavior.scheduleMessage()Static {// add a message to the receiver and schedule a new onecase addHelloWorldMsg(msg) => {println(s"Producer: Adding new '$msg' to receiver: $msgConsumer") ;msgConsumer ! msg; scheduleMessage()}}// don't switch behavior on any of the other messagescase _ => Same}}

在此对象中,我们定义了可以发送给角色的消息以及行为。 registerReceiverMsgIn消息为操作者提供了应该向其发送消息的目的地(稍后将对此进行详细介绍),并且addHelloWorldMsg告诉该行为将什么消息发送到registerReceiverMsgIn消息提供的地址。 如果您查看此行为,则可以看到我们使用Full [T]行为。 对于这种行为,我们必须为所有消息和信号提供匹配器,此外,我们还可以访问actor ctx。 在其初始状态下,此行为仅响应registerReceiverMsgIn消息。 当它收到这样的消息时,它会做两件事:

  1. 它定义了一个函数,我们可以用来调度消息,我们也可以直接调用它,以调度消息在半秒钟内发送。
  2. 它定义了我们的新行为。 此新行为可以处理scheduleMessage函数发送的消息。 收到该消息后,它将内容发送到提供的messageConsumer(接收方),然后再次调用计划消息。 保持每500毫秒发送一次消息。

因此,当我们发送初​​始的registerReceiverMessage时,它将导致actor每500 ms向接收者发送一条新消息。 现在让我们看看另一面:消费者。

对于消费者,我们还将所有内容包装在一个对象中,如下所示:

object Consumer {val consumer = Total[HelloMsg] {// in the case of a registerReceiver message, we change the implementation// since we're ready to receive other message.case registerReceiverCmdIn(commandAddress) => {println("Consumer: Switching behavior")// return a static implementation which closes over actorRefs// all messages we receive we pass to the receiver, which will queue// them. We have a specific message that prints out the received messagesContextAware { ctx =>Static[HelloMsg] {// printmessages just prints out the list of messages we've receivedcase PrintMessages(msgs) => println(s"Consumer: Printing messages: $msgs") ;msgs.foreach { hw => println(s"  $hw")}// if we get the getAllMessages request, we get all the messages from// the receiver.case GetAllMessages() => {println("Consumer: requesting all messages")val wrap = ctx.spawnAdapter[GetAllResult[HelloMsg]] {case msgs:GetAllResult[HelloMsg] => println(s"Consumer: Received ${msgs.msgs.length} messages"); PrintMessages(msgs.msgs)}commandAddress ! GetAll(2 seconds)(wrap)}}}}// for all the other cases return the existing implementation, in essence// we're just ignoring other messages till we change statecase _ => Same}    }

在此对象中,我们定义了一个行为,该行为在接收到第一条消息后也会切换其实现。 在这种情况下,第一条消息称为registerReceiverCmdIn。 通过此消息,我们可以访问(接收方的)actorRef,将GetAll和getOne消息发送至该消息。 切换行为后,我们将处理自己的自定义GetAllMessages消息,该消息将触发将GetAll消息发送到接收器。 由于未针对从Receiver收到的响应类型键入我们自己的行为,因此我们使用适配器(ctx.spawnAdapter)。 该适配器将接收来自接收器的响应并打印出消息。

消息的最后一部分是一个演员,它会启动此行为:

// Simple root actor, which we'll use to start the other actorsval scenario1 = {Full[Unit] {case Sig(ctx, PreStart) => {import Producer._import Consumer._println("Scenario1: Started, now lets start up a number of child actors to do our stuff")// first start the two actors, one implements the receiver pattern, and// the other is the one we control directly.val receiverActor = ctx.spawn(Props(Receiver.behavior[HelloMsg]), "receiver")val consumerActor = ctx.spawn(Props(consumer), "adder")val producerActor = ctx.spawn(Props(producer), "producer")// our producerActor first needs the actorRef it can use to add messages to the receiver// for this we use a wrapper, this wrapper creates a child, which we use to get the// address, to which we can send messages.val wrapper = ctx.spawnAdapter[ActorRef[HelloMsg]] {case p: ActorRef[HelloMsg] => producerActor ! registerReceiverMsgIn(p)}// now send the message to get the external address, the response will be sent// to our own actor as a registerReceiver message, through the adapterreceiverActor ! ExternalAddress(wrapper)// our printing actor needs to now the address of the receiver so send it to himconsumerActor ! registerReceiverCmdIn(receiverActor)// by calling getAllMessages we get the messages within a time period.println("Scenario1: Get all the messages")consumerActor ! GetAllMessages()Thread.sleep(3000)consumerActor ! GetAllMessages()Thread.sleep(5000)consumerActor ! GetAllMessages()Same}}}

这里没什么特别的。 在这种情况下,我们将创建各种角色,并使用ctx.spawnAdapter来获取接收者的外部地址,并将其传递给producerActor。 接下来,我们将接收者参与者的地址传递给消费者。 现在,我们在使用者地址上调用GetAllMessages,该地址将从接收方获取消息并打印出来。

因此,总结一下将在此示例中执行的步骤:

  1. 我们创建一个将运行此方案的root actor。
  2. 从这个根基参与者,我们创建了三个参与者:接收者,消费者和生产者。
  3. 接下来,我们从接收方获取externalAddress(我们将类型为T的消息发送到的地址),并使用适配器将其传递给生产方。
  4. 生产者在收到此消息后,将切换行为并开始将消息发送到传入的地址。
  5. 同时,根角色将接收方的地址传递给使用者。
  6. 使用者在收到此消息时,将更改行为并现在等待GetAllMessages类型的消息。
  7. 现在,根actor将发送GetAllMessages到使用者。
  8. 当使用者收到此消息时,它将使用适配器将GetAll消息发送给接收者。 当适配器接收到响应时,它会打印出接收到的消息数量,并通过为接收者从接收到的每条消息发送一个PrintMessage来对使用者进行进一步处理。

这种情况的结果如下所示:

Scenario1: Started, now lets start up a number of child actors to do our stuff
Scenario1: Get all the messages
Consumer: Switching behavior
Consumer: requesting all messages
Producer: Switching behavior
Producer: Adding new 'Hello(hello @ 1446277162929)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277163454)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277163969)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 3 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277162929), Hello(hello @ 1446277163454), Hello(hello @ 1446277163969))Hello(hello @ 1446277162929)Hello(hello @ 1446277163454)Hello(hello @ 1446277163969)
Producer: Adding new 'Hello(hello @ 1446277164488)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277165008)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: requesting all messages
Producer: Adding new 'Hello(hello @ 1446277165529)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277166049)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277166569)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277167089)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 6 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277164488), Hello(hello @ 1446277165008), Hello(hello @ 1446277165529), Hello(hello @ 1446277166049), Hello(hello @ 1446277166569), Hello(hello @ 1446277167089))Hello(hello @ 1446277164488)Hello(hello @ 1446277165008)Hello(hello @ 1446277165529)Hello(hello @ 1446277166049)Hello(hello @ 1446277166569)Hello(hello @ 1446277167089)
Producer: Adding new 'Hello(hello @ 1446277167607)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277168129)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277168650)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277169169)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277169690)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277170210)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: requesting all messages
Producer: Adding new 'Hello(hello @ 1446277170729)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277171249)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277171769)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277172289)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 10 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277167607), Hello(hello @ 1446277168129), Hello(hello @ 1446277168650), Hello(hello @ 1446277169169), Hello(hello @ 1446277169690), Hello(hello @ 1446277170210), Hello(hello @ 1446277170729), Hello(hello @ 1446277171249), Hello(hello @ 1446277171769), Hello(hello @ 1446277172289))Hello(hello @ 1446277167607)Hello(hello @ 1446277168129)Hello(hello @ 1446277168650)Hello(hello @ 1446277169169)Hello(hello @ 1446277169690)Hello(hello @ 1446277170210)Hello(hello @ 1446277170729)Hello(hello @ 1446277171249)Hello(hello @ 1446277171769)Hello(hello @ 1446277172289)
Producer: Adding new 'Hello(hello @ 1446277172808)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277173328)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277173849)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277174369)' to receiver: Actor[akka://Root/user/receiver#1097367365]

酷吧! 从消息序列中可以看到,我们的生产者将消息发送到接收者,接收者将它们排队。 接下来,我们有一个使用者,它请求到目前为止已收到的所有消息并打印出来。

这是关于Akka-Typed的文章的内容,在下一篇文章中,我们将介绍同样存在于Akka-Typed的接待员模式。

翻译自: https://www.javacodegeeks.com/2015/11/akka-typed-actors-exploring-the-receiver-pattern.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/356759.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

读书笔记 - 《乌合之众》

以往的随笔都是胡乱写两句,目的主要是为了督促自己尽快把堆积如山的书籍清理掉,但这次值得花几分钟真的写几句。其实实在都想不起来怎么会买它,这次偶然拿起这本书也是因为它很薄,读大部头读得累了放松一下,没想到居然…

MATLAB如何用循环分割,利用Matlab进行分割提取浮游生物

我试图从扫描图像中提取浮游生物.大纲也不错,但是,现在我不知道如何提取图像,因此可以单独保存每个浮游生物.我尝试使用标签,但是有很多噪音,它标出了每一个规格.我想知道是否有更好的方法来做到这一点.这是我的代码:I imread(plankton_2.jpg);figure, imshow(I), …

saxparser_使用SaxParser和完整代码进行XML解析

saxparserSAX解析器使用回调函数(org.xml.sax.helpers.DefaultHandler)通知客户端XML文档结构。 您应该扩展DefaultHandler并重写一些方法来实现xml解析。 覆盖的方法是 startDocument()和endDocument()–…

第一个Python程序

在E:\Python 下新建一个hello.py文件,里面的内容是print(hello world) 进入命令提示窗格,输入E: 点击回车 输入 cd python 点击回车 输入Python hello.py 结果如图 转载于:https://www.cnblogs.com/lgqboke/p/5882049.html

matlab导入txt生成曲面,求助:怎样将txt的数据导入到Matlab中并根据参数画出波形...

导师让我把波形画出来,这下完全不会啊!希望各位高手帮帮在下……txt中部分数据如下:ID DDD HH:MM:SS.mmmuuun PARA1 PARA2 CH RISE COUN ENER DURATION AMP A-FRQ RMS ASL PCNTS THR R-FRQ I-FRQ SIG STRNGTH ABS-ENERGY1…

Java多线程:易失性变量,事前关联和内存一致性

什么是volatile变量? volatile是Java中的关键字。 您不能将其用作变量或方法名称。 期。 我们什么时候应该使用它? 哈哈,对不起,没办法。 当我们在多线程环境中与多个线程共享变量时,我们通常使用volatile关键字&…

20145219 《信息安全系统设计基础》第01周学习总结

20145219 《信息安全系统设计基础》第01周学习总结 教材学习内容总结 别出心裁的Linux命令学习法 1、Ubuntu快捷键 CTRLALTT:打开终端;CTRLSHIFTT:新建标签页;ALT数字N:终端中切换到第N个标签页;Tab:终端中命令补全&…

拉盖尔多项式 matlab,类氢原子的定态波函数

&nbsp&nbsp&nbsp&nbsp&nbsp&nbsp&nbsp预备知识 球坐标系中的定态薛定谔方程,原子单位制本文使用原子单位制.类氢原子(hydrogen-like atom)被定义为原子核有 $Z$ 个质子(核电荷为 $Ze$)有一个核外电子的原子/离子&#xff0…

再论EM算法的收敛性和K-Means的收敛性

标签(空格分隔): 机器学习 (最近被一波波的笔试面试淹没了,但是在有两次面试时被问到了同一个问题:K-Means算法的收敛性。在网上查阅了很多资料,并没有看到很清晰的解释,所以希望可以…

java xml dom getelementbyid,DOM中常见的元素获取方式

1.getElementById获取元素 返回的是一个元素对象var timer document.getElementById(time);console.dir 打印返回元素对象,更好的查看里面的属性和方法console.dir( timer );2.getElementsByTagName 获取某类标签元素 返回的是 获取过来元素对象的集合 以…

杰尔·地狱

什么是JAR地狱? (或者是classpath地狱?还是依赖地狱?)在考虑使用Maven或OSGi等现代开发工具时,哪些方面仍然有意义? 有趣的是,似乎没有对这些问题的结构化答案(即&#…

matlab radsimp,[转载]MATLAB学习笔记(八)

符号运算符号运算分为以下几类:符号表达式和符号矩阵的操作整体定义为符号微积分符号线性方程符号微分方程A、符号变量、符号表达式和符号方程的分解一、 生成符号变量要使用sym和syms:使用sym函数可以定义符号表达式,此时有两种定义方法&…

windows,python3.x下安装pyspider

由于是初学者,业余学习,习惯使用windows,初次了解到pyspider写代码和调试代码非常简便;作者binux是在Ubuntu下部署测试的。在作者的博客看到windows下安装的讨论。windows直接安装失败主要是lxml、pycurl安装失败,需要…

matlab实验符号计算答案,实验五matlab符号计算

实验五matlab符号计算 实验 5 符号计算 教师评分班级 学号 姓名实验日期 2014 年 6 月 17 日 星期 二 第 1 至 2 节课实验地点实验目的1. 掌握定义符号对象的办法2. 掌握符号表达式的运算法则以及符号矩阵运算3. 掌握求符号函数极限及导数的方法4. 掌握求符号函数定积分和不定积…

junit4 单元测试框架_超越JUnit –测试框架的替代方案

junit4 单元测试框架JUnit是事实上的Java单元测试框架,但是可能有一些新的(不是那么新的)框架可以用于Web开发。 在采用之前可能要问自己的问题: 它们是否快速,容易开发并因此成本低廉? 他们运行快并因此鼓…

Java学习笔记之:Java String类

一、引言 字符串广泛应用在Java编程中,在Java中字符串属于对象,Java提供了String类来创建和操作字符串。 创建字符串最简单的方式如下: String str "Hello world!"; String类型是特殊的引用类型,我们也可以通过实例化的方式来创建 …

java循环输入直到,使用循环接受其他用户输入,直到用户输入结束输入的值

我是Java的新手 . 我需要一些帮助,使用循环接受其他用户输入,直到用户输入结束输入的值 . 我的问题从语句“System.out.println(”你完成了吗?输入大写的Y / N)开始 . 下面是我的代码 .公共类EmployeeData {//declare variablesprivate Strin…

WildFly 10 CR 2发布– Java EE 7,Java 8,Hibernate 5,JavaScript支持热重载

昨天,WildFly团队发布了最新版本的WildFly 10 。 CR2很可能是预计于十月份发布最终版本之前的最后一个版本。 即使主要支持的Java EE规范是7,WildFly 8和WildFly 9仍具有许多新功能,该版本现在具有三个服务器版本,实现了Java EE 7…

php中什么时候用传值,php中传值与传引用的区别。什么时候传值什么时候传引用?...

java中的this与super的区别java中的this与super的区别 1. 子类的构造函数如果要引用super的话,必须把super放在函数的首位 代码如下: class Base { Base() { System.out.pr ...php传值和传引用的区别php传值和传引用的区别所谓值传递,就是说仅将对象的值传递给目标对象,就相当于…

openshift 部署_OpenShift Express:部署Java EE应用程序(支持AS7)

openshift 部署在过去的几年中,我越来越多地听说过“云”服务。 最初,我并不是很想尝试一下。 但是几个月后(一年?),我决定看看这是怎么回事。 我从事Java EE开发已有7年以上,因此,我…