使用storm 实时计算_使用Storm进行可扩展的实时状态更新

使用storm 实时计算

在本文中,我将说明如何借助Storm框架以可扩展且无锁定的方式在数据库中维护实时事件驱动流程的当前状态。

Storm是基于事件的数据处理引擎。 它的模型依赖于基本原语,例如事件转换,过滤,聚合……,我们将它们组合成拓扑 。 拓扑的执行通常分布在多个节点上,并且风暴群集还可以并行执行给定拓扑的多个实例。 因此,在设计时,必须牢记哪些Storm原语在分区范围内执行,即在一个群集节点的级别上执行,以及哪些在群集范围内执行(又称为重新分区操作) ,因为它们涉及将事件从中移出的网络流量。分区到分区)。 Storm Trident API文档明确提到了哪些功能做什么,作用范围如何。 Storm的分区概念与Kafka队列的分区概念保持一致, Kafka队列是入站事件的常见来源。

拓扑通常需要维护一些执行的持续状态。 例如,这可以是一些传感器值的滑动窗口平均值,从推文中提取的近期情绪,在不同位置出现的人数。……由于某些状态更新操作具有分区范围(例如partitionAggregate ),因此可伸缩性模型在这里尤为重要。其他则具有集群范围(例如groupby + perstitentAggregate的组合)。 这篇文章中说明了这一点。

示例代码在githup上可用 。 它基于Storm 0.8.2,Cassandra 1.2.5和JDK 1.7.0。 请注意,该示例未包含适当的错误处理:喷口或螺栓均不支持重试失败的元组,我将在以后的文章中解决。 另外,我使用Java序列化将数据存储在元组中,因此,即使Storm支持多种语言,我的示例也是特定于Java的。

实际示例:出席事件

我的示例是模拟一个跟踪人们在建筑物内位置的系统。 每当用户进入或离开房间时,每个房间入口处的传感器都会发出如下事件:

{"eventType": "ENTER", "userId": "John_5", "time": 1374922058918, "roomId": "Cafetaria", "id": "bf499c0bd09856e7e0f68271336103e0A", "corrId": "bf499c0bd09856e7e0f68271336103e0"}
{"eventType": "ENTER", "userId": "Zoe_15", "time": 1374915978294, "roomId": "Conf1", "id": "3051649a933a5ca5aeff0d951aa44994A", "corrId": "3051649a933a5ca5aeff0d951aa44994"}
{"eventType": "LEAVE", "userId": "Jenny_6", "time": 1374934783522, "roomId": "Conf1", "id": "6abb451d45061968d9ca01b984445ee8B", "corrId": "6abb451d45061968d9ca01b984445ee8"}
{"eventType": "ENTER", "userId": "Zoe_12", "time": 1374921990623, "roomId": "Hall", "id": "86a691490fff3fd4d805dce39f832b31A", "corrId": "86a691490fff3fd4d805dce39f832b31"}
{"eventType": "LEAVE", "userId": "Marie_11", "time": 1374927215277, "roomId": "Conf1", "id": "837e05916349b42bc4c5f65c0b2bca9dB", "corrId": "837e05916349b42bc4c5f65c0b2bca9d"}
{"eventType": "ENTER", "userId": "Robert_8", "time": 1374911746598, "roomId": "Annex1", "id": "c461a50e236cb5b4d6b2f45d1de5cbb5A", "corrId": "c461a50e236cb5b4d6b2f45d1de5cbb5"}

对(“ ENTER”和“ LEAVE”)对中的每个事件与一个房间内一个用户的一个占用时间段相对应。 这可能对传感器提出了很多要求,但是出于本示例的目的,这使我的生活更加轻松

为了使事情变得有趣,让我们想象一下,不能保证到达我们服务器的事件遵循时间顺序(请参见生成事件的python脚本中的shuffle()调用)。

我们将构建一个Storm拓扑,该拓扑将构建每个房间的每分钟每分钟的占用时间线,如本文结尾处的时间图所示。 在数据库中,房间时间线被切成一个小时的时间段,这些时间段被独立存储和更新。 这是Cafetaria占用1小时的示例:

{"roomId":"Cafetaria","sliceStartMillis":1374926400000,"occupancies":[11,12,12,12,13,15,15,14,17,18,18,19,20,22,22,22,21,22,23,25,25,25,28,28,33,32,31,31,29,28,27,27,25,
22,22,21,20,19,19,19,17,17,16,16,15,15,16,15,14,13,13,12,11,10,9,11,10,9,11,10]}

