hudi相关疑问

标题

    • 1、flink流式写入hudi表时,Changelog模式和Append模式区别
      • Changelog 模式
      • Append 模式
      • 配置示例
        • 配置 Append 模式
        • 配置 Changelog 模式
      • 总结
    • 2、flink流式写入hudi表时,设置了Changelog模式,还需要设置write.operation参数吗
      • Changelog 模式下的 `write.operation` 参数
      • 详细配置步骤
      • 示例代码
      • 自定义反序列化 schema 示例
      • 解释
    • 3、upsert支持删除数据吗
      • Hudi `upsert` 操作支持删除
      • 配置和实现示例
      • 自定义反序列化 schema 示例
      • 解释
    • 4、changelog模式默认开启的吗
      • Changelog 模式和 Append 模式的区别
      • Changelog 模式默认开启吗?
      • 如何启用 Changelog 模式
        • 在 Flink SQL 中配置
        • 在 Flink DataStream API 中配置
      • 总结
    • 5、如果在flink实时写hudi时,只指定'hoodie.datasource.write.operation' = 'upsert'; 而不设置changelog或者append模式,这样是否可以
      • 仅设置 `write.operation` 为 `upsert`
      • Changelog 模式与 Append 模式
      • 不设置 `changelog` 或 `append` 模式
      • 具体配置示例
        • Flink SQL 配置
        • Flink DataStream API 配置
      • 总结
    • 6、hudi如果不开启Changelog模式,hudi的write.operation 设置为 upsert,是否支持删除数据

1、flink流式写入hudi表时,Changelog模式和Append模式区别

在 Flink 中,流式写入 Hudi 表时,可以使用不同的模式来处理数据变更(changelog)。主要的两种模式是 Changelog 模式和 Append 模式。它们在处理数据插入、更新、删除等操作时的行为有所不同。

Changelog 模式

Changelog 模式(变更日志模式)处理数据流中的变更记录。每条记录可以是插入、更新或删除操作。这种模式通常用于需要处理复杂变更逻辑的场景。

特点:

  1. 记录类型支持:支持 INSERT、UPDATE、DELETE 三种记录类型。
  2. 数据一致性:确保数据在多次更新和删除操作后的一致性。
  3. 适用场景:适用于有频繁更新和删除操作的数据流。

优点:

  • 可以高效处理更新和删除操作。
  • 适合需要保持最新数据状态的场景,如维表更新、实时数据修正等。

缺点:

  • 相对复杂的变更处理逻辑,可能会增加一定的性能开销。

Append 模式

Append 模式(追加模式)只处理插入操作,假设数据流中所有记录都是新的插入。这种模式适用于只追加数据的场景,而不考虑更新和删除操作。

特点:

  1. 记录类型支持:只支持 INSERT 记录类型。
  2. 数据写入:每条记录都被视为新的插入,不会覆盖或删除已有记录。
  3. 适用场景:适用于数据只追加、不需要更新或删除的场景。

优点:

  • 写入逻辑简单,性能较高。
  • 适合只需要追加数据的场景,如日志数据、传感器数据等。

缺点:

  • 无法处理数据的更新和删除操作,数据一致性较低。

配置示例

以下是如何在 Flink 中配置这两种模式的示例代码:

配置 Append 模式
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hudi.config.HoodieWriteConfig;public class AppendModeExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Configuration conf = new Configuration();conf.setString(HoodieWriteConfig.TABLE_NAME, "append_mode_table");conf.setBoolean("hoodie.datasource.write.insert.drop.duplicates", false);DataStream<Row> dataStream = ... // your data stream// 设置 Append 模式OptionsResolver.setAppendMode(conf, true);sinkHudi(conf, dataStream);env.execute("Flink Append Mode Example");}public static void sinkHudi(Configuration conf, DataStream<Row> dataStream) {DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, dataStream);Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);}
}
配置 Changelog 模式
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hudi.config.HoodieWriteConfig;public class ChangelogModeExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Configuration conf = new Configuration();conf.setString(HoodieWriteConfig.TABLE_NAME, "changelog_mode_table");conf.setBoolean("hoodie.datasource.write.insert.drop.duplicates", true);DataStream<Row> dataStream = ... // your data stream// 设置 Changelog 模式OptionsResolver.setAppendMode(conf, false);sinkHudi(conf, dataStream);env.execute("Flink Changelog Mode Example");}public static void sinkHudi(Configuration conf, DataStream<Row> dataStream) {DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, dataStream);Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);}
}

