Trident API 概览

Trident API 概览

  在网上看到了很多有TRIDENT相关API的翻译,看来看去,总觉得没有说清楚很多东西,所以自己结合使用的经验翻译了一篇出来;翻译完以后,也发现

在自己的翻译中也有很多地方是表达不清楚的··不过多少感觉有些个人的理解编织在里面了。大侠们勿喷哈!

原文地址:http://storm.apache.org/releases/1.1.0/Trident-API-Overview.html

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


Trident中核心的数据模式是“流”,以一系列batch的方式进行处理。一个流分区在集群中不同的节点上,并且对于流的操作在各个分区上并行执行。

 

Trident中有五类操作:

1.在本地的分区上执行的操作,不会引起网络传输;

2.对一个流进行重新分区但并不改变流中的内容(会有网络传输);

3.将网络传输作为操作的一部分的聚合操作;

4.在分组后的流上进行的操作;

5.合并和链接(原文:Merges and joins

 

本地分区操作

本地分区操作不引起网络传输,独立运行于每一个批量分区中

Function:

 

一个函数接收一批输入字段并且发送零个或者更多的tuple来作为输出。输出tuple的字段被追加到原始的输入tuple的字段后面。假如一个函数不发送任何的tuple,原始输入的

tuple就会被过滤掉。否则,原始输入tuple中的字段会被包含在每一个输出tuple中。假如你有一个像下面一样的函数:

 

public class MyFunction extends BaseFunction {

    public void execute(TridentTuple tuple, TridentCollector collector) {

        for(int i=0; i < tuple.getInteger(0); i++) {//获取原始输入tuple中的第一个字段的值,   //然后i0开始,如果i小于这个值,那么发送一个新的tuple

//tuple中显示的发送一个i的值

            collector.emit(new Values(i));

        }

    }

}

现在假设你在mystream(Trident的一个拓扑)变量中有一个流,包含的字段有["a","b","c"],并且有如下的3tuple要经过该函数:

[1, 2, 3]

[4, 1, 6]

[3, 0, 8]

假如你运行如下代码:

mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))//输入流中使用 "b"这个字段的值,输出流中申明一个"d"字段的值

附加解释

 运行过程如下:                                                                                                             

首先[1, 2, 3]进去了函数中,取到"b"字段中的值为2,那么可以发送两个tuple,其中 "d"字段的值分别为0 1 同时由于原始输入tuple中的字段会被保留,所以输出的两个tuple为如下格式:[1,2 ,3 ,0] [1,2,3,1];同理然后[4,1,6]进入函数,输出流为[4,1,6,0]                                                                                 最后[3,0,8]进入函数,由于不满足循环条件,没有输出tuple;所以[3,0,8]被直接过滤掉了。                                        

进过该函数处理的输出tuple拥有一下字段 ["a", "b", "c", "d"],输出的tuple看起来是这个样子的:

[1, 2, 3, 0]

[1, 2, 3, 1]

[4, 1, 6, 0]

 

Filter:

 

过滤器接收一个tuple,并决定是否保留该tuple。假设你有这样一个过滤器:

 

public class MyFilter extends BaseFilter {

    public boolean isKeep(TridentTuple tuple) {//返回true就会被保留,返回false就不会被保留了

        return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;//判断条件是第一个字段值为1,第二个字段值为2tuple才会被保留下来

    }

}

现在假设你有一些拥有 ["a", "b", "c"]这些字段的tuple,他们的值如下:

[1, 2, 3]

[2, 1, 1]

[2, 3, 4]

那么如果你运行如下代码:

mystream.filter(new MyFilter())

经过处理后的输出结果tuple就会变成下面这样:

[1, 2, 3]

 

mapflatMap(这两个函数比较新,在比较旧的storm版本中没有这两个函数)

 

 map返回一个由提供的mapping函数对原始输入流作用后的结果所组成的流(注:意味着原来字段对应的值被替换掉了),这种操作可以应用于一个流转换为另一个流(一对一)

 

 举个例子,假如这里有一个单词流,并且你想把这个单词流中的值转换为大写的方式,你就可以像下面这样定义一个mapping函数了:

   

   注:文中所提到的单词流,个人理解应该是这样的 ["world"] -----> [a],[b],[c]

  

