适合设计师的网站编辑软件/建站企业网站

适合设计师的网站编辑软件,建站企业网站,网络营销官网,风景网页设计图片1. 窗口的API概念 窗口的API使用分为按键分区和非按键分区,在定义窗口操作之前,首先就要确定好是基于按键分区Keyed的数据流KeyedStream来开窗还是基于没有按键分区的DataStream上开窗。 1.1 按键分区窗口(Keyed Windows) 按键…

1. 窗口的API概念

窗口的API使用分为按键分区非按键分区,在定义窗口操作之前,首先就要确定好是基于按键分区Keyed的数据流KeyedStream来开窗还是基于没有按键分区的DataStream上开窗。

1.1 按键分区窗口(Keyed Windows)

按键分区窗口就是按照key分为多条逻辑流logical streams,这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的计算。

代码实现:

stream.keyBy(...).window(...)

1.2 非按键分区(Non-Keyed Windows)

如果没有进行 keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。

代码实现:

stream.windowAll(...)

这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll 本身就是一个非并行的操作。

1.3 代码中窗口 API 的调用

简单来说,窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)

代码实现:

stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)

其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,我们接下来就详细展开讲解。另外,在实际应用中,一般都需要并行执行任务,非按键分区很少用到,所以我们之后都以按键分区窗口为例;如果想要实现非按键分区窗口,只要前面不做 keyBy,后面调用.window()时直接换成.windowAll()就可以了。

2. 窗口分配器(Window Assigners)

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。窗口分配数据的规则,其实就对应着不同的窗口类型。所以,窗口分配器其实就是在指定窗口的类型。

窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner 作为参数,返回的WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个 WindowAssigner,返回的是AllWindowedStream

窗口按照驱动类型可以分成时间窗口计数窗口,而按照具体的分配规则,又有滚动窗口滑动窗口会话窗口全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型 Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。

2.1 时间窗口

时间窗口是最常用的窗口类型,又可以细分为滚动滑动会话三种。
在使用中直接调用.window(),在里面传入对应时间语义下的窗口分配器。这样一来,我们不需要专门定义时间语义,默认就是事件时间;如果想用处理时间,那么在这里传入处理时间窗口分配器就可以了。

2.1.1 滚动处理时间窗口(TumblingProcessingTimeWindows)

窗口分配器由类 TumblingProcessingTimeWindows 提供,需要调用它的静态方法.of()
代码实现:

stringDataStreamSource.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(...);

这里.of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小,我们这里创建了一个长度为 5 秒的滚动窗口。
另外.of()还有一个重载方法public static TumblingProcessingTimeWindows of(Time size, Time offset) ,可以传入两个 Time 类型的参数:sizeoffset。第一个参数表示窗口大小,第二个参数表示窗口起始点的偏移量。这个偏移量主要是解决时区问题,比如我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0点开启窗口,这时是北京时间早上 8 点。那怎样得到北京时间每天 0 点开启的滚动窗口呢?只要设置-8 小时的偏移量就可以了:.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))

2.1.2 滑动处理时间窗口(SlidingProcessingTimeWindows)

窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()
代码实现:

stringDataStreamSource.keyBy(...).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(...)

这里.of()方法需要传入两个 Time 类型的参数:size slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为 10 秒、滑动步长为 5 秒的滑动窗口。同样滑动窗口也可以追加第三个参数offset -偏移量,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

2.1.3 处理时间会话窗口(ProcessingTimeSessionWindows)

窗口分配器由类 ProcessingTimeSessionWindows 提供,需要调用它的静态方法.withGap()或者.withDynamicGap()
代码实现:

stream.keyBy(...).window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)

这里.withGap()方法需要传入一个 Time 类型的参数 size,表示会话的超时时间,也就是最小间隔 session gap。我们这里创建了静态会话超时时间为 10 秒的会话窗口。

我们也可以使用.withDynamicGap()方法动态定义session gap时间,这就需要传入一个SessionWindowTimeGapExtractor作为参数,用来定义 session gap 的动态提取逻辑。在这里,我们提取了数据元素的第一个字段,用它的长度乘以 1000 作为会话超时的间隔。

.window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {@Overridepublic long extract(Tuple2<String, Long> element) { // 提取 session gap 值返回, 单位毫秒return element.f0.length() * 1000;}
}))

2.1.4 滚动事件时间窗口(TumblingEventTimeWindows)