总结

  • Append 模式:只处理插入操作,适用于只追加数据的场景,写入性能较高,但不支持数据的更新和删除。
  • Changelog 模式:处理插入、更新和删除操作,适用于需要处理复杂数据变更的场景,能够确保数据的一致性。

根据具体的业务需求选择合适的模式,可以有效地提升数据处理效率和数据一致性。

2、flink流式写入hudi表时,设置了Changelog模式,还需要设置write.operation参数吗

在 Flink 流式写入 Hudi 表时,如果已经设置了 Changelog 模式,write.operation 参数的配置仍然是必要的。这是因为 write.operation 参数决定了 Hudi 如何处理和应用变更记录,例如插入、更新和删除操作。

Changelog 模式下的 write.operation 参数

在 Changelog 模式下,write.operation 参数一般设置为 upsert,因为 Changelog 模式需要处理数据的插入、更新和删除操作。upsert 操作可以确保新记录被插入,现有记录被更新。

以下是配置示例:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.hudi.config.HoodieWriteConfig;public class ChangelogModeExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);Configuration conf = new Configuration();conf.setString(HoodieWriteConfig.TABLE_NAME, "changelog_mode_table");conf.setString("hoodie.datasource.write.operation", "upsert");conf.setString("hoodie.datasource.write.recordkey.field", "id"); // 根据具体表结构设置conf.setString("hoodie.datasource.write.precombine.field", "ts"); // 根据具体表结构设置// 其他必要配置// ...DataStream<Row> dataStream = ... // your data stream// 设置 Changelog 模式OptionsResolver.setAppendMode(conf, false);sinkHudi(conf, dataStream);env.execute("Flink Changelog Mode Example");}public static void sinkHudi(Configuration conf, DataStream<Row> dataStream) {DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, dataStream);Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);}
}

详细配置步骤

  1. 配置 Hudi 表的基本参数

    • hoodie.datasource.write.recordkey.field:主键字段。
    • hoodie.datasource.write.precombine.field:用于去重的字段(通常是时间戳)。
  2. 设置 write.operation 参数

    • 设为 upsert,以支持插入、更新和删除操作。
  3. 确保流式数据源的配置

    • 例如从 Kafka 读取数据,解析成 Row 类型。
  4. 数据流写入 Hudi

    • 使用 Pipelines.bootstrap 将数据流转换成 HoodieRecord 类型。
    • 使用 Pipelines.hoodieStreamWrite 将数据写入 Hudi。

示例代码

