使用storm 实时计算_使用Storm进行可扩展的实时状态更新

使用storm 实时计算

在本文中,我将说明如何借助Storm框架以可扩展且无锁定的方式在数据库中维护实时事件驱动流程的当前状态。

Storm是基于事件的数据处理引擎。 它的模型依赖于基本原语,例如事件转换,过滤,聚合……,我们将它们组合成拓扑 。 拓扑的执行通常分布在多个节点上,并且风暴群集还可以并行执行给定拓扑的多个实例。 因此,在设计时,必须牢记哪些Storm原语在分区范围内执行,即在一个群集节点的级别上执行,以及哪些在群集范围内执行(又称为重新分区操作) ,因为它们涉及将事件从中移出的网络流量。分区到分区)。 Storm Trident API文档明确提到了哪些功能做什么,作用范围如何。 Storm的分区概念与Kafka队列的分区概念保持一致, Kafka队列是入站事件的常见来源。

拓扑通常需要维护一些执行的持续状态。 例如,这可以是一些传感器值的滑动窗口平均值,从推文中提取的近期情绪,在不同位置出现的人数。……由于某些状态更新操作具有分区范围(例如partitionAggregate ),因此可伸缩性模型在这里尤为重要。其他则具有集群范围(例如groupby + perstitentAggregate的组合)。 这篇文章中说明了这一点。

示例代码在githup上可用 。 它基于Storm 0.8.2,Cassandra 1.2.5和JDK 1.7.0。 请注意,该示例未包含适当的错误处理:喷口或螺栓均不支持重试失败的元组,我将在以后的文章中解决。 另外,我使用Java序列化将数据存储在元组中,因此,即使Storm支持多种语言,我的示例也是特定于Java的。

实际示例:出席事件

我的示例是模拟一个跟踪人们在建筑物内位置的系统。 每当用户进入或离开房间时,每个房间入口处的传感器都会发出如下事件:

{"eventType": "ENTER", "userId": "John_5", "time": 1374922058918, "roomId": "Cafetaria", "id": "bf499c0bd09856e7e0f68271336103e0A", "corrId": "bf499c0bd09856e7e0f68271336103e0"}
{"eventType": "ENTER", "userId": "Zoe_15", "time": 1374915978294, "roomId": "Conf1", "id": "3051649a933a5ca5aeff0d951aa44994A", "corrId": "3051649a933a5ca5aeff0d951aa44994"}
{"eventType": "LEAVE", "userId": "Jenny_6", "time": 1374934783522, "roomId": "Conf1", "id": "6abb451d45061968d9ca01b984445ee8B", "corrId": "6abb451d45061968d9ca01b984445ee8"}
{"eventType": "ENTER", "userId": "Zoe_12", "time": 1374921990623, "roomId": "Hall", "id": "86a691490fff3fd4d805dce39f832b31A", "corrId": "86a691490fff3fd4d805dce39f832b31"}
{"eventType": "LEAVE", "userId": "Marie_11", "time": 1374927215277, "roomId": "Conf1", "id": "837e05916349b42bc4c5f65c0b2bca9dB", "corrId": "837e05916349b42bc4c5f65c0b2bca9d"}
{"eventType": "ENTER", "userId": "Robert_8", "time": 1374911746598, "roomId": "Annex1", "id": "c461a50e236cb5b4d6b2f45d1de5cbb5A", "corrId": "c461a50e236cb5b4d6b2f45d1de5cbb5"}

对(“ ENTER”和“ LEAVE”)对中的每个事件与一个房间内一个用户的一个占用时间段相对应。 这可能对传感器提出了很多要求,但是出于本示例的目的,这使我的生活更加轻松

为了使事情变得有趣,让我们想象一下,不能保证到达我们服务器的事件遵循时间顺序(请参见生成事件的python脚本中的shuffle()调用)。

我们将构建一个Storm拓扑,该拓扑将构建每个房间的每分钟每分钟的占用时间线,如本文结尾处的时间图所示。 在数据库中,房间时间线被切成一个小时的时间段,这些时间段被独立存储和更新。 这是Cafetaria占用1小时的示例:

{"roomId":"Cafetaria","sliceStartMillis":1374926400000,"occupancies":[11,12,12,12,13,15,15,14,17,18,18,19,20,22,22,22,21,22,23,25,25,25,28,28,33,32,31,31,29,28,27,27,25,
22,22,21,20,19,19,19,17,17,16,16,15,15,16,15,14,13,13,12,11,10,9,11,10,9,11,10]}

