Flink学习-时间和窗口

在流数据处理应用中,一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”,一 般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的 窗口计算。所以窗口和时间往往是分不开的。

时间语义

image-20231231194655718

  • 事件时间(Event Time):每个事件在对应的设备上发生的时间,也就是数据生成的时间。
  • 处理时间(Processing Time):执行处理操作的机器的系统时间
  • 摄取时间(Ingestion Time):事件进入到flink的时间

哪种 时间语义更重要

image-20231231194959522

一般情况下,业务日志数据中都会记录数据生成的时间戳,它就可以作为事件时间的判断基础。处理时间是我们计算效率的衡量标准,而事件事件更符合我们的业务计算逻辑。而处理时间是我们计算效率的衡量标准,由于没有任何附加考虑,数据一来就直接处理,因此这种方式可以让流处理延迟降到最低,效率达到最高。

flink1.13版本开始,将事件时间作为默认的时间语义。

水位线(Watermark)

什么是水位线

事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数据的时间戳来驱动的。

我们可以把时钟也以数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

watermark特点

image-20231231195431317

  • 水位线时插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线时基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以保证正确处理乱序数据
  • 一个水位线表示在当前流中事件时间已经达到了时间戳tt之前的所有数据都到齐了,之后流中不会出现时间戳t'<t的数据

总结起来,水位线(watermark)在Flink中的作用是用于处理乱序事件流,确保事件按照正确的顺序进行处理,以便进行准确的窗口计算和延迟处理。也就是, 牺牲掉一定的实时性,为了保证数据的完整性。

水位线的传递

image-20231231200028619

在 Flink 的数据流处理中,水位线是以特定的事件元素形式插入到数据流中的。这个特殊的事件元素被称为水位线事件(Watermark Event),它包含了水位线的时间戳信息。当数据流中的水位线事件到达算子(Operator)时,Flink 会根据其时间戳更新当前的水位线

​ 在源算子(Source Operator)中,可以通过调用特定的方法(如assignTimestampsAndWatermarks)来插入水位线事件。这样,在源算子产生的数据流中就会包含水位线事件,以及普通的数据事件。

​ 然后,水位线事件会随着数据流在不同的算子之间进行传递。当算子处理数据时,它会检查接收到的事件的时间戳,并与当前水位线进行比较。如果事件的时间戳大于当前水位线,算子会更新水位线,并触发相应的操作。

在“重分区”的传输模式下,一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步,这时我们应该以最慢的那个时钟,也就是最小的水位线为准。

水位线在上下游任务之间的传递,非常巧妙的避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果总是正确的。

如何生成水位线

生成水位线的总体原则

如果我们希望计算结果能更加准确,那可以将水位线的延迟设置得更高一些,等待的时间越长,自然也就越不容易漏掉数据。不过这样做的代价是处理的实时性降低了,我们可能为极少数的迟到数据增加了很多不必要的延迟。如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。

所以 Flink 中的水位线,其实是流处理中对低延迟结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

水位线生成策略(Watermark Strategies)

在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方 法 :assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间

assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就是 所谓的“水位线生成策略”。WatermarkStrategy 中包含了一个“时间戳分配器TimestampAssigner 和一个“水位线生成器WatermarkGenerator

TimestampAssigner: 由 WatermarkStrategy 显示地指定从数据里面哪一个字段提取当前的时间戳,然后把它分配到当前的数据上。 就相当于再数据上追加了一个字段,这个字段是真正的 timestamp。它有可能和之前某个字段一样,也可能基于之前的字段做了一定的改变。

**时间戳的分配是生成水位线的基础。**基于时间戳,我们可以指定水位线生成策略WatermarkGenerator

WatermarkGenerator: 主要负责按照既定的方式,基于时间戳生成水位线。在 WatermarkGenerator 接口中,主要又有两个方法:onEvent()onPeriodicEmit()

  • onEvent:基于事件生成 watermark 。
  • onPeriodicEmit:基于周期性的发射生成 watermark 。默认200ms

Flink 内置水位线生成器

有序流

对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以 永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用.

val stream = env.addSource(new ClickSource)
.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(new SerializableTimestampAssigner[Event] {override def extractTimestamp(element: Event, recordTimestamp: Long): Long = {element.timestamp}})
)
乱序流

