Flink Study Notes
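Preface: today is day 18 of my Flink journey! Many readers have messaged me to say they only know how to write FlinkSQL as plain SQL statements, and asked how to drive it from code, since at work everything is written in code. Others said they want tables that change dynamically, for example to widen order data with an exchange-rate table whose values keep changing.
So today I studied the top-level FlinkSQL APIs, that is, performing FlinkSQL operations from code (a broad set of practical APIs), together with stream processing in FlinkSQL, focusing on dynamic tables, time attributes and time zones. The goal is to show how to implement FlinkSQL operations in code (the syntax differs somewhat from MySQL's) and how to enrich data by joining against dynamically changing tables. This article combines my own experiments, guesses and code practice with many of my own interpretations. I hope it helps, and I look forward to exchanging ideas!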
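Table of Contents
- Flink Study Notes
  - IV. Catalog Operations in FlinkSQL
    - 1. Initialization
    - 2. Database Operations
    - 3. Table Operations
    - 4. View Operations
    - 5. Partition Operations
    - 6. Function Operations
  - V. FlinkSQL Stream Processing
    - 1. Time Attributes
      - 1.1 Processing Time
        - 1.1.1 Specifying It in the Table DDL
        - 1.1.2 Specifying It When Converting a DataStream to a Table
      - 1.2 Event Time
        - 1.2.1 Specifying It in the Table DDL
        - 1.2.2 Specifying It When Converting a DataStream to a Table
    - 2. Time Zones
      - 2.1 TIMESTAMP vs TIMESTAMP_LTZ
      - 2.2 Setting the Time Zone in Code
      - 2.3 Time Attributes and Time Zones
        - 2.3.1 New Behavior in Flink 1.13
        - 2.3.2 Socket Source Example
    - 3. Temporal Tables
      - 3.1 Versioned Tables and Regular Tables
      - 3.2 Temporal Table Functions
        - 3.2.1 Temporal Table Function (Batch Style)
        - 3.2.2 Temporal Table Function (Streaming Style)
      - 3.3 Temporal Table Join vs Regular Stream-Stream Join
        - 3.3.1 Temporal Joins on Processing Time or Event Time
        - 3.3.2 Example
      - 3.4 Lookup Join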
IV. Catalog Operations in FlinkSQL
1. Initialization
// 1. Instantiate the catalog
HiveCatalog catalog = new HiveCatalog(
        catalogName,          // catalog name
        "default",            // default database
        "src/main/resources", // directory containing hive-site.xml
        "2.1.1"               // Hive version
);
// 2. Register the catalog
tableEnv.registerCatalog(catalogName, catalog);
// 3. Use the catalog
tableEnv.useCatalog(catalogName);
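2. Database Operations
// 1. Create a database (ignoreIfExists = true)
catalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap<>(), "my comment"), true);
// 2. Drop a database (ignoreIfNotExists = true)
catalog.dropDatabase(databaseName, true);
// 3. Check whether a database exists
catalog.databaseExists(databaseName);
// 4. List all databases
catalog.listDatabases();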
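3. Table Operations
// 1. Create a table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// 2. Drop a table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
// 3. Alter a table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// 4. Rename a table (the last argument is ignoreIfNotExists)
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);
// 5. Get a table (tables are addressed by an ObjectPath, not a bare name)
catalog.getTable(new ObjectPath("mydb", "mytable"));
// 6. Check whether a table exists
catalog.tableExists(new ObjectPath("mydb", "mytable"));
// 7. List all tables in a database
catalog.listTables("mydb");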
4. View Operations
// 1. Create a view (views go through createTable with a CatalogViewImpl)
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
// 2. Drop a view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);
// 3. Alter a view
catalog.alterTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
// 4. Rename a view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);
// 5. Get a view
catalog.getTable(new ObjectPath("mydb", "myview"));
// 6. Check whether a view exists
catalog.tableExists(new ObjectPath("mydb", "myview"));
// 7. List all views in a database
catalog.listViews("mydb");
5. Partition Operations
// 1. Create a partition
catalog.createPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), new CatalogPartitionImpl(...), false);
// 2. Drop a partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);
// 3. Alter a partition
catalog.alterPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), new CatalogPartitionImpl(...), false);
// 4. Get a partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// 5. Check whether a partition exists
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// 6. List all partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));
// 7. List the partitions of a table under a given partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// 8. List the partitions of a table that match filter expressions
catalog.listPartitionsByFilter(new ObjectPath("mydb", "mytable"), Arrays.asList(expr1, ...));
6. Function Operations
// 1. Create a function
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// 2. Drop a function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
// 3. Alter a function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// 4. Get a function
catalog.getFunction(new ObjectPath("mydb", "myfunc"));
// 5. Check whether a function exists
catalog.functionExists(new ObjectPath("mydb", "myfunc"));
// 6. List the functions in a database
catalog.listFunctions("mydb");
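V. FlinkSQL Stream Processing
1. Time Attributes
1.1 Processing Time
Overview: processing time is the local clock of the machine executing the operation. No timestamp needs to be extracted and no watermarks are needed!
1.1.1 Specifying It in the Table DDL
Example: creating a table with the filesystem connector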
package cn.itcast.day01.time;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

/**
 * @author lql
 * @time 2024-03-15 21:14:58
 * Using processing time in Flink SQL:
 * the processing-time attribute is declared in the CREATE TABLE DDL.
 */
public class ProcessingTimeTableDDL {
    public static void main(String[] args) throws Exception {
        // 0) Set the user name for Hadoop operations
        System.setProperty("HADOOP_USER_NAME", "root");
        // 1) Initialize the Flink streaming and table environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bbSetting = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, bbSetting);
        // 2) Set the parallelism
        env.setParallelism(1);
        // 3) Locate the input file
        String path = ProcessingTimeTableDDL.class.getClassLoader().getResource("order.csv").getPath();
        // 4) Create the HiveCatalog
        String catalogName = "myHive";
        String databaseName = "itcast_flinksql";
        HiveCatalog catalog = new HiveCatalog(
                catalogName,          // catalog name
                "default",            // default database
                "src/main/resources", // directory containing hive-site.xml
                "2.1.1"               // Hive version
        );
        // 5) Register the catalog
        System.out.println("=========== register catalog ===========");
        tabEnv.registerCatalog(catalogName, catalog);
        // 6) Switch to the catalog
        System.out.println("=========== use catalog ===========");
        tabEnv.useCatalog(catalogName);
        // 7) Create the database
        System.out.println("=========== create database ===========");
        String createDBSql = "CREATE DATABASE IF NOT EXISTS " + catalogName + "." + databaseName;
        tabEnv.executeSql(createDBSql);
        // 8) Switch to the database
        System.out.println("=========== use database ===========");
        tabEnv.useDatabase(databaseName);
        // 9) Create a table on the file; pt is the processing-time attribute
        String sqlDDL = "create table InputTable (" +
                " `id` varchar," +
                " `timestamp` bigint," +
                " `money` double," +
                " `category` varchar," +
                " `pt` AS PROCTIME()" +
                " ) with (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'file:///" + path + "'," +
                " 'format' = 'csv'" +
                " )";
        tabEnv.executeSql(sqlDDL);
        Table resultTable = tabEnv.sqlQuery("select * from InputTable ");
        resultTable.printSchema();
        // Print the result
        tabEnv.toAppendStream(resultTable, Row.class).print("result");
        env.execute();
    }
}
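Summary: when creating the table, add one extra column: `pt` AS PROCTIME(). Note that `timestamp` is a reserved word and must be escaped with backticks; it is declared as BIGINT here to match the epoch-second values stored in order.csv.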
1.1.2 Specifying It When Converting a DataStream to a Table
Example: converting a stream read from a file into a table
package cn.itcast.day01.time;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author lql
 * @time 2023-06-28 22:10:38
 * Using processing time in Flink SQL:
 * the processing-time attribute is declared when converting a DataStream into a Table.
 */
public class ProcessingTimeDataStream {
    public static void main(String[] args) throws Exception {
        // 1) Create the streaming and table environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, bsSettings);
        // 2) Set the parallelism
        env.setParallelism(1);
        // 3) Read the file
        String path = ProcessingTimeDataStream.class.getClassLoader().getResource("order.csv").getPath();
        DataStreamSource<String> inputDataStream = env.readTextFile(path);
        // 4) Parse each CSV line into a POJO
        SingleOutputStreamOperator<OrderBean> orderDataStream = inputDataStream.map(new MapFunction<String, OrderBean>() {
            @Override
            public OrderBean map(String value) throws Exception {
                String[] dataArray = value.split(",");
                return new OrderBean(dataArray[0], dataArray[1], Double.parseDouble(dataArray[2]), dataArray[3]);
            }
        });
        // 5) Convert the stream into a table and append the processing-time attribute pt.
        //    A proctime attribute must be a new field: $("timestamp").proctime() fails with
        //    "The proctime attribute 'timestamp' must not replace an existing field."
        Table table = tabEnv.fromDataStream(orderDataStream,
                $("id"), $("timestamp"), $("money"), $("category"), $("pt").proctime());
        table.printSchema();
        // 6) Query with the Table API ('电脑' = "computer" is a category value in order.csv)
        Table tableResult = table
                .select($("id"), $("timestamp"), $("money"), $("category"))
                .filter($("category").isEqual("电脑"));
        // 7) Query with SQL: register the table as a view, then select from it
        tabEnv.createTemporaryView("orderTable", table);
        Table sqlResult = tabEnv.sqlQuery("select id,`timestamp`,money,category from orderTable where category='电脑'");
        // 8) Table has no print() method, so convert the results back into DataStreams to print them
        tabEnv.toAppendStream(tableResult, Row.class).print("Table API>>>");
        tabEnv.toAppendStream(sqlResult, Row.class).print("SQL API>>>");
        // 9) Run the job
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderBean {
        private String id;
        private String timestamp;
        private Double money;
        private String category;
    }
}
Summary: when listing the fields during the conversion, append one extra field, $("pt").proctime():
Table table = tabEnv.fromDataStream(orderDataStream, $("id"), $("timestamp"), $("money"), $("category"),$("pt").proctime());
1.2 Event Time
1.2.1 Specifying It in the Table DDL
Example: creating a table with the filesystem connector
package cn.itcast.day02.time;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author lql
 * @time 2024-03-15 12:31:52
 * Using event time as a time attribute:
 * the event-time attribute is declared when creating the table.
 */
public class EventTimeTableDDL {
    public static void main(String[] args) throws Exception {
        // 1) Build the Flink streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2) Set the parallelism
        env.setParallelism(1);
        // 3) Build the Flink table environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);
        // 4) CREATE TABLE statement: rt is derived from the epoch-second `timestamp` column
        //    and used as the event-time attribute with a 1-second watermark delay
        String filePath = EventTimeTableDDL.class.getClassLoader().getResource("order.csv").getPath();
        String sqlDDL = "CREATE TABLE InputTable (\n" +
                " `id` varchar,\n" +
                " `timestamp` bigint,\n" +
                " `money` double,\n" +
                " `category` varchar,\n" +
                " `rt` AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),\n" +
                " watermark for rt as rt - interval '1' second\n" +
                ") WITH (\n" +
                " 'connector' = 'filesystem',\n" +
                " 'path' = 'file:///" + filePath + "',\n" +
                " 'format' = 'csv'\n" +
                ")";
        // 5) Execute the DDL
        tabEnv.executeSql(sqlDDL);
        // 6) Print the table schema
        Table table = tabEnv.sqlQuery("select * from InputTable");
        table.printSchema();
        // 7) Convert to a DataStream, print it and run the job
        tabEnv.toAppendStream(table, Row.class).print();
        env.execute();
    }
}
Summary:
- 1- When creating the table, add a computed column: `rt` AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`))
- 2- Remember to declare a watermark on it: WATERMARK FOR rt AS rt - INTERVAL '1' SECOND
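To see what the computed column and the watermark expression produce, here is a plain-Java sketch with no Flink dependency. It assumes, as in this example, that `timestamp` holds epoch seconds. FROM_UNIXTIME renders those seconds in the session time zone (`table.local-time-zone`) and TO_TIMESTAMP parses the text back into a zone-less TIMESTAMP, which `LocalDateTime` models here. `RowtimeModel` and its methods are hypothetical names, and `watermarkCandidate` only evaluates `rt - INTERVAL '1' SECOND` for a single row; it is not Flink's watermark operator.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical plain-Java model of:
//   `rt` AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`))
//   WATERMARK FOR rt AS rt - INTERVAL '1' SECOND
final class RowtimeModel {

    // FROM_UNIXTIME formats epoch seconds in the session time zone;
    // TO_TIMESTAMP turns that text into a zone-less timestamp (LocalDateTime).
    static LocalDateTime rowtime(long epochSeconds, ZoneId sessionZone) {
        return Instant.ofEpochSecond(epochSeconds).atZone(sessionZone).toLocalDateTime();
    }

    // Value of the declared watermark expression for one row.
    static LocalDateTime watermarkCandidate(LocalDateTime rt) {
        return rt.minusSeconds(1);
    }

    public static void main(String[] args) {
        long ts = 1_609_459_200L; // sample value: 2021-01-01T00:00:00Z
        LocalDateTime utc = rowtime(ts, ZoneId.of("UTC"));
        System.out.println(utc);                                      // 2021-01-01T00:00
        System.out.println(rowtime(ts, ZoneId.of("Asia/Shanghai")));  // 2021-01-01T08:00
        System.out.println(watermarkCandidate(utc));                  // 2020-12-31T23:59:59
    }
}
```

Because `rt` depends on the session time zone, the same file can yield different `rt` values, and therefore different window boundaries, under UTC and Asia/Shanghai.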
1.2.2 Specifying It When Converting a DataStream to a Table
Example: converting a stream read from a file into a table
package cn.itcast.day02.time;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author lql
 * @time 2024-03-15 12:42:47
 * Using event time as a time attribute:
 * the event-time attribute is declared when converting a DataStream into a Table.
 */
public class EventTimeDataStream {
    public static void main(String[] args) throws Exception {
        // 1) Build the Flink streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2) Set the parallelism
        env.setParallelism(1);
        // 3) Build the Flink table environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);
        String path = EventTimeDataStream.class.getClassLoader().getResource("order.csv").getPath();
        DataStreamSource<String> inputDataStream = env.readTextFile(path);
        // 4) Parse each line into a POJO
        SingleOutputStreamOperator<OrderBean> orderDataStream = inputDataStream.map(new MapFunction<String, OrderBean>() {
            @Override
            public OrderBean map(String value) throws Exception {
                String[] dataArray = value.split(",");
                return new OrderBean(dataArray[0], Long.parseLong(dataArray[1]), Double.parseDouble(dataArray[2]), dataArray[3]);
            }
        });
        // 5) assignTimestampsAndWatermarks() attaches a timestamp assigner and a watermark generator:
        //    forBoundedOutOfOrderness(Duration.ZERO) is the bounded-out-of-orderness strategy with zero allowed delay,
        //    and withTimestampAssigner(new MyTimeAssigner()) extracts the event time from each record.
        SingleOutputStreamOperator<OrderBean> waterMarkStream = orderDataStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderBean>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new MyTimeAssigner()));
        // 6) Convert the DataStream into tables
        // Option 1: replace the existing field `timestamp` with the rowtime attribute
        Table table = tabEnv.fromDataStream(waterMarkStream, $("id"), $("money"), $("category"), $("timestamp").rowtime());
        // Option 2: append a new rowtime field `rt` to the schema
        Table table2 = tabEnv.fromDataStream(waterMarkStream, $("id"), $("timestamp"), $("money"), $("category"), $("rt").rowtime());
        // 7) Convert both tables back into DataStreams and print them
        tabEnv.toAppendStream(table, Row.class).print();
        tabEnv.toAppendStream(table2, Row.class).print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderBean {
        private String id;
        private Long timestamp;
        private Double money;
        private String category;
    }

    /**
     * Custom timestamp assigner: `timestamp` holds epoch seconds, Flink expects milliseconds.
     */
    private static class MyTimeAssigner implements SerializableTimestampAssigner<OrderBean> {
        @Override
        public long extractTimestamp(OrderBean orderBean, long recordTimestamp) {
            return orderBean.getTimestamp() * 1000L;
        }
    }
}
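Summary:
- 1- Replace an existing field with the rowtime attribute: $("timestamp").rowtime()
- 2- Append a new field to the schema: $("rt").rowtime()
- 3- An event-time stream must have timestamps and watermarks assigned before it is converted!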
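The watermark side of this example can also be modelled without Flink. The class below is a simplified, hypothetical re-implementation of the idea behind `WatermarkStrategy.forBoundedOutOfOrderness(...)`: remember the largest timestamp seen so far and report `max - outOfOrderness - 1 ms` as the watermark. Timestamps go through the same seconds-to-milliseconds conversion as the custom assigner in the example; the sample second values are made up. Unlike Flink, which emits watermarks periodically, this model computes the watermark on demand.

```java
import java.time.Duration;

// Hypothetical model of a bounded-out-of-orderness watermark generator.
final class BoundedOutOfOrdernessModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessModel(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start low enough that the initial watermark does not underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Same conversion as the custom assigner: epoch seconds -> milliseconds.
    static long extractTimestamp(long epochSeconds) {
        return epochSeconds * 1000L;
    }

    // Track the largest event timestamp seen so far.
    void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    // Watermark = max timestamp - allowed out-of-orderness - 1 ms.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel wm = new BoundedOutOfOrdernessModel(Duration.ZERO);
        long[] eventSeconds = {2L, 5L, 3L}; // made-up values; the 3 s record arrives out of order
        for (long s : eventSeconds) {
            wm.onEvent(extractTimestamp(s));
            System.out.println("after " + s + " s -> watermark " + wm.currentWatermark() + " ms");
        }
    }
}
```

With `Duration.ZERO`, the record stamped 3 s arrives when the watermark already stands at 4999 ms, so it is behind the watermark; a larger out-of-orderness bound keeps the watermark lower and gives such records a chance to be counted.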
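2. Time Zones
2.1 TIMESTAMP vs TIMESTAMP_LTZ
TIMESTAMP: in Flink, TIMESTAMP (TIMESTAMP WITHOUT TIME ZONE) behaves much like a string: it is a wall-clock value with no time zone attached, so its string form stays the same no matter how the job's time zone changes.
TIMESTAMP_LTZ: TIMESTAMP WITH LOCAL TIME ZONE is an absolute point on the global timeline; internally it is stored as epoch milliseconds (a long, like BIGINT). When it is converted to a string, the result depends on the job's session time zone.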
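Demo:
-- Create a view with two time values: TO_TIMESTAMP_LTZ(4001, 3) and TIMESTAMP '1970-01-01 00:00:01.001'
Flink SQL> CREATE VIEW MyView2 AS SELECT TO_TIMESTAMP_LTZ(4001, 3) AS ltz, TIMESTAMP '1970-01-01 00:00:01.001' AS ntz;
-- Describe the view's schema (ltz is TIMESTAMP_LTZ(3), ntz is TIMESTAMP(3))
Flink SQL> DESC MyView2;
Case 1: with the session time zone set to UTC, nothing looks unusual: ltz shows 1970-01-01 00:00:04.001 and ntz shows 1970-01-01 00:00:01.001
-- Set the session time zone to UTC (Greenwich time)
Flink SQL> SET table.local-time-zone=UTC;
Flink SQL> SELECT * FROM MyView2;
Case 2: with Asia/Shanghai, the LTZ value shifts by 8 hours to 1970-01-01 08:00:04.001 while the NTZ value stays unchanged
Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT * FROM MyView2;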
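The two columns map naturally onto java.time: TIMESTAMP_LTZ behaves like an `Instant` (an absolute point in time) and TIMESTAMP like a `LocalDateTime` (a wall-clock value with no zone). The sketch below is only a plain-Java analogy with a hypothetical class name, not Flink's internal implementation; it renders the same two values as the view above under UTC and Asia/Shanghai.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Plain-Java analogy (hypothetical): TIMESTAMP_LTZ ~ Instant, TIMESTAMP ~ LocalDateTime.
final class LtzVsNtz {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Like TO_TIMESTAMP_LTZ(4001, 3): 4001 ms after the epoch, an absolute instant.
    static final Instant LTZ = Instant.ofEpochMilli(4001L);
    // Like TIMESTAMP '1970-01-01 00:00:01.001': a wall-clock value without a zone.
    static final LocalDateTime NTZ = LocalDateTime.of(1970, 1, 1, 0, 0, 1, 1_000_000);

    // An instant must be placed in the session zone before it can be printed.
    static String renderLtz(ZoneId sessionZone) {
        return FMT.format(LTZ.atZone(sessionZone));
    }

    // A zone-less value is printed as-is; the session zone plays no role.
    static String renderNtz() {
        return FMT.format(NTZ);
    }

    public static void main(String[] args) {
        for (String zone : new String[] {"UTC", "Asia/Shanghai"}) {
            // UTC:           ltz=1970-01-01 00:00:04.001, ntz=1970-01-01 00:00:01.001
            // Asia/Shanghai: ltz=1970-01-01 08:00:04.001, ntz=1970-01-01 00:00:01.001
            System.out.println(zone + ": ltz=" + renderLtz(ZoneId.of(zone)) + ", ntz=" + renderNtz());
        }
    }
}
```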
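2.2 Setting the Time Zone in Code
import java.time.ZoneId;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings envSetting = EnvironmentSettings.newInstance().build();
TableEnvironment tEnv = TableEnvironment.create(envSetting);
// Set the session time zone to UTC
tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
// Set it to Shanghai
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
// Set it to Los Angeles
tEnv.getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles"));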
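2.3 Time Attributes and Time Zones
2.3.1 New Behavior in Flink 1.13
Note: before Flink 1.13, PROCTIME() returned the type TIMESTAMP, and its value was the TIMESTAMP in the UTC time zone. For example, when the time in Shanghai was 2021-03-01 12:00:00, PROCTIME() wrongly displayed 2021-03-01 04:00:00. Flink 1.13 fixed this by making PROCTIME() return TIMESTAMP_LTZ, so users no longer have to deal with the time-zone conversion themselves.
- In other words, from 1.13 on, PROCTIME() is displayed in the session time zone (table.local-time-zone, which defaults to the JVM's system time zone), so on a machine set to Shanghai time you get Shanghai time without any extra setting. To be safe, you can still set the time zone you want explicitly.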
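2.3.2 Socket Source Example
Requirement: create a table that reads data from a socket, then set the UTC and the Asia/Shanghai time zones in turn and observe how the time field changes.
Required jar: ChangelogSocketExample.jar (the 'socket' connector used below comes from this example jar, not from Flink itself)
(It is available in my download area if you need it.)
-- Create the socket table
Flink SQL> CREATE TABLE MyTable1 (
    item STRING,
    price DOUBLE,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'socket',
    'hostname' = 'node1',
    'port' = '9999',
    'format' = 'csv'
);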
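3. Temporal Tables
3.1 Versioned Tables and Regular Tables
- Versioned table: keeps historical versions that can be accessed; a changelog coming from a database can be defined as a versioned table.
  Flink defines a versioned table or versioned view with a primary key constraint plus an event-time attribute. Only the Blink planner supports this feature.
- Regular table: only the latest version can be accessed; an HBase table, for example, can be defined as a regular table.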
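3.2 Temporal Table Functions
The main differences between a temporal table function and a temporal table declared with DDL:
a temporal table declared with DDL can be used in a pure SQL environment, while a temporal table function cannot; and a DDL-declared temporal table supports both changelog streams and append-only streams, while a temporal table function supports only append-only streams and involves no primary key constraint (the key field is passed to createTemporalTableFunction instead).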
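3.2.1 Temporal Table Function (Batch Style)
Example: order amounts are converted with the exchange rate that was valid when each order was placed!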
package cn.itcast.day02.Temproal;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.table.api.Expressions.*;

/**
 * @author lql
 * @time 2024-03-16 12:39:11
 * Computes order amounts with a temporal table function (batch style).
 * Two streams are needed:
 * 1) the order stream
 * 2) the exchange-rate stream
 * The rate stream is joined through a temporal table function.
 */
public class TemporalTablesFunctionBatch {
    public static void main(String[] args) throws Exception {
        // 1) Build the table environment
        // 1.1 Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 1.2 Settings
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // 1.3 Table environment
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, bsSettings);

        // 2) Build the sources
        // 2.1 Order stream: (amount, currency, event time in seconds)
        List<Tuple3<Double, String, Long>> orderList = new ArrayList<>();
        orderList.add(new Tuple3<>(7D, "Euro", 2L));      // euro
        orderList.add(new Tuple3<>(7D, "US Dollar", 3L)); // US dollar
        orderList.add(new Tuple3<>(0.05D, "Yen", 4L));    // Japanese yen
        orderList.add(new Tuple3<>(8D, "Euro", 5L));      // euro
        SingleOutputStreamOperator<Tuple3<Double, String, Long>> orderStream = env.fromCollection(orderList)
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<Double, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Double, String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple3<Double, String, Long> element, long l) {
                                return element.f2 * 1000L; // seconds -> milliseconds
                            }
                        }));
        // 2.2 Rate stream: (currency, rate, event time in seconds)
        List<Tuple3<String, Integer, Long>> rateList = new ArrayList<>();
        rateList.add(new Tuple3<>("US Dollar", 102, 1L));
        rateList.add(new Tuple3<>("Euro", 114, 1L));
        rateList.add(new Tuple3<>("Yen", 1, 1L));
        rateList.add(new Tuple3<>("Euro", 116, 5L));
        rateList.add(new Tuple3<>("Euro", 117, 7L));
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> rateStream = env.fromCollection(rateList)
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, Integer, Long> element, long l) {
                                return element.f2 * 1000L;
                            }
                        }));

        // 3) Convert both streams into tables, declaring the event-time attribute
        Table orderTable = tabEnv.fromDataStream(orderStream, $("amount"), $("currency"), $("rowtime").rowtime());
        Table rateTable = tabEnv.fromDataStream(rateStream, $("currency"), $("rate"), $("rowtime").rowtime());
        // 4) Register the tables as views
        tabEnv.createTemporaryView("Orders", orderTable);
        tabEnv.createTemporaryView("RatesHistory", rateTable);
        // 5) Define the rate table as a temporal table function (time attribute, key field),
        //    then register it in the table environment
        TemporalTableFunction temporalTableFunction = rateTable.createTemporalTableFunction($("rowtime"), $("currency"));
        tabEnv.createTemporaryFunction("Rates", temporalTableFunction);
        // 6) Join query
        Table result = tabEnv.sqlQuery(" SELECT o.currency, o.amount, r.rate, \n" +
                " o.amount * r.rate AS yen_amount \n" +
                " FROM \n" +
                " Orders AS o, \n" +
                " LATERAL TABLE (Rates(o.rowtime)) AS r \n" +
                " WHERE r.currency = o.currency");
        // 7) Print the result
        tabEnv.toAppendStream(result, Row.class).printToErr();
        env.execute();
    }
}
Result:
+I[US Dollar, 7.0, 102, 714.0]
+I[Yen, 0.05, 1, 0.05]
+I[Euro, 7.0, 114, 798.0]
+I[Euro, 8.0, 116, 928.0]
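Summary:
- 1- Both streams need timestamps and watermarks
- 2- When the streams are converted into tables, declare the event-time attribute
- 3- Define the changing table as a temporal table function, then register it in the table environment
- 4- In the join query, call the temporal table function through LATERAL TABLE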
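What `LATERAL TABLE (Rates(o.rowtime))` computes can be mimicked with an in-memory version history: for each order, take the rate of the same currency whose version time is the largest one not after the order time. The sketch below is a hypothetical plain-Java model (one `TreeMap` per currency queried with `floorEntry`), not Flink's implementation, which keeps this history in state and emits results as watermarks advance. It is fed the same orders and rates as the example above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of LATERAL TABLE (Rates(o.rowtime)).
final class TemporalLookupModel {
    // currency -> (version time in seconds -> rate)
    private final Map<String, TreeMap<Long, Integer>> versions = new HashMap<>();

    void addRate(String currency, int rate, long time) {
        versions.computeIfAbsent(currency, k -> new TreeMap<>()).put(time, rate);
    }

    // Rate valid at `time`: the version with the largest time <= `time`,
    // or null when no version exists yet (the inner join would drop the order).
    Integer rateAsOf(String currency, long time) {
        TreeMap<Long, Integer> history = versions.get(currency);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, Integer> version = history.floorEntry(time);
        return version == null ? null : version.getValue();
    }

    public static void main(String[] args) {
        TemporalLookupModel rates = new TemporalLookupModel();
        rates.addRate("US Dollar", 102, 1L);
        rates.addRate("Euro", 114, 1L);
        rates.addRate("Yen", 1, 1L);
        rates.addRate("Euro", 116, 5L);
        rates.addRate("Euro", 117, 7L);

        // (amount, currency, order time in seconds), same as the Flink example
        Object[][] orders = {{7D, "Euro", 2L}, {7D, "US Dollar", 3L}, {0.05D, "Yen", 4L}, {8D, "Euro", 5L}};
        for (Object[] o : orders) {
            double amount = (Double) o[0];
            String currency = (String) o[1];
            Integer rate = rates.rateAsOf(currency, (Long) o[2]);
            if (rate != null) {
                System.out.println(currency + ", " + amount + ", " + rate + ", " + amount * rate);
            }
        }
    }
}
```

It yields the same four rows as the Flink output above, possibly in a different order. The Euro rate 117 (version at 7 s) is never used because no Euro order is later than 5 s.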
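3.2.2 Temporal Table Function (Streaming Style)
Example: consume the event stream (browse_event) and the product stream (product_history_info) from Kafka, and use the product ID in each event to join the corresponding product data.
Event stream: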
{"userID": "user_001", "eventTime": "2021-01-01 00:00:00", "eventType": "browse", "productID": "product_005"}
{"userID": "user_001", "eventTime": "2021-01-01 00:00:01", "eventType": "browse", "productID": "product_005"}
{"userID": "user_001", "eventTime": "2021-01-01 00:00:02", "eventType": "browse", "productID": "product_005"}
{"userID": "user_001", "eventTime": "2021-01-01 00:00:03", "eventType": "browse", "productID": "product_005"}
{"userID": "user_001", "eventTime": "2021-01-01 00:00:04", "eventType": "browse", "productID": "product_005"}
{"userID": "user_001", "eventTime": "2021-01-01 00:00:05", "eventType": "browse", "productID": "product_005"}
{"userID": "user_001", "eventTime": "2021-01-01 00:00:06", "eventType": "browse", "productID": "product_005"}
{"userID": "user_002", "eventTime": "2021-01-01 00:00:01", "eventType": "browse", "productID": "product_003"}
{"userID": "user_002", "eventTime": "2021-01-01 00:00:02", "eventType": "browse", "productID": "product_003"}
{"userID": "user_002", "eventTime": "2021-01-01 00:00:05", "eventType": "browse", "productID": "product_003"}
{"userID": "user_002", "eventTime": "2021-01-01 00:00:06", "eventType": "browse", "productID": "product_003"}
Product stream:
{"productID":"product_005","productName":"苹果电脑","productCategory":"电脑","updatedAt":"2021-01-01 00:00:00", "productPrice": 20}
{"productID":"product_005","productName":"苹果电脑","productCategory":"电脑","updatedAt":"2021-01-01 00:00:02", "productPrice": 30}
{"productID":"product_005","productName":"苹果电脑","productCategory":"电脑","updatedAt":"2021-01-01 00:00:05", "productPrice": 40}
{"productID":"product_003","productName":"华为手机","productCategory":"手机","updatedAt":"2021-01-01 00:00:02", "productPrice": 20}
{"productID":"product_003","productName":"华为手机","productCategory":"手机","updatedAt":"2021-01-01 00:00:05", "productPrice": 30}
Code: parse the two JSON sources and widen the event table with the product table
package cn.itcast.day02.Temproal;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import static org.apache.flink.table.api.Expressions.*;

/**
 * @author lql
 * @time 2024-03-16 13:50:43
 * Widens browsed-product records with a temporal table function (streaming style).
 * Two streams are needed:
 * 1) the product browse event stream
 * 2) the product base-information stream
 * The product ID in the browse stream is joined with the product stream through a temporal table function.
 */
public class TemporalTablesFunctionStreaming {
    public static void main(String[] args) throws Exception {
        // 1) Build the streaming and table environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, bsSettings);

        // 2) Build the sources
        String kafkaBootstrapServers = "node1:9092";                     // Kafka cluster address
        String browseTopic = "browseTopic2";                            // topic of the browse event stream
        String productInfoTopic = "productHistoryInfoTopic2";           // topic of the product info stream
        String browseTopicGroupID = "browseTopicGroupID_002";           // consumer group of the browse stream
        String productInfoTopicGroupID = "productInfoTopicGroupID_002"; // consumer group of the product stream

        // 2.1 Browse event source
        // Note: to relate Beijing time and timestamps intuitively, UserBrowseLog has an extra
        // field eventTimeTimestamp that holds eventTime as epoch milliseconds
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers", kafkaBootstrapServers);
        browseProperties.put("group.id", browseTopicGroupID);
        browseProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        browseProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        DataStreamSource<String> browseStream = env.addSource(
                new FlinkKafkaConsumer<>(browseTopic, new SimpleStringSchema(), browseProperties));
        browseStream.print("browse raw>>>");
        SingleOutputStreamOperator<UserBrowseLog> browseWatermarkStream = browseStream
                .process(new BrowseKafkaProcessFunction())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<UserBrowseLog>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<UserBrowseLog>() {
                            @Override
                            public long extractTimestamp(UserBrowseLog element, long l) {
                                // Already milliseconds: BrowseKafkaProcessFunction computed it when parsing the JSON
                                return element.getEventTimeTimestamp();
                            }
                        }));
        browseWatermarkStream.print("browse watermarked>>>");

        // 2.2 Product info source
        Properties productInfoProperties = new Properties();
        productInfoProperties.put("bootstrap.servers", kafkaBootstrapServers);
        productInfoProperties.put("group.id", productInfoTopicGroupID);
        productInfoProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        productInfoProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        DataStreamSource<String> productInfoStream = env.addSource(
                new FlinkKafkaConsumer<>(productInfoTopic, new SimpleStringSchema(), productInfoProperties));
        SingleOutputStreamOperator<ProductInfo> productInfoWatermarkStream = productInfoStream
                .process(new ProductInfoProcessFunction())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<ProductInfo>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<ProductInfo>() {
                            @Override
                            public long extractTimestamp(ProductInfo element, long l) {
                                return element.getUpdatedAtTimestamp();
                            }
                        }));
        productInfoWatermarkStream.printToErr("product watermarked>>>");

        // 3) Convert the parsed, watermarked POJO streams (not the raw String streams) into tables
        Table table_brow = tabEnv.fromDataStream(browseWatermarkStream, $("userID"), $("eventTime"),
                $("eventType"), $("productID"), $("eventTimeTimestamp"), $("browseRowtime").rowtime());
        Table table_product = tabEnv.fromDataStream(productInfoWatermarkStream, $("productID"), $("productName"),
                $("productCategory"), $("updatedAt"), $("updatedAtTimestamp"), $("productPrice"),
                $("productInfoRowtime").rowtime());
        // 4) Register the tables as views
        tabEnv.createTemporaryView("browse", table_brow);
        tabEnv.createTemporaryView("productInfo", table_product);
        // 5) Define the product table as a temporal table function (time attribute, key) and register it
        TemporalTableFunction productInfoFunction = tabEnv.from("productInfo")
                .createTemporalTableFunction($("productInfoRowtime"), $("productID"));
        tabEnv.createTemporaryFunction("productInfoFunc", productInfoFunction);
        // 6) Join the two tables in SQL
        String sql = "SELECT "
                + "browse.userID, "
                + "browse.eventTime, "
                + "browse.eventTimeTimestamp, "
                + "browse.eventType, "
                + "browse.productID, "
                + "productInfo.productID, "
                + "productInfo.productName, "
                + "productInfo.productCategory, "
                + "productInfo.productPrice, "
                + "productInfo.updatedAt, "
                + "productInfo.updatedAtTimestamp "
                + "FROM "
                + " browse, "
                + " LATERAL TABLE (productInfoFunc(browse.browseRowtime)) as productInfo "
                + "WHERE "
                + " browse.productID=productInfo.productID";
        // 7) Run the query and print the result
        Table table = tabEnv.sqlQuery(sql);
        tabEnv.toAppendStream(table, Row.class).print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Long eventTimeTimestamp;
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class ProductInfo implements Serializable {
        private String productID;        // product ID
        private String productName;      // product name
        private String productCategory;  // product category
        private String updatedAt;        // update time (Beijing time string)
        private Long updatedAtTimestamp; // update time as epoch milliseconds
        private double productPrice;
    }

    // Parses a browse event and adds its event time as epoch milliseconds
    private static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
        @Override
        public void processElement(String value, Context context, Collector<UserBrowseLog> collector) throws Exception {
            UserBrowseLog log = JSON.parseObject(value, UserBrowseLog.class);
            DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            // The strings are Beijing time; "+08:00" is a valid offset ID ("+8:00" throws a DateTimeException)
            OffsetDateTime eventTime = LocalDateTime.parse(log.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
            log.setEventTimeTimestamp(eventTime.toInstant().toEpochMilli());
            collector.collect(log);
        }
    }

    // Parses a product record and adds its update time as epoch milliseconds
    private static class ProductInfoProcessFunction extends ProcessFunction<String, ProductInfo> {
        @Override
        public void processElement(String value, Context context, Collector<ProductInfo> collector) throws Exception {
            ProductInfo info = JSON.parseObject(value, ProductInfo.class);
            DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            OffsetDateTime updatedAt = LocalDateTime.parse(info.getUpdatedAt(), format).atOffset(ZoneOffset.of("+08:00"));
            info.setUpdatedAtTimestamp(updatedAt.toInstant().toEpochMilli());
            collector.collect(info);
        }
    }
}
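Summary: convert each event-time string into a millisecond timestamp stored in an extra field, interpreting the string as Beijing time (UTC+08:00). The 8-hour offset matters: reading the same string as UTC would give an epoch value 8 hours later.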
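The conversion done in the two ProcessFunctions can be checked in isolation. This JDK-only sketch parses a `yyyy-MM-dd HH:mm:ss` string as Beijing time (UTC+08:00) and returns epoch milliseconds; the class and method names are hypothetical. It also shows a pitfall worth knowing: `ZoneOffset.of` expects a two-digit hour, so `"+08:00"` works while `"+8:00"` throws a `DateTimeException`.

```java
import java.time.DateTimeException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: Beijing-time string -> epoch milliseconds.
final class BeijingTimeToMillis {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final ZoneOffset BEIJING = ZoneOffset.of("+08:00");

    // Interpret the wall-clock text as UTC+08:00 and convert to epoch millis.
    static long toEpochMillis(String text) {
        return LocalDateTime.parse(text, FORMAT).toInstant(BEIJING).toEpochMilli();
    }

    // True if ZoneOffset.of accepts the given offset ID.
    static boolean isValidOffsetId(String id) {
        try {
            ZoneOffset.of(id);
            return true;
        } catch (DateTimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2021-01-01 00:00:00")); // 1609430400000
        System.out.println(isValidOffsetId("+08:00"));             // true
        System.out.println(isValidOffsetId("+8:00"));              // false
    }
}
```

Reading "2021-01-01 00:00:00" as UTC would give 1609459200000 instead, exactly 8 hours (28,800,000 ms) later.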
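3.3 Temporal Table Join vs Regular Stream-Stream Join
Both maintain state. A temporal table join is driven by one side only: the temporal table is looked up passively. A regular stream-stream join is driven by both sides: a new row on either side actively triggers join computation.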
3.3.1 Temporal Joins on Processing Time or Event Time
Syntax:
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
3.3.2 Example
rateOrder data:
1,29,RMB,2021-01-02 00:00:00
2,19,RMB,2021-01-03 00:00:00
3,33,RMB,2021-01-11 00:00:00
4,55,RMB,2021-01-21 00:00:00
rateHistory data:
RMB,114,2021-01-01 00:00:00
RMB,115,2021-01-03 00:00:00
RMB,116,2021-01-19 00:00:00
Euro,119,2021-01-03 00:00:00
USD,99,2021-01-03 00:00:00
USD,100,2021-01-03 00:00:00
Euro,118,2021-01-03 00:00:00
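Code: an order table and an exchange-rate table; the rate table is defined as a temporal table so that each order is joined, by its order time, with the rate that was current at that moment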
package cn.itcast.day02.Temproal;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author lql
 * @time 2024-03-16 15:32:44
 * Requirement: given an order table and an exchange-rate table, define the rate table as a temporal table
 * so that each order can be joined, by its order time, with the latest rate valid at that time.
 */
public class TemporalTableJoinEventTime {
    public static void main(String[] args) throws Exception {
        // 1) Build the table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, bsSettings);
        // 2) Register the two CSV files as tables
        String rateOrderPath = TemporalTableJoinEventTime.class.getClassLoader().getResource("rateOrder.csv").getPath();
        String rateHistoryPath = TemporalTableJoinEventTime.class.getClassLoader().getResource("rateHistory.csv").getPath();
        // Order table (probe side) with an event-time watermark
        String sqlDDL = "create table rateOrder (" +
                " order_id String," +
                " `price` DECIMAL(32,2)," +
                " currency String," +
                " order_time TIMESTAMP(3)," +
                " WATERMARK FOR order_time as order_time" +
                " ) with (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'file:///" + rateOrderPath + "'," +
                " 'format' = 'csv'" +
                " )";
        tabEnv.executeSql(sqlDDL);
        // Rate table (versioned side): primary key plus event-time watermark
        sqlDDL = "create table rateHistory (" +
                " currency String," +
                " `conversion_rate` DECIMAL(32,2)," +
                " update_time TIMESTAMP(3)," +
                " PRIMARY KEY (currency) NOT ENFORCED," +
                " WATERMARK FOR update_time as update_time" +
                " ) with (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'file:///" + rateHistoryPath + "'," +
                " 'format' = 'csv'" +
                " )";
        tabEnv.executeSql(sqlDDL);
        // 3) Event-time temporal join
        String sql = "select order_id," +
                " price," +
                " rateOrder.currency," +
                " conversion_rate," +
                " order_time" +
                " from rateOrder" +
                " left join rateHistory for system_time as of rateOrder.order_time" +
                " on rateOrder.currency=rateHistory.currency";
        Table result = tabEnv.sqlQuery(sql);
        tabEnv.toAppendStream(result, Row.class).print();
        env.execute();
    }
}
Result:
+I[1, 29.00, RMB, 114.00, 2021-01-02T00:00]
+I[2, 19.00, RMB, 115.00, 2021-01-03T00:00]
+I[3, 33.00, RMB, 115.00, 2021-01-11T00:00]
+I[4, 55.00, RMB, 116.00, 2021-01-21T00:00]
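Summary:
- 1- Both tables need a watermark
- 2- The temporal (versioned) table must define a primary key constraint and an event-time attribute
- 3- In the join, whichever table joins the temporal table uses FOR SYSTEM_TIME AS OF with its own time attribute!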
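3.4 Lookup Join
A lookup join is typically used to enrich a table with data queried from an external system. The join requires one table to have a processing-time attribute and the other table to be backed by a lookup source connector.
Example: the right-hand table is a MySQL source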
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb?characterEncoding=utf-8&useSSL=false',
  'table-name' = 'customers'
);
-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
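A lookup join probes the external table at the moment each row is processed, and rows that were already emitted are not revised when the external table changes later. The sketch below models that behavior with a `HashMap` standing in for the MySQL `Customers` table; the class, the customer IDs and all values are hypothetical, and details of Flink's JDBC connector (such as lookup caching) are not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a processing-time lookup join; not Flink's implementation.
final class LookupJoinModel {
    static final class Customer {
        final String country;
        final String zip;

        Customer(String country, String zip) {
            this.country = country;
            this.zip = zip;
        }
    }

    // Stand-in for the JDBC-backed Customers table (id -> row).
    final Map<Integer, Customer> customers = new HashMap<>();
    // Joined rows emitted so far: order_id,total,country,zip
    final List<String> output = new ArrayList<>();

    // Called when an order is processed: look the customer up at this moment (inner join).
    void onOrder(int orderId, double total, int customerId) {
        Customer c = customers.get(customerId);
        if (c != null) {
            output.add(orderId + "," + total + "," + c.country + "," + c.zip);
        }
    }

    public static void main(String[] args) {
        LookupJoinModel join = new LookupJoinModel();
        join.customers.put(1, new Customer("CN", "100000"));
        join.onOrder(101, 9.5, 1);
        // The external row changes; the row already emitted for order 101 is not revised.
        join.customers.put(1, new Customer("US", "94105"));
        join.onOrder(102, 20.0, 1);
        join.onOrder(103, 1.0, 2); // no customer 2: dropped by the inner join
        join.output.forEach(System.out::println);
    }
}
```

This is the key contrast with the event-time temporal join in 3.3: a lookup join always sees the external table as it is "now", not as it was at the row's event time.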