1、概述
自定义累加器需要实现 Flink 的 SimpleAccumulator 接口,并重写 add、getLocalValue、resetLocal、merge、clone 等方法。
2、代码示例
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.SimpleAccumulator;@NoArgsConstructor
@AllArgsConstructor
@Data
public class _05_CustomSimpleAccumulator implements SimpleAccumulator<Integer> {private Integer count;@Overridepublic void add(Integer value) {this.count += value;}@Overridepublic Integer getLocalValue() {return this.count;}@Overridepublic void resetLocal() {this.count = 0;}@Overridepublic void merge(Accumulator<Integer, Integer> other) {this.count += other.getLocalValue();}@Overridepublic Accumulator<Integer, Integer> clone() {return new _05_CustomSimpleAccumulator(this.count);}
}
3、使用自定义的累加器
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class _05_MyCustomAccumulator {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.disableOperatorChaining();_05_CustomSimpleAccumulator counterAccumulator = new _05_CustomSimpleAccumulator(0);env.socketTextStream("localhost", 8888).keyBy(e -> e).process(new KeyedProcessFunction<String, String, String>() {@Overridepublic void open(Configuration parameters) throws Exception {getRuntimeContext().addAccumulator("word-count-with-default", counterAccumulator);}@Overridepublic void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx, Collector<String> out) throws Exception {counterAccumulator.add(1);out.collect(value);}}).print();JobExecutionResult jobExecutionResult = env.execute();Object accumulatorResult = jobExecutionResult.getAccumulatorResult("word-count-with-default");System.out.println("累加器结果=>" + accumulatorResult);}
}