由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的 结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy.forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参 数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序 程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。

val stream1 = env.addSource(new ClickSource)
//插入水位线的逻辑
.assignTimestampsAndWatermarks(
//针对乱序流插入水位线,延迟时间设置为 5s
WatermarkStrategy
.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner[Event] {override def extractTimestamp(element: Event, recordTimestamp: Long): Long = 			element.timestamp})
)

自定义水位线策略

WatermarkStrategy 中,时间戳分配器 TimestampAssigner 都是大同小异的,指定字段提取时间戳就可以了;而不同策略的关键就在于 WatermarkGenerator 的实现。整体说来,Flink 有两种不同的生成水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated)。

  • 周期性水位线生成器(Periodic Generator)

    周期性生成器一般是通过 onEvent()观察判断输入的事件,而在 onPeriodicEmit()里发出水 位线。

  • 断点式水位线生成器(Punctuated Generator)

    断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时, 就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。

窗口window

Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想 要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这 就是所谓的“窗口”(Window)。在 Flink 中, 窗口就是用来处理无界流的核心

窗口计算中,一般使用半开半闭的时间范围,即左闭右开。也就是说,窗口的开始时间是包含的,而结束时间是不包含的。因此,当一个事件的事件时间正好等于水位线时间时,它会被包含在该窗口的计算范围内。

​ 但是由于有乱序数据的存在,我们需要设置一个延迟时间来等所有数据到齐。比如设置延迟时间为2秒,这样0~10秒的窗口会在12的数据到来之后,才真正关闭计算输出结果。这样就可以包含迟到的数据了。

这里需要注意的是,Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个 窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时, 窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开.

窗口的分类

按照驱动类型分类

窗口本身是截取有界数据的一种方式,所以窗口一个非常重要的信息其实就是“怎样截取数据”。换句话说,就是以什么标准来开始和结束数据的截取,我们把它叫作窗口的“驱动类型”。

我们最容易想到的就是按照时间段去截取数据,这种窗口就叫作“时间窗口”(Time Window)。这在实际应用中最常见,之前所举的例子也都是时间窗口。除了由时间驱动之外, 窗口其实也可以由数据驱动,也就是说按照固定的个数,来截取一段数据集,这种窗口叫作“计数窗口”(Count Window)。

时间窗口

flink中有一个时间窗口的类,叫TimeWindow,有两个私有属性:startend。表示窗口的开始和结束的时间戳,单位为毫秒

private final long start;
private final long end;
public long maxTimestamp(){return end - 1;
}
计数窗口

基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。

计数窗口理解简单,只需指定窗口大小,就可以把数据分配到对应的窗口中,Flink内部对应的类来表示计数窗口,底层通过全局窗口Global Window)实现。

按照窗口分配数据的规则分类

根据分配数据的规则,窗口的具体实现可以分为 4 类:

  • 滚动窗口(Tumbling Window)
  • 滑动窗口(Sliding Window)
  • 会话窗口(Session Window)
  • 以及全局窗口(Global Window)
滚动窗口 (Tumbling Windows)

滚动窗口对数据进行均匀分片。窗口之间没有重叠,也不会有间隔是首尾相接的状态,如果把多个窗口的创建看作一个窗口的移动,那么他就像在滚动一样。

滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有窗口大小,我们可以定义一个长度为1小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为10的滚动计数窗口,就会每10个数进行一次统计。

滚动窗口应用非常广泛,它可以对每个时间段做聚合统计,很多 BI 分析指标都可以用它来实现。

滑动窗口 (Sliding Windows)

与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的, 而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。

既然是向前滑动,那么每一步滑多远,就也是可以控制的。所以定义滑动窗口的参数有两 个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代 表了窗口计算的频率。同样,滑动窗口可以基于时间定义,也可以基于数据个数定义。

在一些场景中,可能需要统计最近一段时间内的指标,而结果的输出频率要求又很高,甚 至要求实时更新,比如股票价格的 24 小时涨跌幅统计,或者基于一段时间内行为检测的异常 报警。这时滑动窗口无疑就是很好的实现方式。

会话窗口(Session Windows)

会话窗口顾名思义,是基于“会话”(session)来对数据进行分组的。这里的会话类似 Web 应用中 session 的概念,不过并不表示两端的通讯过程,而是借用会话超时失效的机制来描述窗口。

