Iceberg Changelog

01 Iceberg Changelog使用

0101 Flink使用

CREATE CATALOG hive_catalog WITH ('type'='iceberg','catalog-type'='hive','uri'='thrift://xxxx:19083','clientimecol'='5','property-version'='1','warehouse'='hdfs://nameservice/spark'
);use CATALOG hive_catalog;CREATE TABLE test2(
id BIGINT COMMENT 'unique id',
data STRING,
primary key(id) not ENFORCED
);
ALTER TABLE test2 SET('format-version'='2');SET table.exec.iceberg.use-flip27-source = true;SELECT * FROM test2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s')*/ ;

Iceberg流式更新目前只支持Append的数据,不支持更新删除

0102 Spark使用

Spark不是使用正常的业务SQL语句,而是在表后面加一个.changes

SELECT * FROM test2.changes ;1       2022-11-20      INSERT  0       3417478502276420225
5       2022-12-13      INSERT  16      8632592143956424357
1       2022-11-12      DELETE  4       8089565695180123096
1       2022-01-01      DELETE  13      7376527066811164734
1       2022-12-12      DELETE  8       1562898119085686311
7       2022-12-13      INSERT  18      7329729628749942729

02 Flink流式更新

Iceberg Flink FLIP-27实现当中介绍了流式更新时Iceberg相关的接口和流程,主要涉及分片相关的内容,就是获取文件列表的过程,流程中只读取了Append的数据文件

此章节结合FlinkSQL ChangeLog相关内容,看GenericRowData的RowKind产生过程

0201 初始化数据类型

参考kafka,追踪IcebergSourceRecordEmitter,发现没有做数据转换,直接做了数据转发

public void emitRecord(RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {output.collect(element.record());split.updatePosition(element.fileOffset(), element.recordOffset());
}

数据格式的构建在更前面读数据的时候就完成了,读数据的核心逻辑在DataIterator

private void updateCurrentIterator() {try {while (!currentIterator.hasNext() && tasks.hasNext()) {currentIterator.close();currentIterator = openTaskIterator(tasks.next());fileOffset += 1;recordOffset = 0L;}} catch (IOException e) {throw new UncheckedIOException(e);}}
}

主要的功能类就是currentIterator,实现类为RowDataFileScanTaskReader,最终调用下一层iterator,下一层的实现类根据文件类型不同,parquet的实现类为ParquetReader,在next中读取数据

public T next() {if (valuesRead >= nextRowGroupStart) {advance();}if (reuseContainers) {this.last = model.read(last);} else {this.last = model.read(null);}valuesRead += 1;return last;
}

model实现类为ParquetValueReaders

public final T read(T reuse) {I intermediate = newStructData(reuse);for (int i = 0; i < readers.length; i += 1) {set(intermediate, i, readers[i].read(get(intermediate, i)));// setters[i].set(intermediate, i, get(intermediate, i));}return buildStruct(intermediate);
}

newStructData构建数据,创建了GenericRowData

protected GenericRowData newStructData(RowData reuse) {if (reuse instanceof GenericRowData) {return (GenericRowData) reuse;} else {return new GenericRowData(numFields);}
}

0202 支持更新类型

FlinkSQL ChangeLog中明确了数据源支持类型由ScanTableSource中定义,Iceberg的实现类是IcebergTableSource,支持支insert,因此不走Flink的ChangelogNormalize流程

public ChangelogMode getChangelogMode() {return ChangelogMode.insertOnly();
}

03 Spark Changelog

0301 SparkChangelogTable

Spark的changelog有一个专门的处理类SparkChangelogTable,根据第一节中的用法,它需要对应在表名后面加上.changes

public class SparkChangelogTable implements Table, SupportsRead, SupportsMetadataColumns {public static final String TABLE_NAME = "changes";

创建了一个Changelog的TableScan

public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {if (refreshEagerly) {icebergTable.refresh();}return new SparkScanBuilder(spark(), icebergTable, changelogSchema(), options) {@Overridepublic Scan build() {return buildChangelogScan();}};
}

buildChangelogScan当中创建了SparkChangelogScan

IncrementalChangelogScan scan =table.newIncrementalChangelogScan().caseSensitive(caseSensitive).filter(filterExpression()).project(expectedSchema);return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);

IncrementalChangelogScan是其中的核心,BaseTable提供了创建接口;BaseIncrementalChangelogScan是Spark调用的,IncrementalAppendScan是Flink调用的(参看Iceberg Flink FLIP-27)

public IncrementalAppendScan newIncrementalAppendScan() {return new BaseIncrementalAppendScan(this, schema(), ImmutableTableScanContext.builder().metricsReporter(reporter).build());
}@Override
public IncrementalChangelogScan newIncrementalChangelogScan() {return new BaseIncrementalChangelogScan(this);
}

0302 BaseIncrementalChangelogScan

略过其他调用流程,其构建changelog分片的核心是doPlanFiles接口,接口输入是起始和结尾的SnapshotId

protected CloseableIterable<ChangelogScanTask> doPlanFiles(Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
  • snapshot排序

首先是对Snapshot进行排序,基本处理就是遍历,遍历的过程中判断如果有delete类型的就异常

private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl) {Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) {if (!snapshot.operation().equals(DataOperations.REPLACE)) {if (snapshot.deleteManifests(table().io()).size() > 0) {throw new UnsupportedOperationException("Delete files are currently not supported in changelog scans");}changelogSnapshots.addFirst(snapshot);}}return changelogSnapshots;
}