public class UpperCase extends MapFunction {

 @Override

 public Values execute(TridentTuple input) {

   return new Values(input.getString(0).toUpperCase());

 }

}

 

这个mapping函数可以被应用于流上,来产生一个把原始输入流中的单词转换为大写形式的新流;

 

mystream.map(new UpperCase())

注:个人理解------处理后的结果是 ["world"]------> [A],[B],[C]

 flatMapmap很相似,但是拥有将一个流转换为多个流的能力(一对多),然后把生成的元素平压到一个新的流中。(注:这句话怎么理解呢?额,不好表达清楚,有厉害的可以帮忙翻译翻译;我有一些个人的理解,但是还没想好怎么组织语言)

 

 举个例子,有一个句子的流,而且你想你想把这个句子的流转换为单词的流,那么你就需要像下面这样来定义flatMap函数:

 

public class Split extends FlatMapFunction {

  @Override

  public Iterable<Values> execute(TridentTuple input) {//其实函数看起来很简单

    List<Values> valuesList = new ArrayList<>();

    for (String word : input.getString(0).split(" ")) {

      valuesList.add(new Values(word));

    }

    return valuesList;

  }

}

 

 这个flatMap函数可以作用于一个句子流,然后生成一个单词流:

 

mystream.flatMap(new Split())

 

 当然这些操作完全支持链式调用,那么你就可以通过如下的方式来将一个句子流转换一个大写单词流:

 

mystream.flatMap(new Split()).map(new UpperCase())

 

 如果你不把输出字段作为参数传递给mapfaltMapmapfaltMap会把输入字段作为输出字段使用

 

 假如你想用新的输出字段来替换旧的输入字段,那么你可以像下面这样在调用方法的时候,增加一个Fields参数

 

mystream.map(new UpperCase(), new Fields("uppercased"))

 输出流会忽略输入流中的字段,并只保留 "uppercased"这个字段。flatMap同理,例子如下:

mystream.flatMap(new Split(), new Fields("word"))

 

 

 

Peek:(这个函数比较新,在比较旧的storm版本中没有这个函数)

 

 peek用来对流中流过的每一个Trident tuple做一些额外的操作,这个功能在debug中会很有用,当tuple经过管道中的某个特定点的时候你可以观察到这些tuple

 

 举个例子,下面的代码将会在单词被转换为大写的结果传递给groupBy之前打印他们:

 

mystream.flatMap(new Split()).map(new UpperCase())

 .peek(new Consumer() {

      @Override public void accept(TridentTuple input) {//这个函数中,你只能获得tuple,然后用这个tuple的数据做一些事情,

//比如打印出来看一看,发送个电子邮件什么的.但是你不可能对流产生任何的影响

         System.out.println(input.getString(0)); } })

.groupBy(new Fields("word"))

.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

 

 

 

min minBy :(这两个函数比较新,在比较旧的storm版本中没有这两个函数)

 

 minminBy可以返回一个trident流中一个分区中一批tuple中的最小值;

 

 假如,一个trident流中包含字段["device-id","count"]并且以分区的方式发送流;

 

 Partition 0:       

 [123, 2]

 [113, 54]

 [23,  28]

 [237, 37]

 [12,  23]

 [62,  17]

 [98,  42]

 

 Partition 1:

 [64,  18]

 [72,  54]

 [2,   28]

 [742, 71]

 [98,  45]

 [62,  12]

 [19,  174]

 

 Partition 2:

 [27,  94]

 [82,  23]

 [9,   86]

 [74,  37]

 [51,  49]

 [37,  98]

 

 

 binBy操作像下面这样应用与上面的流中的tuple上时,结果是在每个分区上发送count是最小值的tuple

 

