はじめに
Databricks にて Hadoop Catalog の Apache Iceberg をさわってみた を参考に Fabric の Spark ノートブックで Iceberg テーブルを生成してみます。
環境セットアップ
レイクハウスの作成と abfss パスの特定
レイクハウスと iceberg 用のディレクトリを作成し、abfss パスを取得します。
このディレクトリ内に iceberg テーブルを生成することになります。
Spark環境の構成
spark ノートブック用の環境を作成。ランタイムバージョンは 1.3( spark 3.5) です。
Iceberg-spark-runtimeのjar ファイルをダウンロードして、環境にアップします。
次に、 https://iceberg.apache.org/docs/latest/spark-configuration/#catalog-configuration にしたがって
Iceberg also supports a directory-based catalog in HDFS that can be configured using type=hadoop:
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://nn:8020/warehouse/path
と、必要に応じて SQL Extensions
spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
も設定します。
spark.sql.catalog.hadoop_prod.warehouse には最初に取得したabfss パスを設定してください。
spark 上で iceberg を操作する分には Files/・・・などの相対パスでも可能ですが、Iceberg ショートカット では絶対パスでのファイル参照でメタデータが構成される必要があるため、abfss パスを使用します。
spark 環境を発行します。
テーブル操作
テーブル作成とデータ登録
対象のレイクハウスにアタッチされたノートブックを作成し、
環境を先ほど作成したものに設定します。
pyspark での既定のカタログへの書き込み
spark でテーブルを作成してみます。
from pyspark.sql.types import *
table_name = "people"
records = [
(1, 'John', 25, 'NYC', '2023-09-28 00:00:00'),
(2, 'Emily', 30, 'SFO', '2023-09-28 00:00:00'),
(3, 'Michael', 35, 'ORD', '2023-09-28 00:00:00'),
(4, 'Andrew', 40, 'NYC', '2023-10-28 00:00:00'),
(5, 'Bob', 28, 'SEA', '2023-09-23 00:00:00'),
(6, 'Charlie', 31, 'DFW', '2023-08-29 00:00:00')
]
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("city", StringType(), True),
StructField("create_ts", StringType(), True)
])
df = spark.createDataFrame(records, schema)
(
df.write
.format("iceberg")
.saveAsTable(table_name)
)
iceberg フォーマットのディレクトリが生成されました。
テーブルは既定では、<レイクハウスの名称>のNamespaceに作成されることがわかります。
SQL での書き込み
Namespace の生成をします。
-- データベース(Namespace)を作成する
CREATE NAMESPACE IF NOT EXISTS iceberg_demo;
Namespace を作成すると、ディレクトリが生成されていることがわかります。このあたりはカタログをどのように構成したかで変わるようです。
テーブルも作成します。
CREATE TABLE iceberg_demo.sample (
id bigint NOT NULL COMMENT 'unique id',
data string)
USING iceberg;
INSERT INTO iceberg_demo.sample (id, data) VALUES
(1, 'first entry'),
(2, 'second entry'),
(3, 'third entry');
指定したNamespace のディレクトリにデータが生成されました。
Spark SQL での確認
SQL アクセスを確認します。
pyspark によるフォーマット直接アクセスによる確認
相対パスを使用して、カタログを使用せず、ファイルメタデータだけでアクセスします。
df = spark.read.format("iceberg").load("Files/hadoop/iceberg_demo/sample")
display(df)
過去バージョンへのクエリ
データの追加を行い、バージョンの履歴を確認します。
SELECT snapshot_id, committed_at FROM iceberg_demo.sample.snapshots
SELECT * FROM iceberg_demo.sample FOR SYSTEM_VERSION AS OF 1621798013750238808;
過去バージョンも取得可能できています。
Delta Lake 相互運用
OneLake 上に Iceberg がある場合、Iceberg ショートカットを使用することで Delta Lake として読み取ることができます。
Iceberg ショートカットの生成
Tables にてショートカットを作成
対象のテーブルディレクトリを選択します。
iceberg データを含むディレクトリをショートカットするとTables内で_delta_log が生成されます。
Power BI での確認
Power BI でデータの確認をしてみます。
セマンティックモデルを作成します。
レポートで表示を確認します。
Databricks でのアクセス
Tables フォルダでは、delta_log が生成されているので、Databricksから参照することができます。
パススルークラスターでアクセスするのが楽です。
このクラスター構成は現在非推奨です。詳しくは Azure Databricks から OneLake 上のデータにアクセスする方法 2024/12 版 で
Tablesのabfss パスを取得します。
Databricks 上で以下実行していきます。
table_path = "abfss://<取得したパス>"
# SQL APIでアクセス
sql_df = spark.sql(f"select * from delta.`{table_path}`")
display(sql_df)
# pyspark APIでアクセス
pyspark_df = spark.read.format("delta").load(table_path)
display(pyspark_df)
また、タイムトラベルも機能することがわかります。(対象テーブルは sample に変えています)
sql_df = spark.sql(f"select * from delta.`{table_path}` version as of 0 ")
display(sql_df)
sql_df = spark.sql(f"select * from delta.`{table_path}` version as of 1 ")
display(sql_df)