akka使用_使用Akka简化交易系统

akka使用

我的同事正在开发一种交易系统,该系统可以处理大量的传入交易。 每笔交易都涵盖一种Instrument (例如债券或股票),并且具有某些(现在)不重要的属性。 他们坚持使用Java(<8),所以我们坚持下去:

class Instrument implements Serializable, Comparable<Instrument> {private final String name;public Instrument(String name) {this.name = name;}//...Java boilerplate}public class Transaction {private final Instrument instrument;public Transaction(Instrument instrument) {this.instrument = instrument;}//...Java boilerplate}

Instrument稍后将用作HashMap的键,因此将来我们会主动实现Comparable<Instrument> 。 这是我们的领域,现在的要求是:

  1. 交易进入系统,需要尽快处理(无论如何)
  2. 我们可以自由地以任何顺序处理它们
  3. …但是,同一种工具的交易需要按照进来时完全相同的顺序进行处理。

最初的实现很简单–将所有传入的事务放入一个使用者的队列(例如ArrayBlockingQueue )中。 这满足了最后一个要求,因为队列在所有事务中保留了严格的FIFO顺序。 但是,这样的架构阻止了针对不同工具的不相关交易的并发处理,从而浪费了令人信服的吞吐量提高。 毫无疑问,这种实现尽管很简单,却成为了瓶颈。

第一个想法是以某种方式分别按工具和过程工具拆分传入的交易。 我们提出了以下数据结构:

priavate final ConcurrentMap<Instrument, Queue<Transaction>> queues = new ConcurrentHashMap<Instrument, Queue<Transaction>>();public void accept(Transaction tx) {final Instrument instrument = tx.getInstrument();if (queues.get(instrument) == null) {queues.putIfAbsent(instrument, new LinkedBlockingQueue<Transaction>());}final Queue<Transaction> queue = queues.get(instrument);queue.add(tx);
}

! 但是最坏的时刻还没有到来。 您如何确保最多一个线程一次处理每个队列? 毕竟,否则,两个线程可以从一个队列(一种工具)中提取项目,并以相反的顺序处理它们,这是不允许的。 最简单的情况是每个队列都有一个Thread -这无法扩展,因为我们期望成千上万种不同的工具。 因此,我们可以说N线程,让每个线程处理队列的一个子集,例如instrument.hashCode() % N告诉我们哪个线程负责处理给定的队列。 但是由于以下三个原因,它仍然不够完美:

  1. 一个线程必须“观察”许多队列(很可能是忙于等待),并始终对其进行遍历。 或者,队列可能以某种方式唤醒其父线程
  2. 在最坏的情况下,所有工具都将具有冲突的哈希码,仅针对一个线程-这实际上与我们最初的解决方案相同
  3. 这只是该死的复杂! 漂亮的代码并不复杂!

实现这种怪异是可能的,但是困难且容易出错。 此外,还有另一个非功能性的要求:仪器来来往往,随着时间的流逝,成千上万的仪器。 一段时间后,我们应删除代表最近未见过的仪器的地图条目。 否则我们会发生内存泄漏。

如果您能提出一些更简单的解决方案,请告诉我。 同时,让我告诉你我对同事的建议。 如您所料,它是Akka –结果非常简单。 我们需要两种角色: DispatcherProcessorDispatcher有一个实例,并接收所有传入的事务。 它的职责是为每个Instrument找到或生成工作Processor角色,并将交易推向该角色:

public class Dispatcher extends UntypedActor {private final Map<Instrument, ActorRef> instrumentProcessors = new HashMap<Instrument, ActorRef>();@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {dispatch(((Transaction) message));} else {unhandled(message);}}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}private ActorRef findOrCreateProcessorFor(Instrument instrument) {final ActorRef maybeActor = instrumentProcessors.get(instrument);if (maybeActor != null) {return maybeActor;} else {final ActorRef actorRef = context().actorOf(Props.create(Processor.class), instrument.getName());instrumentProcessors.put(instrument, actorRef);return actorRef;}}
}