假设数据流来自 Kafka,处理成 Row 类型后写入 Hudi 表:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.hudi.config.HoodieWriteConfig;public class KafkaToHudiChangelog {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);Configuration conf = new Configuration();conf.setString(HoodieWriteConfig.TABLE_NAME, "changelog_mode_table");conf.setString("hoodie.datasource.write.operation", "upsert");conf.setString("hoodie.datasource.write.recordkey.field", "id"); // 根据具体表结构设置conf.setString("hoodie.datasource.write.precombine.field", "ts"); // 根据具体表结构设置// 其他必要配置// ...DataStream<Row> kafkaDataStream = ... // 从 Kafka 读取并解析的数据流// 设置 Changelog 模式Options以下是如何在 Changelog 模式下设置 Flink 数据流写入 Hudi 表的完整示例,包括详细的代码和配置步骤:### 详细配置步骤1. **配置 Hudi 表的基本参数**- `hoodie.datasource.write.recordkey.field`:主键字段,例如 "id"- `hoodie.datasource.write.precombine.field`:用于去重的字段,例如 "ts"(时间戳)。2. **设置 `write.operation` 参数**- 设为 `upsert`,以支持插入、更新和删除操作。3. **确保流式数据源的配置**- 例如从 Kafka 读取数据,解析成 Row 类型。4. **数据流写入 Hudi**- 使用 `Pipelines.bootstrap` 将数据流转换成 HoodieRecord 类型。- 使用 `Pipelines.hoodieStreamWrite` 将数据写入 Hudi。### 示例代码假设数据流来自 Kafka,处理成 Row 类型后写入 Hudi 表:```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;public class KafkaToHudiChangelog {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// Kafka 消费者配置Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer_group");properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 从 Kafka 读取数据FlinkKafkaConsumer<Row> kafkaConsumer = new FlinkKafkaConsumer<>("input_topic",new CustomRowDeserializationSchema(), // 自定义反序列化 schemaproperties);DataStream<Row> kafkaDataStream = env.addSource(kafkaConsumer);// Hudi 配置Configuration conf = new Configuration();conf.setString(HoodieWriteConfig.TABLE_NAME, "changelog_mode_table");conf.setString("hoodie.datasource.write.operation", "upsert");conf.setString("hoodie.datasource.write.recordkey.field", "id");conf.setString("hoodie.datasource.write.precombine.field", "ts");conf.setString("hoodie.datasource.write.keygenerator.class", ComplexKeyGenerator.class.getName());// 其他必要配置conf.setString("hoodie.datasource.write.table.type", "MERGE_ON_READ");conf.setString("hoodie.datasource.compaction.async.enable", "true");// 设置 Changelog 模式OptionsResolver.setAppendMode(conf, false);sinkHudi(conf, kafkaDataStream);env.execute("Flink Kafka to Hudi Changelog Mode Example");}public static void sinkHudi(Configuration conf, DataStream<Row> dataStream) {DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, dataStream);Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);}
}

