前言
一个通用系统意味着更广泛的适用性,但通用的另一种解释是平庸,因为它无法在所有场景内都做到极致。
ClickHouse 在没有像三驾马车这样的指导性论文的背景下,通过针对特定场景的极致优化,获得闪电般的查询性能。
ClickHouse 是什么
官方定义:ClickHouse is a high-performance, column-oriented SQL database management system (DBMS) for online analytical processing (OLAP).
高性能的 OLAP 数据库管理系统。
ClickHouse 核心特性
完备的 DBMS 功能
能被称为 DBMS,那么就具有完备的数据库管理功能,如 DDL(数据定义语言),允许动态地创建、修改或删除数据库、表和视图;DML(数据操作语言),允许动态查询、插入、修改或删除数据。同时,还包括权限控制,可以按照用户粒度设置数据库或者表的操作权限。
列式存储
行式存储(Row-Oriented)面向 OLTP(Online Analytical Processing),每行数据存储在一起,可以快速地进行增删改操作,不适合聚合查询,因为 data scan 范围大。
列式存储(Column-Oriented)面向 OLAP(Online Transaction Processing),每一列数据作为单独的物理存储,分析查询时只需要读取相关的列即可。
ClickHouse 默认每一列对应一个 column_name.bin 数据文件。列式存储也为 ClickHouse 的数据压缩和向量化执行提供了便利。
数据压缩
数据压缩后,体积减小,可以减少网络带宽和磁盘 IO 的压力。同时,因为采用了数据列式存储,同一列数据相似度就会高,天然的具备压缩优势。
向量化执行
相比于传统火山模型中的 row-based 模式,向量化执行引擎采用 batch-based 批量处理模式,可以大幅减少函数调用开销、数据的 Cache Miss,提升 CPU 利用效率。同时也能使用 SIMD CPU 指令集,也就是向量化执行。
SIMD(单指令流多数据流),即能够使用一个汇编指令处理多个数据,如下图
CPU 通过一些扩展指令集实现向量化执行,例如,在 intel 的处理器中,有以下扩展指令集:
-
SSE:这个指令集扩展了 8 个 128 位的寄存器,它们被命名为 XMM0 到 XMM7。后续的 SSE2,SSE3,SSE4 等版本在此基础上进行了扩展。
-
AVX:这个指令集扩展了 16 个 256 位的寄存器,它们被命名为 YMM0 到 YMM15。
-
AVX-512:这个指令集扩展了 32 个 512 位的寄存器,它们被命名为 ZMM0 到 ZMM31。
通过使用这些扩展指令集,我们可以一次将多个整数、字符、浮点数等多个数据加载到寄存器中,使用单条扩展汇编指令完成多个数据的批量处理后,再一次将寄存器中的值写回内存。
丰富的索引
通过 Partition、Part、Sparse-Primary-Key、skip-index、mark-range 等索引结构,ClickHouse 能够只读取 必要的列 的 必要的片段(granule),将 scan 粒度压缩到极致。
存储引擎
ClickHouse提供了丰富的存储引擎
外部表引擎
- File on FS
- S3
- HDFS
- MySQL
- … …
内部表引擎
- Memory
- Log
- StripeLog
- MergeTree family
尤其是 MergeTree 系列引擎,最基础的 MergeTree 引擎支持主键索引、数据分区、数据副本等能力,在其之上又有一系列的扩展特性。
特定场景的优化
specializations for special cases
字符串搜索(Substring Search)算法,如 C 中的 strstr、menmen,C++ 中的 std::search、std::find 等,但是通用意味着平庸,它们在特定场景下会很低效(slow in some usage scenario),拿 memmem 举例:
void * memmem(const void * haystack, size_t haystacklen, const void * needle, size_t needlelen)
memmem 的接口决定了其 代码可重入(reentrable)、无初始化例程(no separate initialization routine),这也是该算法成为标准库函数的理由——通用。
但是如果模式串(needle)只有一个,而文本串(haystack)有很多的情况下:
Searcher searcher(needle);
for (const auto & haystack : haystacks)searcher.search(haystack);
这种场景下使用 memmem 不是最优解,我们可以对 needle 做一些初始化例程(separate initialization routine)来获得更好的性能,例如 Boyer-Moore (BM)、Knuth-Morris-Pratt (KMP) 、Volnitsky 等算法就是这样做的,它们通过对模式串(needle)预先建立一系列规则来获得搜索加速。
单是字符串搜索(Substring Search)算法,就有非常多的场景:
- 精确搜索(exact search) / 近似搜索(approximate search)
- 一个模式串 / 多个模式串
- 长模式串 / 短模式串
- 模式串是字节 / unicode字符 / 单词集?
- 搜索是否在预定义的文本中进行,还是在未提前知道的文本中进行
- 文本是否完全存储在内存中(located in memory completely),还是作为数据流可用(a stream of data)
- 最坏情况时间复杂度的保证(with strong guarantees on time)
- … …
丰富的字符串搜索算法:
But none of these algorithms are used in ClickHouse.
ClickHouse 使用的:
- 当模式串是常量时,使用 Volnitsky 算法
- 当模式串是非常量时,使用 SIMD 加速的暴力搜索算法
- 当模式串是一组常量集合时,使用 Variation of Volnitsky 算法
- 正则表达式匹配时,使用 re2 和 hyperscan 库
- … …
同样的,排序(sorting)算法也是如此:
- 被排序元素类型是 number / tuple / string / structure
- 所有数据位于内存(available comletely in RAM)
- 基于比较(comparison) / 三向比较(3-way comparison) / 并行比较(parallel comparison) / 基数排序(by radix)
- 直接排序 / 间接排序(obtain a permutation)
- 稳定排序 / 非稳定排序(相同键值的相对顺序是否改变)
- full / partial / n-element
- … …
ClickHouse 使用了 pdqsort / radix sort 等
哈希表(Hash Table):
- 哈希函数的选择
- 小 key / 大 key
- 填充因子的选择
- 冲突解决算法的选择
- 元素是否支持移动语义
- 扩容后如何移动元素
- 使用位图快速探测
- 预取(prefetch)和批处理(batching)
- … …
关系模型与 SQL 查询
ClickHouse 使用关系模型以及 SQL 查询,使得其更容易理解和使用。同时,其 server 层支持 Mysql Client / ClickHouse Native TCP Client / HTTP Client 等方式接入,更加灵活易用。
MergeTree 引擎
ClickHouse 的 MergeTree 引擎支持重复 Primary Key 、partition 、skip index 等索引类型。
以下是 MergeTree 建表 DDL 以及 数据插入语句:
CREATE TABLE mt
(`EventDate` Date,`OrderID` Int32,`BannerID` UInt64,`GoalNum` Int8
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(EventDate)
ORDER BY (OrderID, BannerID)INSERT INTO mt SELECTtoDate('2018-09-26'),number,number + 10000,number % 128
FROM numbers(1000000)INSERT INTO mt SELECTtoDate('2018-10-15'),number,number + 10000,number % 128
FROM numbers(1000000, 1000000)
每次写入数据(insert)会根据 分区键(PARTITION BY)在对应的 ClickHouse 数据目录下创建一个目录存储对应的数据,称之为 part,如
clickhouse-data/data/default/mt/201809_1_1_0/
clickhouse-data/data/default/mt/201810_2_2_0/
之所以不称为 partition,是因为新插入的 part 可能和之前的已经存在的 part 具有相同的分区键,但是真正的分区合并(Merge)并不会立即发生,而会在合适的时机在后台 Merge。因此,partition 是一个逻辑概念,一个 partition 包含多个 part。如果再次插入一个分区键同样为201810 的 part,就会新增一个数据目录,如下:
clickhouse-data/data/default/mt/201809_1_1_0/
clickhouse-data/data/default/mt/201810_2_2_0/
clickhouse-data/data/default/mt/201810_3_3_0/
此时 partition 201810 包含两个 part,可以使用以下语句立即合并 part
OPTIMIZE TABLE mt
合并后结果:
clickhouse-data/data/default/mt/201809_1_1_0/
clickhouse-data/data/default/mt/201810_2_2_0/
clickhouse-data/data/default/mt/201810_2_3_1/
clickhouse-data/data/default/mt/201810_3_3_0/
其中,part 201810_2_2_0 和 201810_3_3_0 被标记为无效,后续会被删除,scan 时会忽略
每个 part 中有以下信息:
- 每个字段对应的压缩数据列,如 BannerID.bin
- 分区信息,partition.dat 以及 分区级别的 minmax 索引,如 minmax_EventDate.idx
- 主键索引 primary.idx
- 维护主键索引和数据列对应关系的 mark 文件,如 BannerID.mrk(会根据是否压缩 / 数据文件的 compact / InMemory / Wide 格式改变后缀名)
$ tree 201809_1_1_0/
201809_1_1_0/
├── BannerID.bin
├── BannerID.cmrk2
├── checksums.txt
├── columns.txt
├── count.txt
├── default_compression_codec.txt
├── EventDate.bin
├── EventDate.cmrk2
├── GoalNum.bin
├── GoalNum.cmrk2
├── metadata_version.txt
├── minmax_EventDate.idx
├── OrderID.bin
├── OrderID.cmrk2
├── partition.dat
├── primary.cidx
└── serialization.json
每个 part 中的所有数据列都按照 Primary Key 排序。
Primary Key 为稀疏(Sparse)索引,默认间隔 8192 条记录生成一行索引数据,数据文件同样默认 8192 条数据作为一个 granule 并压缩为一个压缩块。主键索引和 granule 通过 mark 数据标记文件建立联系,mark 文件中记录了每个 granule 在压缩数据列的偏移位置。
这样一来,通过分区索引、分区minmax索引、主键索引、二级索引、数据标记、压缩数据块等设计,ClickHouse 能够最小化 scan 成本。
ClickHouse 还支持多路径存储策略,将数据分布在不同的硬盘上,在此基础上,支持多线程读取压缩数据块,并尽可能将不同的硬盘分配给不同的线程,利用 RAID 技术提供更大的 IO 带宽。当一些线程提前完成读取任务时,还可以执行预分配给其他线程的任务(Threads can steal tasks)。
ClickHouse 分布式
如果单节点 ClickHouse 无法满足业务场景,那么就要考虑分布式多节点 ClickHouse。
在单机的场景下,表连接(Join)通常使用以下算法:
- Nested-Loop Join:这是最基本的连接算法,通过双层循环比较数据来获得结果。Nested-Loop Join 还包括一些优化版本,如 Index Nested-Loop Join 和 Block Nested-Loop Join。
- Hash Join:这种算法首先为一张表(通常是小表)建立一个哈希表,然后扫描另一张表(通常是大表),在哈希表中查找匹配的记录。Hash Join 在处理大数据量时通常比 Nested-Loop Join 更高效,但需要足够的内存来存储哈希表。
- Sort-Merge Join:这种算法首先将两张表按照连接键进行排序,然后顺序扫描两张表,找到匹配的记录。Sort-Merge Join 在处理大数据量且内存有限的情况下可能是一个好的选择,但它需要额外的排序步骤,可能会增加计算成本。
分布式场景下,通常使用以下策略(strategy):
- Broadcast Join:在分布式环境中,如果一张表的数据量远小于另一张表,那么可以将小表广播到所有节点上,然后在每个节点上执行本地连接操作。只适合小集群或是小右表。
- Shuffle Join:当两张表的数据量都很大且无法广播时,可以使用 Shuffle Join。这种算法首先将两张表按照连接键进行分区,然后将相同连接键的数据发送到同一个节点上进行连接操作。
ClickHouse 采用了 Scatter/Gather 计算架构,在 MergeTree 的基础上引入了 Distributed 引擎实现分布式,Distributed 是逻辑概念,数据首先被 scatter 分散到多个节点进行并行处理,然后将处理结果 gather 汇集到一个节点。这种架构非常适合可以并行且无需跨节点通信的计算任务。
但是,这种架构在 Gather 节点聚合数据时,可能会遇到性能瓶颈。另一个挑战是,由于数据需要跨节点重新分布,Scatter/Gather 架构并不适合实现真正的 shuffle join,而更倾向于使用 broadcast join。不过,在实际场景中,右表的数据需要先被 Gather 节点收集,然后再分发到各个节点,因此这并非是典型的 broadcast join 实现。同时,如果两个大表需要进行 join 操作,broadcast join 则会受到网络带宽和节点数量的限制。
Spark 使用的是 MapReduce 计算架构,在Spark中,执行过程被划分为多个阶段,每个阶段由一系列的 map 和 reduce 任务组成,这是一种类似于 MapReduce 的架构。当存在宽依赖(wide dependency),需要对数据进行重新分区(即 shuffle)时,就会形成一个新的阶段。在每个阶段结束时,Spark 都会将计算结果落盘,这样即使在后续的计算过程中发生了错误,也可以从这些持久化的结果中恢复,而不需要重新计算整个阶段。这种设计使得 Spark 的计算过程更加稳定,特别适合执行长时间运行的 ETL(Extract-Transform-Load)任务,但是不适合低延时的数据查询。
Doris 使用的是 MPP 架构支持数据混洗(shuffle),能够高效地完成表连接(quick joins)和高基数数据聚合(high-cardinality aggregations)
因此,在分布式 join 场景下,ClickHouse 的性能不如 Doris/ StarRocks 等引擎,ClickHouse 需要预先将雪花模式(Snowflake Schema)或是星型模式(Star Schema)转换为 宽表/平面表模式(Flat-table Schema),避免分布式 join 的性能瓶颈。
Reference
- ClickHouse Document
- clickhouse-presentations/meetup41/merge_tree
- presentations.clickhouse.com/bdtc_2019
- ClickHouse 源码解析: 综述
- ClickHouse Join为什么被大家诟病?
- Why ClickHouse Cannot Keep Up