mystream.minBy(new Fields("count"))

 

 上面代码在3个分区中运行的结果是:

 

 Partition 0:

 [123, 2]

 

 Partition 1:

 [62,  12]

 

 Partition 2:

 [82,  23]

 

 你可以在public <T> Stream minBy(String inputFieldName, Comparator<T> comparator)public Stream min(Comparator<TridentTuple> comparator)方法中查看其他minminBy操作;

 下面的例子演示了这些API是如何使用不同的比较器来找出一批tuple中的最小值的:

 

FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));

 

TridentTopology topology = new TridentTopology();

        

Stream vehiclesStream = topology.newStream("spout1", spout).each(allFields,new Debug("##### vehicles"));

Stream slowVehiclesStream =vehiclesStream .min(new SpeedComparator()) // Comparator w.r.t speed on received tuple..each(vehicleField, new Debug("#### slowest vehicle"));

vehiclesStream.minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple..each(vehicleField, new Debug("#### least efficient vehicle"));

 

//这两个类的地址在:https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java

static class SpeedComparator implements Comparator<TridentTuple>, Serializable {

        @Override

        public int compare(TridentTuple tuple1, TridentTuple tuple2) {

            Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);

            Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);

            return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);

        }

       }

static class EfficiencyComparator implements Comparator<TridentTuple>, Serializable {

@Override

        public int compare(TridentTuple tuple1, TridentTuple tuple2) {

            Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);

            Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);

            return Double.compare(vehicle1.efficiency, vehicle2.efficiency);

        }

    }

 

 

 

maxmaxBy:(这两个函数比较新,在比较旧的storm版本中没有这两个函数)

 

 max maxBy 可以返回一个trident流中一个分区中一批tuple中的最大值;

 

 假如,一个trident流中包含字段["device-id","count"]并且以分区的方式发送流;

 

Partition 0:       

[123, 2]

[113, 54]

[23,  28]

[237, 37]

[12,  23]

[62,  17]

[98,  42]

 

Partition 1:

[64,  18]

[72,  54]

[2,   28]

[742, 71]

[98,  45]

[62,  12]

[19,  174]

 

Partition 2:

[27,  94]

[82,  23]

[9,   86]

[74,  37]

[51,  49]

[37,  98]

 maxBy操作像下面这样应用与上面的流中的tuple上时,结果是在每个分区上发送count是最大值的tuple

 

mystream.maxBy(new Fields("count"))

 

  上面代码在3个分区中运行的结果是:

  

  Partition 0:

  [113, 54]

 

  Partition 1:

  [19,  174]

 

  Partition 2:

  [37,  98]

  

  

  你可以在public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator)public Stream max(Comparator<TridentTuple> comparator)方法中查看其他maxmaxBy操作;

  下面的例子演示了这些API是如何使用不同的比较器来找出一批tuple中的最大值的:

  

FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));

 

TridentTopology topology = new TridentTopology();

 

Stream vehiclesStream = topology.newStream("spout1", spout). each(allFields, new Debug("##### vehicles"));

 

        vehiclesStream

                .max(new SpeedComparator()) // Comparator w.r.t speed on received tuple.

                .each(vehicleField, new Debug("#### fastest vehicle"))

                .project(driverField)

                .each(driverField, new Debug("##### fastest driver"));

 

        vehiclesStream

                .maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple.

                .each(vehicleField, new Debug("#### most efficient vehicle"));

 

########两个比较器和minminBy中用到的比较器是一样的

 

 

 

Windowing:

 

Trident可以处理在同一个window中的一批一批的tuple,并且将汇总结果发送到下一个操作。这里有两种类型的window,分别是TumblingwindowSlidingWindow;两者都支持基于处理时间的或者是基于tuple的个数

 Window划分。

 

 Tumbling window

 

 tuple被基于处理时间或者tuplecount值,分配在一个单独的Window中;任何的tuple都只可能属于一个Window

 

    /**

     * 返回一个包含tuple个数为windowCounttummbling window中每一个tuple的汇总结果所组成的流

     */

