Iceberg数据湖初探-spark

环境配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# Launch the Spark SQL CLI on YARN (client deploy mode) and run the statements in query.sql.
# The Iceberg SQL extensions are enabled, and a Spark catalog named `local` is registered
# as an Iceberg SparkCatalog backed by a Hive metastore (type=hive, uri=thrift://...),
# so tables in that catalog are addressed as local.<db>.<table>.
# Replace the placeholder paths and `hivemetastore:port` with real values before running.
# NOTE(review): the runtime jar is the Spark 3.2 build (iceberg-spark-runtime-3.2_2.12);
# confirm it matches the Spark version installed at /spark/to/path.
/spark/to/path/spark-sql \
--master yarn \
--deploy-mode client \
--conf spark.executor.instances=25 \
--conf spark.executor.memory=2g \
--conf spark.executor.cores=4 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=hive \
--conf spark.sql.catalog.local.uri=thrift://hivemetastore:port \
--jars /path/to/libs/iceberg-spark-runtime-3.2_2.12-1.3.1.jar \
-f query.sql

DDL

  • Create table
1
2
3
4
5
6
7
8
9
-- Create an empty Iceberg format-v2 table with the same columns as an existing Hive table.
-- Then move `dt` to the front, and use it as an identity partition field.
-- LIMIT 0 copies only the schema, so no data row from db.hive_table ends up in the new table.
-- `format-version` is an Iceberg table property, so it is set via TBLPROPERTIES.
-- The storage path is given with LOCATION.
CREATE TABLE local.db.table
USING iceberg
LOCATION '/path/to/table'
TBLPROPERTIES ('format-version' = '2')
AS SELECT * FROM db.hive_table LIMIT 0;

-- Re-create `dt` as the first column.
-- NOTE(review): dt is presumably the Hive partition column, which the CTAS placed last; confirm.
ALTER TABLE local.db.table DROP COLUMN dt;
ALTER TABLE local.db.table ADD COLUMN dt string FIRST;
-- Identity partition on dt.
ALTER TABLE local.db.table ADD PARTITION FIELD dt;
-- Set the Iceberg write distribution mode to 'range'.
ALTER TABLE local.db.table SET TBLPROPERTIES ('write.distribution-mode' = 'range');
-- Print the resulting DDL to verify the schema, partitioning and properties.
SHOW CREATE TABLE local.db.table;
  • Replace table
1
2
-- Replace prod.db.sample with the result of a query.
-- `SELECT ...` is a placeholder: substitute the real query before running.
REPLACE TABLE prod.db.sample USING iceberg AS
  SELECT ...
  • Drop table
1
-- Remove the table from the catalog.
DROP TABLE prod.db.sample;
-- Variant: append PURGE to also delete the table's files (run instead of the statement above):
-- DROP TABLE prod.db.sample PURGE;
  • Alter table
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
-- Table-level changes. Every statement is a standalone example against prod.db.sample.
-- Once the RENAME runs, the table is no longer reachable as prod.db.sample.
ALTER TABLE prod.db.sample
    RENAME TO prod.db.new_name;

-- Set, then unset, a table property (268435456 = 256 * 1024 * 1024).
ALTER TABLE prod.db.sample
    SET TBLPROPERTIES ('read.split.target-size' = '268435456');
ALTER TABLE prod.db.sample
    UNSET TBLPROPERTIES ('read.split.target-size');

-- The table comment is written through the 'comment' property.
ALTER TABLE prod.db.sample
    SET TBLPROPERTIES ('comment' = 'A table comment.');

-- Add columns: one with a column comment, one placed after an existing column,
-- and one placed first inside the nested field `nested`.
ALTER TABLE prod.db.sample
    ADD COLUMNS (new_column string comment 'new_column docs');
ALTER TABLE prod.db.sample
    ADD COLUMN new_column bigint AFTER other_column;
ALTER TABLE prod.db.sample
    ADD COLUMN nested.new_column bigint FIRST;

-- Rename a top-level column and a nested field.
ALTER TABLE prod.db.sample
    RENAME COLUMN data TO payload;
ALTER TABLE prod.db.sample
    RENAME COLUMN location.lat TO latitude;

-- Change a column's type.
ALTER TABLE prod.db.sample
    ALTER COLUMN measurement TYPE double;

-- Drop a top-level column and a nested field.
ALTER TABLE prod.db.sample
    DROP COLUMN id;
ALTER TABLE prod.db.sample
    DROP COLUMN point.z;
-- Partition evolution examples.
ALTER TABLE prod.db.sample
    ADD PARTITION FIELD catalog; -- identity transform

-- Drop partition fields, referenced by source column or by transform expression.
-- `shard` is presumably an explicitly named partition field.
ALTER TABLE prod.db.sample
    DROP PARTITION FIELD catalog;
ALTER TABLE prod.db.sample
    DROP PARTITION FIELD bucket(16, id);
ALTER TABLE prod.db.sample
    DROP PARTITION FIELD truncate(4, data);
ALTER TABLE prod.db.sample
    DROP PARTITION FIELD year(ts);
ALTER TABLE prod.db.sample
    DROP PARTITION FIELD shard;

-- Swap the existing `ts_day` partition field for a day(ts) transform.
ALTER TABLE prod.db.sample
    REPLACE PARTITION FIELD ts_day WITH day(ts);
-- The optional AS clause gives the new partition field a custom name.
ALTER TABLE prod.db.sample
    REPLACE PARTITION FIELD ts_day WITH day(ts) AS day_of_ts;
