读:数据管道中Schema变更的四种形状
目录
数据管道跑了几个月没问题,某天突然挂了。翻日志一看,报了 SchemaError ,原因是上游数据加了一列属性。
原文 来自 Polars 官方博客,讨论的就是这个问题:schema 变更不可避免,但不同存储格式应对变更的能力差异很大。原文以 Polars 为工具讲解具体参数,本文提取其中的通用原则,重点看不同格式在 schema 演化上的设计取舍。
四种形状
Schema 变更看似五花八门,归纳起来只有四种:
| 形状 | 例子 | 能否自动处理 |
|---|---|---|
| 新增列(Additive) | 上游加了一个 category 字段 |
可以,旧数据填 null |
| 缺失列(Subtractive) | 期望的列不在新数据里了 | 可以,新数据填 null |
| 类型漂移(Type drift) | Int32 变成 Int64 ,因为数据量大了整数范围不够用 |
部分可以,只支持无损拓宽 |
| 破坏性变更(Breaking) | 列被重命名、语义变了、类型不兼容 | 不行,必须手动处理 |
前三种有自动化的空间,第四种没有。遇到报错时,先判断是哪种形状,再找对应的处理方式。
顺便说一下"类型漂移只支持无损拓宽"这件事。 Int64 缩到 Int32 ,超过 2^31 的值就溢出了;浮点数缩窄精度也会丢失。所以自动处理只能往宽了走( Int32 → Int64 ),不能往窄了走。
四种格式的设计哲学
CSV:没有元数据,一切靠猜
CSV 文件不携带类型信息,读取时工具只能采样前若干行来推断类型。如果后面的行突然出现小数,而采样推断出来的是整数,就报错。
生产环境应该提前声明已知列的类型(告诉工具"这一列就是浮点数")。否则一旦推断错了就会酿成故障。
Parquet:文件级元数据,多文件时易出错
Parquet 是列式存储格式,同一列的数据连续存放。分析查询(对某列求和、求平均)只需读相关列,不用扫描整行。每个文件自带 schema 元数据(列名和类型),还附带统计信息(每列的最大值、最小值、记录数),查询引擎可以利用这些统计跳过不相关的文件。
单文件没有推断问题。问题出在"多个文件"上:多数工具(如 events_*.parquet )在扫描多文件时,以第一个文件的 schema 为基准,后续不一致就报错。
新增列时,可以声明一个超集 schema,让旧文件里没有的列填 null。类型漂移时,可以让整数自动拓宽。如果两种问题同时出现(列不同 + 类型不同),可以独立读取各文件再按列名对齐,类型取公共超类型。
Delta Lake:事务日志,显式声明演化
Delta Lake 在 Parquet 数据旁边维护一个 _delta_log/ 目录,里面是按序号命名的 JSON 文件( 00001.json 、 00002.json 、...),每次写入操作生成一个新的 JSON,记录这次加了哪些文件、删了哪些文件、schema 有没有变。定期还会合并为 Parquet 格式的 checkpoint 文件防止日志过长。这种设计让 Parquet 有了事务能力:多个写入者通过乐观并发控制协调(先提交者赢,其他人重试),读操作通过日志就知道该读哪些文件,不用扫描全部。
写入时默认严格匹配 schema,不一致就拒绝。要让 schema 演化,需要显式声明允许合并:新旧列的增减自动处理,旧数据自动填 null。重命名和类型不兼容仍然需要手动处理。和 Parquet 相比,Delta Lake 能通过事务日志回溯 schema 变更的历史,知道哪天加了列、哪天改了类型。
Iceberg:field ID 追踪
Iceberg 是四种格式中对 schema 演化支持最完整的。它的元数据是分层的:最上层是 metadata 文件,指向各个 snapshot(快照);每个 snapshot 指向一个 manifest list;manifest list 指向 manifest 文件;manifest 文件记录一组数据文件及其统计信息(记录数、文件大小、列的上下界)。这种分层结构让查询引擎在规划阶段就能跳过大量不相关的文件,不用真正读数据。底层数据文件可以是 Parquet、ORC 或 Avro 格式。
Schema 演化做得好,核心在于一个设计选择:用稳定的 field ID 而不是列名来追踪列。
其他格式都是按列名或位置匹配。列名改了,旧数据文件找不到对应的列;位置变了,列会对不上号。Iceberg 的做法是:每列有一个数字 ID(如 1、2、3),存储在 catalog(元数据服务,通常用 Hive Metastore、AWS Glue 或 REST catalog)里。列名改了,ID 不变。读的时候 catalog 知道 "ID=2 现在叫 category " ,自动映射。加列、删列、重命名,都不需要重写数据文件。
from pyiceberg.schema import Schema from pyiceberg.types import NestedField, LongType schema = Schema( NestedField(field_id=1, name="id", field_type=LongType()), NestedField(field_id=2, name="value", field_type=LongType()), )
注意 field_id 参数:这是 Iceberg 追踪列的标识符,和列名无关。之后通过 catalog 的 update_schema() 方法加列、删列、重命名,都是操作这个 ID 映射,不动数据文件。读取时,扫描器始终读 catalog 中的最新 snapshot(快照,某个时间点的完整表状态),自动处理列的增删和重命名,不需要任何额外参数。
对比与选择
CSV 四项全部手动,不单独列出。
| 需求 | Parquet | Delta Lake | Iceberg |
|---|---|---|---|
| 新增列 | 声明超集 schema | 自动(merge 模式) | 自动(field ID) |
| 缺失列 | 指定参数填充 null | 自动(merge 模式) | 自动(field ID) |
| 类型漂移 | 自动拓宽 | 手动 | 自动(无损拓宽) |
| 列重命名 | 手动 | 手动 | 自动(field ID 不变) |
| 额外设施 | 无 | 无 | 需要 catalog 服务 |
Iceberg 支持最完整,但需要额外搭建 catalog 服务。Delta Lake 不需要额外服务,文件系统就够了。Parquet 也没有额外依赖,但多个文件 schema 不一致时只能自己写代码处理。CSV 连列类型都没有,全靠工具猜。
你的 schema 变更频繁吗?如果频繁且需要自动化,Iceberg 的 catalog 开销就值。schema 稳定的话,Parquet 或 Delta Lake 就足够。如果连 schema 都还没有,那么先搞定 CSV 的类型推断再说吧。