Trident State 译文
Trident针对状态化的数据源的读写进行了一流的分装。State可以包含在拓扑中-例如,保存在内存中,有HDFS提供备份-也可以保存在一个外部的数据库中,像Memcached和Cassandra。针对以上的所有情况,Trident的API都是一样的。
为了保证state在更新的过程中遇到重试或者失败处理时任然能够具有幂等性,storm采取了必要的容错。也就是说,storm能够做到每一条消息仅且仅被处理一次。
在进行state更新操作的时候,可以选择不同等级的容错方式;在看这些容错方式之前,让我们来用一个例子说明如何保证仅且仅被处理一次的语意。假设你正在某个流中进行累加的聚合操作,并且准备把聚合的结果保存在数据库中。 现在你在数据库中保存了一个值来表示累加的结果,每处理一个tuple你就对数据库中的值进行一次累加操作。
当失败处理发生的时候,tuples就会被重放。这就给state的更新操作(还有任何会带来副作用的操作)带来了问题--你将无法确定你是否已经基于这个被重发的tuple对state成功地进行了更新操作。也许你还从来没有处理过这个tuple,在这种情况下你就需要对数据库中的值进行一次累加操作。也许你已经成功处理过这个tuple并且对数据库中的值进行过了一次累加操作,但是这个tuple在你更新state之后的某个环节出错了;在这种情况下,你在接收到这个tuple的时候就不应该对数据库中的值进行累加操作了。也可能这个tuple曾经出现过,但是在对数据库中的值进行累加的时候出错了,在这种情况下你需要对数据库中的值进行累加操作。
仅仅在数据库中保存累加的值,你永远无法确定这个tuple是否已经被处理过了。所以你需要更多的信息来帮助你做出正确的决定。Trident提供了一下的语义来帮助用户获得仅且仅被处理一次的语义。
1.所有的tuple都是一小批一小批的发送的(以batch的方式发送)。
2.每一个批量的tuple都会被赋予一个唯一的"transaction id" (txid);加入该批tuple被重播,那么该批tuple仍然保持相同的txid。
3.State的更新在各个批次的tuple之间是有序的,也就是说,只有第2批成功更新以后,第3批才会执行对state的更新操作。
有了这些保障,你自己的state就能够检测到某一批tuple是否被处理过,并选择正确的方式来更新state。你到底要采取什么样的方式来更新state依赖于输入的spout也就是每一个批量的tuple所提供的一致性语义。Storm提供三种容错级别的soput:"non-transactional"(非事务型), "transactional"(事务型), 和 "opaque transactional"(透明事务型)。同样的storm也提供了三种容错级别的state:"non-transactional"(非事务型), "transactional"(事务型), 和 "opaque transactional"(透明事务型)。让我们来看看每一种事务类型的spout,以及通过每种spout你所能获得的容错方式。
Transactional spouts
记住,Trident总是一小批一小批的处理tuple,并且每一个批次有一个唯一的事务ID。Spout的特性有其锁提供的保障措施决定;事务型的spout具有一下特性:
1. 一个txid所对应的batch永远是相同的。同一个txid的重放的batch永远和之前该txid所对应的batch相同。
2. 不同batch中的tuple之间不会存在交集(一个tuple不是属于这个batch,就是属于另一个batch,永远不能同时属于两个以上的batch)。
3. 每一个tuple都一定会在一个batch中被发送(没有任何一个tuple被遗漏)。
这是一种很容易理解的spout类型,一个流被划分成固定的批次,并且永远不会改变。Storm提供了一个针对kafka的事务型spout。
你也许会问:为什么我们不总是使用transactional spout?这很容易理解。一个原因是并不是所有的地方都需要容错的。举例来说,TransactionalTridentKafkaSpout 工作的方式是给定一个txid的batch所包含的一个属于一个topic的来自于所有Kafka partition的tuple序列。一旦这个batch被发出,在任何时候如果这个batch被重新发出时,它必须包含原来所有的tuple以满足 transactional spout的语义。现在我们假定一个batch被TransactionalTridentKafkaSpout所发出,这个batch没有被成功处理,并且同时kafka的一个节点也down掉了。你就无法像之前一样重播一个完全一样的batch(因为kakfa的节点down掉,该topic的一部分partition可能会无法使用),整个处理会被中断。
这也就是"opaque transactional" spouts(不透明事务spout)存在的原因- 他们对于丢失源节点这种情况是容错的,仍然能够帮你达到有且只有一次处理的语义。后面会对这种spout有所介绍。
(当然,在Kafka开启replication功能时,transactional spout也是可以做到容错的;现在的kafka已经完全支持了,所以,上文中所说的当一个节点挂掉以后TransactionalTridentKafkaSpout无法正常工作的情况也就不存在了,也正是因为这样,大部分时间都选择使用了TransactionalTridentKafkaSpout,个人感觉在使用kafka的时候"opaque transactional" spouts确实没有存在的意义)
在讨论 "opaque transactional" spouts之前,让我们先来看看你该如何为transactional spout设计一个具有仅且仅处理一次的state。这个state的类型被称为"transactional state" ,它利用任何txid都永远对应与相同一个批次的tuple的特性。
假设你的拓扑是用来统计单词个数的,并且你将要把统计结果保存在一个key-value数据库中。Key肯定就是对应的单词了,值当然就是统计结果。你已经看到只是存储一个数量是不足以知道你是否已经处理过一个batch的。所以,你需要将txid和统计结果一起保存在值中。那么,当你需要更新统计结果的时候,你只需要比较一下数据库中的txid和当前batch的txid是否相同;如果相同,你就直接跳过更新操作--因为有强顺序的保障,你可以肯定数据库中的值已经包含了当前batch。如果不相同,你就修改统计结果。这个逻辑之所以能说的通是因为batch的txid永远不会改变,并且batch之间有序地对state进行更新操作。
用一个例子来说明这个逻辑为什么行得通,假如你发送了一个txid=3的batch,该batch中包含一下的tuple:
[“man”]
[“man”]
[“dog”]
假设现在数据库中保存这如下的key-value数据:
man => [count=3,txid=1]
dog => [count=4,txid=3]
apple =>[count=10,txid=2]
和man相关联的txid是1;由于当前的batch的txid是3,那么你就可以肯定这批tuple中man 的值还没有累加到数据库中。所以你可以给man的count累加2,并且更新txid为3。然而,dog对应的txid在数据库中和当前batch中 一样,所以你可以肯定对于dog来说当前batch中的值已经在数据库中增加过了。那么就选择跳过更新。在该batch更新后,数据库中的数据如下所示:
man => [count=5,txid=3]
dog => [count=4,txid=3]
apple =>[count=10,txid=2]
接下来我们一起再来看看 opaque transactional spout以及怎样去为这种spout设计相应的state。
Opaque transactional spouts
opaque transactional spout并不能保证每一个txid永远对应一个相同的batch,opaque transactional spout拥有如下特性:
1. 每一个tuple都只会在一个batch中执行通过。也就是说,一个tuple在某一个batch处理失败了,该tuple可能在之后的另一个新的batch中处理成功。
OpaqueTridentKafkaSpout就是一个拥有该特性的spout,该spout允许kafka节点挂掉。每当OpaqueTridentKafkaSpout要发送一个新的batch的时候,它将会从上一个batch所成功发送的tuple的结束点开始发送,这就保证了没有tuple会被遗漏掉,也保证了一个tuple不会被多个batch成功处理。
在使用opaque transactional spouts的时候,再使用和transactional spout相同的处理方式:判断数据库中存放的txid和当前txid去做对比已经不好用了。这是因为在state的更新过程之间,batch可能已经变了。
你能做的就是在数据库中保存更多的状态;除了保存值和txid以外,你还需要保存更新前的值(previous value)。让我们还是用上面的例子来说明这个逻辑。假定你当前batch中的对应count是“2”, 并且我们需要进行一次状态更新。而当前数据库中存储的信息如下:
{
value = 4,
prevValue = 1,
txid = 2
}
假设当前的txid为3,和数据库中的txid(2)不同。在这种情况下,你把“preValue”设置为“value”,然后将value增加2,并更新txid为3。操作过后的数据库内容变成了下面的样子:
{
value = 6,
prevValue = 4,
txid = 3
}
再假设当前的txid为2,和数据库中的txid(2)相同。这时你可以确定数据库中的“value”被之前拥有相同txid的batch更新过,但是之前的batch和现在的batch内容可能不同了。所以你要做的是让“value”的值等于“preValue”加2,操作过后的数据库内容变成了下面的样子:
{
value = 3,
prevValue = 1,
txid = 2
}
--------------------------------------------------------------------------------------------------------------------------
注:这里理解起来可能有些晦涩,举个例子吧。
假设一个batch的大小为3,有下面这么多tuple要进行累加:
[dog] [dog] [man] [man] [man]
假设数据库中现在的信息为:
dog =>{value=2,prevValue=1,txid=1}
man =>{value=3,prevValue=1,txid=1}
然后发送一个txid为2的batch {[dog] [dog] [man]}
然后进行保存操作,
man 成功保存,但是dog保存的时候发生了错误,所以数据库中的信息变成了
dog =>{value=2,prevValue=1,txid=1}
man =>{value=4,prevValue=3,txid=2}
那么失败了,就会有batch的重发,恰好这是负责发送第一个 [dog]的kafka节点坏掉了,batch无法获得第一个[dog]了,那么就只能从第二个dog开始发了,所以发送的batch的txid依然为2,内容为{[dog] [man] [man]}
到这里,dog 的两个txid不同,更新;但是man txid相同了,所以用prevValue+2来更新value;从这里应该可以看出,为什么是这样做了。
更新后的结果如下:
dog =>{value=3,prevValue=2,txid=2}
man =>{value=5,prevValue=3,txid=2}
---------------------------------------------------------------------------------------------------------------------------------
因为Trident保证了batch之间的强顺序性,因此这种方法是有效的。一旦Trident去处理一个新的batch,它就不会重新回到之前的任何一个batch。并且由于opaque transactional spout确保在各个batch之间是没有共同成员的,每个tuple只会在一个batch中被成功处理,你可以安全的在之前的值上进心更新。
Non-transactional spouts
Non-transactional spout(非事务spout)不提供任何的保障。所以在tuple处理失败后不进行重发的情况下,一个tuple可能是最多被处理一次的。同时他也可能会是至少处理一次的,如果tuple在不同的batch中被多次成功处理的时候。无论怎样,这种spout是不可能实现有且只有一次被成功处理的语义的。
Summary of spout and state types
这个图展示了哪些spout和state的组合能够实现有且只有一次被成功处理的语义:
Opaque transactional state有着最为强大的容错性。但是这是以存储更多的信息作为代价的。Transactional states 需要存储较少的状态信息,但是仅能和 transactional spouts协同工作. 最后, non-transactional state所需要存储的信息最少,但是却不能实现有且只有一次被成功处理的语义。
State和Spout类型的选择其实是一种在容错性和存储消耗之间的权衡,你的应用的需要会决定那种组合更适合你。
State APIs
在前面你已经看到了一些用来实现仅且仅执行一次语义的复杂方法,有一个关于Trident的好消息就是,Trident把所有容错的逻辑都在state内部实现了。那么作为一个用户,你就从比较txid,保存多余的值到数据库中,或者任何像它们两个那样的苦差事中脱离了出来。你只需要像下面这样写代码就可以了:
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(MemcachedState.opaque(serverLocations),new Count(),new Fields("count")) //重点就是这句了,这里其实使用了mapState,用来做批量的聚合结果的保//存
.parallelismHint(6);
所有管理opaque transactional state的必要逻辑都在MemcachedState.opaque方法内部实现了。另外,更新操作是批量进行的,以减少对数据库的压力。
基础的state接口只有两个方法:
public interface State {
void beginCommit(Long txid); // can be null for things like partitionPersist occurring off //a DRPC stream(放生在DRPC流中的partitionPersist操作中,txid可能为空)
void commit(Long txid);
}
在这个接口所提供的两个方法中,你可以知道什么时候开了对state的更新操作,什么时候完成了对state的更新操作,在每个方法中你都能够获得txid。Trident对你的state是如何工作的没有做出任何的假设(也就是说,你要自己写更新和查询方法)。
加入你自己有一套数据库,并且希望通过Trident来在其中更新、查询用户的位置信息。那么你自己实现的state中就要自己去写更新和查询的方法了:
public class LocationDB implements State {
public void beginCommit(Long txid) {
}
public void commit(Long txid) {
}
public void setLocation(long userId, String location) {
// code to access database and set location
//自己写的向数据库中保存用户位置信息的方法,这个方法会在你自己实现的
//BaseStateUpdater中调用(呵呵,自己实现然后自己调用)
}
public String getLocation(long userId) {
// code to get location from database
//自己写的从数据库中查找用户位置信息的方法,这个方法会在你自己实现的
//BaseQueryFunction中调用(也是自己实现自己调用)
}}
然后你就要实现一个Trdient定义的StateFactory ,使你能够在Trient的task中创建你自己的state。下面是为LocationDB 实现的StateFactory:
public class LocationDBFactory implements StateFactory {
public State makeState(Map conf, int partitionIndex, int numPartitions) {
return new LocationDB();
} }
Trident提供了QueryFunction 用来对state进行查询,提供了StateUpdater 用来对state进行更新操作。让我们来写一个QueryLocation的操作,该操作从LocationDB中查询用户的位置信息。首先来看看那你该如何在拓扑中使用QueryLocation操作。假设你的拓扑接收一个用户的id的输入流。
TridentTopology topology = new TridentTopology();
TridentState locations = topology.newStaticState(new LocationDBFactory());
topology.newStream("myspout", spout)
.stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))
//.stateQuey就是查询了,第一个参数指定了要查询的state(这个state使用LocationDBFactory来创建的,这就是为什么要为你的state建立一个stateFactory了,因为你无法在Trident的API中直接new你的state,你只能new stateFactory,然后Trident会调用其中的makeState方法来创建state);第二个参数就是输入的流的字段,这里把userId输入到操作中;第三个参数就是你自己实现的QueryFunction 用来执行查询操作;第四个参数是输出字段。
好了,现在可以来看看如何来实现一个自己的QueryFunction 了。
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
//查询的方法,下面的代码都是要自己写的
List<String> ret = new ArrayList();
for(TridentTuple input: inputs) {
ret.add(state.getLocation(input.getLong(0)));//每次查询一个,效率不高
}
return ret;//这个ret的类型是你自己定义好的泛型(在类的开始处)
//返回的ret会循环调用下面的execute方法来发送每一个location
}
public void execute(TridentTuple tuple, String location, TridentCollector collector) {
//发送输出数据的方法,输出字段的定义在上面已经完成了,说白了还是一个bolt节点 ps:在新的版本中String location已经变成了一个List了,也就是ret一次都传进来了,在execute方法中进行遍历
collector.emit(new Values(location));
}
}
QueryFunction的执行分为两步:第一步,Trident会收集一个batch的输入数据然后把他们传递给batchRetrieve。在这个例子中,batchRetrieve会接收到很多的用户ID。BatchRetrieve方法需要返回和接收到的batch中的tuple的数量相同的一个list数据。List中的第一个元素对应第一个tuple查询的结果,第二个元素对应第二个tuple查询的结果,以此类推。
也许你会看出上面的代码中没有利用Trident所提供的batch的优势,因为它每次只从LocationDB 中查询一条数据。所以可以把LocationDB 向下面这样优化一下:
public class LocationDB implements State {
public void beginCommit(Long txid) {
}
public void commit(Long txid) {
}
public void setLocationsBulk(List<Long> userIds, List<String> locations) {
// set locations in bulk批量进行更新
}
public List<String> bulkGetLocations(List<Long> userIds) {
// get locations in bulk批量进行查询
}}
有了上面优化后的LocationDB ,那么QueryLocation 就也需要修改一下了:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
List<Long> userIds = new ArrayList<Long>();
for(TridentTuple input: inputs) {
userIds.add(input.getLong(0));
}
return state.bulkGetLocations(userIds);//一次查一批...
}
public void execute(TridentTuple tuple, String location, TridentCollector collector) {
collector.emit(new Values(location));
} }
将QueryLocation 修改为上面的样子以后,就可以大大减少对数据库的请求了。
查询说完了,下面就是如何来更新state了。你要利用StateUpdater 接口来实现自己的目的。下面是例子:
public class LocationUpdater extends BaseStateUpdater<LocationDB> {
public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {//很简单
List<Long> ids = new ArrayList<Long>();
List<String> locations = new ArrayList<String>();
for(TridentTuple t: tuples) {
ids.add(t.getLong(0));
locations.add(t.getString(1));
}
state.setLocationsBulk(ids, locations);
}}
有了上面的代码,你就可以像下面这样在Trident中来更新state了
TridentTopology topology = new TridentTopology();
TridentState locations =
topology.newStream("locations", locationsSpout)
.partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())
第一个参数就是LocationDB对应的stateFactory;第二个参数是输入的流的字段;第三个就是上面写的更新操作了。
partitionPersist 操作会更新一个State。其内部是将 State和一批更新的tuple交给StateUpdater,由StateUpdater完成相应的更新操作。
在这段代码中,只是简单的从输入的tuple中提取处userid和对应的location,并一起更新到State中。
partitionPersist 会返回一个TridentState对象来表示被这个Trident topoloy更新过的locationDB。 然后你就可以使用这个state在topology的任何地方进行查询操作了。
同时,你也可以看到我们传了一个TridentCollector给StateUpdaters。 emit到这个collector的tuple就会去往一个新的stream。在这个例子中,我们并没有去往一个新的stream的需要,但是如果你在做一些事情,比如说更新数据库中的某个count,你可以emit更新的count到这个新的stream。然后你可以通过调用TridentState#newValuesStream方法来访问这个新的stream来进行其他的处理。
persistentAggregate
persistentAggregate是另一个用来更新state的方法, 你在之前的word count例子中应该已经见过了,如下:
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
persistentAggregate是在partitionPersist之上的另外一层抽象。它知道怎么去使用一个Trident 聚合器来更新State。在这个例子当中,因为这是一个group好的stream,Trident会期待你提供的state是实现了MapState接口的。用来进行group的字段会以key的形式存在于State当中,聚合后的结果会以value的形式存储在State当中。MapState接口看上去如下所示:
public interface MapState<T> extends State {
List<T> multiGet(List<List<Object>> keys);
List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
void multiPut(List<List<Object>> keys, List<T> vals);}
当你在一个未经过group的stream上面进行聚合的话,Trident会期待你的state实现Snapshottable接口:
public interface Snapshottable<T> extends State {
T get();
T update(ValueUpdater updater);
void set(T o);
}
MemoryMapState 和 MemcachedState 都实现了上面的2个接口。(自己写的mapState也会实现上面的两个接口)
Implementing Map States
在Trident中实现MapState是非常简单的,它几乎帮你做了所有的事情。OpaqueMap, TransactionalMap, 和 NonTransactionalMap 类实现了所有相关的逻辑,包括容错的逻辑。你只需要将一个IBackingMap 的实现提供给这些类就可以了。IBackingMap接口看上去如下所示:
public interface IBackingMap<T> {
List<T> multiGet(List<List<Object>> keys);
void multiPut(List<List<Object>> keys, List<T> vals);
}
OpaqueMap's会用OpaqueValue的value来调用multiPut方法,TransactionalMap's会提供TransactionalValue中的value,而NonTransactionalMaps只是简单的把从Topology获取的object传递给multiPut。
Trident还提供了一种CachedMap类来进行自动的LRU 缓存。
另外,Trident 提供了 SnapshottableMap 类将一个MapState 转换成一个 Snapshottable 对象.(用来对没有进行group by 的流进行全局汇总)
大家可以看看 MemcachedState的实现,从而学习一下怎样将这些工具组合在一起形成一个高性能的MapState实现。MemcachedState是允许大家选择使用opaque transactional, transactional, 还是 non-transactional 语义的。
Ps:翻译的内容就这么多了,其实网上翻译的很多,但是看了以后并不能给很多新手带来一些帮助(原文写的太高深了)。努力翻译了一下,但是还是觉得有很多没有说清楚,下面会抽时间把storm官方提供的 hbase相关的trident state的源代码解读一下,我觉得只有解读一下这个源代码,才会让人更加清晰 state当地怎么用,以及如何写自己的state。