Flink SQL 基础概念:数据类型
- 1.原子数据类型
- 1.1 字符串类型
- 1.2 二进制字符串类型
- 1.3 精确数值类型
- 1.4 有损精度数值类型
- 1.5 布尔类型:BOOLEAN
- 1.6 日期、时间类型
- 2.复合数据类型
- 3.用户自定义数据类型
Flink SQL 内置了很多常见的数据类型,并且也为用户提供了自定义数据类型的能力。
总共包含 3 部分:
- 原子 数据类型
- 复合 数据类型
- 用户自定义 数据类型
1.原子数据类型
1.1 字符串类型
CHAR
、CHAR(n)
:定长字符串,就和 Java 中的 Char 一样,n
代表字符的定长,取值范围 [ 1 , 2147483647 ] [1, 2147483647] [1,2147483647]。如果不指定n
,则默认为 1。VARCHAR
、VARCHAR(n)
、STRING
:可变长字符串,就和 Java 中的 String 一样,n
代表字符的最大长度,取值范围 [ 1 , 2147483647 ] [1, 2147483647] [1,2147483647]。如果不指定n
,则默认为 1。STRING
等同于VARCHAR(2147483647)
。
1.2 二进制字符串类型
BINARY
、BINARY(n)
:定长二进制字符串,n
代表定长,取值范围 [ 1 , 2147483647 ] [1, 2147483647] [1,2147483647]。如果不指定n
,则默认为 1。VARBINARY
、VARBINARY(n)
、BYTES
:可变长二进制字符串,n
代表字符的最大长度,取值范围 [ 1 , 2147483647 ] [1, 2147483647] [1,2147483647]。如果不指定n
,则默认为 1。BYTES
等同于VARBINARY(2147483647)
。
1.3 精确数值类型
DECIMAL
、DECIMAL(p)
、DECIMAL(p, s)
、DEC
、DEC(p)
、DEC(p, s)
、NUMERIC
、NUMERIC(p)
、NUMERIC(p, s)
:固定长度和精度的数值类型,就和 Java 中的BigDecimal
一样,p
代表 数值位数(长度),取值范围 [ 1 , 38 ] [1, 38] [1,38];s
代表 小数点后的位数(精度),取值范围 [ 0 , p ] [0, p] [0,p]。如果不指定,p
默认为 10,s
默认为 0。TINYINT
: − 128 -128 −128 到 127 127 127 的 1 字节大小的有符号整数,就和 Java 中的byte
一样。SMALLINT
: − 32768 -32768 −32768 到 32767 32767 32767 的 2 字节大小的有符号整数,就和 Java 中的short
一样。INT
、INTEGER
: − 2147483648 -2147483648 −2147483648 到 2147483647 2147483647 2147483647 的 4 字节大小的有符号整数,就和 Java 中的int
一样。BIGINT
: − 9223372036854775808 -9223372036854775808 −9223372036854775808 到 9223372036854775807 9223372036854775807 9223372036854775807 的 8 字节大小的有符号整数,就和 Java 中的long
一样。
1.4 有损精度数值类型
FLOAT
:4 字节大小的单精度浮点数值,就和 Java 中的float
一样。DOUBLE
、DOUBLE PRECISION
:8 字节大小的双精度浮点数值,就和 Java 中的double
一样。
关于 FLOAT
和 DOUBLE
的区别可见 https://www.runoob.com/w3cnote/float-and-double-different.html
1.5 布尔类型:BOOLEAN
NULL
类型:NULL。Raw
类型:RAW('class', 'snapshot')
。只会在数据发生网络传输时进行序列化,反序列化操作,可以保留其原始数据。以 Java 举例,class
参数代表具体对应的 Java 类型,snapshot
代表类型在发生网络传输时的序列化器。
1.6 日期、时间类型
DATE
:由 年-月-日 组成的 不带时区含义 的日期类型,取值范围 [0000-01-01, 9999-12-31]TIME
、TIME(p)
:由 小时:分钟:秒[.小数秒] 组成的 不带时区含义 的时间数据类型,精度高达纳秒,取值范围 [00:00:00.000000000, 23:59:59.9999999]。其中p
代表小数秒的位数,取值范围 [ 0 , 9 ] [0, 9] [0,9],如果不指定p
,默认为 0。TIMESTAMP
、TIMESTAMP(p)
、TIMESTAMP WITHOUT TIME ZONE
、TIMESTAMP(p) WITHOUT TIME ZONE
:由 年-月-日 小时:分钟:秒[.小数秒] 组成的 不带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000, 9999-12-31 23:59:59.999999999]。其中p
代表小数秒的位数,取值范围 [ 0 , 9 ] [0, 9] [0,9],如果不指定p
,默认为 6。TIMESTAMP WITH TIME ZONE
、TIMESTAMP(p) WITH TIME ZONE
:由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中p
代表小数秒的位数,取值范围 [ 0 , 9 ] [0, 9] [0,9],如果不指定p
,默认为 6。TIMESTAMP_LTZ
、TIMESTAMP_LTZ(p)
:由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中p
代表小数秒的位数,取值范围 [ 0 , 9 ] [0, 9] [0,9],如果不指定p
,默认为 6。TIMESTAMP_LTZ
与TIMESTAMP WITH TIME ZONE
的区别在于:TIMESTAMP WITH TIME ZONE
的时区信息是携带在数据中的,举例:其输入数据应该是 2022-01-01 00:00:00.000000000 +08:00;TIMESTAMP_LTZ
的时区信息不是携带在数据中的,而是由 Flink SQL 任务的全局配置决定的,我们可以由table.local-time-zone
参数来设置时区。
INTERVAL YEAR TO MONTH
、INTERVAL DAY TO SECOND
:INTERVAL
的涉及到的种类比较多。INTERVAL
主要是用于给TIMESTAMP
、TIMESTAMP_LTZ
添加偏移量的。举例,比如给TIMESTAMP
加、减几天、几个月、几年。INTERVAL
子句总共涉及到的语法种类如下 Flink SQL 案例所示。
CREATE TABLE sink_table (result_interval_year TIMESTAMP(3),result_interval_year_p TIMESTAMP(3),result_interval_year_p_to_month TIMESTAMP(3),result_interval_month TIMESTAMP(3),result_interval_day TIMESTAMP(3),result_interval_day_p1 TIMESTAMP(3),result_interval_day_p1_to_hour TIMESTAMP(3),result_interval_day_p1_to_minute TIMESTAMP(3),result_interval_day_p1_to_second_p2 TIMESTAMP(3),result_interval_hour TIMESTAMP(3),result_interval_hour_to_minute TIMESTAMP(3),result_interval_hour_to_second TIMESTAMP(3),result_interval_minute TIMESTAMP(3),result_interval_minute_to_second_p2 TIMESTAMP(3),result_interval_second TIMESTAMP(3),result_interval_second_p2 TIMESTAMP(3)
) WITH ('connector' = 'print'
);
INSERT INTO sink_table
SELECT-- Flink SQL 支持的所有 INTERVAL 子句如下,总体可以分为 `年-月`、`日-小时-秒` 两种-- 1. 年-月。取值范围为 [-9999-11, +9999-11]-- 其中 p 是指有效位数,取值范围 [1, 4],默认值为 2。比如如果值为 1000,但是 p = 2,则会直接报错。-- INTERVAL YEARf1 + INTERVAL '10' YEAR as result_interval_year,-- INTERVAL YEAR(p)f1 + INTERVAL '100' YEAR(3) as result_interval_year_p,-- INTERVAL YEAR(p) TO MONTHf1 + INTERVAL '10-03' YEAR(3) TO MONTH as result_interval_year_p_to_month,-- INTERVAL MONTHf1 + INTERVAL '13' MONTH as result_interval_month,-- 2. 日-小时-秒。取值范围为 [-999999 23:59:59.999999999, +999999 23:59:59.999999999]-- 其中 p1/p2 都是有效位数,p1 取值范围 [1, 6],默认值为 2;p2 取值范围 [0, 9],默认值为 6-- INTERVAL DAYf1 + INTERVAL '10' DAY as result_interval_day,-- INTERVAL DAY(p1)f1 + INTERVAL '100' DAY(3) as result_interval_day_p1,-- INTERVAL DAY(p1) TO HOURf1 + INTERVAL '10 03' DAY(3) TO HOUR as result_interval_day_p1_to_hour,-- INTERVAL DAY(p1) TO MINUTEf1 + INTERVAL '10 03:12' DAY(3) TO MINUTE as result_interval_day_p1_to_minute,-- INTERVAL DAY(p1) TO SECOND(p2)f1 + INTERVAL '10 00:00:00.004' DAY TO SECOND(3) as result_interval_day_p1_to_second_p2,-- INTERVAL HOURf1 + INTERVAL '10' HOUR as result_interval_hour,-- INTERVAL HOUR TO MINUTEf1 + INTERVAL '10:03' HOUR TO MINUTE as result_interval_hour_to_minute,-- INTERVAL HOUR TO SECOND(p2)f1 + INTERVAL '00:00:00.004' HOUR TO SECOND(3) as result_interval_hour_to_second,-- INTERVAL MINUTEf1 + INTERVAL '10' MINUTE as result_interval_minute,-- INTERVAL MINUTE TO SECOND(p2)f1 + INTERVAL '05:05.006' MINUTE TO SECOND(3) as result_interval_minute_to_second_p2,-- INTERVAL SECONDf1 + INTERVAL '3' SECOND as result_interval_second,-- INTERVAL SECOND(p2)f1 + INTERVAL '300' SECOND(3) as result_interval_second_p2
FROM (SELECT TO_TIMESTAMP_LTZ(1640966476500, 3) as f1)
2.复合数据类型
- 数组类型:
ARRAY<t>
、t ARRAY
。数组最大长度为 2147483647 2147483647 2147483647。t
代表数组内的数据类型。举例ARRAY<INT>
、ARRAY<STRING>
,其等同于INT ARRAY
、STRING ARRAY
。 - Map 类型:
MAP<kt, vt>
。Map 类型就和 Java 中的 Map 类型一样,key
是没有重复的。举例Map<STRING, INT>
、Map<BIGINT, STRING>
。 - 集合类型:
MULTISET<t>
、t MULTISET
。就和 Java 中的 List 类型,一样,运行重复的数据。举例MULTISET<INT>
,其等同于INT MULTISET
。 - 对象类型:
ROW<n0 t0, n1 t1, ...>
、ROW<n0 t0 'd0', n1 t1 'd1', ...>
、ROW(n0 t0, n1 t1, ...)
、ROW(n0 t0 'd0', n1 t1 'd1', ...)
。其中,n
是字段的唯一名称,t
是字段的逻辑类型,d
是字段的描述。就和 Java 中的自定义对象一样。举例:ROW(myField INT, myOtherField BOOLEAN)
,其等同于ROW<myField INT, myOtherField BOOLEAN>
3.用户自定义数据类型
用户自定义类型就是运行用户使用 Java 等语言自定义一个数据类型出来。但是目前数据类型不支持使用 CREATE TABLE 的 DDL 进行定义,只支持作为函数的输入输出参数。如下案例:
- 第一步,自定义数据类型
public class User {// 1. 基础类型,Flink 可以通过反射类型信息自动把数据类型获取到// 关于 SQL 类型和 Java 类型之间的映射见:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/types/#data-type-extractionpublic int age;public String name;// 2. 复杂类型,用户可以通过 @DataTypeHint("DECIMAL(10, 2)") 注解标注此字段的数据类型public @DataTypeHint("DECIMAL(10, 2)") BigDecimal totalBalance;
}
- 第二步,在 UDF 中使用此数据类型
public class UserScalarFunction extends ScalarFunction {// 1. 自定义数据类型作为输出参数public User eval(long i) {if (i > 0 && i <= 5) {User u = new User();u.age = (int) i;u.name = "name1";u.totalBalance = new BigDecimal(1.1d);return u;} else {User u = new User();u.age = (int) i;u.name = "name2";u.totalBalance = new BigDecimal(2.2d);return u;}}// 2. 自定义数据类型作为输入参数public String eval(User i) {if (i.age > 0 && i.age <= 5) {User u = new User();u.age = 1;u.name = "name1";u.totalBalance = new BigDecimal(1.1d);return u.name;} else {User u = new User();u.age = 2;u.name = "name2";u.totalBalance = new BigDecimal(2.2d);return u.name;}}
}
- 第三步,在 Flink SQL 中使用
-- 1. 创建 UDF
CREATE FUNCTION user_scalar_func AS 'flink.examples.sql._12_data_type._02_user_defined.UserScalarFunction';-- 2. 创建数据源表
CREATE TABLE source_table (user_id BIGINT NOT NULL COMMENT '用户 id'
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.user_id.min' = '1','fields.user_id.max' = '10'
);-- 3. 创建数据汇表
CREATE TABLE sink_table (result_row_1 ROW<age INT, name STRING, totalBalance DECIMAL(10, 2)>,result_row_2 STRING
) WITH ('connector' = 'print'
);-- 4. SQL 查询语句
INSERT INTO sink_table
select-- 4.a. 用户自定义类型作为输出user_scalar_func(user_id) as result_row_1,-- 4.b. 用户自定义类型作为输出及输入user_scalar_func(user_scalar_func(user_id)) as result_row_2
from source_table;-- 5. 查询结果
+I[+I[9, name2, 2.20], name2]
+I[+I[1, name1, 1.10], name1]
+I[+I[5, name1, 1.10], name1]
参考:《Data Types | Apache Flink》