使用storm 实时计算
在本文中,我将说明如何借助Storm框架以可扩展且无锁定的方式在数据库中维护实时事件驱动流程的当前状态。
Storm是基于事件的数据处理引擎。 它的模型依赖于基本原语,例如事件转换,过滤,聚合……,我们将它们组合成拓扑 。 拓扑的执行通常分布在多个节点上,并且风暴群集还可以并行执行给定拓扑的多个实例。 因此,在设计时,必须牢记哪些Storm原语在分区范围内执行,即在一个群集节点的级别上执行,以及哪些在群集范围内执行(又称为重新分区操作) ,因为它们涉及将事件从中移出的网络流量。分区到分区)。 Storm Trident API文档明确提到了哪些功能做什么,作用范围如何。 Storm的分区概念与Kafka队列的分区概念保持一致, Kafka队列是入站事件的常见来源。
拓扑通常需要维护一些执行的持续状态。 例如,这可以是一些传感器值的滑动窗口平均值,从推文中提取的近期情绪,在不同位置出现的人数。……由于某些状态更新操作具有分区范围(例如partitionAggregate ),因此可伸缩性模型在这里尤为重要。其他则具有集群范围(例如groupby + perstitentAggregate的组合)。 这篇文章中说明了这一点。
示例代码在githup上可用 。 它基于Storm 0.8.2,Cassandra 1.2.5和JDK 1.7.0。 请注意,该示例未包含适当的错误处理:喷口或螺栓均不支持重试失败的元组,我将在以后的文章中解决。 另外,我使用Java序列化将数据存储在元组中,因此,即使Storm支持多种语言,我的示例也是特定于Java的。
实际示例:出席事件
我的示例是模拟一个跟踪人们在建筑物内位置的系统。 每当用户进入或离开房间时,每个房间入口处的传感器都会发出如下事件:
{"eventType": "ENTER", "userId": "John_5", "time": 1374922058918, "roomId": "Cafetaria", "id": "bf499c0bd09856e7e0f68271336103e0A", "corrId": "bf499c0bd09856e7e0f68271336103e0"}
{"eventType": "ENTER", "userId": "Zoe_15", "time": 1374915978294, "roomId": "Conf1", "id": "3051649a933a5ca5aeff0d951aa44994A", "corrId": "3051649a933a5ca5aeff0d951aa44994"}
{"eventType": "LEAVE", "userId": "Jenny_6", "time": 1374934783522, "roomId": "Conf1", "id": "6abb451d45061968d9ca01b984445ee8B", "corrId": "6abb451d45061968d9ca01b984445ee8"}
{"eventType": "ENTER", "userId": "Zoe_12", "time": 1374921990623, "roomId": "Hall", "id": "86a691490fff3fd4d805dce39f832b31A", "corrId": "86a691490fff3fd4d805dce39f832b31"}
{"eventType": "LEAVE", "userId": "Marie_11", "time": 1374927215277, "roomId": "Conf1", "id": "837e05916349b42bc4c5f65c0b2bca9dB", "corrId": "837e05916349b42bc4c5f65c0b2bca9d"}
{"eventType": "ENTER", "userId": "Robert_8", "time": 1374911746598, "roomId": "Annex1", "id": "c461a50e236cb5b4d6b2f45d1de5cbb5A", "corrId": "c461a50e236cb5b4d6b2f45d1de5cbb5"}
对(“ ENTER”和“ LEAVE”)对中的每个事件与一个房间内一个用户的一个占用时间段相对应。 这可能对传感器提出了很多要求,但是出于本示例的目的,这使我的生活更加轻松 。
为了使事情变得有趣,让我们想象一下,不能保证到达我们服务器的事件遵循时间顺序(请参见生成事件的python脚本中的shuffle()调用)。
我们将构建一个Storm拓扑,该拓扑将构建每个房间的每分钟每分钟的占用时间线,如本文结尾处的时间图所示。 在数据库中,房间时间线被切成一个小时的时间段,这些时间段被独立存储和更新。 这是Cafetaria占用1小时的示例:
{"roomId":"Cafetaria","sliceStartMillis":1374926400000,"occupancies":[11,12,12,12,13,15,15,14,17,18,18,19,20,22,22,22,21,22,23,25,25,25,28,28,33,32,31,31,29,28,27,27,25,
22,22,21,20,19,19,19,17,17,16,16,15,15,16,15,14,13,13,12,11,10,9,11,10,9,11,10]}
为了实现这一点,我们的拓扑需要:
- 根据correlationID重新组合“ ENTER”和“ LEAVE”事件,并为此用户在此房间中产生相应的存在时间
- 将每个在场期间的影响应用于房间入住时间表
顺便说一句,Cassandra提供了Counter列 ,尽管我可以很好地替代它们,但我在这里不使用它们。 但是,我的目的是说明Storm功能,即使它会使方法有些虚构。
分组依据/ persistentAggregate / iBackingMap说明
在查看示例代码之前,让我们澄清一下这些“三叉戟风暴”原语如何协同工作。
想象一下,我们从上午9:47到上午10:34收到了两个描述用户在roomA中存在的事件。 更新会议室的时间表需要:
- 从数据库加载两个受影响的时间轴切片:[9.00am,10:00 am]和[10.00am,11:00 am]
- 在这两个时间轴切片中添加此用户的状态
- 将它们保存到数据库
但是,像这样天真地实现此目标并不是最佳选择,首先是因为它每个事件使用两个DB请求,其次是因为这种“读取-更新-写入”序列通常需要一种锁定机制,这种锁定机制通常无法很好地扩展。
为了解决第一点,我们想为几个事件重新组合数据库操作。 在Storm中,事件(或元组 )被成批处理。 IBackingMap是一个我们可以实现的原语,它使我们可以立即查看整批元组。 我们将使用它在批处理的开始(multiget)和结束时的所有DB-write操作(multiput)重新分组。 但是,multiget不允许我们查看元组本身,而只能查看“查询键”,这是根据元组内容计算出来的,如下所述。
原因在于上面提到的关于天真的实现的第二点:我们想并行执行几个[multiget +更新逻辑+ multiput]流,而不依赖锁。 这是通过确保那些并行子流程更新不相交的数据集来实现的。 这就要求定义拆分成并行流的拓扑元素还控制每个流内DB中要加载和更新的数据。 该元素是Storm groupBy原语:它通过按字段值对元组进行分组来定义拆分,并且它通过将“ groupedBy”值作为对multiget的查询关键字来控制每个并行流更新的数据。
下图在房间占用示例中对此进行了说明(简化为每个房间仅存储一个时间线,而不是每个一小时的时间片一个时间线):
但是,并行性并没有完全发生(例如,当前的Storm实现在分组流中依次调用每个reducer / combiner),但这是设计拓扑时要牢记的一个好模型。
有趣的是,在groupBy和multiget之间发生了一些Storm魔术。 回想一下,Storm旨在进行大规模分布,这意味着每个流在多个节点上并行执行,从诸如Hadoop HDFS或分布式Kafka队列之类的分布式数据源获取输入数据。 这意味着groupBy()同时在多个节点上执行,所有可能处理的事件都需要组合在一起。 groupBy是一个重新分区操作 ,可确保将所有需要分组的事件发送到同一节点,并由IBackingMap +组合器或约简器的同一实例处理,因此不会发生争用情况。
同样,Storm要求我们将IBackingMap包装到可用的Storm MapState原语(或我们自己的原语)之一中,通常用于处理失败/重播的元组。 如上所述,我不在本文中讨论这一方面。
使用这种方法,我们必须实现IBackingMap,以便它尊重以下属性:
- 对于不同的键值,由multiget读取和由IBackingMap的multiput操作写入的数据库行必须是不同的。
我想这就是他们将这些值称为“关键”的原因 (尽管任何尊重此属性的方法都可以)。
回到例子
让我们看看这在实践中是如何工作的。 该示例的主要拓扑在此处可用:
// reading events
.newStream("occupancy", new SimpleFileStringSpout("data/events.json", "rawOccupancyEvent"))
.each(new Fields("rawOccupancyEvent"), new EventBuilder(), new Fields("occupancyEvent"))
第一部分只是读取JSON格式的输入事件(我正在使用简单的文件输出),对它们进行反序列化,然后使用Java序列化将它们放入称为“ occupancyEvent”的元组字段中。 这些元组中的每一个都描述了用户在房间内或房间外的“ ENTER”或“ LEAVE”事件。
// gathering "enter" and "leave" events into "presence periods"
.each(new Fields("occupancyEvent"), new ExtractCorrelationId(), new Fields("correlationId"))
.groupBy(new Fields("correlationId"))
.persistentAggregate( PeriodBackingMap.FACTORY, new Fields("occupancyEvent"), new PeriodBuilder(), new Fields("presencePeriod"))
.newValuesStream()
当我们遇到correlationId的不同值时,groupBy原语会创建尽可能多的元组组(这可能意味着很多,因为通常最多两个事件具有相同的correlationId)。 当前批处理中具有相同相关ID的所有元组将重新组合在一起,并且一组或几组元组将一起呈现给persistentAggregate中定义的元素。 PeriodBackingMap是IBackingMap的实现,其中实现了multiget方法,该方法将接收下一步将要处理的元组组的所有相关ID(例如:{“ roomA”,“ roomB”,“ Hall ”},如上图所示)。
public List<RoomPresencePeriod> multiGet(List<List<Object>> keys) {return CassandraDB.DB.getPresencePeriods(toCorrelationIdList(keys));
}
该代码只需要从数据库中检索每个相关ID的潜在存在期间即可。 因为我们对一个元组字段进行了groupBy,所以每个List在这里都包含一个单个String:correlationId。 请注意,我们返回的列表必须与键列表的大小完全相同,以便Storm知道哪个周期对应于哪个键。 因此,对于数据库中不存在的任何键,我们只需在结果列表中放置一个空值即可。
一旦加载,Storm就会将一个具有相同相关性ID的元组一个一个地呈现给我们的化简器PeriodBuilder 。 在我们的例子中,我们知道在此批次中,每个唯一的relativeId最多被调用两次,但是一般来说可能更多,或者如果当前批次中不存在其他ENTER / LEAVE事件,则仅被调用一次。 在对muliget()/ multiput()的调用与我们的reducer之间,借助我们选择的MapState实现,Storm让我们可以插入适当的逻辑来重放先前失败的元组。 在以后的文章中有更多的信息……
一旦我们减少了每个元组序列,Storm就会将结果传递给IBackingMap的mulitput(),在这里我们只是将所有内容“追加”到数据库:
public void multiPut(List<List<Object>> keys, List<RoomPresencePeriod> newOrUpdatedPeriods) {CassandraDB.DB.upsertPeriods(newOrUpdatedPeriods);
}
Storm persistenceAggregate使用我们的化简提供给multitput()的值,自动将其发送到拓扑元组的后续部分。 这意味着我们刚刚建立的在线状态很容易作为元组字段使用,我们可以使用它们直接更新会议室时间线:
// building room timeline
.each(new Fields("presencePeriod"), new IsPeriodComplete())
.each(new Fields("presencePeriod"), new BuildHourlyUpdateInfo(), new Fields("roomId", "roundStartTime"))
.groupBy(new Fields("roomId", "roundStartTime"))
.persistentAggregate( TimelineBackingMap.FACTORY, new Fields("presencePeriod","roomId", "roundStartTime"), new TimelineUpdater(), new Fields("hourlyTimeline"))
第一行只是过滤掉尚未包含“ ENTER”和“ LEAVE”事件的任何期间。
然后, BuildHourlyUpdateInfo实现一对多的元组发射逻辑:对于每个占用期,它仅在“开始时间”内发射一个元组。 例如,从9:47 am到10:34 am在roomA中的占用将在此处触发针对RoomA的9.00am时间轴切片的元组的发射,以及另一个针对10.00am的元组的发射。
下一部分实现了与以前相同的groupBy / IBackingMap方法,只是这次使用了两个分组键而不是一个(因此,mulitget中的List <Object>将包含两个值:一个String和一个Long)。 由于我们存储一个小时的时间轴块,因此上述IBackingMap的必要属性得到了尊重。 多重获取为每个(“ roomId”,“开始时间”)对检索时间线块,然后TimelineUpdater (再次使用reducer)用与当前批次中找到的该时间线片相对应的每个存在时间更新时间线片(这就是BuildHourlyUpdateInfo的一对多元组发射逻辑)和multiput()仅保存结果。
导致咖啡厅占用
当我们看着它时,一切总是更加美丽,所以让我们来绘制房间的占用情况 。 稍加一些R代码 ,我们就可以一分钟一分钟地看到房间的占用情况(这并不意味着什么,因为所有数据都是随机的,但是……):
结论
希望本文能为维护Storm拓扑中的状态提供一种有用的方法。 我还尝试说明了将处理逻辑实现为小型拓扑元素的实现,将其彼此插入,而不是将一些“冗长的螺栓”捆绑在冗长而复杂的逻辑部分上。
Storm的一个重要方面是它的可扩展性,很可能去插入它的子类或在任何地方插入它的子类来调整其行为。 春天有十年前的那种聪明而有趣的感觉(哦,该死,我现在有点老了……^ __ ^)
翻译自: https://www.javacodegeeks.com/2013/08/scalable-real-time-state-update-with-storm.html
使用storm 实时计算