Flink之Watermark水印、水位线

Watermark水印、水位线

  • 水位线
    • 概述
    • 水印本质
    • 生成Watermark
    • Watermark策略
    • WatermarkStrategy工具类
    • 使用Watermark策略
  • 内置Watermark生成器
    • 单调递增时间戳分配器
    • 固定延迟的时间戳分配器
  • 自定义WatermarkGenerator
    • 周期性Watermark生成器
    • 标记Watermark生成器
    • Watermark策略与Kafka连接器
  • 其他
    • 处理空闲数据源
    • 并行度下的水位线传递
    • 迟到数据的处理

水位线

概述

在Apache Flink中,Watermark(水印)是一种用于处理事件时间(eventtime)的时间指示器。它模拟了事件流中事件时间进展的概念。

事件时间是指事件实际发生的时间,在分布式流处理中经常用于处理无序事件流。然而,由于网络延迟、乱序事件的到达以及分布式处理的特点,事件时间可能不按顺序到达处理器。在这种情况下,处理程序需要一种机制来标识它们已经处理过的事件时间,并据此生成或更新水印。

水印是一个特殊的事件,包含了一个时间戳。它表示截至到该时间戳的事件已经全部到达(或预期已到达),并且可以被认为是完整的。水印告知系统在事件时间维度上处理事件的进展情况,并在触发窗口计算、事件乱序处理等方面提供辅助。

水印的生成通常基于事件数据中的时间戳,通过一些策略来推断出未到达的事件的时间戳。简单的策略可以是事件时间减去一个固定的延迟值,例如,如果我们有一个事件的时间戳,我们可以生成一个比该事件时间戳小一定固定时间的水印。

Flink通过处理数据流中的时间戳和水印来衡量事件时间进展,并通过水印来驱动事件时间的处理。可以根据应用程序的需要自定义水印生成的策略。

水印本质

Watermark是水印、水位线的意思,水印的出现是为了解决实时计算中的数据乱序问题,它的本质是DataStream中一个带有时间戳的元素。

水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。通过使用水位线机制,Flink能够动态地处理乱序事件,并在保证准确性的同时提供低延迟的数据处理。

如果Flink系统中出现了一个WaterMarkT,那么就意味着EventTime<T的数据都已经到达,窗口的结束时间和T相同的那个窗口被触发进行计算了。因此,水印是Flink判断迟到数据的标准,同时也是窗口触发的标记。

在程序并行度大于1的情况下,会有多个流产生水印和窗口,这时候Flink会选取时间戳最小的水印。

生成Watermark

生成水位线使用assignTimestampsAndWatermarks()方法,它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。

dataStream.assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy);

需要传入一个WatermarkStrategy作为参数,也就是所谓的水位线生成策略

Watermark策略

Flink程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳。通过使用TimestampAssigner API从元素中的某个字段去访问/提取时间戳。

时间戳的分配与 watermark 的生成是齐头并进的,其可以告诉Flink应用程序事件时间的进度。其可以通过指定WatermarkGenerator 来配置watermark的生成方式。

需要设置一个同时包含TimestampAssigner 和WatermarkGenerator的WatermarkStrateg

WatermarkStrategy是一个接口,该接口中包含了一个时间戳分配器TimestampAssigner和一个水位线生成器WatermarkGenerator。

WatermarkStrategy接口如下:

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {/*** 根据策略实例化一个 watermark 生成器* 主要负责按照既定的方式,基于时间戳生成水位线*/WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context var1);/*** 负责从流中数据元素的某个字段中提取时间戳,并分配给元素* 时间戳的分配是生成水位线的基础*/   default TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new RecordTimestampAssigner();}
}

WatermarkStrategy工具类