自定义反序列化 schema 示例

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.types.Row;import java.io.IOException;public class CustomRowDeserializationSchema extends AbstractDeserializationSchema<Row> {@Overridepublic Row deserialize(byte[] message) throws IOException {// 将消息反序列化为 Row 对象// 假设消息格式为 "id,ts,field1,field2,..."String[] fields = new String(message).split(",");Row row = new Row(fields.length);for (int i = 0; i < fields.length; i++) {row.setField(i, fields[i]);}return row;}@Overridepublic boolean isEndOfStream(Row nextElement) {return false;}@Overridepublic TypeInformation<Row> getProducedType() {return Types.ROW_NAMED(new String[]{"id", "ts", "field1", "field2"}, Types.STRING, Types.STRING, Types.STRING, Types.STRING);}
}

解释

  • Kafka 消费者配置:设置 Kafka 的连接和消费者组等配置。
  • 自定义反序列化 schema:将从 Kafka 读取的字节数组反序列化为 Flink 的 Row 类型。
  • Hudi 配置:设置 Hudi 表名、写操作类型(upsert)、记录键字段和预合并字段等参数。
  • 数据流处理:从 Kafka 读取数据,处理后写入 Hudi 表。

通过以上配置和代码,您可以确保在 Changelog 模式下正确处理数据流的插入、更新和删除操作,同时通过 write.operation 参数指定 Hudi 表的写入操作类型。

3、upsert支持删除数据吗

在 Hudi 中,upsert 操作确实支持删除数据。upsert 是 Hudi 中的一个重要写操作类型,它能够处理插入和更新操作,同时也支持删除记录。删除操作通过将特定记录的 _hoodie_is_deleted 字段标记为 true 来实现。

Hudi upsert 操作支持删除

在 Hudi 表中,如果数据流中包含需要删除的记录,这些记录通常会包含一个特殊的标志字段(如 _hoodie_is_deleted),当该字段被设置为 true 时,Hudi 会将这些记录标记为已删除。在 Changelog 模式下,Flink 可以捕获流中的删除操作,并将这些删除操作传递给 Hudi。

配置和实现示例

以下示例展示了如何配置和使用 Flink DataStream API 处理删除操作,并通过 upsert 将其写入 Hudi 表。

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;public class KafkaToHudiWithDelete {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// Kafka 消费者配置Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer_group");properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 从 Kafka 读取数据FlinkKafkaConsumer<Row> kafkaConsumer = new FlinkKafkaConsumer<>("input_topic",new CustomRowDeserializationSchema(), // 自定义反序列化 schemaproperties);DataStream<Row> kafkaDataStream = env.addSource(kafkaConsumer);// Hudi 配置Configuration conf = new Configuration();conf.setString(HoodieWriteConfig.TABLE_NAME, "changelog_mode_table");conf.setString("hoodie.datasource.write.operation", "upsert");conf.setString("hoodie.datasource.write.recordkey.field", "id");conf.setString("hoodie.datasource.write.precombine.field", "ts");conf.setString("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator");// 其他必要配置conf.setString("hoodie.datasource.write.table.type", "MERGE_ON_READ");conf.setString("hoodie.datasource.compaction.async.enable", "true");// 设置 Changelog 模式OptionsResolver.setAppendMode(conf, false);sinkHudi(conf, kafkaDataStream);env.execute("Flink Kafka to Hudi with Delete Example");}public static void sinkHudi(Configuration conf, DataStream<Row> dataStream) {DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, dataStream);Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);}
}

自定义反序列化 schema 示例

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.types.Row;import java.io.IOException;public class CustomRowDeserializationSchema extends AbstractDeserializationSchema<Row> {@Overridepublic Row deserialize(byte[] message) throws IOException {// 将消息反序列化为 Row 对象// 假设消息格式为 "id,ts,field1,field2,_hoodie_is_deleted"String[] fields = new String(message).split(",");Row row = new Row(fields.length);for (int i = 0; i < fields.length; i++) {if (i == fields.length - 1) { // 最后一列为 _hoodie_is_deletedrow.setField(i, Boolean.parseBoolean(fields[i]));} else {row.setField(i, fields[i]);}}return row;}@Overridepublic boolean isEndOfStream(Row nextElement) {return false;}@Overridepublic TypeInformation<Row> getProducedType() {return Types.ROW_NAMED(new String[]{"id", "ts", "field1", "field2", "_hoodie_is_deleted"},Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.BOOLEAN);}
}

解释

  • Kafka 消费者配置:设置 Kafka 的连接和消费者组等配置。
  • 自定义反序列化 schema:将从 Kafka 读取的字节数组反序列化为 Flink 的 Row 类型,并处理 _hoodie_is_deleted 字段。
  • Hudi 配置:设置 Hudi 表名、写操作类型(upsert)、记录键字段和预合并字段等参数。
  • 数据流处理:从 Kafka 读取数据,处理后写入 Hudi 表。

通过以上配置和代码,您可以确保在 Changelog 模式下正确处理数据流的插入、更新和删除操作,同时通过 upsert 操作将这些变更应用于 Hudi 表。

4、changelog模式默认开启的吗

在 Flink 中,使用 Hudi 作为 Sink 时,Changelog 模式和 Append 模式是两种不同的数据写入模式。这两种模式的使用取决于配置和业务需求。

Changelog 模式和 Append 模式的区别

  • Changelog 模式:适用于需要处理更新和删除操作的场景。每条记录都会包含其操作类型(插入、更新、删除),从而支持对数据的增量更新。
  • Append 模式:适用于纯插入场景,不处理更新和删除操作。数据只会以追加的方式写入,不会修改已存在的数据。

Changelog 模式默认开启吗?

Changelog 模式并不是默认开启的。默认情况下,Hudi 表的写入模式是 Append 模式。要使用 Changelog 模式,需要显式配置。

如何启用 Changelog 模式

要启用 Changelog 模式,您需要在 Flink 配置中进行如下设置:

  1. 配置写入操作类型:将 write.operation 设置为 upsertbulkinsert。这是因为 Changelog 模式需要支持更新和删除操作,而这些操作需要通过 upsertbulkinsert 来实现。

  2. 启用 Changelog 模式:可以通过 Flink 的配置来启用。下面是具体配置示例:

在 Flink SQL 中配置
SET 'hoodie.datasource.write.operation' = 'upsert';  -- 设置写操作类型为 upsert
SET 'hoodie.changelog.enabled' = 'true';  -- 启用 Changelog 模式
在 Flink DataStream API 中配置
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.config.HoodieWriteConfig;public class HudiSinkExample {public static void main(String[] args) {Configuration conf = new Configuration();// 设置写操作类型为 upsertconf.setString(HoodieWriteConfig.TABLE_NAME, "your_hudi_table");conf.setString("hoodie.datasource.write.operation", "upsert");// 启用 Changelog 模式conf.setBoolean("hoodie.changelog.enabled", true);// 其他配置...// 创建并配置流处理作业}
}

总结

Changelog 模式不是默认开启的。您需要通过配置显式启用它,以支持更新和删除操作。通过在 Flink SQL 或 DataStream API 中进行相应的配置,您可以灵活地控制写入 Hudi 表时的模式,满足不同的业务需求。

5、如果在flink实时写hudi时,只指定’hoodie.datasource.write.operation’ = ‘upsert’; 而不设置changelog或者append模式,这样是否可以

在 Flink 实时写 Hudi 的场景下,指定 'hoodie.datasource.write.operation' = 'upsert' 是可以的,但具体行为和模式的效果会有所不同。让我们详细讨论一下这几种配置组合的影响。

仅设置 write.operationupsert

设置 write.operationupsert 后,Hudi 会将输入数据作为更新(update)或插入(insert)操作处理。这意味着:

  • 如果输入数据的主键在目标 Hudi 表中已经存在,则进行更新操作。
  • 如果输入数据的主键在目标 Hudi 表中不存在,则进行插入操作。

这是最常见的配置,用于需要合并更新和新插入数据的场景。

Changelog 模式与 Append 模式

这两种模式的配置对数据处理的方式有显著影响:

  • Changelog 模式

    • 支持对数据的增量更新,包括插入、更新和删除操作。
    • 需要额外配置 hoodie.changelog.enabledtrue
  • Append 模式

    • 只处理插入操作,不处理更新和删除操作。
    • 需要额外配置 hoodie.append.modetrue

不设置 changelogappend 模式

如果仅指定 write.operationupsert,而不设置 changelogappend 模式,默认情况下:

  1. 更新和插入操作:数据会按照 upsert 方式处理,这意味着支持更新和插入。
  2. 删除操作:不会默认处理删除操作(除非其他配置明确支持)。

具体配置示例

以下是如何在 Flink SQL 和 DataStream API 中进行配置的示例:

Flink SQL 配置
SET 'hoodie.datasource.write.operation' = 'upsert';
-- 不设置 changelog 或 append 模式
Flink DataStream API 配置
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.config.HoodieWriteConfig;public class HudiSinkExample {public static void main(String[] args) {Configuration conf = new Configuration();// 设置写操作类型为 upsertconf.setString(HoodieWriteConfig.TABLE_NAME, "your_hudi_table");conf.setString("hoodie.datasource.write.operation", "upsert");// 不设置 changelog 或 append 模式// 其他配置...// 创建并配置流处理作业}
}

总结

仅设置 write.operationupsert 是可行的,并且这会让 Hudi 按照 upsert 逻辑处理数据(即支持插入和更新)。不过,如果您有特定的需求,比如处理删除操作或仅进行插入操作,则需要明确设置 changelog 模式或 append 模式。

6、hudi如果不开启Changelog模式,hudi的write.operation 设置为 upsert,是否支持删除数据

对于 Hudi(Hoodie), 如果不开启 Changelog 模式,但将 write.operation 设置为 upsert时,是否支持删除数据,这有以下几点说明:

  1. 在 Hudi 中, upsert 操作主要用于更新和插入数据,但不支持直接删除数据。

  2. 如果要删除数据,需要开启 Changelog 模式。Changelog 模式允许您记录数据的插入、更新和删除操作,从而支持对数据进行有效的删除。

  3. 如果未开启 Changelog 模式, upsert 操作只会更新或插入数据,不会删除任何数据。当您需要删除数据时,您需要手动从数据集中删除相应的行或分区。

  4. 另一种方式是使用 delete 操作,该操作可以在不开启 Changelog 模式的情况下直接删除数据。但这种方式可能会导致数据冗余,因为 Hudi 需要保留一些元数据以跟踪已删除的数据。

总之,如果您需要在不开启 Changelog 模式的情况下删除数据,则需要使用 delete 操作,而不是 upsert。建议您开启 Changelog 模式,以获得更灵活和全面的数据管理功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/839677.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python集合与字典的概念与使用-课堂练习[python123题库]

集合与字典的概念与使用-课堂练习 一、单项选择题 1、 哪个选项是下面代码的输出结果&#xff1f;‪‬‪‬‪‬‪‬‪‬‮‬‪‬‫‬‪‬‪‬‪‬‪‬‪‬‮‬‫‬‪‬‪‬‪‬‪‬‪‬‪‬‮‬‫‬‮‬‪‬‪‬‪‬‪‬‪‬‮‬‪‬‮‬‪‬‪‬‪‬‪‬‪‬‮‬‪‬‪‬‪‬…

【调试笔记-20240524-Linux-扩展 OpenWrt-23.05 发行版 EXT4 镜像文件大小】

调试笔记-系列文章目录 调试笔记-20240524-Linux-扩展 OpenWrt-23.05 发行版 EXT4 镜像文件大小 文章目录 调试笔记-系列文章目录调试笔记-20240524-Linux-扩展 OpenWrt-23.05 发行版 EXT4 镜像文件大小 前言一、调试环境操作系统&#xff1a;Ubuntu 22.04.4 LTS工作环境调试目…

在https的系统中挂载其他http系统的画面的解决方案

目录 1.问题及说明 2.解决方案及示例 3.总结 1.问题及说明 A系统使用了https&#xff0c;在A系统中挂载B系统的http的画面&#xff0c;会报错如下&#xff1a; Mixed Content: The page at https://beef.zz.com/front/#/biz/cultivationList/cultivationDetails/5dbf836751…

MySQL之架构设计与历史(五)

MySQL之架构设计与历史 MyISAM存储引擎 在MySQL5.1及之前的版本&#xff0c;MyISAM是默认的存储引擎。MyISAM是默认的存储引擎。MyISAM提供了大量的特性&#xff0c;包括全文索引、压缩、空间函数(GIS)等&#xff0c;但MyISAM不支持事务和行级锁&#xff0c;而且有一个毫无疑…

【SD-WAN】香港企业进入粤港澳大湾区所面临的机遇和挑战

粤港澳大湾区发展及规划是中国其中一个主点发展战略&#xff0c;具备完整的多元化产业结构&#xff0c;城市之间建立强大的经济互补性&#xff0c;是国际性湾区和世界级城市群。因此&#xff0c;大湾区近年吸引了不少香港的创新及科技企业前往发展投资及设立据点扩展业务。本文…

增强版 Kimi:AI 驱动的智能创作平台,实现一站式内容生成(图片、PPT、PDF)!

前言 基于扣子 Coze 零代码平台&#xff0c;我们从零到一轻松实现了专属 Bot 机器人的搭建。 AI 大模型&#xff08;LLM&#xff09;、智能体&#xff08;Agent&#xff09;、知识库、向量数据库、知识图谱&#xff0c;RAG&#xff0c;AGI 的不同形态愈发显现&#xff0c;如何…

SpringBoot中注解@RestController | @ResponseBody | @Controller

ResponseBody 可以修饰类和方法 Controller 和 RestController 只能修饰类 RestController 告诉Spring&#xff0c;帮我们管理这个代码&#xff0c;我们后续访问时&#xff0c;才能访问到 RequestMapping 路由映射&#xff0c;可以修饰方法&#xff0c;也可以修饰类 访问地址…

【代码随想录37期】Day16 二叉树的最大深度、二叉树的最小深度、完全二叉树的节点个数

二叉树的最大深度 v1.0:迭代法 /*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode() : val(0), left(nullptr), right(nullptr) {}* TreeNode(int x) : val(x), left(nullptr), right…

【Linux网络编程】IO多种转接之Reactor

Reactor 点赞&#x1f44d;&#x1f44d;收藏&#x1f31f;&#x1f31f;关注&#x1f496;&#x1f496; 你的支持是对我最大的鼓励&#xff0c;我们一起努力吧!&#x1f603;&#x1f603; 基于上一篇epoll的学习&#xff0c;现在我们也知道epoll的工作模式有两种&#xff0c…

【UE5.1 多线程 异步】“Async Blueprints Extension”插件使用记录

目录 一、异步生成Actor示例 二、异步计算示例 参考视频 首先需要在商城中下载“Async Blueprints Extension”插件 一、异步生成Actor示例 2. 创建一个线程类&#xff0c;这里要指定父类为“LongAsyncTask”、“InfiniteAsyncTask”、“ShortAsyncTask”中的一个 在线程类…

el-table 实现嵌套表格的思路及完整功能代码

要实现的需求是这样的&#xff1a; 本来我是用 el-table 的 :span-method 方法实现的&#xff0c;但发现合并起来有问题&#xff0c;跟我的需求差距有些大&#xff0c;于是我想到了嵌套表格。但是嵌套完之后的样子也是很奇怪&#xff1a; 不要气馁&#xff0c;思路还是对的&a…

基于文心智能体平台打造专属情感类陪伴智能体【情绪价值提供者】

文章目录 一、文心智能体平台介绍二、文心智能体平台注册三、智能体介绍四、智能体创建过程4.1 基础配置4.2 高级配置4.3 预览调优4.4 公开发布 五、智能体使用心得六、智能体分享方式七、参考链接 一、文心智能体平台介绍 文心智能体平台是百度推出的基于文心大模型的智能体&…

计算机毕业设计 | springboot药品库存追踪与管理系统 药店管理(附源码)

1&#xff0c;绪论 1.1 背景调研 如今药品调价频繁&#xff0c;且品种繁多&#xff0c;增加了药品销售定价的难度。药品来货验收登记中的审查有效期环节容易出错&#xff0c;错收过期或有效期不足的药品。 手工模式下的药品库存难以及时掌握&#xff0c;虽然采取了每日进行缺…

Flask CORS: 解决跨域资源共享问题的利器

文章目录 安装和启用 CORS配置 CORS拓展 在本文中&#xff0c;我们介绍了如何使用 Flask-CORS 扩展来解决跨域问题。Flask-CORS 是一个方便的工具&#xff0c;可以帮助我们轻松地实现跨域资源共享支持。 安装和启用 CORS 要开始使用 Flask-CORS&#xff0c;我们需要先安装它。…

机器学习模型可视化分析和诊断神器Yellowbrick

大家好&#xff0c;机器学习(ML)作为人工智能的核心&#xff0c;近来得到巨大应用&#xff0c;ML是使计算机能够在无需显式编程的情况下进行学习和预测或决策。ML算法通过学习历史数据模式&#xff0c;来对新的未见数据做出明智的预测或决策。然而&#xff0c;构建和训练ML模型…

静态代理和动态代理

静态代理 代理接口 public interface Person {public String wakeUp(String name);public String lunch(String name); }被代理对象 public class Student implements Person{Overridepublic String wakeUp(String name) {System.out.println(name"星期一早上8点上班&a…

【C++风云录】数据处理新纪元:激光扫描数据处理

从libLAS到SPDLib&#xff1a;全面解读六大顶级数据处理库 前言 本文将探讨六个关于点云数据处理的库&#xff0c;包括libLAS、PDAL、PCL、LASlib、LAStools和SPDLib。每个库都将在简介、功能以及支持的格式等方面进行深入的解析和阐述。 欢迎订阅专栏&#xff1a;C风云录 文…

如何让程序适应ChatGPT:提升开发效率的指南

ChatGPT作为一种强大的AI助手&#xff0c;已经在各行各业中展现了其独特的价值。在软件开发领域&#xff0c;如何让程序更好地适应ChatGPT&#xff0c;从而提升开发效率&#xff0c;是一个值得深入探讨的话题。本文将介绍一些策略和方法&#xff0c;帮助开发者优化程序&#xf…

什么是 UUID,uuid

文章目录 一、是什么二、为什么三、怎么用 标题&#xff1a;深入探讨UUID&#xff1a;全球唯一标识符的秘密 一、是什么 在当今数字化时代&#xff0c;唯一标识符&#xff08;UUID&#xff09;在计算机科学领域扮演着重要的角色。UUID是一种用于标识信息的唯一字符串&#xff0…

工具-博客搭建

以下相关讲解均基于hexo github pages方案&#xff0c;请注意&#xff01;&#xff01;&#xff01;博客搭建方案选择 参考文章1 搭建教程 参考文章1 hexo github pages搭建过程中遇到的问题 删除categories、tags 1、删除含有需要删除categories、tags的文章 2、hexo …