两种BaseBatchReader的实现类
BaseBatchReader支持以Batch + Vectorized的特性,读取底层的文件。
ColumnarBatchReader
通过
VectorizedSparkParquetReaders::build Reader()
静态方法创建的读取器,关键特性如下:
- 支持读取Delete File
- 以Arrow的格式直接读取Parquet文件
- 最终返回的数据集的类型为Spark.ColumnarBatch,是Spark中的实现类
public static ColumnarBatchReader buildReader(Schema expectedSchema,MessageType fileSchema,Map<Integer, ?> idToConstant,DeleteFilter<InternalRow> deleteFilter) {return (ColumnarBatchReader)TypeWithSchemaVisitor.visit(expectedSchema.asStruct(),fileSchema,new ReaderBuilder(expectedSchema,fileSchema,NullCheckingForGet.NULL_CHECKING_ENABLED,idToConstant,ColumnarBatchReader::new,deleteFilter));
ArrowBatchReader
通过
ArrowReader::buildReader()
静态方法创建的读取器,关键特性如下:
- 不支持读取Delete File
- 以Arrow的格式直接读取Parquet文件
- 返回的最终结果为ColumnarBatch类型,是Iceberg内置的实现类
在Iceberg 1.2.x的版本中,只在测试用例中使用到,因此在这里不再讨论,它的实现比ColumnarBatchReader更简单。
ColumnarBatchReader的创建
DataSourceRDD::compute方法中创建PartitionReader实例
// 在计算RDD数据的过程中,会通过如下的方法创建一个实现了PartitionReader接口的具体类的实例,
// 这里partitionReaderFactory的类型为SparkColumnarReaderFactory,
// SparkColumnarReaderFactory类是Iceberg中的实现,它重写了createColumnarReader(InputPartition)接口
// 以返回一个PartitionReader<ColumnarBatch>的实例。
val batchReader = partitionReaderFactory.createColumnarReader(inputPartition)
PartitionReaderFactory.createColumnarReader方法创建BatchDataReader实例
class SparkColumnarReaderFactory implements PartitionReaderFactory {public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition inputPartition) {SparkInputPartition partition = (SparkInputPartition) inputPartition;if (partition.allTasksOfType(FileScanTask.class)) {return new BatchDataReader(partition, batchSize);} else {throw new UnsupportedOperationException("Unsupported task group for columnar reads: " + partition.taskGroup());}}
}
BatchDataReader::open方法创建VectorizedParquetReader迭代器
BatchDataReader::open
class BatchDataReader extends BaseBatchReader<FileScanTask>implements PartitionReader<ColumnarBatch> {@Overrideprotected CloseableIterator<ColumnarBatch> open(FileScanTask task) {// 获取Data File的路径String filePath = task.file().path().toString();LOG.debug("Opening data file {}", filePath);// update the current file for Spark's filename() functionInputFileBlockHolder.set(filePath, task.start(), task.length());Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());// 获取底层文件的句柄InputFile inputFile = getInputFile(filePath);Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");// 获取数据文件对应的Delete FilesSparkDeleteFilter deleteFilter =task.deletes().isEmpty()? null: new SparkDeleteFilter(filePath, task.deletes(), counter());// 返回一个数据文件上的迭代器return newBatchIterable(inputFile,task.file().format(),task.start(),task.length(),task.residual(),idToConstant,deleteFilter).iterator();}
}
BaseBatchReader::newBatchIterable方法创建VectorizedParquetReader实例
VectorizedParquetReader类是最上层的类,它提供了对遍历文件内容的入口。
abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {protected CloseableIterable<ColumnarBatch> newBatchIterable(InputFile inputFile,FileFormat format,long start,long length,Expression residual,Map<Integer, ?> idToConstant,SparkDeleteFilter deleteFilter) {switch (format) {case PARQUET:// 如果文件的格式是PARQUET,则创建一个Parquet上的迭代器return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);case ORC:// 忽略,不讨论return newOrcIterable(inputFile, start, length, residual, idToConstant);default:throw new UnsupportedOperationException("Format: " + format + " not supported for batched reads");}}private CloseableIterable<ColumnarBatch> newParquetIterable(InputFile inputFile,long start,long length,Expression residual,Map<Integer, ?> idToConstant,SparkDeleteFilter deleteFilter) {// get required schema if there are deletesSchema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();return Parquet.read(inputFile).project(requiredSchema).split(start, length)// 指定可以创建BaseBatchReader的实现类的实例的方法.createBatchedReaderFunc(fileSchema ->VectorizedSparkParquetReaders.buildReader(requiredSchema, fileSchema, idToConstant, deleteFilter)).recordsPerBatch(batchSize).filter(residual).caseSensitive(caseSensitive())// Spark eagerly consumes the batches. So the underlying memory allocated could be reused// without worrying about subsequent reads clobbering over each other. This improves// read performance as every batch read doesn't have to pay the cost of allocating memory..reuseContainers().withNameMapping(nameMapping()).build();}
}
ColumnarBatchReader::new方法创建ColumnarBatchReader实例
VectorizedSparkParquetReaders.buildReader()
方法见第一大章节的简述。
public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {private final boolean hasIsDeletedColumn;private DeleteFilter<InternalRow> deletes = null;private long rowStartPosInBatch = 0;// 只有一个构造器,readers是保存了读取文件中每一个列(字段)的Reader,它们都是实现了VectorizedReader<T>接口的// VectorizedArrowReader<T>的实例public ColumnarBatchReader(List<VectorizedReader<?>> readers) {super(readers);// 遍历每一个字段的Reader类型,看看当前文件中是不是存在内置的列_deleted,它标识着当前当前行是不是被删除了。this.hasIsDeletedColumn =readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader);}
}
Parquet文件读取
通过前面的分析,知道对上层(Spark RDD)可见的接口,是由VectorizedParquetReader(一个Iterator的实现类)提供的,
它内部封装了对ColumnarBatchReader的操作。
VectorizedParquetReader::iterator方法,返回Parquet文件上的迭代器
public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {@Overridepublic CloseableIterator<T> iterator() {FileIterator<T> iter = new FileIterator<>(init());addCloseable(iter);return iter;}
}
FileIterator::next方法,读取数据
由于FilterIterator实现了JAVA中的Iterator接口,因此可以在compute Spark RDD时,通过这个迭代器,获取到文件中的内容,
也就是next()方法返回的ColumnarBatch对象。
/*** 这里T的类型为ColumnarBatch。*/private static class FileIterator<T> implements CloseableIterator<T> {public T next() {if (!hasNext()) {throw new NoSuchElementException();}if (valuesRead >= nextRowGroupStart) {// 第一次执行时,valuesRead == nextRowGroupStart,表示开始读取一个新的RowGroup// 这里调用advance()后,nextRowGroupStart指向了下一个要读取的RowGroup的起始位置,// 但当前的RowGroup是还没有被读取的,被延迟到了后面的过程。advance();}// batchSize is an integer, so casting to integer is safe// 读取当前RowGroup的数据,其中:// nextRowGroupStart指向的是下一个RowGroup的起始位置,// valuesRead的值表示一共读取了多少行// 这里必须有nextRowGroupStart >= nextRowGroupStart,而它们的差值就是当前RowGroup剩余的没有被读取的行int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize);// 读取指定数量的行,这里的model就是前面提到的ColumnarBatchReader的实例对象。if (reuseContainers) {this.last = model.read(last, numValuesToRead);} else {this.last = model.read(null, numValuesToRead);}// 累加读取的行数valuesRead += numValuesToRead;return last;}/*** 移动读取指针到下一个RowGroup的起始位置。*/private void advance() {while (shouldSkip[nextRowGroup]) {nextRowGroup += 1;reader.skipNextRowGroup();}PageReadStore pages;try {pages = reader.readNextRowGroup();} catch (IOException e) {throw new RuntimeIOException(e);}// 从绑定的RowGroups信息中,计算下一个RowGroup的起始位置long rowPosition = rowGroupsStartRowPos[nextRowGroup];model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup), rowPosition);nextRowGroupStart += pages.getRowCount();nextRowGroup += 1;}}
ColumnarBatchReader::read
public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {protected final VectorHolder[] vectorHolders;@Overridepublic final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {if (reuse == null) {// 如果指定了不复用当前的VectorHolder来存储数据时,就关闭它们closeVectors();}// 由内部类ColumnBatchLoader负责代理进行真正的读取操作。ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();rowStartPosInBatch += numRowsToRead;return columnarBatch;}
}
ColumnBatchLoader::loadDataToColumnBatch读取数据,封装成ColumnarBatch对象
private class ColumnBatchLoader {// 读取的数据记录总数private final int numRowsToRead;// the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null when// there is no deletesprivate int[] rowIdMapping;// the array to indicate if a row is deleted or not, it is null when there is no "_deleted"// metadata columnprivate boolean[] isDeleted;ColumnBatchLoader(int numRowsToRead) {Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);this.numRowsToRead = numRowsToRead;if (hasIsDeletedColumn) {isDeleted = new boolean[numRowsToRead];}}ColumnarBatch loadDataToColumnBatch() {// 对读取的数据记录进行过滤,得到未删除的数据记录总数int numRowsUndeleted = initRowIdMapping();// 以Arrows格式,读取每一列的数据,表示为Spark.ColumnVector类型ColumnVector[] arrowColumnVectors = readDataToColumnVectors();// 创建一个ColumnarBatch实例,包含所有存活的数据ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);newColumnarBatch.setNumRows(numRowsUndeleted);if (hasEqDeletes()) {// 如果有等值删除的文件存在,则还需要按值来过滤掉被删除的数据行// 由于基于等值删除的文件过滤数据时,需要知道每一行的实际值,因此只有将数据读取到内存中才知道哪一行要被删除掉applyEqDelete(newColumnarBatch);}if (hasIsDeletedColumn && rowIdMapping != null) {// 如果存在被删除的数据行,则需要重新分配行号,从0开始自然递增// reset the row id mapping array, so that it doesn't filter out the deleted rowsfor (int i = 0; i < numRowsToRead; i++) {rowIdMapping[i] = i;}newColumnarBatch.setNumRows(numRowsToRead);}// 返回return newColumnarBatch;}ColumnVector[] readDataToColumnVectors() {ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length];ColumnVectorBuilder columnVectorBuilder = new ColumnVectorBuilder();for (int i = 0; i < readers.length; i += 1) {vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);int numRowsInVector = vectorHolders[i].numValues();Preconditions.checkState(numRowsInVector == numRowsToRead,"Number of rows in the vector %s didn't match expected %s ",numRowsInVector,numRowsToRead);arrowColumnVectors[i] =columnVectorBuilder.withDeletedRows(rowIdMapping, isDeleted).build(vectorHolders[i], numRowsInVector);}return arrowColumnVectors;}boolean hasEqDeletes() {return deletes != null && deletes.hasEqDeletes();}int initRowIdMapping() {Pair<int[], Integer> posDeleteRowIdMapping = posDelRowIdMapping();if (posDeleteRowIdMapping != null) {rowIdMapping = posDeleteRowIdMapping.first();return posDeleteRowIdMapping.second();} else {rowIdMapping = initEqDeleteRowIdMapping();return numRowsToRead;}}/*** 如果当前文件包含 positions delete files,那么需要建立索引数据结构*/Pair<int[], Integer> posDelRowIdMapping() {if (deletes != null && deletes.hasPosDeletes()) {return buildPosDelRowIdMapping(deletes.deletedRowPositions());} else {return null;}}/*** Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how we* delete 2 rows in a batch with 8 rows in total. [0,1,2,3,4,5,6,7] -- Original status of the* row id mapping array [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array Position* delete 2, 6 [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]* [F,F,T,F,F,F,T,F] -- After applying position deletes** @param deletedRowPositions a set of deleted row positions* @return the mapping array and the new num of rows in a batch, null if no row is deleted*/Pair<int[], Integer> buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) {if (deletedRowPositions == null) {return null;}// 为新读取的数据记录,创建一个数组,保存所有没有被删除的行号,从0开始// 基本算法:使用双指针,将所有未删除的行放到队列一端,且有序int[] posDelRowIdMapping = new int[numRowsToRead];int originalRowId = 0; // 指向待判定的行的下标int currentRowId = 0; // 存活行的下标while (originalRowId < numRowsToRead) {if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) {// 如果当前行没有被删除,则将其添加到currentRowId指向的位置posDelRowIdMapping[currentRowId] = originalRowId;// currentRowId指向下一个待插入的位置 currentRowId++;} else {if (hasIsDeletedColumn) {isDeleted[originalRowId] = true;}deletes.incrementDeleteCount();}originalRowId++;}if (currentRowId == numRowsToRead) {// there is no delete in this batchreturn null;} else {return Pair.of(posDelRowIdMapping, currentRowId);}}int[] initEqDeleteRowIdMapping() {int[] eqDeleteRowIdMapping = null;if (hasEqDeletes()) {eqDeleteRowIdMapping = new int[numRowsToRead];for (int i = 0; i < numRowsToRead; i++) {eqDeleteRowIdMapping[i] = i;}}return eqDeleteRowIdMapping;}/*** Filter out the equality deleted rows. Here is an example, [0,1,2,3,4,5,6,7] -- Original* status of the row id mapping array [F,F,F,F,F,F,F,F] -- Original status of the isDeleted* array Position delete 2, 6 [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num* records to 6] [F,F,T,F,F,F,T,F] -- After applying position deletes Equality delete 1 <= x <=* 3 [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]* [F,T,T,T,F,F,T,F] -- After applying equality deletes** @param columnarBatch the {@link ColumnarBatch} to apply the equality delete*/void applyEqDelete(ColumnarBatch columnarBatch) {// 对经过position deletes 过滤的数据行,进行按值删除Iterator<InternalRow> it = columnarBatch.rowIterator();int rowId = 0;int currentRowId = 0;while (it.hasNext()) { // 行式遍历InternalRow row = it.next();if (deletes.eqDeletedRowFilter().test(row)) {// the row is NOT deleted// skip deleted rows by pointing to the next undeleted row Id// 更新成员变量rowIdMappingrowIdMapping[currentRowId] = rowIdMapping[rowId];currentRowId++;} else {if (hasIsDeletedColumn) {isDeleted[rowIdMapping[rowId]] = true;}deletes.incrementDeleteCount();}rowId++;}// 更新最新的存活记录数columnarBatch.setNumRows(currentRowId);}}