Apache Iceberg

本文翻译自:《Apache Iceberg: An Architectural Look Under the Covers》

原文链接: 点此跳转

Iceberg 架构

image-20230621094533615

Iceberg三层架构:

  1. Iceberg catalog
  2. metadata 层,包含metadata filemanifest listmanifest file
  3. data

image-20230621094919927

Iceberg catalog

对Iceberg的任何读写操作,首先是要找到当前元数据指针的位置(注意:current metadata pointer 不是官方术语,而是描述性术语)。

Iceberg catalog 的主要要求是它必须支持用于更新当前元数据指针的原子操作(例如:HDFS,Hive MetaStore)。这就是为什么Iceberg表上的事务是原子性的,并能够提供正确性保障。在Iceberg catalog中,每个表都有一个指向该表当前元数据文件的引用。

Iceberg catalog的组织形式:

  • 使用hdfs作为catalog,表的metadata 文件夹下有一个名为version-hint.text 的文件,其内容是当前元数据文件的版本号。
  • 使用Hive metastore作为catalog,hive metastore中的表条目有一个表属性,用于存储当前元数据文件的位置。

因此,当SELECT查询Iceberg表时,查询引擎首先话查询Iceberg catalog,然后检索他要读取的表的当前元数据文件的位置,然后打开该文件。

Metadata file

顾名思义 ,metadata file 存储当前表的元数据。包含相关表的schema,分区信息,快照列表以及当前快照信息。

image-20230621102411391

metadata file 完整内容:

v2.metadata.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
{
"format-version" : 1,
"table-uuid" : "4b96b6e8-9838-48df-a111-ec1ff6422816",
"location" : "/home/hadoop/warehouse/db2/part_table2",
"last-updated-ms" : 1611694436618,
"last-column-id" : 3,
"schema" : {
"type" : "struct",
"fields" : [ {
"id" : 1,
"name" : "id",
"required" : true,
"type" : "int"
}, {
"id" : 2,
"name" : "ts",
"required" : false,
"type" : "timestamptz"
}, {
"id" : 3,
"name" : "message",
"required" : false,
"type" : "string"
} ]
},
"partition-spec" : [ {
"name" : "ts_hour",
"transform" : "hour",
"source-id" : 2,
"field-id" : 1000
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "ts_hour",
"transform" : "hour",
"source-id" : 2,
"field-id" : 1000
} ]
} ],
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"owner" : "hadoop"
},
"current-snapshot-id" : 1257424822184505371,
"snapshots" : [ {
"snapshot-id" : 8271497753230544300,
"timestamp-ms" : 1611694406483,
"summary" : {
"operation" : "append",
"spark.app.id" : "application_1611687743277_0002",
"added-data-files" : "1",
"added-records" : "1",
"added-files-size" : "960",
"changed-partition-count" : "1",
"total-records" : "1",
"total-data-files" : "1",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "/home/hadoop/warehouse/db2/part_table2/metadata/snap-8271497753230544300-1-d8a778f9-ad19-4e9c-88ff-28f49ec939fa.avro"
},
{
"snapshot-id" : 1257424822184505371,
"parent-snapshot-id" : 8271497753230544300,
"timestamp-ms" : 1611694436618,
"summary" : {
"operation" : "append",
"spark.app.id" : "application_1611687743277_0002",
"added-data-files" : "1",
"added-records" : "1",
"added-files-size" : "973",
"changed-partition-count" : "1",
"total-records" : "2",
"total-data-files" : "2",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "/home/hadoop/warehouse/db2/part_table2/metadata/snap-1257424822184505371-1-eab8490b-8d16-4eb1-ba9e-0dede788ff08.avro"
} ],
"snapshot-log" : [ {
"timestamp-ms" : 1611694406483,
"snapshot-id" : 8271497753230544300
},
{
"timestamp-ms" : 1611694436618,
"snapshot-id" : 1257424822184505371
} ],
"metadata-log" : [ {
"timestamp-ms" : 1611694097253,
"metadata-file" : "/home/hadoop/warehouse/db2/part_table2/metadata/v1.metadata.json"
},
{
"timestamp-ms" : 1611694406483,
"metadata-file" : "/home/hadoop/warehouse/db2/part_table2/metadata/v2.metadata.json"
} ]
}

当一个SELECT查询Iceber表时,先通过catalog中获取并打开当前元数据文件,然后查询引擎读取current-snapshot-id的值。然后,它使用该值在snapshots 中查找该快照的条目,然后检索该快照manifest-list 的值,并打开该位置指向的manifest-list

image-20230621103608786

Manifest list

manifest list保存了相关构成该快照的每个manifest file 的信息,如manifest file的位置、它被添加为哪个快照的一部分,以及关于manifest file 所属的分区以及manifest file 跟踪的数据文件的分区列的下限和上限的信息。

eg:snap-1257424822184505371-1-eab8490b-8d16-4eb1-ba9e-0dede788ff08.avro (转换成了json格式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
{
"manifest_path": "/home/hadoop/warehouse/db2/part_table2/metadata/eab8490b-8d16-4eb1-ba9e-0dede788ff08-m0.avro",
"manifest_length": 4884,
"partition_spec_id": 0,
"added_snapshot_id": {
"long": 1257424822184505300
},
"added_data_files_count": {
"int": 1
},
"existing_data_files_count": {
"int": 0
},
"deleted_data_files_count": {
"int": 0
},
"partitions": {
"array": [ {
"contains_null": false,
"lower_bound": {
"bytes": "¹Ô\\u0006\\u0000"
},
"upper_bound": {
"bytes": "¹Ô\\u0006\\u0000"
}
} ]
},
"added_rows_count": {
"long": 1
},
"existing_rows_count": {
"long": 0
},
"deleted_rows_count": {
"long": 0
}
}
{
"manifest_path": "/home/hadoop/warehouse/db2/part_table2/metadata/d8a778f9-ad19-4e9c-88ff-28f49ec939fa-m0.avro",
"manifest_length": 4884,
"partition_spec_id": 0,
"added_snapshot_id": {
"long": 8271497753230544000
},
"added_data_files_count": {
"int": 1
},
"existing_data_files_count": {
"int": 0
},
"deleted_data_files_count": {
"int": 0
},
"partitions": {
"array": [ {
"contains_null": false,
"lower_bound": {
"bytes": "¸Ô\\u0006\\u0000"
},
"upper_bound": {
"bytes": "¸Ô\\u0006\\u0000"
}
} ]
},
"added_rows_count": {
"long": 1
},
"existing_rows_count": {
"long": 0
},
"deleted_rows_count": {
"long": 0
}
}

当SELECT查询Iceberg表并在从元数据文件中获取快照位置后为快照打开清单列表时,查询引擎随后读取清单路径条目的值,并打开清单文件。它还可以在这个阶段进行一些优化,例如使用行计数或使用分区信息过滤数据

image-20230621105906769

Manifest file

manifest file 跟踪数据文件以及有关每个文件的其他详细信息和统计信息。

eg:eab8490b-8d16-4eb1-ba9e-0dede788ff08-m0.avro (转换成json)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
{
"status": 1,
"snapshot_id": {
"long": 1257424822184505300
},
"data_file": {
"file_path": "/home/hadoop/warehouse/db2/part_table2/data/ts_hour=2021-01-26-01/00000-6-7c6cf3c0-8090-4f15-a4cc-3a3a562eed7b-00001.parquet",
"file_format": "PARQUET",
"partition": {
"ts_hour": {
"int": 447673
}
},
"record_count": 1,
"file_size_in_bytes": 973,
"block_size_in_bytes": 67108864,
"column_sizes": {
"array": [ {
"key": 1,
"value": 47
},
{
"key": 2,
"value": 57
},
{
"key": 3,
"value": 60
} ]
},
"value_counts": {
"array": [ {
"key": 1,
"value": 1
},
{
"key": 2,
"value": 1
},
{
"key": 3,
"value": 1
} ]
},
"null_value_counts": {
"array": [ {
"key": 1,
"value": 0
},
{
"key": 2,
"value": 0
},
{
"key": 3,
"value": 0
} ]
},
"lower_bounds": {
"array": [ {
"key": 1,
"value": "\\u0002\\u0000\\u0000\\u0000"
},
{
"key": 2,
"value": "\\u0000„ ,ù\\u0005\\u0000"
},
{
"key": 3,
"value": "test message 2"
} ]
},
"upper_bounds": {
"array": [ {
"key": 1,
"value": "\\u0002\\u0000\\u0000\\u0000"
},
{
"key": 2,
"value": "\\u0000„ ,ù\\u0005\\u0000"
},
{
"key": 3,
"value": "test message 2"
} ]
},
"key_metadata": null,
"split_offsets": {
"array": [
4
]
}
}
}

当SELECT查询Iceberg表并在从清单列表中获取其位置后打开清单文件时,查询引擎然后读取每个数据文件对象的file-path条目的值,并打开数据文件。它还可以在这个阶段进行一些优化,例如使用行计数或使用分区或列统计信息过滤数据。

Iceberg CRUD

create table

1
2
3
4
5
6
7
8
CREATE TABLE table1 (
order_id BIGINT,
customer_id BIGINT,
order_amount DECIMAL(10, 2),
order_ts TIMESTAMP
)
USING iceberg
PARTITIONED BY ( HOUR(order_ts) );

执行此操作后,Iceberg环境如下所示:

image-20230621112452227

在上面,在数据库db1中创建了一个名为table1的表。该表有4列,并以order_ts timestamp列的小时粒度进行分区(稍后将详细介绍)。
当执行上面的查询时,在元数据层中创建了一个带有快照s0的元数据文件(快照s0不指向任何清单列表,因为表中还不存在数据)。然后更新db1.table1的当前元数据指针的目录条目,使其指向这个新的元数据文件的路径。

insert

现在向表中新增一条数据

1
2
3
4
5
6
INSERT INTO table1 VALUES (
123,
456,
36.17,
'2021-01-26 08:10:23'
);

image-20230621112741858

当执行此insert操作时,会发生如下过程:

  1. 首先会创建parquet格式的数据文件 — table1/data/order_ts_hour=2021-01-26-08/00000-5-cae2d.parquet
  2. 然后,创建指向该数据文件的清单文件(manifest file)—table1/metadata/d8f9-ad19-4e.avro
  3. 然后,创建指向此清单文件的清单列表(manifest-list)— table1/metadata/snap-2938-1-4103.avro
  4. 然后,基于先前的当前元数据创建新的元数据文件,新的元数据文件具有新的快照s1 以及保持对先前的快照s0 的跟踪,指向该清单列表 — table1/metadata/v2.metadata.json
  5. 然后,在catalog中自动更新db1.table1的当前元数据指针的值,使其指向这个新的元数据文件

在上述操作没完成之前,其他任何读取表的操作都不能看到当前表的状态和内容。

merge into/upsert

现在假设我们已经将一些数据放到我们创建的staging表中。在这个简单的实例中,每次订单发生变更时都会记录信息,并且我们希望保持此表显示每个订单的最新详细信息,因此如果订单id已经在表中,则更新订单金额,如果还没有该订单的记录,我们希望为这个新订单插入一条记录。在此示例中,staging表包括表中已经存在的订单的更新(order_id=123)和表中尚未存在的新订单,该更新发生在2021-01-27 10:21:46。

select

1
2
SELECT *
FROM db1.table1

image-20230621130847752

执行select语句,将发生一下过程:

  1. 首先查询引擎转到iceberg catalog
  2. 然后,开始检索db1.table1的当前元数据文件位置条目
  3. 然后,打开该元数据文件并检索当前快照s2manifest list位置
  4. 然后它打开这个manifest list ,检索唯一manifest file的位置
  5. 然后,打开manifest file,检索数据文件位置
  6. 最后读取这些数据文件,返回给client端