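Background
Deduplicating data is a very common task in day-to-day work. A typical requirement is to keep only the first of a set of duplicate records and ignore the ones that follow. This article implements that logic in Flink in two ways: with a RichFlatMapFunction (flatMap) and with a KeyedProcessFunction.
Deduplication with flatMap and ProcessFunction
Deduplication with flatMap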
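import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.util.Collector;

public class DuplicateRichFlatMap extends RichFlatMapFunction<WikipediaEditEvent, WikipediaEditEvent> {

    // Per-key flag: null until the first record for the current key has been emitted.
    private transient ValueState<Boolean> duplicateInput;

    @Override
    public void open(Configuration parameters) throws Exception {
        duplicateInput = getRuntimeContext().getState(
                new ValueStateDescriptor<Boolean>("duplicate", Types.BOOLEAN));
    }

    @Override
    public void flatMap(WikipediaEditEvent in, Collector<WikipediaEditEvent> collector) throws Exception {
        // Emit only the first record seen for this key; drop later duplicates.
        if (duplicateInput.value() == null) {
            collector.collect(in);
            duplicateInput.update(true);
        }
    }
}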
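The key to this implementation is a piece of Flink keyed (key-value) state: the ValueState is scoped to the current key, so the function has to run on a KeyedStream.
Deduplication with ProcessFunction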
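To make the role of the keyed state concrete, here is a minimal sketch in plain Java with no Flink dependency. It assumes the keyed state can be modeled as a map from key to flag; the class KeyedDedupSketch and its methods are hypothetical names used only for illustration, and the real ValueState is managed and checkpointed by Flink rather than held in a HashMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for Flink keyed state: one Boolean "seen" flag per key.
class KeyedDedupSketch {

    // Plays the role of ValueState<Boolean>, which Flink scopes to the current key.
    private final Map<String, Boolean> seenByKey = new HashMap<>();

    // Mirrors flatMap: emit the element only if no flag is stored yet for its key.
    <T> void flatMap(T element, Function<T, String> keySelector, List<T> out) {
        String key = keySelector.apply(element);
        if (seenByKey.get(key) == null) {
            out.add(element);
            seenByKey.put(key, true);
        }
    }

    // Convenience helper: deduplicate strings, using each string as its own key.
    static List<String> dedup(List<String> input) {
        KeyedDedupSketch sketch = new KeyedDedupSketch();
        List<String> out = new ArrayList<>();
        for (String s : input) {
            sketch.flatMap(s, Function.identity(), out);
        }
        return out;
    }
}
```

Calling KeyedDedupSketch.dedup on the input a, b, a, c, b keeps a, b, c in first-seen order, which is the behavior the Flink function produces per key.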
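import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.util.Collector;

public class DupliacateProcessFunction extends KeyedProcessFunction<String, WikipediaEditEvent, WikipediaEditEvent> {

    // Per-key flag: null until the first record for the current key has been emitted.
    private transient ValueState<Boolean> duplicateInput;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Boolean> stateDescriptor =
                new ValueStateDescriptor<Boolean>("previousInput", Types.BOOLEAN);
        // State TTL: entries expire one day after they are created or written.
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // Check up to 100 entries on each state access; no extra cleanup per processed record.
                .cleanupIncrementally(100, false)
                .build();
        stateDescriptor.enableTimeToLive(ttlConfig);
        duplicateInput = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(WikipediaEditEvent in, Context context, Collector<WikipediaEditEvent> collector)
            throws Exception {
        // Emit only the first record seen for this key; drop later duplicates.
        if (duplicateInput.value() == null) {
            collector.collect(in);
            duplicateInput.update(true);
        }
    }
}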
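The key piece here is again the keyed (key-value) state. The TTL settings additionally let Flink expire and clean up an entry one day after it was written, so the state does not grow without bound.
The job that triggers the computation looks like this: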
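The effect of the TTL on deduplication can be sketched in plain Java as follows. This is only an approximation under stated assumptions: the class TtlDedupSketch is hypothetical, time is passed in explicitly instead of coming from Flink's processing-time clock, and expired entries are dropped lazily on access rather than by Flink's incremental cleanup. It models OnCreateAndWrite (only writes refresh the timestamp) and NeverReturnExpired (an expired flag is treated as absent).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a TTL-backed "seen" flag per key; not Flink's implementation.
class TtlDedupSketch {

    private final long ttlMillis;

    // key -> timestamp of the last write (the flag itself is implied by presence).
    private final Map<String, Long> lastWrite = new HashMap<>();

    TtlDedupSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns true if the record for this key should be emitted at time nowMillis.
    boolean isFirst(String key, long nowMillis) {
        Long written = lastWrite.get(key);
        // NeverReturnExpired: an entry whose TTL has elapsed is treated as missing.
        boolean expired = written != null && nowMillis - written >= ttlMillis;
        if (written == null || expired) {
            // OnCreateAndWrite: writing the flag (re)starts the TTL.
            lastWrite.put(key, nowMillis);
            return true;
        }
        // Reads do not refresh the TTL, so duplicates never extend the entry's lifetime.
        return false;
    }
}
```

One consequence worth noting: because the flag is written only when a record is emitted, a key that keeps arriving is emitted again once the TTL has elapsed since its last emission. The deduplication window is therefore roughly one day, not the lifetime of the job.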
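import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;

public class DuplicateJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // RandomStringSource is a custom source that is not shown in this article.
        DataStream<WikipediaEditEvent> edits = see.addSource(new RandomStringSource());

        // Key the stream by user so that the state in the dedup functions is scoped per user.
        KeyedStream<WikipediaEditEvent, String> keyedEdits =
                edits.keyBy(new KeySelector<WikipediaEditEvent, String>() {
                    @Override
                    public String getKey(WikipediaEditEvent event) {
                        return event.getUser();
                    }
                });

        // Deduplicate with the RichFlatMapFunction
        DataStream<WikipediaEditEvent> result = keyedEdits.flatMap(new DuplicateRichFlatMap());
        // Deduplicate with the KeyedProcessFunction instead
        // DataStream<WikipediaEditEvent> result = keyedEdits.process(new DupliacateProcessFunction());

        result.print();
        see.execute();
    }
}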