为了实现这一点,我们的拓扑需要:

  • 根据correlationID重新组合“ ENTER”和“ LEAVE”事件,并为此用户在此房间中产生相应的存在时间
  • 将每个在场期间的影响应用于房间入住时间表

顺便说一句,Cassandra提供了Counter列 ,尽管我可以很好地替代它们,但我在这里不使用它们。 但是,我的目的是说明Storm功能,即使它会使方法有些虚构。

分组依据/ persistentAggregate / iBackingMap说明

在查看示例代码之前,让我们澄清一下这些“三叉戟风暴”原语如何协同工作。

想象一下,我们从上午9:47到上午10:34收到了两个描述用户在roomA中存在​​的事件。 更新会议室的时间表需要:

  • 从数据库加载两个受影响的时间轴切片:[9.00am,10:00 am]和[10.00am,11:00 am]
  • 在这两个时间轴切片中添加此用户的状态
  • 将它们保存到数据库

但是,像这样天真地实现此目标并不是最佳选择,首先是因为它每个事件使用两个DB请求,其次是因为这种“读取-更新-写入”序列通常需要一种锁定机制,这种锁定机制通常无法很好地扩展。

为了解决第一点,我们想为几个事件重新组合数据库操作。 在Storm中,事件(或元组 )被成批处理。 IBackingMap是一个我们可以实现的原语,它使我们可以立即查看整批元组。 我们将使用它在批处理的开始(multiget)和结束时的所有DB-write操作(multiput)重新分组。 但是,multiget不允许我们查看元组本身,而只能查看“查询键”,这是根据元组内容计算出来的,如下所述。

原因在于上面提到的关于天真的实现的第二点:我们想并行执行几个[multiget +更新逻辑+ multiput]流,而不依赖锁。 这是通过确保那些并行子流程更新不相交的数据集来实现的。 这就要求定义拆分成并行流的拓扑元素还控制每个流内DB中要加载和更新的数据。 该元素是Storm groupBy原语:它通过按字段值对元组进行分组来定义拆分,并且它通过将“ groupedBy”值作为对multiget的查询关键字来控制每个并行流更新的数据。

下图在房间占用示例中对此进行了说明(简化为每个房间仅存储一个时间线,而不是每个一小时的时间片一个时间线):
storm-b​​log-groupby-11
但是,并行性并没有完全发生(例如,当前的Storm实现在分组流中依次调用每个reducer / combiner),但这是设计拓扑时要牢记的一个好模型。

有趣的是,在groupBy和multiget之间发生了一些Storm魔术。 回想一下,Storm旨在进行大规模分布,这意味着每个流在多个节点上并行执行,从诸如Hadoop HDFS或分布式Kafka队列之类的分布式数据源获取输入数据。 这意味着groupBy()同时在多个节点上执行,所有可能处理的事件都需要组合在一起。 groupBy是一个重新分区操作 ,可确保将所有需要分组的事件发送到同一节点,并由IBackingMap +组合器或约简器的同一实例处理,因此不会发生争用情况。

同样,Storm要求我们将IBackingMap包装到可用的Storm MapState原语(或我们自己的原语)之一中,通常用于处理失败/重播的元组。 如上所述,我不在本文中讨论这一方面。

使用这种方法,我们必须实现IBackingMap,以便它尊重以下属性:

  • 对于不同的键值,由multiget读取和由IBackingMap的multiput操作写入的数据库行必须是不同的。

我想这就是他们将这些值称为“关键”的原因 (尽管任何尊重此属性的方法都可以)。

回到例子

让我们看看这在实践中是如何工作的。 该示例的主要拓扑在此处可用:

// reading events
.newStream("occupancy", new SimpleFileStringSpout("data/events.json", "rawOccupancyEvent"))
.each(new Fields("rawOccupancyEvent"), new EventBuilder(), new Fields("occupancyEvent"))

第一部分只是读取JSON格式的输入事件(我正在使用简单的文件输出),对它们进行反序列化,然后使用Java序列化将它们放入称为“ occupancyEvent”的元组字段中。 这些元组中的每一个都描述了用户在房间内或房间外的“ ENTER”或“ LEAVE”事件。

// gathering "enter" and "leave" events into "presence periods"
.each(new Fields("occupancyEvent"), new ExtractCorrelationId(), new Fields("correlationId"))
.groupBy(new Fields("correlationId"))
.persistentAggregate( PeriodBackingMap.FACTORY, new Fields("occupancyEvent"), new PeriodBuilder(), new Fields("presencePeriod"))
.newValuesStream()