工具类WatermarkStrategy中也提供了几个常用的watermark策略,并且可以在某些必要场景下构建自己的 watermark策略。

        /*** 为时间戳单调递增的情况创建水印策略,适用于有序流*/static <T > WatermarkStrategy < T > forMonotonousTimestamps() {return (ctx) -> new AscendingTimestampsWatermarks<>();}/*** 为记录无序流的情况创建水印策略,但可以设置事件无序程度的上限。*/static <T > WatermarkStrategy < T > forBoundedOutOfOrderness(Duration maxOutOfOrderness) {return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);}/*** 基于watermarkgeneratorsupper自定义创建水印策略 */static <T > WatermarkStrategy < T > forGenerator(WatermarkGeneratorSupplier < T > generatorSupplier) {return generatorSupplier::createWatermarkGenerator;}/*** 创建完全不生成水印的水印策略。这在进行纯基于处理时间的流处理的场景中可能是有用*/static <T > WatermarkStrategy < T > noWatermarks() {return (ctx) -> new NoWatermarksGenerator<>();}

使用forBoundedOutOfOrderness watermark生成器和一个lambda表达式作为时间戳分配器

        DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 2),Tuple2.of("b", 3),Tuple2.of("c", 4));SingleOutputStreamOperator<Tuple2<String, Integer>> assignTimestampsAndWatermarks = dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));

注意:

时间戳和 watermark 都是从 1970-01-01T00:00:00Z 起的 Java 纪元开始,并以毫秒为单位。

使用Watermark策略

WatermarkStrategy在哪里使用?

1.直接在数据源上使用2.直接在非数据源的操作之后使用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<MyEvent> stream = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter(), typeInfo);DataStream<MyEvent> withTimestampsAndWatermarks = stream.filter( event -> event.severity() == WARNING ).assignTimestampsAndWatermarks(<watermark strategy>);withTimestampsAndWatermarks.keyBy( (event) -> event.getGroup() ).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce( (a, b) -> a.add(b) ).addSink(...)

注意:

使用 WatermarkStrategy 去获取流并生成带有时间戳的元素和 watermark 的新流时,如果原始流已经具有时间戳或 watermark,则新指定的时间戳分配器将覆盖原有的时间戳和 watermark。

内置Watermark生成器

Flink内置了两个WaterMark生成器

1.forMonotonousTimestamps: 时间戳单调增长:其实就是允许的延迟为0

WatermarkStrategy.forMonotonousTimestamps();

2.forBoundedOutOfOrderness: 允许固定时间的延迟

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

单调递增时间戳分配器

对于有序流,主要特点就是时间戳单调增长,永远不会出现迟到数据的问题。因此当前时间戳就可以充当 watermark,因为后续到达数据的时间戳不会比当前的小。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8888);// 将输入数据转换为IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 定义Watermark策略WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy// 升序的watermark,没有等待时间,即当 数字转时间戳 达到 滚动处理时间窗口10s 就触发窗口执行.<Integer>forMonotonousTimestamps()// TimestampAssigner是一个可以从事件数据中提取时间戳字段的简单函数// 指定时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<Integer>() {@Overridepublic long extractTimestamp(Integer element, long recordTimestamp) {// 将输入数字转时间戳,单位毫秒,当作数据的时间戳System.out.println("数据 " + element);return element * 1000L;}});//  指定watermark策略SingleOutputStreamOperator<Integer> singleOutputStreamOperator = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);singleOutputStreamOperator// 事件时间语义窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessAllWindowFunction<Integer, String, TimeWindow>() {@Overridepublic void process(Context context, Iterable<Integer> input, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss");long count = input.spliterator().estimateSize();out.collect("窗口在时间区间: " + windowStart + "-" + windowEnd + " 产生" + count + "条数据,具体数据:" + input.toString());}}).print();env.execute();}
> nc -lk 8888
1
2
5
8
9
10
15
18
20
21
数据 1
数据 2 
数据 5 
数据 8 
数据 9 
数据 10 
窗口在时间区间: 1970-01-01 08:00:00-1970-01-01 08:00:10 产生5条数据,具体数据:[1, 2, 5, 8, 9]
数据 15 
数据 18 
数据 20 
窗口在时间区间: 1970-01-01 08:00:10-1970-01-01 08:00:20 产生3条数据,具体数据:[10, 15, 18]
数据 21 

