相关资料
文档内容 | 链接地址 |
---|---|
datagen生成器 | https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/datagen/ |
print 生成器 | https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/print/ |
准备工作
优点就是下载个idea就能体验,无需配置的环境(如 数据源等)
1、idea 开发工具
2、创建 maven 项目 – archetype 选择quickstart 表示java开发
java代码
代码逻辑
1、采用datagen 生成器,作为数据 source
2、采用print 作为打印器,作为sink 直接输出
package org.example;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkSqlDemo {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 创建输入表(使用DataGen生成测试数据)String sourceDDL = "CREATE TABLE user_behavior (\n" +" user_id BIGINT,\n" +" behavior STRING,\n" +" ts TIMESTAMP(3)\n" +") WITH (\n" +" 'connector' = 'datagen',\n" +" 'rows-per-second' = '1',\n" +" 'fields.user_id.kind' = 'random',\n" +" 'fields.user_id.min' = '1',\n" +" 'fields.user_id.max' = '2',\n" +" 'fields.behavior.length' = '2'\n" +")";// 创建输出表(打印结果)String sinkDDL = "CREATE TABLE print_table (\n" +" behavior STRING,\n" +" cnt BIGINT\n" +") WITH (\n" +" 'connector' = 'print'\n" +")";// 执行DDLtableEnv.executeSql(sourceDDL);tableEnv.executeSql(sinkDDL);// 执行查询并插入结果Table resultTable = tableEnv.sqlQuery("SELECT behavior, COUNT(*) AS cnt " +"FROM user_behavior " +"GROUP BY behavior");// 插入到输出表resultTable.executeInsert("print_table").await();// 执行任务(流式任务需要保持运行)env.execute("Flink SQL Demo");}
}
pom依赖配置
`
4.0.0
<groupId>org.example</groupId>
<artifactId>flinklearn</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.17.1</flink.version>
</properties><dependencies><!-- Flink Core --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><!-- Flink实时流--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!-- Flink Table --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.0.1</version></dependency>
</dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.example.FlinkSqlDemo</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>