Flink状态数据结构升级
1. 升级状态数据结构
为了对给定的状态类型进行升级,你需要采取以下几个步骤:
- 对 Flink 流作业进行 savepoint 操作。
- 升级程序中的状态类型(例如:修改你的 Avro 结构)。
- 从 savepoint 恢复作业。当第一次访问状态数据时,Flink 会判断状态数据 schema 是否已经改变,并进行必要的迁移。
- 用来适应状态结构的改变而进行的状态迁移过程是自动发生的,并且状态之间是互相独立的。 Flink 内部是这样来进行处理的,首先会检查新的序列化器相对比之前的序列化器是否有不同的状态结构;如果有, 那么之前的序列化器用来读取状态数据字节到对象,然后使用新的序列化器将对象回写为字节。
2. 数据结构升级支持的数据类型
目前,仅支持 POJO 和 Avro 类型的 schema 升级 因此,如果你比较关注于状态数据结构的升级,那么目前来看强烈推荐使用 Pojo 或者 Avro 状态数据类型。
POJO 类型 #
Flink 基于下面的规则来支持 POJO 类型结构的升级:
- 可以删除字段。一旦删除,被删除字段的前值将会在将来的 checkpoints 以及 savepoints 中删除。
- 可以添加字段。新字段会使用类型对应的默认值进行初始化,比如 Java 类型。
- 不可以修改字段的声明类型。
- 不可以改变 POJO 类型的类名,包括类的命名空间。
- 需要注意,只有从 1.8.0 及以上版本的 Flink 生产的 savepoint 进行恢复时,POJO 类型的状态才可以进行升级。 对 1.8.0 版本之前的 Flink 是没有办法进行 POJO 类型升级的。
Avro 类型 #
Flink 完全支持 Avro 状态类型的升级,只要数据结构的修改是被 Avro 的数据结构解析规则认为兼容的即可。
一个例外是如果新的 Avro 数据 schema 生成的类无法被重定位或者使用了不同的命名空间,在作业恢复时状态数据会被认为是不兼容的。
摘自Flink1.19官方原文:https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/