固定延迟的时间戳分配器

乱序流中需要等待迟到数据到齐,必须设置一个固定量的延迟时间(数据流中的数据可能遇到的最大延迟)。此时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟时间的结果。

调用WatermarkStrategy.forBoundedOutOfOrderness()方法可以实现,方法传入一个maxOutOfOrderness参数,表示最大乱序程度,它表示数据流中乱序数据时间戳的最大差值,如果能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8888);// 将输入数据转换为IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 定义Watermark策略WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy// 最大容忍的延迟时间: 定watermark生成 乱序 等待3s 即当输入 (数字转时间戳 - 3) 达到 滚动处理时间窗口10s 就触发窗口执行.<Integer>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定时间戳分配器 从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 将输入数字转时间戳,单位毫秒,当作数据的时间戳System.out.println("数据 " + element);return element * 1000L;});//  指定 watermark策略SingleOutputStreamOperator<Integer> singleOutputStreamOperator = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);singleOutputStreamOperator// 使用事件时间语义窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessAllWindowFunction<Integer, String, TimeWindow>() {@Overridepublic void process(Context context, Iterable<Integer> input, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = input.spliterator().estimateSize();out.collect("窗口在时间区间: " + windowStart + "-" + windowEnd + " 产生" + count + "条数据,具体数据:" + input.toString());}}).print();env.execute();}
 nc -lk 8888
1
5
8
6
7
11
4  
13
15
20
19
23
26
数据 1
数据 5
数据 8
数据 6
数据 7
数据 11
数据 4
数据 13
窗口在时间区间: 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生6条数据,具体数据:[1, 5, 8, 6, 7, 4]
数据 15
数据 20
数据 19
数据 23
窗口在时间区间: 1970-01-01 08:00:10.000-1970-01-01 08:00:20.000 产生4条数据,具体数据:[11, 13, 15, 19]
数据 26

自定义WatermarkGenerator

TimestampAssigner是一个可以从事件数据中提取时间戳字段的简单函数

watermark 的生成方式本质上是有两种:

1.周期性生成

周期性生成器通常通过 onEvent() 观察传入的事件数据,然后在框架调用 onPeriodicEmit() 时发出 watermark。

2.标记生成

标记生成器将查看 onEvent() 中的事件数据,并等待检查在流中携带 watermark 的特殊标记事件或打点数据。当获取到这些事件数据时,它将立即发出 watermark。通常情况下,标记生成器不会通过 onPeriodicEmit() 发出 watermark。

都需要继承接口WatermarkGenerator,接口如下:

/*** {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。** <p><b>注意:</b>  WatermarkGenerator 将以前互相独立的 {@code AssignerWithPunctuatedWatermarks} * 和 {@code AssignerWithPeriodicWatermarks} 一同包含了进来。*/
@Public
public interface WatermarkGenerator<T> {/*** 每来一条事件数据调用一次,可以检查或者记录事件的时间戳,或者也可以基于事件数据本身去生成 watermark*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** 周期性的调用,也许会生成新的 watermark,也许不会** <p>调用此方法生成 watermark 的间隔时间由 {@link ExecutionConfig#getAutoWatermarkInterval()} 决定*/void onPeriodicEmit(WatermarkOutput output);
}

周期性Watermark生成器

周期性生成器通常通过 onEvent() 观察传入的事件数据,然后在框架调用onPeriodicEmit()时发出watermark

生成watermark的时间间隔(每 n 毫秒)可以通过ExecutionConfig.setAutoWatermarkInterval(…) 指定。每次都会调用生成器的onPeriodicEmit()方法,如果返回的watermark非空且值大于前一个watermark,则将发出新的watermark