与滑动窗口和滚动窗口不同,会话窗口只能基于时间来定义。对于会话窗口而言,最重要的参数就是会话超时时间的长度(size),也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔(Gap)小于指定的大小(size),那说明还在保持会话,它们就属于同一个窗口;如果 gap 大于 size那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。在具体实现上,我们可以设置静态固定的大小(size),也可以通过一个自定义的 提取器(gap extractor)动态提取最小间隔 gap 的值。

在一些类似保持会话的场景下,往往可以使用会话窗口来进行数据的处理统计。

全局窗口(Global Windows)

还有一类比较通用的窗口,就是“全局窗口”。这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认 是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。

image-20231231203922146

Flink 中的计数窗口(Count Window),底层就是用全局窗口实现的。

窗口API

按键分区窗口(Keyed Windows)

经过按键分区 keyBy()操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这 就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时 执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的 处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。

在代码实现上,我们需要先对DataStream调用keyBy()进行按键分区,然后再调用window() 定义窗口。

stream.keyBy(...)
.window(...)

非按键分区(Non-Keyed Windows)

如果没有进行 keyBy(),那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只 能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用 这种方式。

在代码中,直接基于 DataStream 调用 windowAll()定义窗口。

stream.windowAll(...)

这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll 本身就是一个非并行的操作

代码中窗口 API 的调用

窗口 操作主要有两个部分:

  • 窗口分配器(Window Assigners)
  • 窗口函数(Window Functions)
stream.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(<window function>)

其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate() 方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。

窗口分配器有各种形式, 而窗口函数的调用方法也不只.aggregate()一种.

窗口分配器(Window Assigers)

定义窗口分配器Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。而窗口分配数据的规则,其实就对应着不同的窗口类型。所以可 以说,窗口分配器其实就是在指定窗口的类型

窗口分配器最通用的定义方式,就是调用 window()方法。这个方法需要传入一个 WindowAssigner 作为参数,返回 WindowedStream。如果是非按键分区窗口,那么直接调用 windowAll()方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream

//滚动处理时间窗口 :窗口5秒
TumblingProcessingTimeWindows.of(Time.seconds(5));
//滑动处理时间窗口:长度为 10 秒、滑动步长为 5 秒的滑动窗 口
SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))    ;
//处理时间会话窗口 :  静态会话超时时间为 10 秒的会话窗口
ProcessingTimeSessionWindows.withGap(Time.seconds(10))    ;
ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[(String, Long)] {override def extract(element: (String, Long)) { // 提取 session gap 值返回, 单位毫秒element._1.length * 1000}})    ;
//滚动事件时间窗口
TumblingEventTimeWindows.of(Time.seconds(5));
//滑动事件时间窗口
SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5));
//事件时间会话窗口
EventTimeSessionWindows.withGap(Time.seconds(10));//滚动计数窗口
stream.keyBy(...)
.countWindow(10);
//滑动计数窗口
stream.keyBy(...)
.countWindow(103);
//全局窗口
stream.keyBy(...)
.window(GlobalWindows.create())

窗口函数

定义窗 口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。

根据处理的方式分为:增量聚合函数(流)、全窗口函数(批)。

增量聚合函数(incremental aggregation functions)

典型的增量聚合函数有两个:ReduceFunctionAggregateFunction

归约函数(ReduceFunction)

最基本的聚合方式就是归约(reduce)。窗口的归约聚合,就是将窗口中收集到的数据两两进行归约。当我们进行流处 理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了 增量式的聚合。

窗口函数中也提供了 ReduceFunction:只要基于 WindowedStream 调用.reduce()方法,然 后传入 ReduceFunction 作为参数,就可以指定以归约两个元素的方式去对窗口中数据进行聚 合了。这里的 ReduceFunction 其实与简单聚合时用到的 ReduceFunction 是同一个函数类接口, 所以使用方式也是完全一样的。

//      抽取数据中的时间戳.
assignAscendingTimestamps(_.timestamp).map(x => {(x.user, 1)})
//      按照用户名进行分组
.keyBy(_._1)
//      设置5s的滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5))   )
//      聚合 保留一个用户名,值相加
.reduce((l, f) => (l._1, l._2 + f._2)    )stream.print()
env.execute()
聚合函数(AggregateFunction)

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样

