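What is a dimension table join?
Each record in the stream can be joined against an external dimension table data source, which gives Flink SQL real-time jobs a way to enrich data with lookup queries.
Note: A dimension table changes continuously, so a dimension table join must state which point-in-time snapshot of the dimension table each record is joined against. Currently, a dimension table join only supports the snapshot at the current moment; joining against the snapshot that corresponds to the left table's proctime or rowtime is planned for the future.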
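To make the "current snapshot" behavior concrete, here is a minimal, Flink-free sketch in plain Java. The class `LookupJoinSketch` and its `dimTable` map are hypothetical stand-ins for the external dimension table; this is only an illustration of the semantics, not how Flink implements the join.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of lookup-join semantics (not Flink code).
class LookupJoinSketch {
    // Stands in for the external dimension table; it may change at any time.
    static final Map<Integer, String> dimTable = new HashMap<>();

    // Joins one stream record against the dimension table as it is right now.
    static String join(int itemId) {
        return itemId + " -> " + dimTable.get(itemId);
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        dimTable.put(2, "222");
        out.add(join(2));           // this record sees the value at lookup time
        dimTable.put(2, "222222");  // the dimension row is updated externally
        out.add(join(2));           // a later record with the same key sees the new value
        out.forEach(System.out::println);
    }
}
```

The first result is not revised after the dimension row changes: each record is enriched once, with whatever the dimension table contained at the moment of the lookup.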
Dimension table join syntax:
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
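The optional `LEFT` keyword in the syntax decides what happens when a stream record has no matching row in the dimension table. The following plain-Java sketch illustrates the difference; `JoinTypeSketch` and its `join` method are hypothetical names used only for this illustration, and the map stands in for the dimension table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of inner vs. LEFT join behavior on a lookup miss.
class JoinTypeSketch {
    static List<String> join(List<Integer> keys, Map<Integer, String> dim, boolean leftJoin) {
        List<String> out = new ArrayList<>();
        for (Integer key : keys) {
            String name = dim.get(key);
            if (name != null) {
                out.add(key + "," + name);   // match found: emit the enriched row
            } else if (leftJoin) {
                out.add(key + ",null");      // LEFT JOIN: keep the row, dimension columns are NULL
            }
            // inner JOIN: a record without a match is dropped
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> dim = new HashMap<>();
        dim.put(2, "222");
        List<Integer> keys = Arrays.asList(2, 9);
        System.out.println(join(keys, dim, false)); // inner join
        System.out.println(join(keys, dim, true));  // left join
    }
}
```

If records with unknown keys must not be lost, `LEFT JOIN` is usually the safer choice, at the cost of handling NULL dimension columns downstream.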
Test statements:
Kafka fact table:
// 2. Define the input table, which consumes data from Kafka
tableEnv.executeSql("CREATE TABLE sourceTable (\n" +
        " `user_id` STRING,\n" +
        " `item_id` INTEGER,\n" +
        " `behavior` STRING,\n" +
        " `ts` STRING,\n" +
        " `body` ROW<id STRING,name STRING,code STRING>,\n" +
        // Processing-time attribute, referenced by FOR SYSTEM_TIME AS OF in the join
        " `proctime` AS PROCTIME()\n" +
        ") WITH (\n" +
        " 'connector' = 'kafka',\n" +
        " 'topic' = 'test-data',\n" +
        " 'properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
        " 'properties.group.id' = 'test-data',\n" +
        " 'scan.startup.mode' = 'latest-offset',\n" +
        " 'format' = 'json',\n" +
        // 'true' fails the job when a field is missing; use 'false' to tolerate missing fields
        " 'json.fail-on-missing-field' = 'true',\n" +
        // 'false' fails the job on a parse error; use 'true' to skip records that cannot be parsed
        " 'json.ignore-parse-errors' = 'false'\n" +
        ")");
MySQL dimension table:
tableEnv.executeSql("CREATE TABLE dim_province (\n" +
        " province_id BIGINT,\n" +
        " province_name VARCHAR,\n" +
        " region_name VARCHAR\n" +
        ") WITH (\n" +
        " 'connector.type' = 'jdbc',\n" +
        " 'connector.url' = 'jdbc:mysql://localhost:3306/sms?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',\n" +
        " 'connector.table' = 'dim_province',\n" +
        " 'connector.username' = 'root',\n" +
        " 'connector.password' = 'root'\n" +
        // Optional lookup cache, disabled here; to enable it, add a comma after the password option above
        // " 'connector.lookup.cache.max-rows' = '1',\n" +
        // " 'connector.lookup.cache.ttl' = '5s'\n" +
        ")");
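The dimension table above uses the legacy `connector.*` option keys, while the Kafka table uses the newer `'connector' = 'kafka'` style. As a hedged sketch, the same table could be declared with the newer JDBC option keys (`url`, `table-name`, `lookup.cache.max-rows`, `lookup.cache.ttl`), which I believe apply to Flink 1.11 and later; check the documentation for your Flink version, since the cache option names have changed again in more recent releases. The class `JdbcDimTableDdl`, its `ddl` method, the shortened JDBC URL, the `STRING` column types, and the cache values are assumptions for illustration only.

```java
// Hypothetical helper that builds a new-style JDBC dimension table DDL string.
class JdbcDimTableDdl {
    // cacheMaxRows and cacheTtl are illustrative lookup cache settings.
    static String ddl(long cacheMaxRows, String cacheTtl) {
        return "CREATE TABLE dim_province (\n"
                + "  province_id BIGINT,\n"
                + "  province_name STRING,\n"
                + "  region_name STRING\n"
                + ") WITH (\n"
                + "  'connector' = 'jdbc',\n"
                + "  'url' = 'jdbc:mysql://localhost:3306/sms',\n"
                + "  'table-name' = 'dim_province',\n"
                + "  'username' = 'root',\n"
                + "  'password' = 'root',\n"
                // With the cache enabled, a lookup may return a row that is stale by up to the TTL
                + "  'lookup.cache.max-rows' = '" + cacheMaxRows + "',\n"
                + "  'lookup.cache.ttl' = '" + cacheTtl + "'\n"
                + ")";
    }

    public static void main(String[] args) {
        // In a real job the string would be passed to tableEnv.executeSql(...)
        System.out.println(ddl(1000, "5s"));
    }
}
```

Enabling the cache trades freshness for fewer database round trips: a changed MySQL row may not be visible to the join until the cached entry expires.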
Run the query:
tableEnv.executeSql("SELECT sourceTable.item_id, sourceTable.ts, dim_province.province_name, sourceTable.proctime\n" +
        " FROM sourceTable JOIN dim_province\n" +
        " FOR SYSTEM_TIME AS OF sourceTable.proctime\n" +
        " ON sourceTable.item_id = dim_province.province_id").print();
The results look like this:
+----+-------------+--------------------------------+--------------------------------+-------------------------+
| op |     item_id |                             ts |                  province_name |                proctime |
+----+-------------+--------------------------------+--------------------------------+-------------------------+
| +I |           2 |                  1690786451861 |                            222 | 2023-07-31 15:07:49.673 |
| +I |           2 |                  1690786451861 |                            222 | 2023-07-31 15:08:33.763 |
| +I |           3 |                  1690786451861 |                            333 | 2023-07-31 15:09:04.121 |
| +I |           2 |                  1690786451861 |                         222222 | 2023-07-31 15:09:45.225 |