はじめに
Snowflake には Delta Direct と呼ばれる、Delta Lake テーブルを Iceberg テーブルとして読み取る機能が存在します。
当記事では相互運用性の調査で検証した内容を記録します。
基本的な方式
を参考に検証していきます。
ポイントは以下です。
- 各クラウドのデータレイクを外部ボリュームとして構成
- Delta Lake 用のオブジェクトストレージ内のファイルのカタログ統合を作成
- Delta Lake を基にした Iceberg テーブルを作成
OneLake 上の Delta Lake を Snowflake on AWS から読み取る
セットアップ
-
Fabric リージョンを確認
-
AWS Snowflake であること確認
sqlSELECT CURRENT_REGION();
-
OneLake を対象にした外部ボリュームを作成
sqlCREATE EXTERNAL VOLUME ext_vol_onelake_sand STORAGE_LOCATIONS = ( ( NAME = 'ext_vol_onelake_sand_lh_opentableformat_for_volume' STORAGE_PROVIDER = 'AZURE' STORAGE_BASE_URL = 'azure://onelake.dfs.fabric.microsoft.com/<ワークスペース名>/<レイクハウス名>.Lakehouse/Files/<パス>' AZURE_TENANT_ID = '<テナントID>' ) );
-
権限付与と疎通確認
https://qiita.com/ryoma-nagata/items/44c165c8f44f73e0b3b4 などを参考に MS Entra ID サービスプリンシパルを対象の Fabic テナントに追加し、権限を付与します。viewer を利用する場合は、ファイルへのアクセス権をもたないので、対象のレイクハウスの ReadAll などの追加のファイルアクセス権限を付与します
疎通を確認
sqlSELECT SYSTEM$VERIFY_EXTERNAL_VOLUME('ext_vol_onelake_sand');
-
Delta Lake 用のオブジェクトストレージ内のファイルのカタログ統合を作成
sqlCREATE OR REPLACE CATALOG INTEGRATION delta_catalog_integration CATALOG_SOURCE = OBJECT_STORE TABLE_FORMAT = DELTA ENABLED = TRUE;
-
Iceberg テーブル作成と定義確認
sqlCREATE OR REPLACE ICEBERG TABLE public.iceberg_onelake_fact_sale CATALOG = delta_catalog_integration EXTERNAL_VOLUME = ext_vol_onelake_sand AUTO_REFRESH = TRUE BASE_LOCATION = 'fact_sale/';
変更反映の確認
-
変更前の件数を確認
-
Fabric 側で 1000 件 append
pysparkdf = spark.sql("SELECT * FROM LH_opentableformat.fact_sale LIMIT 1000") df.write.mode("append").saveAsTable("fact_sale") result = spark.sql("SELECT count(1) FROM LH_opentableformat.fact_sale ") display(result)
S3 上の Delta Lake を Snowflake on Azure から読み取る
セットアップ
-
Fabric notebook で書き込みします。元のデータは前述のサンプルを使いまわします。
pysparkdf = spark.sql("SELECT * FROM LH_opentableformat.fact_sale LIMIT 1000") s3_backet_name ="localdevdemo" access_key_id=KEYID accecc_key_secret=SECRET sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key_id) sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", accecc_key_secret) s3_route_path=f's3a://{s3_backet_name}' output_path = f"{s3_route_path}/for_volume/fact_sale" df.write.mode("overwrite").format("delta").save(output_path)
-
IAM ロールの準備
https://docs.snowflake.com/ja/user-guide/tables-iceberg-configure-external-volume-s3 を参考にして IAM などを設定します.Putなどは今回は不要ではあると思います。
ポリシー例:
json{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:GetObjectVersion", "s3:DeleteObject", "s3:DeleteObjectVersion", "s3:ListBucket", "s3:GetBucketLocation" ], "Resource": "*" } ]
-
Azure Snowflake であることを確認
sqlSELECT CURRENT_REGION();
-
外部ボリュームを作成し、Snowflake の ID を取得します。
sqlCREATE OR REPLACE EXTERNAL VOLUME ext_vol_aws_localdevdemo STORAGE_LOCATIONS = ( ( NAME = 'localdevdemo_for_volume' STORAGE_PROVIDER = 'S3' STORAGE_BASE_URL = 's3://<bucket名>/<パス>/' STORAGE_AWS_ROLE_ARN = '<ロールのARN>' STORAGE_AWS_EXTERNAL_ID = '<ロール作成時に指定した ID 文字列>' ) );
-
疎通確認
sqlSELECT SYSTEM$VERIFY_EXTERNAL_VOLUME('ext_vol_aws_localdevdemo');
-
Delta Lake 用のオブジェクトストレージ内のファイルのカタログ統合を作成
sqlCREATE OR REPLACE CATALOG INTEGRATION delta_catalog_integration CATALOG_SOURCE = OBJECT_STORE TABLE_FORMAT = DELTA ENABLED = TRUE;
-
Iceberg テーブル作成と定義確認
sqlCREATE OR REPLACE ICEBERG TABLE public.iceberg_s3_fact_sale CATALOG = delta_catalog_integration EXTERNAL_VOLUME = ext_vol_aws_localdevdemo AUTO_REFRESH = TRUE BASE_LOCATION = 'fact_sale/';