Impala性能稍领先于presto,但是presto在数据源支持上非常丰富,包括hive、图数据库、传统关系型数据库、Redis等
缺点:这两种对hbase支持的都不好,presto 不支持,但是对hdfs、hive兼容性很好,其实这也是顺理成章的,所以数据源的处理很重要,针对hbase的二级索引查询可以用phoenix,效果也不错
Impala的优缺点
1.2.1 优点
1)基于内存运算,不需要把中间结果写入磁盘,省掉了大量的I/O开销。
2)无需转换为Mapreduce,直接访问存储在HDFS,HBase中的数据进行作业调度,速度快。
3)使用了支持Data locality的I/O调度机制,尽可能地将数据和计算分配在同一台机器上进行,减少了网络开销。
4)支持各种文件格式,如TEXTFILE 、SEQUENCEFILE 、RCFile、Parquet。
5)可以访问hive的metastore,对hive数据直接做数据分析。
1.2.2 缺点
1)对内存的依赖大,且完全依赖于hive。
2)实践中,分区超过1万,性能严重下降。
3)只能读取文本文件,而不能直接读取自定义二进制文件。
每当新的记录/文件被添加到HDFS中的数据目录时,该表需要被刷新
Doris 的架构很简洁,只设 FE(Frontend)、BE(Backend)两种角色、两个进程,不依赖于
外部组件,方便部署和运维,FE、BE 都可线性扩展。
FE(Frontend):存储、维护集群元数据;负责接收、解析查询请求,规划查询计划, 调度查询执行,返回查询结果。
Leader 和 Follower:主要是用来达到元数据的高可用
Observer:用来扩展查询节点,同时起到元数据备份的作用,observer 不参与任何的写入,只参与读取
BE(Backend):负责物理数据的存储和计算;依据 FE 生成的物理计划,分布式地执行查询。
MySQL Client
Doris 借助 MySQL 协议,用户使用任意 MySQL 的 ODBC/JDBC 以及 MySQL 的客户
端,都可以直接访问 Doris。
Broker
Broker 为一个独立的无状态进程。封装了文件系统接口,提供 Doris 读取远端存储系统
中文件的能力,包括 HDFS,S3,BOS 等。
Partition & Tablet
数据首先被划分成若干个分区(Partition),划分的规则通
常是按照用户指定的分区列进行范围划分
。而在每个分区内,数据被进一
步的按照 Hash 的方式分桶,分桶的规则是要找用户指定的分桶列的值进行 Hash 后分桶。
每个分桶就是一个数据分片(Tablet),也是数据划分的最小逻辑单元。
建表
Doris 支持支持单分区和复合分区两种建表方式。
复合分区:既有分区也有分桶
单分区:只做 HASH 分布,即只分桶。
HLL
1~16385 个字节 。hll 列类型,不需要指定长度和默认值、 长度根据数据的聚合 程度系统内控制,并且 HLL 列只能通过配套的 hll_union_agg 、 Hll_cardinality、hll_hash 进行查询或使用
BITMAP
bitmap 列类型,不需要指定长度和默 认值。表示整型的集合,元素最大支持 到 2^64 - 1
3.3.2.1 Range Partition
CREATE TABLE IF NOT EXISTS example_db.expamle_range_tbl
(
`user_id` LARGEINT NOT NULL COMMENT "用户 id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01
00:00:00" COMMENT "用户最后一次访问时间",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
ENGINE=olap
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY RANGE(`date`)
(
PARTITION `p201701` VALUES LESS THAN ("2017-02-01"),
PARTITION `p201702` VALUES LESS THAN ("2017-03-01"),
PARTITION `p201703` VALUES LESS THAN ("2017-04-01")
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
PROPERTIES
(
"replication_num" = "3",
"storage_medium" = "SSD",
"storage_cooldown_time" = "2018-01-01 12:00:00"
);
分区的删除不会改变已存在分区的范围。删除分区可能出现空洞
3.3.2.2 List Partition
CREATE TABLE IF NOT EXISTS example_db.expamle_list_tbl
(
`user_id` LARGEINT NOT NULL COMMENT "用户 id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01
00:00:00" COMMENT "用户最后一次访问时间",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时
间"
)
ENGINE=olap
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY LIST(`city`)
(
PARTITION `p_cn` VALUES IN ("Beijing", "Shanghai", "Hong Kong"),
PARTITION `p_usa` VALUES IN ("New York", "San Francisco"),
PARTITION `p_jp` VALUES IN ("Tokyo")
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
PROPERTIES
(
"replication_num" = "3",
"storage_medium" = "SSD",
"storage_cooldown_time" = "2018-01-01 12:00:00"
);
分区值为枚举值
不论分区列是什么类型,在写分区值时,都需要加双引号。
分桶列可以是多列,但必须为 Key 列
多列分区
Doris 支持指定多列作为分区列
PARTITION BY RANGE(`date`, `id`)
(
PARTITION `p201701_1000` VALUES LESS THAN ("2017-02-01", "1000"),
PARTITION `p201702_2000` VALUES LESS THAN ("2017-03-01", "2000"),
PARTITION `p201703_all` VALUES LESS THAN ("2017-04-01")
)
PARTITION BY LIST(`id`, `city`)
(
PARTITION `p1_city` VALUES IN (("1", "Beijing"), ("1", "Shanghai")),
PARTITION `p2_city` VALUES IN (("2", "Beijing"), ("2", "Shanghai")),
PARTITION `p3_city` VALUES IN (("3", "Beijing"), ("3", "Shanghai"))
)
AGGREGATE KEY 数据模型中,所有没有指定聚合方式(SUM、REPLACE、MAX、 MIN)的列视为 Key 列。而其余则为 Value 列。
Key 列必须在所有 Value 列之前。
在建表语句的最后 PROPERTIES 中,可以指定以下两个参数:
3.4.3.1 replication_num
每个 Tablet 的副本数量。默认为 3,建议保持默认即可。
Doris 中副本分布的
原则是,不允许同一个 Tablet 的副本分布在同一台物理机上
即使在同一台物理机上部署了 3 个或更多 BE 实例,如果这些 BE 的 IP 相同,则依然只
能设置副本数为 1
3.4.3.2 storage_medium & storage_cooldown_time
BE 的数据存储目录可以显式的指定为 SSD 或者 HDD(通过 .SSD 或者 .HDD 后缀
区分)。建
默认初始存储介质可通过 fe 的配置文件 fe.conf 中指定 default_storage_medium=xxx,
如果没有指定,则默认为 HDD。如果指定为 SSD,则数据初始存放在 SSD 上。
如果没有指定 storage_cooldown_time,则默认 30 天后,数据会从 SSD 自动迁移到 HDD
上。如果指定了 storage_cooldown_time,则在到达 storage_cooldown_time 时间后,数据才会
迁移。
ENGINE 的类型是 olap,即默认的 ENGINE 类型。在 Doris 中,只有这个ENGINE 类型是由 Doris 负责数据管理和存储的。其他 ENGINE 类型,如 mysql、broker、es 等等,本质上只是对外部其他数据库或系统中的表的映射,以保证 Doris 可以读取这些数据。
数据模型
Doris 的数据模型主要分为 3 类:Aggregate、Uniq、Duplicate
Aggregate 模型
CREATE TABLE IF NOT EXISTS test_db.example_site_visit2
(
`user_id` LARGEINT NOT NULL COMMENT "用户 id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`timestamp` DATETIME COMMENT "数据灌入时间,精确到秒",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时
间"
)
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 10;
分为 Key(维度列)和 Value(指标列)。
没有设置 AggregationType 的称为 Key,设置了 AggregationType 的称为 Value。
对于 Key 列相同的行会聚合成一行,而 Value 列会按照设置的 AggregationType 进行聚合。
SUM:求和
REPLACE:替代,下一批数据中的 Value 会替换之前导入过的行中的 Value。
MAX:保留最大值。
MIN:保留最小值。
数据的聚合,在 Doris 中有如下三个阶段发生:
(1)每一批次数据导入的 ETL 阶段。
(2)底层 BE 进行数据 Compaction 的阶段。该阶段,BE 会对已导入的不同批次的
数据进行进一步的聚合。
(3)数据查询阶段。在数据查询时,对于查询涉及到的数据,会进行对应的聚合。
Uniq 模型
CREATE TABLE IF NOT EXISTS test_db.user
(
`user_id` LARGEINT NOT NULL COMMENT "用户 id",
`username` VARCHAR(50) NOT NULL COMMENT "用户昵称",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`phone` LARGEINT COMMENT "用户电话",
`address` VARCHAR(500) COMMENT "用户地址",
`register_time` DATETIME COMMENT "用户注册时间"
)
UNIQUE KEY(`user_id`, `username`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 10;
保证 Key 的唯一性 获得 Primary Key 唯一性约束。
Duplicate 模型
在某些多维分析场景下,数据既没有主键,也没有聚合需求。Duplicate 数据模型可以
满足这类需求。数据完全按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据 完全相同,也都会保留。
而在建表语句中指定的 DUPLICATE KEY,只是用来指明底层数据按照那些列进行排序
CREATE TABLE IF NOT EXISTS test_db.example_log
(
`timestamp` DATETIME NOT NULL COMMENT "日志时间",
`type` INT NOT NULL COMMENT "日志类型",
`error_code` INT COMMENT "错误码",
`error_msg` VARCHAR(1024) COMMENT "错误详细信息",
`op_id` BIGINT COMMENT "负责人 id", s
`op_time` DATETIME COMMENT "处理时间"
)
DUPLICATE KEY(`timestamp`, `type`)
DISTRIBUTED BY HASH(`timestamp`) BUCKETS 10;
动态分区
分区列 time 类型为 DATE,创建一个动态分区规则。按天分区,只保留最近 7 天的分区,并且预先创建未来 3 天的分区。
create table student_dynamic_partition1
(id int,
time date,
name varchar(50),
age int
)
duplicate key(id,time)
PARTITION BY RANGE(time)()
DISTRIBUTED BY HASH(id) buckets 10
PROPERTIES(
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"replication_num" = "1"
);
建表时指定:
CREATE TABLE tbl1
(...)
PROPERTIES
(
"dynamic_partition.prop1" = "value1",
"dynamic_partition.prop2" = "value2",
...
)
运行时修改
ALTER TABLE tbl1 SET
(
"dynamic_partition.prop1" = "value1",
"dynamic_partition.prop2" = "value2",
...
)
Rollup
在 Doris 中,我们将用户通过建表语句创建出来的表称为 Base 表(Base Table)。Base
表中保存着按用户建表语句指定的方式存储的基础数据。
在 Base 表之上,我们可以创建任意多个 ROLLUP 表。这些 ROLLUP 的数据是基于 Base
表产生的,并且在物理上是独立存储的。
ROLLUP 表的基本作用,在于在 Base 表的基础上,获得更粗粒度的聚合数据。
比如需要查看某个用户的总消费,那么可以建立一个只有 user_id 和 cost 的 rollup
alter table example_site_visit2 add rollup rollup_cost_userid(user_id,cost);
SELECT user_id, sum(cost) FROM example_site_visit2 GROUP BY user_id;
Doris 会自动命中这个 ROLLUP 表,从而只需扫描极少的数据量,即可完成这次聚合查询。
因为 Duplicate 模型没有聚合的语意。所以该模型中的 ROLLUP,已经失去了“上卷” 这一层含义。而仅仅是作为调整列顺序,以命中前缀索引的作用。
是否命中 ROLLUP 完全由 Doris 系统自动决定
ROLLUP 的数据更新与 Base 表是完全同步的。
前缀索引
Doris 不支持在任意列上创建索引。
我们将一行数据的前 36 个字节 作为这行数据的前缀索引。当遇到 VARCHAR 类型时,前缀索引会直接截断。
因为建表时已经指定了列顺序,所以一个表只有一种前缀索引。这对于使用其他不能命中前缀索引的列作为条件进行的查询来说,效率上可能无法满足需求。
因此,我们可以通过创建 ROLLUP 来人为的调整列顺序。
物化视图
查询结果预先存储起来的特殊的表。
Rollup 具有一定的局限性,他不能基于明细模型做预聚合。
物化视图则在覆盖了 Rollup 的功能的同时,还能支持更丰富的聚合函数。所以物化视图其实是 Rollup 的一个超集。
create materialized view store_amt as
select store_id, sum(sale_amt)
from sales_records
group by store_id;
提交完创建物化视图的任务后,Doris就会异步在后台生成物化视图的数据,构建物化视图。
物化视图创建完成后,用户的查询会根据规则自动匹配到最优的物化视图
删除数据(Delete)
Doris 目前可以通过两种方式删除数据:DELETE FROM 语句和 ALTER TABLE DROP PARTITION 语句。
delete from student_kafka where id=1;
该语句只能针对 Partition 级别进行删除。如果一个表有多个 partition 含有需要 删除的数据,则需要执行多次针对不同 Partition 的 delete 语句。而如果是没有使用 Partition 的表,partition 的名称即表名。
where 后面的条件谓词只能针对 Key 列,并且谓词之间,只能通过 AND 连接。 如果想实现 OR 的语义,需要执行多条 delete。
数据的真正删除是在 BE 进行数据 Compaction 时进行的。所以执行完 delete 命 令后,并不会立即释放磁盘空间
数据导入
导入(Load)功能就是将用户的原始数据导入到 Doris 中。导入成功后,用户即可通过 Mysql 客户端查询数据。为适配不同的数据导入需求,Doris 系统提供了 6 种不同的导入方 式。每种导入方式支持不同的数据源,存在不同的使用方式(异步,同步)。
Broker Load
源数据在 Broker 可以访问的存储系统中,如 HDFS。 数据量在几十到百 GB 级别。
LOAD LABEL test_db.student_result
(
DATA INFILE("hdfs://my_cluster/student.csv")
INTO TABLE `student_result`
COLUMNS TERMINATED BY ","
FORMAT AS "csv"
(id, name, age, score)
)
WITH BROKER broker_name
(
#开启了 HA 的写法,其他 HDFS 参数可以在这里指定
"dfs.nameservices" = "my_cluster",
"dfs.ha.namenodes.my_cluster" = "nn1,nn2,nn3",
"dfs.namenode.rpc-address.my_cluster.nn1" = "hadoop1:8020",
"dfs.namenode.rpc-address.my_cluster.nn2" = "hadoop2:8020",
"dfs.namenode.rpc-address.my_cluster.nn3" = "hadoop3:8020",
"dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
PROPERTIES
(
"timeout" = "3600"
);
每个导入任务,都有一个在单 database 内部唯一的 Label。Label 是 用户在导入命令中自定义的名称。通过这个 Label,用户可以查看对应导入任务的执行情况。
Label 的另一个作用,是防止用户重复导入相同的数据。强烈推荐用户同一批次数据使用相同的 label。这样同一批次数据的重复请求只会被接受一次,保证了 At-Most-Once 语 义
Stream Load
Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。
目前 Stream Load 支持两个数据格式:CSV(文本)和 JSON。
curl
--location-trusted
-u root
-H "label:123"
-H"column_separator:,"
-T student.csv
-X PUT
http://hadoop1:8030/api/test_db/student_result/_stream_load
3 Routine Load
当前仅支持从 Kafka 系统进行例行导入
CREATE ROUTINE LOAD test_db.kafka_test ON student_kafka
COLUMNS TERMINATED BY ",",
COLUMNS(id, name, age)
PROPERTIES
(
"desired_concurrent_number"="3",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list"= "hadoop1:9092,hadoop2:9092,hadoop3:9092",
"kafka_topic" = "test_doris1",
"property.group.id"="test_doris_group",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.enable.auto.commit"="false"
);
Binlog Load
增量同步用户在 Mysql 数据库的对数据更新操作的 CDC(Change Data Capture)功能。
INSERT/UPDATE/DELETE 支持。
过滤 Query。
暂不兼容 DDL 语句。
CREATE SYNC test_db.job1
(
FROM test.tbl1 INTO binlog_test
)
FROM BINLOG
(
"type" = "canal", // 目前可支持的对接类型只有 canal 方式
"canal.server.ip" = "hadoop1",
"canal.server.port" = "11111",
"canal.destination" = "doris-load",
"canal.username" = "canal",
"canal.password" = "canal"
);
Insert Into 的使用方式和 MySQL 等数据库中 Insert Into 语句的使用方式类似
数据导出
EXPORT TABLE db1.tbl1
PARTITION (p1,p2)
[WHERE [expr]]
TO "hdfs://host/path/to/export/"
PROPERTIES
(
"label" = "mylabel",
"column_separator"=",",
"columns" = "col1,col2",
"exec_mem_limit"="2147483648",
"timeout" = "3600"
)
WITH BROKER "hdfs"
(
"username" = "user",
"password" = "passwd"
);
查询结果导出
SELECT * FROM example_site_visit
INTO OUTFILE "hdfs://hadoop1:8020/doris-out/broker_a_"
FORMAT AS CSV
PROPERTIES
(
"broker.name" = "broker_name",
"column_separator" = ",",
"line_delimiter" = "\n",
"max_file_size" = "100MB"
);
Join 查询
Broadcast Join
系统默认实现 Join 的方式,是将小表进行条件过滤后,将其广播到大表所在的各个节
点上,形成一个内存 Hash 表,然后流式读出大表的数据进行 Hash Join。
FROM example_site_visit
JOIN [broadcast] example_site_visit2
Shuffle Join(Partitioned Join)
如果当小表过滤后的数据量无法放入内存的话,此时 Join 将无法完成可以显式指定 Shuffle Join。
即将小表和大表都按照 Join 的 key 进行 Hash,然后进行分布式的 Join
FROM example_site_visit
JOIN [shuffle] example_site_visit2
Colocation Join
保证这些表对应的数据分片会落在同一个 be 节点上,那么使得两表再进行 join 的时候,可以通过本地数据进行直接join,减少数据在节点之间的网络传输时间。
建表时两张表的分桶列的类型和数量需要完全一致,并且桶数一致
Bucket Shuffle Join
SQL 语句为 A 表 join B 表,并且 join 的等值表达式命中了 A 的数据分布列。而 Bucket Shuffle Join 会根据 A 表的数据分布信息,将 B 表的数据发送到对应的 A 表的数据存储计算节点。
与 Colocate Join 不同,它对于表的数据分布方式并没有侵入性,这对于用户来说是透明的。对于表的数据分布没有强制性的要求,不容易导致数据倾斜的问题
指定 RuntimeFilter 类型
旨在为某些 Join 查询在运行时动态生成过滤条件,来减少扫描的数据量,避免不必要的 I/O 和网络传输,从而加速查询。
set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";
tableEnv.executeSql("CREATE TABLE flink_doris (\n" +
" siteid INT,\n" +
" citycode SMALLINT,\n" +
" username STRING,\n" +
" pv BIGINT\n" +
" ) \n" +
" WITH (\n" +
" 'connector' = 'doris',\n" +
" 'fenodes' = 'hadoop1:8030',\n" +
" 'table.identifier' = 'test_db.table1',\n" +
" 'username' = 'test',\n" +
" 'password' = 'test'\n" +
")\n");
ODBC 外部表
CREATE EXTERNAL RESOURCE `oracle_odbc`
PROPERTIES (
"type" = "odbc_catalog",
"host" = "192.168.0.1",
"port" = "8086",
"user" = "test",
"password" = "test",
"database" = "test",
"odbc_type" = "oracle",
"driver" = "Oracle 19 ODBC driver"
);
CREATE EXTERNAL TABLE `baseall_oracle` (
`k1` decimal(9, 3) NOT NULL COMMENT "",
`k2` char(10) NOT NULL COMMENT "",
`k3` datetime NOT NULL COMMENT "",
`k5` varchar(20) NOT NULL COMMENT "",
`k6` double NOT NULL COMMENT ""
) ENGINE=ODBC
COMMENT "ODBC"
PROPERTIES (
"odbc_catalog_resource" = "oracle_odbc",
"database" = "test",
"table" = "baseall"
);
6)Doris 建 Resource
通过 ODBC_Resource 来创建 ODBC 外表,这是推荐的方式,这样 resource 可以复用。
CREATE EXTERNAL RESOURCE `mysql_5_3_11`
PROPERTIES (
"host" = "hadoop1",
"port" = "3306",
"user" = "root",
"password" = "000000",
"database" = "test",
"table" = "test_cdc",
"driver" = "MySQL ODBC 5.3.11", --名称要和上面[]里的名称一致
"odbc_type" = "mysql",
"type" = "odbc_catalog")
7)基于 Resource 创建 Doris 外表
CREATE EXTERNAL TABLE `test_odbc_5_3_11` (
`id` int NOT NULL ,
`name` varchar(255) null
) ENGINE=ODBC
COMMENT "ODBC"
PROPERTIES (
"odbc_catalog_resource" = "mysql_5_3_11", --名称就是 resource 的名称
"database" = "test",
"table" = "test_cdc"
);
CREATE EXTERNAL TABLE `es_test` (
`k1` bigint(20) COMMENT "",
`k2` datetime COMMENT "",
`k3` varchar(20) COMMENT "",
`k4` varchar(100) COMMENT "",
`k5` float COMMENT ""
) ENGINE=ELASTICSEARCH // ENGINE 必须是 Elasticsearch
PROPERTIES (
"hosts" =
"http://hadoop1:9200,http://hadoop2:9200,http://hadoop3:9200",
"index" = "test",
"type" = "doc",
"user" = "",
"password" = ""
);