概要
Databricks 上で Apache Iceberg テーブルを CLONE する機能を検証した結果を共有します。Google Colab で作成したテーブルに対して以下の操作(INSERT、DELETE、UPDATE、MERGE、TRUNCATE TABLE)を行い、そのたびに Databricks で CLONE を実施し期待どおりに動くかを確認しました。
今回はストレージを Azure Blob Storage とし、Google Colab から Iceberg テーブルを作成し、その後 Databricks から何度も CLONE を行う手順を紹介します。
Iceberg テーブルを CLONE する場合には制約があることに注意してください。特に Merge-On-Read (MoR) により書き込まれている場合には動作しないようです。
更新、削除、またはマージが発生した Iceberg の読み取り時マージ テーブルを複製することはできません。
CoW と MoR については以下の記事を参照してください。
事前準備
Google Colab にて SparkSession の定義と Namespace の作成
# Azure Storage に関る情報をセット
azure_storage_account_name = "snowflakeicebergqiita"
azure_storage_account_key = "kKRn6nruDHEhUF0AQzM1PFlXtXs9V0BJQSrn6Z8GvJOsb0JHflV9Fn=="
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("iceberg_lab")
.config(
"spark.jars.packages",
'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,'
'org.apache.iceberg:iceberg-azure-bundle:1.5.2,'
'org.apache.hadoop:hadoop-azure:3.4.1'
)
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
.config('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkCatalog')
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", f"wasbs://external@{azure_storage_account_name}.blob.core.windows.net/iceberg_clone")
.config(f'spark.hadoop.fs.azure.account.key.{azure_storage_account_name}.blob.core.windows.net', azure_storage_account_key)
.getOrCreate()
)
spark
spark.sql("CREATE NAMESPACE IF NOT EXISTS default;")
Databricks にてデータベースを作成
%sql
CREATE CATALOG iceberg_clone_test;
Databricks にて
# Azure Storage に関る情報をセット
azure_storage_account_name = "snowflakeicebergqiita"
azure_storage_account_key = "kKRn6nruDHEhUF0AQzM1PFlXtXs9V0BJQSrn6Z8GvJOsb0JHflV9Fn=="
spark.conf.set(f"fs.azure.account.key.{azure_storage_account_name}.blob.core.windows.net", azure_storage_account_key)
spark.conf.set(f"fs.azure.account.key.{azure_storage_account_name}.dfs.core.windows.net", azure_storage_account_key)
本ステップにて Blob Storage エンドポイントと dfs エンドポイントに対する認証設定を実施している背景については下記の記事で紹介しています。
検証コードと結果
Google Colab にてテーブルを作成
spark.sql(f"""
CREATE OR REPLACE TABLE default.first_table(
id INT,
name STRING,
status STRING
)
USING iceberg;
""")
spark.sql(f"SHOW TABLES;").show()
Databricks にて CLONE の実施
retund_df = spark.sql(f"""
CREATE OR REPLACE TABLE iceberg_clone_test.default.test_table_01
CLONE iceberg.`wasbs://external@{azure_storage_account_name}.blob.core.windows.net/iceberg_clone/default/first_table`;
""")
retund_df
CLONE の結果確認
%sql
DESC iceberg_clone_test.default.test_table_01;
%sql
DESC HISTORY iceberg_clone_test.default.test_table_01;
%sql
SELECT * FROM iceberg_clone_test.default.test_table_01 ORDER BY id;
INSERT 後の CLONE
Google Colab にてデータを INSERT
spark.sql("""
INSERT INTO
default.first_table (id, name, status)
VALUES
(1, 'Alice', 'active'),
(2, 'Bob', 'active'),
(3, 'Charlie', 'active'),
(4, 'Daisy', 'active'),
(5, 'Ethan', 'active');
""").show()
spark.sql("""
SELECT * FROM default.first_table ORDER BY id;
""").show()
Databricks にて CLONE
retund_df = spark.sql(f"""
CREATE OR REPLACE TABLE iceberg_clone_test.default.test_table_01
CLONE iceberg.`wasbs://external@{azure_storage_account_name}.blob.core.windows.net/iceberg_clone/default/first_table`;
""")
retund_df.display()
CLONE の結果確認
%sql
DESC HISTORY iceberg_clone_test.default.test_table_01;
%sql
SELECT * FROM iceberg_clone_test.default.test_table_01 ORDER BY id;
UPDATE 後の CLONE
Google Colab にて UPDATE
spark.sql("""
UPDATE default.first_table
SET status = 'inactive' WHERE id IN (2, 3);
""").show()
spark.sql("""
SELECT * FROM default.first_table ORDER BY id;
""").show()
Databricks にて CLONE
retund_df = spark.sql(f"""
CREATE OR REPLACE TABLE iceberg_clone_test.default.test_table_01
CLONE iceberg.`wasbs://external@{azure_storage_account_name}.blob.core.windows.net/iceberg_clone/default/first_table`;
""")
retund_df.display()
CLONE の結果確認
%sql
DESC HISTORY iceberg_clone_test.default.test_table_01;
%sql
SELECT * FROM iceberg_clone_test.default.test_table_01 ORDER BY id;
DELETE 後の CLONE
Google Colab にて DELETE
spark.sql("""
DELETE FROM default.first_table
WHERE id = 5;
""").show()
spark.sql("""
SELECT * FROM default.first_table ORDER BY id;
""").show()
Databricks にて CLONE
retund_df = spark.sql(f"""
CREATE OR REPLACE TABLE iceberg_clone_test.default.test_table_01
CLONE iceberg.`wasbs://external@{azure_storage_account_name}.blob.core.windows.net/iceberg_clone/default/first_table`;
""")
retund_df.display()
CLONE の結果確認
%sql
DESC HISTORY iceberg_clone_test.default.test_table_01;
%sql
SELECT * FROM iceberg_clone_test.default.test_table_01 ORDER BY id;
MERGE 後の CLONE
Google Colab にて MERGE
spark.sql("""
MERGE INTO default.first_table AS tgt
USING (
SELECT 1 AS id, 'Alice_updated' AS name, 'inactive' AS status
UNION ALL
SELECT 6 AS id, 'Frank' AS name, 'active' AS status
) AS src
ON tgt.id = src.id
WHEN MATCHED THEN
UPDATE SET tgt.name = src.name, tgt.status = src.status
WHEN NOT MATCHED THEN
INSERT (id, name, status)
VALUES (src.id, src.name, src.status);
""").show()
spark.sql("""
SELECT * FROM default.first_table ORDER BY id;
""").show()
Databricks にて CLONE
retund_df = spark.sql(f"""
CREATE OR REPLACE TABLE iceberg_clone_test.default.test_table_01
CLONE iceberg.`wasbs://external@{azure_storage_account_name}.blob.core.windows.net/iceberg_clone/default/first_table`;
""")
retund_df.display()
CLONE の結果確認
%sql
DESC iceberg_clone_test.default.test_table_01;
%sql
DESC HISTORY iceberg_clone_test.default.test_table_01;
%sql
SELECT * FROM iceberg_clone_test.default.test_table_01 ORDER BY id;
TRUNCATE TABLE 後の CLONE
Google Colab にて TRUNCATE TABLE
spark.sql("""
TRUNCATE TABLE default.first_table;
""").show()
spark.sql("""
SELECT * FROM default.first_table ORDER BY id;
""").show()
Databricks にて CLONE
retund_df = spark.sql(f"""
CREATE OR REPLACE TABLE iceberg_clone_test.default.test_table_01
CLONE iceberg.`wasbs://external@{azure_storage_account_name}.blob.core.windows.net/iceberg_clone/default/first_table`;
""")
retund_df.display()
CLONE の結果確認
%sql
DESC HISTORY iceberg_clone_test.default.test_table_01;
%sql
SELECT * FROM iceberg_clone_test.default.test_table_01 ORDER BY id;