窗口分配器由类 TumblingEventTimeWindows 提供,用法与滚动处理事件窗口完全一致。

stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(...)

这里.of()方法也可以传入第二个参数 offset,用于设置窗口起始点的偏移量。

2.1.5 滑动事件时间窗口(SlidingEventTimeWindows)

窗口分配器由类 SlidingEventTimeWindows 提供,用法与滑动处理事件窗口完全一致。

stream.keyBy(...).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(...)

2.1.6 事件时间会话窗口(EventTimeSessionWindows)

窗口分配器由类 EventTimeSessionWindows 提供,用法与处理事件会话窗口完全一致。

stream.keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)

2.2 计数窗口

计数窗口本身底层是基于全局窗口(Global Window)实现的。直接调用.countWindow()方法。根据分配规则的不同,又可以分为滚动计数窗口滑动计数窗口两类。

2.2.1 滚动计数窗口

滚动计数窗口只需要传入一个长整型的参数 size,表示窗口的大小。

stream.keyBy(...).countWindow(10)

我们定义了一个长度为 10 的滚动计数窗口,当窗口中元素数量达到 10 的时候,就会触发计算执行并关闭窗口。

2.2.2 滑动计数窗口

与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:sizeslide,前者表示窗口大小,后者表示滑动步长。

stream.keyBy(...).countWindow(103)

我们定义了一个长度为 10、滑动步长为 3 的滑动计数窗口。每个窗口统计 10 个数据,每隔 3 个数据就统计输出一次结果。

2.3 全局窗口

全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接用.window(),分配器由 GlobalWindows 类提供。

stream.keyBy(...).window(GlobalWindows.create());

需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。

3. 窗口函数(Window Functions)

定义窗口分配器只是知道了数据属于哪个窗口,可以将数据收集起来;但是如果想让收集的数据到底做什么就需要在接上一个定义窗口计算的窗口函数(window functions)

经过窗口分配器处理之后,数据可以分配到对应窗口中,而数据流经过转换得到的数据类型是WindowedStream。不是 DataStream,所以并不能直接进行其他转换,必须进一步调用窗口函数,对收集的数据进行处理计算之后,才能最终得到 DataStream,如下图:
在这里插入图片描述

窗口函数定义了要对窗口中收集的数据做的计算处理,根据处理方式分为两类:增量聚合函数全窗口函数

3.1 增量聚合函数(incremental aggregation functions)

窗口将数据收集起来,最基本的处理操作就是聚合。窗口对无限流的切分,可以看作得到了一个有界数据集。如果如果等到所有数据都到了在窗口到了结束时间再去聚合就很不高效——这就相当于真的在用批处理的思路来做实时流处理。

为了提高实时性,流处理就像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候,我们只需要拿出之前聚合的状态直接输出即可,这就提高了程序运行的效率和实时性。

典型的增量聚合函数有两个:ReduceFunctionAggregateFunction

3.1.1 归约函数(ReduceFunction)

最简单的聚合就是归约(reduce),就是将窗口中收集到的数据两两进行归约。当我们进行流处理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了增量式的聚合。基于 WindowedStream 调用.reduce()方法,然
后传入 ReduceFunction 作为参数,就可以指定以归约两个元素的方式去对窗口中数据进行聚合了。这里的 ReduceFunction与简单聚合时用到的 ReduceFunction 是同一个函数类接口,所以使用方式也是完全一样的。