示例1:

 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8888);// 将输入数据转换为IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 默认周期 200ms 修改默认周期时间为1000msenv.getConfig().setAutoWatermarkInterval(1000);WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy// 自定义 周期性生成器 	3000L:延迟时间.<Integer>forGenerator(ctx -> new MyWatermarkGenerator<>(3000L)).withTimestampAssigner((element, recordTimestamp) -> {// 将输入数字转时间戳,单位毫秒,当作数据的时间戳System.out.println("数据 " + element);return element * 1000L;});SingleOutputStreamOperator<Integer> singleOutputStreamOperator = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);singleOutputStreamOperator// 使用事件时间语义窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessAllWindowFunction<Integer, String, TimeWindow>() {@Overridepublic void process(Context context, Iterable<Integer> input, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = input.spliterator().estimateSize();out.collect("窗口在时间区间: " + windowStart + "-" + windowEnd + " 产生" + count + "条数据,具体数据:" + input.toString());}}).print();env.execute();}/*** 该 watermark 生成器可以覆盖的场景是:数据源在一定程度上乱序。* 即某个最新到达的时间戳为 t 的元素将在最早到达的时间戳为 t 的元素之后最多 n 毫秒到达。*/public static class MyWatermarkGenerator<T> implements WatermarkGenerator<T> {/*** 乱序等待时间* 允许的最大延迟时间 ms*/private long maxOutOfOrderness;/*** 保存 当前为止 最大的事件时间*/private long currentMaxTimestamp;public MyWatermarkGenerator(long maxOutOfOrderness) {this.maxOutOfOrderness = maxOutOfOrderness;}/*** 每条数据来,都会调用一次: 用来生产WaterMark中的时间戳* 为每个事件调用,允许水印生成器检查和记住事件时间戳,或根据事件本身发出水印。** @param event* @param eventTimestamp 提取到数据的事件时间* @param output*/@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);System.out.println("调用onEvent 目前为止最大时间戳 " + currentMaxTimestamp);}/*** 周期性调用: 发送watermark 默认200ms调用一次* <p>* 调用此方法和生成水印的时间间隔取决于ExecutionConfig.getAutoWatermarkInterval()** @param output*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发出的 watermark = 当前最大时间戳 - 最大乱序时间output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));System.out.println("调用onPeriodicEmit 生成watermark " + (currentMaxTimestamp - maxOutOfOrderness - 1));}}}
调用onPeriodicEmit 生成watermark -3001
调用onPeriodicEmit 生成watermark -3001
数据 5
调用onEvent 目前为止最大时间戳 5000
调用onPeriodicEmit 生成watermark 1999
数据 6
调用onEvent 目前为止最大时间戳 6000
调用onPeriodicEmit 生成watermark 2999
调用onPeriodicEmit 生成watermark 2999
数据 3
调用onEvent 目前为止最大时间戳 6000
调用onPeriodicEmit 生成watermark 2999
调用onPeriodicEmit 生成watermark 2999
数据 13
调用onEvent 目前为止最大时间戳 13000
调用onPeriodicEmit 生成watermark 9999
窗口在时间区间: 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生3条数据,具体数据:[5, 6, 3]
调用onPeriodicEmit 生成watermark 9999
调用onPeriodicEmit 生成watermark 9999
调用onPeriodicEmit 生成watermark 9999
数据 10
调用onEvent 目前为止最大时间戳 13000
调用onPeriodicEmit 生成watermark 9999
调用onPeriodicEmit 生成watermark 9999

示例2:

/*** 该生成器生成的 watermark 滞后于处理时间固定量。它假定元素会在有限延迟后到达 Flink。*/
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {private final long maxTimeLag = 5000; // 5 秒@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {// 处理时间场景下不需要实现}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));}
}

标记Watermark生成器

标记watermark生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。把发送水位线的逻辑写在onEvent方法当中即可