public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,Fields inputFields, Aggregator aggregator, Fields functionFields);

 

    /**

     * 返回一个时间跨度为windowDurationtummbling window中每一个tuple的汇总结果所组成的流

     */

public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,Fields inputFields, Aggregator aggregator, Fields functionFields);

 

 

 Sliding window:

 

 Tuple被分组到各个window中,并且window每隔一定的时间间隔进行一次滑动。一个tuple可以属于一个或者多个window

 

    /**

     * 数为windowCountsliding window中每一个tuple的汇总结果所组成的流,并将sliding window向后滑动slideCount

     */

public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);

 

    /**

     * 返回一个时间跨度为window向后滑动windowDurationsliding window中每一个tuple的汇总结果所组成的流,并将sliding window向后滑动windowDuration

     */

public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);

 

 

Common windowing API

 下面是接受任何被支持的windowing configuration的公共windowing API

 

public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,Aggregator aggregator, Fields functionFields)

 

windowConfig 可以是下面这几种类型:

1.SlidingCountWindow.of(int windowCount, int slidingCount)

2.SlidingDurationWindow.of(BaseWindowedBolt.Duration windowDuration,BaseWindowedBolt.Duration slidingDuration)

3.TumblingCountWindow.of(int windowLength)

4.TumblingCountWindow.of(int windowLength)

 

Trident windowing APIS 需要WindowsStoreFactory 来保存接收到的tuple和汇总值;现在已经提供的一个基础的工厂是基于hbaseHBaseWindowsStoreFactory;它可以被扩展,用来支持不同场景的应用。

HBaseWindowStoreFactory 的例子如下:

 

// window-state table should already be created with cf:tuples column(要在hbase中提前建立好一个表叫window-state,并且已经在cf列族中添加了tuples)

HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));

    

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),

            new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),

            new Values("how many apples can you eat"), new Values("to be or not to be the person"));

    spout.setCycle(true);

 

TridentTopology topology = new TridentTopology();

 

Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),new Split(), new Fields("word"))

                         .window(TumblingCountWindow.of(1000), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))

            .peek(new Consumer() {

                @Override

                public void accept(TridentTuple input) {

                    LOG.info("Received tuple: [{}]", input);

                }

            });

 

    StormTopology stormTopology =  topology.build();

 

partitionAggregate

 

 partitionAggregate 在每一个批次的tuple的每一个分区上运行一个函数,和function(第一个介绍的那个)不同,partitionAggregate 处理后发送的tuple覆盖了他所接收到的tuple来看看这个例子:

 

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

 

 假设输入的流中的tuple包含字段 ["a", "b"],并且有一下这样几个分区的tuple流入了sum函数:

 

Partition 0:

["a", 1]

["b", 2]

 

Partition 1:

["a", 3]

["c", 8]

 

Partition 2:

["e", 1]

["d", 9]

["d", 10]

 

 

 然后输出的流中只包含一个字段'sum'tuple的内容如下:

 

Partition 0:

[3]

 

Partition 1:

[11]

 

Partition 2:

[20]

 

 这里有三种不同的接口用来定义聚合器:CombinerAggregator, ReducerAggregator, Aggregator.

 

 CombinerAggregator接口定义的内容如下:

 

public interface CombinerAggregator<T> extends Serializable {

    T init(TridentTuple tuple);

    T combine(T val1, T val2);

    T zero();

}

 

 一个CombinerAggregator只能返回一个tuple并且该tuple只有一个字段。CombinerAggregator在每一个输入的tuple上都会运行init方法来初始化值,然后使用combine方法来combine所有的值,直到只剩下一个

 值为止。如果分区内没有任何tupleCombinerAggregator就发送zero方法产生的值。例如,这是Count的实现:

 

public class Count implements CombinerAggregator<Long> {

    public Long init(TridentTuple tuple) {

        return 1L;

    }

 

    public Long combine(Long val1, Long val2) {

        return val1 + val2;

    }

 