为了实现这一点,我们的拓扑需要:

  • 根据correlationID重新组合“ ENTER”和“ LEAVE”事件,并为此用户在此房间中产生相应的存在时间
  • 将每个在场期间的影响应用于房间入住时间表

顺便说一句,Cassandra提供了Counter列 ,尽管我可以很好地替代它们,但我在这里不使用它们。 但是,我的目的是说明Storm功能,即使它会使方法有些虚构。

分组依据/ persistentAggregate / iBackingMap说明

在查看示例代码之前,让我们澄清一下这些“三叉戟风暴”原语如何协同工作。

想象一下,我们从上午9:47到上午10:34收到了两个描述用户在roomA中存在​​的事件。 更新会议室的时间表需要:

  • 从数据库加载两个受影响的时间轴切片:[9.00am,10:00 am]和[10.00am,11:00 am]
  • 在这两个时间轴切片中添加此用户的状态
  • 将它们保存到数据库

但是,像这样天真地实现此目标并不是最佳选择,首先是因为它每个事件使用两个DB请求,其次是因为这种“读取-更新-写入”序列通常需要一种锁定机制,这种锁定机制通常无法很好地扩展。

为了解决第一点,我们想为几个事件重新组合数据库操作。 在Storm中,事件(或元组 )被成批处理。 IBackingMap是一个我们可以实现的原语,它使我们可以立即查看整批元组。 我们将使用它在批处理的开始(multiget)和结束时的所有DB-write操作(multiput)重新分组。 但是,multiget不允许我们查看元组本身,而只能查看“查询键”,这是根据元组内容计算出来的,如下所述。

原因在于上面提到的关于天真的实现的第二点:我们想并行执行几个[multiget +更新逻辑+ multiput]流,而不依赖锁。 这是通过确保那些并行子流程更新不相交的数据集来实现的。 这就要求定义拆分成并行流的拓扑元素还控制每个流内DB中要加载和更新的数据。 该元素是Storm groupBy原语:它通过按字段值对元组进行分组来定义拆分,并且它通过将“ groupedBy”值作为对multiget的查询关键字来控制每个并行流更新的数据。

下图在房间占用示例中对此进行了说明(简化为每个房间仅存储一个时间线,而不是每个一小时的时间片一个时间线):
storm-b​​log-groupby-11
但是,并行性并没有完全发生(例如,当前的Storm实现在分组流中依次调用每个reducer / combiner),但这是设计拓扑时要牢记的一个好模型。

有趣的是,在groupBy和multiget之间发生了一些Storm魔术。 回想一下,Storm旨在进行大规模分布,这意味着每个流在多个节点上并行执行,从诸如Hadoop HDFS或分布式Kafka队列之类的分布式数据源获取输入数据。 这意味着groupBy()同时在多个节点上执行,所有可能处理的事件都需要组合在一起。 groupBy是一个重新分区操作 ,可确保将所有需要分组的事件发送到同一节点,并由IBackingMap +组合器或约简器的同一实例处理,因此不会发生争用情况。

同样,Storm要求我们将IBackingMap包装到可用的Storm MapState原语(或我们自己的原语)之一中,通常用于处理失败/重播的元组。 如上所述,我不在本文中讨论这一方面。

使用这种方法,我们必须实现IBackingMap,以便它尊重以下属性:

  • 对于不同的键值,由multiget读取和由IBackingMap的multiput操作写入的数据库行必须是不同的。

我想这就是他们将这些值称为“关键”的原因 (尽管任何尊重此属性的方法都可以)。

回到例子

让我们看看这在实践中是如何工作的。 该示例的主要拓扑在此处可用:

// reading events
.newStream("occupancy", new SimpleFileStringSpout("data/events.json", "rawOccupancyEvent"))
.each(new Fields("rawOccupancyEvent"), new EventBuilder(), new Fields("occupancyEvent"))

第一部分只是读取JSON格式的输入事件(我正在使用简单的文件输出),对它们进行反序列化,然后使用Java序列化将它们放入称为“ occupancyEvent”的元组字段中。 这些元组中的每一个都描述了用户在房间内或房间外的“ ENTER”或“ LEAVE”事件。

// gathering "enter" and "leave" events into "presence periods"
.each(new Fields("occupancyEvent"), new ExtractCorrelationId(), new Fields("correlationId"))
.groupBy(new Fields("correlationId"))
.persistentAggregate( PeriodBackingMap.FACTORY, new Fields("occupancyEvent"), new PeriodBuilder(), new Fields("presencePeriod"))
.newValuesStream()