-- Table write order and write distribution.
-- Every statement ends with ';' so that consecutive examples are not parsed as one statement.
ALTER TABLE prod.db.sample WRITE ORDERED BY category, id;
-- use optional ASC/DESC keyword to specify sort order of each field (default ASC)
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC, id DESC;
-- use optional NULLS FIRST/NULLS LAST keyword to specify null order of each field
-- (default: NULLS FIRST for ASC, NULLS LAST for DESC)
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST;
-- Per the Iceberg docs, WRITE ORDERED BY sets a global ordering across tasks,
-- comparable to writing with an explicit ORDER BY like this INSERT:
INSERT INTO prod.db.sample
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category;
-- Sort locally within each write task instead of globally.
ALTER TABLE prod.db.sample WRITE LOCALLY ORDERED BY category, id;
-- Clear the table's write order.
ALTER TABLE prod.db.sample WRITE UNORDERED;
-- Distribute written rows by partition, optionally with a local sort within each task.
ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION;
ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id;
-- Set or drop the table's identifier fields.
ALTER TABLE prod.db.sample SET IDENTIFIER FIELDS id;         -- single column
ALTER TABLE prod.db.sample SET IDENTIFIER FIELDS id, data;   -- multiple columns
ALTER TABLE prod.db.sample DROP IDENTIFIER FIELDS id;        -- single column
ALTER TABLE prod.db.sample DROP IDENTIFIER FIELDS id, data;  -- multiple columns
-- Branches and tags: named references to table snapshots.
-- Every statement ends with ';' so that consecutive examples are not parsed as one statement.

-- CREATE audit-branch at current snapshot with default retention.
ALTER TABLE prod.db.sample CREATE BRANCH `audit-branch`;

-- CREATE audit-branch at current snapshot with default retention if it doesn't exist.
ALTER TABLE prod.db.sample CREATE BRANCH IF NOT EXISTS `audit-branch`;

-- CREATE audit-branch at current snapshot with default retention or REPLACE it if it already exists.
ALTER TABLE prod.db.sample CREATE OR REPLACE BRANCH `audit-branch`;

-- CREATE audit-branch at snapshot 1234 with default retention.
ALTER TABLE prod.db.sample CREATE BRANCH `audit-branch`
AS OF VERSION 1234;

-- CREATE audit-branch at snapshot 1234 and retain the branch itself for 30 days.
-- Within the branch, keep at least the latest 3 snapshots, plus snapshots up to 2 days old.
ALTER TABLE prod.db.sample CREATE BRANCH `audit-branch`
AS OF VERSION 1234 RETAIN 30 DAYS
WITH SNAPSHOT RETENTION 3 SNAPSHOTS 2 DAYS;

-- CREATE historical-tag at current snapshot with default retention.
ALTER TABLE prod.db.sample CREATE TAG `historical-tag`;

-- CREATE historical-tag at current snapshot with default retention if it doesn't exist.
ALTER TABLE prod.db.sample CREATE TAG IF NOT EXISTS `historical-tag`;

-- CREATE historical-tag at current snapshot with default retention or REPLACE it if it already exists.
ALTER TABLE prod.db.sample CREATE OR REPLACE TAG `historical-tag`;

-- CREATE historical-tag at snapshot 1234 with default retention.
ALTER TABLE prod.db.sample CREATE TAG `historical-tag` AS OF VERSION 1234;

-- CREATE historical-tag at snapshot 1234 and retain it for 1 year.
ALTER TABLE prod.db.sample CREATE TAG `historical-tag`
AS OF VERSION 1234 RETAIN 365 DAYS;

-- REPLACE audit-branch to reference snapshot 4567 and update the retention to 60 days.
ALTER TABLE prod.db.sample REPLACE BRANCH `audit-branch`
AS OF VERSION 4567 RETAIN 60 DAYS;

-- REPLACE historical-tag to reference snapshot 4567 and update the retention to 60 days.
ALTER TABLE prod.db.sample REPLACE TAG `historical-tag`
AS OF VERSION 4567 RETAIN 60 DAYS;

Configuration

Procedures

1
-- Template for calling a procedure in a catalog's `system` namespace with named arguments (name => value).
-- This example passes arg_name_2 before arg_name_1. Replace every placeholder name with a real one.
CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1);
function / usage / example
rollback_to_snapshot
rollback_to_timestamp
set_current_snapshot
cherrypick_snapshot
publish_changes
fast_forward
expire_snapshots
remove_orphan_files
rewrite_data_files
rewrite_manifests
rewrite_position_delete_files
snapshot
migrate
add_files
register_table
ancestors_of
create_changelog_view

Query

  • query with sql

-- Read the table's rows.
SELECT * FROM prod.db.table;
-- Read the `files` metadata table of prod.db.table.
SELECT * FROM prod.db.table.files;

  • Time Travel
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
-- The TIMESTAMP AS OF / VERSION AS OF SQL syntax requires Spark 3.3 or later.
-- With a Spark 3.2 runtime, use the DataFrame reader options (e.g. snapshot-id, as-of-timestamp) instead.

-- time travel to October 26, 1986 at 01:21:00
-- (a timestamp string without a zone is interpreted in spark.sql.session.timeZone)
SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';

-- time travel to snapshot with id 10963874102873
SELECT * FROM prod.db.table VERSION AS OF 10963874102873;

-- time travel to the head snapshot of audit-branch
SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';

-- time travel to the snapshot referenced by the tag historical-snapshot
SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';

Writes

  • insert into
  • merge into
  • insert overwrite
  • delete from
  • update
Licensed under CC BY-NC-SA 4.0
Built with Hugo
Theme Stack designed by Jimmy