1.简介
Iceberg 的优势有以下几点:
1、增量数据更新:Iceberg 可以支持增量数据更新,而不是全量数据覆盖,从而减少了数据更新的时间和成本。
2、事务管理:Iceberg 支持事务管理,可以确保数据的一致性和可靠性。
3、版本管理:Iceberg 支持版本管理,可以方便地管理数据的历史版本,从而更好地追踪数据变化。
4、数据格式支持:Iceberg 支持多种数据格式,包括 Parquet、ORC、Avro 等,可以满足不同的数据处理需求。
5、跨存储介质支持:Iceberg 可以支持在多个存储介质上存储数据,包括 HDFS、S3、Azure Blob Storage 等。
Iceberg 最适合与 Spark、Hive、Presto 等大数据处理框架集成。这些框架都可以通过使用 Iceberg API 来访问和操作 Iceberg 存储的数据,从而获得 Iceberg 的优势和能力。同时,Iceberg 也可以与其他数据处理框架集成,如 Flink、Kafka 等,但需要通过一些额外的工作来实现。
iceberg和spark集成 功能是最全的,iceberg的很多功能在spark都能实现,但是其他引擎就不一定了
Spark与Iceberg的版本对应关系如下:
Spark版本 | Iceberg版本 |
---|---|
2.4 | 0.7.0-incubating – 1.1.0 |
3.0 | 0.9.0 – 1.0.0 |
3.1 | 0.12.0 – 1.1.0 |
3.2 | 0.13.0 – 1.1.0 |
3.3 | 0.14.0 – 1.1.0 |
2.环境准备
版本说明:
hadoop:3.1.3
hive:3.1.2
spark:3.3.1
iceberg:1.1.0
1、安装spark
tar -zxvf spark-3.3.1-bin-hadoop3.tgz -C /opt/module/ mv /opt/module/spark-3.3.1-bin-hadoop3 /opt/module/spark-3.3.1
配置环境变量
[root@hadoop-test01 ~]# vim /etc/profile #SPARK_HOME export SPARK_HOME=/opt/module/spark-3.3.1 export PATH=$PATH:$SPARK_HOME/bin [root@hadoop-test01 ~]# source /etc/profile
2、下载iceberg的jar包到spark的jars目录
wget https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.1.0/iceberg-spark-runtime-3.3_2.12-1.1.0.jar cp iceberg-spark-runtime-3.3_2.12-1.1.0.jar /opt/module/spark-3.3.1/jars
到此:iceberg和spark sql就集成完毕了
3.Catalog 配置
Spark中支持两种Catalog的设置:hive和hadoop,Hive Catalog就是Iceberg表存储使用Hive默认的数据路径,Hadoop Catalog需要指定Iceberg格式表存储路径。
]# vim conf/spark-defaults.conf spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.hive_prod.type = hive spark.sql.catalog.hive_prod.uri = thrift://hadoop-test01:9083
需要同时将hive配置文件hive-site.xml 放到spark的conf目录下
2、Hadoop Catalog配置
]# vim conf/spark-defaults.conf spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.hadoop_prod.type = hadoop spark.sql.catalog.hadoop_prod.warehouse = hdfs://hadoop-test01:8020/warehouse/spark-iceberg
说明:hive和Hadoop的Catalog 可以同时配置,使用的时候使用use进行切换即可
use hive_prod; use hadoop_prod;
配置完成之后就可以进入spark-sql 执行相关操作了
4.catalog和namespace使用
设置使用的catalog和namespace(就是hive数据库的意思)
spark-sql> use hive_prod.default; # 切换到指定的catalog和namespace spark-sql> show current namespace; # 显示当前的namespace hive_prod default spark-sql> show namespaces; # 显示所有的namespace default spark-sql> create database fblinux; # 也可以创建新的namespace spark-sql> show namespaces; default fblinux
5.创建表(常规)
1)创建普通表
创建普通表和sql基本一致,就是需要指定一下catalog和namespace(非必需)和使用USING设置为iceberg表
spark-sql> CREATE TABLE hive_prod.default.sample1 ( > id bigint COMMENT 'unique id', > data string) > USING iceberg; # 指定设置为iceberg表 spark-sql> desc sample1; id bigint unique id data string # Partitioning Not partitioned
建表的一些参数:
- PARTITIONED BY (partition-expressions) :配置分区
- LOCATION ‘(fully-qualified-uri)’ :指定表路径
- COMMENT ‘table documentation’ :配置表备注
- TBLPROPERTIES (‘key’=’value’, …) :配置表属性
表属性:https://iceberg.apache.org/docs/latest/configuration/
对Iceberg表的每次更改都会生成一个新的元数据文件(json文件)以提供原子性。默认情况下,旧元数据文件作为历史文件保存不会删除。
如果要自动清除元数据文件,在表属性中设置write.metadata.delete-after-commit.enabled=true。这将保留一些元数据文件(直到write.metadata.previous-versions-max),并在每个新创建的元数据文件之后删除旧的元数据文件。
2)创建分区表
iceberg 和hive有区别的是分区的字段已经在字段列表里了
spark-sql> CREATE TABLE hive_prod.default.sample2 ( > id bigint, > data string, > category string) > USING iceberg > PARTITIONED BY (category); spark-sql> desc sample2; id bigint data string category string # Partitioning Part 0 category Time taken: 0.036 seconds, Fetched 6 row(s) spark-sql> desc formatted sample2; id bigint data string category string # Partitioning Part 0 category # Metadata Columns _spec_id int _partition struct<category:string> _file string _pos bigint _deleted boolean # Detailed Table Information Name hive_prod.default.sample2 Location hdfs://hadoop-test01:8020/user/hive/warehouse/sample2 Provider iceberg Owner hadoop Table Properties [current-snapshot-id=none,format=iceberg/parquet,format-version=1]
3)创建隐藏分区表
隐藏分区是什么:指定分区字段可以做计算,然后分区的字段不用体现在建表语句当中,如下建表语句 id通过bucket函数分成16个桶,这个列并没有直接存在于字段列表(id分16桶的结果没有作为字段存在于字段列表),这种分区字段就是一种隐藏的分区,如果是hive表就需要将结果定义为字段显示出来。
spark-sql> CREATE TABLE hive_prod.default.sample3 ( > id bigint, > data string, > category string, > ts timestamp) > USING iceberg > PARTITIONED BY (bucket(16, id), days(ts), category); spark-sql> desc formatted sample3; id bigint data string category string ts timestamp # Partitioning Part 0 bucket(16, id) Part 1 days(ts) Part 2 category # Metadata Columns _spec_id int _partition struct<id_bucket:int,ts_day:date,category:string> _file string _pos bigint _deleted boolean # Detailed Table Information Name hive_prod.default.sample3 Location hdfs://hadoop-test01:8020/user/hive/warehouse/sample3 Provider iceberg Owner hadoop Table Properties [current-snapshot-id=none,format=iceberg/parquet,format-version=1]
支持的转换有:
- years(ts):按年划分
- months(ts):按月划分
- days(ts)或date(ts):等效于dateint分区
- hours(ts)或date_hour(ts):等效于dateint和hour分区
- bucket(N, col):按哈希值划分mod N个桶
- truncate(L, col):按截断为L的值划分
- 字符串被截断为给定的长度
整型和长型截断为bin: truncate(10, i)生成分区0,10,20,30,…
6.CTAS 语法建表
除了常规建表以外还可以使用CTAS建表,就是后面添加一个查询,这种语法iceberg是支持的
spark-sql> CREATE TABLE hive_prod.default.sample4 > USING iceberg > AS SELECT * from hive_prod.default.sample3; spark-sql> desc formatted sample4; id bigint data string category string ts timestamp # Partitioning Not partitioned # Metadata Columns _spec_id int _partition struct<> _file string _pos bigint _deleted boolean # Detailed Table Information Name hive_prod.default.sample4 Location hdfs://hadoop-test01:8020/user/hive/warehouse/sample4 Provider iceberg Owner hadoop Table Properties [current-snapshot-id=3746870716195160225,format=iceberg/parquet,format-version=1]
spark-sql> CREATE TABLE hive_prod.default.sample5 > USING iceberg > PARTITIONED BY (bucket(8, id), hours(ts), category) > TBLPROPERTIES ('key'='value') > AS SELECT * from hive_prod.default.sample3; spark-sql> desc formatted sample5; id bigint data string category string ts timestamp # Partitioning Part 0 bucket(8, id) Part 1 hours(ts) Part 2 category # Metadata Columns _spec_id int _partition struct<id_bucket:int,ts_hour:int,category:string> _file string _pos bigint _deleted boolean # Detailed Table Information Name hive_prod.default.sample5 Location hdfs://hadoop-test01:8020/user/hive/warehouse/sample5 Provider iceberg Owner hadoop Table Properties [current-snapshot-id=2956392194498691023,format=iceberg/parquet,format-version=1,key=value]
7.Replace table建表
REPLACE 相当于把表重构,必需使用已经存在的表,要是使用不存在的表会报错
spark-sql> REPLACE TABLE hive_prod.default.sample5 > USING iceberg > AS SELECT * from hive_prod.default.sample3; Time taken: 0.271 seconds spark-sql> desc formatted sample5; id bigint data string category string ts timestamp # Partitioning Not partitioned # Metadata Columns _spec_id int _partition struct<id_bucket:int,ts_hour:int,category:string> _file string _pos bigint _deleted boolean # Detailed Table Information Name hive_prod.default.sample5 Location hdfs://hadoop-test01:8020/user/hive/warehouse/sample5 Provider iceberg Owner hadoop Table Properties [current-snapshot-id=8333723822811548252,format=iceberg/parquet,format-version=1,key=value]
使用REPLACE的时候同样可以指定分区和表属性
spark-sql> REPLACE TABLE hive_prod.default.sample5 > USING iceberg > PARTITIONED BY (part) > TBLPROPERTIES ('key'='value') > AS SELECT * from hive_prod.default.sample3;
表不存在自动创建存在则自动替换使用下面这种方式:
spark-sql> CREATE OR REPLACE TABLE hive_prod.default.sample6 > USING iceberg > AS SELECT * from hive_prod.default.sample3;
8.删除表
对于HadoopCatalog而言:运行DROP TABLE将从catalog中删除表并删除表内容。
CREATE EXTERNAL TABLE hadoop_prod.default.sample7 ( id bigint COMMENT 'unique id', data string) USING iceberg INSERT INTO hadoop_prod.default.sample7 values(1,'a') DROP TABLE hadoop_prod.default.sample7
对于HiveCatalog而言:
- 在0.14之前,运行DROP TABLE将从catalog中删除表并删除表内容。
- 从0.14开始,DROP TABLE只会从catalog中删除表,不会删除数据。为了删除表内容,应该使用DROP table PURGE。
CREATE TABLE hive_prod.default.sample7 ( id bigint COMMENT 'unique id', data string) USING iceberg INSERT INTO hive_prod.default.sample7 values(1,'a')
删除表:
DROP TABLE hive_prod.default.sample7
删除表和数据:
DROP TABLE hive_prod.default.sample7 PURGE
9.修改表
Iceberg在Spark 3中完全支持ALTER TABLE,包括:
- 重命名表
- 设置或删除表属性
- 添加、删除和重命名列
- 添加、删除和重命名嵌套字段
- 重新排序顶级列和嵌套结构字段
- 扩大int、float和decimal字段的类型
- 将必选列变为可选列
此外,还可以使用SQL扩展来添加对分区演变的支持和设置表的写顺序。
spark-sql> CREATE TABLE hive_prod.fblinux.sample1 ( > id bigint COMMENT 'unique id', > data string) > USING iceberg;
不支持修改HadoopCatalog的表名
spark-sql> ALTER TABLE hive_prod.fblinux.sample1 RENAME TO hive_prod.fblinux.sample2; spark-sql> show tables; sample2
2)修改表属性
spark-sql> ALTER TABLE hive_prod.fblinux.sample1 SET TBLPROPERTIES ( > 'read.split.target-size'='268435456'); spark-sql> ALTER TABLE hive_prod.fblinux.sample1 SET TBLPROPERTIES ( > 'comment' = 'A table comment.'); spark-sql> describe formatted sample1; id bigint unique id data string # Partitioning Not partitioned # Metadata Columns _spec_id int _partition struct<> _file string _pos bigint _deleted boolean # Detailed Table Information Name hive_prod.fblinux.sample1 Comment A table comment. Location hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/sample1 Provider iceberg Owner hadoop Table Properties [current-snapshot-id=none,format=iceberg/parquet,format-version=1,read.split.target-size=268435456]
删除表属性:将SET改成UNSET然后跟上要删除的参数名称就可以了
ALTER TABLE hive_prod.fblinux.sample1 UNSET TBLPROPERTIES ('read.split.target-size');
3)添加列
spark-sql> ALTER TABLE hive_prod.fblinux.sample1 > ADD COLUMNS ( > category string comment 'new_column' > ); -- 添加struct类型的列 spark-sql> ALTER TABLE hive_prod.fblinux.sample1 > ADD COLUMN point struct<x: double, y: double>; -- 往struct类型的列中添加字段 spark-sql> ALTER TABLE hive_prod.fblinux.sample1 > ADD COLUMN point.z double; -- 创建struct的嵌套数组列 spark-sql> ALTER TABLE hive_prod.fblinux.sample1 > ADD COLUMN points array<struct<x: double, y: double>>; -- 在数组中的结构中添加一个字段。使用关键字'element'访问数组的元素列。 spark-sql> ALTER TABLE hive_prod.fblinux.sample1 > ADD COLUMN points.element.z double; -- 创建一个包含Map类型的列,key和value都为struct类型 spark-sql> ALTER TABLE hive_prod.fblinux.sample1 > ADD COLUMN pointsm map<struct<x: int>, struct<a: int>>; -- 在Map类型的value的struct中添加一个字段 spark-sql> ALTER TABLE hive_prod.fblinux.sample1 > ADD COLUMN pointsm.value.b int; -- 查看最终的结果 spark-sql> describe formatted sample1; id bigint unique id data string category string new_column point struct<x:double,y:double,z:double> points array<struct<x:double,y:double,z:double>> pointsm map<struct<x:int>,struct<a:int,b:int>> # Partitioning Not partitioned # Metadata Columns _spec_id int _partition struct<> _file string _pos bigint _deleted boolean # Detailed Table Information Name hive_prod.fblinux.sample1 Comment A table comment. Location hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/sample1 Provider iceberg Owner hadoop Table Properties [current-snapshot-id=none,format=iceberg/parquet,format-version=1]
在Spark 2.4.4及以后版本中,可以通过添加FIRST(添加在最前面)或AFTER(指定列的后面)子句在任何位置添加列:
ALTER TABLE hive_prod.fblinux.sample1 ADD COLUMN new_column1 bigint AFTER id; ALTER TABLE hive_prod.fblinux.sample1 ADD COLUMN new_column2 bigint FIRST;
4)修改列
-- 修改列名 spark-sql> ALTER TABLE hive_prod.fblinux.sample1 RENAME COLUMN data TO data1; -- Alter Column修改类型(只允许安全的转换,如int转bigint) ALTER TABLE hive_prod.fblinux.sample1 ADD COLUMNS ( idd int ); ALTER TABLE hive_prod.fblinux.sample1 ALTER COLUMN idd TYPE bigint; -- Alter Column 修改列的注释 ALTER TABLE hive_prod.fblinux.sample1 ALTER COLUMN id TYPE double COMMENT 'a'; ALTER TABLE hive_prod.fblinux.sample1 ALTER COLUMN id COMMENT 'b'; -- Alter Column修改列的顺序 ALTER TABLE hive_prod.fblinux.sample1 ALTER COLUMN id FIRST; ALTER TABLE hive_prod.fblinux.sample1 ALTER COLUMN new_column2 AFTER new_column1; -- Alter Column修改列是否允许为null ALTER TABLE hive_prod.fblinux.sample1 ALTER COLUMN id DROP NOT NULL;
ALTER COLUMN不用于更新struct类型。使用ADD COLUMN和DROP COLUMN添加或删除struct类型的字段。
5)删除列
spark-sql> ALTER TABLE hive_prod.fblinux.sample1 DROP COLUMN idd; spark-sql> ALTER TABLE hive_prod.fblinux.sample1 DROP COLUMN point.z;
10.修改分区
修改分区使用条件(1)spark3才支持,(2)需要配置扩展
1)添加分区
添加扩展配置(无法临时set 只能启动前配置)
# vim spark-default.conf spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
重新进入spark-sql shell:
-- 添加一个已有字段作为分区 ALTER TABLE hive_prod.fblinux.sample1 ADD PARTITION FIELD category; -- 添加隐藏分区,对id取模 ALTER TABLE hive_prod.fblinux.sample1 ADD PARTITION FIELD bucket(16, id); ALTER TABLE hive_prod.fblinux.sample1 ADD PARTITION FIELD truncate(data, 4); ALTER TABLE hive_prod.fblinux.sample1 ADD PARTITION FIELD years(ts); ALTER TABLE hive_prod.fblinux.sample1 ADD PARTITION FIELD bucket(16, id) AS shard;
2)删除分区
ALTER TABLE hive_prod.fblinux.sample1 DROP PARTITION FIELD category; ALTER TABLE hive_prod.fblinux.sample1 DROP PARTITION FIELD bucket(16, id); ALTER TABLE hive_prod.fblinux.sample1 DROP PARTITION FIELD truncate(data, 4); ALTER TABLE hive_prod.fblinux.sample1 DROP PARTITION FIELD years(ts); ALTER TABLE hive_prod.fblinux.sample1 DROP PARTITION FIELD shard;
注意,尽管删除了分区,但列仍然存在于表结构中。
删除分区字段是元数据操作,不会改变任何现有的表数据。新数据将被写入新的分区,但现有数据将保留在旧的分区布局中。
当分区发生变化时,动态分区覆盖行为也会发生变化。例如,如果按天划分分区,而改为按小时划分分区,那么覆盖将覆盖每小时划分的分区,而不再覆盖按天划分的分区。
删除分区字段时要小心,可能导致元数据查询失败或产生不同的结果。
3)修改分区:比如之前按天分区可以使用REPLACE转换为按小时分区
-- 将16个bucket修改为8个bucket ALTER TABLE hive_prod.fblinux.sample1 REPLACE PARTITION FIELD bucket(16, id) WITH bucket(8, id);
11.
ALTER TABLE hive_prod.fblinux.sample1 WRITE ORDERED BY category, id; -- 按照category进行升序写入,按照id进行降序写入 ALTER TABLE hive_prod.fblinux.sample1 WRITE ORDERED BY category ASC, id DESC; -- 按照category进行升序写入,要是有null值则放在最后面,按照id进行降序,要是有null值放在最前面 ALTER TABLE hive_prod.fblinux.sample1 WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST;
表写顺序不能保证查询的数据顺序。它只影响数据写入表的方式。
WRITE ORDERED BY设置了一个全局排序,即跨任务的行排序,就像在INSERT命令中使用ORDER BY一样:
INSERT INTO hive_prod.fblinux.sample1 SELECT id, data, category, ts FROM another_table ORDER BY ts, category;
要在每个任务内排序,而不是跨任务排序,使用local ORDERED BY:
ALTER TABLE hadoop_prod.default.sample1 WRITE LOCALLY ORDERED BY category, id;
12.插入数据
准备两张表:
CREATE TABLE hive_prod.fblinux.a ( id bigint, count bigint) USING iceberg; CREATE TABLE hive_prod.fblinux.b ( id bigint, count bigint, flag string) USING iceberg;
insert into 普通写入:
INSERT INTO hive_prod.fblinux.a VALUES (1, 1), (2, 2), (3, 3); INSERT INTO hive_prod.fblinux.b VALUES (1, 1, 'a'), (2, 2, 'b'), (4, 4, 'd');
merge into行级更新:它可以帮我们实现类似于join的效果,但是在很多场景下用MERGE INTO效率会更高一点,特别是在数仓当中有一个join维度的需求,用merge into可能是一个不错的选择。
MERGE INTO hive_prod.fblinux.a t -- 这里是结果表 USING (SELECT * FROM hive_prod.fblinux.b) u ON t.id = u.id -- USING 是关联表 ON 是关联条件 WHEN MATCHED AND u.flag='b' THEN UPDATE SET t.count = t.count + u.count -- WHEN MATCHED 是关联上了怎么处理,用THEN做更新和删除都可以 WHEN MATCHED AND u.flag='a' THEN DELETE WHEN NOT MATCHED THEN INSERT (id,count) values (u.id,u.count); -- WHEN NOT 是没有关联上怎么处理
13.查询数据
1)普通查询
SELECT count(1) as count, data FROM hive_prod.fblinux.a GROUP BY data;
2)查询元数据
-- 查询表快照 spark-sql> SELECT * FROM hive_prod.fblinux.a.snapshots; 2023-04-16 11:22:34.023 2292277810484862958 NULL append hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/a/metadata/snap-2292277810484862958-1-b642aa9b-b76a-4290-bb29-8eb6a880c817.avro {"added-data-files":"3","added-files-size":"2034","added-records":"3","changed-partition-count":"1","spark.app.id":"local-1681612655211","total-data-files":"3","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2034","total-position-deletes":"0","total-records":"3"} 2023-04-16 11:23:21.716 936323692260036362 2292277810484862958 overwrite hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/a/metadata/snap-936323692260036362-1-69f8c74c-1daa-4e17-92b0-6810265ecc76.avro {"added-data-files":"1","added-files-size":"750","added-records":"2","changed-partition-count":"1","deleted-data-files":"2","deleted-records":"2","removed-files-size":"1356","spark.app.id":"local-1681612655211","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"1428","total-position-deletes":"0","total-records":"3"} Time taken: 0.241 seconds, Fetched 2 row(s) -- 查询数据文件信息 spark-sql> SELECT * FROM hive_prod.fblinux.a.files; 0 hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/a/data/00000-8-a8b23f72-b062-405c-baa8-a9b5dd8d7ff7-00001.parquet PARQUET 0 2 750 {1:54,2:92} {1:2,2:2} {1:0,2:0} {} {1:,2:} {1:,2:} NULL [4] NULL 0 0 hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/a/data/00002-2-e3b90f14-eb87-4a2f-a6d6-4289259c9823-00001.parquet PARQUET 0 1 678 {1:46,2:46} {1:1,2:1} {1:0,2:0} {} {1:,2:} {1:,2:} NULL [4] NULL 0 Time taken: 0.153 seconds, Fetched 2 row(s) -- 查询表历史 spark-sql> SELECT * FROM hive_prod.fblinux.a.history; 2023-04-16 11:22:34.023 2292277810484862958 NULL true 2023-04-16 11:23:21.716 936323692260036362 2292277810484862958 true Time taken: 0.089 seconds, Fetched 2 row(s) -- 查询 manifest spark-sql> SELECT * FROM hive_prod.fblinux.a.manifests; 0 hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/a/metadata/69f8c74c-1daa-4e17-92b0-6810265ecc76-m1.avro 5803 0 936323692260036362 1 0 0 0 0 0 [] 0 hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/a/metadata/69f8c74c-1daa-4e17-92b0-6810265ecc76-m0.avro 5908 0 936323692260036362 0 1 2 0 0 0 [] Time taken: 0.091 seconds, Fetched 2 row(s)
14.存储过程(高级管理功能)
spark的用法当中还支持一个存储过程,主要是一些高级管理功能,而不是传统的查询插入一些东西。Procedures可以通过CALL从任何已配置的Iceberg Catalog中使用。所有Procedures都在namespace中。
1)语法
CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1);
当按位置传递参数时,如果结束参数是可选的,则只有结束参数可以省略。
CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n);
2)快照管理
(1)回滚到指定的快照id
CALL hadoop_prod.system.rollback_to_snapshot('default.a', 7601163594701794741);
(2)回滚到指定时间的快照
CALL hadoop_prod.system.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00.000');
(3)设置表的当前快照ID
CALL hadoop_prod.system.set_current_snapshot('db.sample', 1);
(4)从快照变为当前表状态
CALL hadoop_prod.system.cherrypick_snapshot('default.a', 7629160535368763452); CALL hadoop_prod.system.cherrypick_snapshot(snapshot_id => 7629160535368763452, table => 'default.a' );
3)元数据管理
(1)删除早于指定日期和时间的快照,但保留最近100个快照:
CALL hive_prod.system.expire_snapshots('db.sample', TIMESTAMP '2021-06-30 00:00:00.000', 100)
(2)删除Iceberg表中任何元数据文件中没有引用的文件
-- 列出所有需要删除的候选文件 CALL catalog_name.system.remove_orphan_files(table => 'db.sample', dry_run => true) -- 删除指定目录中db.sample表不知道的任何文件 CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location => 'tablelocation/data')
(3)合并数据文件(合并小文件)
CALL catalog_name.system.rewrite_data_files('db.sample') CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST') CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'zorder(c1,c2)') CALL catalog_name.system.rewrite_data_files(table => 'db.sample', options => map('min-input-files','2')) CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"')
(4)重写表清单来优化执行计划
CALL catalog_name.system.rewrite_manifests('db.sample') -- 重写表db中的清单。并禁用Spark缓存的使用。这样做可以避免执行程序上的内存问题。 CALL catalog_name.system.rewrite_manifests('db.sample', false)
4)迁移表
CALL catalog_name.system.snapshot('db.sample', 'db.snap') CALL catalog_name.system.snapshot('db.sample', 'db.snap', '/tmp/temptable/')
(2)迁移
CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo', 'bar')) CALL catalog_name.system.migrate('db.sample')
(3)添加数据文件
CALL spark_catalog.system.add_files( table => 'db.tbl', source_table => 'db.src_tbl', partition_filter => map('part_col_1', 'A') ) CALL spark_catalog.system.add_files( table => 'db.tbl', source_table => '`parquet`.`path/to/table`' )
5)元数据信息
(1)获取指定快照的父快照id
CALL spark_catalog.system.ancestors_of('db.tbl')
(2)获取指定快照的所有祖先快照
CALL spark_catalog.system.ancestors_of('db.tbl', 1) CALL spark_catalog.system.ancestors_of(snapshot_id => 1, table => 'db.tbl')
转载请注明:西门飞冰的博客 » Iceberg和spark集成笔记