当我们遇到correlationId的不同值时,groupBy原语会创建尽可能多的元组组(这可能意味着很多,因为通常最多两个事件具有相同的correlationId)。 当前批处理中具有相同相关ID的所有元组将重新组合在一起,并且一组或几组元组将一起呈现给persistentAggregate中定义的元素。 PeriodBackingMap是IBackingMap的实现,其中实现了multiget方法,该方法将接收下一步将要处理的元组组的所有相关ID(例如:{“ roomA”,“ roomB”,“ Hall ”},如上图所示)。

public List<RoomPresencePeriod> multiGet(List<List<Object>> keys) {return CassandraDB.DB.getPresencePeriods(toCorrelationIdList(keys));
}

该代码只需要从数据库中检索每个相关ID的潜在存在期间即可。 因为我们对一个元组字段进行了groupBy,所以每个List在这里都包含一个单个String:correlationId。 请注意,我们返回的列表必须与键列表的大小完全相同,以便Storm知道哪个周期对应于哪个键。 因此,对于数据库中不存在的任何键,我们只需在结果列表中放置一个空值即可。

一旦加载,Storm就会将一个具有相同相关性ID的元组一个一个地呈现给我们的化简器PeriodBuilder 。 在我们的例子中,我们知道在此批次中,每个唯一的relativeId最多被调用两次,但是一般来说可能更多,或者如果当前批次中不存在其他ENTER / LEAVE事件,则仅被调用一次。 在对muliget()/ multiput()的调用与我们的reducer之间,借助我们选择的MapState实现,Storm让我们可以插入适当的逻辑来重放先前失败的元组。 在以后的文章中有更多的信息……

一旦我们减少了每个元组序列,Storm就会将结果传递给IBackingMap的mulitput(),在这里我们只是将所有内容“追加”到数据库:

public void multiPut(List<List<Object>> keys, List<RoomPresencePeriod> newOrUpdatedPeriods) {CassandraDB.DB.upsertPeriods(newOrUpdatedPeriods);
}

Storm persistenceAggregate使用我们的化简提供给multitput()的值,自动将其发送到拓扑元组的后续部分。 这意味着我们刚刚建立的在线状态很容易作为元组字段使用,我们可以使用它们直接更新会议室时间线:

// building room timeline
.each(new Fields("presencePeriod"), new IsPeriodComplete())
.each(new Fields("presencePeriod"), new BuildHourlyUpdateInfo(), new Fields("roomId", "roundStartTime"))
.groupBy(new Fields("roomId", "roundStartTime"))
.persistentAggregate( TimelineBackingMap.FACTORY, new Fields("presencePeriod","roomId", "roundStartTime"), new TimelineUpdater(), new Fields("hourlyTimeline"))

第一行只是过滤掉尚未包含“ ENTER”和“ LEAVE”事件的任何期间。

然后, BuildHourlyUpdateInfo实现一对多的元组发射逻辑:对于每个占用期,它仅在“开始时间”内发射一个元组。 例如,从9:47 am到10:34 am在roomA中的占用将在此处触发针对RoomA的9.00am时间轴切片的元组的发射,以及另一个针对10.00am的元组的发射。

下一部分实现了与以前相同的groupBy / IBackingMap方法,只是这次使用了两个分组键而不是一个(因此,mulitget中的List <Object>将包含两个值:一个String和一个Long)。 由于我们存储一个小时的时间轴块,因此上述IBackingMap的必要属性得到了尊重。 多重获取为每个(“ roomId”,“开始时间”)对检索时间线块,然后TimelineUpdater (再次使用reducer)用与当前批次中找到的该时间线片相对应的每个存在时间更新时间线片(这就是BuildHourlyUpdateInfo的一对多元组发射逻辑)和multiput()仅保存结果。

导致咖啡厅占用

当我们看着它时,一切总是更加美丽,所以让我们来绘制房间的占用情况 。 稍加一些R代码 ,我们就可以一分钟一分钟地看到房间的占用情况(这并不意味着什么,因为所有数据都是随机的,但是……):
屏幕截图-2013-07-29-at-15-02-52

结论

希望本文能为维护Storm拓扑中的状态提供一种有用的方法。 我还尝试说明了将处理逻辑实现为小型拓扑元素的实现,将其彼此插入,而不是将一些“冗长的螺栓”捆绑在冗长而复杂的逻辑部分上。

