Iceberg: 列式读取Parquet数据

两种BaseBatchReader的实现类

BaseBatchReader支持以Batch + Vectorized的特性,读取底层的文件。

ColumnarBatchReader

通过VectorizedSparkParquetReaders::build Reader()静态方法创建的读取器,关键特性如下:

  1. 支持读取Delete File
  2. 以Arrow的格式直接读取Parquet文件
  3. 最终返回的数据集的类型为Spark.ColumnarBatch,是Spark中的实现类
  public static ColumnarBatchReader buildReader(Schema expectedSchema,MessageType fileSchema,Map<Integer, ?> idToConstant,DeleteFilter<InternalRow> deleteFilter) {return (ColumnarBatchReader)TypeWithSchemaVisitor.visit(expectedSchema.asStruct(),fileSchema,new ReaderBuilder(expectedSchema,fileSchema,NullCheckingForGet.NULL_CHECKING_ENABLED,idToConstant,ColumnarBatchReader::new,deleteFilter));

ArrowBatchReader

通过ArrowReader::buildReader()静态方法创建的读取器,关键特性如下:

  1. 不支持读取Delete File
  2. 以Arrow的格式直接读取Parquet文件
  3. 返回的最终结果为ColumnarBatch类型,是Iceberg内置的实现类

在Iceberg 1.2.x的版本中,只在测试用例中使用到,因此在这里不再讨论,它的实现比ColumnarBatchReader更简单。

ColumnarBatchReader的创建

DataSourceRDD::compute方法中创建PartitionReader实例

// 在计算RDD数据的过程中,会通过如下的方法创建一个实现了PartitionReader接口的具体类的实例,
// 这里partitionReaderFactory的类型为SparkColumnarReaderFactory,
// SparkColumnarReaderFactory类是Iceberg中的实现,它重写了createColumnarReader(InputPartition)接口
// 以返回一个PartitionReader<ColumnarBatch>的实例。
val batchReader = partitionReaderFactory.createColumnarReader(inputPartition)

PartitionReaderFactory.createColumnarReader方法创建BatchDataReader实例

class SparkColumnarReaderFactory implements PartitionReaderFactory {public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition inputPartition) {SparkInputPartition partition = (SparkInputPartition) inputPartition;if (partition.allTasksOfType(FileScanTask.class)) {return new BatchDataReader(partition, batchSize);} else {throw new UnsupportedOperationException("Unsupported task group for columnar reads: " + partition.taskGroup());}}
}

BatchDataReader::open方法创建VectorizedParquetReader迭代器

BatchDataReader::open