为了更加灵活地处理窗口计算,Flink的Window API提供了更加一般化的aggregate()方法。 直接基于 WindowedStream 调用 aggregate()方法,就可以定义更加灵活的窗口聚合操作。这个 方法需要传入一个 AggregateFunction 的实现类作为参数。

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable 
{
ACC createAccumulator();
ACC add(IN value, ACC accumulator);
OUT getResult(ACC accumulator);
ACC merge(ACC a, ACC b);
}

AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型; 累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

AggregateFunction 接口中有四个方法:

  • createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
  • getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。

另外,Flink 也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于WindowedStream 调用。主要包括 sum()/max()/ maxBy() /min()/ minBy(),与 KeyedStream 的简单 聚合非常相似。它们的底层,其实都是通过 AggregateFunction 来实现的。

全窗口函数(Full Window Functions)

窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗 口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

在 Flink 中,全窗口函数也有两种:WindowFunctionProcessWindowFunction

窗口函数(WindowFunction)

WindowFunction 字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。

stream.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction())
处理窗口函数(ProcessWindowFunction)

ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象” (Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富,可以认为是一个增强版的 WindowFunction。

其他API

对于一个窗口算子而言,窗口分配器和窗口函数是必不可少的。除此之外,Flink 还提供了其他一些可选的 API,让我们可以更加灵活地控制窗口行为。

触发器(Trigger)

触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。

基于 WindowedStream 调用 trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。

stream
.keyBy(...)
.window(...)
.trigger(new MyTrigger())

Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间 窗口,默认的触发器都是 EventTimeTrigger;类似还有 ProcessingTimeTriggerCountTrigger。所以一般情况下是不需要自定义触发器的。

rigger 是一个抽象类,自定义时必须实现下面四个抽象方法:

  • onElement():窗口中每到来一个元素,都会调用这个方法。
  • onEventTime():当注册的事件时间定时器触发时,将调用这个方法。
  • onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法。
  • clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。

可以看到,除了 clear()比较像生命周期方法,其他三个方法其实都是对某种事件的响应onElement()是对流中数据元素到来的响应;而另两个则是对时间的响应。这几个方法的参数中都有一个“触发器上下文”(TriggerContext)对象,可以用来注册定时器回调(callback)。这里提到的“定时器”(Timer),其实就是我们设定的一个“闹钟”,代表未来某个时间点会执行的事件;当时间进展到设定的值时,就会执行定义好的操作。很明显,对于时间窗口(TimeWindow)而言,就应该是在窗口的结束时间设定了一个定时器,这样到时间就可以触发 窗口的计算输出了。关于定时器的内容,我们在后面讲解处理函数(process function)时还会提到。

上面的前三个方法可以响应事件,那它们又是怎样跟窗口操作联系起来的呢?这就需要了解一下它们的返回值。这三个方法返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型。

  • CONTINUE(继续):什么都不做
  • FIRE(触发):触发计算,输出结果
  • PURGE(清除):清空窗口中的所有数据,销毁窗口
  • FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口

我们可以看到,Trigger 除了可以控制触发计算,还可以定义窗口什么时候关闭(销毁)。上面的四种类型,其实也就是这两个操作交叉配对产生的结果。一般我们会认为,到了窗口的结束时间,那么就会触发计算输出结果,然后关闭窗口——似乎这两个操作应该是同时发生的;但 TriggerResult 的定义告诉我们,两者可以分开。

移除器(Evictor)

移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器。

stream
.keyBy(...)
.window(...)
.evictor(new MyEvictor())

Evictor 接口定义了两个方法:

  • evictBefore():定义执行窗口函数之前的移除数据操作
  • evictAfter():定义执行窗口函数之后的以处数据操作

默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的。

允许延迟(Allowed Lateness)

在事件时间语义下,窗口中可能会出现数据迟到的情况。迟到数据默认会被直接丢弃,不会进入窗口进行统计计算。这样可能会导致统计结果不准确。

为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口。

基于 WindowedStream 调用 allowedLateness()方法,传入一个 Time 类型的延迟时间,就可以表示允许这段时间内的延迟数据。

stream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.allowedLateness(Time.minutes(1))

从这里我们就可以看到,窗口的触发计算(Fire)和清除(Purge)操作确实可以分开。不过在默认情况下,允许的延迟是 0,这样一旦水位线到达了窗口结束时间就会触发计算并清除窗口,两个操作看起来就是同时发生了。当窗口被清除(关闭)之后,再来的数据就会被丢弃。

将迟到的数据放入侧输出流

Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”, 这个流中单独放置那些本该被丢弃的数据。

基于 WindowedStream 调用 sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同。

val stream = env.addSource(new ClickSource)
val outputTag = new OutputTag[Event]("late")
stream
.keyBy("user")
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)

