FlinkAPI开发之水位线(Watermark)

案例用到的测试数据请参考文章:
Flink自定义Source模拟数据流
原文链接:https://blog.csdn.net/m0_52606060/article/details/135436048

Flink中的时间语义

在这里插入图片描述

哪种时间语义更重要

从《星球大战》说起

在这里插入图片描述

数据处理系统中的时间语义

在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。
在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。

水位线(Watermark)

事件时间和窗口

在这里插入图片描述

什么是水位线

在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

水位线和窗口的工作原理

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
注意:Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开,这部分内容我们会在后面详述。

生成水位线

生成水位线的总体原则

完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。
如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

水位线生成策略

在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。具体使用如下:

DataStream<Event> stream = env.addSource(new ClickSource());
DataStream<Event> withTimestampsAndWatermarks = 
stream.assignTimestampsAndWatermarks(<watermark strategy>);

说明:WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”

WatermarkGenerator。
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{// 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);// 主要负责按照既定的方式,基于时间戳生成水位线@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

Flink内置水位线

有序流中内置水位线设置
对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class WatermarkStrategyDemo {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);// TODO: 2024/1/7 定义时间语义environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/11 定义Watermark策略WatermarkStrategy<Orders> ordersWatermarkStrategy = WatermarkStrategy.<Orders>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Orders>() {@Overridepublic long extractTimestamp(Orders orders, long l) {System.out.println("返回时间戳"+orders.getOrder_date()+"毫秒13位的");return orders.getOrder_date();}});// TODO: 2024/1/11  指定 watermark策略SingleOutputStreamOperator<Orders> watermarks = ordersDataStreamSource.assignTimestampsAndWatermarks(ordersWatermarkStrategy);// TODO: 2024/1/11 进行聚合运算SingleOutputStreamOperator<Orders> reduce = watermarks.keyBy(orders -> orders.getProduct_id())// TODO: 2024/1/11 定义时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<Orders>() {@Overridepublic Orders reduce(Orders orders, Orders t1) throws Exception {Orders orders1 = new Orders(t1.getOrder_id(), t1.getUser_id(), t1.getOrder_date(), t1.getOrder_amount() + orders.getOrder_amount(), t1.getProduct_id(), t1.getOrder_num());return orders1;}});ordersDataStreamSource.print("聚合前数据");reduce.print("聚合数据");environment.execute();}
}

在这里插入图片描述

乱序流中内置水位线设置

由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import javafx.scene.input.DataFormat;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import scala.Tuple2;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;public class WatermarkOutOfOrdernessDemo {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);// TODO: 2024/1/7 定义时间语义environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/11 定义Watermark策略WatermarkStrategy<Orders> ordersWatermarkStrategy = WatermarkStrategy.<Orders>forBoundedOutOfOrderness(Duration.ofSeconds(3))// TODO: 2024/1/11 指定水位线时间戳也可以用 Lambda 表达式.withTimestampAssigner((orders, l) -> orders.getOrder_date());// TODO: 2024/1/11 指定水位线SingleOutputStreamOperator<Orders> operator = ordersDataStreamSource.assignTimestampsAndWatermarks(ordersWatermarkStrategy);// TODO: 2024/1/11 分组聚合SingleOutputStreamOperator<Object> reduce = operator.keyBy(orders -> orders.getProduct_id()).window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunction<Orders, Object, Integer, TimeWindow>() {// TODO: 2024/1/9 参数说明:分组key值,窗口计时器,按照key分组后的数据,收集器@Overridepublic void process(Integer integer, ProcessWindowFunction<Orders, Object, Integer, TimeWindow>.Context context, Iterable<Orders> elements, Collector<Object> collector) throws Exception {// TODO: 2024/1/9 窗口内同一个key包含的数据条数long count = elements.spliterator().estimateSize();// TODO: 2024/1/9 窗口的开始时间long windowStartTs = context.window().getStart();// TODO: 2024/1/9 窗口的结束时间long windowEndTs = context.window().getEnd();String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");// TODO: 2024/1/9 输出收集器collector.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});ordersDataStreamSource.map(new MapFunction<Orders, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(Orders orders) throws Exception {//时间格式,HH是24小时制,hh是AM PM12小时制SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//比如timestamp=1449210225945;String date_string = sdf.format(new Date(orders.getOrder_date()));return new Tuple2<>(date_string,orders.getOrder_amount());}}).print();reduce.print("聚合数据");environment.execute();}
}

在这里插入图片描述

自定义水位线生成器