这很简单。 由于我们的Dispatcher actor实际上是单线程的,因此不需要同步。 我们几乎没有收到Transaction ,查找或创建Processor并进一步传递Transaction 。 这是Processor实现的样子:

public class Processor extends UntypedActor {private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {process(((Transaction) message));} else {unhandled(message);}}private void process(Transaction tx) {log.info("Processing {}", tx);}
}

而已! 有趣的是,我们的Akka实现几乎与我们第一个使用队列映射的想法相同。 毕竟,参与者只是一个队列,还有一个(逻辑)线程处理该队列中的项目。 区别在于:Akka管理有限的线程池,并可能在成千上万的参与者之间共享它。 而且,由于每个工具都有其专用(和“单线程”)执行器,因此可以保证每个工具的事务的顺序处理。

还有一件事。 如前所述,有大量的乐器,我们不想让演员们出现一段时间了。 假设如果Processor在一个小时内未收到任何交易,则应停止并收集垃圾。 如果以后我们收到此类工具的新交易,则可以随时重新创建它。 这是一个非常棘手的问题–我们必须确保,如果在处理器决定删除自身时交易到达,我们就不能放弃该交易。 Processor没有停止自身,而是向其父Processor发出空闲时间过长的信号。 然后, Dispatcher将发送PoisonPill到它。 因为ProcessorIdleTransaction消息都按顺序处理,所以没有交易发送到不再存在的参与者的风险。

每个setReceiveTimeout通过使用setReceiveTimeout安排超时来独立地管理其生命周期:

