react 消息队列_具有AkkaReact流的React队列

react 消息队列

React性流是最近宣布的一项计划,旨在在JVM上为具有内置背压的异步流处理创建标准。 该工作组由Typesafe,Red Hat,Oracle,Netflix等公司组成。

早期的实验性实现之一是基于Akka的 。 预览版0.3包括演员生产者和消费者,这为集成提供了新的可能性。



iStock_000040449260大-300x198

为了测试新技术,我实现了一个非常简单的Reactive Message Queue 。 该代码处于PoC阶段,缺乏错误处理等功能,但如果使用正确,则可以正常工作!

队列是响应式的,这意味着消息将在需要时传递给感兴趣的各方,而无需轮询。 在发送消息时(以便发送者不会使代理不堪重负)和在接收消息时(以便代理仅发送与接收者可以使用的消息一样多的消息)都会施加反压。

让我们看看它是如何工作的!

队列

首先,队列本身是一个参与者,对(React式)流一无所知。 该代码位于com.reactmq.queue包中。 actor接受以下actor消息(“ message”一词在此处已重载,因此我将使用普通的“ message”来表示我们发送到队列和从队列中接收的消息,而“ actor-messages”则为Scala)。类实例发送给演员):

  • SendMessage(content) –发送具有指定String内容的消息。 回复( SentMessage(id) )被发送回带有消息ID的发件人
  • ReceiveMessages(count) –表示发件人(演员)想接收最多邮件count信号。 该计数与先前发出信号的需求累加。
  • DeleteMessage(id) –毫不奇怪,删除一条消息

队列实现是ElasticMQ的简化版本。 收到消息后,如果在10秒钟内未将其删除(确认),则可以再次接收。

当一个actor发出对消息的需求信号时(通过将ReceiveMessages发送到队列actor),它应该期望有任意数量的ReceivedMessages(msgs) actor-message答复,其中包含接收到的数据。

变得被动

要创建和测试我们的React式队列,我们​​需要三个应用程序:

  • Sender
  • 中央Broker
  • Receiver

我们可以运行任何数量的SendersReceivers ,但是当然我们应该只运行一个Broker

我们需要做的第一件事是通过网络将SenderBroker连接,将ReceiverBroker连接。 我们可以使用Akka IO扩展和React式TCP扩展来做到这一点。 使用connectbind对,我们在绑定端获得了一个连接流:

// sender:
val connectFuture = IO(StreamTcp) ? StreamTcp.Connect(settings, sendServerAddress)connectFuture.onSuccess {case binding: StreamTcp.OutgoingTcpConnection =>logger.info("Sender: connected to broker")// per-connection logic
}// broker:
val bindSendFuture = IO(StreamTcp) ? StreamTcp.Bind(settings, sendServerAddress)bindSendFuture.onSuccess {case serverBinding: StreamTcp.TcpServerBinding =>logger.info("Broker: send bound")Flow(serverBinding.connectionStream).foreach { conn =>// per-connection logic}.consume(materializer)
}

有一个用于发送和接收消息的地址。

寄件人

首先让我们看一下Sender的每个连接逻辑。

Flow(1.second, () => { idx += 1; s"Message $idx from $senderName" }).map { msg =>logger.debug(s"Sender: sending $msg")createFrame(msg)}.toProducer(materializer).produceTo(binding.outputStream)

我们正在创建一个滴答流,它每秒产生一个新消息(非常方便测试)。 使用map流转换器,我们用消息创建了一个字节帧(稍后会详细介绍)。 但这仅是我们(非常简单)流的外观的描述; 它需要使用物化 toProducer方法,该方法将提供流变换节点的具体实现。 当前只有一个FlowMaterializer ,这同样令人惊讶地使用引擎盖下的Akka actor来实际创建流和流。

最后,我们将刚刚创建的生产者连接到TCP绑定的outputStream ,而恰好是消费者。 现在,我们有了一个React性的网络上的消息流,这意味着仅当Broker可以接受消息时才发送消息。 否则,反压将一直施加到滴答声产生器。

reactmq-actors1-248x300

代理:发送消息

在网络的另一端是Broker 。 让我们看看消息到达时会发生什么。

Flow(serverBinding.connectionStream).foreach { conn =>logger.info(s"Broker: send client connected (${conn.remoteAddress})")val sendToQueueConsumer = ActorConsumer[String](system.actorOf(Props(new SendToQueueConsumer(queueActor))))// sending messages to the queue, receiving from the clientval reconcileFrames = new ReconcileFrames()Flow(conn.inputStream).mapConcat(reconcileFrames.apply).produceTo(materializer, sendToQueueConsumer)
}.consume(materializer)

首先,我们创建了一个Flow ,那将是字节输入流-从连接的输入流。 接下来,我们重新构造使用框架发送的String实例,最后将流定向到发送到队列的使用者。

SendToQueueConsumer是到主队列SendToQueueConsumer的每个连接的桥。 它使用Akka的Reactive Streams实施中的ActorConsumer特性来自动管理应该在上游发出信号的需求。 利用该特征,我们可以创建一个由演员支持的React流Consumer[_] ,从而实现完全可定制的接收器。

class SendToQueueConsumer(queueActor: ActorRef) extends ActorConsumer {private var inFlight = 0override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {override def inFlightInternally = inFlight}override def receive = {case OnNext(msg: String) =>queueActor ! SendMessage(msg)inFlight += 1case SentMessage(_) => inFlight -= 1}
}

需要提供给ActorConsumer是一种测量当前正在处理的流项目的方法。 在这里,我们正在计算已发送到队列但尚未收到ID的消息数(因此,队列正在处理它们)。

消费者收到包装在OnNext actor消息中的新消息; 因此, OnNext由流发送给SentMessage ,而SentMessage被队列SentMessage发送以回复SendMessage

接收

接收部分以类似的方式完成,尽管它需要一些额外的步骤。 首先,如果您查看Receiver ,您将看到我们正在从输入流中读取字节,从帧中重构消息,并发回ID,从而确认消息。 实际上,我们将在接收消息和发送回ID之间运行一些消息处理逻辑。

Broker方,我们为每个连接创建两个流。

一个是发送给接收者的消息流,另一个是来自接收者的已确认消息ID的流,这些流被简单地转换为将DeleteMessage消息发送给队列actor。

与使用者类似,我们需要从队列参与者到流的每个连接的接收桥。 这是在ReceiveFromQueueProducer实现的。 在这里,我们扩展了ActorProducer特性,它使您可以完全控制实际创建流中消息的过程。

在此参与者中,流正在发送Request参与者消息,以发出需求信号。 有需求时,我们从队列中请求消息。 队列最终将以一个或多个ReceivedMessages actor消息进行响应(当队列中有任何消息时); 由于消息的数量永远不会超出信号需求,因此我们可以安全地调用ActorProducer.onNext方法,该方法将给定的项目发送到下游。

构图

一个小细节是我们需要一个自定义的框架协议(感谢Roland Kuhn的澄清 ),因为TCP流只是一个字节流,因此我们可以获得数据的任意片段,以后需要重新组合。 幸运的是,实现这样的框架非常简单–请参阅Framing类。 每个帧都由消息的大小和消息本身组成。

加起来

使用React式流和Akka实施,可以轻松创建具有端到端背压的React式应用程序。 上面的队列虽然缺少很多功能和证明,但不允许Senders使Broker过载,而另一方面, Broker会使Receivers过载。 所有这些,而无需实际编写任何背压处理代码!

翻译自: https://www.javacodegeeks.com/2014/06/reactive-queue-with-akka-reactive-streams.html

react 消息队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/342301.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

lora和lorawan无线技术在物联网的应用

Lora联盟表示:“Lora设备和开放的LoRaWAN协议使智能物联网应用能够解决我们智慧城市建设面临的一些最大挑战:能源管理、自然资源减少、污染控制、基础设施效率、防灾等。” LoRaWAN的用途是什么? LoRaWAN在物联网和智慧城市部署中具有多重用途…

LoRa无线技术与LoRaWAN网关模块的区别

有不少人分不清lorawan无线模块与LoRa网关无线传输技术到底有什么区别,他们在物联网领域的应用到底是什么样的。 LoRaWAN指的是MAC层的组网协议,而LoRa是一个物理层的协议。虽然现有的LoRaWAN组网基本上都使用LoRa作为物理层,但是LoRaWAN的协…

工业交换机和普通交换机的区别

对于交换机领域这一块,想必很多做安防的朋友不会陌生吧,交换机又分为商用网络交换机跟工业以太网交换机,为满足灵活多变的工业环境(environment)和抗干扰的方面来看,工业交换机和商业(Business)的交换机有特别大的差距&#xff0c…

AWS Messaging Services:选择合适的服务

1. AWS Messaging Services AWS Messaging服务使云中相似和不同的软件系统能够异步通信和交换信息。 这些软件系统可能不兼容且使用不同的语言,也可能在不同的平台上。 AWS消息传递服务具有高度可用性,高度可扩展性和高度可靠性。 AWS支持针对不同类型用…

变频器的四大组成部分和工作原理

随着电子技的发展变频器已经有了很大的变化,但其基本原理并没有发生改变。变频器的主要部分有四个:整流器、中间电路、逆变器、控制电路。 1)、整流器 通用变频器的整流电路是由三相桥式整流桥组成。它的功能是将工频电源进行整流&#xff0…

工业交换机与普通商用交换机的对比详解

工业交换机是专门为满足灵活多变的工业应用需求而设计的,从可靠性和抗干扰方面考虑,工业级别的交换机和商用的产品相差还是很大得,而且工业交换机有很多实用的功能,安装和电源的使用方面更符合工业现场的要求。工业场合选用该类型…

