Introduction to Flink Process Functions
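At Flink's lowest API layer, we don't have to use any specific operator (such as map, filter, or window). Instead, there is a single generic "process" operation. It generalizes all transformation operators and lets us define our own processing logic, so this layer of the API is called the "process function". In a process function we work directly with the most basic elements of a data stream: events, state, and time. This gives us full control over the stream. Process functions are abstract and perform no specific operation, so they are somewhat cumbersome for common, simple tasks such as summing or windowing. But precisely because they don't restrict what you do, in principle they can implement any requirement.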
Types of Flink process functions
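- ProcessFunction is the general-purpose function for processing a data stream. It is an abstract class that defines the usual callbacks, such as processElement and onTimer. You extend ProcessFunction and override these methods to implement complex stream-processing logic in a Flink program.
- KeyedProcessFunction is the counterpart of ProcessFunction for keyed streams (the KeyedStream produced by keyBy). It is a separate class, not a subclass of ProcessFunction. Its Context additionally provides getCurrentKey() to access the key of the current element, and its timerService() can register timers, which Flink supports only on keyed streams.
- ProcessWindowFunction and ProcessAllWindowFunction are special functions for processing windows. They provide a process method that is called when a window fires, with all the elements buffered for that window. ProcessWindowFunction is used on a keyed stream, so each window holds the elements of a single key. ProcessAllWindowFunction is used on a non-keyed stream, so each window holds all the elements of the stream that fall into it.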
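This article focuses on KeyedProcessFunction, which processes a KeyedStream. processElement() is called once for every element that enters the operator. KeyedProcessFunction also offers timers, which makes it well suited to scenarios such as alerting and monitoring.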
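The per-key counter that both examples below keep in processElement can be sketched without Flink. In this plain-Java model a HashMap stands in for keyed ValueState<Long>. That is an assumption for illustration only, since Flink manages keyed state itself, and KeyedCountSketch is a hypothetical name. Each call handles one record, and state.value() returning null before the first update is modelled by Map.get returning null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedCountSketch {
    // Hypothetical stand-in for keyed ValueState<Long>: one counter per key
    private final Map<String, Long> countState = new HashMap<>();

    // One "processElement" call: returns the updated count for this record's key
    long processElement(String key) {
        Long current = countState.get(key); // like state.value(): null before the first update
        if (current == null) {
            current = 0L;
        }
        current++;
        countState.put(key, current);       // like state.update(current)
        return current;
    }

    public static void main(String[] args) {
        KeyedCountSketch fn = new KeyedCountSketch();
        List<Long> counts = new ArrayList<>();
        for (String key : new String[] {"a1", "b", "a1", "a1"}) {
            counts.add(fn.processElement(key));
        }
        System.out.println(counts); // [1, 1, 2, 3]
    }
}
```

Each key sees its own sequence of counts, which is why the examples can read the count back in onTimer for exactly the key that registered the timer.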
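KeyedProcessFunction timers come in two kinds: event-time timers and processing-time timers. The examples below use a simple count to show how each kind is used, with comments explaining the reasoning. The examples are based on Flink 1.14.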
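The firing rules of the two timer kinds can be modelled without Flink. The class below is a hypothetical, framework-free sketch for a single key; TimerRulesSketch and its methods are illustrative names, not Flink API. It mirrors the documented behaviour: an event-time timer fires once the watermark reaches its timestamp, a processing-time timer fires once the system clock reaches it, and only one timer is kept per key and timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class TimerRulesSketch {
    // Sorted sets: registering the same timestamp twice keeps a single timer
    private final TreeSet<Long> eventTimeTimers = new TreeSet<>();
    private final TreeSet<Long> processingTimeTimers = new TreeSet<>();

    void registerEventTimeTimer(long t) { eventTimeTimers.add(t); }
    void deleteEventTimeTimer(long t) { eventTimeTimers.remove(t); }
    void registerProcessingTimeTimer(long t) { processingTimeTimers.add(t); }

    // New watermark arrives: every event-time timer with timer <= watermark fires, in order
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.first() <= watermark) {
            fired.add(eventTimeTimers.pollFirst());
        }
        return fired;
    }

    // Wall clock moves on: every processing-time timer with timer <= now fires, in order
    List<Long> advanceProcessingTime(long now) {
        List<Long> fired = new ArrayList<>();
        while (!processingTimeTimers.isEmpty() && processingTimeTimers.first() <= now) {
            fired.add(processingTimeTimers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerRulesSketch timers = new TimerRulesSketch();
        timers.registerEventTimeTimer(1_000L);
        timers.registerEventTimeTimer(1_000L); // duplicate: still one timer
        timers.registerEventTimeTimer(5_000L);
        System.out.println(timers.advanceWatermark(999L));    // []
        System.out.println(timers.advanceWatermark(1_000L));  // [1000]
        System.out.println(timers.advanceWatermark(10_000L)); // [5000]

        timers.registerProcessingTimeTimer(2_000L);
        System.out.println(timers.advanceProcessingTime(3_000L)); // [2000]
    }
}
```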
Examples
KeyedProcessFunction with an event-time timer
Code:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

/**
 * @description:
 *
 * @author pony
 * @date 2024/1/17 20:55
 * @version 1.0
 * nc -l 9999
 */
public class KeyedProcessFunctionOnTimerEventTime {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        return Long.valueOf(element.split(",")[1]);
                    }
                })
                .withIdleness(Duration.ofSeconds(1));

        DataStream<Tuple2<String, Long>> stream0 = env.socketTextStream("x.x.x.x", 9999)
                // assign timestamps and watermarks at the source, before any event-time operator
                .assignTimestampsAndWatermarks(watermarkStrategy)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return new Tuple2<String, Long>(value.split(",")[0], Long.valueOf(value.split(",")[1]));
                    }
                });

        // apply the process function onto a keyed stream
        DataStream<Tuple2<String, Long>> result = stream0
                .keyBy(value -> value.f0)
                .process(new CountEventTimeWithTimeoutFunction());

        result.print();
        env.execute("KeyedProcessFunction wordCount");
    }

    /**
     * The implementation of the ProcessFunction that maintains the count and timeouts
     */
    static class CountEventTimeWithTimeoutFunction
            extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

        private ValueState<Long> state;
        private static final long DELAY = 1000L; // 1 s

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));
        }

        @Override
        public void processElement(Tuple2<String, Long> value,
                                   Context ctx,
                                   Collector<Tuple2<String, Long>> out) throws Exception {
            // per-key counter
            Long current = state.value();
            if (current == null) {
                current = 0L;
            }
            current++;
            state.update(current);

            // current watermark of this operator
            long currentWatermark = ctx.timerService().currentWatermark();
            // long timer = ctx.timestamp() + DELAY; // alternative: timer = event time of this record + DELAY
            long timer = currentWatermark + DELAY;   // timer = current watermark + DELAY
            // Register an event-time timer. It is driven by watermarks and fires once watermark >= timer.
            ctx.timerService().registerEventTimeTimer(timer);
            // Before the first watermark arrives, currentWatermark is Long.MIN_VALUE and the timer
            // above is meaningless, so delete it again.
            if (currentWatermark < 0) {
                ctx.timerService().deleteEventTimeTimer(timer);
            }
            System.out.println("last Watermark: " + currentWatermark + ", format: " + time(currentWatermark));
            // print details to check the data
            System.out.println(String.format("processElement: %s, %d, ctx.timestamp() : %d (%s), timer : %d (%s)\n",
                    ctx.getCurrentKey(), current, ctx.timestamp(), time(ctx.timestamp()), timer, time(timer)));
        }

        @Override
        public void onTimer(long timestamp, // firing time, equal to the timer registered above
                            OnTimerContext ctx,
                            Collector<Tuple2<String, Long>> out) throws Exception {
            // current word (key)
            String currentKey = ctx.getCurrentKey();
            // get the state for the key that scheduled the timer
            Long result = state.value();
            // print details to check that the result is as expected
            System.out.println(String.format("onTimer: %s, %d, ctx.timestamp() : %d (%s), timestamp : %d (%s)\n",
                    currentKey, result, ctx.timestamp(), time(ctx.timestamp()), timestamp, time(timestamp)));
            System.out.println("current Watermark: " + ctx.timerService().currentWatermark()
                    + ", format: " + time(ctx.timerService().currentWatermark()));
            out.collect(new Tuple2<String, Long>(currentKey, result));
        }

        @Override
        public void close() throws Exception {
            // Keyed state is scoped per key and managed by Flink. Calling state.clear() here would
            // only touch whichever key happens to be current (or fail if none is set), so it is omitted.
            super.close();
        }
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(timeStamp));
    }
}
Test data:
nc -l 9999
a1,1704038400000
a1,1704038401000
a1,1704038403000
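To follow when onTimer should fire for this input, the watermark arithmetic can be traced by hand. The plain-Java model below is not Flink code, and EventTimeTrace is a hypothetical name. It assumes parallelism 1 and that the periodic watermark produced after each record reaches the keyed operator before the next record does, as happens when lines are typed into nc one at a time. It applies the forBoundedOutOfOrderness rule, watermark = max timestamp - 60000 - 1 (ms), together with the example's timer = watermark + 1000.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class EventTimeTrace {
    static final long OUT_OF_ORDERNESS_MS = 60_000L; // Duration.ofSeconds(60)
    static final long DELAY_MS = 1_000L;             // DELAY in CountEventTimeWithTimeoutFunction

    // Returns "timer->count" for each onTimer call, in firing order (single key)
    static List<String> run(long[] timestamps) {
        List<String> fired = new ArrayList<>();
        TreeSet<Long> timers = new TreeSet<>(); // one timer per timestamp
        long watermark = Long.MIN_VALUE;         // no watermark seen yet
        long maxTs = Long.MIN_VALUE;
        long count = 0;
        for (long ts : timestamps) {
            // processElement: update the counter, register a timer at watermark + DELAY
            count++;
            long timer = watermark + DELAY_MS;
            if (watermark >= 0) {                // the example deletes it while watermark < 0
                timers.add(timer);
            }
            // next periodic watermark: maxTimestamp - outOfOrderness - 1
            maxTs = Math.max(maxTs, ts);
            watermark = maxTs - OUT_OF_ORDERNESS_MS - 1;
            // onTimer for every timer the new watermark has reached
            while (!timers.isEmpty() && timers.first() <= watermark) {
                fired.add(timers.pollFirst() + "->" + count);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] testData = {1704038400000L, 1704038401000L, 1704038403000L}; // the a1 records
        System.out.println(run(testData));
        // prints [1704038340999->2, 1704038341999->3]
    }
}
```

Under these assumptions the out-of-orderness bound cancels out. A timer registered at (previous max - 60000 - 1) + 1000 fires once (new max - 60000 - 1) reaches it, that is, once a record arrives whose timestamp is at least 1 s past the previous maximum. The first record registers nothing because the watermark is still Long.MIN_VALUE.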
Output: (screenshot not included)
KeyedProcessFunction with a processing-time timer
Code:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

/**
 * @description:
 *
 * @author pony
 * @date 2024/1/17 20:55
 * @version 1.0
 * nc -l 9999
 */
public class KeyedProcessFunctionOnTimerProcessTime {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        // return System.currentTimeMillis();
                        return Long.valueOf(element.split(",")[1]);
                    }
                })
                .withIdleness(Duration.ofSeconds(1));

        DataStream<Tuple2<String, Long>> stream0 = env.socketTextStream("x.x.x.x", 9999)
                // assigns record timestamps; ctx.timestamp() in processElement relies on them
                .assignTimestampsAndWatermarks(watermarkStrategy)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return new Tuple2<String, Long>(value.split(",")[0], Long.valueOf(value.split(",")[1]));
                    }
                });

        // apply the process function onto a keyed stream
        SingleOutputStreamOperator<Tuple2<String, Long>> result = stream0
                .keyBy(value -> value.f0)
                .process(new CountProcessTimeWithTimeoutFunction());

        result.print();
        // records that onTimer routes to the "single" side output
        result.getSideOutput(CountProcessTimeWithTimeoutFunction.SINGLE_TAG).print("single");
        env.execute("KeyedProcessFunction wordCount");
    }

    static class CountProcessTimeWithTimeoutFunction
            extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

        // side output for keys whose count is still below 2 when the timer fires
        static final OutputTag<Tuple2<String, Long>> SINGLE_TAG =
                new OutputTag<Tuple2<String, Long>>("single") {};

        private ValueState<Long> state;
        private static final long DELAY = 60 * 1000L; // 60 s

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));
        }

        @Override
        public void processElement(Tuple2<String, Long> value,
                                   Context ctx,
                                   Collector<Tuple2<String, Long>> out) throws Exception {
            Long current = state.value();
            if (current == null) {
                current = 0L;
            }
            current++;
            state.update(current);

            // timer = event time of this record + DELAY, used as a wall-clock instant
            long timer = ctx.timestamp() + DELAY;
            // Register a processing-time timer. It ignores watermarks and fires once the
            // system clock reaches it (current processing time >= timer).
            ctx.timerService().registerProcessingTimeTimer(timer);
            // delete a processing-time timer:
            // ctx.timerService().deleteProcessingTimeTimer(timer);
            System.out.println("processElement currentProcessingTime: " + ctx.timerService().currentProcessingTime()
                    + ", format: " + time(ctx.timerService().currentProcessingTime()));
            // print all details to check the data
            System.out.println(String.format("processElement: %s, %d, ctx.timestamp() : %d (%s), timer : %d (%s)\n",
                    ctx.getCurrentKey(), current, ctx.timestamp(), time(ctx.timestamp()), timer, time(timer)));
        }

        @Override
        public void onTimer(long timestamp,
                            OnTimerContext ctx,
                            Collector<Tuple2<String, Long>> out) throws Exception {
            // current word (key)
            String currentKey = ctx.getCurrentKey();
            // get the state for the key that scheduled the timer
            Long result = state.value();
            System.out.println("onTimer currentProcessingTime: " + ctx.timerService().currentProcessingTime()
                    + ", format: " + time(ctx.timerService().currentProcessingTime()));
            // print details to check that the result is as expected
            System.out.println(String.format("onTimer: %s, %d, ctx.timestamp() : %d (%s), timestamp : %d (%s)\n",
                    currentKey, result, ctx.timestamp(), time(ctx.timestamp()), timestamp, time(timestamp)));
            // side outputs are supported as well
            if (result < 2) {
                ctx.output(SINGLE_TAG, new Tuple2<>(currentKey, result));
            } else {
                out.collect(new Tuple2<String, Long>(currentKey, result));
            }
        }

        @Override
        public void close() throws Exception {
            // Keyed state is scoped per key and managed by Flink. Calling state.clear() here would
            // only touch whichever key happens to be current (or fail if none is set), so it is omitted.
            super.close();
        }
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(timeStamp));
    }
}
Test data:
nc -l 9999
a,1705568024000
a,1705568024000
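The processing-time example can be traced in the same way. The model below is plain Java, not Flink code; ProcessingTimeTrace is a hypothetical name and the wall-clock arrival times are made up. It assumes a single key and that a due timer fires before the next record is processed. Both test records carry 1705568024000, so both register the same timer 1705568084000, and because Flink keeps only one timer per key and timestamp, onTimer runs once with count 2. If the same data is replayed long after that instant, the first timer is already overdue and fires before the second record arrives, so count 1 goes to the "single" side output and count 2 to the main output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class ProcessingTimeTrace {
    static final long DELAY_MS = 60_000L; // DELAY in the example (60 s)

    // arrivals[i]: wall-clock time record i reaches the operator (made up)
    // eventTs[i]:  timestamp carried by record i, i.e. ctx.timestamp()
    // endOfRun:    wall-clock time at which the simulation stops
    // Returns one "main:<count>" or "single:<count>" entry per onTimer call
    static List<String> run(long[] arrivals, long[] eventTs, long endOfRun) {
        List<String> out = new ArrayList<>();
        TreeSet<Long> timers = new TreeSet<>(); // a set: one timer per timestamp
        long count = 0;
        for (int i = 0; i <= arrivals.length; i++) {
            long now = i < arrivals.length ? arrivals[i] : endOfRun;
            // onTimer for every timer the clock has reached (now >= timer)
            while (!timers.isEmpty() && timers.first() <= now) {
                timers.pollFirst();
                out.add((count < 2 ? "single:" : "main:") + count);
            }
            if (i < arrivals.length) {
                count++;                           // processElement: update the counter
                timers.add(eventTs[i] + DELAY_MS); // timer = ctx.timestamp() + DELAY
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] eventTs = {1705568024000L, 1705568024000L}; // the two "a" test records
        // typed live, 6 s apart: one shared timer that fires once
        System.out.println(run(new long[] {1705568024000L, 1705568030000L}, eventTs, 1705568090000L));
        // prints [main:2]
        // replayed much later: the first timer is already overdue
        System.out.println(run(new long[] {1800000000000L, 1800000006000L}, eventTs, 1800000010000L));
        // prints [single:1, main:2]
    }
}
```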
Output: (screenshot not included)
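Summary
In real business scenarios, the processing-time timer of KeyedProcessFunction is used more often and tends to fit the requirements better: the firing time is derived from the event's own time, while the timer itself is driven by the system clock. In this setup the WatermarkStrategy can be omitted, provided the timer time is read from the time field carried in the record (value.f1 in the examples) rather than from ctx.timestamp(), which returns null when no timestamps have been assigned to the records.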
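As a sketch of that approach, the hypothetical helper timerFor below (plain Java, no Flink) reads the time field from a raw "key,epochMillis" line in the test-data format and adds the 60 s delay. remainingMs shows how long a processing-time timer at that instant would still wait at a given wall-clock time, assuming an overdue timer fires right away. Inside a Flink processElement the equivalent would be passing value.f1 + DELAY to registerProcessingTimeTimer.

```java
public class PayloadTimer {
    // 60 s, matching DELAY in the processing-time example
    static final long DELAY_MS = 60_000L;

    // Hypothetical helper: timer = time field of a "key,epochMillis" line + DELAY_MS
    static long timerFor(String line) {
        String[] fields = line.split(",");
        return Long.parseLong(fields[1].trim()) + DELAY_MS;
    }

    // Wait left for a processing-time timer at 'timer' when the wall clock reads 'now';
    // 0 means the timer is already due
    static long remainingMs(long timer, long now) {
        return Math.max(timer - now, 0L);
    }

    public static void main(String[] args) {
        long timer = timerFor("a,1705568024000");
        System.out.println(timer);                              // 1705568084000
        System.out.println(remainingMs(timer, 1705568024000L)); // 60000
        System.out.println(remainingMs(timer, 1705568090000L)); // 0
    }
}
```

Because the timer comes from the payload, records whose time field lags far behind the wall clock produce timers that are already due and fire almost immediately.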
References:
Process Function
Generating Watermarks