当我们遇到correlationId的不同值时,groupBy原语会创建尽可能多的元组组(这可能意味着很多,因为通常最多两个事件具有相同的correlationId)。 当前批处理中具有相同相关ID的所有元组将重新组合在一起,并且一组或几组元组将一起呈现给persistentAggregate中定义的元素。 PeriodBackingMap是IBackingMap的实现,其中实现了multiget方法,该方法将接收下一步将要处理的元组组的所有相关ID(例如:{“ roomA”,“ roomB”,“ Hall ”},如上图所示)。

public List<RoomPresencePeriod> multiGet(List<List<Object>> keys) {return CassandraDB.DB.getPresencePeriods(toCorrelationIdList(keys));
}

该代码只需要从数据库中检索每个相关ID的潜在存在期间即可。 因为我们对一个元组字段进行了groupBy,所以每个List在这里都包含一个单个String:correlationId。 请注意,我们返回的列表必须与键列表的大小完全相同,以便Storm知道哪个周期对应于哪个键。 因此,对于数据库中不存在的任何键,我们只需在结果列表中放置一个空值即可。

一旦加载,Storm就会将一个具有相同相关性ID的元组一个一个地呈现给我们的化简器PeriodBuilder 。 在我们的例子中,我们知道在此批次中,每个唯一的relativeId最多被调用两次,但是一般来说可能更多,或者如果当前批次中不存在其他ENTER / LEAVE事件,则仅被调用一次。 在对muliget()/ multiput()的调用与我们的reducer之间,借助我们选择的MapState实现,Storm让我们可以插入适当的逻辑来重放先前失败的元组。 在以后的文章中有更多的信息……

一旦我们减少了每个元组序列,Storm就会将结果传递给IBackingMap的mulitput(),在这里我们只是将所有内容“追加”到数据库:

public void multiPut(List<List<Object>> keys, List<RoomPresencePeriod> newOrUpdatedPeriods) {CassandraDB.DB.upsertPeriods(newOrUpdatedPeriods);
}

Storm persistenceAggregate使用我们的化简提供给multitput()的值,自动将其发送到拓扑元组的后续部分。 这意味着我们刚刚建立的在线状态很容易作为元组字段使用,我们可以使用它们直接更新会议室时间线:

// building room timeline
.each(new Fields("presencePeriod"), new IsPeriodComplete())
.each(new Fields("presencePeriod"), new BuildHourlyUpdateInfo(), new Fields("roomId", "roundStartTime"))
.groupBy(new Fields("roomId", "roundStartTime"))
.persistentAggregate( TimelineBackingMap.FACTORY, new Fields("presencePeriod","roomId", "roundStartTime"), new TimelineUpdater(), new Fields("hourlyTimeline"))

第一行只是过滤掉尚未包含“ ENTER”和“ LEAVE”事件的任何期间。

然后, BuildHourlyUpdateInfo实现一对多的元组发射逻辑:对于每个占用期,它仅在“开始时间”内发射一个元组。 例如,从9:47 am到10:34 am在roomA中的占用将在此处触发针对RoomA的9.00am时间轴切片的元组的发射,以及另一个针对10.00am的元组的发射。

下一部分实现了与以前相同的groupBy / IBackingMap方法,只是这次使用了两个分组键而不是一个(因此,mulitget中的List <Object>将包含两个值:一个String和一个Long)。 由于我们存储一个小时的时间轴块,因此上述IBackingMap的必要属性得到了尊重。 多重获取为每个(“ roomId”,“开始时间”)对检索时间线块,然后TimelineUpdater (再次使用reducer)用与当前批次中找到的该时间线片相对应的每个存在时间更新时间线片(这就是BuildHourlyUpdateInfo的一对多元组发射逻辑)和multiput()仅保存结果。

导致咖啡厅占用

当我们看着它时,一切总是更加美丽,所以让我们来绘制房间的占用情况 。 稍加一些R代码 ,我们就可以一分钟一分钟地看到房间的占用情况(这并不意味着什么,因为所有数据都是随机的,但是……):
屏幕截图-2013-07-29-at-15-02-52

结论

希望本文能为维护Storm拓扑中的状态提供一种有用的方法。 我还尝试说明了将处理逻辑实现为小型拓扑元素的实现,将其彼此插入,而不是将一些“冗长的螺栓”捆绑在冗长而复杂的逻辑部分上。

