创建工程
pom.xml文件依赖如下:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- 依赖的一些组件需要 Scala 环境 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>
定义输入源
- 在 resources 目录下创建 hello.txt 作为输入源, 内容单词自定义.
- 在 Linux 环境下使用 `nc -lk <端口号>` 监听指定端口, 模拟网络流式输入.
编写代码
批处理方式 WordCount
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;import java.io.InputStream;/*** {@link DataSet} 批处理 api, 处理离线数据* @author regotto*/
public class WordCount {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSource<String> dataSource = env.readTextFile("D:\\CodeRepository\\flink-study\\src\\main\\resources\\hello.txt");// 将单词按照空格分割, 变为 (word, 1) 形式的二元组DataSet<Tuple2<String, Integer>> resultSet = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split("\\W+");for (String s : words) {out.collect(new Tuple2<String, Integer>(s, 1));}}// 按照 tuple2 index = 0 进行分组, 按照 index = 1 进行求和}).groupBy(0).sum(1);resultSet.print();}
}
流式 WordCount
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** {@link DataStream} 流式api, 处理实时数据* @author regotto*/
public class StreamWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从文件中获取输入流
// DataStream<String> source = env.readTextFile("D:\\CodeRepository\\flink-study\\src\\main\\resources\\hello.txt");// 从 socket 文本流读取数据, 模拟 flink 从 kafka获取数据DataStreamSource<String> source = env.socketTextStream("localhost", 7777);DataStream<Tuple2<String, Integer>> resultStream = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split("\\W+");for (String s : words) {out.collect(new Tuple2<String, Integer>(s, 1));}}// 按照 tuple2 index = 0 进行分组, 按照 index = 1 进行求和}).keyBy(0).sum(1);resultStream.print();env.execute();}
}
使用 nc -lk
监听7777端口, 模拟网络流式输入.
执行结果如下:
结论
批处理: 将所有文本处理完, 才统计输出.
流式: 在开发环境中, 每读取一行文本就计数一次, 进行统计输出.