这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法。 我在这里关注代码设计,而不是监督或冗余之类的部署良好实践。
由于Storm的实时流性质,当面对大多数错误时,我们最终将不得不移至下一个数据。 在这种情况下,错误处理归结为(或没有)报告此错误,并在以后(或没有)重试处理失败的输入数据。 这篇文章的第1部分是关于这方面的。
这意味着在处理元组时,通常很难确定它是我们第一次遇到它还是它的内容已经部分地应用于持久性。 因此,我们需要使状态更新操作成为幂等,这是本文的第二部分。
不要对这篇文章的大小印象深刻,Storm实际上为我们完成了大部分工作。 真正需要做的只是了解如何以合理的方式插入东西。
这篇文章基于Storm 0.9,Cassandra 2.0.4和Kafka 0.7。 我在github上放置了一个玩具项目 ,以说明下面讨论的几点。 该项目实际上是根据我在上一篇文章中介绍的“房间存在”示例改编的 。
第1部分:处理错误情况
决定何时要求重试
第一个简单的错误处理策略是简单地接受运行时错误导致的计算质量下降。 例如,如果拓扑在最近的滑动窗口上计算一些实时趋势估计,或者如果我们已经在处理诸如Twitter公开流之类的采样数据,则可能是这种情况。 如果我们选择忽略此类错误,则实现起来非常简单,只需用大量的try / catch包装拓扑逻辑,以某种方式报告错误,并且不要让任何事情冒充Storm。
但是,在大多数情况下,我们关心一致性,因此必须对尝试重试或不尝试失败的数据做出谨慎的决定。
运行时错误的一个典型示例是入站数据格式问题。 在那种情况下,重试当然是没有意义的,因为它不会第二次变得更好。 相反,我们应该记录故障数据,并可能要求某些人进行调查。 这是我的玩具项目中BytesToString Storm函数的一个简单示例:
public class BytesToString extends BaseFunction {
@Overridepublic void execute(TridentTuple tuple, TridentCollector tridentCollector) { try { String asString = new String((byte[]) tuple.getValueByField("bytes"), "UTF-8"); tridentCollector.emit(new Values(asString)); } catch (UnsupportedEncodingException e) { logger.err("ERROR: lost data: unable to parse inbound message from Kafka (expecting UTF-8 string)", e); } }
另一方面,如果错误与某些不可访问的外部数据源有关,例如由网络分区引起的错误,我们应按下一节所述触发重试。
除上述两种错误外,还有许多其他类型的错误,但要点仍然是:区分可重试错误与不可重试错误并做出相应反应很重要。
最后一点,当您决定不报告在IBackingMap的multiget中发生的错误时,请格外小心 ,因为该函数必须返回与输入键列表大小相同的列表。 因此,如果出现不可重试的错误,我们必须以某种方式返回某些结果。 在大多数情况下,如果我们选择不重试这种情况下的错误,那是因为某些过去的错误已经在持久性方面破坏了某些内容,并且为时已晚。 在下面的示例中,由于对从DB读取的某些数据进行的解析失败而发生错误,并且代码仅返回null值,这等同于考虑到持久性没有任何作用(至少没有用处)。 另请参阅下面的第3部分,以了解针对这种情况的可能解决方案。
@Override public List<OpaqueValue> multiGet(List<List<Object>> keys) { try { return Utils.opaqueStringToOpaqueValues(opaqueStrings, HourlyTimeline.class); } catch (IOException e) { logger.err("error while trying to deserialize data from json => giving up (data is lost!)", e); return Utils.listOfNulls(keys.size()); // this assumes previous state does not exist => destroys data! }
}
(好吧,来自TimelineBackingMap的这段代码实际上将所有数据替换为null,这使情况变得更糟,但这是一个玩具项目……)
导致三叉戟元组被重播…
一旦确定触发元组重播是合理的,我们只需要询问它,Storm就会做其他所有事情(只需插入正确的喷嘴,请参阅下一节)。 从技术上讲,这很简单:从功能或过滤器之类的Trident原语中触发重试就像抛出FailedException一样简单,就像玩具项目中的TimeLineBackingMap中一样,其中包括重试和非重试错误的示例(请注意,代码下面来自TimelineBackingMap的示例假定任何数据库错误都是可重试的,这过于简化了):
@Override public void multiPut(List<List<Object>> keys, List<OpaqueValue> timelines) {;List<OpaqueValue> jsonOpaqueTimelines; try { jsonOpaqueTimelines = Utils.opaqueValuesToOpaqueJson(timelines); } catch (IOException e) { System.err.println("error while trying to serialize data to json => giving up (data is lost!)"); return; }if (jsonOpaqueTimelines != null) { try { DB.put("room_timelines", toSingleKeys(keys), jsonOpaqueTimelines); } catch (Exception e) { logger.err("error while storing timelines to cassandra, triggering a retry...", e); throw new FailedException("could not store data into Cassandra, triggering a retry...", e); } }
};
然后,Storm会将错误传播回喷嘴,以强制重播元组。 如果我们希望在Storm UI中报告错误,则可以抛出ReportedFailedException。
我强烈不建议使用的另一种方法是让任何其他类型的RuntimeException冒泡到Storm。 这本质上以更高的性能成本实现了相同的结果:它将触发工作节点崩溃,并且Nimbus将自动重启,并且所有spout将恢复从最新的已知成功索引中读取(spout实现(如Kafka spout将其最新成功处理的偏移存储在zookeeper中)为了这个目的)。 这种快速失败策略是Storm设计的一部分(请参阅有关工人监督和容错的文档)。 从本质上讲,这实现了与让spout重播某些元组相同的一致性保证,但是对性能的影响当然更大,因为我们具有完整的JVM重新启动并重置了所有当前正在运行的拓扑实例。 因此,切勿故意这样做。 仍然令人放心的是,如果我们的节点崩溃,数据不会中断,并且流量自然会继续。
Storm决定重播元组的第三种情况是它们是否在配置的超时之前未到达拓扑的末尾。 更确切地说,如果未按时收到ACK,则该机制实际上是由发出该元组的spout触发的,因此,如果元组成功处理但由于某些网络分区ACK无法到达该spout,则也可以触发这些重播。 用于控制此设置的Storm参数是topology.enable.message.timeouts
和topology.message.timeout.secs
,根据defaults.yaml的默认值为“ true”和30秒。 这只是为什么拓扑中的幂等性如此重要的又一个原因。
…并实际上重播元组
一旦失败通知到达喷嘴(或在超时情况下由通知生成),我们需要确保失败的元组将被重播。 除非您自己开发喷嘴,否则只能归结为选择正确的喷嘴口味 。 此选择会影响元组的重播(或不重播)方式,因此它必须与适当的策略保持一致,以处理拓扑中的已重播的元组,这是下一部分的主题。 有3种喷口:
- 非事务性:无保证,但如果您选择的实现提供“至少一次”保证,在某些情况下它们仍然有用
- 事务性的:不建议使用,因为它们在某些分区情况下可能会阻止拓扑
- opaque(不透明):就重播而言,它们达到元组至少会被播放一次,但在重播方面提供了弱保证,但在重播的情况下,发出的批次可能会不同。 在实践中,使用它们时,我所建议的所有重要事项是确保拓扑对于这种灵活的重放具有鲁棒性,这将在下一部分中进行讨论。
关于元组和批处理重播的最后说明
我在元组级别上进行了讨论,因为这使设计决策更简单。 实际上,要求Storm重播单个元组将触发同一批中包含的许多其他元组的重播,其中一些可能没有错误。
第2部分:重播元组的幂等处理
故事的另一面是,既然我们知道元组可能会被处理几次,请确保拓扑是幂等的,即,发送相同元组的次数不会使状态不一致。 没有副作用的拓扑部分当然不受元组重播的影响。
关于状态一致性的Storm Trident文档非常清楚,因此我在这里仅添加一些内容。
如果我们的状态更新操作已经幂等
如果状态更新操作本质上已经是幂等的,那么它已经具有元组重播的弹性,并且不需要Storm特殊机制。
如果id值完全基于入站元组内容,则任何“按id存储”操作都是这种情况。 例如,在我的玩具项目中,我存储了占用会话,这些会话的主键是从入站事件中找到的相关ID派生的,因此在这种情况下,写操作已经可以重播了,因为任何重播都只会覆盖相同的现有数据信息而不会破坏任何数据(假设我们有订购保证,在这种情况下是正确的)。
public void multiPut(List<List<Object>> keys, List<RoomPresencePeriod> newOrUpdatedPeriods) { DB.upsertPeriods(newOrUpdatedPeriods);
}
在CassandraDB.java中:
try { PreparedStatement statement = getSession().prepare("INSERT INTO presence (id, payload) values (?,?)"); execute(new BoundStatement(statement).bind(rpp.getId(), periodJson)); } catch (Exception e) { logger.error("error while contacting Cassandra, triggering a retry...", e); new FailedException("error while trying to record room presence in Cassandra ", e); }
同时使read-update-write操作成为幂等
我在先前的博客文章中描述了Storm如何使我们能够实现执行以下操作而不需要DB锁并且仍然避免出现竞争情况:
- 从数据库读取以前的状态,
- 根据新的元组数据更新内存中的状态,
- 将新状态保存到数据库
风暴的美丽之处在于,为了处理重播的元组而不破坏状态,我们只需要调整步骤1和3。这是非常重要的:我们现在可以在步骤2中实现所有处理逻辑,就像每个元组只被播放一次,然后根本不关心重播(只要我们是“纯”的,请参见下面的评论…)。 这就是“风暴只有一次语义”的含义。
而且,如果我们在内部实现1和3,则使它们重播即可,只是将它们与现有的Storm类包装在一起即可。 最健壮的方式是使用Opaque逻辑,但代价是每个状态存储两次状态,如Trident文档中关于transaction spout的说明 。
更好的是,已经有很多不透明的BackingMap实现可用于Storm-contrib中的诸如Cassandra或Mysql的许多后端,因此,在大多数情况下,除了选择正确的之外,实际上没有任何其他事情可做。
最重要的一点是,要使用处理重播元组的不透明BackingMap,必须使用尊重不透明先决条件的喷嘴,如本矩阵所述 。
如果由于某种原因需要实现自己的BackingMap,我们唯一要做的就是使它存储数据的当前和先前版本以及交易ID。 这是我的玩具项目中的一个简单示例(但实际上,在编写类似代码之前,请考虑一下Storm-contrib ):
public void put(String table, List<String> keys, List<OpaqueValue> opaqueStrings) {;// this should be optimized with C* batches... for (Pair<String, OpaqueValue> keyValue : Utils.zip(keys, opaqueStrings)) { PreparedStatement statement = getSession().prepare(format("INSERT INTO %s (id, txid, prev, curr) values (?, ?, ?, ?)", table)); OpaqueValue opaqueVal = keyValue.getValue(); execute(new BoundStatement(statement).bind(keyValue.getKey(), opaqueVal.getCurrTxid(), opaqueVal.getCurr(), opaqueVal.getPrev())); }
}
public List<OpaqueValue> get(String table, List<String> keys) {;List<OpaqueValue> vals = new ArrayList<>(keys.size()); ResultSet rs = execute(format("select id, txid, prev, curr from %s where id in ( %s ) ", table, toCsv(keys) )); Map<String, OpaqueValue> data = toMapOfOpaque(rs); for (String key: keys){ vals.add(data.get(key)); }return vals; }
然后,要真正获得Trident的一次语义,唯一要做的就是将其包装在OpaqueMap中,如下所示:
public static StateFactory FACTORY = new StateFactory() { public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { return OpaqueMap.build(new TimelineBackingMap(new CassandraDB(conf))); } }
幕后发生的事情是, OpaqueMap将根据与当前批处理元组关联的事务ID和在持久性中找到的事务ID,选择要显示给我们的更新逻辑的先前存储的状态(“ curr”或“ prev”)。 该事务ID是由喷嘴提供的,因此这就是保持喷嘴与状态选择对齐如此重要的原因:状态对每个事务ID的含义进行假设。
不要破坏前一个实例!
让我们回到上面提到的read-update-write序列的步骤2。 既然我们知道不透明逻辑需要存储任何状态的新版本和旧版本,请查看以下Reducer代码并尝试确定其损坏原因:
public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) { LocationChangedEvent event = (LocationChangedEvent) tuple.getValueByField("occupancyEvent");;if (ENTER == event.getEventType()) { curr.setStartTime(event.getTime()); // buggy code } else { curr.setEndTme(event.getTime()); // buggy code } return curr; }
函数式编程的专家称其为“不纯”方法,因为它会修改其输入参数。 它破坏Storm不透明逻辑的原因是,现在“当前”和“先前” java引用实际上都引用内存中的同一实例。 因此,当不透明逻辑同时保留某个状态的先前版本和当前版本时,实际上它保存的是新版本的两倍,因此先前的版本丢失了。
更好的实现可能是这样的
public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) { LocationChangedEvent event = (LocationChangedEvent) tuple.getValueByField("occupancyEvent");;RoomPresencePeriod updated = new RoomPresencePeriod(curr); // copy constructor if (ENTER == event.getEventType()) { updated.setStartTime(event.getTime()); } else { updated.setEndTme(event.getTime()); } return updated; }
第3部分:人为错误:全部重播
最后一点,我们必须谦虚地意识到,无论我们采取了多少上述努力和保障,我们仍然会在生产环境中部署错误(对此,我发誓,抱歉!)。 对于数据处理平台,错误可能意味着破坏数据的错误,当数据是我们的业务时,这是很糟糕的。 在某些情况下,我们只会发现事实之后数据已损坏,就像上面有关multiget的注释中所述。
内森·马兹(Nathan Marz)在他的《 大数据》书中 ,描述了一个简单的基于Lambda架构的“重播所有”想法,以解决该想法。 这本书的简短摘要也可以在这里找到 。
翻译自: https://www.javacodegeeks.com/2014/02/error-handling-in-storm-trident-topologies.html