Storm的一个重要方面是它的可扩展性,很可能去插入它的子类或在任何地方插入它的子类来调整其行为。 春天有十年前的那种聪明而有趣的感觉(哦,该死,我现在有点老了……^ __ ^)

参考:来自Svend博客的 JCG合作伙伴 Svend Vanderveken 使用Storm进行的可伸缩实时状态更新 。

翻译自: https://www.javacodegeeks.com/2013/08/scalable-real-time-state-update-with-storm.html

使用storm 实时计算

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/346972.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【多元域乘法】多项式乘法电路原理及MATLAB详解

关注公号【逆向通信猿】更精彩!!! 关于二元域上的两个元素的乘法、多项式除法,在之前的博客 【有限域除法】二元多项式除法电路原理及MATLAB详解 子程序:sub_poly_div.m 【有限域元素加法和乘法】有限域元素加法和乘法的原理及MATLAB实现 子程序:sub_gf_add.m、sub_gf_…

my CSAPP Attack lab堆栈详解

关注公号【逆向通信猿】更精彩!!! 这个实验时学习了简书上的一篇文章后,自己根据课程例子进行的一次小测试,phase 4和5的堆栈图解还没有画,等后续有时间会进行补充。 本人转载的简书原文: https://blog.csdn.net/wlwdecs_dn/article/details/121249364#comments_19237…

Spring MVC教程

1.简介 作为企业Java开发人员&#xff0c;这项工作的主要重点之一是开发Web应用程序。 对于Web应用程序&#xff0c;后果还包括许多挑战。 具体来说&#xff0c;其中一些是状态管理&#xff0c;工作流和验证。 HTTP协议的无状态性质只会使事情变得更加复杂。 Spring的Web框架旨…

ubuntu22.04 下载路径

ftp下载路径 csdn下载 ubuntu22.04下载路径ubuntu-22.04-desktop-amd64.7z.001资源-CSDN文库 ubuntu22.04下载路径ubuntu-22.04-desktop-amd64.7z.002资源-CSDN文库 【免费】ubuntu-22.04-desktop-amd64.7z.003资源-CSDN文库 【免费】ubuntu-22.04-desktop-amd64.7z.004资源-…

Spring Reactor教程

在RESTful服务的世界中&#xff0c;实际上实际上是在幕后进行许多工作&#xff0c;我们通常必须在应用程序中进行很多处理&#xff0c;而实际上并不会影响需要发送给真实用户的响应。 可以被动地做出这些业务决策&#xff0c;以便它们对与应用程序交互的用户没有任何影响。 Spr…

api签名_使用签名保护基于HTTP的API

api签名我在EMC上的一个平台上可以构建SaaS解决方案。 与越来越多的其他应用程序一样&#xff0c;该平台具有基于RESTful HTTP的API。 使用JAX-RS之类的开发框架&#xff0c;构建这样的API相对容易。 但是&#xff0c; 正确构建它们并不容易。 建立基于HTTP的API的问题 问…

【多元域除法】多项式除法电路原理及MATLAB详解

关注公号【逆向通信猿】更精彩!!! 关于二元域上的两个元素的加法和乘法、多项式除法,在之前的博客 【有限域除法】二元多项式除法电路原理及MATLAB详解 子程序:sub_poly_div.m 【有限域元素加法和乘法】有限域元素加法和乘法的原理及MATLAB实现 子程序:sub_gf_add.m、s…

win10高分辨率下修改字体显示大小(不是缩放百分比)

问题 不通过修改设置缩放百分比来增大win10的字体显示大小&#xff0c;缩放百分比调大后会导致很多问题出现&#xff01;&#xff01;&#xff01; 修改 打开设置&#xff0c;或者右键个性化&#xff0c;在搜索栏输入&#xff1a;“放大文本大小”&#xff0c;搜索框下面会自…

应用程序无法正常启动 0xc0150002

Visual Studio 2017在debug下运行程序报错 应用程序无法正常启动 0xc0150002 分析原因 可能是&#xff1a;原程序是低版本的VS所编写的&#xff0c;缺少低版本的运行库&#xff0c;所以报错 解决 安装了VS2010后即可正常运行 error LNK2019: 无法解析的外部符号 __vsnwprin…

Excel之抽奖器实现

Excel实现一个抽奖器&#xff0c;关键在于学会几个Excel中的函数即可轻松实现。 单人抽奖 RANDBETWEEN 例&#xff1a; INDEX(A2:A61,RANDBETWEEN(1,60))缺点&#xff1a;这种方式生成的抽奖器&#xff0c;在多人情况下&#xff0c;由于RANDBETWEEN函数的返回值有可能是相同…