    public Long zero() {

        return 0L;

    }

}

 

 

 当在aggregate方法中而不是在partitionAggregate方法中使用CombinerAggregators 的时候,你就能感受到它的好处了。在aggregate方法中,Trident会通过在网络之间传递tuple之前进行局部分区聚合的

方式来优化计算。

 

 

 ReducerAggregator接口的定义如下:

 

public interface ReducerAggregator<T> extends Serializable {

    T init();

    T reduce(T curr, TridentTuple tuple);

}

 ReducerAggregator在初始化的时候设置一个初始值,然后迭代每一个输入的tuplevalue来产生一个只有一个值的单一tuple来作为输出。例如,下面是使用ReducerAggregator实现的Count函数:

 

 

public class Count implements ReducerAggregator<Long> {

    public Long init() {

        return 0L;

    }

 

    public Long reduce(Long curr, TridentTuple tuple) {

        return curr + 1;

    }

}

 

 

ReducerAggregator也可以被使用在persistentAggregate方法中,稍后你将会看到。

 

 最最通用的聚合接口就是Aggregator了,接口定义如下:

 

public interface Aggregator<T> extends Operation {

    T init(Object batchId, TridentCollector collector);

    void aggregate(T state, TridentTuple tuple, TridentCollector collector);

    void complete(T state, TridentCollector collector);

}

 

Aggregator们可以发送带有任意数量字段的任意数量的tuple。在其方法执行的任何地方都可以发送tupleAggregator们按照如下的方式来执行:

 

1.初始化方法在处理一个batch之前被调用,返回结果是一个用来表示聚合状态的对象,并且会被传递给aggregatecomplete方法中。

2.aggregate 方法在每一个batch分区的tuple上运行,这个方法可以更新初始化的那个状态对象,并可以选择性地发送一些消息。

3.complete 方法在batch分区上的所有tuple都被Aggregator处理后调用。

 

下面的例子演示如何使用Aggregator来实现一个Count

 

public class CountAgg extends BaseAggregator<CountState> {

    static class CountState {

        long count = 0;

    }

 

    public CountState init(Object batchId, TridentCollector collector) {

        return new CountState();

    }

 

    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {

        state.count+=1;

    }

 

    public void complete(CountState state, TridentCollector collector) {

        collector.emit(new Values(state.count));

    }

}

 

有时候你想同时执行很多个aggregator,这种方式被称为链式调用,可以像下面这样使用:

mystream.chainedAgg()

        .partitionAggregate(new Count(), new Fields("count"))

        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

        .chainEnd()

 

 上面的代码会在每一个分区上运行CountSum聚合器,输出将会只有一个tuple,并包含 ["count", "sum"]字段。

 

 

 

stateQuery partitionPersist:

 

 stateQuery partitionPersist 分别可以查询和更新作为数据资源的state;相关介绍在Trident state doc中。

 

 

projection

 projection方法作用在流上后,可以使流中只包含方法中指定的字段;假如你有一个包含字段["a", "b", "c", "d"]的流,然后你运行下面的方法:

 

mystream.project(new Fields("b", "d"))

 

 那么输出的流中就会只包含字段 ["b", "d"]

 

 

Repartitioning operations:

 

 Repartitioning operations(重分区操作)运行一个可以改变tuple在各个任务之间是如何分区的函数,该函数的运行结果可能会改变分区的数目;例如,并行处理数大于重新分区后的分区数的时候。

重分区操作需要进行网络传输,下面是提供的重分区函数:

 

 1.shuffle:随机分配tuple到所有的目标分区中;

 2.broadcast:每一个tuple都会被重复的发送到每一个目标分区中;这在DRPC操作用很有用,例如:你需要在每一个分区的data上进行stateQuery的时候

 3.partitionBy:该函数接收一批字段,然后根据这批字段进行分区。这批字段会被进行哈希运算然后根据分区个数取模,然后根据运算结果进行分区。该函数保证相同一批字段的tuple一定会去到同一个分区

