使用Storm进行可扩展的实时状态更新

在本文中,我将说明如何借助Storm框架以可扩展且无锁定的方式在数据库中维护实时事件驱动流程的当前状态。

Storm是基于事件的数据处理引擎。 它的模型依赖于基本原语,例如事件转换,过滤,聚合……,我们将它们组合成拓扑 。 拓扑的执行通常分布在多个节点上,并且风暴群集还可以并行执行给定拓扑的多个实例。 因此,在设计时,务必牢记哪些Storm原语在分区范围内执行,即在一个群集节点的级别上执行,以及哪些在群集范围内执行(又称为重新分区操作) ,因为它们涉及将事件从分区到分区)。 Storm Trident API文档明确提到了哪些功能做什么,作用范围如何。 Storm的分区概念与Kafka队列的分区概念保持一致, Kafka队列是入站事件的常见来源。

拓扑通常需要维护一些执行的持续状态。 例如,这可以是一些传感器值的滑动窗口平均值,从推文中提取的近期情绪,在不同位置出现的人数。……由于某些状态更新操作具有分区范围(例如partitionAggregate ),因此可伸缩性模型在这里尤为重要。其他则具有集群范围(例如groupby + perstitentAggregate的组合)。 这篇文章中说明了这一点。

示例代码可在githup上获得 。 它基于Storm 0.8.2,Cassandra 1.2.5和JDK 1.7.0。 请注意,此示例未包含适当的错误处理:喷口或螺栓均不支持重试失败的元组,我将在以后的文章中解决。 另外,我使用Java序列化将数据存储在元组中,因此,即使Storm支持多种语言,我的示例也是特定于Java的。

实际示例:出席事件

我的示例是模拟一个跟踪人们在建筑物内位置的系统。 每当用户进入或离开房间时,每个房间入口处的传感器都会发出如下事件:

{"eventType": "ENTER", "userId": "John_5", "time": 1374922058918, "roomId": "Cafetaria", "id": "bf499c0bd09856e7e0f68271336103e0A", "corrId": "bf499c0bd09856e7e0f68271336103e0"}
{"eventType": "ENTER", "userId": "Zoe_15", "time": 1374915978294, "roomId": "Conf1", "id": "3051649a933a5ca5aeff0d951aa44994A", "corrId": "3051649a933a5ca5aeff0d951aa44994"}
{"eventType": "LEAVE", "userId": "Jenny_6", "time": 1374934783522, "roomId": "Conf1", "id": "6abb451d45061968d9ca01b984445ee8B", "corrId": "6abb451d45061968d9ca01b984445ee8"}
{"eventType": "ENTER", "userId": "Zoe_12", "time": 1374921990623, "roomId": "Hall", "id": "86a691490fff3fd4d805dce39f832b31A", "corrId": "86a691490fff3fd4d805dce39f832b31"}
{"eventType": "LEAVE", "userId": "Marie_11", "time": 1374927215277, "roomId": "Conf1", "id": "837e05916349b42bc4c5f65c0b2bca9dB", "corrId": "837e05916349b42bc4c5f65c0b2bca9d"}
{"eventType": "ENTER", "userId": "Robert_8", "time": 1374911746598, "roomId": "Annex1", "id": "c461a50e236cb5b4d6b2f45d1de5cbb5A", "corrId": "c461a50e236cb5b4d6b2f45d1de5cbb5"}

对应于一个房间内一个用户的一个使用周期的(“ ENTER”和“ LEAVE”)对中的每个事件具有相同的相关性ID。 这可能对传感器提出了很多要求,但是出于本示例的目的,这使我的生活更加轻松

为了使事情变得有趣,让我们想象一下,不能保证到达我们服务器的事件遵守时间顺序(请参阅生成事件的python脚本中的shuffle()调用)。

我们将构建一个Storm拓扑,该拓扑将构建每个房间的每分钟每分钟的占用时间线,如本文结尾处的时间图所示。 在数据库中,房间时间线被切成一个小时的时间段,这些时间段被独立存储和更新。 这是Cafetaria占用1小时的示例:

{"roomId":"Cafetaria","sliceStartMillis":1374926400000,"occupancies":[11,12,12,12,13,15,15,14,17,18,18,19,20,22,22,22,21,22,23,25,25,25,28,28,33,32,31,31,29,28,27,27,25,
22,22,21,20,19,19,19,17,17,16,16,15,15,16,15,14,13,13,12,11,10,9,11,10,9,11,10]}