public class Processor extends UntypedActor {@Overridepublic void preStart() throws Exception {context().setReceiveTimeout(Duration.create(1, TimeUnit.HOURS));}@Overridepublic void onReceive(Object message) throws Exception {//...if (message instanceof ReceiveTimeout) {log.debug("Idle for two long, shutting down");context().parent().tell(ProcessorIdle.INSTANCE, self());} else {unhandled(message);}}}enum ProcessorIdle {INSTANCE
}

显然,当Processor在一个小时内未收到任何消息时,它会向其父级( Dispatcher )轻轻发出信号。 但是演员仍然活着,并且只要一个小时后发生交易就可以处理交易。 Dispatcher作用是杀死给定的Processor并将其从地图中删除:

public class Dispatcher extends UntypedActor {private final BiMap<Instrument, ActorRef> instrumentProcessors = HashBiMap.create();public void onReceive(Object message) throws Exception {//...if (message == ProcessorIdle.INSTANCE) {removeIdleProcessor(sender());sender().tell(PoisonPill.getInstance(), self());} else {unhandled(message);}}private void removeIdleProcessor(ActorRef idleProcessor) {instrumentProcessors.inverse().remove(idleProcessor);}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}//...}

不便之处。 instrumentProcessors过去是Map<Instrument, ActorRef> 。 事实证明这是不够的,因为我们突然不得不按值删除此映射中的条目。 换句话说,我们需要找到一个映射到给定ActorRefProcessor )的键( Instrument )。 有多种处理方法(例如,空闲Processor可以发送它处理的Instrumnt ),但是我改用了BiMap<K, V> 。 之所以可以使用它,是因为指定的InstrumentActorRef都是唯一的(每个演员actor)。 使用BiMap我可以简单地对地图进行inverse() (从BiMap<Instrument, ActorRef>BiMap<ActorRef, Instrument>并将ActorRef视为键。

这个Akka例子只不过是“ hello,world ”。 但是与复杂的解决方案相比,我们必须使用并发队列,锁和线程池进行编写,这是完美的。 我的队友非常兴奋,以至于最终他们决定将整个应用程序重写为Akka。

翻译自: https://www.javacodegeeks.com/2014/06/simplifying-trading-system-with-akka.html

akka使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/341738.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

浅析HDMI1.4光纤延长器的工作原理和应用领域

HDMI光纤延长器是用来延长信号的传输器件&#xff0c;解决HDMI音视频信号无法远距离传输的问题&#xff0c;且保证信号传输的质量。那么&#xff0c;HDMI光纤延长器的工作原理是什么&#xff1f;HDMI光纤延长器有哪些应用呢&#xff1f;接下来就由飞畅科技的小编来为大家详细介…

Java中的模板方法模式

模板方法模式是一种行为模式&#xff0c;建议在超类中更一般地定义算法。 该算法是在称为模板方法的方法中定义的。 子类仅定义更具体的算法步骤的实现。 使用这种设计模式的好处是&#xff0c;算法后面的任何更改只会影响超类中的代码。 此外&#xff0c;它还可以提高代码的可…

什么是单模单纤/双纤光纤收发器?

光纤收发器&#xff0c;是一种将短距离的双绞线电信号和长距离的光信号进行互换的以太网传输媒体转换单元&#xff0c;按其所需主要分为单纤光纤收发器和双纤光纤收发器&#xff0c;接下来我们就来详细介绍下什么是单模单纤/双纤光纤收发器&#xff1f;单模单纤和单模双纤光纤收…

什么是以太网光纤收发器,其产品特点和技术参数都有哪些?

以太网光纤收发器是一款提供以太网数据信号到光纤数据信号的双向透明转换器&#xff0c;可以将以太网信号通过光纤线路传输突破传输距离100米的限制&#xff0c;使得以太网网络覆盖得到极大的延伸。光纤收发器的出现&#xff0c;确保能够顺畅的将电信号与光纤信号相互转换&…

什么是CAN总线中继器?

CAN网桥&#xff08;Bridge&#xff09;是一个智能的中继器。使用CAN网桥对设备进行互连&#xff0c;克服了CAN总线结点个数及通信距离的物理限制&#xff0c;能有效扩充CAN网络的结点总数&#xff0c;延长通信距离。今天&#xff0c;飞畅科技的小编为大家详细介绍下CAN总线中继…

java百里香_百里香Spring测试的意见

java百里香我最近在基于Spring的Web应用程序中转换为thymeleaf以进行视图模板化&#xff0c;而不是jsp。 百里香叶文档中关于为什么百里香叶在jsp上为什么能保持水分的所有争论&#xff0c;我肯定被卖掉了。 除了能够预览模板之外&#xff0c;对我来说&#xff0c;主要原因之一…

一文读懂工业设备的两种通讯方式:现场总线和工业以太网

随着传统制造企业正在加快智能制造转型的进程&#xff0c;工业互联网迅速在全世界范围内兴起。在工业互联网的技术构架中&#xff0c;通过各类通讯方式接入不同设备、系统和产品&#xff0c;来采集海量数据是其重要的一环。本文将重点介绍工业底层设备的两种通讯方式&#xff1…

Java中的中介器设计模式

在本教程中&#xff0c;我们将学习一种行为模式&#xff0c;该行为模式将促进彼此通信的多个对象之间的松散耦合。 Mediator设计模式背后的想法是拥有一个中心对象&#xff0c;该对象封装了一组对象之间的交互方式。 在调解器模式中&#xff0c;我们在称为调解器的单独类中提取…

什么是中国1号信令?

中国1号信令是在电话自动交换网中&#xff0c;我国所用的随路信令的总称&#xff0c;目前在国内长途网和市话中的局间中继线上使用。那么&#xff0c;什么是中国1号信令&#xff1f;中国1号信令的分类又有哪些呢&#xff1f;接下来我们就跟随飞畅科技的小编一起来详细了解下吧&…

如何选购工业级光模块

大家都知道光模块是影响整个网络性能的关键因素&#xff0c;特别是在工业以太网中&#xff0c;网络连接控制的多为大型工业设备&#xff0c;光模块的稳定性尤为重要&#xff0c;那么&#xff0c;我们该如何选购工业级光模块呢&#xff1f;接下来就由飞畅科技的小编来为大家详细…

ejb生命周期_无状态EJB:池化和生命周期

ejb生命周期无状态EJB池和生命周期的概述视图&#xff08;注释&#xff09;。 对新手有用。 。 。 。 。 EJB池&#xff1a;快速概述 EJB实例存储在称为EJB池的位置-这不过是内存中的缓存 。 无状态EJB通常按需实例化&#xff0c;即&#xff0c;当客户端调用Bean上的方法时。…

单E1光端机分类及技术指标详解

单E1光端机是一种将G.703的E1信号调制到光纤上传输的设备。采用大规模集成芯片&#xff0c;电路简单&#xff0c;功耗低&#xff0c;可靠性高&#xff0c;具有完整的告警状态指示和完善的网管功能。那么&#xff0c;单E1光端机分类及技术指标有哪些呢&#xff1f;接下来我们就跟…

将Java类作为子进程运行

我本周需要将Java类&#xff08;而不是jar&#xff09;作为子进程运行。 更准确地说&#xff0c;我想从测试内部产生一个新进程&#xff0c;而不是直接在测试内部运行&#xff08;进程内&#xff09;。 我不认为这是幻想或复杂的事情。 但是&#xff0c;这不是我以前不需要做的…

光猫的分类及应用范围有哪些?

光猫也称为单端口光端机&#xff0c;是针对特殊用户环境而设计的产品&#xff0c;它利用一对光纤进行单E1或单V.35或单10BaseT点到点式的光传输终端设备。该设备作为本地网的中继传输设备&#xff0c;适用于基站的光纤终端传输设备以及租用线路设备。而对于多口的光端机一般会直…

关于光模块用单模光纤和多模光纤小知识

通过对光纤的认知&#xff0c;我们了解到光纤是通过导光来传输信号、不导电、不怕雷击&#xff0c;所以也不需要用接地保护&#xff0c;我们按光在光纤中的传输模式分为&#xff1a;多模光纤和单模光纤。对于我们使用者来说&#xff0c;你把多模和单模名称由来记住就可以了。接…

硬盘序列号示例_序列化代理模式示例

硬盘序列号示例有些书极大地改变了你的生活。 其中一本书是Joshua Bloch撰写的“ Effective Java” 。 在下面您可能会发现一些小的实验&#xff0c;该实验的灵感来自于本书的第11章“串行化”。 假设我们有一个为继承而设计的类&#xff0c;它本身不是可序列化的 &#xff0c…

什么是光纤转换器?光纤转换器转换类别介绍

光纤转换器是RS-232/422/485串行数据通过光纤的远距离传输&#xff0c;可以完成串口到光纤的转换&#xff0c;并且可以延长串行通信信号的传输距离。那么&#xff0c;光纤转换器转换类别有哪些呢&#xff1f;接下来我们就跟随飞畅科技的小编一起来详细了解下吧&#xff01; 光纤…

H2数据库的Spring Boot

在本快速教程中&#xff0c;我们将引导一个由内存H2数据库支持的简单Spring Boot应用程序。 我们将使用Spring Data JPA与我们的数据库进行交互。 项目设置&#xff1a; 首先&#xff0c;让我们使用Spring Initializr生成我们的项目模板&#xff1a; 单击“生成项目”链接后&…

集线器,交换机,路由器工作层次的区别

集线器&#xff08;Hub&#xff09;、交换机&#xff08;Switch&#xff09;与路由器&#xff08;Router&#xff09;号称是网络硬件三剑客&#xff0c;一直以来都是网络界的活跃分子&#xff0c;但让非常多初入网络之门的菜鸟恼火的是&#xff0c;它们三者不仅外观相似&#x…

什么是协议网桥?

网桥&#xff08;Bridge&#xff09;是早期的两端口二层网络设备&#xff0c;用来连接不同网段。网桥的两个端口分别有一条独立的交换信道&#xff0c;不是共享一条背板总线&#xff0c;可隔离冲突域。网桥比集线器&#xff08;Hub&#xff09;性能更好&#xff0c;集线器上各端…