使用Akka简化交易系统

我的同事正在开发一种交易系统,该系统可以处理大量的传入交易。 每笔交易都涵盖一种Instrument (例如债券或股票),并且具有某些(现在)不重要的属性。 他们坚持使用Java(<8),所以我们坚持下去:

class Instrument implements Serializable, Comparable<Instrument> {private final String name;public Instrument(String name) {this.name = name;}//...Java boilerplate}public class Transaction {private final Instrument instrument;public Transaction(Instrument instrument) {this.instrument = instrument;}//...Java boilerplate}

Instrument稍后将用作HashMap的键,因此将来我们会主动实现Comparable<Instrument> 。 这是我们的领域,现在的要求是:

  1. 交易进入系统,需要尽快处理(无论如何)
  2. 我们可以按任何顺序自由处理它们
  3. …但是,同一种工具的交易需要按照进来时的顺序完全相同地顺序进行。

最初的实现很简单–将所有传入的事务放入一个使用方的队列(例如ArrayBlockingQueue )中。 这满足了最后的要求,因为队列在所有事务中都保留了严格的FIFO顺序。 但是,这种架构阻止了针对不同工具的不相关交易的并发处理,从而浪费了令人信服的吞吐量提高。 毫无疑问,这种实现尽管很简单,却成为了瓶颈。

第一个想法是以某种方式分别按工具和流程工具拆分传入的交易。 我们提出了以下数据结构:

priavate final ConcurrentMap<Instrument, Queue<Transaction>> queues = new ConcurrentHashMap<Instrument, Queue<Transaction>>();public void accept(Transaction tx) {final Instrument instrument = tx.getInstrument();if (queues.get(instrument) == null) {queues.putIfAbsent(instrument, new LinkedBlockingQueue<Transaction>());}final Queue<Transaction> queue = queues.get(instrument);queue.add(tx);
}

! 但是最坏的时刻还没有到来。 您如何确保最多一个线程一次处理每个队列? 毕竟,否则,两个线程可以从一个队列(一种仪器)中提取项目并以相反的顺序处理它们,这是不允许的。 最简单的情况是每个队列都有一个Thread -这无法扩展,因为我们期望成千上万种不同的工具。 因此,我们可以说N线程,让每个线程处理队列的一个子集,例如instrument.hashCode() % N告诉我们哪个线程负责处理给定的队列。 但是由于以下三个原因,它仍然不够完美:

  1. 一个线程必须“观察”许多队列(很可能是忙等待),并始终对其进行遍历。 或者,队列可能以某种方式唤醒其父线程
  2. 在最坏的情况下,所有工具都将具有冲突的哈希码,仅针对一个线程-这实际上与我们最初的解决方案相同
  3. 这只是该死的复杂! 漂亮的代码并不复杂!

实现这种怪异是可能的,但是困难且容易出错。 此外,还有另一个非功能性的要求:仪器来来往往,随着时间的流逝,成千上万的仪器。 一段时间后,我们应删除代表最近未见过的仪器的地图条目。 否则我们会发生内存泄漏。

如果您能提出一些更简单的解决方案,请告诉我。 同时,让我告诉你我对同事的建议。 如您所料,它是Akka –结果非常简单。 我们需要两种角色: DispatcherProcessorDispatcher有一个实例,并接收所有传入的事务。 它的责任是为每个Instrument找到或生成工作Processor角色,并向其推送事务:

public class Dispatcher extends UntypedActor {private final Map<Instrument, ActorRef> instrumentProcessors = new HashMap<Instrument, ActorRef>();@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {dispatch(((Transaction) message));} else {unhandled(message);}}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}private ActorRef findOrCreateProcessorFor(Instrument instrument) {final ActorRef maybeActor = instrumentProcessors.get(instrument);if (maybeActor != null) {return maybeActor;} else {final ActorRef actorRef = context().actorOf(Props.create(Processor.class), instrument.getName());instrumentProcessors.put(instrument, actorRef);return actorRef;}}
}

这很简单。 由于我们的Dispatcher actor实际上是单线程的,因此不需要同步。 我们几乎没有收到Transaction ,查找或创建Processor并进一步传递Transaction 。 这是Processor实现的样子:

public class Processor extends UntypedActor {private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {process(((Transaction) message));} else {unhandled(message);}}private void process(Transaction tx) {log.info("Processing {}", tx);}
}

而已! 有趣的是,我们的Akka实现几乎与我们第一个使用队列映射的想法相同。 毕竟,参与者只是一个队列,还有一个(逻辑)线程在该队列中处理项目。 区别在于:Akka管理有限的线程池,并可能在成千上万的参与者之间共享它。 而且,由于每个工具都有其专用(和“单线程”)执行器,因此可以保证每个工具的事务顺序处理。

还有一件事。 如前所述,有大量的乐器,我们不想让演员出现一段时间了。 假设如果Processor在一个小时内未收到任何交易,则应停止并收集垃圾。 如果以后我们收到此类工具的新交易,则可以随时重新创建它。 这是一个非常棘手的问题–我们必须确保,如果处理器决定删除自身时,如果事务到达,我们将无法松开该事务。 Processor没有停止自身,而是向其父Processor发出空闲时间过长的信号。 然后, Dispatcher将发送PoisonPill到它。 因为ProcessorIdleTransaction消息都是顺序处理的,所以没有交易发送到不再存在的参与者的风险。

