akka使用
我的同事正在开发一种交易系统,该系统可以处理大量的传入交易。 每笔交易都涵盖一种Instrument
(例如债券或股票),并且具有某些(现在)不重要的属性。 他们坚持使用Java(<8),所以我们坚持下去:
class Instrument implements Serializable, Comparable<Instrument> {private final String name;public Instrument(String name) {this.name = name;}//...Java boilerplate}public class Transaction {private final Instrument instrument;public Transaction(Instrument instrument) {this.instrument = instrument;}//...Java boilerplate}
Instrument
稍后将用作HashMap
的键,因此将来我们会主动实现Comparable<Instrument>
。 这是我们的领域,现在的要求是:
- 交易进入系统,需要尽快处理(无论如何)
- 我们可以自由地以任何顺序处理它们
- …但是,同一种工具的交易需要按照进来时完全相同的顺序进行处理。
最初的实现很简单–将所有传入的事务放入一个使用者的队列(例如ArrayBlockingQueue
)中。 这满足了最后一个要求,因为队列在所有事务中保留了严格的FIFO顺序。 但是,这样的架构阻止了针对不同工具的不相关交易的并发处理,从而浪费了令人信服的吞吐量提高。 毫无疑问,这种实现尽管很简单,却成为了瓶颈。
第一个想法是以某种方式分别按工具和过程工具拆分传入的交易。 我们提出了以下数据结构:
priavate final ConcurrentMap<Instrument, Queue<Transaction>> queues = new ConcurrentHashMap<Instrument, Queue<Transaction>>();public void accept(Transaction tx) {final Instrument instrument = tx.getInstrument();if (queues.get(instrument) == null) {queues.putIfAbsent(instrument, new LinkedBlockingQueue<Transaction>());}final Queue<Transaction> queue = queues.get(instrument);queue.add(tx);
}
! 但是最坏的时刻还没有到来。 您如何确保最多一个线程一次处理每个队列? 毕竟,否则,两个线程可以从一个队列(一种工具)中提取项目,并以相反的顺序处理它们,这是不允许的。 最简单的情况是每个队列都有一个Thread
-这无法扩展,因为我们期望成千上万种不同的工具。 因此,我们可以说N
线程,让每个线程处理队列的一个子集,例如instrument.hashCode() % N
告诉我们哪个线程负责处理给定的队列。 但是由于以下三个原因,它仍然不够完美:
- 一个线程必须“观察”许多队列(很可能是忙于等待),并始终对其进行遍历。 或者,队列可能以某种方式唤醒其父线程
- 在最坏的情况下,所有工具都将具有冲突的哈希码,仅针对一个线程-这实际上与我们最初的解决方案相同
- 这只是该死的复杂! 漂亮的代码并不复杂!
实现这种怪异是可能的,但是困难且容易出错。 此外,还有另一个非功能性的要求:仪器来来往往,随着时间的流逝,成千上万的仪器。 一段时间后,我们应删除代表最近未见过的仪器的地图条目。 否则我们会发生内存泄漏。
如果您能提出一些更简单的解决方案,请告诉我。 同时,让我告诉你我对同事的建议。 如您所料,它是Akka –结果非常简单。 我们需要两种角色: Dispatcher
和Processor
。 Dispatcher
有一个实例,并接收所有传入的事务。 它的职责是为每个Instrument
找到或生成工作Processor
角色,并将交易推向该角色:
public class Dispatcher extends UntypedActor {private final Map<Instrument, ActorRef> instrumentProcessors = new HashMap<Instrument, ActorRef>();@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {dispatch(((Transaction) message));} else {unhandled(message);}}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}private ActorRef findOrCreateProcessorFor(Instrument instrument) {final ActorRef maybeActor = instrumentProcessors.get(instrument);if (maybeActor != null) {return maybeActor;} else {final ActorRef actorRef = context().actorOf(Props.create(Processor.class), instrument.getName());instrumentProcessors.put(instrument, actorRef);return actorRef;}}
}
这很简单。 由于我们的Dispatcher
actor实际上是单线程的,因此不需要同步。 我们几乎没有收到Transaction
,查找或创建Processor
并进一步传递Transaction
。 这是Processor
实现的样子:
public class Processor extends UntypedActor {private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {process(((Transaction) message));} else {unhandled(message);}}private void process(Transaction tx) {log.info("Processing {}", tx);}
}
而已! 有趣的是,我们的Akka实现几乎与我们第一个使用队列映射的想法相同。 毕竟,参与者只是一个队列,还有一个(逻辑)线程处理该队列中的项目。 区别在于:Akka管理有限的线程池,并可能在成千上万的参与者之间共享它。 而且,由于每个工具都有其专用(和“单线程”)执行器,因此可以保证每个工具的事务的顺序处理。
还有一件事。 如前所述,有大量的乐器,我们不想让演员们出现一段时间了。 假设如果Processor
在一个小时内未收到任何交易,则应停止并收集垃圾。 如果以后我们收到此类工具的新交易,则可以随时重新创建它。 这是一个非常棘手的问题–我们必须确保,如果在处理器决定删除自身时交易到达,我们就不能放弃该交易。 Processor
没有停止自身,而是向其父Processor
发出空闲时间过长的信号。 然后, Dispatcher
将发送PoisonPill
到它。 因为ProcessorIdle
和Transaction
消息都按顺序处理,所以没有交易发送到不再存在的参与者的风险。
每个setReceiveTimeout
通过使用setReceiveTimeout
安排超时来独立地管理其生命周期:
public class Processor extends UntypedActor {@Overridepublic void preStart() throws Exception {context().setReceiveTimeout(Duration.create(1, TimeUnit.HOURS));}@Overridepublic void onReceive(Object message) throws Exception {//...if (message instanceof ReceiveTimeout) {log.debug("Idle for two long, shutting down");context().parent().tell(ProcessorIdle.INSTANCE, self());} else {unhandled(message);}}}enum ProcessorIdle {INSTANCE
}
显然,当Processor
在一个小时内未收到任何消息时,它会向其父级( Dispatcher
)轻轻发出信号。 但是演员仍然活着,并且只要一个小时后发生交易就可以处理交易。 Dispatcher
作用是杀死给定的Processor
并将其从地图中删除:
public class Dispatcher extends UntypedActor {private final BiMap<Instrument, ActorRef> instrumentProcessors = HashBiMap.create();public void onReceive(Object message) throws Exception {//...if (message == ProcessorIdle.INSTANCE) {removeIdleProcessor(sender());sender().tell(PoisonPill.getInstance(), self());} else {unhandled(message);}}private void removeIdleProcessor(ActorRef idleProcessor) {instrumentProcessors.inverse().remove(idleProcessor);}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}//...}
不便之处。 instrumentProcessors
过去是Map<Instrument, ActorRef>
。 事实证明这是不够的,因为我们突然不得不按值删除此映射中的条目。 换句话说,我们需要找到一个映射到给定ActorRef
( Processor
)的键( Instrument
)。 有多种处理方法(例如,空闲Processor
可以发送它处理的Instrumnt
),但是我改用了BiMap<K, V>
。 之所以可以使用它,是因为指定的Instrument
和ActorRef
都是唯一的(每个演员actor)。 使用BiMap
我可以简单地对地图进行inverse()
(从BiMap<Instrument, ActorRef>
到BiMap<ActorRef, Instrument>
并将ActorRef
视为键。
这个Akka例子只不过是“ hello,world ”。 但是与复杂的解决方案相比,我们必须使用并发队列,锁和线程池进行编写,这是完美的。 我的队友非常兴奋,以至于最终他们决定将整个应用程序重写为Akka。
翻译自: https://www.javacodegeeks.com/2014/06/simplifying-trading-system-with-akka.html
akka使用