deleteManifests就是删除类型的Manifests,由ManifestFile的content字段决定

Types.NestedField MANIFEST_CONTENT =optional(517, "content", Types.IntegerType.get(), "Contents of the manifest: 0=data, 1=deletes");

insert into、overwrite、delete三种操作出来的ManifestFile的字段值都是0也就是data(这应该是读写模式的原因,默认是copy-on-write)

  • ManifestFile列表

之后根据snapshot范围获取ManifestFile的文件列表,这一步并没有什么特殊的地方

Set<ManifestFile> newDataManifests =FluentIterable.from(changelogSnapshots).transformAndConcat(snapshot -> snapshot.dataManifests(table().io())).filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())).toSet();
  • ManifestGroup

这一步也没有什么特殊的,就是根据文件分片的通用接口进行构建和扫描

ManifestGroup manifestGroup =new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()).specsById(table().specs()).caseSensitive(isCaseSensitive()).select(scanColumns()).filterData(filter()).filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())).ignoreExisting();return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));

0303 输出格式

输出结果如上,输出的字段由schema定义,在SparkChangelogTable当中

public StructType schema() {if (lazyTableSparkType == null) {this.lazyTableSparkType = SparkSchemaUtil.convert(changelogSchema());}return lazyTableSparkType;
}

最终到ChangelogUtil当中定义,就是数据字段和CHANGELOG_METADATA联合

public static Schema changelogSchema(Schema tableSchema) {return TypeUtil.join(tableSchema, CHANGELOG_METADATA);
}

CHANGELOG_METADATA的定义为

private static final Schema CHANGELOG_METADATA =new Schema(CHANGE_TYPE, CHANGE_ORDINAL, COMMIT_SNAPSHOT_ID);

各个字段的意义为

public static final NestedField CHANGE_TYPE =NestedField.required(Integer.MAX_VALUE - 104,"_change_type",Types.StringType.get(),"Record type in changelog");
public static final NestedField CHANGE_ORDINAL =NestedField.optional(Integer.MAX_VALUE - 105,"_change_ordinal",Types.IntegerType.get(),"Change ordinal in changelog");
public static final NestedField COMMIT_SNAPSHOT_ID =NestedField.optional(Integer.MAX_VALUE - 106,"_commit_snapshot_id",Types.LongType.get(),"Commit snapshot ID");

操作示例

insert into sampleSpark values (1, 'name1');
INSERT OVERWRITE sampleSpark values (1, 'name2');
delete from sampleSpark  where id=1;

结果示例

1       name2   INSERT  1       6345233160542557791
1       name2   DELETE  2       6627122686657309022
1       name1   DELETE  1       6345233160542557791
1       name1   INSERT  0       3035871833168296964

0304 SparkChangelogScan

文件扫描落在SparkChangelogScan,创建SparkBatch

public Batch toBatch() {return new SparkBatch(sparkContext,table,readConf,EMPTY_GROUPING_KEY_TYPE,taskGroups(),expectedSchema,hashCode());
}

SparkBatch当中创建PartitionReaderFactory

public PartitionReaderFactory createReaderFactory() {if (useParquetBatchReads()) {int batchSize = readConf.parquetBatchSize();return new SparkColumnarReaderFactory(batchSize);} else if (useOrcBatchReads()) {int batchSize = readConf.orcBatchSize();return new SparkColumnarReaderFactory(batchSize);} else {return new SparkRowReaderFactory();}
}

前两者都是要启用对应文件读取器的向量化功能,普通设置走最后的分支

// conditions for using Parquet batch reads:
// - Parquet vectorization is enabled
// - at least one column is projected
// - only primitives are projected
// - all tasks are of FileScanTask type and read only Parquet files
private boolean useParquetBatchReads() {return readConf.parquetVectorizationEnabled()&& expectedSchema.columns().size() > 0&& expectedSchema.columns().stream().allMatch(c -> c.type().isPrimitiveType())&& taskGroups.stream().allMatch(this::supportsParquetBatchReads);
}