为了实现这一点,我们的拓扑需要:

  • 根据correlationIDID重新组合“ ENTER”和“ LEAVE”事件,并为此用户在此房间中产生相应的存在时间
  • 将每个在场期间的影响应用于房间入住时间表

顺便说一句,Cassandra提供了Counter列 ,尽管我可以很好地替代它们,但我在这里不使用它们。 但是,我的目的是说明Storm功能,即使它会使方法有些虚构。

分组依据/ persistentAggregate / iBackingMap说明

在查看示例代码之前,让我们澄清一下这些“三叉戟风暴”原语如何协同工作。

想象一下,我们从上午9:47到上午10:34收到了两个描述用户在roomA中存在​​的事件。 更新会议室的时间表需要:

  • 从数据库加载两个受影响的时间轴切片:[9.00am,10:00 am]和[10.00am,11:00 am]
  • 在这两个时间轴切片中添加此用户的状态
  • 将它们保存到数据库

但是,像这样天真地实现这一目标远非最优,首先是因为它每个事件使用两个DB请求,其次是因为这种“读取-更新-写入”序列通常需要一种锁定机制,该锁定机制通常无法很好地扩展。

为了解决第一点,我们想对几个事件重新组合数据库操作。 在Storm中,事件(或元组 )被成批处理。 IBackingMap是一个我们可以实现的原语,它使我们可以立即查看整批元组。 我们将使用它在批处理的开始(multiget)和结束时的所有DB-write操作(multiput)重新分组。 但是,multiget不允许我们查看元组本身,而只能查看“查询键”,这是从元组内容中计算出来的,如下所述。

原因在于上面提到的关于天真的实现的第二点:我们想并行执行几个[multiget +更新逻辑+ multiput]流,而不依赖锁。 这是通过确保那些并行子进程更新不相交的数据集来实现的。 这就要求定义拆分为并行流的拓扑元素还控制在每个流内的DB中加载和更新哪些数据。 该元素是Storm groupBy原语:它通过按字段值对元组进行分组来定义拆分,并且它通过将“ groupedBy”值作为对multiget的查询关键字来控制每个并行流更新的数据。

下图在房间占用示例中对此进行了说明(简化为每个房间仅存储一个时间轴,而不是每个一小时片段存储一个时间轴):
storm-b​​log-groupby-11
但是,并行性并没有完全发生(例如,当前的Storm实现在分组流中依次调用每个reducer / combiner),但这是设计拓扑时要牢记的一个好模型。

有趣的是,在groupBy和multiget之间发生了一些Storm魔术。 回想一下,Storm旨在进行大规模分布,这意味着每个流在多个节点上并行执行,并从诸如Hadoop HDFS或分布式Kafka队列之类的分布式数据源获取输入数据。 这意味着groupBy()同时在几个节点上执行,所有可能处理的事件都需要组合在一起。 groupBy是一种重新分区操作 ,可确保将所有需要分组的事件都发送到同一节点,并由IBackingMap +组合器或化简器的同一实例处理,因此不会发生争用情况。

同样,Storm要求我们将IBackingMap包装到可用的Storm MapState原语(或我们自己的原语)之一中,通常用于处理失败/重播的元组。 如上所述,我不在本文中讨论这一方面。

使用这种方法,我们必须实现IBackingMap,以便它尊重以下属性:

  • 对于不同的键值,由multiget读取和由IBackingMap的multiput操作写入的数据库行必须是不同的。

我想这就是他们将这些值称为“关键”的原因 (尽管任何尊重此属性的方法都可以)。

回到例子

让我们看看这在实践中是如何工作的。 该示例的主要拓扑在此处可用:

// reading events
.newStream("occupancy", new SimpleFileStringSpout("data/events.json", "rawOccupancyEvent"))
.each(new Fields("rawOccupancyEvent"), new EventBuilder(), new Fields("occupancyEvent"))

第一部分只是读取JSON格式的输入事件(我使用的是简单的文件输出),对它们进行反序列化,然后使用Java序列化将它们放入称为“ occupancyEvent”的元组字段中。 这些元组中的每一个都描述了用户在房间内或房间外的“ ENTER”或“ LEAVE”事件。