中。

 4.global:所有的tuple都被发送到同一个分区中。流中所有的batch都会选择同一个分区。

 5.batchGlobal:在batch中的所有tuple都会进入同一个分区,但是不同的batch中的tuple可能会进入到不同的分区中。

 6.partition:该函数接收一个本地化的分区方法,本地化的分区方法需要实现org.apache.storm.grouping.CustomStreamGrouping

 

 

Aggregation operations(集合操作):

 

 Tridentaggregate persistentAggregate 方法来提供在一个流上进行聚合操作;aggregate 独立地运行在流中的每一个batch上,persistentAggregate 会运行在流中的所有batch上,并且会把结果

保存在state中。

 

 运行aggregate方法会在流上进行全局的聚合。当你使用ReducerAggregator ReducerAggregator 的时候,首先流会被重新分组到一个单独的分区中,然后分区函数在这个单独的分区中运行;然而当你

使用CombinerAggregator的时候,Trident首先会在每一个分区上进行聚合,然后把每个分区的聚合结果重新分区到一个独立的分区中,然后在完成网络传输后完成全局聚合操作。CombinerAggregator比较

高效,你应该尽量的使用它。

 

 这里有一个例子展示如何使用aggregate 来获得某个batch中的全局count

 

mystream.aggregate(new Count(), new Fields("count"))

 

 partitionAggregate一样,aggregate中的聚合器可以以链式的方式进行调用;然而,如果你把一个CombinerAggregator 和一个不是CombinerAggregator 的聚合器链在一起后,storm就无法进行在每个分区

中预先进行聚合操作的优化了。

 

 你可以在Trident state doc中查看persistentAggregate的使用方式。

 

 

Operations on grouped streams(在分组流上的操作)

 

 groupBy 操作根据特定的字段运行一个partitionBy 操作来对流进行重新分区,然后在每一个分区中,把特定字段相同的tuple放到一个组中。下面是一个示例图:

 

 

 

 

 如果你在一个分组后的流上运行aggregators ,那么聚合操作会在每一个组中运行,而不是在每个batch中运行。persistentAggregate 也可以运行在一个分组后的流上,在这种情况下

聚合后的结果会被保存在一个 MapState中,该 MapState使用用来分组的字段作为key。在Trident state doc中你可以找到更多答案。

 

 和普通的流一样,运行在分组后的流上的aggregators 也可以进行链式调用。

 

 

Merges and joins:

 

 

 API的最后一部分就是把不同的流结合在一起,最最简单的结合流的方式就是把几个不同的流合并到同一个流中。你可以通过merge 方法(像下面这样)来达到目的:

 

topology.merge(stream1, stream2, stream3);

 

 Trident会用第一个流的字段来重新命名其他合并的流的字段,在作为新的输出流的字段

 

 另一种合结合流的方式就是join操作,现在来看一个标准的join操作,就像在SQL中的join操作一样,join要求输入是有限的,所以对于无限地不停地发送的流是不起作用的。在Trident

中的join操作仅仅作用于每一个有spout发出的很小的batch中;

 

 下面的例子在包含字段["key", "val1", "val2"] 的流和包含字段["x", "val1"]的另一个流上进行join操作:

 

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

 

 

 上面的代码中,把stream1stream2通过keyxjoin在一起,Trident要求输出流中所有的输出字段都要起名字,因为在输入流中可能会用重复的字段名称。由join操作发出的tuple

会包含如下内容:

 

1.首先是链接字段的列表。在这里key等同以stream1中的key也等同于stream2中的x

2.然后就是所有流中没有进行join的字段,这些字段按照传递进来的顺序排序;在这个例子中,a=stream1.val1,b=stream1.val2,c=stream2.val1.

 当来自不同的spout的流和并的时候,这些spout会在发送batch上进行同步。也就是说一个要处理的batch会包含所有的参与的spout所发送的tuple

 你也许会好奇,该如何实现一个类似"windowed join"的操作,也就是说,来自一方的tuple和来自另一方的最近一小时的tuple进行join操作。

 要实现这样的功能,你需要利用partitionPersist stateQuery,最近一小时tuple会被保存并且循环迭代在一个state中,以join操作的field作为key。然后stateQuery 将会通过join的字段查询state中的数据来进行join操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/569229.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