【RS码1】系统RS码编码原理及MATLAB实现(不使用MATLAB库函数)

关注公号【逆向通信猿】更精彩!!! 基础知识 要想搞懂本节知识,需要先熟悉掌握以下前几篇博客 【多元域乘法】多项式乘法电路原理及MATLAB详解 【多元域除法】多项式除法电路原理及MATLAB详解 RS码编码原理 RS码的编码与BCH码类似,区别在于RS码为多进制的 生成多项式…

如何用Java编写类似C的Sizeof函数

如果您刚开始学习Java并且是C语言背景&#xff0c;那么您可能已经注意到Java和C编程语言之间存在一些差异&#xff0c;例如String是Java中的对象&#xff0c;而不是NULL终止的字符数组。 同样&#xff0c;Java中没有sizeof&#xff08;&#xff09;运算符。 所有原始值都有预定…

Spring启动教程

1.简介 如果您一直想使用一个Web框架&#xff0c;它使您能够快速开始API开发&#xff0c;而无须设置Web服务器&#xff0c;收集所有有线依赖项&#xff0c;安装各种工具的麻烦&#xff0c;那么您将拥有一个出色的框架&#xff0c;即Spring开机 Spring Boot的主要座右铭是约定优…

【LDPC系列1】基于MATLAB中LDPC编译码器对象的图像传输通信系统仿真

关注公号【逆向通信猿】更精彩!!! 1. 构造编码器对象 采用MATLAB内置的comm.LDPCEncoder构造编码器对象,其中使用默认的校验矩阵,信息位长32400比特,码长64800比特,该校验矩阵中除第一行中1的个数为6个外,其余行中1的个数均为7;前12960列中1的个数为8,后32400列构成…

【LDPC系列2】基于MATLAB中LDPC编译码器对象的图像传输通信系统仿真(IEEE 802.16e标准协议基础矩阵)

关注公号【逆向通信猿】更精彩!!! 1. 构造校验矩阵H,并保存为mat文件 采用IEEE 802.16e标准协议中的基础校验矩阵 通过构造QC-LDPC校验矩阵,码长n=2304,信息长k=1152,码率r=1/2,基础矩阵维数为1224: Hb = [-1 94 73 -1

VS2010附加进程调试DLL时断点无法断下的解决方法

系统版本&#xff1a;Win10 x64 1809 VS版本&#xff1a;VS2017 企业版 问题一 在动态链接库(DLL)附加到进程调试时&#xff0c;用VS2017附加后单步调试&#xff0c;结果发现总是在调试过程中卡死&#xff0c;VS2017无响应&#xff1b; 解决办法是&#xff1a;强制结束VS2017…

用于SaaS和NoSQL的Jdbi

一个自然的接口&#xff0c;用于与CRM&#xff0c;ERP&#xff0c;会计&#xff0c;营销自动化&#xff0c;NoSQL&#xff0c;平面文件等基于Java的数据集成 Jdbi是Java的SQL便利库&#xff0c;它为JDBC提供更自然的Java数据库接口&#xff0c;该接口易于绑定到域数据类型。 该…

【卷积码系列3】(n,k,m)卷积码的维特比译码实现(不使用MATLAB库函数)及性能对比(vitdec函数不使用MATLAB库函数【全部代码需私信另外付费获取】)

理论基础 MATLAB库函数polly2trellis(卷积码生成多项式转网格图描述)的实现过程详解 上面这篇仅作为了解!!! 【卷积码系列1】(n,k,m)卷积码的编码原理详解及MATLAB实现 【卷积码系列2】(n,k,m)卷积码的生成多项式矩阵系数转网格图描述(不使用MATLAB库函数) 维特比译码曲…

Java批处理教程

在当今世界&#xff0c;互联网已经改变了我们的生活方式&#xff0c;其主要原因之一是大多数日常琐事都使用互联网。 这导致可用于处理的大量数据。 其中涉及大量数据的一些示例是处理工资单&#xff0c;银行对帐单&#xff0c;利息计算等。因此&#xff0c;请设想一下&#x…

编写junit 测试_使用JUnit和Repeat注​​释编写有效的负载测试

编写junit 测试EasyTest最近推出了一组新的注释&#xff0c;可帮助其用户编写有效的测试用例。 进入EasyTest的两个主要注释是&#xff1a; 重复 持续时间 今天&#xff0c;我们将讨论重复标注。 一种新的方法级别注释 重复已添加到EasyTest Framework。 此批注可用于重复…