代码实例:

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 创建自定义数据源的实时流DataStream<Event> stringDataStreamSource = env.addSource(new CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}}));stringDataStreamSource.map(new MapFunction<Event, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(Event value) throws Exception {// 将数据转换成二元组,方便计算return Tuple2.of(value.getUser(), 1L);}}).keyBy(data -> data.f0)// 设置滚动事件时间窗口 5秒一个.window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {// 定义累加规则,窗口闭合时,向下游发送累加结果return Tuple2.of(value1.f0, value1.f1 + value2.f1);}}).print();env.execute();}

CustomSource 代码:

public class CustomSource  implements ParallelSourceFunction<Event> {// 声明一个布尔变量,作为控制数据生成的标识位private Boolean running = true;@Overridepublic void run(SourceContext<Event> ctx) throws Exception {Random random = new Random(); // 在指定的数据集中随机选取数据String[] users = {"Mary", "Alice", "Bob", "Cary"};String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};while (running) {Event event = new Event(users[random.nextInt(users.length)],urls[random.nextInt(urls.length)],Calendar.getInstance().getTimeInMillis());ctx.collect(event);System.out.printf("我开始生成数据了,[%s], [%d]%n", event.getUser() ,event.getTimestamp());// 隔 1 秒生成一个点击事件,方便观测Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

打印结果:
在这里插入图片描述
这个是凑巧了正好5个数据都在第一个窗口,如果5个数据不在第一个窗口就是这样的:

在这里插入图片描述

3.1.1 聚合函数(AggregateFunction)

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。导致需要在聚合前,先将数据转换(map)成预期结果类型但是这样遇到某些场景就很麻烦。

例如,如果我们希望计算一组数据的平均值,应该怎样做聚合呢?很明显,这时我们需要计算两个状态量:数据的总和(sum),以及数据的个数(count),而最终输出结果是两者的商(sum/count)。如果用 ReduceFunction,那么我们应该先把数据转换成二元组(sum, count)的形式式,然后进行归约聚合,最后再将元组的两个元素相除转换得到最后的平均值。本来应该只是一个任务,可我们却需要 map-reduce-map 三步操作,这显然不够高效。

FlinkWindow API 中的aggregate就提供了这样的操作。直接基于 WindowedStream 调用.aggregate()方法,就可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction 的实现类作为参数。AggregateFunction 在源码中的定义如下:

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable 
{ACC createAccumulator();ACC add(IN value, ACC accumulator);OUT getResult(ACC accumulator);ACC merge(ACC a, ACC b);
}

AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)累加器类型(ACC)输出类型(OUT)输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。
接口中有四个方法:

  1. createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  2. add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
  3. getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值,就可以把 sumcount 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
  4. merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)

AggregateFunction 的工作原理是:首先调用 createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

代码实例:

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 创建自定义数据源的实时流DataStream<Event> stringDataStreamSource = env.addSource(new CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}}));stringDataStreamSource.keyBy(data -> data.getUser())// 设置滚动事件时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(5)))// 输入 Event 累加器是 Tuple3<String, Long, Integer> 用户名,时间戳,个数,输出Tuple3<String, Integer, String> 用户名,个数,平均值.aggregate(new AggregateFunction<Event, Tuple3<String, Long, Integer>, Tuple3<String, Integer, String>>() {// 初始化累加器@Overridepublic Tuple3<String, Long, Integer> createAccumulator() {return Tuple3.of("", 0L, 0);}// 当数据来了处理一个数据@Overridepublic Tuple3<String, Long, Integer> add(Event value, Tuple3<String, Long, Integer> accumulator) {return Tuple3.of(value.getUser(), accumulator.f1 + value.getTimestamp(), accumulator.f2 + 1);}// 返回结果@Overridepublic Tuple3<String, Integer, String> getResult(Tuple3<String, Long, Integer> accumulator) {Timestamp timestamp = new Timestamp(accumulator.f1 / accumulator.f2);return Tuple3.of(accumulator.f0, accumulator.f2, timestamp.toString());}// merge操作一般是会话窗口才会用到@Overridepublic Tuple3<String, Long, Integer> merge(Tuple3<String, Long, Integer> a, Tuple3<String, Long, Integer> b) {return Tuple3.of(a.f0, a.f1 + b.f1, a.f2 + b.f2);}}).print();env.execute();}

输出结果:
在这里插入图片描述

AggregateFunction 的工作原理是:

  • 首先调用 createAccumulator()为任务初始化一个状态(累加器);
  • 而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;
  • 等到了窗口需要输出时,再调用 getResult()方法得到计算结果。
  • 很明显,与ReduceFunction相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

3.2 全窗口函数(full window functions)

窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

全窗口函数就是典型的批处理思路——等到所有数据都到齐了在处理数据。这种处理效率很慢,但为什么还会存在全窗口函数?是因为有些场景下,必须需要基于全部数据的计算结果才有效,这是聚合函数做不到的。

在 Flink 中,全窗口函数也有两种:WindowFunctionProcessWindowFunction

3.2.1 窗口函数(WindowFunction)

WindowFunction 字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。

stream.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());

这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。WindowFunction 接口在源码中实现如下:

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

