具有Akka反应流的反应队列

反应性流是最近宣布的一项计划,旨在在JVM上为具有内置背压的异步流处理创建标准。 该工作组由Typesafe,Red Hat,Oracle,Netflix等公司组成。

早期的实验性实现之一是基于Akka的 。 预览版0.3包括演员生产者和消费者,这为集成提供了新的可能性。



iStock_000040449260大-300x198

为了测试新技术,我实现了一个非常简单的Reactive Message Queue 。 该代码处于PoC阶段,缺乏错误处理等功能,但如果使用正确,则可以正常工作!

队列是响应式的,这意味着消息将在需要时传递给感兴趣的各方,而无需轮询。 在发送消息时(以便发送者不会使代理不堪重负)以及在接收消息时(以便代理仅发送与接收者可以使用的消息一样多的消息)都会施加反压。

让我们看看它是如何工作的!

队列

首先,队列本身是一个参与者,对(反应式)流一无所知。 该代码位于com.reactmq.queue包中。 actor接受以下actor消息(“ message”一词在这里超载,因此我将使用普通的“ message”来表示我们发送到队列和从队列中接收的消息,“ actor-messages”是Scala。类实例发送给演员):

  • SendMessage(content) –发送具有指定String内容的消息。 带有消息ID的回复( SentMessage(id) )被发送回发送者
  • ReceiveMessages(count) –表示发件人(执行者)想接收的邮件count最多的信号。 该计数与先前发出信号的需求累加。
  • DeleteMessage(id) –毫不奇怪,删除一条消息

队列实现是ElasticMQ中的简化版本。 收到消息后,如果在10秒钟内未将其删除(确认),则可以再次接收。

当一个actor发出对消息的需求信号时(通过将ReceiveMessages发送到队列actor),它应该期望ReceivedMessages(msgs)任意数量的ReceivedMessages(msgs) actor-message答复,其中包含接收到的数据。

变得被动

要创建和测试我们的反应式队列,我们​​需要三个应用程序:

  • Sender
  • 中央Broker
  • Receiver

我们可以运行任意数量的SendersReceivers ,但是当然我们应该只运行一个Broker

我们需要做的第一件事是通过网络将SenderBroker连接,将ReceiverBroker连接。 我们可以使用Akka IO扩展和反应式TCP扩展来做到这一点。 使用connectbind对,我们在绑定端获得了连接流:

