简要概述
Apache Parquet 是一个开源、列式存储文件格式,最初由 Twitter 与 Cloudera 联合开发,旨在提供高效的压缩与编码方案以支持大规模复杂数据的快速分析与处理。Parquet 文件采用分离式元数据设计 —— 在数据写入完成后,再追加文件级元数据(包括 Schema、Row Group 与 Column Chunk 的偏移信息等),使得写入过程只需一次顺序扫描,同时读取端只需先解析元数据即可随机访问任意列或任意 Row Group。其核心结构包括:
- Header(魔数“PAR1”)
- 若干 Row Group(行组)
- Footer(文件级元数据 + 魔数“PAR1”)
文件格式总体布局
1. Header(文件头)
- 文件以4字节的魔数
PAR1
开始,标识这是一个 Parquet 文件格式。
2. Row Group(行组)
- Row Group 是水平切分的数据分片,每个 Row Group 包含表中若干行,通常大小在几十 MB 到几百 MB 之间,以兼顾并行处理与内存使用。
- 每个 Row Group 内部再按列拆分为若干 Column Chunk,支持对单个列或多个 Row Group 进行跳过式读取。
3. Column Chunk(列区块)
- Column Chunk 即某一列在某个 Row Group 中的数据块,它是 Parquet 最基本的物理存储单元之一。
- 每个 Column Chunk 中又分成多个Page,以便更细粒度地管理压缩与解压效率。
4. Page(页)
- Page 是 Column Chunk 中的最小读写单元,主要有三种类型:
- Dictionary Page:存放字典编码字典。
- Data Page:存放实际列数据(可进一步分为 Data Page V1/V2)。
- Index Page(可选):存放 Page-level 索引,加速定位。
- 通过 Page 级别的压缩与编码,Parquet 能够对不同类型的数据采用最优策略,极大提升 I/O 性能。
5. Footer(文件尾)
- 写入完全部 Row Group 后,接着写入File Metadata,其中包含:
- Schema 定义
- 每个 Row Group 的偏移及长度
- 每个 Column Chunk 的偏移、页信息、编码与压缩算法
- 最后再写入 4 字节魔数
PAR1
,完整标识文件结束。
元数据管理
- Thrift 定义:Parquet 使用 Apache Thrift 描述元数据结构(Schema、Row Group 元信息、Column Chunk 元信息、Page Header 等),以保证跨语言支持。
- 单次写入:由于元数据紧跟数据之后写入,写入端无需回写或多次扫描,保证单次顺序写入效率。
- 快速读取:读端先读取文件末尾的 Footer,加载所有元数据后即可随机、并行地读取任意列与任意 Row Group,极大减少磁盘 I/O 与网络传输开销。
编码与压缩技术
字典编码(Dictionary Encoding)
- 针对低基数(unique values 较少)的列,自动开启字典编码,将原值映射到字典索引,从而显著减少存储量。
混合 RLE + Bit-Packing
- Parquet 对整数类型实现了混合 RLE(Run-Length Encoding)与Bit-Packing,根据数据分布动态选择最佳方案,例如对于长串相同值使用 RLE,对于小整数值使用 Bit-Packing。
其它压缩算法
- 支持 Snappy、Gzip、LZO、Brotli、ZSTD、LZ4 等多种压缩格式,可按列灵活选择。
写入过程原理
- Schema 序列化:将 DataFrame/表结构映射为 Thrift Schema。
- Row Group 切分:按预设行数或大小切分成多个 Row Group。
- Column Chunk 生成:对每个列分别收集数据,分 Page 编码与压缩。
- 顺序写入:先写 Header → 各 Row Group 的 Column Chunk(含 Page)→ Footer(含全部元数据)→ 终止魔数。
读取过程原理
- 读取 Footer:读取文件末尾 8 字节(4 字节元数据长度 + 4 字节魔数),定位并加载全部元数据。
- 筛选元数据:根据查询列与过滤条件,决定需要加载的 Column Chunk 与 Page 位置。
- 随机/并行读取:直接跳转到各 Column Chunk 的偏移位置,按需加载并解压 Page。
- 重建记录:将各列 Page 解码后重组成行,供上层引擎消费。
实践示例:使用 Python 创建并写入 Parquet
下面示例演示如何用 pandas+pyarrow 将一个简单表格写入 Parquet,并简要注释写入过程:
import pandas as pd# 1. 创建 DataFrame
df = pd.DataFrame({"user_id": [1001, 1002, 1003],"event": ["login", "purchase", "logout"],"timestamp": pd.to_datetime(["2025-04-20 10:00:00","2025-04-20 10:05:00","2025-04-20 10:10:00"])
})# 2. 写入 Parquet(默认 row_group_size=64MB,可通过 partition_cols 分区)
df.to_parquet("events.parquet", engine="pyarrow", compression="snappy")
- pandas 在内部将 DataFrame Schema 转为 Thrift Schema;
- 按列生成 Column Chunk 并分 Page 编码;
- 最后输出 Header、Column Chunk、Footer 以及魔数字段。
1. Row Group 的概念与分割原则
- Row Group 是 Parquet 中最小的水平切分单元,表示若干行的完整切片。一个 Parquet 文件由一个或多个 Row Group 依次组成,每个 Row Group 内部再按列拆分为若干 Column Chunk,支持列式读写与跳过式扫描。
- 选择合适的 Row Group 大小,是在I/O 效率(大块顺序读更快)和并行度/内存占用(小块更易并行、占用更少)间的权衡。
2. Row Group 的“大小”度量方式
Parquet 对 Row Group 的大小有两种常见度量:
- 未压缩字节数(uncompressed bytes)
- 行数(number of rows)
- 在 Parquet-Java(Hadoop/Spark 常用)中,
ParquetProperties.DEFAULT_ROW_GROUP_SIZE
默认为 128 MiB(未压缩),并以字节为单位控制切分。 - 在 PyArrow(Python 生态)中,
row_group_size
参数实际上是以行数为单位;若不指定(None
),默认值已被调整为 1 Mi 行(绝对最大值仍为 64 Mi 行)。
3. 不同实现中的默认值与推荐配置
实现 / 场景 | 默认值 | 推荐范围 | 说明 |
---|---|---|---|
Parquet-Java | 128 MiB(未压缩) | 512 MiB – 1 GiB(未压缩) | 与 HDFS 块大小对齐,一般 HDFS 块也应设为 1 GiB,以便一组 Row Group 刚好填满一个块 |
Spark/HDFS | spark.sql.files.maxPartitionBytes 默认 128 MiB | 同上 | Spark 会将每个 Row Group 当作一个分区,读写时以此并行 |
PyArrow | 1 Mi 行 | 视数据表宽度换算成字节 | 若表宽为 10 列、每列单元假设 4 字节,1 Mi 行 ≈ 40 MiB(未压缩) |
其他工具(如 DuckDB/Polars) | 通常基于“每行最大允许字节数”来推算,默认约 1 Mi 行 | 需参考各自文档 | 如 Polars 中 row_group_size_bytes 默认 122 880 KiB(≈ 120 MiB) |
4. Row Group 在文件中的物理结构
一个 Row Group 在文件中的布局可以抽象为:
+--------------------------------------------+
| Row Group |
| ┌───────────────────────────────────────┐ |
| │ Column Chunk for Column 0 │ │
| │ ┌── Page 1 (Dictionary Page?) │ │
| │ │ - Page Header │ │
| │ │ - 字典或数据块 │ │
| │ └── Page 2 (Data Page v1/v2) │ │
| │ - Page Header │ │
| │ - 压缩/编码后的列数据 │ │
| │ … │ │
| └───────────────────────────────────────┘ │
| ┌───────────────────────────────────────┐ │
| │ Column Chunk for Column 1 │ │
| │ … │ │
| └───────────────────────────────────────┘ │
| ⋮ |
| ┌───────────────────────────────────────┐ │
| │ Column Chunk for Column N │ │
| │ … │ │
| └───────────────────────────────────────┘ │
+--------------------------------------------+
同时,文件末尾的 Footer 中包含了每个 Row Group 的元数据(Thrift 定义),主要字段有:
total_byte_size
:该 Row Group 内所有 Column Chunk 未压缩数据总字节数num_rows
:该 Row Group 包含的行数columns: list<ColumnChunk>
:每个 ColumnChunk 的偏移、长度、编码、压缩算法、Page 统计等
读写时,写端按序将上述 Row Group 数据写入磁盘,待所有 Row Group 完成后再写 Footer 与尾部魔数;读端则先读 Footer,加载元数据后即可随机定位到任意 Row Group 和任意字段的 Column Chunk 进行解压与解码。
5. 举例说明
假设有一个表 10 列,每列单元平均占用 4 字节,我们若以 1 Mi 行为 Row Group(PyArrow 默认值),则:
- 未压缩 Row Group 大小 ≈ 1 Mi × 10 列 × 4 B ≈ 40 MiB。
- 压缩后:若使用 Snappy,通常可压缩到 1/2 – 1/4,大约 10 MiB – 20 MiB。
- 如果改为 Parquet-Java 默认 128 MiB(未压缩),则 Row Group 行数可按上述公式反算:128 MiB ÷ (10 × 4 B) ≈ 3.3 Mi 行。
小结
- Row Group 大小既可以用字节也可以用行数度量,具体含义取决于所用写入器与参数;
- Parquet-Java 默认 128 MiB(未压缩),官方推荐 512 MiB – 1 GiB;
- PyArrow 默认 1 Mi 行,且新版已调整绝对最大 64 Mi 行;
- 结构上,Row Group 包含多列的 Column Chunk,每个 Column Chunk 又由若干 Page 组成;
- 选取合适的 Row Group 大小,可在 顺序 I/O 性能 与 并行度/内存占用 之间取得平衡。
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq# 1. 创建示例 DataFrame
df = pd.DataFrame({"user_id": [1001, 1002, 1003, 1004],"event": ["login", "purchase", "logout", "login"],"timestamp": pd.to_datetime(["2025-04-20 10:00:00","2025-04-20 10:05:00","2025-04-20 10:10:00","2025-04-20 10:15:00"])
})# 2. 写入 Parquet(指定 row_group_size 为 2 行,以演示多个 Row Group)
pq.write_table(pa.Table.from_pandas(df), 'events.parquet', row_group_size=2)# 3. 读取 Parquet 元数据并提取 Row Group 信息
pq_file = pq.ParquetFile('events.parquet')
row_groups = []
for i in range(pq_file.num_row_groups):rg = pq_file.metadata.row_group(i)row_groups.append({"row_group_id": i,"num_rows": rg.num_rows,"total_byte_size": rg.total_byte_size})# 4. 展示 Row Group 元数据
rg_df = pd.DataFrame(row_groups)
import ace_tools as tools; tools.display_dataframe_to_user("Row Group Metadata", rg_df)