将迟到数据放入侧输出流之后,还应该可以将它提取出来。基于窗口处理完成之后的DataStream,调用 getSideOutput()方法,传入对应的输出标签,就可以获取到迟到数据所在的流了。

val winAggStream = stream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
.aggregate(new MyAggregateFunction)val lateStream = winAggStream.getSideOutput(outputTag)

getSideOutput()是 DataStream 的方法,获取到的侧输出流数据类型应该和OutputTag 指定的类型一致,与窗口聚合之后流中的数据类型可以不同。

窗口的生命周期

1、 窗口的创建

窗口的类型和基本信息由窗口分配器(window assigners)指定,但窗口不会预先创建好,而是由数据驱动创建。当第一个应该属于这个窗口的数据元素到达时,就会创建对应的窗口。

2、窗口计算的触发

除了窗口分配器,每个窗口还会有自己的窗口函数(window functions)和触发器(trigger)。窗口函数可以分为增量聚合函数和全窗口函数,主要定义了窗口中计算的逻辑;而触发器则是指定调用窗口函数的条件。

对于不同的窗口类型,触发计算的条件也会不同。Flink 预定义的窗口类型都有对应内置的触发器。

3、 窗口的销毁

一般情况下,当时间达到了结束点,就会直接触发计算输出结果、进而清除状态销毁窗口。这时窗口的销毁可以认为和触发计算是同一时刻。这里需要注意,Flink 中只对时间窗口 (TimeWindow)有销毁机制;由于计数窗口(CountWindow)是基于全局窗口(GlobalWindw) 实现的,而全局窗口不会清除状态,所以就不会被销毁。

在特殊的场景下,窗口的销毁和触发计算会有所不同。事件时间语义下,如果设置了允许延迟,那么在水位线到达窗口结束时间时,仍然不会销毁窗口;窗口真正被完全删除的时间点,是窗口的结束时间加上用户指定的允许延迟时间。

迟到数据的处理

设置水位线延迟时间

水位线是事件时间的进展,它是我们整个应用的全局逻辑时钟。水位线生成之后,会随着数据在任务间流动,从而给每个任务指明当前的事件时间。所以从这个意义上讲,水位线是一个覆盖万物的存在,它并不只针对事件时间窗口有效。水位线其实是所有事件时间定时器触发的判断标准。那么水位线的延迟,当然也就是全局时钟的滞后。

既然水位线这么重要,那一般情况就不应该把它的延迟设置得太大,否则流处理的实时性就会大大降低。因为水位线的延迟主要是用来对付分布式网络传输导致的数据乱序,而网络传输的乱序程度一般并不会很大,大多集中在几毫秒至几百毫秒。所以实际应用中,我们往往会给水位线设置一个“能够处理大多数乱序数据的小延迟”,视需求一般设在毫秒~秒级

当我们设置了水位线延迟时间后,所有定时器就都会按照延迟后的水位线来触发。如果一个数据所包含的时间戳,小于当前的水位线,那么它就是所谓的“迟到数据”。

允许窗口处理迟到数据

除设置水位线延迟外,Flink 的窗口也是可以设置延迟时间,允许继续处理迟到数据的。

这种情况下,由于大部分乱序数据已经被水位线的延迟等到了,所以往往迟到的数据不会太多。这样,我们会在水位线到达窗口结束时间时,先快速地输出一个近似正确的计算结果;然后保持窗口继续等到延迟数据,每来一条数据,窗口就会再次计算,并将更新后的结果输出。这样就可以逐步修正计算结果,最终得到准确的统计值了。这其实就是著名的 Lambda 架构。原先需要两套独立的系统来同时保证实时性和结果的最 终正确性,如今 Flink 一套系统就全部搞定了。

将迟到数据发送到窗口侧输出流