poj 2406 还是KMP的简单应用

记住KMP是多计算一位的。其中next[i]为不为自身的最大首尾重复子串长度。 位移ji-next[i]可以看作是构成字符串s的字串&#xff08;如果i%j0&#xff0c;存在这样的构成&#xff09;&#xff0c;相应的重复次数也就是n/d。 a b c d * next:-1 0 0 0 0 这时ji-next[i]; …

Trident State译文

Trident State 译文 Trident针对状态化的数据源的读写进行了一流的分装。State可以包含在拓扑中-例如&#xff0c;保存在内存中&#xff0c;有HDFS提供备份-也可以保存在一个外部的数据库中&#xff0c;像Memcached和Cassandra。针对以上的所有情况&#xff0c;Trident的API都…

远程访问数据库查询数据量一大就Hang

最近刚为客户升级了一套Oracle Database&#xff0c;一切进展顺利&#xff0c;眼看就要顺利验收时&#xff0c;发现有部分客户端软件连接新版本数据库时会Hang&#xff0c;问题非常诡异。 系统环境如下 升级前的环境OS:Windows Server 2003 DB:Windows Database Enterprise Edi…

storm-hbase jar包中的bolt节点源码解析

一段时间内&#xff0c;大家都是自己在storm的节点中实现对hbase的操作&#xff0c;不管是普通的topo还是在trident中都是这样&#xff1b;不知道从那个版本起&#xff0c;在storm的压缩包中就多出了好几个jar包&#xff0c;把针对habse&#xff0c;mysql&#xff0c;mongodb等…

软件之道:软件开发争议问题剖析

软件之道&#xff1a;软件开发争议问题剖析 基本信息 原书名&#xff1a; Making Software 原出版社&#xff1a; OReilly 作者&#xff1a; (美)Andy Oram Greg Wilson 译者&#xff1a; 鲍央舟 张玳 沈欢星丛书名&#xff1a; 图灵程序设计丛书出版社&#xff1a;人民邮…

如何理解矩阵

线性代数课程&#xff0c;无论你从行列式入手还是直接从矩阵入手&#xff0c;从一开始就充斥着莫名其妙。比如说&#xff0c;在全国一般工科院系教学中应用最广泛的同济线性代数教材&#xff08;现在到了第四版&#xff09;&#xff0c;一上来就介绍逆序数这个“前无古人&#…

对于泛型的理解

如果希望构建一个集合容器&#xff0c;会用到ArrayList array new ArrayList(); ArrayList有几个缺点&#xff1a;1 无法保证容器中的类型安全&#xff08;类型一致问题&#xff09; 2 存进arralist的数据&#xff0c;CIL会自动进行装箱&#xff0c;也就是保存进ArrayList中的…

交通灯管理系统

题目需求&#xff1a; 模拟实现十字路口的交通灯管理系统逻辑&#xff0c;具体需求如下&#xff1a; 1、异步随机生成按照各个路线行驶的车辆。 例如&#xff1a; 由南向而来去往北向的车辆 ---- 直行车辆 由西向而来去往南向的车辆 ---- 右转车辆 由东向…

REDIS提供的map,list,set,sortedset使用测试

