Preface
Flink is a unified stream and batch processing framework, so the same data can be processed either as a stream or in batches.
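1. Basic code structure
//1st set up the execution environment
xxxEnvironment env = xxxEnvironment.getExecutionEnvironment();
//2nd set up the source
DataSource xxxDS = env.xxxx();
//3rd set up the transformation
Xxx transformation = xxxDS.xxxx();
//4th set up the sink
transformation.print();
//5th may be needed: a streaming job only starts on execute(), while a batch (DataSet) job is already triggered by print()
env.execute();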
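2. Demo1: batch processing
- Source code
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class Demo1 {
    public static void main(String[] args) throws Exception {
        //1, create an execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2, read the input file line by line
        DataSource<String> lineDS = env.readTextFile("input/word.txt");
        //3, process the data
        FlatMapOperator<String, Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                //3.1 split the line on spaces
                String[] values = value.split(" ");
                //3.2 emit a (word, 1) tuple for each word
                for (String word : values) {
                    Tuple2<String, Integer> wordTuple = Tuple2.of(word, 1);
                    collector.collect(wordTuple);
                }
            }
        });
        //4, group by word (tuple field 0)
        UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping = wordDS.groupBy(0);
        //5, aggregate within each group (sum tuple field 1)
        AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);
        //6, print the result; for DataSet jobs print() triggers execution, so env.execute() is not called here
        sum.print();
    }
}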
- Result
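Since the contents of input/word.txt are not shown here, the following is only a plain-Java sketch, without Flink, of what the flatMap, groupBy(0) and sum(1) steps above compute. The class name BatchWordCountSketch and the two sample lines are assumptions made for illustration, and the sorted output order comes from the TreeMap used here; Flink itself makes no promise about the order in which groups are printed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Flink): mimics flatMap -> groupBy(0) -> sum(1).
// The class name and the sample lines are hypothetical.
class BatchWordCountSketch {

    // Splits every line on a single space and adds 1 per word,
    // which is what the (word, 1) tuples followed by sum(1) amount to.
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello flink", "hello world");
        // One final (word,count) pair per distinct word.
        count(lines).forEach((word, n) -> System.out.println("(" + word + "," + n + ")"));
    }
}
```

For the two sample lines this prints (flink,1), (hello,2) and (world,1): one final pair per distinct word, in the same (word,count) form that Tuple2 uses when printed.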
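3. Demo2: stream processing
- Source code
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo2 {
    public static void main(String[] args) throws Exception {
        // create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // read the (bounded) file as a stream of lines
        DataStreamSource<String> lineDS = env.readTextFile("input/word.txt");
        // split each line and emit a (word, 1) tuple per word
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    Tuple2<String, Integer> temp = Tuple2.of(word, 1);
                    collector.collect(temp);
                }
            }
        });
        // key by word (tuple field 0); keyBy(int) is deprecated in newer Flink releases, Demo3 uses the KeySelector form
        KeyedStream<Tuple2<String, Integer>, Tuple> wordCountKeyBy = wordDS.keyBy(0);
        // running sum of tuple field 1 per key
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordCountKeyBy.sum(1);
        sum.print();
        // a streaming job only starts when execute() is called
        env.execute();
    }
}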
- Result
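Unlike the batch job, the streaming job in its default streaming execution mode emits an updated count every time a record arrives, so the same word can appear several times in the output. Below is a minimal plain-Java sketch of that behaviour, without Flink. The class name StreamingWordCountSketch and the sample lines are hypothetical, and a real run may interleave keys differently and may prefix each line with the index of the printing subtask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink): mimics keyBy(word) followed by a rolling sum(1).
// The class name and the sample lines are hypothetical.
class StreamingWordCountSketch {

    // Returns every intermediate result in arrival order,
    // i.e. one updated (word,count) pair per incoming word.
    static List<String> runningCounts(List<String> lines) {
        Map<String, Integer> state = new HashMap<>(); // per-key running sum
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                int updated = state.merge(word, 1, Integer::sum);
                emitted.add("(" + word + "," + updated + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        runningCounts(Arrays.asList("hello flink", "hello world"))
                .forEach(System.out::println);
    }
}
```

With the two sample lines the sketch emits (hello,1), (flink,1), (hello,2), (world,1): "hello" shows up twice because its count is updated when the second line arrives.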
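4. Demo3: unbounded stream processing
- Source code
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // unbounded source: lines of text read from a socket
        DataStreamSource<String> lineDS = env.socketTextStream("192.168.3.11", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = lineDS
                .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
                    String[] words = value.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                // the lambda's generic types are erased, so the result type is declared explicitly
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // key by word using a KeySelector
                .keyBy(value -> value.f0)
                .sum(1);
        sum.print();
        env.execute();
    }
}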
- Result
Keep sending lines of text to port 9999 on 192.168.3.11, and the program prints the updated word counts as the data arrives.
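socketTextStream connects to the given host and port as a client, so something has to be listening on port 9999 before the job starts. A tool such as netcat is the usual choice; as an alternative, here is a minimal Java sketch of such a feeder. The class name LineServerSketch, the method serveOnce and the sample lines are assumptions for illustration. The sketch serves a single client and then closes the connection, and it would have to run on the host the job connects to (192.168.3.11 in Demo3).

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical feeder for Demo3: listens on a port and sends
// newline-terminated lines to the first client that connects.
class LineServerSketch {

    // Waits for one client, writes each line followed by '\n', then closes the connection.
    static void serveOnce(ServerSocket server, List<String> lines) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                out.print(line + "\n");
            }
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 9999 matches the port used in Demo3.
        try (ServerSocket server = new ServerSocket(9999)) {
            serveOnce(server, Arrays.asList("hello flink", "hello world"));
        }
    }
}
```

For a long-running feed one would keep writing lines inside serveOnce instead of sending a fixed list; the fixed list only keeps the sketch short.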