akka 消息发送接收_Akka型演员:探索接收器模式

akka 消息发送接收

在上一篇文章中,我们研究了Akka Typed提供的一些基本功能。 在本文和下一篇文章中,我们将更进一步地了解一些其他功能,并通过查看Akka Typed提供的两种不同模式来做到这一点:Receiver和Receptionist模式。 如果您是Akka Typed的新手,那么最好先阅读上一篇文章,因为这将使您对Akka Typed有所了解。 因此,对于本系列中的Akka型文章,我们将研究Receiver模式。

  • 与往常一样,您可以在Github Gist中找到此示例的代码: https : //gist.github.com/josdirksen/77e59d236c637d46ab32

接收方模式

在Akka Typed发行版中,有一个名为akka.typed.patterns的包。 在此程序包中,有两种不同的模式,即接收方模式和接收方模式。 坦白说,为什么这两种模式足够重要以增加发行版,但我确实不知道,但是它们确实为在Akka Typed之后引入更多概念和想法提供了一个很好的方法。

因此,让我们看一下Receiver模式,在下一篇文章中我们将做Receptionist模式。 要了解Receiver模式的功能,只需看一下我们可以发送给它的消息:

/*** Retrieve one message from the Receiver, waiting at most for the given duration.*/final case class GetOne[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetOneResult[T]]) extends Command[T]/*** Retrieve all messages from the Receiver that it has queued after the given* duration has elapsed.*/final case class GetAll[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetAllResult[T]]) extends Command[T]/*** Retrieve the external address of this Receiver (i.e. the side at which it* takes in the messages of type T.*/final case class ExternalAddress[T](replyTo: ActorRef[ActorRef[T]]) extends Command[T]

从这些消息中可以看到,Receiver的工作是将T类型的消息排队,并提供其他命令以在等待特定时间的同时获取这些消息中的一个或多个。 要使用接收器,我们需要获取ExternalAddress,以便我们可以向其发送类型为T的消息。 并且可以从其他参与者发送get GetOne和GetAll消息,以查看接收器中是否有任何消息在等待。

对于我们的示例,我们将创建以下参与者:

  • 生产者,它向接收者发送类型为T的消息。
  • 可以从此接收器检索类型T消息的使用者。
  • 根角色,运行此方案。

我们将从生产者开始,如下所示:

/*** Producer object containing the protocol and the behavior. This is a very simple* actor that produces messages using a schedule. To start producing messages* we need to send an initial message*/object Producer {// a simple protocol defining the messages that can be sentsealed trait ProducerMsgfinal case class registerReceiverMsgIn(msgIn: ActorRef[HelloMsg]) extends ProducerMsgfinal case class addHelloWorldMsg(msg: HelloMsg) extends ProducerMsg// the producer, which first waits for a registerReceiver message, after which// it changes behavior, to send messages.val producer = Full[ProducerMsg] {// if we receive a register message, we know where to send messages tocase Msg(ctx, registerReceiverMsgIn(msgConsumer)) =>println("Producer: Switching behavior")// simple helper function which sends a message to self.def scheduleMessage() = ctx.schedule(500 millisecond, ctx.self, addHelloWorldMsg(Hello(s"hello @ ${System.currentTimeMillis()}")))// schedule the first one, the rest will be triggered through the behavior.scheduleMessage()Static {// add a message to the receiver and schedule a new onecase addHelloWorldMsg(msg) => {println(s"Producer: Adding new '$msg' to receiver: $msgConsumer") ;msgConsumer ! msg; scheduleMessage()}}// don't switch behavior on any of the other messagescase _ => Same}}

在此对象中,我们定义了可以发送给角色的消息以及行为。 registerReceiverMsgIn消息为actor提供了应该向其发送消息的目的地(稍后会对此进行详细介绍),并且addHelloWorldMsg告诉行为将什么消息发送到registerReceiverMsgIn消息提供的地址。 如果您查看此行为,则可以看到我们使用Full [T]行为。 对于这种行为,我们必须为所有消息和信号提供匹配器,此外,我们还可以访问actor ctx。 在其初始状态下,此行为仅响应registerReceiverMsgIn消息。 当它收到这样的消息时,它会做两件事:

  1. 它定义了一个函数,我们可以用来调度消息,我们也可以直接调用它,以调度消息在半秒钟内发送。
  2. 它定义了我们的新行为。 此新行为可以处理scheduleMessage函数发送的消息。 收到该消息后,它将内容发送到提供的messageConsumer(接收方),然后再次调用计划消息。 保持每500毫秒发送一次消息。

因此,当我们发送初​​始的registerReceiverMessage时,它将导致actor每500 ms向接收者发送一条新消息。 现在让我们看看另一面:消费者。

对于消费者,我们还将所有内容包装在一个对象中,如下所示:

object Consumer {val consumer = Total[HelloMsg] {// in the case of a registerReceiver message, we change the implementation// since we're ready to receive other message.case registerReceiverCmdIn(commandAddress) => {println("Consumer: Switching behavior")// return a static implementation which closes over actorRefs// all messages we receive we pass to the receiver, which will queue// them. We have a specific message that prints out the received messagesContextAware { ctx =>Static[HelloMsg] {// printmessages just prints out the list of messages we've receivedcase PrintMessages(msgs) => println(s"Consumer: Printing messages: $msgs") ;msgs.foreach { hw => println(s"  $hw")}// if we get the getAllMessages request, we get all the messages from// the receiver.case GetAllMessages() => {println("Consumer: requesting all messages")val wrap = ctx.spawnAdapter[GetAllResult[HelloMsg]] {case msgs:GetAllResult[HelloMsg] => println(s"Consumer: Received ${msgs.msgs.length} messages"); PrintMessages(msgs.msgs)}commandAddress ! GetAll(2 seconds)(wrap)}}}}// for all the other cases return the existing implementation, in essence// we're just ignoring other messages till we change statecase _ => Same}    }

在此对象中,我们定义了一个行为,该行为在接收到第一条消息后也会切换其实现。 在这种情况下,第一个消息称为registerReceiverCmdIn。 通过此消息,我们可以访问(接收方的)actorRef,将GetAll和getOne消息发送至该消息。 切换行为后,我们将处理自己的自定义GetAllMessages消息,这将触发将GetAll消息发送到接收器。 由于未针对从Receiver收到的响应类型键入我们自己的行为,因此我们使用适配器(ctx.spawnAdapter)。 该适配器将接收来自接收器的响应并打印出消息。

最后一个消息部分是一个启动此行为的参与者:

// Simple root actor, which we'll use to start the other actorsval scenario1 = {Full[Unit] {case Sig(ctx, PreStart) => {import Producer._import Consumer._println("Scenario1: Started, now lets start up a number of child actors to do our stuff")// first start the two actors, one implements the receiver pattern, and// the other is the one we control directly.val receiverActor = ctx.spawn(Props(Receiver.behavior[HelloMsg]), "receiver")val consumerActor = ctx.spawn(Props(consumer), "adder")val producerActor = ctx.spawn(Props(producer), "producer")// our producerActor first needs the actorRef it can use to add messages to the receiver// for this we use a wrapper, this wrapper creates a child, which we use to get the// address, to which we can send messages.val wrapper = ctx.spawnAdapter[ActorRef[HelloMsg]] {case p: ActorRef[HelloMsg] => producerActor ! registerReceiverMsgIn(p)}// now send the message to get the external address, the response will be sent// to our own actor as a registerReceiver message, through the adapterreceiverActor ! ExternalAddress(wrapper)// our printing actor needs to now the address of the receiver so send it to himconsumerActor ! registerReceiverCmdIn(receiverActor)// by calling getAllMessages we get the messages within a time period.println("Scenario1: Get all the messages")consumerActor ! GetAllMessages()Thread.sleep(3000)consumerActor ! GetAllMessages()Thread.sleep(5000)consumerActor ! GetAllMessages()Same}}}

这里没什么特别的。 在这种情况下,我们将创建各种角色,并使用ctx.spawnAdapter来获取接收者的外部地址,并将其传递给producerActor。 接下来,我们将接收者参与者的地址传递给消费者。 现在,我们在使用者地址上调用GetAllMessages,该地址将从接收方获取消息并打印出来。

因此,总结一下将在此示例中执行的步骤:

  1. 我们创建一个将运行此方案的根角色。
  2. 从这个根基参与者,我们创建了三个参与者:接收者,消费者和生产者。
  3. 接下来,我们从接收方获取externalAddress(我们将类型为T的消息发送到的地址),并使用适配器将其传递给生产方。
  4. 生产者在接收到此消息后,将切换行为并开始将消息发送到传递的地址。
  5. 同时,根actor将接收方的地址传递给使用者。
  6. 使用者在收到此消息时,将更改行为并现在等待GetAllMessages类型的消息。
  7. 现在,根actor将发送GetAllMessages到使用者。
  8. 当使用者接收到此消息时,它将使用适配器将GetAll消息发送给接收者。 当适配器接收到响应时,它会打印出接收到的消息数量,并通过为接收者从接收到的每条消息发送一个PrintMessage来对使用者进行进一步处理。

这种情况的结果如下所示:

Scenario1: Started, now lets start up a number of child actors to do our stuff
Scenario1: Get all the messages
Consumer: Switching behavior
Consumer: requesting all messages
Producer: Switching behavior
Producer: Adding new 'Hello(hello @ 1446277162929)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277163454)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277163969)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 3 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277162929), Hello(hello @ 1446277163454), Hello(hello @ 1446277163969))Hello(hello @ 1446277162929)Hello(hello @ 1446277163454)Hello(hello @ 1446277163969)
Producer: Adding new 'Hello(hello @ 1446277164488)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277165008)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: requesting all messages
Producer: Adding new 'Hello(hello @ 1446277165529)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277166049)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277166569)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277167089)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 6 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277164488), Hello(hello @ 1446277165008), Hello(hello @ 1446277165529), Hello(hello @ 1446277166049), Hello(hello @ 1446277166569), Hello(hello @ 1446277167089))Hello(hello @ 1446277164488)Hello(hello @ 1446277165008)Hello(hello @ 1446277165529)Hello(hello @ 1446277166049)Hello(hello @ 1446277166569)Hello(hello @ 1446277167089)
Producer: Adding new 'Hello(hello @ 1446277167607)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277168129)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277168650)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277169169)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277169690)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277170210)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: requesting all messages
Producer: Adding new 'Hello(hello @ 1446277170729)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277171249)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277171769)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277172289)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 10 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277167607), Hello(hello @ 1446277168129), Hello(hello @ 1446277168650), Hello(hello @ 1446277169169), Hello(hello @ 1446277169690), Hello(hello @ 1446277170210), Hello(hello @ 1446277170729), Hello(hello @ 1446277171249), Hello(hello @ 1446277171769), Hello(hello @ 1446277172289))Hello(hello @ 1446277167607)Hello(hello @ 1446277168129)Hello(hello @ 1446277168650)Hello(hello @ 1446277169169)Hello(hello @ 1446277169690)Hello(hello @ 1446277170210)Hello(hello @ 1446277170729)Hello(hello @ 1446277171249)Hello(hello @ 1446277171769)Hello(hello @ 1446277172289)
Producer: Adding new 'Hello(hello @ 1446277172808)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277173328)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277173849)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277174369)' to receiver: Actor[akka://Root/user/receiver#1097367365]

酷吧! 从消息序列中可以看到,我们的生产者将消息发送到接收者,接收者将它们排队。 接下来,我们有一个使用者,它请求到目前为止已收到的所有消息并打印出来。

这是关于Akka-Typed的文章的内容,在下一篇文章中,我们将介绍同样在Akka-Typed中呈现的接待员模式。

翻译自: https://www.javacodegeeks.com/2015/11/akka-typed-actors-exploring-the-receiver-pattern.html

akka 消息发送接收

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/336820.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

scale和java比较_浅谈java中BigDecimal的equals与compareTo的区别

这两天在处理支付金额校验的时候出现了点问题,有个金额比较我用了BigDecimal的equals方法来比较两个金额是否相等,结果导致金额比较出现错误(比如3.0与3.00的比较等)。【注:以下所讲都是以sun jdk 1.4.2版本为例,其他版本实现未必…

activemq和jms_带有ActiveMQ和Maven的JMS Sender应用程序

activemq和jms我们已经看到了如何使用ActiveMQ和Maven创建JMS Receiver应用程序 。 让我们看看我们如何类似地创建JMS Sender应用程序 。 web.xml与创建接收器应用程序时使用的相同&#xff1a; <web-app xmlns"http://java.sun.com/xml/ns/javaee"xmlns:xsi&qu…

java初始化hashset_JAVA中的Hashset类

HashSet扩展AbstractSet并且实现Set接口。它创建一个类集&#xff0c;该类集使用散列表进行存储。正像大多数读者很可能知道的那样&#xff0c;散列表通过使用称之为散列法的机制来存储信息。在散列(hashing)中&#xff0c;一个关键字的信息内容被用来确定唯一的一个值&#xf…

java集成lucene_将Lucene搜索集成到应用程序中

java集成lucene本文是我们名为“ Apache Lucene基础知识 ”的学院课程的一部分。 在本课程中&#xff0c;您将了解Lucene。 您将了解为什么这样的库很重要&#xff0c;然后了解Lucene中搜索的工作方式。 此外&#xff0c;您将学习如何将Lucene Search集成到您自己的应用程序中…

java经纬度曲线简化_JAVA 后台计算 经纬度 最短距离

1、 代码块package com.ilogie.tms.util;import java.io.IOException;import java.math.BigDecimal;import java.text.MessageFormat;public class LocationUtils {// 以下为 获得 两点之间最短距离private static final BigDecimal EARTH_RADIUS MathUtil.toBigDecimal(6378.…

java ee的小程序_在Java EE应用程序中实现自动重试

java ee的小程序最初&#xff0c;我想将此博客称为“ 具有拦截器驱动的重试策略的灵活超时 ”&#xff0c;但后来我认为它太“繁重”。 该声明以及修改后的标题应该&#xff08;希望&#xff09;使您了解此帖子可能谈论的内容;-) 触发 这篇文章主要由我在较早的一篇文章中收到…

Java变长数组笛卡尔积_Java 8中的流作为流的笛卡尔积(仅使用流)

小编典典在示例中传递流永远比传递列表更好&#xff1a;private static Stream cartesian(BinaryOperator aggregator, List... lists) {...}并像这样使用它&#xff1a;Stream result cartesian((a, b) -> a b,Arrays.asList("A", "B"),Arrays.asLis…

jboss eap 7_EAP 7 Alpha和Java EE 7入门

jboss eap 7红帽JBoss企业应用程序平台7&#xff08;JBoss EAP 7&#xff09;是基于开放标准构建并符合Java Enterprise Edition 7规范的中间件平台。 它建立在WildFly等经过验证的创新开源技术之上&#xff0c;这将使Java EE 7的开发更加容易。 这是有关如何开始使用最新ALPHA…

php 返回页面重复提交,php防止表单重复提交

后端防止重复提交的基本原理:服务器返回表单页面时&#xff0c;会先生成一个subToken保存于session&#xff0c;并把该subToen传给表单页面。当表单提交时会带上subToken&#xff0c;服务器获取表单信息判断session保存的subToken和表单提交subToken是否一致。若不一致或sessio…

spark在服务器运行示例_创建示例HTTPS服务器以获取乐趣和收益

spark在服务器运行示例通常&#xff0c;在开发或/和针对真实场景进行测试期间&#xff0c;我们&#xff08;开发人员&#xff09;面临着运行成熟的HTTPS服务器的需求&#xff0c;可能同时进行一些模拟。 在JVM平台上&#xff0c;除非您知道适合此工作的正确工具&#xff0c;否则…

为什么说php单线程,php单线程的缺点是什么?

PHP即“超文本预处理器”&#xff0c;是一种通用开源脚本语言。PHP是在服务器端执行的脚本语言&#xff0c;与C语言类似&#xff0c;是常用的网站编程语言。PHP独特的语法混合了C、Java、Perl以及 PHP 自创的语法。利于学习&#xff0c;使用广泛&#xff0c;主要适用于Web开发领…

sidecar_Spring Cloud Sidecar –节点初始化

sidecar在上一篇博客文章中&#xff0c;我描述了Sidecar应用程序如何用于在Eureka中注册Cassandra节点&#xff0c;更一般地&#xff0c;它可以用于在Eureka中注册任何非JVM应用程序。 在本文中&#xff0c;我将介绍应用程序如何查询Sidecar注册节点。 发现注册节点–初始化后…

php 对象 final,PHP7_OOP_对象重载以及魔术方法_对象遍历_final关键字

//对象遍历&#xff1a;class MyClass{public$var1 "value 1";public$var2 "value 2";public$var3 "value 3";protected$protected "pro var";private $private "privar";functioninterateVisible(){echo "MyClas…

供给测结构性改革内容_智能包装结构,提高可测性

供给测结构性改革内容有很多方法可以将整个应用程序分成多个包。 关于按功能或按层打包的优缺点的讨论可以在许多编程博客和论坛上找到。 我想从可测试性开始讨论这个主题&#xff0c;看看它是否会带来任何有意义的结果。 首先&#xff0c;让我们尝试描述我们通常希望跨不同层…

openshift_在OpenShift上扩展Java EE微服务

openshift这个小系列的前两个部分介绍了如何使用WildFly Swarm构建一个小型的JAX-RS服务并将其打包到Docker映像中 。 您学习了如何将此示例部署到OpenShift &#xff0c;现在是时候对其进行一些扩展了。 为什么扩展很重要 基于微服务的体系结构的关键方面之一是分解为高性能的…

php 异步post,php – 使用POST的异步cURL

我正在制作一个命令行应用程序.在执行登录过程后,我需要同时通过cURL发出多个POST请求 – 这意味着传出请求必须发送会话ID等.事件链如下&#xff1a;>我用curl_init打开cURL连接>我使用curl_exec登录远程站点发送POST请求,并获得返回的HTML代码作为响应>我同时向同一…

log4j2 logger_简单一致的Log4j2 Logger命名

log4j2 logger在“ 带有Java 7方法句柄的可移植记录器名称”一文中 &#xff0c;我写了关于使用Java 7的方法句柄来命名类的记录器的文章。 我在那篇文章中说过&#xff0c;这种方法的优点包括记录器命名的一致性&#xff0c;并避免了意外的代码复制和粘贴&#xff0c;这可能导…

java 子类tostring,JAVA中Object类的toString()方法,objecttostring

JAVA中Object类的toString()方法&#xff0c;objecttostringtoStringpublic String toString()返回该对象的字符串表示。通常&#xff0c;toString 方法会返回一个“以文本方式表示”此对象的字符串。结果应是一个简明但易于读懂的信息表达式。建议所有子类都重写此方法。Objec…

openshift 部署_在OpenShift上部署Java EE微服务

openshift 部署我昨天用WildFly Swarm在博客上发布了有关简单JAX-RS微服务的博客。 您学习了如何使用Maven构建所谓的“胖子”&#xff0c;还使用Maven Docker插件对我们的微服务进行了Docker化并在Docker Machine上本地运行。 这是在本地测试事物的好方法。 到目前为止&#x…

在程序中实现java源文件编译,在程序中实现对java源文件编译的3种方法

一般情况下对java源文件的编译均是在代码完成后使用javac编译的&#xff0c;不管是使用 IDE还是直接使用命令行。这里要说的情况是比较特别的&#xff0c;就是在代码内动态的编译一些代码。比如你想通过在某个目录下通过放置一些源代码的方式来实现对程序功能的动态扩展&#x…