MongoDB是一个非常出色的“ NoSQL”数据库,具有广泛的应用程序。 在SoftwareMill开发的一个项目中,我们将其用作复制的事件存储,然后将事件从事件流传输到其他组件。
介绍
基本思想非常简单(另请参阅Martin Fowler关于Event Sourcing的文章)。 我们的系统生成一系列事件。 这些事件将保留在事件存储中。 系统中的其他组件遵循事件流并对其进行“处理”。 例如,可以将它们汇总并写入报告数据库(另一方面,它类似于CQRS )。 这种方法有很多优点:
- 事件的读取和写入是解耦的(异步的)
- 鉴于它没有死得太久,任何后续组件都可能死亡,然后“追赶”
- 可能有多个关注者。 跟随者可以从从属副本读取数据,以获得更好的可伸缩性
- 事件活动的爆发对事件接收器的影响减少; 最坏的情况下,报告生成速度会变慢
这里的关键组件当然是快速可靠的事件存储。 我们用来实现一个的MongoDB的三个关键功能是:
- 上限集合和尾部游标
- 快速收集附件
- 复制集
采集
作为基础,我们使用有上限的集合 ,根据定义,该集合受大小限制。 如果编写新事件将导致集合超出大小限制,则最早的事件将被覆盖。 这给了我们类似于事件的循环缓冲区的功能。 (此外,我们也很安全地避免了磁盘空间不足错误。)
在2.2版之前,默认情况下,上限集合没有_id字段(因此没有索引)。 但是,由于我们希望事件能够在整个副本集上可靠地写入,因此_id字段及其上的索引都是必需的。
写作活动
编写事件是一个简单的Mongo插入操作; 插入也可以分批完成。 根据我们对事件丢失的容忍度,我们可能会使用各种Mongo 写入问题 (例如,等待来自单节点或多个节点的写入确认)。
所有事件都是不可变的。 除了更好的,线程安全的Java代码外,这是事件流的必要条件。 如果事件是可变的,事件接收器将如何知道更新的内容? 而且,这对Mongo的性能有很好的影响。 由于永远不会更改数据,因此写入磁盘的文档永远不会缩小或扩展,因此无需在磁盘上移动块。 实际上,在具有上限的集合中,Mongo不允许增长曾经编写的文档。
阅读活动
读取事件流要复杂一些。 首先,可能有多个阅读器,每个阅读器在流中具有不同的进度。 其次,如果流中没有事件,我们希望读者等待一些事件可用,并避免主动轮询。 最后,我们想分批处理事件,以提高性能。
有尾游标可以解决这些问题。 要创建这样的游标,我们必须提供一个起点–事件的ID,我们将从该事件开始读取; 如果未提供ID,则光标将返回最早的可用事件。 因此,每个读取器必须存储它已读取和处理的最后一个事件。
更重要的是,如果没有新数据可用,可尾光标可以有选择地阻塞一段时间,从而解决了主动轮询问题。
(顺便说一下,mongo用于在副本集之间复制数据的oplog集合也是一个有上限的集合。从属Mongo实例在该集合后面尾随,流式传输“事件”(即数据库操作),并按顺序在本地应用它们。 )
读取Java中的事件
使用Mongo Java驱动程序时 ,有一些“问题”。 首先,您需要初始化游标。 为此,我们需要提供(1)最后一个事件ID(如果存在); (2)我们要读取事件的顺序(此处为自然顺序,即插入顺序); (3)两个关键的游标选项,我们希望游标是可拖尾的,并且如果没有新数据,我们希望将其阻止:
DBObject query = lastReceivedEventId.isPresent()? BasicDBObjectBuilder.start('_id', BasicDBObjectBuilder.start('$gte', lastReceivedEventId.get()).get()).get(): null;DBObject sortBy = BasicDBObjectBuilder.start('$natural', 1).get();DBCollection collection = ... // must be a capped collection
DBCursor cursor = collection.find(query).sort(sortBy).addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA);
您可能想知道为什么我们使用>= last_id
而不是>
。 由于生成Mongo ObjectId的方式在这里需要。 如果使用一个简单的> last_id
我们可能会错过一些与last_id
事件在同一秒之后但之后发生的事件。 这也意味着我们的Java代码必须处理这一事实,并丢弃收到的第一个事件。
游标的类扩展了基本的Java Iterator
接口,因此非常易于使用。 因此,现在我们可以进行批处理了。 在游标上进行迭代时,驱动程序将批量从Mongo服务器接收数据; 因此我们可以像调用其他迭代器一样简单地调用hasNext()
和next()
来接收后续元素,并且只有某些调用会真正导致与服务器的网络通信。
在Mongo Java驱动程序中,实际上可能阻塞的hasNext()
是hasNext()
。 如果我们要分批处理事件,我们需要(a)只要有可用的元素就读取它们,并且(b)在被阻止没有更多事件之前有某种了解的方式,并且我们可以处理事件已经批处理。 由于hasNext()
可以阻止,因此我们无法直接执行此操作。
这就是为什么我们引入了中间队列( LinkedBlockingQueue
)的原因。 在单独的线程中,从游标读取的事件在到达时即被放入队列中。 如果没有事件,则线程将在cursor.hasNext()
上cursor.hasNext()
。 阻塞队列有一个可选的大小限制,因此,如果队列已满,则放置一个元素也将阻塞,直到有可用空间为止。 在事件消费者线程中,我们首先尝试以阻塞方式(使用.poll
从队列中读取单个元素,因此我们在这里等待所有事件可用。 然后,我们尝试将队列的全部内容消耗到一个临时集合中(使用.drainTo
,构建批处理,并可能获取0个元素,但我们始终拥有第一个)。
值得一提的是,如果集合为空,则Mongo不会阻止,因此我们必须回到主动轮询。 我们还必须考虑到游标可能会在等待期间死亡的事实。 要对此进行检查,我们应该验证cursor.getCursorId() != 0
,其中0是“死光标”的ID。 在这种情况下,我们只需要重新实例化游标即可。
加起来
综上所述,我们得到了一个非常快速的事件源/流解决方案。 从某种意义上说,这是“自我调节”,即如果事件活动达到高峰,则事件接收器将大批量读取这些事件。 如果事件活动少,则将分批快速处理它们。
我们还将同一个Mongo实例用于其他目的。 从操作角度来看,拥有一个数据库系统来聚簇和维护常规数据和事件肯定是一件好事。
参考: Adam Warski博客的Blog中来自我们的JCG合作伙伴 Adam Warski的MongoDB事件流 。
翻译自: https://www.javacodegeeks.com/2012/11/event-streaming-with-mongodb.html