还可以用窗口的侧输出流,来收集关窗以后的迟到数据。这种方式是最后“兜底”的方法,只能保证数据不丢失;因为窗口已经真正关闭,所以是无法基于之前窗口的结果直接做更新的。我们只能将之前的窗口计算结果保存下来,然后获取侧输出流中的迟到数据,判断数据所属的窗口,手动对结果进行合并更新。尽管有些烦琐,实时性也不够强,但能够保证最终结果一定是正确的。

所以总结起来,Flink 处理迟到数据,对于结果的正确性有三重保障:水位线的延迟,窗口允许迟到数据,以及将迟到数据放入窗口侧输出流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/589014.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

面试手撕算法高频专题:数组的双指针思想及应用(算法村第三关白银挑战)

所谓的双指针其实就是两个变量&#xff0c;不一定真的是指针。 快慢指针&#xff1a;一起向前走对撞指针、相向指针&#xff1a;从两头向中间走背向指针&#xff1a;从中间向两头走 移除值为val的元素 题目描述 27. 移除元素 - 力扣&#xff08;LeetCode&#xff09; 给你…

12.31_黑马数据结构与算法笔记Java

目录 345 设计跳表 Leetcode 1206 346 设计最小栈 Leetcode 155 347 设计端网址 Leetcode 355 348 设计推特 Leetcode 355 349 股票系列问题 Leetcode 121 350 股票系列问题 Leetcode 122 351 股票系列问题 Leetcode 714 352 股票系列问题 Leetcode 309 353 股票系列问…

【CISSP学习笔记】6. 安全开发

该知识领域涉及如下考点&#xff0c;具体内容分布于如下各个子章节&#xff1a; 理解安全并将其融入软件开发生命周期 (SDLC) 中在软件开发环境中识别和应用安全控制评估软件安全的有效性评估获得软件对安全的影响定义并应用安全编码准则和标准 6.1. 系统开发控制 6.1.1. 软…

以太网转RS485通讯类库封装

最近选用有人科技的以太网转RS485模块做项目&#xff0c;设备真漂亮&#xff0c;国货之光。调通了通讯的代码&#xff0c;发到网上供大家参考&#xff0c;多多交流。 以下分别是配套的头文件与源文件&#xff1a; /*******************************************************…

EOS开发Ubuntu安装EOSIO.CDT(Install the EOSIO.CDT)

EOS开发Ubuntu安装EOSIO.CDT&#xff08;Install the EOSIO.CDT&#xff09; EOSIO.CDT介绍&#xff1a;EOSIO合约开发工具包&#xff0c;简称CDT&#xff0c;是与合约编译相关的工具集合。而且后续教程主要使用 CDT 来编译合约和生成 ABI&#xff0c;不要忽略。 刚才我们安装好…

Octave处理高斯光束

文章目录 读取图像截取感兴趣区域强度图拟合 Octave是一种开源的数值计算软件&#xff0c;主要用于科学计算、数据分析和数值模拟等领域。既提供了一个用户友好的命令行界面&#xff0c;使用户能够通过输入简单的命令来进行各种数学运算和数据操作。也提供了功能完备的GUI窗口&…

关于LayUI表格重载数据问题