标记生成器将查看onEvent()中的事件数据,并等待检查在流中携带watermark的特殊标记事件或打点数据。当获取到这些事件数据时,它将立即发出watermark。通常情况下,标记生成器不会通过onPeriodicEmit()发出 watermark。

        WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy// 自定义间歇性生成器.<Integer>forGenerator(ctx -> new MyWatermarkGenerator<>(3000L)).withTimestampAssigner((element, recordTimestamp) -> {// 将输入数字转时间戳,单位毫秒,当作数据的时间戳System.out.println("数据 " + element);return element * 1000L;});
    public static class MyWatermarkGenerator<T> implements WatermarkGenerator<T> {/*** 乱序等待时间* 允许的最大延迟时间 ms*/private long maxOutOfOrderness;/*** 保存 当前为止 最大的事件时间*/private long currentMaxTimestamp;public MyWatermarkGenerator(long maxOutOfOrderness) {this.maxOutOfOrderness = maxOutOfOrderness;}/*** 每条数据来,都会调用一次: 用来提取最大的事件时间,保存下来,并发送watermark** @param event* @param eventTimestamp 提取到的数据的 事件时间* @param output*/@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));System.out.println("调用onEvent  目前为止最大时间戳 " + currentMaxTimestamp + " 生成watermark " + (currentMaxTimestamp - maxOutOfOrderness - 1));}/*** 周期性调用: 不需要** @param output*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {}}
数据 5
调用onEvent  目前为止最大时间戳 5000 生成watermark 1999
数据 6
调用onEvent  目前为止最大时间戳 6000 生成watermark 2999
数据 3
调用onEvent  目前为止最大时间戳 6000 生成watermark 2999
数据 13
调用onEvent  目前为止最大时间戳 13000 生成watermark 9999
窗口在时间区间: 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生3条数据,具体数据:[5, 6, 3]

Watermark策略与Kafka连接器

使用 Apache Kafka 连接器作为数据源时,每个Kafka分区可能有一个简单的事件时间模式(递增的时间戳或有界无序)当使用Kafka数据源时,多个分区常常并行使用,因此交错来自各个分区的事件数据就会破坏每个分区的事件时间模式在这种情况下,可以使用Flink中可识别Kafka分区的watermark生成机制。使用此特性,将在Kafka消费端内部针对每个Kafka分区生成watermark,并且不同分区watermark的合并方式与在数据流shuffle时的合并方式相同。

注意:

在自定义数据源中发送水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线。

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);KafkaSource<String> kafkaSource = KafkaSource.<String>builder()// 指定kafka节点的地址和端口.setBootstrapServers("node01:9092,node02:9092,node03:9092")// 指定消费者组的id.setGroupId("flink_group")// 指定消费的 Topic.setTopics("flink_topic")// 指定反序列化器,反序列化value.setValueOnlyDeserializer(new SimpleStringSchema())// flink消费kafka的策略.setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");DataStreamSink<String> kafka_source = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafka_source").print("Kafka");stream.print("Kafka");env.execute();}

其他

处理空闲数据源

如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着WatermarkGenerator也不会获得任何新数据去生成watermark。我们称这类数据源为空闲输入或空闲源。

在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。由于下游算子watermark的计算方式是取所有不同的上游并行数据源watermark的最小值,则其watermark将不会发生变化。

为了解决这个问题,可以使用WatermarkStrategy来检测空闲输入并将其标记为空闲状态。WatermarkStrategy为此提供了一个工具接口:

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));

并行度下的水位线传递

在多并行度下,当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 从socket接收数据流SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8888);// 将输入数据转换为IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 将数据合理地分发到不同的分区中DataStream<Integer> partitionCustom = dataStream.partitionCustom(new MyPartitioner(), value -> value);// 定义Watermark策略WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy// 时间序列递增,没有等待时间,即当输入 数字转时间戳 达到 滚动处理时间窗口10s 就触发窗口执行.<Integer>forMonotonousTimestamps()// 将输入数字转时间戳,单位毫秒,当作数据的时间戳.withTimestampAssigner((r, ts) -> r * 1000L);//  指定 watermark策略SingleOutputStreamOperator<Integer> singleOutputStreamOperator = partitionCustom.assignTimestampsAndWatermarks(watermarkStrategy);// 分2组窗口 数据%分区数,分成两组: 奇数一组,偶数一组SingleOutputStreamOperator<String> process = singleOutputStreamOperator.keyBy(a -> a % 2)// 使用事件时间语义窗.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {@Overridepublic void process(Integer key, Context context, Iterable<Integer> input, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = input.spliterator().estimateSize();out.collect("分组 " + key + " 的窗口在时间区间: " + windowStart + "-" + windowEnd + " 产生" + count + "条数据,具体数据:" + input.toString());}});process.print();env.execute();}public static class MyPartitioner implements Partitioner<Integer> {@Overridepublic int partition(Integer key, int numPartitions) {if (key % 2 == 0) {// 将偶数分配到第一个分区return 0;} else {// 将奇数分配到第二个分区return 1;}}}

发送测试数据

> nc -lk 8888
1
3
5
7
9
11
13
15
17

此时,控制台不会有任何输出,原因如下:

偶数窗口中,没有任何数据,由于当前Task是以最小的那个作为当前任务的事件时钟,就会导致当前Task的水位线无法推进,就导致窗口无法触发。

因此,这里可以使用上面提到的处理空闲数据源,设置空闲等待即可解决

        // 定义Watermark策略WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy// 升序的watermark,没有等待时间,即当输入 数字 达到 滚动处理时间窗口10s 就触发窗口执行.<Integer>forMonotonousTimestamps()// 指定时间戳分配器 从数据中提取.withTimestampAssigner((r, ts) -> r * 1000L)//空闲等待5s.withIdleness(Duration.ofSeconds(5));
2> 分组 1 的窗口在时间区间: 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生5条数据,具体数据:[1, 3, 5, 7, 9]

迟到数据的处理

设置窗口推迟关窗时间,在关窗之前,迟到数据来了,还能被窗口计算,来一条迟到数据触发一次计算。关窗后,迟到数据不会被计算,放入侧输出流

在设置一定的窗口允许迟到时间时,只考虑大部分的迟到数据,忽略不考虑极端小部分迟到很久的数据

极端小部分迟到的数据, 放到侧输出流。 获取到之后可以做各种处理

1.推迟水印推进

在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

2.设置窗口延迟关闭

Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。

.window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(3))

3.使用侧流接收迟到的数据

.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(3)).sideOutputLateData(lateWS)

实现示例

接收窗口关闭之后的迟到数据

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8888);// 将输入数据转换为IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 定义Watermark策略WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy.<Integer>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) -> element * 1000L);//  指定 watermark策略SingleOutputStreamOperator<Integer> sensorDSwithWatermark = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);OutputTag<Integer> lateTag = new OutputTag<>("late-data", Types.POJO(Integer.class));SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(sensor -> sensor % 2).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 推迟2s关窗.sideOutputLateData(lateTag) // 关窗后的迟到数据,放入侧输出流.process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {@Overridepublic void process(Integer key, Context context, Iterable<Integer> input, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = input.spliterator().estimateSize();out.collect("分组 " + key + " 的窗口在时间区间: " + windowStart + "-" + windowEnd + " 产生" + count + "条数据,具体数据:" + input.toString());}});process.print();// 从主流获取侧输出流,打印process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");env.execute();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/110788.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

新版pycharm(2023.2.2)修改字体大小

下载了2023新版pycharm&#xff0c;想修改字体&#xff0c;发现找不到之前的setting入口&#xff0c;网上搜索也都是file-setting-editor这些&#xff0c;自己找了找&#xff0c;记录下 2023版pycharm的修改字体大小在file-Manage IDE Settings-Setting Sync… 里面&#xff0…

一篇文章带你弄懂编译和链接

一篇文章带你弄懂编译和链接 文章目录 一篇文章带你弄懂编译和链接一、环境二、翻译环境1.编译①预处理②编译③汇编 2.链接 三、运行环境 一、环境 翻译环境和运行环境 翻译环境&#xff1a;源代码被转换成可执行的机器指令。 运行环境&#xff1a;用于实际执行代码。 二、…

设计模式之抽象工厂模式

前言 工厂模式一般指的是简单工厂模式、工厂方法模式、抽象工厂模式&#xff0c;这是三种工厂模式的最后一篇&#xff0c;其他两种的文章链接如下&#xff1a; 设计模式之简单工厂模式-CSDN博客 设计模式之工厂方法模式-CSDN博客 建议三种模式放在一起对比学习&#xff0c;…

亚马逊测评关于IP和DNS的问题

最近不少人询问了关于IP和DNS的问题&#xff0c;在此进行一些科普。 当客户端试图访问一个网站时&#xff0c;首先会向其所在的ISP的DNS服务器进行查询。如果ISP的DNS服务器没有相关缓存&#xff0c;则会向上级DNS服务器进行查询。 一些诸如CDN之类的服务&#xff0c;可能会为…

云安全—责任共担

0x00 前言 云安全的职责范围实际上一直遵循的是&#xff0c;谁提供谁负责&#xff0c;如果交付给云消费者的时候&#xff0c;交付者使用过程中就要自行负责&#xff0c;也就是我们经常遇到的配置不当等问题&#xff0c;在三层服务模式中&#xff0c;责任互相嵌套&#xff0c;最…

IT运维管理系统在国有大型企业网络中的应用和可以解决的问题

随着国有大型企业业务的快速发展&#xff0c;网络运维管理面临着诸多挑战。本文将从问题概述、解决方案、监控易优势、实际案例和总结等方面阐述IT运维管理系统在国有大型企业网络中的应用和可以解决的问题。​IT运维管理系统&#xff1a;国有大型企业网络的变革者与解决之道 一…

MMKV源码解读与理解

概述 通过 mmap 技术实现的高性能通用 key-value 组件。同时选用 protobuf 协议&#xff0c;进一步压缩数据存储。 标准 protobuf 不提供增量更新的能力&#xff0c;每次写入都必须全量写入。考虑到主要使用场景是频繁地进行写入更新&#xff0c;我们需要有增量更新的能力&am…

批量修改视频尺寸:简单易用的视频剪辑软件教程

如果你需要批量修改视频尺寸&#xff0c;同时保持高质量的画质&#xff0c;那么“固乔剪辑助手”这款软件是你的不二之选。下面就是如何使用这款软件进行批量修改视频尺寸的详细步骤。 1. 首先&#xff0c;你需要在浏览器中进入“固乔科技”的官网&#xff0c;然后下载并安装“…

大数据 DataX 数据同步数据分析入门

目录 一、DataX 概览 1.1 DataX 是什么 1.2 DataX 3.0 概览 设计理念 当前使用现状 二、DataX 详解 2.1 DataX 3.0 框架设计 2.2 DataX 3.0 插件体系 2.3 DataX 3.0 核心架构 2.3.1 核心模块介绍 2.3.2 DataX 调度流程 2.4 DataX 3.0 的六大核心优势 2.4.1 可靠的…

Linux考试复习整理

文章目录 Linux考试整理一.选择题1.用户的密码现象放置在哪个文件夹&#xff1f;2.删除文件或目录的命令是&#xff1f;3.显示一个文件最后几行的命令是&#xff1f;4.删除一个用户并同时删除用户的主目录5.Linux配置文件一般放在什么目录&#xff1f;6.某文件的组外成员的权限…

MongoDB常用脚本汇总

概述 本文汇总记录日常工作中常用的MongoDB查询脚本。 实战 新增 新增集合&#xff1a; db.getSiblingDB("corpus").createCollection(message);删除 删除一条数据&#xff1a; db.getSiblingDB("cx_user").userAccount.deleteOne({_id: ObjectId(6…

科技与时尚共进化,优衣库以硬实力创造品牌长期价值

时尚总是轮回&#xff0c;服装产品如何保持长青&#xff1f;对优衣库来说&#xff0c;产品力不褪色的密码之一&#xff0c;就是始终坚持推动服装科技与时尚融合&#xff0c;赋予生活潮流更多内涵&#xff0c;和更高品质的穿搭体验。 这一点&#xff0c;往往在每年换季新品上市…

2023年【氧化工艺】考试报名及氧化工艺考试总结

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 氧化工艺考试报名是安全生产模拟考试一点通总题库中生成的一套氧化工艺考试总结&#xff0c;安全生产模拟考试一点通上氧化工艺作业手机同步练习。2023年【氧化工艺】考试报名及氧化工艺考试总结 1、【单选题】 由和O…

多维时序 | MATLAB实现SSA-CNN-BiGRU-Attention多变量时间序列预测(SE注意力机制)

多维时序 | MATLAB实现SSA-CNN-BiGRU-Attention多变量时间序列预测&#xff08;SE注意力机制&#xff09; 目录 多维时序 | MATLAB实现SSA-CNN-BiGRU-Attention多变量时间序列预测&#xff08;SE注意力机制&#xff09;预测效果基本描述模型描述程序设计参考资料 预测效果 基本…

企业微信设置可信域名

可信域名的验证文件注意一定放在域名所在的根目录下。 以cloud studio为例&#xff0c;工作区新建终端的路径就是域名在的根目录&#xff0c;而不是服务器的根目录

VA01/VA02/VA03 销售订单根据定价和步骤校验权限隐藏价格

1、业务需求 针对用户使用销售订单时&#xff0c;根据定价和步骤顺序&#xff0c;判断是否有权限&#xff0c;没有权限时隐藏销售订单抬头和行项目的部分价格数据 要限制的定价和步骤在spro中的位置 限制的步骤 2、增强实现 2.1权限对象 创建带有定价和步骤的权限对象 分配…

【华为OD机试python】返回矩阵中非1的元素个数【2023 B卷|200分】

【华为OD机试】-真题 !!点这里!! 【华为OD机试】真题考点分类 !!点这里 !! 题目描述 存在一个m*n的二维数组,其成员取值范围为0,1,2。 其中值为1的元素具备同化特性,每经过1S,将上下左右值为0的元素同化为1。 而值为2的元素,免疫同化。 将数组所有成员随机初始化为0或…

告别黑窗口——C++应用程序界面美化指南

摆脱黑窗口是许多C开发者在设计应用程序时面临的重要问题之一。黑窗口给用户带来了不友好的使用体验&#xff0c;因此改善应用程序界面成为提升用户满意度和使用效果的关键。在本篇博文中&#xff0c;我们将探讨一些方法&#xff0c;帮助你使用C语言实现应用程序界面的美化&…

C++项目开发指导(新员工培训材料)

&#xff08;注&#xff1a;这是一份给新员工的培训材料&#xff0c;集合了实际工作的经验和教训&#xff0c;不一定具有普适性。这份东西大概写于2014-1016年&#xff0c;不涉及之后的社会新气象。特别需要强调的是&#xff0c;这是面向新员工的培训&#xff0c;重点在于破除学…

Jenkins+vue发布项目

在Jenkins 中先创建一个任务名称 然后进行下一步&#xff0c;放一个项目 填写一些参数 参数1&#xff1a; 参数2&#xff1a; 参数3&#xff1a;参数4&#xff1a; 点击保存就行了 配置脚本 // git def git_url http://gitlab.xxxx.git def git_auth_id GITEE_RIVER…