在PartitionReaderFactory当中创建对应的PartitionReader,分支走changelog

} else if (partition.allTasksOfType(ChangelogScanTask.class)) {return new ChangelogRowReader(partition);}

0305 ChangelogRowReader

ChangelogRowReader是实际的读取类,由数据读取器BaseReader调用,对应就是下面的open调用

this.currentIterator.close();
this.currentTask = taskT;
this.currentIterator = open(currentTask);
return true;

数据上设置类型(insert、delete)就是在open当中设置的,这里出来的数据就是带上insert和delete的

protected CloseableIterator<InternalRow> open(ChangelogScanTask task) {JoinedRow cdcRow = new JoinedRow();cdcRow.withRight(changelogMetadata(task));CloseableIterable<InternalRow> rows = openChangelogScanTask(task);CloseableIterable<InternalRow> cdcRows = CloseableIterable.transform(rows, cdcRow::withLeft);return cdcRows.iterator();
}

具体的设置在cdcRow.withRight(changelogMetadata(task));这一行,changelogMetadata如下,类型由task.operation()确定

private static InternalRow changelogMetadata(ChangelogScanTask task) {InternalRow metadataRow = new GenericInternalRow(3);metadataRow.update(0, UTF8String.fromString(task.operation().name()));metadataRow.update(1, task.changeOrdinal());metadataRow.update(2, task.commitSnapshotId());return metadataRow;
}

根本上来说,入参ChangelogScanTask决定了数据是什么类型,有效的类型是AddedRowsScanTask、DeletedDataFileScanTask;对应insert和delete