Storm的一个重要方面是它的可扩展性,很可能去插入它的子类或在任何地方插入它的子类来调整其行为。 春天有十年前的那种聪明而有趣的感觉(哦,该死,我现在有点老了……^ __ ^)

参考:来自Svend博客的 JCG合作伙伴 Svend Vanderveken 使用Storm进行的可伸缩实时状态更新 。

翻译自: https://www.javacodegeeks.com/2013/08/scalable-real-time-state-update-with-storm.html

使用storm 实时计算

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/346972.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【MFC系列-第22天】GDI算法实战——过渡色

关注公号【逆向通信猿】更精彩&#xff01;&#xff01;&#xff01; 第22天 GDI算法实战 CDC(HDC)绘图类&#xff1a; 五大GDI对象类&#xff1a;CPen&#xff0c;CBrush&#xff0c;CFont&#xff0c;CBitmap&#xff0c;CRgn 22.1 走马灯 设置定时器 SetTimer(1, 20, N…

无服务器:不费吹灰之力!

几年前&#xff0c; 集装箱横扫开发人员&#xff0c;而开发人员的土地就像6级飓风一样 。 码头工人 Rkt 。 其他 。 Docker Swarm 。 K8s 。 OpenShift 。 现在&#xff0c;我们实际上处于震中&#xff0c;但是当我们瞥见地平线时&#xff0c;我们看到另一个人来了&#x…

【MFC系列-第23天】CMemoryDC的封装过程

CDC(HDC)绘图类&#xff1a; 五大GDI对象类&#xff1a;CPen&#xff0c;CBrush&#xff0c;CFont&#xff0c;CBitmap&#xff0c;CRgn 23.1 LoadImage API HANDLE LoadImage(HINSTANCE hinst,LPCTSTR lpszName, UINT uType, int cxDesired,int cyDesired,UINT fuLoad );uT…

JDK 8与JDK 10:三元/拆箱的区别

最近的Nicolai Parlog &#xff08; nipafx &#xff09; 鸣叫引起了我的注意&#xff0c;因为它引用了关于JDK 8和JDK 10之间行为更改的有趣StackOverflow讨论 &#xff0c;并询问“为什么&#xff1f;” SerCe 在StackOverflow线程上引用的问题最终归结为在JDK 8和JDK 10之间…

【多元域乘法】多项式乘法电路原理及MATLAB详解

关注公号【逆向通信猿】更精彩!!! 关于二元域上的两个元素的乘法、多项式除法,在之前的博客 【有限域除法】二元多项式除法电路原理及MATLAB详解 子程序:sub_poly_div.m 【有限域元素加法和乘法】有限域元素加法和乘法的原理及MATLAB实现 子程序:sub_gf_add.m、sub_gf_…

my CSAPP Attack lab堆栈详解

关注公号【逆向通信猿】更精彩!!! 这个实验时学习了简书上的一篇文章后,自己根据课程例子进行的一次小测试,phase 4和5的堆栈图解还没有画,等后续有时间会进行补充。 本人转载的简书原文: https://blog.csdn.net/wlwdecs_dn/article/details/121249364#comments_19237…

Spring MVC教程

1.简介 作为企业Java开发人员&#xff0c;这项工作的主要重点之一是开发Web应用程序。 对于Web应用程序&#xff0c;后果还包括许多挑战。 具体来说&#xff0c;其中一些是状态管理&#xff0c;工作流和验证。 HTTP协议的无状态性质只会使事情变得更加复杂。 Spring的Web框架旨…

【MFC系列-第24天】梯形分页和蝴蝶QQ宠物的实现

CDC(HDC)绘图类&#xff1a; 五大GDI对象类&#xff1a;CPen&#xff0c;CBrush&#xff0c;CFont&#xff0c;CBitmap&#xff0c;CRgn 24.1 梯形分页的双缓冲改进和尺寸自适应 24.2 蝴蝶跟随鼠标点击运动 class CHitFlyDlg : public CDialogEx {CMemoryDC m_dc;//缓冲enu…

ubuntu22.04 下载路径

ftp下载路径 csdn下载 ubuntu22.04下载路径ubuntu-22.04-desktop-amd64.7z.001资源-CSDN文库 ubuntu22.04下载路径ubuntu-22.04-desktop-amd64.7z.002资源-CSDN文库 【免费】ubuntu-22.04-desktop-amd64.7z.003资源-CSDN文库 【免费】ubuntu-22.04-desktop-amd64.7z.004资源-…

