02. Flink 快速上手
1、创建项目导入依赖
pom文件:
<properties><flink.version>1.17.0</flink.version>
</properties><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version>
</dependency>
2、需求
批处理基本思路:先逐行读取文本,在根据空格进行单词拆分,最后再去统计每个单词出现的频率。
(1)数据准备
在工程目录下新建文件夹input,新建文本words.txt。
文件输入:
hello world
hello flink
hello java
2.1 批处理
代码编写(使用DataSet API实现)
package com.company.onedayflink.demo;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class FlinkBatchWords {public static void main(String[] args) throws Exception {// 1、创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2、从文件中读取数据DataSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");// 3、切分、转换FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {/**** @param value 读取到的输入* @param out 返回的内容,Tuple2是一个二元分组,(字符串,个数)。* @throws Exception*/@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 3.1 切分for (String s : value.split(" ")) {// 3.2 将单组转为二元组Tuple2<String, Integer> tuple = Tuple2.of(s, 1);// 3.3 将二元组发送给下游out.collect(tuple);}}});// 4、按照 word 分组UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroup = wordAndOne.groupBy(0); // 0 表示下标为0的参数,也就是二元组的String单词// 5、各分组聚合AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGroup.sum(1);//1 表示下标1的元素,即单词个数// 6、输出sum.print();}
}
运行结果:
2.2 流处理
2.2.1 有界流
代码编写(使用DataStream API实现,读取文件属于有界流)
package com.company.onedayflink.demo;import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;@Slf4j
public class FlinkStreamWords {public static void main(String[] args) throws Exception {// 1、创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、从文件中读取数据DataStreamSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");// 3、处理数据(切换、转换、分组、聚合)// 3.1 切换、转换SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {for (String s : value.split(" ")) {// 构建二元组Tuple2<String, Integer> tuple = Tuple2.of(s, 1);// 通过采集器向下游发送数据out.collect(tuple);}}});// 3.2 分组, KeySelector<IN, KEY> 中 IN 表示输入的类型,KEY 表示分组key的类型KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOne.keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0); // value.f0 表示二元组的第一个元素// 3.3 聚合SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOneKS.sum(1); // 1 表示二元组的第二个元素// 4、输出数据sum.print();// 5、执行env.execute();}
}
执行结果:
2> (java,1)
3> (hello,1)
3> (hello,2)
3> (hello,3)
6> (world,1)
8> (flink,1)
前面的编号是并行度,线程数。
2.2.2 无界流
(1)使用 netcat 监听7777端口,建立stream流
安装 netcat
brew install netcat
监听 7777 端口
nc -lk 7777
(2)代码编写(使用DataStream API实现,读取stream流属于无界流)
package com.company.onedayflink.demo;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlinkSteamSocketWords {public static void main(String[] args) throws Exception {// 1、创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、读取数据(其中hostname 是需要监听的主机名称,mac电脑可以在终端使用hostname命令查看)DataStreamSource<String> socketDS = env.socketTextStream("zgyMacBook-Pro.local", 7777);// 3、数据处理(切割、转换、分组、聚合)SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {// 3.1 切分for (String s : value.split(" ")) {// 3.2 将单组转为二元组Tuple2<String, Integer> tuple = Tuple2.of(s, 1);// 3.3 将二元组发送给下游out.collect(tuple);}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1);// 4、输出sum.print();// 5、执行env.execute();}
}
(3)测试
在终端发送消息
hello flink
hello world
观察程序控制台打印
8> (flink,1)
3> (hello,1)
6> (world,1)
3> (hello,2)