Flink is an excellent real-time computation framework for big data, and it has become a must-have skill at many companies doing big-data development. In this post I will walk through the classic introductory example, WordCount, implemented with Flink's batch API.
Step 1
Create a new Maven project in IDEA and make sure your local Maven environment is configured.
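Alternatively, if you prefer the command line, Flink also ships a Maven quickstart archetype that generates a project skeleton with matching dependencies. A sketch of the invocation (Maven will prompt you for your own groupId and artifactId):

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.13.0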
Step 2
Add the following dependencies and build configuration to the pom.xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Flink version -->
    <flink.version>1.13.0</flink.version>
    <!-- JDK version -->
    <java.version>1.8</java.version>
    <slf4j.version>1.7.30</slf4j.version>
    <!-- Scala binary version -->
    <scala.binary.version>2.12</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <!-- use the same Scala binary version as the other Flink artifacts -->
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.6.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>java</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <classpathScope>test</classpathScope>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
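With the exec-maven-plugin configured above, the finished job from Step 5 can also be launched from a terminal. A minimal sketch, assuming BatchWordCount stays in the default package as written below (if you put it into a package, pass the fully qualified class name instead):

mvn compile exec:java -Dexec.mainClass=BatchWordCount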
Step 3
Configure log4j: create a file named log4j.properties under resources with the following content
# Output pattern : date [thread] priority category - message
log4j.rootLogger=WARN,CONSOLE,RollingFile

# CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n

# RollingFile
log4j.appender.RollingFile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingFile.File=logs/signserver.log
log4j.appender.RollingFile.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingFile.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n

# Project default level
log4j.logger.com.ntko.sign=debug
Step 4
Create an input directory at the project root and add a words.txt file containing the following content
hello world hello flink hello java
Step 5
Create a BatchWordCount class with the following code
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1: Create the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2: Read the data from the file
        DataSource<String> dataSource = env.readTextFile("input/words.txt");
        // 3: Split each line into words and emit each word as a (word, 1) tuple.
        //    Java lambdas erase the generic type, so .returns(...) is needed to
        //    tell Flink the output type explicitly.
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = dataSource
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4: Group by the word (tuple field 0)
        UnsortedGrouping<Tuple2<String, Long>> grouped = wordAndOne.groupBy(0);
        // 5: Sum the counts (tuple field 1) within each group and print the result
        AggregateOperator<Tuple2<String, Long>> sum = grouped.sum(1);
        sum.print();
    }
}
Step 6
Output: run the main method and you can see that flink appears once, world appears once, hello appears three times, and java appears once.
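Concretely, the printed result should be four (word, count) tuples along these lines (the ordering of the tuples may vary between runs, since it is not guaranteed by the grouping):

(flink,1)
(world,1)
(hello,3)
(java,1)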