
Verifying the CLONE feature for Apache Iceberg tables on Databricks


Overview

This post shares the results of verifying the CLONE feature for Apache Iceberg tables on Databricks. Against a table created in Google Colab, I ran INSERT, DELETE, UPDATE, MERGE, and TRUNCATE TABLE, performing a CLONE from Databricks after each operation to confirm that it behaves as expected.
This walkthrough uses Azure Blob Storage as the storage layer: the Iceberg table is created from Google Colab, and Databricks then clones it repeatedly.

Note that cloning Iceberg tables comes with restrictions. In particular, it reportedly does not work for tables written with Merge-On-Read (MoR):

You cannot clone Iceberg merge-on-read tables that have had updates, deletions, or merges applied.


For background on CoW (Copy-on-Write) and MoR, see the following article.
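Given the limitation above, one way to keep the source table cloneable is to pin it to copy-on-write so that DELETE/UPDATE/MERGE rewrite data files instead of emitting merge-on-read delete files. The `write.*.mode` settings below are standard Iceberg table properties (copy-on-write is also the Spark default, which is likely why every step in this verification cloned successfully); this DDL is a sketch and was not part of the verification itself:

```sql
-- Pin row-level operations to copy-on-write so the table stays cloneable.
CREATE TABLE default.first_table (
  id     INT,
  name   STRING,
  status STRING
)
USING iceberg
TBLPROPERTIES (
  'write.delete.mode' = 'copy-on-write',
  'write.update.mode' = 'copy-on-write',
  'write.merge.mode'  = 'copy-on-write'
);
```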

Preparation

Defining the SparkSession and creating a namespace in Google Colab

# Set the Azure Storage connection info
azure_storage_account_name = "snowflakeicebergqiita"
azure_storage_account_key = "kKRn6nruDHEhUF0AQzM1PFlXtXs9V0BJQSrn6Z8GvJOsb0JHflV9Fn=="

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("iceberg_lab")
    .config(
        "spark.jars.packages",
        'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,'
        'org.apache.iceberg:iceberg-azure-bundle:1.5.2,'
        'org.apache.hadoop:hadoop-azure:3.4.1'
    )
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    .config('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkCatalog')
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", f"wasbs://external@{azure_storage_account_name}.blob.core.windows.net/iceberg_clone")
    .config(f'spark.hadoop.fs.azure.account.key.{azure_storage_account_name}.blob.core.windows.net', azure_storage_account_key)
    .getOrCreate()
)
spark


spark.sql("CREATE NAMESPACE IF NOT EXISTS default;")


Creating a catalog in Databricks

%sql
CREATE CATALOG iceberg_clone_test;

Configuring storage authentication in Databricks

# Set the Azure Storage connection info
azure_storage_account_name = "snowflakeicebergqiita"
azure_storage_account_key = "kKRn6nruDHEhUF0AQzM1PFlXtXs9V0BJQSrn6Z8GvJOsb0JHflV9Fn=="
spark.conf.set(f"fs.azure.account.key.{azure_storage_account_name}.blob.core.windows.net", azure_storage_account_key)
spark.conf.set(f"fs.azure.account.key.{azure_storage_account_name}.dfs.core.windows.net", azure_storage_account_key)
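Since the same account key is registered for two endpoints, a tiny helper (hypothetical, not in the original notebook) can build both entries and keep the account/key pair in one place:

```python
def storage_conf(account_name: str, account_key: str) -> dict:
    """Return the Spark conf entries that authenticate both the
    blob (wasbs://) and dfs (abfss://) endpoints of one storage account."""
    return {
        f"fs.azure.account.key.{account_name}.blob.core.windows.net": account_key,
        f"fs.azure.account.key.{account_name}.dfs.core.windows.net": account_key,
    }

# On Databricks you would then apply each entry:
# for k, v in storage_conf(azure_storage_account_name, azure_storage_account_key).items():
#     spark.conf.set(k, v)
```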


The reasoning behind configuring authentication for both the Blob Storage endpoint and the dfs endpoint in this step is covered in the following article.

Verification code and results

Creating a table in Google Colab

spark.sql("""
CREATE OR REPLACE TABLE default.first_table(
  id    INT,
  name  STRING,
  status STRING
)
USING iceberg;
""")
spark.sql("SHOW TABLES;").show()


Running CLONE in Databricks

result_df = spark.sql(f"""
CREATE OR REPLACE TABLE iceberg_clone_test.default.test_table_01
  CLONE iceberg.`wasbs://external@{azure_storage_account_name}.blob.core.windows.net/iceberg_clone/default/first_table`;
""")
result_df
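The same CLONE statement is reissued after every change on the Colab side. A small helper (hypothetical; the table and path names mirror this article's setup) keeps the source path consistent across the repeated runs:

```python
def build_clone_sql(account_name: str,
                    table: str = "first_table",
                    target: str = "iceberg_clone_test.default.test_table_01") -> str:
    """Build the CREATE OR REPLACE ... CLONE statement used in each step."""
    source = (f"wasbs://external@{account_name}.blob.core.windows.net"
              f"/iceberg_clone/default/{table}")
    return (f"CREATE OR REPLACE TABLE {target}\n"
            f"  CLONE iceberg.`{source}`;")

# result_df = spark.sql(build_clone_sql(azure_storage_account_name))
```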


Checking the CLONE result

%sql
DESC iceberg_clone_test.default.test_table_01;
%sql
DESC HISTORY iceberg_clone_test.default.test_table_01;
%sql
SELECT * FROM iceberg_clone_test.default.test_table_01 ORDER BY id;


CLONE after INSERT

Inserting data in Google Colab

spark.sql("""
INSERT INTO
    default.first_table (id, name, status)
VALUES
    (1, 'Alice', 'active'),
    (2, 'Bob', 'active'),
    (3, 'Charlie', 'active'),
    (4, 'Daisy', 'active'),
    (5, 'Ethan', 'active');
""").show()
spark.sql("""
SELECT * FROM default.first_table ORDER BY id;
""").show()

Running CLONE in Databricks

result_df = spark.sql(f"""
CREATE OR REPLACE TABLE iceberg_clone_test.default.test_table_01
  CLONE iceberg.`wasbs://external@{azure_storage_account_name}.blob.core.windows.net/iceberg_clone/default/first_table`;
""")
result_df.display()


Checking the CLONE result

%sql
DESC HISTORY iceberg_clone_test.default.test_table_01;
%sql
SELECT * FROM iceberg_clone_test.default.test_table_01 ORDER BY id;


CLONE after UPDATE

Running UPDATE in Google Colab

spark.sql("""
UPDATE default.first_table
    SET status = 'inactive' WHERE id IN (2, 3);
""").show()
spark.sql("""
SELECT * FROM default.first_table ORDER BY id;
""").show()


Running CLONE in Databricks

result_df = spark.sql(f"""
CREATE OR REPLACE TABLE iceberg_clone_test.default.test_table_01
  CLONE iceberg.`wasbs://external@{azure_storage_account_name}.blob.core.windows.net/iceberg_clone/default/first_table`;
""")
result_df.display()


Checking the CLONE result

%sql
DESC HISTORY iceberg_clone_test.default.test_table_01;
%sql
SELECT * FROM iceberg_clone_test.default.test_table_01 ORDER BY id;


CLONE after DELETE

Running DELETE in Google Colab

spark.sql("""
DELETE FROM default.first_table
    WHERE id = 5;
""").show()
spark.sql("""
SELECT * FROM default.first_table ORDER BY id;
""").show()


Running CLONE in Databricks

result_df = spark.sql(f"""
CREATE OR REPLACE TABLE iceberg_clone_test.default.test_table_01
  CLONE iceberg.`wasbs://external@{azure_storage_account_name}.blob.core.windows.net/iceberg_clone/default/first_table`;
""")
result_df.display()


Checking the CLONE result

%sql
DESC HISTORY iceberg_clone_test.default.test_table_01;
%sql
SELECT * FROM iceberg_clone_test.default.test_table_01 ORDER BY id;


CLONE after MERGE

Running MERGE in Google Colab

spark.sql("""
MERGE INTO default.first_table AS tgt
USING (
    SELECT 1 AS id, 'Alice_updated' AS name, 'inactive' AS status
    UNION ALL
    SELECT 6 AS id, 'Frank' AS name, 'active' AS status
) AS src
ON tgt.id = src.id
WHEN MATCHED THEN
    UPDATE SET tgt.name = src.name, tgt.status = src.status
WHEN NOT MATCHED THEN
    INSERT (id, name, status)
    VALUES (src.id, src.name, src.status);
""").show()
spark.sql("""
SELECT * FROM default.first_table ORDER BY id;
""").show()
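As a sanity check on what the clone should contain, the MERGE's matched/not-matched logic can be replayed on plain Python dicts (illustrative only; it starts from the table state after the INSERT, UPDATE, and DELETE steps above):

```python
# Table state after INSERT (ids 1-5), UPDATE (2,3 -> inactive), DELETE (id 5).
table = {
    1: ("Alice", "active"),
    2: ("Bob", "inactive"),
    3: ("Charlie", "inactive"),
    4: ("Daisy", "active"),
}
source = {1: ("Alice_updated", "inactive"), 6: ("Frank", "active")}

# WHEN MATCHED THEN UPDATE / WHEN NOT MATCHED THEN INSERT
for row_id, row in source.items():
    table[row_id] = row

print(sorted(table.items()))
# id 1 is updated, id 6 is inserted, ids 2-4 are untouched
```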


Running CLONE in Databricks

result_df = spark.sql(f"""
CREATE OR REPLACE TABLE iceberg_clone_test.default.test_table_01
  CLONE iceberg.`wasbs://external@{azure_storage_account_name}.blob.core.windows.net/iceberg_clone/default/first_table`;
""")
result_df.display()


Checking the CLONE result

%sql
DESC iceberg_clone_test.default.test_table_01;
%sql
DESC HISTORY iceberg_clone_test.default.test_table_01;
%sql
SELECT * FROM iceberg_clone_test.default.test_table_01 ORDER BY id;


CLONE after TRUNCATE TABLE

Running TRUNCATE TABLE in Google Colab

spark.sql("""
TRUNCATE TABLE default.first_table;
""").show()
spark.sql("""
SELECT * FROM default.first_table ORDER BY id;
""").show()


Running CLONE in Databricks

result_df = spark.sql(f"""
CREATE OR REPLACE TABLE iceberg_clone_test.default.test_table_01
  CLONE iceberg.`wasbs://external@{azure_storage_account_name}.blob.core.windows.net/iceberg_clone/default/first_table`;
""")
result_df.display()


Checking the CLONE result

%sql
DESC HISTORY iceberg_clone_test.default.test_table_01;
%sql
SELECT * FROM iceberg_clone_test.default.test_table_01 ORDER BY id;