// sender:
val connectFuture = IO(StreamTcp) ? StreamTcp.Connect(settings, sendServerAddress)connectFuture.onSuccess {case binding: StreamTcp.OutgoingTcpConnection =>logger.info("Sender: connected to broker")// per-connection logic
}// broker:
val bindSendFuture = IO(StreamTcp) ? StreamTcp.Bind(settings, sendServerAddress)bindSendFuture.onSuccess {case serverBinding: StreamTcp.TcpServerBinding =>logger.info("Broker: send bound")Flow(serverBinding.connectionStream).foreach { conn =>// per-connection logic}.consume(materializer)
}

有一个用于发送和接收消息的地址。

发件人

首先让我们看一下Sender的每个连接逻辑。

Flow(1.second, () => { idx += 1; s"Message $idx from $senderName" }).map { msg =>logger.debug(s"Sender: sending $msg")createFrame(msg)}.toProducer(materializer).produceTo(binding.outputStream)

我们正在创建一个滴答流,它每秒产生一个新消息(非常方便测试)。 使用map流转换器,我们用消息创建了一个字节帧(稍后会详细介绍)。 但这仅仅是对我们(非常简单)流的外观的描述; 它需要使用物化 toProducer方法,该方法将提供流变换节点的具体实现。 目前,只有一个FlowMaterializer ,这同样令人惊讶地使用引擎盖下的Akka actor来实际创建流和流。

最后,我们将刚刚创建的生产者连接到TCP绑定的outputStream ,它恰好是使用者。 现在,我们有了一个反应性的网络消息流,这意味着仅当Broker可以接受消息时,才发送消息。 否则,反压将一直施加到滴答声产生器。

reactmq-actors1-248x300

代理:发送消息

在网络的另一端是Broker 。 让我们看看消息到达时会发生什么。

Flow(serverBinding.connectionStream).foreach { conn =>logger.info(s"Broker: send client connected (${conn.remoteAddress})")val sendToQueueConsumer = ActorConsumer[String](system.actorOf(Props(new SendToQueueConsumer(queueActor))))// sending messages to the queue, receiving from the clientval reconcileFrames = new ReconcileFrames()Flow(conn.inputStream).mapConcat(reconcileFrames.apply).produceTo(materializer, sendToQueueConsumer)
}.consume(materializer)

首先,我们创建了一个Flow ,那将是字节输入流-从连接的输入流。 接下来,我们重新构造使用框架发送的String实例,最后将流定向到发送到队列的使用者。

SendToQueueConsumer是到主队列SendToQueueConsumer的每个连接的桥。 它使用Akka的Reactive Streams实施中的ActorConsumer特性来自动管理应该在上游发出信号的需求。 利用这一特征,我们可以创建一个由参与者支持的反应流Consumer[_] ,从而实现完全可定制的接收器。

class SendToQueueConsumer(queueActor: ActorRef) extends ActorConsumer {private var inFlight = 0override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {override def inFlightInternally = inFlight}override def receive = {case OnNext(msg: String) =>queueActor ! SendMessage(msg)inFlight += 1case SentMessage(_) => inFlight -= 1}
}

需要提供给ActorConsumer的方法是一种度量当前正在处理的流项目的数量的方法。 在这里,我们正在计算已发送到队列但尚未收到ID的消息数(因此,队列正在处理它们)。

消费者收到包装在OnNext actor消息中的新消息; 因此, OnNext由流发送到SentMessage ,而SentMessage被队列SentMessage发送以回复SendMessage

接收

接收部分以类似的方式完成,尽管它需要一些额外的步骤。 首先,如果您查看Receiver ,您会看到我们正在从输入流中读取字节,从帧中重构消息,并发回ID,从而确认消息。 实际上,我们将在接收消息和发回ID之间运行一些消息处理逻辑。

Broker方,我们为每个连接创建两个流。

一个是发送给接收者的消息流,另一个是来自接收者的已确认消息id的流,这些流被简单地转换为将DeleteMessage消息发送给队列actor。

与使用者类似,我们需要从队列参与者到流的每个连接接收桥。 这是在ReceiveFromQueueProducer实现的。 在这里,我们扩展了ActorProducer特性,它使您可以完全控制实际创建流中消息的过程。

在此参与者中,流正在发送Request参与者消息,以发出需求信号。 当有需求时,我们从队列中请求消息。 队列最终将以一个或多个ReceivedMessages actor消息进行响应(当队列中有任何消息时); 由于消息的数量永远不会超过信号需求,因此我们可以安全地调用ActorProducer.onNext方法,该方法将给定的项目发送到下游。

构图

一个小细节是,我们需要一个自定义的框架协议(感谢Roland Kuhn的澄清 ),因为TCP流只是字节流,因此我们可以获得数据的任意片段,需要稍后重新组合。 幸运的是,实现这样的框架非常简单–请参阅Framing类。 每个帧由消息的大小和消息本身组成。

加起来

使用反应式流和Akka实施,可以轻松创建具有端到端背压的反应式应用程序。 上面的队列虽然缺少许多功能和证明,但不允许Senders使Broker过载,而另一方面, Broker会使Receivers过载。 所有这些,而无需实际编写任何背压处理代码!

翻译自: https://www.javacodegeeks.com/2014/06/reactive-queue-with-akka-reactive-streams.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/363146.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Django框架下报的版本问题

报错环境 python3.6.5,django2.2,PyMySQL0.9.3 …… django.core.exceptions.ImproperlyConfigured: mysqlclient 1.3.13 or newer is required; you have 0.9.3. 解决方法: Django连接MySQL时默认使用MySQLdb驱动,但MySQLdb不支持…

Gradle入门:简介

Gradle是一种构建工具,可以使用基于Groovy编程语言的内部DSL替换基于XML的构建脚本。 最近它吸引了很多关注,这就是为什么我决定仔细研究一下。 这篇博客文章是我的Gradle教程的第一部分,它有两个目标: 帮助我们安装Gradle 描…

首页回顾功能

公司要弄一个首页回顾的功能,可以查看以往某个时间的首页。程序每天自动在上午和下午分别抓取一个页面,生成地址为http://xxx.com/review/channel20090715am.html 的形式。 这个功能用到了jQuery UI 的 datepicker日历插件,但是主要的还是通过…

浏览器内核总结

一般来讲,浏览器分为外壳部分和渲染部分。外壳部分就是用户看得见摸得到的外观和操作界面;而渲染部分则包括了浏览器内核和JS引擎,其中JS引擎主要负责执行javascript语言实现网页上的动作,而内核则负责渲染网页,把数据变成用户可以…

mysql卸载重装总是卡在starting server这一选项

因为自己不小心把msyql给下载了,重装了一个5.7版本的可是在安装时卡在starting server这一部分,运行不下去。重写卸载重装仍然不成功,还是卡在starting server.无法继续下面的安装,查看日志也没有报错信息。 问题分析:…

箭头函数与普通函数的区别

箭头函数: let fun () > {console.log(lalalala); } 普通函数: function fun() {console.log(lalla); } 箭头函数相当于匿名函数,并且简化了函数定义。箭头函数有两种格式,一种只包含一个表达式,连{ ... }和return…

前端基础-HTML标记语言

阅读目录 一、 HTML标签与文档结构二、 HTML标签详细语法与注意点三、 HTML中标签分类四、 HTML注释 一、 HTML标签与文档结构 HTML作为一门标记语言,是通过各种各样的标签来标记网页内容的。我们学习HTML主要就是学习的HTML标签。 那什么是标签呢? #…

带有Gradle的Docker容器分为4个步骤

您是否需要通过Java Web应用程序创建Docker映像? 您在使用Gradle吗? 如果是这样,那么您距Docker nivana仅4步之遥。 对于此示例,我将使用一个简单的Spring Boot应用程序。 您可以在我的名为galoshe的Github存储库中找到所有源代码…

第七周

这个作业属于哪个课程C语言程序设计 (第三版)这个作业要求在哪里2019春季第七周作业我的课程目标学习指针的运用这个作业在哪个具体方面帮助我实现目标这个作业让我知道了指针实用性参考文献无一、2019春第七周作业(基础题) 7-2 自…

Java EE 8 –为更多设备提供更多应用程序

如果我不喜欢夏天的一件事,那就是事实是没有太多要分享或谈论的新闻。 谁决定将Java Day Tokyo置于这一年的无聊时间里,谁干得不错,就给我一个机会撰写有关新的和即将到来的Java EE 8规范的博客文章,其中包含了更多的思想和建议。…

Gradle入门:依赖管理

即使不是没有可能,创建没有任何外部依赖关系的现实应用程序也是一项挑战。 这就是为什么依赖性管理是每个软件项目中至关重要的部分的原因。 这篇博客文章描述了我们如何使用Gradle管理项目的依赖关系。 我们将学习配置已使用的存储库和所需的依赖项。 我们还将通过…

HTML5常用标签及特殊字符表

*http://html5doctor.com/nav*http://html5doctor.com/article*http://html5doctor.com/section*http://html5doctor.com/asidehttp://html5doctor.com/divhttp://html5doctor.com/figurehttp://html5doctor.com/outlinehttp://html5doctor.com/semantics p 和 span 的理解 p标…

【转载】使用Imaging组件加载GIF动画

Mobil手机加载GIF动态图像的方法有两种,一个就是使用GIF89a标准算法,另一个就是使用SDK自带的Imaging组件,这两种方法是很典型的手机图像处理技术的实践。使用Imaging组件加载GIF比使用标准算法处理高效的多,特别是在处理真彩GIF动…

【处理手记】Configuration system failed to initialize异常的另类原因

有个c#程序在某台电脑上,执行某个操作时,总是会报如图错误: 度娘一番,发现市面上常见的原因是配置文件中的特定节点的位置不对,或者配置文件损坏等等,而这个程序根本没有使用内置的配置文件方案&#xff0c…

学习《Building Applications with FME Objects》 之四 从数据集读取要素

FMEOReader可以访问任何支持格式的数据。 FMEOReader返回两类要素:schema(模式)要素和数据要素,模式要素用于描述数据集模型。每种支持的格式都有一个模式,一个模式要素是一类要素的数据模型,模式要素描述属…

使用Zapier将应用程序与Neo4j集成

最近,我被带往Zapier ,以便在系统之间完成一些轻量级的集成,以快速地进行概念验证。 最初是持怀疑态度的,我发现它确实可以节省时间,并将您从未集成过的系统所有部分捆绑在一起。 而且,这是人们集成他们使…

[silverlight基础]仿文字连接跑马灯效果-高手绕道

运行效果如下:分析示意图&#xff1a;代码:1<Canvas x:Name"a"Background"AliceBlue"MouseEnter"a_MouseEnter"MouseLeave"a_MouseLeave"Cursor"Hand">2<Canvas.Clip>3<RectangleGeometry RadiusX"0&qu…

Hibernate脏检查的剖析

介绍 持久性上下文使实体状态转换进入队列&#xff0c;该实体状态转换在刷新后转换为数据库语句。 对于托管实体&#xff0c;Hibernate可以代表我们自动检测传入的更改并安排SQL UPDATE。 这种机制称为自动脏检查 。 默认的脏检查策略 默认情况下&#xff0c;Hibernate检查所有…

软件工程第二次作业-VSTS单元测试

一、选择开发工具 开发工具选择 Visual studio 2017 社区版&#xff0c;开发语言为C 由于之前已经安装完毕&#xff0c;所以不上传安装过程&#xff0c;主界面如下&#xff1a; 二、练习自动单元测试 使用的测试工具是VSTS&#xff0c;具体步骤如下&#xff1a; 1.编写一个判断…

随便写写(8)

凌晨两点了&#xff0c;还在捣鼓虚拟机&#xff0c;教师节老师们吃不上饭&#xff0c;罪过可就大了。远程有点慢&#xff0c;还遇到个小问题&#xff0c;.netfx 3.5几次都没装上&#xff0c;看了下日志&#xff0c;好像是ghost里的.netfx 2.0有问题&#xff0c;想卸载之&#x…