WindowFunction 能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,所以之后可能会逐渐弃用。一般在实际应用,直接使用 ProcessWindowFunction 就可以了。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 创建自定义数据源的实时流DataStream<Event> stringDataStreamSource = env.addSource(new CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}}));stringDataStreamSource.keyBy(data -> true)// 设置滚动事件时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new WindowFunction<Event, String, Boolean, TimeWindow>() {@Overridepublic void apply(Boolean aBoolean, TimeWindow window, Iterable<Event> input, Collector<String> out) throws Exception {long start = window.getStart();long end = window.getEnd();HashSet<String> users = new HashSet<>();for (Event event : input) {users.add(event.getUser());}out.collect("窗口:" + new Timestamp(start) + " ~ " + new Timestamp(end)+ "的独立访客数量是:" + users.size());}}).print();stringDataStreamSource.print("data");env.execute();}

输出结果:
在这里插入图片描述

可以看到TimeWindows的属性很少
在这里插入图片描述

3.2.2 处理窗口函数(ProcessWindowFunction)

ProcessWindowFunctionWindow API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上,ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员。

作为全窗口函数ProcessWindowFunction 同样需要将所有数据缓存下来,等到窗口触发计算时才使用,其实就是增强版的WindowFunction

比如求一个网站的每小时用户量:

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 创建自定义数据源的实时流DataStream<Event> stringDataStreamSource = env.addSource(new CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}}));stringDataStreamSource.keyBy(data -> true)// 设置滚动事件时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new UvCountByWindow()).print();stringDataStreamSource.print("data");env.execute();}public static class UvCountByWindow extends ProcessWindowFunction<Event, String, Boolean, TimeWindow> {@Overridepublic void process(Boolean aBoolean, ProcessWindowFunction<Event, String, Boolean, TimeWindow>.Context context, Iterable<Event> elements, Collector<String> out) throws Exception {HashSet<String> users = new HashSet<>();// 遍历所有数据获取去重的用户名单for (Event element : elements) {users.add(element.getUser());}// 获取窗口信息long start = context.window().getStart();long end = context.window().getEnd();out.collect(" 窗 口 : " + new Timestamp(start) + " ~ " + newTimestamp(end) + " 的独立访客数量是:" + users.size());}}

输出结果:
在这里插入图片描述

3.3 增量聚合和全窗口函数的结合使用、

增量聚合函数处理计算会更高效。举一个最简单的例子,对一组数据求和。大量的数据连续不断到来,全窗口函数只是把它们收集缓存起来,并没有处理;到了窗口要关闭、输出结果的时候,再遍历所有数据依次叠加,得到最终结果。而如果我们采用增量聚合的方式,那么只需要保存一个当前和的状态,每个数据到来时就会做一次加法,更新状态;到了要输出结果的时候,只要将当前状态直接拿出来就可以了。

增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。它只负责收集数据、提供上下文相关信息,把所有的原材料都准备好,至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活,功能更加强大。所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。

调用WindowedStream.reduce().aggregate()方法时,只是简单地直接传入了一个 ReduceFunctionAggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者 ProcessWindowFunction

// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) // ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> 
function)// AggregateFunction 与 WindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> 
windowFunction)// AggregateFunction 与 ProcessWindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction,ProcessWindowFunction<V, R, K, W> windowFunction)

调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。

举一个具体的实例来说明。在网站的各种统计指标中,一个很重要的统计指标就是热门的链接;想要得到热门的 url,前提是得到每个链接的“热门度”。一般情况下,可以用url 的浏览量(点击量)表示热门度。我们这里统计 10 秒钟的 url 浏览量,每 5 秒钟更新一次;另外为了更加清晰地展示,还应该把窗口的起始结束时间一起输出。我们可以定义滑动窗口,并结合增量聚合函数和全窗口函数来得到统计结果。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 创建自定义数据源的实时流DataStream<Event> stringDataStreamSource = env.addSource(new CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}}));stringDataStreamSource.keyBy(data -> true)// 设置滚动事件时间窗口 10秒.window(TumblingEventTimeWindows.of(Time.seconds(10)))// 同时传入增量聚合函数和全窗口函数 增量聚合函数的输出就是全局窗口的输入.aggregate(new UvAgg(), new UvCountResult()).print();stringDataStreamSource.print("data");env.execute();}// 自定义实现AggregateFunction ,增量聚合计算uv值,来一条数据就加一个数据public static class UvAgg implements AggregateFunction<Event, HashSet<String>, Integer> {@Overridepublic HashSet<String> createAccumulator() {return new HashSet<>();}@Overridepublic HashSet<String> add(Event value, HashSet<String> accumulator) {accumulator.add(value.getUser());return accumulator;}@Overridepublic Integer getResult(HashSet<String> accumulator) {return accumulator.size();}@Overridepublic HashSet<String> merge(HashSet<String> a, HashSet<String> b) {return null;}}// 自定义全局窗口处理函数 处理增量聚合窗口的值public static class UvCountResult extends ProcessWindowFunction<Integer, String, Boolean, TimeWindow> {@Overridepublic void process(Boolean aBoolean, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();// 增量聚合窗口的返回值就只有一个long uv  = elements.iterator().next();out.collect(" 窗 口 : " + new Timestamp(start) + " ~ " + newTimestamp(end) + " 的独立访客数量是:" + uv);}}

