Storm Trident拓扑中的错误处理

这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法。 我在这里关注代码设计,而不是监督或冗余之类的部署良好实践。

由于Storm的实时流性质,当面对大多数错误时,我们最终将不得不移至下一个数据。 在这种情况下,错误处理归结为(或没有)报告此错误,并在以后(或没有)重试处理失败的输入数据。 这篇文章的第1部分是关于这方面的。

这意味着在处理元组时,通常很难确定它是我们第一次遇到它还是它的内容已经部分地应用于持久性。 因此,我们需要使状态更新操作成为幂等,这是本文的第二部分。

不要对这篇文章的大小印象深刻,Storm实际上为我们完成了大部分工作。 真正需要做的只是了解如何以合理的方式插入东西。

这篇文章基于Storm 0.9,Cassandra 2.0.4和Kafka 0.7。 我在github上放置了一个玩具项目 ,以说明下面讨论的几点。 该项目实际上是根据我在上一篇文章中介绍的“房间存在”示例改编的 。

第1部分:处理错误情况

决定何时要求重试

第一个简单的错误处理策略是简单地接受运行时错误导致的计算质量下降。 例如,如果拓扑在最近的滑动窗口上计算一些实时趋势估计,或者如果我们已经在处理诸如Twitter公开流之类的采样数据,则可能是这种情况。 如果我们选择忽略此类错误,则实现起来非常简单,只需用大量的try / catch包装拓扑逻辑,以某种方式报告错误,并且不要让任何事情冒充Storm。

但是,在大多数情况下,我们关心一致性,因此必须对尝试重试或不尝试失败的数据做出谨慎的决定。

运行时错误的一个典型示例是入站数据格式问题。 在那种情况下,重试当然是没有意义的,因为它不会第二次变得更好。 相反,我们应该记录故障数据,并可能要求某些人进行调查。 这是我的玩具项目中BytesToString Storm函数的一个简单示例:

public class BytesToString extends BaseFunction {
@Overridepublic void execute(TridentTuple tuple, TridentCollector tridentCollector) { try { String asString = new String((byte[]) tuple.getValueByField("bytes"), "UTF-8"); tridentCollector.emit(new Values(asString)); } catch (UnsupportedEncodingException e) { logger.err("ERROR: lost data: unable to parse inbound message from Kafka (expecting UTF-8 string)", e); } }

另一方面,如果错误与某些不可访问的外部数据源有关,例如由网络分区引起的错误,我们应按下一节所述触发重试。

除上述两种错误外,还有许多其他类型的错误,但要点仍然是:区分可重试错误与不可重试错误并做出相应反应很重要。

最后一点,当您决定不报告在IBackingMap的multiget中发生的错误时,请格外小心 ,因为该函数必须返回与输入键列表大小相同的列表。 因此,如果出现不可重试的错误,我们必须以某种方式返回某些结果。 在大多数情况下,如果我们选择不重试这种情况下的错误,那是因为某些过去的错误已经在持久性方面破坏了某些内容,并且为时已晚。 在下面的示例中,由于对从DB读取的某些数据进行的解析失败而发生错误,并且代码仅返回null值,这等同于考虑到持久性没有任何作用(至少没有用处)。 另请参阅下面的第3部分,以了解针对这种情况的可能解决方案。

@Override public List<OpaqueValue> multiGet(List<List<Object>> keys) { try { return Utils.opaqueStringToOpaqueValues(opaqueStrings, HourlyTimeline.class); } catch (IOException e) { logger.err("error while trying to deserialize data from json => giving up (data is lost!)", e); return Utils.listOfNulls(keys.size()); // this assumes previous state does not exist => destroys data! } 
}

(好吧,来自TimelineBackingMap的这段代码实际上将所有数据替换为null,这使情况变得更糟,但这是一个玩具项目……)

导致三叉戟元组被重播…

一旦确定触发元组重播是合理的,我们只需要询问它,Storm就会做其他所有事情(只需插入正确的喷嘴,请参阅下一节)。 从技术上讲,这很简单:从功能或过滤器之类的Trident原语中触发重试就像抛出FailedException一样简单,就像玩具项目中的TimeLineBackingMap中一样,其中包括重试和非重试错误的示例(请注意,代码下面来自TimelineBackingMap的示例假定任何数据库错误都是可重试的,这过于简化了):

@Override public void multiPut(List<List<Object>> keys, List<OpaqueValue> timelines) {;List<OpaqueValue> jsonOpaqueTimelines; try { jsonOpaqueTimelines = Utils.opaqueValuesToOpaqueJson(timelines); } catch (IOException e) { System.err.println("error while trying to serialize data to json => giving up (data is lost!)"); return; }if (jsonOpaqueTimelines != null) { try { DB.put("room_timelines", toSingleKeys(keys), jsonOpaqueTimelines); } catch (Exception e) { logger.err("error while storing timelines to cassandra, triggering a retry...", e); throw new FailedException("could not store data into Cassandra, triggering a retry...", e); } } 
};

然后,Storm会将错误传播回喷嘴,以强制重播元组。 如果我们希望在Storm UI中报告错误,则可以抛出ReportedFailedException。

我强烈不建议使用的另一种方法是让任何其他类型的RuntimeException冒泡到Storm。 这本质上以更高的性能成本实现了相同的结果:它将触发工作节点崩溃,并且Nimbus将自动重启,并且所有spout将恢复从最新的已知成功索引中读取(spout实现(如Kafka spout将其最新成功处理的偏移存储在zookeeper中)为了这个目的)。 这种快速失败策略是Storm设计的一部分(请参阅有关工人监督和容错的文档)。 从本质上讲,这实现了与让spout重播某些元组相同的一致性保证,但是对性能的影响当然更大,因为我们具有完整的JVM重新启动并重置了所有当前正在运行的拓扑实例。 因此,切勿故意这样做。 仍然令人放心的是,如果我们的节点崩溃,数据不会中断,并且流量自然会继续。

Storm决定重播元组的第三种情况是它们是否在配置的超时之前未到达拓扑的末尾。 更确切地说,如果未按时收到ACK,则该机制实际上是由发出该元组的spout触发的,因此,如果元组成功处理但由于某些网络分区ACK无法到达该spout,则也可以触发这些重播。 用于控制此设置的Storm参数是topology.enable.message.timeoutstopology.message.timeout.secs ,根据defaults.yaml的默认值为“ true”和30秒。 这只是为什么拓扑中的幂等性如此重要的又一个原因。

…并实际上重播元组

一旦失败通知到达喷嘴(或在超时情况下由通知生成),我们需要确保失败的元组将被重播。 除非您自己开发喷嘴,否则只能归结为选择正确的喷嘴口味 。 此选择会影响元组的重播(或不重播)方式,因此它必须与适当的策略保持一致,以处理拓扑中的已重播的元组,这是下一部分的主题。 有3种喷口:

  • 非事务性:无保证,但如果您选择的实现提供“至少一次”保证,在某些情况下它们仍然有用
  • 事务性的:不建议使用,因为它们在某些分区情况下可能会阻止拓扑
  • opaque(不透明):就重播而言,它们达到元组至少会被播放一次,但在重播方面提供了弱保证,但在重播的情况下,发出的批次可能会不同。 在实践中,使用它们时,我所建议的所有重要事项是确保拓扑对于这种灵活的重放具有鲁棒性,这将在下一部分中进行讨论。

关于元组和批处理重播的最后说明

我在元组级别上进行了讨论,因为这使设计决策更简单。 实际上,要求Storm重播单个元组将触发同一批中包含的许多其他元组的重播,其中一些可能没有错误。

第2部分:重播元组的幂等处理

故事的另一面是,既然我们知道元组可能会被处理几次,请确保拓扑是幂等的,即,发送相同元组的次数不会使状态不一致。 没有副作用的拓扑部分当然不受元组重播的影响。

关于状态一致性的Storm Trident文档非常清楚,因此我在这里仅添加一些内容。

如果我们的状态更新操作已经幂等

如果状态更新操作本质上已经是幂等的,那么它已经具有元组重播的弹性,并且不需要Storm特殊机制。

如果id值完全基于入站元组内容,则任何“按id存储”操作都是这种情况。 例如,在我的玩具项目中,我存储了占用会话,这些会话的主键是从入站事件中找到的相关ID派生的,因此在这种情况下,写操作已经可以重播了,因为任何重播都只会覆盖相同的现有数据信息而不会破坏任何数据(假设我们有订购保证,在这种情况下是正确的)。

public void multiPut(List<List<Object>> keys, List<RoomPresencePeriod> newOrUpdatedPeriods) { DB.upsertPeriods(newOrUpdatedPeriods); 
}

在CassandraDB.java中:

try { PreparedStatement statement = getSession().prepare("INSERT INTO presence (id, payload) values (?,?)"); execute(new BoundStatement(statement).bind(rpp.getId(), periodJson)); } catch (Exception e) { logger.error("error while contacting Cassandra, triggering a retry...", e); new FailedException("error while trying to record room presence in Cassandra ", e); }

同时使read-update-write操作成为幂等

我在先前的博客文章中描述了Storm如何使我们能够实现执行以下操作而不需要DB锁并且仍然避免出现竞争情况:

  • 从数据库读取以前的状态,
  • 根据新的元组数据更新内存中的状态,
  • 将新状态保存到数据库

风暴的美丽之处在于,为了处理重播的元组而不破坏状态,我们只需要调整步骤1和3。这是非常重要的:我们现在可以在步骤2中实现所有处理逻辑,就像每个元组只被播放一次,然后根本不关心重播(只要我们是“纯”的,请参见下面的评论…)。 这就是“风暴只有一次语义”的含义。

而且,如果我们在内部实现1和3,则使它们重播即可,只是将它们与现有的Storm类包装在一起即可。 最健壮的方式是使用Opaque逻辑,但代价是每个状态存储两次状态,如Trident文档中关于transaction spout的说明 。

更好的是,已经有很多不透明的BackingMap实现可用于Storm-contrib中的诸如Cassandra或Mysql的许多后端,因此,在大多数情况下,除了选择正确的之外,实际上没有任何其他事情可做。

最重要的一点是,要使用处理重播元组的不透明BackingMap,必须使用尊重不透明先决条件的喷嘴,如本矩阵所述 。

如果由于某种原因需要实现自己的BackingMap,我们唯一要做的就是使它存储数据的当前和先前版本以及交易ID。 这是我的玩具项目中的一个简单示例(但实际上,在编写类似代码之前,请考虑一下Storm-contrib ):

public void put(String table, List<String> keys, List<OpaqueValue> opaqueStrings) {;// this should be optimized with C* batches... for (Pair<String, OpaqueValue> keyValue : Utils.zip(keys, opaqueStrings)) { PreparedStatement statement = getSession().prepare(format("INSERT INTO %s (id, txid, prev, curr) values (?, ?, ?, ?)", table)); OpaqueValue opaqueVal = keyValue.getValue(); execute(new BoundStatement(statement).bind(keyValue.getKey(), opaqueVal.getCurrTxid(), opaqueVal.getCurr(), opaqueVal.getPrev())); } 
}
public List<OpaqueValue> get(String table, List<String> keys) {;List<OpaqueValue> vals = new ArrayList<>(keys.size()); ResultSet rs = execute(format("select id, txid, prev, curr from %s where id in ( %s ) ", table, toCsv(keys) )); Map<String, OpaqueValue> data = toMapOfOpaque(rs); for (String key: keys){ vals.add(data.get(key)); }return vals; }

然后,要真正获得Trident的一次语义,唯一要做的就是将其包装在OpaqueMap中,如下所示:

public static StateFactory FACTORY = new StateFactory() { public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { return OpaqueMap.build(new TimelineBackingMap(new CassandraDB(conf))); } }

幕后发生的事情是, OpaqueMap将根据与当前批处理元组关联的事务ID和在持久性中找到的事务ID,选择要显示给我们的更新逻辑的先前存储的状态(“ curr”或“ prev”)。 该事务ID是由喷嘴提供的,因此这就是保持喷嘴与状态选择对齐如此重要的原因:状态对每个事务ID的含义进行假设。

不要破坏前一个实例!

让我们回到上面提到的read-update-write序列的步骤2。 既然我们知道不透明逻辑需要存储任何状态的新版本和旧版本,请查看以下Reducer代码并尝试确定其损坏原因:

public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) { LocationChangedEvent event = (LocationChangedEvent) tuple.getValueByField("occupancyEvent");;if (ENTER == event.getEventType()) { curr.setStartTime(event.getTime());            // buggy code } else { curr.setEndTme(event.getTime());              // buggy code } return curr; }

函数式编程的专家称其为“不纯”方法,因为它会修改其输入参数。 它破坏Storm不透明逻辑的原因是,现在“当前”和“先前” java引用实际上都引用内存中的同一实例。 因此,当不透明逻辑同时保留某个状态的先前版本和当前版本时,实际上它保存的是新版本的两倍,因此先前的版本丢失了。

更好的实现可能是这样的

public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) { LocationChangedEvent event = (LocationChangedEvent) tuple.getValueByField("occupancyEvent");;RoomPresencePeriod updated = new RoomPresencePeriod(curr);  // copy constructor if (ENTER == event.getEventType()) { updated.setStartTime(event.getTime()); } else { updated.setEndTme(event.getTime()); } return updated; }

第3部分:人为错误:全部重播

最后一点,我们必须谦虚地意识到,无论我们采取了多少上述努力和保障,我们仍然会在生产环境中部署错误(对此,我发誓,抱歉!)。 对于数据处理平台,错误可能意味着破坏数据的错误,当数据是我们的业务时,这是很糟糕的。 在某些情况下,我们只会发现事实之后数据已损坏,就像上面有关multiget的注释中所述。

内森·马兹(Nathan Marz)在他的《 大数据》书中 ,描述了一个简单的基于Lambda架构的“重播所有”想法,以解决该想法。 这本书的简短摘要也可以在这里找到 。

参考:来自Svend博客的 JCG合作伙伴 Svend Vanderveken 在Storm Trident拓扑中的错误处理 。

翻译自: https://www.javacodegeeks.com/2014/02/error-handling-in-storm-trident-topologies.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/364645.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue路由知识整理

vue路由知识整理 对于单页应用,官方提供了vue-router进行路由跳转的处理.我们已经可以通过组合组件来组成应用程序&#xff0c;当你要把 vue-router 添加进来&#xff0c;我们需要做的是&#xff0c;将组件(components)映射到路由(routes)&#xff0c;然后告诉 vue-router 在哪…

ATS日志说明

ATS日志说明 转&#xff1a;http://www.safecdn.cn/在ATS日志中我们经常遇到形形色色的缓存结果码&#xff0c;为了更清晰地认识它们&#xff0c;相关资料整理到这里&#xff1a;TCP_HIT请求对象的一份合法拷贝被缓存&#xff0c;ATS将发送该对象给clientTCP_MISS请求对象未缓存…

leetcode 134. 加油站(Gas Station)

目录 题目描述&#xff1a;示例 1:示例 2:解法&#xff1a;题目描述&#xff1a; 在一条环路上有 N 个加油站&#xff0c;其中第 i 个加油站有汽油 gas[i] 升。 你有一辆油箱容量无限的的汽车&#xff0c;从第 i 个加油站开往第 i1 个加油站需要消耗汽油 cost[i] 升。你从其中的…

在JUnit中测试预期的异常

单元测试用于验证一段代码是否按照开发人员的预期运行。 有时&#xff0c;这意味着检查代码是否也会引发预期的异常。 JUnit是Java单元测试的标准&#xff0c;并提供了几种验证抛出异常的机制。 本文探讨了这些选项及其相对优点。 以下面的简单代码段为例。 除了编写测试以确保…

php session 效率,大量php session临时文件带来的服务器效率问题

早上流量有点大&#xff0c;网站出口流量大概5M左右&#xff0c;访问质量却不太好&#xff0c;Web响应比较慢&#xff0c;切系统负载很高。检 查了下各web节点&#xff0c;所有web服务器的httpd线程均达到满负荷&#xff0c;很奇怪。因为所有web节点都通过nfs来共享session目录…

安卓第三次作业

<?xml version"1.0" encoding"utf-8"?> <uses-sdkandroid:minSdkVersion"8"android:targetSdkVersion"18" /> <uses-permission android:name"android.permission.RECORD_AUDIO" /> <uses-permissio…

[vue插件]基于vue2.x的电商图片放大镜插件

最近在撸一个电商网站&#xff0c;有一个需求是要像淘宝商品详情页那样&#xff0c;鼠标放在主图上&#xff0c;显示图片放大镜效果&#xff0c;找了一下貌似没有什么合适的vue插件&#xff0c;于是自己撸了一个&#xff0c;分享一下。小白第一次分享&#xff0c;各位大神莫见笑…

洛谷 P1968 美元汇率

传送门 我在下面哦~~ Im here 思路 这是一道比较简单的DP题 美元可由马克转化得到&#xff0c;马克可由美元转化得到&#xff0c;最后要求最大的美元值 我们可以用f数组来记录最大能达到多少马克和多少美元。 定义一个\(f[N][3]\)的数组&#xff0c;第一维表示到达了第i天 \(f[…

玩JerseyTest(Jersey 2.5.1和DI)

我将尝试解释一个简单的REST示例。 这个想法是建立一个基本的架构来开始使用Jersey。 当我开始使用某些框架时&#xff0c;通常会开发一个快速失败的测试环境&#xff0c;这就是我要做的。 下一个示例具有以下功能&#xff1a; 泽西岛2.5.1 依赖注入 用于测试的JUnit 类&a…

MySQL之视图、触发器、事务、存储过程、函数

一. 视图 视图是一个虚拟表&#xff08;非真实存在&#xff09;&#xff0c;是跑到内存中的表&#xff0c;真实表是硬盘上的表&#xff0c;怎么就得到了虚拟表&#xff0c;就是你查询的结果&#xff0c;只不过之前我们查询出来的虚拟表&#xff0c;从内存中取出来显示在屏幕上…

php送数据找不到表,php – 数据源默认值中找不到的模型表

我正在创建一个cakephp 2.x应用程序.在开发过程中途,我突然发现自己有一个“找不到表”的错误.Missing Database TableError: Table blocked for model Parental was not found in datasource default.Notice: If you want to customize this error message, create project\Vi…

break、continue、return的区别

break、continue、return的区别 break&#xff1a;表示中断&#xff0c;可以在switch case中或循环中 使用 当遇到break 则结束当前整个switch case 或 循环 continue&#xff1a;表示继续&#xff0c;只能在循环中使用&#xff0c;当遇到continue时&#xff0c;则结束本次&…

原生JS封装ajax以及request

一、封装原生的xhr为ajax类 xhr以及用法见之前的文章1、根据url确定请求的头部以及别的信息。 var _headerConfig {};if(url.indexOf(getcaptcha) ! -1) {_headerConfig {Accept: image/png,responseType: arraybuffer,}} else if(url.indexOf(files/upload) ! -1) {_headerC…

Java 8 Friday Goodies:SQL ResultSet流

在Data Geekery &#xff0c;我们喜欢Java。 而且&#xff0c;由于我们真的很喜欢jOOQ的流畅的API和查询DSL &#xff0c;我们对Java 8将为我们的生态系统带来什么感到非常兴奋。 我们已经写了一些关于Java 8好东西的博客 &#xff0c;现在我们觉得是时候开始一个新的博客系列了…

java类型转换答案,在java中支持两种类型的类型转换,自动类型转换和强制类型转换。父类转化为子类需要强制转换。...

在java中支持两种类型的类型转换,自动类型转换和强制类型转换。父类转化为子类需要强制转换。更多相关问题计算机病毒通过()传染扩散得极快&#xff0c;危害最大。当一个现象的数量由小变大&#xff0c;另一个现象的数量相反地由大变小&#xff0c;这种相关称为()。输油管道沿线…

Java实现回形数,只利用数组、循环和if-else语句

import java.util.Scanner; public class learn {   public static void main(String[] args){ System.out.println("请输入你需要打印多少阶的回形数:");     Scanner scan new Scanner(System.in);     int num scan.nextInt();    // 接收num,则…

FWT(快速沃尔什变换)学习

https://www.cnblogs.com/cjyyb/p/9065615.html edo直接看大佬的博客吧 不仅有证明之类的还有板子呢 转载于:https://www.cnblogs.com/ENESAMA/p/10109995.html

创建一个简单的JAX-RS MessageBodyWriter

JAX-RS确实很酷&#xff0c;借助JAXB&#xff0c;只需添加带有JAXB批注的批注数据对象&#xff0c;即可为您转换许多响应数据类型。 我对JAXB相当陌生&#xff0c;但是一些简单的注释剪切/粘贴操作将带给您很长的路要走。 出于无法从JAX-RS资源方法返回该数据类型的目的&#…

SpringCloud学习笔记(6)----Spring Cloud Netflix之负载均衡-Ribbon的使用

1. 什么是负载均衡&#xff1f; 负载均衡&#xff0c;就是分发请求流量到不同的服务器。 负载均衡一般分为两种 1. 服务器端负载均衡&#xff08;nginx&#xff09; 2. 客户端负载均衡&#xff08;Ribbon&#xff09; 2. 服务提供者&#xff08;spring-cloud-provider&#xff…

pHp中文网零基础,零基础编程

基础编程conmysql_connect(constant("SERVER"),constant("USERNAME"),constant("PASSWORD"));if(!$this->con){die(connet to mysql error.mysql_error());}else{mysql_select_db(constant("DATABASE"),$this->con);}}function …