// gathering "enter" and "leave" events into "presence periods"
.each(new Fields("occupancyEvent"), new ExtractCorrelationId(), new Fields("correlationId"))
.groupBy(new Fields("correlationId"))
.persistentAggregate( PeriodBackingMap.FACTORY, new Fields("occupancyEvent"), new PeriodBuilder(), new Fields("presencePeriod"))
.newValuesStream()

当我们遇到correlationId的不同值时,groupBy原语会创建尽可能多的元组组(这可能意味着很多,因为通常最多两个事件具有相同的correlationId)。 当前批处理中具有相同相关ID的所有元组将重新组合在一起,并且一组或几组元组将一起显示给persistentAggregate中定义的元素。 PeriodBackingMap是IBackingMap的实现,其中实现了multiget方法,该方法将接收将在后续步骤中处理的元组组的所有相关ID(例如:{“ roomA”,“ roomB”,“ Hall ”},如上图所示。

public List<RoomPresencePeriod> multiGet(List<List<Object>> keys) {return CassandraDB.DB.getPresencePeriods(toCorrelationIdList(keys));
}

该代码只需要从数据库中检索每个相关ID的潜在存在期间即可。 因为我们在一个元组字段上执行了groupBy,所以每个List在这里都包含一个字符串:correlationId。 请注意,我们返回的列表必须与键列表的大小完全相同,以便Storm知道哪个周期对应于哪个键。 因此,对于数据库中不存在的任何键,我们只需在结果列表中放置一个空值即可。

一旦加载,Storm就会将一个具有相同相关性ID的元组一个一个地呈现给我们的化简器PeriodBuilder 。 在我们的例子中,我们知道在此批处理中每个唯一的RelationshipId最多被调用两次,但是一般来说可能会更多,或者如果当前批处理中不存在其他ENTER / LEAVE事件,则只能调用一次。 在对muliget()/ multiput()的调用与我们的reducer之间,借助我们选择的MapState实现,Storm让我们可以插入适当的逻辑来重放以前失败的元组。 在以后的文章中有更多关于…

一旦我们减少了每个元组序列,Storm就会将结果传递给IBackingMap的mulitput(),在这里我们将所有内容“追加”到数据库:

public void multiPut(List<List<Object>> keys, List<RoomPresencePeriod> newOrUpdatedPeriods) {CassandraDB.DB.upsertPeriods(newOrUpdatedPeriods);
}

Storm persistenceAggregate会使用我们的reducer提供给multitput()的值自动向拓扑元组的后续部分发出。 这意味着我们刚刚建立的在线状态很容易作为元组字段使用,我们可以使用它们直接更新会议室时间线:

// building room timeline
.each(new Fields("presencePeriod"), new IsPeriodComplete())
.each(new Fields("presencePeriod"), new BuildHourlyUpdateInfo(), new Fields("roomId", "roundStartTime"))
.groupBy(new Fields("roomId", "roundStartTime"))
.persistentAggregate( TimelineBackingMap.FACTORY, new Fields("presencePeriod","roomId", "roundStartTime"), new TimelineUpdater(), new Fields("hourlyTimeline"))

第一行只是过滤掉尚未包含“ ENTER”和“ LEAVE”事件的任何期间。

然后, BuildHourlyUpdateInfo实现一对多的元组发射逻辑:对于每个占用期,它仅在“开始时间”内发射一个元组。 例如,从9:47 am到10:34 am在roomA中的占用将在此处触发针对roomA的9.00am时间轴切片的元组,以及另一个针对10.00am的元组的发射。

下一部分实现与以前相同的groupBy / IBackingMap方法,只是这次使用两个分组键而不是一个(因此,mulitget中的List <Object>将包含两个值:一个String和一个Long)。 由于我们存储一个小时的时间轴块,因此上面提到的IBackingMap的必要属性受到尊重。 多重获取为每个(“ roomId”,“开始时间”)对检索时间线块,然后TimelineUpdater (再次是reducer)用与当前批次中找到的该时间线片相对应的每个存在时间更新时间线片(这就是BuildHourlyUpdateInfo的一对多元组发射逻辑)和multiput()仅保存结果。

导致咖啡厅占用

当我们看着它时,一切总是更加美丽,因此让我们来绘制房间的占用情况 。 稍后,用一些R代码 ,我们可以看到每分钟的房间占用情况(由于所有数据都是随机的,所以意义不大,但是……):
屏幕截图-2013-07-29-at-15-02-52

结论

希望本文提供了一种维护Storm拓扑状态的有用方法。 我还尝试说明了将处理逻辑实现为小型拓扑元素的实现,这些拓扑元素彼此插入,而不是将一些“冗长的”螺栓捆绑在冗长而复杂的逻辑部分上。

Storm的一个重要方面是它的可扩展性,很可能会在任何地方插入该子类或子类的子类来调整其行为。 春天十年前有这种聪明有趣的感觉(哦,该死,我现在有点老了……^ __ ^)

参考:来自Svend博客的 JCG合作伙伴 Svend Vanderveken 使用Storm进行的可伸缩实时状态更新 。

翻译自: https://www.javacodegeeks.com/2013/08/scalable-real-time-state-update-with-storm.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/366945.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【干货】十分钟读懂浏览器渲染流程

在之前写过的一篇《"天龙八步"细说浏览器输入URL后发生了什么》一文中&#xff0c;和大家分享了从在浏览器中输入网址URL到最终页面展示的整个过程。部分读者向我反馈对于最后的浏览器渲染布局这块不是很清晰&#xff0c;所以本文就浏览器渲染流程单独开篇讲解&#…

控制台资费管理主菜单java_java毕业设计_springboot框架的高速公路收费管理系统...

今天介绍一个java毕设题目, 题目内容为springboot框架的高速公路收费管理系统, 是一个采用b/s结构的javaweb项目, 采用java语言编写开发工具eclipse, 项目框架jspspringbootmybatis, 高速公路收费管理系统的信息存储于mysql中, 并基于mybatis进行了orm封装, 该高速公路收费管理…

在Amazon EMR上运行Hadoop MapReduce作业

不久前&#xff0c;我发布了如何使用CLI设置EMR群集的信息。 在本文中&#xff0c;我将展示如何使用适用于AWS的Java SDK来设置集群。 展示如何使用Java AWS开发工具包执行此操作的最佳方法是展示完整的示例&#xff0c;因此&#xff0c;让我们开始吧。 设置一个新的Maven项目…

[JSConf EU 2018] 大脑控制 Javascript

先解释&#xff0c;本人为前端菜鸟&#xff0c;之前也未参加过类似的活动&#xff0c;没有翻译过什么文章&#xff0c;此次是好奇心使然&#xff0c;也是想尝试下&#xff0c;学习学习&#xff0c;英文很烂&#xff0c;全靠有道&#xff0c;但是视频整个看下来&#xff0c;还是…

JavaScript中不得不说的断言?

断言主要应用于“调试”与“测试” 一、前端中的断言 仔细地查找一下JavaScript中的API&#xff0c;实际上并没有多少关于断言的方法。唯一一个就是console.assert&#xff1a; // console.assert(condition, message)const a 1console.assert(typeof a number, a should be…

Java EE状态会话Bean(EJB)示例

在本文中&#xff0c;我们将了解如何在简单的Web应用程序中使用状态会话Bean来跟踪客户端会话中的状态。 1.简介 有状态会话Bean通常保存有关特定客户端会话的信息&#xff0c;并在整个会话中保留该信息&#xff08;与无状态会话Bean相对&#xff09;。 有状态EJB实例仅与一个…

起点海外版 Hybrid App-内嵌页优化实践

本文作者&#xff1a;刘文涛 原创声明&#xff1a;本文为阅文前端团队 YFE 成员出品&#xff0c;请尊重原创&#xff0c;转载请联系公众号 (id: yuewen_YFE) 获取授权&#xff0c;并注明作者、出处和链接。 今年年初我司开启了起点品牌的海外之旅&#xff0c;名为「 Webnovel 」…

aix 卸载mysql_AIX 删除数据库及集群软件

一、 删除数据库1、用dbca自动删库在CRT上无法打开dbca图形界面&#xff0c;要安装一个Xmanage软件&#xff0c;用Xstart连接终端&#xff0c;并修改oracle用户的.profile&#xff0c;加上“export DISPLAY192.168.8.120:0.0”Xstart配置信息如下&#xff1a;2、手工删除数据库…

如何在github中的readme.md加入项目截图

1. 先在之前的本地项目文件夹里创建一个存放截图的文件夹。&#xff08;如img文件夹&#xff09; 2. 将新增的内容通过github desktop上传到github中 3. 在github中立马能看到刚刚上传的图片&#xff0c;打开图片&#xff0c;点击Download 4. 直接复制地址栏的网址 5. 最后在RE…

记表格设计规范整理与页面可视化生成工具开发

前言 公司有一个项目在维护&#xff0c;大概有300左右&#xff0c;其中表单与表格的页面占比大概百分之五六十&#xff0c;为了节省开发时间&#xff0c;避免多人协作时&#xff0c;出现多套冗余代码&#xff0c;我们尝试写了一下表单和表格的生成工具&#xff0c;从梳理到规范…

java仿qq空间音乐播放_完美实现仿QQ空间评论回复特效

评论回复是个很常见的东西&#xff0c;但是各大网站实现的方式却不尽相同。大体上有两种方式1.像优酷这种最常见&#xff0c;在输入框中要回复的人&#xff0c;这种方式下&#xff0c;用www.cppcns.com户可以修改。新浪微博则是在这个基础上&#xff0c;弹出好友菜单。这种方式…

使用签名保护基于HTTP的API

我在EMC上的一个平台上可以构建SaaS解决方案。 与越来越多的其他应用程序一样&#xff0c;该平台具有基于RESTful HTTP的API。 使用像JAX-RS这样的开发框架&#xff0c;构建这样的API相对容易。 但是&#xff0c; 正确构建它们并不容易。 建立基于HTTP的API的问题 问题不仅…

Python开发【模块】:Celery 分布式异步消息任务队列

前言&#xff1a; Celery 是一个 基于python开发的分布式异步消息任务队列&#xff0c;通过它可以轻松的实现任务的异步处理&#xff0c; 如果你的业务场景中需要用到异步任务&#xff0c;就可以考虑使用celery&#xff0c; 举几个实例场景中可用的例子: 你想对100台机器执行一…

iOS开发者的一些前端感悟

很多前端工程师会把自己比作“魔法师”&#xff0c;而对于JavaScript这门语言&#xff0c;我也想把它唤作一门“有魔力的语言”。因为这群有无限想法的人&#xff0c;真的在用它创造各种让你惊叹的事物。 Web三件套一、前言 几年前&#xff0c;笔者还是一名初涉编程的学生&…

windows下github 出现Permission denied (publickey)

github教科书传送门:http://www.liaoxuefeng.com/wiki/0013739516305929606dd18361248578c67b8067c8c017b000 再学习到"添加远程仓库"的时候遇到了 Permission denied (publickey) 这个问题&#xff0c; 总结来说以前的步骤如下所示&#xff1a; 1、git config --glo…

[UE4]嵌套Canvas

转载于:https://www.cnblogs.com/timy/p/9090642.html

写博客的这几个月,获益良多

1.前言 也将近过年了&#xff0c;看了那么多人搞了年会总结。自己活跃社区这几个月&#xff0c;改变了不少&#xff0c;收获也不少。就想写下这段时间写文章的一些总结&#xff0c;统计下‘成绩’&#xff0c;说下感想&#xff0c;就写了这篇文章。这次总结的关键词就是&#x…

shiro 权限 URL 配置细节

转载于:https://www.cnblogs.com/hwgok/p/9100277.html

2016 年崛起的 JS 项目

本文是我对中文版 risingstars2016 的整理&#xff0c;而本人就是中文版的译者&#xff0c;首发于知乎专栏 前端周刊。共 21384 字&#xff0c;读完需 30 分钟&#xff0c;速读需 5 分钟。长江后浪推前浪&#xff0c;如果你能花 30 分钟读完我 6 个小时翻译的内容&#xff0c;相…

php 开启命令模式,如何启用PhpStorm中的命令行工具

本篇文章主要给大家介绍如何使用phpstorm中的命令行工具。PhpStorm下载地址&#xff1a;PhpStorm使用命令行工具&#xff0c;我们可以直接从IDE调用命令&#xff01;在我们使用任何命令行工具之前&#xff0c;我们必须在设置中启用它。涉及到的步骤如下&#xff1a;使用命令行工…