public class RedisTest {public JedisPool jedisPool null;public void init(){//创建配置信息JedisPoolConfig pool new JedisPoolConfig();//设置最大的总链接数pool.setMaxTotal(300);//设置最大空闲链接数pool.setMaxIdle(100);//设置最大等待时间pool.setMaxWaitMilli…

Java 多线程-生产者、消费者

一、整体代码 ThreadDemo.java public class ThreadDemo { public static void main(String[] args) { Godown godown new Godown(0); Consumer c1 new Consumer(50, godown); Consumer c2 new Consumer(20, godown); Consumer c3 new Consumer(30, godown); Producer p1 …

scala初学之helloWorld

特此声明&#xff0c;本文中的代码 部分或全部来源王家林的scala教程&#xff1b;虽然王家林一直被大家所诟病&#xff0c;而且也无法确定这些scala的程序代码是不是他的。但是作为一个初学者觉得就算代码不是他的&#xff0c;他只是拿过来翻译一次&#xff0c;看他的视频也是能…

scala初学之函数定义、流程控制、异常处理入门

特此声明&#xff0c;本文中的代码 部分或全部来源王家林的scala教程&#xff1b;虽然王家林一直被大家所诟病&#xff0c;而且也无法确定这些scala的程序代码是不是他的。但是作为一个初学者觉得就算代码不是他的&#xff0c;他只是拿过来翻译一次&#xff0c;看他的视频也是能…

HGOI20190707 题解

Problem A 钢铁侠的诞生 现在有$n$个数字$a_i \leq 10^9 $&#xff0c;然后取出$m$个数字&#xff0c;保证合法。 从小到大输出剩余的$n-m$个数字。 对于100%的数据$m\leq n \leq 3\times 10^5$ Sol : 直接map映射然后用iterator来遍历整个map输出答案即可。 复杂度大概是$O(n…

scala初学之Tuple、Array、Map、文件操作入门实战

特此声明&#xff0c;本文中的代码 部分或全部来源王家林的scala教程&#xff1b;虽然王家林一直被大家所诟病&#xff0c;而且也无法确定这些scala的程序代码是不是他的。但是作为一个初学者觉得就算代码不是他的&#xff0c;他只是拿过来翻译一次&#xff0c;看他的视频也是能…

Java连载3-编译与运行阶段详解JRE,JDK,JVM关系

一、 1.JDK下载地址&#xff1a;https://www.oracle.com/technetwork/java/javase/downloads/jdk12-downloads-5295953.html 二、Java的加载与执行 1.Java程序运行包括&#xff1a; &#xff08;1&#xff09;编译阶段&#xff1a;检查Java源程序是否符合Java语法&#xff0c;符…

KMP算法NEXT数组纯手工生成

用一个实际的例子来说明&#xff0c;经历了看懂&#xff0c;看不懂&#xff0c;看懂&#xff0c;看不懂&#xff0c;看懂...后我终于决定把它记下来了。 例子字符串为&#xff1a;abaabaca 首先可以肯定&#xff0c;第一个位置永远位0&#xff0c;第二个位置永远为1.那么可以…

P1078 文化之旅

题面 这题好像是初二时老师讲过的一道题&#xff0c;但是。。我没听&#xff1f;&#xff1f;反正没交过就对了。。 我本来想的是深搜spfa&#xff0c;写到50行实现不了&#xff1f;&#xff1f;果断看tj&#xff0c;floyd&#xff1f;&#xff1f;&#xff1f;&#xff08;黑人…

KMP算法中NEXT数组的作用以及代码实现

在http://blog.csdn.net/u012613903/article/details/79004094中写到了如何手工去求一个NEXT数组&#xff0c;这个在很多考试中可以用来解题。但是在实际的使用中&#xff0c;NEXT数组究竟发挥着什么样的作用&#xff0c;如何用代码实现KMP算法呢&#xff1f; KMP算法是用来确…

最长公共连续子串

给出两个字符串&#xff08;可能包含空格&#xff09;,找出其中最长的公共连续子串,输出其长度。 注意这里是找连续子串。 算法&#xff1a;动态规划。f[i][j]表示第一个字符串前i个字符中与第二个字符串前j个中的最长连续子串长度 那么状态转移为&#xff1a;当s1(i)s2(j)时&a…

求最长回文串-从动态规划到马拉车之路(上)

要解决的问题&#xff1a; 给定一个字符串&#xff0c;要求求出这个字符串中的最长的回文串子串。 例子&#xff1a; cbddba的最长回文子串为 bddb cbdedba的最长回文子串为dbedb 由上面的例子可以看到&#xff0c;在考虑回文子串的问题时需要考虑奇偶性。因为奇回文关于中…