周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
下面是一段自定义周期性生成水位线的代码:

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import scala.Tuple2;import java.text.SimpleDateFormat;
import java.util.Date;public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);// TODO: 2024/1/7 定义时间语义environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/12 设置水位线 SingleOutputStreamOperator<Orders> operator = ordersDataStreamSource.assignTimestampsAndWatermarks(new CustomWatermarkStrategy());// TODO: 2024/1/12 打印数据ordersDataStreamSource.map(new MapFunction<Orders, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(Orders orders) throws Exception {//时间格式,HH是24小时制,hh是AM PM12小时制SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//比如timestamp=1449210225945;String date_string = sdf.format(new Date(orders.getOrder_date()));return new Tuple2<>(date_string,orders.getOrder_amount());}}).print();// TODO: 2024/1/11 分组聚合SingleOutputStreamOperator<Object> reduce = operator.keyBy(orders -> orders.getProduct_id()).window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunction<Orders, Object, Integer, TimeWindow>() {// TODO: 2024/1/9 参数说明:分组key值,窗口计时器,按照key分组后的数据,收集器@Overridepublic void process(Integer integer, ProcessWindowFunction<Orders, Object, Integer, TimeWindow>.Context context, Iterable<Orders> elements, Collector<Object> collector) throws Exception {// TODO: 2024/1/9 窗口内同一个key包含的数据条数long count = elements.spliterator().estimateSize();// TODO: 2024/1/9 窗口的开始时间long windowStartTs = context.window().getStart();// TODO: 2024/1/9 窗口的结束时间long windowEndTs = context.window().getEnd();String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");// TODO: 2024/1/9 输出收集器collector.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});reduce.print("聚合数据");environment.execute();}// TODO: 2024/1/12 自定义水位线生成器public static class CustomWatermarkStrategy implements WatermarkStrategy<Orders>{@Overridepublic WatermarkGenerator<Orders> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Orders>() {// TODO: 2024/1/12  延迟时间private Long delayTime = 2000L;// TODO: 2024/1/12  观察到的最大时间戳private Long maxTs = -Long.MAX_VALUE + delayTime + 1L;@Overridepublic void onEvent(Orders orders, long l, WatermarkOutput watermarkOutput) {// TODO: 2024/1/12   每来一条数据就调用一次 , 更新最大时间戳maxTs = Math.max(orders.getOrder_date(),maxTs);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// TODO: 2024/1/12   发射水位线,默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}};}@Overridepublic TimestampAssigner<Orders> createTimestampAssigner(TimestampAssignerSupplier.Context context) {// TODO: 2024/1/12  告诉程序数据源里的时间戳是哪一个字段 return new SerializableTimestampAssigner<Orders>(){@Overridepublic long extractTimestamp(Orders orders, long l) {return orders.getOrder_date();}};}}
}

在这里插入图片描述

断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。

在数据源中发送水位线

我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。

env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
)

水位线的传递

在这里插入图片描述

在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。

案例:乱序流的watermark,将并行度设为2,观察现象。
在多个上游并行任务中,如果有其中一个没有数据,由于当前Task是以最小的那个作为当前任务的事件时钟,就会导致当前Task的水位线无法推进,就可能导致窗口无法触发。这时候可以设置空闲等待。

import com.zxl.bean.Orders;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;public class WatermarkOutOfOrdernessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setParallelism(2);DataStream<String> socketTextStream = environment.socketTextStream("175.24.186.230", 9999);DataStream<Integer> streamOperator = socketTextStream.map(Integer::parseInt);//自定义分区DataStream<Integer> dataStream = streamOperator.partitionCustom(new Partitioner<Integer>() {@Overridepublic int partition(Integer integer, int i) {if (integer % 2 == 0) {return 0;} else {return 1;}}}, new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer integer) throws Exception {return integer;}});WatermarkStrategy<Orders> ordersWatermarkStrategy = WatermarkStrategy.<Orders>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((orders, l) -> orders.getOrder_date());SingleOutputStreamOperator<Integer> watermarks = dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.<Integer>forMonotonousTimestamps().withTimestampAssigner((r, ts) -> r * 1000L)// TODO: 2024/1/12 空闲等待5s.withIdleness(Duration.ofSeconds(5)));// 分成两组: 奇数一组,偶数一组 , 开10s的事件时间滚动窗口watermarks.keyBy(r -> r % 2).window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {@Overridepublic void process(Integer integer, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();environment.execute();}
}

在这里插入图片描述

迟到数据的处理

推迟水印推进

在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

设置窗口延迟关闭

Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。
以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