变频器的工作原理和功能应用

变频器(Variable-frequency Drive,VFD)是应用变频技术与微电子技术,通过改变电机工作电源频率方式来控制交流电动机的电力控制设备。 变频器主要由整流(交流变直流)、滤波、逆变(直流变交流&am…

飞畅科技-工业以太网交换机组网方式介绍

工业以太网交换机专门为满足灵活多变的工业应用需求而设计,提供一种高性价比工业以太网通讯解决方案。工业交换机的应用十分广泛,在行业应用方面,主要应用于:煤矿安全、轨道交通、工厂自动化、水处理系统、城市安防等。接下来&…

轻松读懂三极管,原来它是这样工作的

一:三极管的介绍 三极管,也就是半导体三极管,是一种控制电流的半导体器件,其作用就是吧微弱信号放大成幅度值较大的电信号,也用作于无触点开关。通常,三极管具有电流放大的作用,它的结构是在一…

飞畅科技-工业以太网的应用现状及前景展望

由于以太网无可争议的优势,将以太网应用于工业自动化领域正成为人们关注的热点。那么,以太网用于工业领域需要要解决哪些问题,其发展前景怎么样呢?接下来我们就来详细的介绍下工业以太网的应用现状及发展前景。感兴趣的朋友就一起…

Spring Boot –如何跳过缓存thyemeleaf模板,js,css等以每次绕过重启服务器

Spring Boot自动配置为ThyemeLeaf注册的默认模板解析器是基于类路径的,这意味着它从编译的资源/ target / classes / **加载模板和其他静态资源。 要加载对资源(HTML,js,CSS等)的更改,我们可以 每次都重新…

「低功耗蓝牙模块」主从一体 蓝牙嗅探-助力智能门锁

一、BLE蓝牙的具体优势: 1、BLE蓝牙模块的待机时间超长 市面上的蓝牙智能锁基本都是使用干电池供电,而BLE低功耗蓝牙模块在广播、传输、待机和睡眠模式下均拥有超低的功耗,比如E104-2G4U04A模块,最大发射功率仅为2.5mW。 2、可以使…

工业以太网的优点有哪些?

以太网支持的传输介质为粗同轴电缆、细同轴电缆、双绞线、光纤等,其最大优点是简单,经济实用,易为人们所掌握,所以深受广大用户欢迎。那么,工业以太网具有哪些优势呢?接下来我们就跟随飞畅科技的小编一起来…

蓝牙模块的5大应用场景

蓝牙模块,作为集成蓝牙无线技术功能的PCBA板,主要用于短距离无线通讯,已经作为物联网无线传输发展的中坚力量。那么蓝牙模块在实际生活中有哪些应用呢?跟亿佰特小编一起来看看吧 一、智慧医疗 当前的健康医疗设备通常是可穿戴产品…

飞畅科技-工业以太网交换机的差异性

通过之前对工业交换机的认知,我们了解到工业以太网交换机采用存储转换的交换方式,同时提高了以太网通信速度,并且内置智能报警设计监控网络运行状况,使得在恶劣危险的工业环境中保证以太网可靠稳定的运行。那么,工业以…

rs485通信OSI模型网络层

网络层处理发生在RS485总线上的设备之间的实际通信。由于RS485接口主要是一种电气规范,因此对话可以到此结束,但由于它支持多点,因此需要在 OSI 模型中解决它。 没有针对网络层寻址的固定规范,但RS485总线必须由主机正确管理以避…

HackTheBox - Medium - Linux - Jupiter

Jupiter Jupiter 是一台中等难度的 Linux 机器,它有一个使用 PostgreSQL 数据库的 Grafana 实例,该数据库在权限上过度扩展,容易受到 SQL 注入的影响,因此容易受到远程代码执行的影响。一旦站稳脚跟,就会注意到一个名…

工业以太网交换机的冗余功能及发展历程介绍

由于工业环境对工业控制网络可靠性能的超高要求,工业以太网的冗余功能应运而生。从快速生成树冗余(RSTP)、环网冗余(RapidRing)到主干冗余(Trunking),都有各自不同的优势和特点。报警、串口使用、主干&…

常用电源符号含义分享

电源符号,你是否还傻傻分不清楚?常用电源符号附上! 在电路设计中,总会出现各式各样的电源符号,经常会把人弄懵逼,今天小编整理了二十多个比较常用的电源符号分享给大家,快收藏呀。 1.VBB&#…

activemq网络桥接_ActiveMQ –经纪人网络解释–第5部分

activemq网络桥接在前面的第4部分中,我们已经看到了如何使用网络连接器在队列中平衡远程使用者的负载。 在第5部分中,如果主题上同时存在多个远程持久订阅者,我们将看到相同的配置如何工作。 考虑以下配置…。 图1:经纪人网络–…