class BatchDataReader extends BaseBatchReader<FileScanTask>implements PartitionReader<ColumnarBatch> {@Overrideprotected CloseableIterator<ColumnarBatch> open(FileScanTask task) {// 获取Data File的路径String filePath = task.file().path().toString();LOG.debug("Opening data file {}", filePath);// update the current file for Spark's filename() functionInputFileBlockHolder.set(filePath, task.start(), task.length());Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());// 获取底层文件的句柄InputFile inputFile = getInputFile(filePath);Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");// 获取数据文件对应的Delete FilesSparkDeleteFilter deleteFilter =task.deletes().isEmpty()? null: new SparkDeleteFilter(filePath, task.deletes(), counter());// 返回一个数据文件上的迭代器return newBatchIterable(inputFile,task.file().format(),task.start(),task.length(),task.residual(),idToConstant,deleteFilter).iterator();}
}

BaseBatchReader::newBatchIterable方法创建VectorizedParquetReader实例

VectorizedParquetReader类是最上层的类,它提供了对遍历文件内容的入口。

abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {protected CloseableIterable<ColumnarBatch> newBatchIterable(InputFile inputFile,FileFormat format,long start,long length,Expression residual,Map<Integer, ?> idToConstant,SparkDeleteFilter deleteFilter) {switch (format) {case PARQUET:// 如果文件的格式是PARQUET,则创建一个Parquet上的迭代器return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);case ORC:// 忽略,不讨论return newOrcIterable(inputFile, start, length, residual, idToConstant);default:throw new UnsupportedOperationException("Format: " + format + " not supported for batched reads");}}private CloseableIterable<ColumnarBatch> newParquetIterable(InputFile inputFile,long start,long length,Expression residual,Map<Integer, ?> idToConstant,SparkDeleteFilter deleteFilter) {// get required schema if there are deletesSchema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();return Parquet.read(inputFile).project(requiredSchema).split(start, length)// 指定可以创建BaseBatchReader的实现类的实例的方法.createBatchedReaderFunc(fileSchema ->VectorizedSparkParquetReaders.buildReader(requiredSchema, fileSchema, idToConstant, deleteFilter)).recordsPerBatch(batchSize).filter(residual).caseSensitive(caseSensitive())// Spark eagerly consumes the batches. So the underlying memory allocated could be reused// without worrying about subsequent reads clobbering over each other. This improves// read performance as every batch read doesn't have to pay the cost of allocating memory..reuseContainers().withNameMapping(nameMapping()).build();}
}

ColumnarBatchReader::new方法创建ColumnarBatchReader实例

VectorizedSparkParquetReaders.buildReader()方法见第一大章节的简述。

public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {private final boolean hasIsDeletedColumn;private DeleteFilter<InternalRow> deletes = null;private long rowStartPosInBatch = 0;// 只有一个构造器,readers是保存了读取文件中每一个列(字段)的Reader,它们都是实现了VectorizedReader<T>接口的// VectorizedArrowReader<T>的实例public ColumnarBatchReader(List<VectorizedReader<?>> readers) {super(readers);// 遍历每一个字段的Reader类型,看看当前文件中是不是存在内置的列_deleted,它标识着当前当前行是不是被删除了。this.hasIsDeletedColumn =readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader);}
}

Parquet文件读取

通过前面的分析,知道对上层(Spark RDD)可见的接口,是由VectorizedParquetReader(一个Iterator的实现类)提供的,
它内部封装了对ColumnarBatchReader的操作。

VectorizedParquetReader::iterator方法,返回Parquet文件上的迭代器

public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {@Overridepublic CloseableIterator<T> iterator() {FileIterator<T> iter = new FileIterator<>(init());addCloseable(iter);return iter;}
}

FileIterator::next方法,读取数据

由于FilterIterator实现了JAVA中的Iterator接口,因此可以在compute Spark RDD时,通过这个迭代器,获取到文件中的内容,
也就是next()方法返回的ColumnarBatch对象。

  /*** 这里T的类型为ColumnarBatch。*/private static class FileIterator<T> implements CloseableIterator<T> {public T next() {if (!hasNext()) {throw new NoSuchElementException();}if (valuesRead >= nextRowGroupStart) {// 第一次执行时,valuesRead == nextRowGroupStart,表示开始读取一个新的RowGroup// 这里调用advance()后,nextRowGroupStart指向了下一个要读取的RowGroup的起始位置,// 但当前的RowGroup是还没有被读取的,被延迟到了后面的过程。advance();}// batchSize is an integer, so casting to integer is safe// 读取当前RowGroup的数据,其中://   nextRowGroupStart指向的是下一个RowGroup的起始位置,//   valuesRead的值表示一共读取了多少行// 这里必须有nextRowGroupStart >= nextRowGroupStart,而它们的差值就是当前RowGroup剩余的没有被读取的行int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize);// 读取指定数量的行,这里的model就是前面提到的ColumnarBatchReader的实例对象。if (reuseContainers) {this.last = model.read(last, numValuesToRead);} else {this.last = model.read(null, numValuesToRead);}// 累加读取的行数valuesRead += numValuesToRead;return last;}/*** 移动读取指针到下一个RowGroup的起始位置。*/private void advance() {while (shouldSkip[nextRowGroup]) {nextRowGroup += 1;reader.skipNextRowGroup();}PageReadStore pages;try {pages = reader.readNextRowGroup();} catch (IOException e) {throw new RuntimeIOException(e);}// 从绑定的RowGroups信息中,计算下一个RowGroup的起始位置long rowPosition = rowGroupsStartRowPos[nextRowGroup];model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup), rowPosition);nextRowGroupStart += pages.getRowCount();nextRowGroup += 1;}}

ColumnarBatchReader::read

public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {protected final VectorHolder[] vectorHolders;@Overridepublic final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {if (reuse == null) {// 如果指定了不复用当前的VectorHolder来存储数据时,就关闭它们closeVectors();}// 由内部类ColumnBatchLoader负责代理进行真正的读取操作。ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();rowStartPosInBatch += numRowsToRead;return columnarBatch;}
}

ColumnBatchLoader::loadDataToColumnBatch读取数据,封装成ColumnarBatch对象

  private class ColumnBatchLoader {// 读取的数据记录总数private final int numRowsToRead;// the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null when// there is no deletesprivate int[] rowIdMapping;// the array to indicate if a row is deleted or not, it is null when there is no "_deleted"// metadata columnprivate boolean[] isDeleted;ColumnBatchLoader(int numRowsToRead) {Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);this.numRowsToRead = numRowsToRead;if (hasIsDeletedColumn) {isDeleted = new boolean[numRowsToRead];}}ColumnarBatch loadDataToColumnBatch() {// 对读取的数据记录进行过滤,得到未删除的数据记录总数int numRowsUndeleted = initRowIdMapping();// 以Arrows格式,读取每一列的数据,表示为Spark.ColumnVector类型ColumnVector[] arrowColumnVectors = readDataToColumnVectors();// 创建一个ColumnarBatch实例,包含所有存活的数据ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);newColumnarBatch.setNumRows(numRowsUndeleted);if (hasEqDeletes()) {// 如果有等值删除的文件存在,则还需要按值来过滤掉被删除的数据行// 由于基于等值删除的文件过滤数据时,需要知道每一行的实际值,因此只有将数据读取到内存中才知道哪一行要被删除掉applyEqDelete(newColumnarBatch);}if (hasIsDeletedColumn && rowIdMapping != null) {// 如果存在被删除的数据行,则需要重新分配行号,从0开始自然递增// reset the row id mapping array, so that it doesn't filter out the deleted rowsfor (int i = 0; i < numRowsToRead; i++) {rowIdMapping[i] = i;}newColumnarBatch.setNumRows(numRowsToRead);}// 返回return newColumnarBatch;}ColumnVector[] readDataToColumnVectors() {ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length];ColumnVectorBuilder columnVectorBuilder = new ColumnVectorBuilder();for (int i = 0; i < readers.length; i += 1) {vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);int numRowsInVector = vectorHolders[i].numValues();Preconditions.checkState(numRowsInVector == numRowsToRead,"Number of rows in the vector %s didn't match expected %s ",numRowsInVector,numRowsToRead);arrowColumnVectors[i] =columnVectorBuilder.withDeletedRows(rowIdMapping, isDeleted).build(vectorHolders[i], numRowsInVector);}return arrowColumnVectors;}boolean hasEqDeletes() {return deletes != null && deletes.hasEqDeletes();}int initRowIdMapping() {Pair<int[], Integer> posDeleteRowIdMapping = posDelRowIdMapping();if (posDeleteRowIdMapping != null) {rowIdMapping = posDeleteRowIdMapping.first();return posDeleteRowIdMapping.second();} else {rowIdMapping = initEqDeleteRowIdMapping();return numRowsToRead;}}/*** 如果当前文件包含 positions delete files,那么需要建立索引数据结构*/Pair<int[], Integer> posDelRowIdMapping() {if (deletes != null && deletes.hasPosDeletes()) {return buildPosDelRowIdMapping(deletes.deletedRowPositions());} else {return null;}}/*** Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how we* delete 2 rows in a batch with 8 rows in total. [0,1,2,3,4,5,6,7] -- Original status of the* row id mapping array [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array Position* delete 2, 6 [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]* [F,F,T,F,F,F,T,F] -- After applying position deletes** @param deletedRowPositions a set of deleted row positions* @return the mapping array and the new num of rows in a batch, null if no row is deleted*/Pair<int[], Integer> buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) {if (deletedRowPositions == null) {return null;}// 为新读取的数据记录,创建一个数组,保存所有没有被删除的行号,从0开始// 基本算法:使用双指针,将所有未删除的行放到队列一端,且有序int[] posDelRowIdMapping = new int[numRowsToRead];int originalRowId = 0; // 指向待判定的行的下标int currentRowId = 0; // 存活行的下标while (originalRowId < numRowsToRead) {if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) {// 如果当前行没有被删除,则将其添加到currentRowId指向的位置posDelRowIdMapping[currentRowId] = originalRowId;// currentRowId指向下一个待插入的位置  currentRowId++;} else {if (hasIsDeletedColumn) {isDeleted[originalRowId] = true;}deletes.incrementDeleteCount();}originalRowId++;}if (currentRowId == numRowsToRead) {// there is no delete in this batchreturn null;} else {return Pair.of(posDelRowIdMapping, currentRowId);}}int[] initEqDeleteRowIdMapping() {int[] eqDeleteRowIdMapping = null;if (hasEqDeletes()) {eqDeleteRowIdMapping = new int[numRowsToRead];for (int i = 0; i < numRowsToRead; i++) {eqDeleteRowIdMapping[i] = i;}}return eqDeleteRowIdMapping;}/*** Filter out the equality deleted rows. Here is an example, [0,1,2,3,4,5,6,7] -- Original* status of the row id mapping array [F,F,F,F,F,F,F,F] -- Original status of the isDeleted* array Position delete 2, 6 [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num* records to 6] [F,F,T,F,F,F,T,F] -- After applying position deletes Equality delete 1 <= x <=* 3 [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]* [F,T,T,T,F,F,T,F] -- After applying equality deletes** @param columnarBatch the {@link ColumnarBatch} to apply the equality delete*/void applyEqDelete(ColumnarBatch columnarBatch) {// 对经过position deletes 过滤的数据行,进行按值删除Iterator<InternalRow> it = columnarBatch.rowIterator();int rowId = 0;int currentRowId = 0;while (it.hasNext()) { // 行式遍历InternalRow row = it.next();if (deletes.eqDeletedRowFilter().test(row)) {// the row is NOT deleted// skip deleted rows by pointing to the next undeleted row Id// 更新成员变量rowIdMappingrowIdMapping[currentRowId] = rowIdMapping[rowId];currentRowId++;} else {if (hasIsDeletedColumn) {isDeleted[rowIdMapping[rowId]] = true;}deletes.incrementDeleteCount();}rowId++;}// 更新最新的存活记录数columnarBatch.setNumRows(currentRowId);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/597402.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大数据技术在民生资金专项审计中的应用

一、应用背景 目前&#xff0c;针对审计行业&#xff0c;关于大数据技术的相关研究与应用一般包括大数据智能采集数据技术、大数据智能分析技术、大数据可视化分析技术以及大数据多数据源综合分析技术。其中&#xff0c;大数据智能采集数据技术是通过网络爬虫或者WebService接…

Docker无法启动Postgresql容器

目录 问题描述解决问题 问题描述 拉取了一个Postgresql14.2的镜像&#xff0c;在docker run创建并运行容器之后使用docker ps发现容器没有跑起来&#xff0c;再次使用docker start也没跑起来。 docker run -d --name mypg -v psql-data:/var/lib/postgresql/data -e POSTGRES…

Python random模块(获取随机数)常用方法和使用例子

嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 random.random random.random()用于生成一个0到1的随机符点数: 0 < n < 1.0 random.uniform random.uniform(a, b)&#xff0c;用于生成一个指定范围内的随机符点数&#xff0c;两个参数其中一个是上限&#xff0c;一…

2023-我的CSDN创作之旅

1.博客内容与数量 2023年共发表博客59篇&#xff0c;内容主要集中在GIS&#xff0c;空间分析等领域 主要内容有&#xff1a; networkx学习 Geospatial Data Science Geocomputation ESDA in PySal SHAP Spatial Data Analysis BikeDNA 以下是对这几个章节主要内容的简…

机器学习 -- k近邻算法

场景 我学习Python的初衷是学习人工智能&#xff0c;满足现有的业务场景。所以必须要看看机器学习这一块。今天看了很久&#xff0c;做个总结。 机器学习分为深度学习和传统机器学习 深度学习 深度学习模型通常非常复杂&#xff0c;包含多层神经网络&#xff0c;每一层都包含…

在SQL脚本中删除所有的 prompt那一行

在SQL脚本中删除所有的 prompt 命令&#xff0c;您可以手动编辑SQL文件&#xff0c;移除所有包含 prompt 关键字的行。具体操作取决于您使用的是哪种文本编辑器。大多数文本编辑器支持查找和替换功能&#xff0c;可以用它来删除所有 prompt 行。 以下是在不同编辑器中进行操作…

html+css 对input的使用以及详解

表单 form标签主要用于收集用户信息&#xff0c;对表单结果的处理和发送 使用场景&#xff1a;登录页面、注册页面、搜索区域 form属性描述action规定当提交表单时向何处发送表单数据method规定用于发送表单数据的 HTTP 方法name规定表单的名称target规定在何处打开 action …

mysql知识学习

0 mysql的隔离级别有4个&#xff0c;读未提交&#xff0c; 读已提交&#xff08;rc&#xff09;&#xff0c; 可重复读&#xff08;rr&#xff09;,串行化&#xff0c;mysql默认的隔离级别时rr 1聚簇索引就是主键索引&#xff0c; 可以一步到位i的索引 非聚簇索引就是要通过二…

Golang - http请求InsecureSkipVerify 字段为 true 来忽略忽略 SSL验证

在Golang中&#xff0c;可以通过设置 tls.Config 的 InsecureSkipVerify 字段为 true 来忽略 SSL 验证。 下面是一个简单的示例代码&#xff1a; package mainimport ("crypto/tls""fmt""net/http" )func main() {// 创建一个自定义的 Transp…

矩阵式键盘按键值的数码管显示实验

#include<reg51.h> //包含51单片机寄存器定义的头文件 sbit P14P1^4; //将P14位定义为P1.4引脚 sbit P15P1^5; //将P15位定义为P1.5引脚 sbit P16P1^6; //将P16位定义为P1.6引脚 sbit P17P1^7; //将P17位定义为P1.7引脚 unsigned char code Tab[ ]…

CTF-PWN-栈溢出-高级ROP-【SROP】

文章目录 linux信息处理2017 360春秋杯 smallest检查源码思路第一次要执行ret时的栈执行write函数时修改rsp到泄露的栈地址上去 输入/bin/sh并sigreturn调用系统调用回忆exp注意一个离离原上谱的地方 参考链接 SROP(Sigreturn Oriented Programming) 于 2014 年被 Vrije Univer…

简单多状态dp问题(打家劫舍Ⅱ)

通过分类谈论&#xff0c;将环形的问题&#xff0c;转化成两个线性的 “ 打家劫舍Ⅰ ” 1.状态表示 2.状态转移方程 3.初始化 f[ 0 ] nums[ 0 ] g[ 0 ] 0 4.填表顺序 从左往右填表&#xff0c;两个表一块填 5.返回值 max( f[ n-1 ] , g [ n - 1 ] )

【Bug】Android BottomNavigationView 图标黑色色块问题

最近在研究Android Jetpack组件&#xff0c;在使用Navigation配合底部导航栏时&#xff0c;发现一个奇怪的问题&#xff0c;如下&#xff1a; 说明&#xff1a;图标来源于Iconfont开源图标库 我的第三个图标变成了一个黑色色块&#xff0c;这个问题前两天我遇见过&#xff0c…

.NetCore部署微服务(一)

目录 前言 什么是微服务 微服务的优势 微服务的原则 创建项目 在Docker中运行服务 客户端调用 简单的集群服务 前言 写这篇文章旨在用最简单的代码阐述一下微服务 什么是微服务 微服务描述了从单独可部署的服务构建分布式应用程序的体系结构流程&#xff0c;同时这些…

C# 使用Microsoft消息队列(MSMQ)

写在前面 Microsoft Message Queuing (MSMQ) 是在多个不同的应用之间实现相互通信的一种异步传输模式&#xff0c;相互通信的应用可以分布于同一台机器上&#xff0c;也可以分布于相连的网络空间中的任一位置。 使用消息队列可以实现异步通讯&#xff0c;无需关心接收端是否在…

海康威视摄像头+服务器+录像机配置校园围墙安全侦测区域入侵侦测+越界侦测.docx

一、适用场景 1、校园内&#xff0c;防止课外时间翻越围墙到校外、从校外翻越围墙到校内&#xff1b; 2、通过服务器摄像头的侦测功能及时抓图保存&#xff0c;为不安全因素提供数字化依据&#xff1b; 3、网络录像机保存监控视频&#xff0c;服务器保存抓拍到的入侵与越界&am…

UI自动化Selenium iframe切换多层嵌套

IFRAME是HTML标签&#xff0c;作用是文档中的文档&#xff0c;或者浮动的框架(FRAME)。iframe元素会创建包含另外一个文档的内联框架(即行内框架)。 简单来说&#xff0c;就像房子内的一个个房间一样&#xff1b;你要去房间里拿东西&#xff0c;就得先开门&#xff1b; 如上图…

指针大礼包5

三、程序改错 共10题 &#xff08;共计100分&#xff09; 第1题 &#xff08;10.0分&#xff09; 题号:72 难度:中 第8章 /*------------------------------------------------------- 【程序改错】 ---------------------------------------------…

出现 No such instance field: ‘XXXX‘ 的解决方法

目录 1. 问题所示2. 原理分析3. 解决方法1. 问题所示 作为一个全栈的开发玩家,需要调试前后端的数据传输,方便发现问题所在! 在debug整个项目的时候,检查传输数据的时候,发现前端可以传输,但是后端一直拿不到 出现如下问题:No such instance field: parentModel 截图…

UI5与后端的文件交互(四)

文章目录 前言一、后端开发1. 新建管理模板表格2. 新建Function&#xff0c;动态创建文档 二、修改UI5项目1.Table里添加下载证明列2. 实现onClickDown事件 三、测试四、附 前言 这系列文章详细记录在Fiori应用中如何在前端和后端之间使用文件进行交互。 这篇的主要内容有&…