注意:允许迟到只能运用在event time上

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.time.Duration;public class WatermarkLateDemo {public static void main(String[] args) throws Exception {// TODO: 2024/1/15  创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();// TODO: 2024/1/15  设置并行度为1environment.setParallelism(1);// TODO: 2024/1/7 定义时间语义environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/15 设置水位线WatermarkStrategy<Orders> watermarkStrategy = WatermarkStrategy// TODO: 2024/1/15 设置最大乱序程度,数据流中乱序数据时间戳的最大差值.<Orders>forBoundedOutOfOrderness(Duration.ofSeconds(3))// TODO: 2024/1/15 指定水位线中对应的时间字段 .withTimestampAssigner((orders, l) -> orders.getOrder_date())// TODO: 2024/1/15 设置空闲等待时间,如果某个分区中有长时间无数据产生将放弃此分区的水位线选举权力 .withIdleness(Duration.ofSeconds(3));// TODO: 2024/1/15 添加水位线SingleOutputStreamOperator<Orders> operator = ordersDataStreamSource.assignTimestampsAndWatermarks(watermarkStrategy);// TODO: 2024/1/15  定义侧输出流的标签OutputTag<Orders> lateData = new OutputTag<>("late_data", Types.POJO(Orders.class));// TODO: 2024/1/15 设置分区keySingleOutputStreamOperator<Object> process = operator.keyBy(orders -> orders.getProduct_id())// TODO: 2024/1/15 定义滚动窗口时间大小.window(TumblingEventTimeWindows.of(Time.seconds(5)))// TODO: 2024/1/15 窗口延迟关闭时间.allowedLateness(Time.seconds(3))// TODO: 2024/1/15 迟到数据的测输出流.sideOutputLateData(lateData)// TODO: 2024/1/15 进行数据聚合.process(new ProcessWindowFunction<Orders, Object, Integer, TimeWindow>() {@Overridepublic void process(Integer integer, ProcessWindowFunction<Orders, Object, Integer, TimeWindow>.Context context, Iterable<Orders> elements, Collector<Object> collector) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();collector.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});process.print();process.getSideOutput(lateData).print("侧输出流");environment.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/627403.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue cli 配置了productionSourceMap: true 开启source-map 没有生成map文件

因为UglifyJsPlugin导致生成map失败&#xff0c;可以将其注释即可 也可以加上 new UglifyJsPlugin({sourceMap:true })

用python实现给出关键字查找并标注pdf文件中关键字

要在Python中标注PDF文件中的关键字&#xff0c;可以使用Python的PDFMiner库和Python的matplotlib库。 首先&#xff0c;需要安装这两个库。可以使用pip命令进行安装&#xff1a; shell 复制代码 pip install pdfminer.six matplotlib 接下来&#xff0c;可以使用以下代码实现…

基于Java+SSM运动会管理系统详细设计和实现【附源码】

基于JavaSSM运动会管理系统详细设计和实现【附源码】 &#x1f345; 作者主页 央顺技术团队 &#x1f345; 欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; &#x1f345; 文末获取源码联系方式 &#x1f4dd; &#x1f345; 查看下方微信号获取联系方式 承接各种定制系统 …

java解析json复杂数据的第四种思路

文章目录 一、概述二、数据预览1. 接口json数据 三、代码实现1. 核心代码2. 字符串替换结果3. 运行结果 一、概述 接前两篇 java解析json复杂数据的两种思路 java解析json复杂数据的第三种思路 我们已经有了解析json数据的几种思路&#xff0c;下面介绍的方法是最少依赖情况下…

C++写二进制文件

源文件 #include <iostream> #include <fstream> #include <sstream> #include <cmath>void convert2() {// 打开输入文本文件std::ifstream inputFile("mask.txt");// 打开输出二进制文件std::ofstream outputFile("mask.dat", …

Dubbo分层设计之Serialize层

前言 Dubbo 框架采用 微内核 插件 的基本设计原则&#xff0c;自身功能几乎也都通过 SPI 扩展点实现&#xff0c;可以方便地被用户自由扩展和更换。 Dubbo 框架采用分层设计&#xff0c;自上而下共分为十层&#xff0c;各层均为单向依赖&#xff0c;每一层都可以剥离上层被复…

GO——单元测试(test)

go test用来做什么 做单元测试&#xff0c;测试函数是否符合预期 go test在哪个包 testing 如何使用 参考&#xff1a; https://geektutu.com/post/quick-go-test.html 以my_func.go中的Add方法为例 在同一个文件夹下添加my_func_test.go文件 测试文件以_test.go为结尾里…

远程视频会议卡顿!如何改善企业网络连接质量?

您的企业是否有这样的组网挑战&#xff1f; 要将不同分公司/店铺的监控画面汇总到服务器或者平台系统上&#xff0c;却由于地理位置过于分散&#xff0c;而且监控部署环境复杂多样&#xff0c;不同分公司/店铺部署的网络也不一样&#xff0c;有些甚至还是家用网络&#xff0c;…

现在00后开发人员不晓得加班为何事嘛?

我招了两个做HTML5开端开发的人员&#xff0c;是从培训机构招来的&#xff0c;按理说他们应该很努学很用样才对的。他们上班第一天我就跟他们讲&#xff0c;我们不需要上、下班打卡&#xff1b;你们也不必太过担心迟到或早退。因为我们搞开发的人员首先是按自己的工作任务完成情…

【部署LLaMa到自己的Linux服务器】

部署LLaMa到自己的Linux服务器 1、Llama2 项目获取方法1&#xff1a;有git可以直接克隆到本地方法2&#xff1a;直接下载 2、LLama2 项目部署3、申请Llama2许可4、下载模型权重5、运行 1、Llama2 项目获取 方法1&#xff1a;有git可以直接克隆到本地 创建一个空文件夹然后鼠标…

蓝牙网关G602

一、产品概述 G602是一款支持蓝牙4.2/5.0的蓝牙网关&#xff0c;主处理器采用580MHz的MIPS24KEc处理器&#xff0c;DRAM为DDR2 64MB&#xff0c;16MB FLASH。G602蓝牙网关集成PA和LNA&#xff0c;蓝牙扫描和连接距离可以达到100米以上&#xff0c;极大的增加了覆盖范围&#x…

CORS漏洞学习

CORS漏洞属于一个协议漏洞&#xff0c;具体是由于同源策略的设置问题触发的漏洞&#xff0c;漏洞利用条件较为苛刻&#xff0c;但实战中也常见。 首先要了解同源策略 什么是同源策略&#xff1f; 同源策略是一种Web浏览器安全机制&#xff0c;旨在防止网站相互攻击。 同源策…

LeetCode刷题——394. 字符串解码(HOT100)

✊✊✊&#x1f308;大家好&#xff01;本篇文章将较详细介绍栈的题目394. 字符串解码&#xff0c;提供栈和递归两种解法。代码语言为&#xff1a;C代码&#x1f607;。 &#x1f3a1;导航小助手&#x1f3a1; 394. 字符串解码&#x1f512;1、题目&#xff1a;☀️2、思路&…

数学建模-Matlab R2022a安装步骤

软件介绍 MATLAB是一款商业数学软件&#xff0c;用于算法开发、数据可视化、数据分析以及数值计算的高级技术计算语言和交互式环境&#xff0c;主要包括MATLAB和Simulink两大部分&#xff0c;可以进行矩阵运算、绘制函数和数据、实现算法、创建用户界面、连接其他编程语言的程…

2024年【危险化学品经营单位主要负责人】考试报名及危险化学品经营单位主要负责人考试资料

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 危险化学品经营单位主要负责人考试报名考前必练&#xff01;安全生产模拟考试一点通每个月更新危险化学品经营单位主要负责人考试资料题目及答案&#xff01;多做几遍&#xff0c;其实通过危险化学品经营单位主要负责…

NeRF 其三:Instant-NGP

NeRF 其三&#xff1a;Instant-NGP 1. 球谐函数1.1 NeRF 中球谐函数的作用1.2 球谐函数1.2.1 当阶数 j 0 j0 j0 时&#xff0c; m 0 m0 m0&#xff1a;1.2.2 当阶数 j 1 j1 j1 时&#xff0c; m 0 m0 m0&#xff1a;1.2.3 当阶数 j 1 j1 j1 时&#xff0c; m 1 m1 m1&…

SSL弱加密算法的漏洞研究

文章目录 一、什么是 SSL二、SSL/TLS 协议作用三、SSL/TLS 协议的基本思路四、如何保证公钥不被篡改?五、SSLSCAN工具1、下载和安装2、使用六、免责声明一、什么是 SSL SSL 代表安全套接字层。它是一种用于加密和验证应用程序(如浏览器)和 Web 服务器之间发送的数据的协议。…

vue中设置注释模板

参考地址 ctrlshiftp 打开编辑器配置输入configure user snippets - 选择 new global snipp files - 命名为 vueComment&#xff0c;弹出注释模板&#xff0c;即可自定义注释 如下/// 回车 即可在代码块中使用注释 { "Print to console": {"prefix": &q…

什么是游戏盾,游戏盾是如何做到免疫攻击的

什么是游戏盾&#xff1a;游戏盾是针对游戏行业面对的DDoS、CC攻击推出的针对性的网络安全解决方案&#xff0c;相比高防IP&#xff0c;除了能针对大型DDoS攻击&#xff08;T级别&#xff09;进行有效防御外&#xff0c;还具备彻底解决游戏行业特有的TCP协议的CC攻击问题能力&a…

拖拽不够自由?Vue3 DnD它来了!

前言 众所周知&#xff0c;在React中有一款非常强大的拖拽库&#xff0c;叫React DnD&#xff0c;而Vue中&#xff0c;大部分都是类似于vue.draggable等拖拽排序的库&#xff0c;然而它并不能满足我们所有的需求&#xff0c;特别是应对一些自由拖拽或混合拖拽的场景(例如&…