每个setReceiveTimeout通过使用setReceiveTimeout安排超时来独立地管理其生命周期:

public class Processor extends UntypedActor {@Overridepublic void preStart() throws Exception {context().setReceiveTimeout(Duration.create(1, TimeUnit.HOURS));}@Overridepublic void onReceive(Object message) throws Exception {//...if (message instanceof ReceiveTimeout) {log.debug("Idle for two long, shutting down");context().parent().tell(ProcessorIdle.INSTANCE, self());} else {unhandled(message);}}}enum ProcessorIdle {INSTANCE
}

显然,当Processor在一个小时内未收到任何消息时,它会向其父级( Dispatcher )轻轻发出信号。 但是演员仍然活着,并且如果交易恰好在一小时后发生,便可以处理。 Dispatcher作用是杀死给定的Processor并将其从地图中删除:

public class Dispatcher extends UntypedActor {private final BiMap<Instrument, ActorRef> instrumentProcessors = HashBiMap.create();public void onReceive(Object message) throws Exception {//...if (message == ProcessorIdle.INSTANCE) {removeIdleProcessor(sender());sender().tell(PoisonPill.getInstance(), self());} else {unhandled(message);}}private void removeIdleProcessor(ActorRef idleProcessor) {instrumentProcessors.inverse().remove(idleProcessor);}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}//...}

不便之处。 instrumentProcessors过去是Map<Instrument, ActorRef> 。 事实证明这是不够的,因为我们突然不得不按值删除此映射中的条目。 换句话说,我们需要找到一个映射到给定ActorRefProcessor )的键( Instrument )。 有多种处理方法(例如,空闲的Processor可以发送它处理的Instrumnt ),但是我改用了BiMap<K, V> 。 之所以起作用,是因为指定的InstrumentActorRef都是唯一的(每个乐器的actor)。 使用BiMap我可以简单地对地图进行inverse() (从BiMap<Instrument, ActorRef>BiMap<ActorRef, Instrument>并将ActorRef视为键。

这个Akka例子只不过是“ hello,world ”。 但是与卷积解决方案相比,我们必须使用并发队列,锁和线程池进行编写,这是完美的。 我的队友非常兴奋,以至于最终他们决定将整个应用程序重写为Akka。

翻译自: https://www.javacodegeeks.com/2014/06/simplifying-trading-system-with-akka.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/363751.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Python】贪心算法入门

一.引言 本文将通过两个问题和两道例题带你入门贪心算法。 贪心算法&#xff08;Greedy Algorithm&#xff09;是一种在每一步选择中都采取在当前状态下最优&#xff08;最好或最有利&#xff09;的选择&#xff0c;从而希望导致全局最优解的算法。贪心算法不保证找到全局最优…

ASP.NET MVC+LINQ开发一个图书销售站点(9):编辑目录

编辑目录和新建类似&#xff0c;这里我们用MVC提供的辅助类 1.在Model 的BookShopDBDataContext分部类里添加: 2. 在CategoryController添加如下方法(注意&#xff1a;我们添加了后端验证) 3. 修改View下的EditCategory.aspx. (注意&#xff1a;我们用了MVC提供的辅助类生成Tex…

Jenkins+maven环境部署

选择使用tomcat下运行jenkins项目&#xff0c;安装步骤如下 1. 安装tomcat&#xff0c;查看想要下载的版本 https://mirrors.cnnic.cn/apache/tomcat/ wget https://mirrors.cnnic.cn/apache/tomcat/tomcat-9/v9.0.7/bin/apache-tomcat-9.0.7.tar.gz 2. 安装jdk wget --no-c…

内外边距、浮动、布局相关

关于清除元素的内外边距&#xff1a; 1、行内元素只有左右边距、没有内外边距、内边距在ie6等低版本的浏览器中也会有问题。尽量不要给元素指定行内的内外边距&#xff1b; 2、外边距的合并 使用margin定义块元素的垂直外边距时&#xff0c;可能会出现外边距的合并&#xff…

android根据mac地址连接耳机,Android获取设备IMEI和Mac地址

释放双眼&#xff0c;带上耳机&#xff0c;听听看~&#xff01;public static boolean checkPermission(Context context, String permission) {boolean result false;if (Build.VERSION.SDK_INT > 23) {try {Class> clazz Class.forName("android.content.Contex…

让vs2008支持jQuery的智能提示!

告诉大家一个非常好的消息&#xff0c;就是现在我们已可以让VS2008同时支持jQuery的智能提示功能啦可以先看看下面的效果图&#xff1a;jquery1.png (18.76 K)2008-3-30 14:37:54jquery2.png (21.18 K)2008-3-30 14:37:54怎样&#xff1f;酷吧&#xff0c;呵呵想实现以上效果只…

编写干净的测试–提防魔术

很难为干净的代码找到一个好的定义&#xff0c;因为我们每个人都有自己的单词clean的定义。 但是&#xff0c;有一个似乎是通用的定义&#xff1a; 干净的代码易于阅读。 这可能会让您感到有些惊讶&#xff0c;但是我认为该定义也适用于测试代码。 使测试尽可能具有可读性是我…

Eclipse创建Java Web项目及基本配置

https://www.cnblogs.com/zzlback/p/8552622.html转载于:https://www.cnblogs.com/aiyowei/p/10428638.html

为什么要使用Vuex?

为什么要使用Vuex? 1. 假如不使用 1.1 父子组件依赖同一个state 1.2 兄弟组件依赖同一个state 2. 用了Vuex之后 3. 方便记忆和理解 更多专业前端知识&#xff0c;请上 【猿2048】www.mk2048.com

十个习惯让你精通新的开发技术

Ben Watson&#xff0c;知名开发者。任职于GeoEye&#xff0c;是其所属开发团队的领导者。本文发表于他自己的博客&#xff0c;阐述了十种学习新技术的方法。 1、要看书 在成千上万的编程图书中&#xff0c;可能很大一部分根本毫无用处。但是仍然有很多图书对你的(编程)能力有很…

基于android平台的24点游戏设计与实现需求分析,基于Android平台的24点游戏设计与实现需求分析_毕业设计论文.doc...

基于Android平台的24点游戏设计与实现摘要随着移动设备的普及以及移动设备的硬件的提升&#xff0c;移动设备的功能越来越完善&#xff0c;移动设备的系统平台也日渐火热起来。目前国内最常见的移动开发平台有Symbian&#xff0c;iPhone&#xff0c;Windows Phone以及当下正在逐…

序列化代理模式示例

有些书可以极大地改变您的生活。 其中一本是Joshua Bloch撰写的“ Effective Java” 。 在下面您可能会发现一些小的实验&#xff0c;该实验的灵感来自于本书的第11章“串行化”。 假设我们有一个为继承而设计的类&#xff0c;它本身不是可序列化的 &#xff0c;并且没有无参数…

fit_transform和transform的区别

部分转载 https://blog.csdn.net/weixin_38278334/article/details/82971752 https://www.cnblogs.com/summer-nude/p/7380694.html 写在前面fit和transform没有任何关系&#xff0c;仅仅是数据处理的两个不同环节&#xff0c;之所以出来fit_transform这个函数名&#xff0c;仅…

使用注解配置Spring

使用注解配置Spring 1.为主配置文件引入新的命名空间(约束) 2.开启使用注解代理配置文件 3.在类中使用注解完成配置 将对象注册到容器 修改对象的作用范围 值类型注入 引用类型注入 注意: 初始化|销毁方法 转载于:https://www.cnblogs.com/HiJackykun/p/10428728.html

android监控指纹信息变化,android监听指纹变化(解决反射思路在android10不生效的问题)...

前天偶尔运行代码&#xff0c;一个段异常映入眼帘&#xff0c;我擦android10上反射机制监听不到指纹id等数据了&#xff0c;原因是android10彻底抛弃了之前指纹的api。所以反射不到了。怎么解决这个问题&#xff1f;我们换个思路当然反射依然可以&#xff0c;不过你需要在andro…

[转载]数据结构笔试题基础

第一章 数据结构与算法 一.算法的基本概念计算机解题的过程实际上是在实施某种算法&#xff0c;这种算法称为计算机算法。 1.算法的基本特征&#xff1a;可行性&#xff0c;确定性&#xff0c;有穷性&#xff0c;拥有足够的情报。 2.算法的基本要素&#xff1a;算法中对数据的运…

random_state ---summary

1-简介 random_state 相当于随机数种子random.seed() 。random_state 与 random seed 作用是相同的。可参考&#xff1a;https://www.jianshu.com/p/4deb2cb2502f 对模型没有影响&#xff0c;但是对于一些进行随机选择的过程有影响。比如随机拆分训练集和测试集。随机种子一致的…

基于cookie的SSO单点登录系统

利用COOKIE实现单点登录功能 近期公司要求帮一个项目实现单点登录功能&#xff0c;在综合考量下决定采用cookie实现&#xff0c;大概的流程如下图所&#xff1a; 转载于:https://www.cnblogs.com/buggeerWang/p/10430770.html

js的栈与堆

JavaScript中基本数据类型和引用数据类型的区别 这是我引用别人的 觉得很好 1、基本数据类型和引用数据类型 ECMAScript包括两个不同类型的值&#xff1a;基本数据类型和引用数据类型。 基本数据类型指的是简单的数据段&#xff0c;引用数据类型指的是有多个值构成的对象。 当…

休眠调试–查找查询的来源

Hibernate为什么在程序的哪个部分以及在哪个部分中生成给定的SQL查询并不总是立即的&#xff0c;尤其是当我们处理的是我们自己编写的代码时。 这篇文章将介绍如何配置Hibernate查询日志记录&#xff0c;并将其与其他技巧一起使用&#xff0c;以找出在程序中执行给定查询的原因…