1、概述
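Histogram 是 Flink 内置的 key-value 累加器：key 是通过 add(Integer) 加入的整数值，value 是该值累计出现的次数；作业结束后得到的结果类型为 TreeMap<Integer, Integer>，按 key 升序排列。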
2、代码示例
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;import java.util.Map;
import java.util.TreeMap;/*** key-value 累加器*/
public class _04_HistogramAccumulator {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("localhost", 8888);// 创建累加器对象Histogram histogram = new Histogram();SingleOutputStreamOperator<String> process = source.process(new ProcessFunction<String, String>() {@Overridepublic void open(Configuration parameters) throws Exception {getRuntimeContext().addAccumulator("word_count", histogram);}@Overridepublic void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
// if ("a".equals(value)) {
// histogram.add(1);
// } else if ("b".equals(value)) {
// histogram.add(2);
// } else if ("c".equals(value)) {
// histogram.add(3);
// } else {
// histogram.add(4);
// }histogram.add(value.hashCode());out.collect("process=>" + value);}});process.print();JobExecutionResult jobExecutionResult = env.execute();TreeMap<Integer, Integer> treeMap = jobExecutionResult.getAccumulatorResult("word_count");for (Map.Entry<Integer, Integer> entry : treeMap.entrySet()) {System.out.println("entry=>" + entry.getKey() + "-" + entry.getValue());}}
}