输出结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/755183.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android视角看鸿蒙第六课(module.json5中的各字段含义之pages)designWidth的用法

Android视角看鸿蒙第六课(module.json5中的各字段含义之pages&#xff09; 导读 前面几篇文章&#xff0c;我们陆续分析了entry->src->main下的module.json5中的各个字段的含义及作用。目前剩余pages和abilities两个字段&#xff0c;本篇文章一起来了解pages。 过程有错…

用Stable Diffusion生成同角色不同pose的人脸

随着技术的不断发展&#xff0c;我们现在可以使用稳定扩散技术&#xff08;Stable Diffusion&#xff09;来生成同一角色但不同姿势的人脸图片。本文将介绍这一方法的具体步骤&#xff0c;以及如何通过合理的提示语和模型选择来生成出更加真实和多样化的人脸图像。 博客首发地…

【Python】进阶学习:一文解决如何从指定的源目录中,挑选出符合条件的文件,并将这些文件复制到目标目录中

【Python】进阶学习&#xff1a;一文解决如何从指定的源目录中&#xff0c;挑选出符合条件的文件&#xff0c;并将这些文件复制到目标目录中 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化…

音频剪辑软件评测,哪一款最适合你?

“音频剪辑小白求解&#xff01;我正在制作一部个人纪录片&#xff0c;拍摄了很多原始音频素材&#xff0c;但是需要进行剪辑和整理才能使用。我完全不懂音频剪辑&#xff0c;请问有没有简单易懂的教程或者方法&#xff0c;帮助我快速上手并完成剪辑工作呢&#xff1f;” 随着…

5_相机标定_3_calibrateCamera()例子

上次介绍了calibrateCamera()接口参数&#xff0c;这次实际调用。 程序中所用标准标定板。 一、图片预处理 使用的图片原像素是3072*2048&#xff0c;即600万像素&#xff0c;处理起来不快&#xff1b;改成了560*420&#xff0c;即20万像素。调用opencv接口如下&#xff1a; //…

阿里通义灵码体验

点击访问体验 之前有体验过github的代码助手&#xff0c;奈何收费了&#xff0c;上周发现有一个免费的代码助手。 下载安装 vscode 搜索扩展 TONGYI Lingma 安装完成后登陆即可体验 写注释让他写代码 根据上下文自动补充 这里我只写了一个方法名&#xff0c;getAgencyList…

linux安装erlang

摘要 Erlang是一种通用的面向并发的编程语言&#xff0c;它由瑞典电信设备制造商爱立信所辖的CS-Lab开发&#xff0c;目的是创造一种可以应对大规模并发活动的编程语言和运行环境。 环境准备 系统环境 使用命令&#xff1a;uname -a 或者 uname -r&#xff0c;执行后如下&a…

浅谈SQL注入漏洞原理及利用方式

1.SQL注入 原理&#xff1a; 在数据交互中&#xff0c;前端的数据传入到后台处理时&#xff0c;由于后端没有做严格的判断&#xff0c;导致其传入的恶意“数据”拼接到SQL语句中后&#xff0c;被当作SQL语句的一部分执行。漏洞产生于脚本&#xff0c;注入是针对数据库进行。 …

Javaweb的学习21_CSS_属性

CSS的属性 (常用)属性&#xff1a; 1. 字体、文本 font-size&#xff1a;字体大小 color&#xff1a;文本颜色 text-align&#xff1a;文本的对齐方式 line-height&#xff1a;行高 2. 背景 background&#xff1a;是个复合属性 3. 边框 border&#xff1a;设置边框&#xff0c…

WordPress自动生成原创文章插件

WordPress作为最受欢迎的内容管理系统之一&#xff0c;为博客和网站的搭建提供了便捷的解决方案。而在内容创作方面&#xff0c;自动生成原创文章的插件为WordPress用户提供了更为高效的选项。 什么是WordPress自动生成原创文章插件&#xff1f; WordPress自动生成原创文章插件…

【NLP学习记录】One-Hot编码

1. One-Hot编码概念 one-hot编码的基本思想是将每个类别映射到一个向量&#xff0c;其中只有一个元素的值为1&#xff0c;其余元素的值为0。这样&#xff0c;每个类别之间相互独立&#xff0c;不存在顺序或距离关系。 举例&#xff1a;对于三个类别的情况&#xff0c;可以使用…

基于协同过滤的毕业生就业推荐系统python+django+flask

功能&#xff1a; 管理员&#xff1a;主页、个人中心、求职者管理、企业管理、招聘信息管理、就业信息管理、面试邀请管理、就业签约管理、投递的简历管理、系统管理 企业&#xff1a;主页、个人中心、求职者管理、招聘信息管理&#xff08;可看见所有的招聘信息。发布招聘信息…

三 C#插入排序算法

简介 插入排序算法是一种简单、直观的排序算法&#xff0c;其原理是将一个待排序的元素逐个地插入到已经排好序的部分中。 插入排序实现原理 插入排序算法是一种简单、直观的排序算法&#xff0c;其原理是将一个待排序的元素逐个地插入到已经排好序的部分中。 具体实现步骤…

【iOS】Blocks

文章目录 前言一、什么是Blocks二、Blocks模式1.Block语法2.Block类型变量3.截获自动变量值4.__block说明符5.截获的自动变量 三、Blocks的实现1.Block的实质__main_block_impl_0Block对象的实现结构体初始化 2.截获自动变量值3.__block说明符4.Block存储域5.__block变量存储域…

数据结构——lesson10排序之插入排序

&#x1f49e;&#x1f49e; 前言 hello hello~ &#xff0c;这里是大耳朵土土垚~&#x1f496;&#x1f496; &#xff0c;欢迎大家点赞&#x1f973;&#x1f973;关注&#x1f4a5;&#x1f4a5;收藏&#x1f339;&#x1f339;&#x1f339; &#x1f4a5;个人主页&#x…

python之数据类型转换

基本数据类型转换 Python 中基本数据类型转换的方法有下面几个。 方法说明int(x [,base ])将x转换为一个整数float(x )将x转换到一个浮点数complex(real [,imag ])创建一个复数str(x )将对象 x 转换为字符串repr(x )将对象 x 转换为表达式字符串eval(str )用来计算在字符串中…

Linux的背景介绍

1.Linux的发展史 Linux&#xff0c;一般指GNU/Linux&#xff08;单独的Linux内核并不可直接使用&#xff0c;一般搭配GNU套件&#xff0c;故得此称呼&#xff09;&#xff0c;是一种免费使用和自由传播的类UNIX操作系统&#xff0c;其内核由林纳斯本纳第克特托瓦兹&#xff08…

【源码阅读】evmⅠ

代码位置如下&#xff1a; 参考link 以太坊中有一个很重要的用途是智能合约&#xff0c;而其中evm模块是实现了执行智能合约的虚拟机。evm可以逐条解析执行智能合约的指令。 evm中的核心对象是EVM&#xff0c;代表一个以太坊虚拟机。其内部主要依赖&#xff1a;解释器Interore…

蓝桥杯历年真题Java b组 省赛 2018年第九届 第几天

一、题目一 第几天 2000年的1月1日&#xff0c;是那一年的第1天。 那么&#xff0c;2000年的5月4日&#xff0c;是那一年的第几天&#xff1f; 注意&#xff1a;需要提交的是一个整数&#xff0c;不要填写任何多余内容。 分析&#xff1a; 将每个月的天数加起来&#xff0c…

2024年敏捷产品负责人CSPO认证培训

课程名称&#xff1a;Scrum Product Owner CSPO产品负责人认证 课程类型&#xff1a;经理级 课程简介&#xff1a; Scrum Product Owner产品负责人在Scrum产品开发当中扮演“舵手”的角色&#xff0c;他决定产品的愿景、路线图以及投资回报&#xff0c;他需要回答为什么做&am…