private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask task) {if (task instanceof AddedRowsScanTask) {return openAddedRowsScanTask((AddedRowsScanTask) task);} else if (task instanceof DeletedRowsScanTask) {throw new UnsupportedOperationException("Deleted rows scan task is not supported yet");} else if (task instanceof DeletedDataFileScanTask) {return openDeletedDataFileScanTask((DeletedDataFileScanTask) task);} else {throw new IllegalArgumentException("Unsupported changelog scan task type: " + task.getClass().getName());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/719946.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

pytest多重断言插件-pytest-assume

最近准备废弃之前用metersphere做的接口自动化&#xff0c;转战pytest了&#xff0c;先来分享下最近接触到的一个插件&#xff1a;pytest-assume。 在使用这个插件之前&#xff0c;如果一个用例里面有多个断言的话&#xff0c;前面的断言失败了&#xff0c;就不会去执行后面的断…

flutter打包命令

apk Build and release an Android app | FlutterHow to prepare for and release an Android app to the Play store.https://docs.flutter.dev/deployment/android#reviewing-the-gradle-build-configuration flutter build apk --split-per-abi 如果不同cpu架构不打到同一个…

数组初始化(指定下标初始化)

在C语言中&#xff0c;在C99标准之后&#xff0c;就可以使用指定初始化器&#xff08;designated initializers&#xff09;来初始化数组或结构体中特定的元素。指定初始化器在初始化数组时允许显式地指定一个或多个索引及其相应的值&#xff0c;对于未显式初始化的元素&#x…

EI论文部分复现:含VSC-HVDC的交直流系统内点法最优潮流计算Simulink模型!

适用平台&#xff1a;MatlabSimulink&#xff1b;复现内容&#xff1a;VSC-HVDC模型 简介 高压直流传输系统主要包括换流站、输电线路和终端设备&#xff0c;其中换流站起着关键作用&#xff0c;他可以实现交流整流和直流逆变。常见的HVDC系统有全桥式、半桥式和两水平VSC等。…

C语言数组案例编程

1. 编写一个程序实现&#xff1a;从键盘输入15个整数存入数组&#xff0c;然后统计其中正整数的个数。 【要求】采用函数编程 #include<stdio.h> void input(int a[],int n) {int i; for(i0;i<n;i)scanf("%d",&a[i]); }int positiveNum(int a[],int n…

【Golang入门】简介与基本语法学习

概述&#xff1a; Golang&#xff0c;又称Go语言&#xff0c;是一种编译型、并发性强的编程语言&#xff0c;由Google公司的Robert Griesemer、Rob Pike及Ken Thompson于2007年开发。Go语言的设计初衷是为了解决多核处理器、网络系统和大规模代码库带来的开发困难。它提供了出色…

Vue 3 中如何使用 provide 和 inject 实现依赖注入?

在 Vue 3 中&#xff0c;provide 和 inject 是一对用于实现依赖注入的 API。它们提供了一种方式&#xff0c;让祖先组件能够向其所有子孙后代组件注入依赖&#xff0c;而无需通过 props 逐层传递。这在开发大型复杂应用时&#xff0c;尤其是当组件层级较深时&#xff0c;可以极…

【Kotlin学习路线】讲解

Kotlin学习路线 1. Kotlin介绍2. 入门阶段3. 进阶阶段4. 实战阶段5. 持续学习与专业提升 1. Kotlin介绍 Kotlin 是一种静态类型编程语言&#xff0c;运行在 Java 虚拟机上&#xff0c;并可以交互使用 Java 代码&#xff0c;它由 JetBrains 公司于2011年首次推出&#xff0c;后…

基于springboot+vue的社区智慧养老监护管理平台

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

Apache Calcite 快速入门指南

Apache Calcite 快速入门指南 参考地址&#xff1a;Apache Calcite 快速入门指南 - 知乎 Apache Calcite 是一个动态数据管理框架&#xff0c;提供了&#xff1a;SQL 解析、SQL 校验、SQL 查询优化、SQL 生成以及数据连接查询等典型数据库管理功能。Calcite 的目标是 One Size …

python第二十节闭包函数与装饰器

闭包函数与装饰器 闭包函数闭包的构成条件闭包如何理解 装饰器函数装饰器一函数装饰器二类装饰器一类装饰器二 闭包函数 闭包的构成条件 在函数嵌套&#xff08;函数里面再定义函数&#xff09;的前提下内部函数使用了外部函数的变量&#xff08;参数&#xff09;外部函数的返…

C++/Qt 元类型——枚举 序列化与反序列化

/*** brief 枚举转字符串* tparam T 枚举类型* param s 枚举类型变量* return 字符串*/ template <typename T> inline QString EnumToString(T s) {// T是枚举类型&#xff0c;编译判断static_assert(std::is_enum<T>::value, "T must be an enum type"…

Android logcat系统

一 .logcat命令介绍 android log系统: logcat介绍 : logcat是android中的一个命令行工具&#xff0c;可以用于得到程序的log信息. 二.C/Clogcat访问接口 Android系统中的C/C日志接口是通过宏来使用的。在system/core/include/android/log.h定义了日志的级别&#xff1a; /…

mysql binlog禁用

要禁用 MySQL 的二进制日志&#xff08;binlog&#xff09;&#xff0c;你可以通过以下步骤进行操作&#xff1a; 临时禁用&#xff1a; 如果你只是想临时禁用二进制日志&#xff0c;你可以使用以下 SQL 命令&#xff1a; sql SET sql_log_bin 0; 这只会影响当前的会话。当…

Linkedln领英账号限制问题|通过代理IP安全使用Linkedln

LinkedIn是跨境外贸必备的拓客工具&#xff0c;世界各地的许多专业人士都使用领英来作为发布和共享内容的主要工具&#xff0c;这使得它成为跨境出海必备的渠道工具。 但是不少做外贸的朋友都知道&#xff0c;领英账号很容易遭遇限制封禁&#xff0c;但如果善用工具&#xff0…

【数据集】ENSO-基于NOAA发布ONI值

NOAA-ONI&#xff08;Oceanic Nino Index&#xff09; ENSO划分标准&#xff1a; 当某ENSO年的指数值连续6个月大于0.5 ℃时&#xff0c;将该年归类为El Nio年当低于-0.5℃时&#xff0c;将其归类为La Nia年否则&#xff0c;年份为中性。 数据下载 注意&#xff1a;此页面将…

改造muduo,不依赖boost,用C++11重构

组件的实现 1. 序 1.1. 总述 muduo库是基于多Reactor-多线程模型实现的TCP网络编程库&#xff0c;性能良好。如libev作者&#xff1a;“One loop per thread is usually a good model”&#xff0c;muduo库的作者陈硕在其《Linux多线程服务端编程》中也力荐这种“One loop pe…

每日五道java面试题之mysql数据库篇(四)

目录&#xff1a; 第一题&#xff1a; Hash索引和B树所有有什么区别或者说优劣呢?第二题&#xff1a;数据库为什么使用B树而不是B树&#xff1f;第三题&#xff1a;B树在满足聚簇索引和覆盖索引的时候不需要回表查询数据&#xff1f;第四题&#xff1a;什么是聚簇索引&#xf…

Java解决比特位计数

Java解决比特位计数 01 题目 给定一个非负整数 n &#xff0c;请计算 0 到 n 之间的每个数字的二进制表示中 1 的个数&#xff0c;并输出一个数组。 示例 1: 输入: n 2 输出: [0,1,1] 解释: 0 --> 0 1 --> 1 2 --> 10示例 2: 输入: n 5 输出: [0,1,1,2,1,2] 解释:…

redis缓存注解使用

这里写自定义目录标题 一、引入依赖二、修改启动类和配置文件三、添加配置类四、缓存示例 一、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><optional>tru…