Apache Flink创建模板项目有2种方式:
1. 通过Maven archetype命令创建;
2. 通过Flink 提供的Quickstart shell脚本创建;
关于Apache Flink的环境搭建,请参考相关链接:
Apache Flink快速入门-基本架构、核心概念和运行流程
Apache Flink v1.8 本地单机环境安装和运行Flink应用
1. 通过Maven archetype创建Flink项目
#使用Maven创建
mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.8.0
-DgroupId=com.rickie
-DartifactId=flink-tutorial
-Dversion=0.1
-Dpackage=com.rickie.tutorial
-DinteractiveMode=false
参数说明:
原型archetype有关参数表
项目相关参数:
通过上述mvn 命令创建的Java模板项目结构。
从上述项目结构可以看出,该项目是一个比较完善的Maven项目,其中Java代码部分,BatchJob.java和StreamingJob.java 分别对应Flink 批量接口DataSet的实例代码和流式接口DataStream的实例代码。
2. 编写业务代码
将上述项目导入到IDEA中,Flink应用程序模板如下图所示。
打开StreamingJob.java文件,实现简单的单词统计(Word Count)业务功能。
具体代码如下所示。
package com.rickie.tutorial;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* Skeleton for a Flink Streaming Job.
*
*
For a tutorial how to write a Flink streaming application, check the
* tutorials and examples on the Flink Website.
*
*
To package your application into a JAR file for execution, run
* 'mvn clean package' on the command line.
*
*
If you change the name of the main class (with the public static void main(String[] args))
* method, change the respective entry in the POM.xml file (simply search for 'mainClass').
*/
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
// 设置streaming运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 连接socket获取输入的数据
DataStream text = env.socketTextStream("127.0.0.1