目的 搜索框搜索内容重载数据只显示搜索到的结果 遇到的问题 在layui官方文档里介绍的table属性有data项,但使用下列代码 table.reload(test, {data:data //data为json数据}); 时发现&#xff0c;会会重新调用table.render的url拿到原来的数据&#xff0c;并不会显示出来传…

看懂基本的电路原理图(入门)

文章目录 前言一、二极管二、电容三、接地一般符号四、晶体振荡器五、各种符号的含义六、查看原理图的顺序总结 前言 电子入门&#xff0c;怎么看原理图&#xff0c;各个图标都代表什么含义&#xff0c;今天好好来汇总一下。 就比如这个电路原理图来说&#xff0c;各个符号都…

文件监控-IT安全管理软件

文件监控和IT安全管理软件是用于保护企业数据和网络安全的工具。这些工具可以帮助企业监控文件的变化&#xff0c;防止未经授权的访问和修改&#xff0c;并确保数据的安全性和完整性。 一、具有哪些功能 文件监控软件可以实时监控文件系统的活动&#xff0c;包括文件的创建、修…

L1-076:降价提醒机器人

题目描述 小 T 想买一个玩具很久了&#xff0c;但价格有些高&#xff0c;他打算等便宜些再买。但天天盯着购物网站很麻烦&#xff0c;请你帮小 T 写一个降价提醒机器人&#xff0c;当玩具的当前价格比他设定的价格便宜时发出提醒。 输入格式&#xff1a; 输入第一行是两个正整数…

条款13:以对象管理资源

文章目录 没有管理的情况解决办法之unique_ptr智能指针解决办法之shared_ptr智能指针总结 没有管理的情况 资源是指一旦你使用完它&#xff0c;就需要返回系统的东西。 class Investment { ... }; // 投资类型层次结构的基类 Investment* createInvestment(); // 工厂函数&…

2022–2023学年2021级计算机科学与技术专业数据库原理 (A)卷

一、单项选择题&#xff08;每小题1.5分&#xff0c;共30分&#xff09; 1、构成E—R模型的三个基本要素是&#xff08; B &#xff09;。 A&#xff0e;实体、属性值、关系 B&#xff0e;实体、属性、联系 C&#xff0e;实体、实体集、联系 D&#xff0e;实体、实体…

html-css-js移动端导航栏底部固定+i18n国际化全局

需求&#xff1a;要做一个移动端的仿照小程序的导航栏页面操作&#xff0c;但是这边加上了i18n国家化&#xff0c;由于页面切换的时候会导致国际化失效&#xff0c;所以写了这篇文章 1.效果 切换页面的时候中英文也会跟着改变&#xff0c;不会导致切换后回到默认的语言 2.实现…

oracle 9i10g编程艺术-读书笔记1

根据书中提供的下载代码链接地址&#xff0c;从github上找到源代码下载地址。 https://github.com/apress下载好代码后&#xff0c;开始一段新的旅行。 设置 SQL*Plus 的 AUTOTRACE 设置 SQL*Plus 的 AUTOTRACE AUTOTRACE 是 SQL*Plus 中一个工具&#xff0c;可以显示所执行…

分布式数据库事务故障恢复的原理与实践

关系数据库中的事务故障恢复并不是一个新问题&#xff0c;自70年代关系数据库诞生之后就一直伴随着数据库技术的发展&#xff0c;并且在分布式数据库的场景下又遇到了一些新的问题。本文将会就事务故障恢复这个问题&#xff0c;分别讲述单机数据库、分布式数据库中遇到的问题和…

华为商城秒杀时加密验证 device_data 的算法研究

前言 之前华为商城放出 Mate60 手机时, 想给自己和家人抢购一两台&#xff0c;手动刷了好几天无果后&#xff0c;决定尝试编写程序&#xff0c;直接发送 POST 请求来抢。通过抓包和简单重放发送后&#xff0c;始终不成功。仔细研究&#xff0c;发现 Cookie 中有一个名为 devic…

启动gazebo harmonic

ros2 launch ros_gz_sim gz_sim.launch.py gz_version:8 如果不输入gz_version:8,默认就是6&#xff0c;启动的就是默认版本ign版本 左边那个是8&#xff0c;右边那个是6

基于EPICS modbus模块的单通道电压监测项目

先介绍在本项目中使用到的硬件&#xff1a; 1&#xff09;开发板&#xff1a;为香橙派Zero2&#xff0c;安装系统如下&#xff1a; Distributor ID: Ubuntu Description: Ubuntu 22.04.2 LTS Release: 22.04 Codename: jammy 2&#xff09; USB转485模块&…

深入探索MongoDB集群模式:从高可用复制集

MongoDB复制集概述 MongoDB复制集主要用于实现服务的高可用性&#xff0c;与Redis中的哨兵模式相似。它的核心作用是数据的备份和故障转移。 复制集的主要功能 数据复制&#xff1a;数据写入主节点&#xff08;Primary&#xff09;时&#xff0c;自动复制到一个或多个副本节…

【Java 进阶篇】Linux 常用命令使用详解:玩转命令行的魔法世界

在计算机的世界里&#xff0c;Linux是一个强大而富有魅力的操作系统。对于很多小白用户来说&#xff0c;刚接触Linux时可能感觉有些陌生&#xff0c;尤其是在命令行界面下。然而&#xff0c;正是这个看似晦涩的命令行&#xff0c;才是Linux系统最为强大和灵活的地方。本文将围绕…