水位线特点
- 插入到数据流中的一个标记,可以认为是一个特殊的数据
- 主要内容是一个时间戳
- 水位线是基于数据的时间戳生成的,即
事件时间
- 水位线必须单调递增
- 水位线可以通过设置延迟,来保证正确处理乱序数据
- 一个水位线,表示事件时间已经达到了时间戳t
- 水位线是Flink流处理中保证结果正确性的核心机制
窗口
错误理解:窗口是一个固定位置的框,数据流源源不断地流过来,到某个时间窗口该关闭了,就停止收集数据,触发计算并窗口关闭
输出结果。
Flink中窗口是动态创建的,当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。事实上,触发计算和窗口关闭两个行为可以分开。
总体原则
水位线出现表示这个时间之前的数据已经全部到齐,之后再也不会出现了,不过要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。水位线是流处理中对低延迟和结果正确性的一个权衡机制。
水位线生成方案
水位线的生成位置:越靠近数据源越好
WatermarkStrategy:水位线策略对象
1. 水位线生成器 WatermarkGenerator
- onEvent() 给每条数据生成水位线
- onPeriodicEmit():周期性生成水位线
2. 时间戳分配器 TimestampAssigner
- extractTimestamp()
有序流水位线生成的代码如下:
public class Flink01_UserDefineWaterMarkStrategy {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//设置生成水位线的周期env.getConfig().setAutoWatermarkInterval(1000);//tom,/home,1000SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888).map(line -> {String[] words = line.split(",");return new Event(words[0].trim(),words[1].trim(),Long.valueOf(words[2].trim()));});ds.print("input");ds.assignTimestampsAndWatermarks(new MyWatermarkStrategy());ds.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}public static class MyWatermarkStrategy implements WatermarkStrategy<Event>{/*** 创建水位线生成器* @param context* @return*/@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new MyWatermarkGenerator();}public static class MyWatermarkGenerator implements WatermarkGenerator<Event>{private Long maxTs = Long.MIN_VALUE;/*** 每一条数据调用一次,用于生成一次水位线* @param event* @param eventTimestamp* @param output*/@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {//有序流,每条数据生成水位线
// System.out.println("有序流每条数据生成水位线===》"+eventTimestamp);
// output.emitWatermark(new Watermark(eventTimestamp));maxTs = Math.max(maxTs, eventTimestamp);}/*** 周期性生成水位线* 默认周期是200ms* @param output*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {//有序流,周期性生成水位线System.out.println("有序流周期性生成水位线===》"+maxTs);output.emitWatermark(new Watermark(maxTs));}}/*** 创建时间戳分配器,用于从数据中提取时间戳* @param context* @return*/@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new MyTimestampAssigner();}}public static class MyTimestampAssigner implements TimestampAssigner<Event>{/*** 从数据中提取时间戳* @param element The element that the timestamp will be assigned to.* @param recordTimestamp The current internal timestamp of the element, or a negative value, if* no timestamp has been assigned yet.* @return*/@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTs();}}}