Trident API 概览
在网上看到了很多有TRIDENT相关API的翻译,看来看去,总觉得没有说清楚很多东西,所以自己结合使用的经验翻译了一篇出来;翻译完以后,也发现
在自己的翻译中也有很多地方是表达不清楚的··不过多少感觉有些个人的理解编织在里面了。大侠们勿喷哈!
原文地址:http://storm.apache.org/releases/1.1.0/Trident-API-Overview.html
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
在Trident中核心的数据模式是“流”,以一系列batch的方式进行处理。一个流分区在集群中不同的节点上,并且对于流的操作在各个分区上并行执行。
在Trident中有五类操作:
1.在本地的分区上执行的操作,不会引起网络传输;
2.对一个流进行重新分区但并不改变流中的内容(会有网络传输);
3.将网络传输作为操作的一部分的聚合操作;
4.在分组后的流上进行的操作;
5.合并和链接(原文:Merges and joins)
本地分区操作
本地分区操作不引起网络传输,独立运行于每一个批量分区中
Function:
一个函数接收一批输入字段并且发送零个或者更多的tuple来作为输出。输出tuple的字段被追加到原始的输入tuple的字段后面。假如一个函数不发送任何的tuple,原始输入的
tuple就会被过滤掉。否则,原始输入tuple中的字段会被包含在每一个输出tuple中。假如你有一个像下面一样的函数:
public class MyFunction extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
for(int i=0; i < tuple.getInteger(0); i++) {//获取原始输入tuple中的第一个字段的值, //然后i从0开始,如果i小于这个值,那么发送一个新的tuple;
//tuple中显示的发送一个i的值
collector.emit(new Values(i));
}
}
}
现在假设你在mystream(Trident的一个拓扑)变量中有一个流,包含的字段有["a","b","c"],并且有如下的3个tuple要经过该函数:
[1, 2, 3]
[4, 1, 6]
[3, 0, 8]
假如你运行如下代码:
mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))//输入流中使用 "b"这个字段的值,输出流中申明一个"d"字段的值
附加解释
运行过程如下:
首先[1, 2, 3]进去了函数中,取到"b"字段中的值为2,那么可以发送两个tuple,其中 "d"字段的值分别为0 ,1 同时由于原始输入tuple中的字段会被保留,所以输出的两个tuple为如下格式:[1,2 ,3 ,0] [1,2,3,1];同理然后[4,1,6]进入函数,输出流为[4,1,6,0] 最后[3,0,8]进入函数,由于不满足循环条件,没有输出tuple;所以[3,0,8]被直接过滤掉了。
进过该函数处理的输出tuple拥有一下字段 ["a", "b", "c", "d"],输出的tuple看起来是这个样子的:
[1, 2, 3, 0]
[1, 2, 3, 1]
[4, 1, 6, 0]
Filter:
过滤器接收一个tuple,并决定是否保留该tuple。假设你有这样一个过滤器:
public class MyFilter extends BaseFilter {
public boolean isKeep(TridentTuple tuple) {//返回true就会被保留,返回false就不会被保留了
return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;//判断条件是第一个字段值为1,第二个字段值为2的tuple才会被保留下来
}
}
现在假设你有一些拥有 ["a", "b", "c"]这些字段的tuple,他们的值如下:
[1, 2, 3]
[2, 1, 1]
[2, 3, 4]
那么如果你运行如下代码:
mystream.filter(new MyFilter())
经过处理后的输出结果tuple就会变成下面这样:
[1, 2, 3]
map和flatMap(这两个函数比较新,在比较旧的storm版本中没有这两个函数)
map返回一个由提供的mapping函数对原始输入流作用后的结果所组成的流(注:意味着原来字段对应的值被替换掉了),这种操作可以应用于一个流转换为另一个流(一对一);
举个例子,假如这里有一个单词流,并且你想把这个单词流中的值转换为大写的方式,你就可以像下面这样定义一个mapping函数了:
注:文中所提到的单词流,个人理解应该是这样的 ["world"] -----> [a],[b],[c]
public class UpperCase extends MapFunction {
@Override
public Values execute(TridentTuple input) {
return new Values(input.getString(0).toUpperCase());
}
}
这个mapping函数可以被应用于流上,来产生一个把原始输入流中的单词转换为大写形式的新流;
mystream.map(new UpperCase())
注:个人理解------处理后的结果是 ["world"]------> [A],[B],[C]
flatMap和map很相似,但是拥有将一个流转换为多个流的能力(一对多),然后把生成的元素平压到一个新的流中。(注:这句话怎么理解呢?额,不好表达清楚,有厉害的可以帮忙翻译翻译;我有一些个人的理解,但是还没想好怎么组织语言)
举个例子,有一个句子的流,而且你想你想把这个句子的流转换为单词的流,那么你就需要像下面这样来定义flatMap函数:
public class Split extends FlatMapFunction {
@Override
public Iterable<Values> execute(TridentTuple input) {//其实函数看起来很简单
List<Values> valuesList = new ArrayList<>();
for (String word : input.getString(0).split(" ")) {
valuesList.add(new Values(word));
}
return valuesList;
}
}
这个flatMap函数可以作用于一个句子流,然后生成一个单词流:
mystream.flatMap(new Split())
当然这些操作完全支持链式调用,那么你就可以通过如下的方式来将一个句子流转换一个大写单词流:
mystream.flatMap(new Split()).map(new UpperCase())
如果你不把输出字段作为参数传递给map和faltMap,map和faltMap会把输入字段作为输出字段使用
假如你想用新的输出字段来替换旧的输入字段,那么你可以像下面这样在调用方法的时候,增加一个Fields参数
mystream.map(new UpperCase(), new Fields("uppercased"))
输出流会忽略输入流中的字段,并只保留 "uppercased"这个字段。flatMap同理,例子如下:
mystream.flatMap(new Split(), new Fields("word"))
Peek:(这个函数比较新,在比较旧的storm版本中没有这个函数)
peek用来对流中流过的每一个Trident tuple做一些额外的操作,这个功能在debug中会很有用,当tuple经过管道中的某个特定点的时候你可以观察到这些tuple。
举个例子,下面的代码将会在单词被转换为大写的结果传递给groupBy之前打印他们:
mystream.flatMap(new Split()).map(new UpperCase())
.peek(new Consumer() {
@Override public void accept(TridentTuple input) {//这个函数中,你只能获得tuple,然后用这个tuple的数据做一些事情,
//比如打印出来看一看,发送个电子邮件什么的.但是你不可能对流产生任何的影响
System.out.println(input.getString(0)); } })
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
min 和 minBy :(这两个函数比较新,在比较旧的storm版本中没有这两个函数)
min和minBy可以返回一个trident流中一个分区中一批tuple中的最小值;
假如,一个trident流中包含字段["device-id","count"]并且以分区的方式发送流;
Partition 0:
[123, 2]
[113, 54]
[23, 28]
[237, 37]
[12, 23]
[62, 17]
[98, 42]
Partition 1:
[64, 18]
[72, 54]
[2, 28]
[742, 71]
[98, 45]
[62, 12]
[19, 174]
Partition 2:
[27, 94]
[82, 23]
[9, 86]
[74, 37]
[51, 49]
[37, 98]
当binBy操作像下面这样应用与上面的流中的tuple上时,结果是在每个分区上发送count是最小值的tuple。
mystream.minBy(new Fields("count"))
上面代码在3个分区中运行的结果是:
Partition 0:
[123, 2]
Partition 1:
[62, 12]
Partition 2:
[82, 23]
你可以在public <T> Stream minBy(String inputFieldName, Comparator<T> comparator)和public Stream min(Comparator<TridentTuple> comparator)方法中查看其他min和minBy操作;
下面的例子演示了这些API是如何使用不同的比较器来找出一批tuple中的最小值的:
FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));
TridentTopology topology = new TridentTopology();
Stream vehiclesStream = topology.newStream("spout1", spout).each(allFields,new Debug("##### vehicles"));
Stream slowVehiclesStream =vehiclesStream .min(new SpeedComparator()) // Comparator w.r.t speed on received tuple..each(vehicleField, new Debug("#### slowest vehicle"));
vehiclesStream.minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple..each(vehicleField, new Debug("#### least efficient vehicle"));
//这两个类的地址在:https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
static class SpeedComparator implements Comparator<TridentTuple>, Serializable {
@Override
public int compare(TridentTuple tuple1, TridentTuple tuple2) {
Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);
Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);
return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);
}
}
static class EfficiencyComparator implements Comparator<TridentTuple>, Serializable {
@Override
public int compare(TridentTuple tuple1, TridentTuple tuple2) {
Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);
Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);
return Double.compare(vehicle1.efficiency, vehicle2.efficiency);
}
}
max和maxBy:(这两个函数比较新,在比较旧的storm版本中没有这两个函数)
max 和 maxBy 可以返回一个trident流中一个分区中一批tuple中的最大值;
假如,一个trident流中包含字段["device-id","count"]并且以分区的方式发送流;
Partition 0:
[123, 2]
[113, 54]
[23, 28]
[237, 37]
[12, 23]
[62, 17]
[98, 42]
Partition 1:
[64, 18]
[72, 54]
[2, 28]
[742, 71]
[98, 45]
[62, 12]
[19, 174]
Partition 2:
[27, 94]
[82, 23]
[9, 86]
[74, 37]
[51, 49]
[37, 98]
当maxBy操作像下面这样应用与上面的流中的tuple上时,结果是在每个分区上发送count是最大值的tuple。
mystream.maxBy(new Fields("count"))
上面代码在3个分区中运行的结果是:
Partition 0:
[113, 54]
Partition 1:
[19, 174]
Partition 2:
[37, 98]
你可以在public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator)和public Stream max(Comparator<TridentTuple> comparator)方法中查看其他max和maxBy操作;
下面的例子演示了这些API是如何使用不同的比较器来找出一批tuple中的最大值的:
FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));
TridentTopology topology = new TridentTopology();
Stream vehiclesStream = topology.newStream("spout1", spout). each(allFields, new Debug("##### vehicles"));
vehiclesStream
.max(new SpeedComparator()) // Comparator w.r.t speed on received tuple.
.each(vehicleField, new Debug("#### fastest vehicle"))
.project(driverField)
.each(driverField, new Debug("##### fastest driver"));
vehiclesStream
.maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple.
.each(vehicleField, new Debug("#### most efficient vehicle"));
########两个比较器和min、minBy中用到的比较器是一样的
Windowing:
Trident可以处理在同一个window中的一批一批的tuple,并且将汇总结果发送到下一个操作。这里有两种类型的window,分别是Tumblingwindow和SlidingWindow;两者都支持基于处理时间的或者是基于tuple的个数
的Window划分。
Tumbling window:
tuple被基于处理时间或者tuple的count值,分配在一个单独的Window中;任何的tuple都只可能属于一个Window。
/**
* 返回一个包含tuple个数为windowCount的tummbling window中每一个tuple的汇总结果所组成的流
*/
public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,Fields inputFields, Aggregator aggregator, Fields functionFields);
/**
* 返回一个时间跨度为windowDuration的tummbling window中每一个tuple的汇总结果所组成的流
*/
public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,Fields inputFields, Aggregator aggregator, Fields functionFields);
Sliding window:
Tuple被分组到各个window中,并且window每隔一定的时间间隔进行一次滑动。一个tuple可以属于一个或者多个window。
/**
* 数为windowCount的sliding window中每一个tuple的汇总结果所组成的流,并将sliding window向后滑动slideCount
*/
public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
/**
* 返回一个时间跨度为window向后滑动windowDuration的sliding window中每一个tuple的汇总结果所组成的流,并将sliding window向后滑动windowDuration
*/
public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
Common windowing API:
下面是接受任何被支持的windowing configuration的公共windowing API:
public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,Aggregator aggregator, Fields functionFields)
windowConfig 可以是下面这几种类型:
1.SlidingCountWindow.of(int windowCount, int slidingCount)
2.SlidingDurationWindow.of(BaseWindowedBolt.Duration windowDuration,BaseWindowedBolt.Duration slidingDuration)
3.TumblingCountWindow.of(int windowLength)
4.TumblingCountWindow.of(int windowLength)
Trident windowing APIS 需要WindowsStoreFactory 来保存接收到的tuple和汇总值;现在已经提供的一个基础的工厂是基于hbase的HBaseWindowsStoreFactory;它可以被扩展,用来支持不同场景的应用。
HBaseWindowStoreFactory 的例子如下:
// window-state table should already be created with cf:tuples column(要在hbase中提前建立好一个表叫window-state,并且已经在cf列族中添加了tuples列)
HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
new Values("how many apples can you eat"), new Values("to be or not to be the person"));
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),new Split(), new Fields("word"))
.window(TumblingCountWindow.of(1000), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
.peek(new Consumer() {
@Override
public void accept(TridentTuple input) {
LOG.info("Received tuple: [{}]", input);
}
});
StormTopology stormTopology = topology.build();
partitionAggregate:
partitionAggregate 在每一个批次的tuple的每一个分区上运行一个函数,和function(第一个介绍的那个)不同,partitionAggregate 处理后发送的tuple覆盖了他所接收到的tuple;来看看这个例子:
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
假设输入的流中的tuple包含字段 ["a", "b"],并且有一下这样几个分区的tuple流入了sum函数:
Partition 0:
["a", 1]
["b", 2]
Partition 1:
["a", 3]
["c", 8]
Partition 2:
["e", 1]
["d", 9]
["d", 10]
然后输出的流中只包含一个字段'sum',tuple的内容如下:
Partition 0:
[3]
Partition 1:
[11]
Partition 2:
[20]
这里有三种不同的接口用来定义聚合器:CombinerAggregator, ReducerAggregator, 和 Aggregator.
CombinerAggregator接口定义的内容如下:
public interface CombinerAggregator<T> extends Serializable {
T init(TridentTuple tuple);
T combine(T val1, T val2);
T zero();
}
一个CombinerAggregator只能返回一个tuple并且该tuple只有一个字段。CombinerAggregator在每一个输入的tuple上都会运行init方法来初始化值,然后使用combine方法来combine所有的值,直到只剩下一个
值为止。如果分区内没有任何tuple,CombinerAggregator就发送zero方法产生的值。例如,这是Count的实现:
public class Count implements CombinerAggregator<Long> {
public Long init(TridentTuple tuple) {
return 1L;
}
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
public Long zero() {
return 0L;
}
}
当在aggregate方法中而不是在partitionAggregate方法中使用CombinerAggregators 的时候,你就能感受到它的好处了。在aggregate方法中,Trident会通过在网络之间传递tuple之前进行局部分区聚合的
方式来优化计算。
ReducerAggregator接口的定义如下:
public interface ReducerAggregator<T> extends Serializable {
T init();
T reduce(T curr, TridentTuple tuple);
}
ReducerAggregator在初始化的时候设置一个初始值,然后迭代每一个输入的tuple的value来产生一个只有一个值的单一tuple来作为输出。例如,下面是使用ReducerAggregator实现的Count函数:
public class Count implements ReducerAggregator<Long> {
public Long init() {
return 0L;
}
public Long reduce(Long curr, TridentTuple tuple) {
return curr + 1;
}
}
ReducerAggregator也可以被使用在persistentAggregate方法中,稍后你将会看到。
最最通用的聚合接口就是Aggregator了,接口定义如下:
public interface Aggregator<T> extends Operation {
T init(Object batchId, TridentCollector collector);
void aggregate(T state, TridentTuple tuple, TridentCollector collector);
void complete(T state, TridentCollector collector);
}
Aggregator们可以发送带有任意数量字段的任意数量的tuple。在其方法执行的任何地方都可以发送tuple,Aggregator们按照如下的方式来执行:
1.初始化方法在处理一个batch之前被调用,返回结果是一个用来表示聚合状态的对象,并且会被传递给aggregate和complete方法中。
2.aggregate 方法在每一个batch分区的tuple上运行,这个方法可以更新初始化的那个状态对象,并可以选择性地发送一些消息。
3.complete 方法在batch分区上的所有tuple都被Aggregator处理后调用。
下面的例子演示如何使用Aggregator来实现一个Count:
public class CountAgg extends BaseAggregator<CountState> {
static class CountState {
long count = 0;
}
public CountState init(Object batchId, TridentCollector collector) {
return new CountState();
}
public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
state.count+=1;
}
public void complete(CountState state, TridentCollector collector) {
collector.emit(new Values(state.count));
}
}
有时候你想同时执行很多个aggregator,这种方式被称为链式调用,可以像下面这样使用:
mystream.chainedAgg()
.partitionAggregate(new Count(), new Fields("count"))
.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
.chainEnd()
上面的代码会在每一个分区上运行Count和Sum聚合器,输出将会只有一个tuple,并包含 ["count", "sum"]字段。
stateQuery 和 partitionPersist:
stateQuery 和 partitionPersist 分别可以查询和更新作为数据资源的state;相关介绍在Trident state doc中。
projection:
projection方法作用在流上后,可以使流中只包含方法中指定的字段;假如你有一个包含字段["a", "b", "c", "d"]的流,然后你运行下面的方法:
mystream.project(new Fields("b", "d"))
那么输出的流中就会只包含字段 ["b", "d"]。
Repartitioning operations:
Repartitioning operations(重分区操作)运行一个可以改变tuple在各个任务之间是如何分区的函数,该函数的运行结果可能会改变分区的数目;例如,并行处理数大于重新分区后的分区数的时候。
重分区操作需要进行网络传输,下面是提供的重分区函数:
1.shuffle:随机分配tuple到所有的目标分区中;
2.broadcast:每一个tuple都会被重复的发送到每一个目标分区中;这在DRPC操作用很有用,例如:你需要在每一个分区的data上进行stateQuery的时候
3.partitionBy:该函数接收一批字段,然后根据这批字段进行分区。这批字段会被进行哈希运算然后根据分区个数取模,然后根据运算结果进行分区。该函数保证相同一批字段的tuple一定会去到同一个分区
中。
4.global:所有的tuple都被发送到同一个分区中。流中所有的batch都会选择同一个分区。
5.batchGlobal:在batch中的所有tuple都会进入同一个分区,但是不同的batch中的tuple可能会进入到不同的分区中。
6.partition:该函数接收一个本地化的分区方法,本地化的分区方法需要实现org.apache.storm.grouping.CustomStreamGrouping。
Aggregation operations(集合操作):
Trident有aggregate 和persistentAggregate 方法来提供在一个流上进行聚合操作;aggregate 独立地运行在流中的每一个batch上,persistentAggregate 会运行在流中的所有batch上,并且会把结果
保存在state中。
运行aggregate方法会在流上进行全局的聚合。当你使用ReducerAggregator 和ReducerAggregator 的时候,首先流会被重新分组到一个单独的分区中,然后分区函数在这个单独的分区中运行;然而当你
使用CombinerAggregator的时候,Trident首先会在每一个分区上进行聚合,然后把每个分区的聚合结果重新分区到一个独立的分区中,然后在完成网络传输后完成全局聚合操作。CombinerAggregator比较
高效,你应该尽量的使用它。
这里有一个例子展示如何使用aggregate 来获得某个batch中的全局count:
mystream.aggregate(new Count(), new Fields("count"))
像partitionAggregate一样,aggregate中的聚合器可以以链式的方式进行调用;然而,如果你把一个CombinerAggregator 和一个不是CombinerAggregator 的聚合器链在一起后,storm就无法进行在每个分区
中预先进行聚合操作的优化了。
你可以在Trident state doc中查看persistentAggregate的使用方式。
Operations on grouped streams(在分组流上的操作)
groupBy 操作根据特定的字段运行一个partitionBy 操作来对流进行重新分区,然后在每一个分区中,把特定字段相同的tuple放到一个组中。下面是一个示例图:
如果你在一个分组后的流上运行aggregators ,那么聚合操作会在每一个组中运行,而不是在每个batch中运行。persistentAggregate 也可以运行在一个分组后的流上,在这种情况下
聚合后的结果会被保存在一个 MapState中,该 MapState使用用来分组的字段作为key。在Trident state doc中你可以找到更多答案。
和普通的流一样,运行在分组后的流上的aggregators 也可以进行链式调用。
Merges and joins:
API的最后一部分就是把不同的流结合在一起,最最简单的结合流的方式就是把几个不同的流合并到同一个流中。你可以通过merge 方法(像下面这样)来达到目的:
topology.merge(stream1, stream2, stream3);
Trident会用第一个流的字段来重新命名其他合并的流的字段,在作为新的输出流的字段。
另一种合结合流的方式就是join操作,现在来看一个标准的join操作,就像在SQL中的join操作一样,join要求输入是有限的,所以对于无限地不停地发送的流是不起作用的。在Trident
中的join操作仅仅作用于每一个有spout发出的很小的batch中;
下面的例子在包含字段["key", "val1", "val2"] 的流和包含字段["x", "val1"]的另一个流上进行join操作:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
上面的代码中,把stream1和stream2通过key和xjoin在一起,Trident要求输出流中所有的输出字段都要起名字,因为在输入流中可能会用重复的字段名称。由join操作发出的tuple
会包含如下内容:
1.首先是链接字段的列表。在这里key等同以stream1中的key也等同于stream2中的x;
2.然后就是所有流中没有进行join的字段,这些字段按照传递进来的顺序排序;在这个例子中,a=stream1.val1,b=stream1.val2,c=stream2.val1.
当来自不同的spout的流和并的时候,这些spout会在发送batch上进行同步。也就是说一个要处理的batch会包含所有的参与的spout所发送的tuple。
你也许会好奇,该如何实现一个类似"windowed join"的操作,也就是说,来自一方的tuple和来自另一方的最近一小时的tuple进行join操作。
要实现这样的功能,你需要利用partitionPersist 和stateQuery,最近一小时tuple会被保存并且循环迭代在一个state中,以join操作的field作为key。然后stateQuery 将会通过join的字段查询state中的数据来进行join操作。