camel seda 协议_探索Apache Camel Core – Seda组件

camel seda 协议Apache Camel中的seda组件与我在先前的博客中介绍的direct组件非常相似&#xff0c;但是以异步的方式。 为此&#xff0c;它使用java.util.concurrent.BlockingQueue作为默认实现来使消息排队并与主Route线程断开连接&#xff0c;然后在单独的线程中处理消息。 …

【MFC系列-第25、26天】绘图软件

25.1 绘图软件的绘制原理 纯虚函数&#xff1a;抽象函数&#xff0c;强制在派生类中进行实现&#xff1b; 虚函数&#xff1a;有函数体&#xff0c;可在基类也可在派生类中实现。 基类CLayer class CLayer {//抽象类 public:CLayer();~CLayer();virtual void OnDraw(CDC* pDC…

Java 10:“ var”关键字

Java 10使用关键字var引入了局部变量类型推断 。 这意味着无需编写&#xff1a; Map<Department, List<Employee>> map new HashMap<>(); // ... for (Entry<Department, List<Employee>> dept : map.entrySet()) {List<Employee> emplo…

【MFC系列-第32天】控件自绘技术

32.1 对话框背景设置 方法一 BOOL CClDlg::OnEraseBkgnd(CDC* pDC) {CRect rect;GetClientRect(rect);pDC->FillSolidRect(rect, RGB(200, 255, 255));return TRUE; }方法二&#xff1a;WM_CTRLCOLOR消息 按类型按句柄按控件ID HBRUSH CMFCApplication1Dlg::OnCtlColor(…

Spring Reactor教程

在RESTful服务的世界中&#xff0c;实际上实际上是在幕后进行许多工作&#xff0c;我们通常必须在应用程序中进行很多处理&#xff0c;而实际上并不会影响需要发送给真实用户的响应。 可以被动地做出这些业务决策&#xff0c;以便它们对与应用程序交互的用户没有任何影响。 Spr…

MFC多线程处理界面假死之红外图像数据获取和excel写入

在MFC主界面某个Button Click事件中起一个线程去做处理一些事情,在起的线程运行完毕后,接着跑Click起线程后的代码,已达到按顺序执行,保证时许正确的目的。 问题 通常处理一个线程等待用 WaitForSingleObject,这个放在主界面线程成中会造成主界面“卡死”,其原因是它将…

api签名_使用签名保护基于HTTP的API

api签名我在EMC上的一个平台上可以构建SaaS解决方案。 与越来越多的其他应用程序一样&#xff0c;该平台具有基于RESTful HTTP的API。 使用JAX-RS之类的开发框架&#xff0c;构建这样的API相对容易。 但是&#xff0c; 正确构建它们并不容易。 建立基于HTTP的API的问题 问…

【多元域除法】多项式除法电路原理及MATLAB详解

关注公号【逆向通信猿】更精彩!!! 关于二元域上的两个元素的加法和乘法、多项式除法,在之前的博客 【有限域除法】二元多项式除法电路原理及MATLAB详解 子程序:sub_poly_div.m 【有限域元素加法和乘法】有限域元素加法和乘法的原理及MATLAB实现 子程序:sub_gf_add.m、s…

用Rocker制作模板

在本文中&#xff0c;我们将快速介绍Rocker &#xff0c;这是一个静态类型化的快速Java 8模板引擎。 必需的依赖项 要开始使用Rocker&#xff0c;我们需要在项目中添加以下依赖项&#xff1a; <dependency><groupId>com.fizzed</groupId><artifactId>…

win10高分辨率下修改字体显示大小(不是缩放百分比)

问题 不通过修改设置缩放百分比来增大win10的字体显示大小&#xff0c;缩放百分比调大后会导致很多问题出现&#xff01;&#xff01;&#xff01; 修改 打开设置&#xff0c;或者右键个性化&#xff0c;在搜索栏输入&#xff1a;“放大文本大小”&#xff0c;搜索框下面会自…

【MFC系列-第33天】链接控件自绘技术

33.1 常用的字体复制 方法一 CFont* pFont pDC->GetCurrentFont(); LOGFONT lf; pFont->GetLogFont(&lf); m_fontN.CreateFontIndirect(&lf); lf.lfUnderline TRUE; m_fontT.CreateFontIndirect(&lf);方法二 HFONT hFont (HFONT)GetStockObject(DEFAUL…