1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【実験】Microsoft Fabric Spark で Iceberg テーブルを生成し、Power BI や Databricks でクエリしてみる

Last updated at Posted at 2025-02-26

はじめに

Databricks にて Hadoop Catalog の Apache Iceberg をさわってみた を参考に Fabric の Spark ノートブックで Iceberg テーブルを生成してみます。

環境セットアップ

レイクハウスの作成と abfss パスの特定

レイクハウスと iceberg 用のディレクトリを作成し、abfss パスを取得します。
このディレクトリ内に iceberg テーブルを生成することになります。

image.png

Spark環境の構成

spark ノートブック用の環境を作成。ランタイムバージョンは 1.3( spark 3.5) です。

image.png

Iceberg-spark-runtimeのjar ファイルをダウンロードして、環境にアップします。

image.png

次に、 https://iceberg.apache.org/docs/latest/spark-configuration/#catalog-configuration にしたがって

Iceberg also supports a directory-based catalog in HDFS that can be configured using type=hadoop:
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://nn:8020/warehouse/path

と、必要に応じて SQL Extensions

spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

も設定します。

spark.sql.catalog.hadoop_prod.warehouse には最初に取得したabfss パスを設定してください。

image.png

spark 上で iceberg を操作する分には Files/・・・などの相対パスでも可能ですが、Iceberg ショートカット では絶対パスでのファイル参照でメタデータが構成される必要があるため、abfss パスを使用します。

spark 環境を発行します。

image.png

テーブル操作

テーブル作成とデータ登録

対象のレイクハウスにアタッチされたノートブックを作成し、

image.png

環境を先ほど作成したものに設定します。

image.png

pyspark での既定のカタログへの書き込み

spark でテーブルを作成してみます。

pyspark

from pyspark.sql.types import *

table_name = "people"

records = [
   (1, 'John', 25, 'NYC', '2023-09-28 00:00:00'),
   (2, 'Emily', 30, 'SFO', '2023-09-28 00:00:00'),
   (3, 'Michael', 35, 'ORD', '2023-09-28 00:00:00'),
   (4, 'Andrew', 40, 'NYC', '2023-10-28 00:00:00'),
   (5, 'Bob', 28, 'SEA', '2023-09-23 00:00:00'),
   (6, 'Charlie', 31, 'DFW', '2023-08-29 00:00:00')
]

schema = StructType([
   StructField("id", IntegerType(), True),
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), True),
   StructField("city", StringType(), True),
   StructField("create_ts", StringType(), True)
])

df = spark.createDataFrame(records, schema)

(
   df.write
   .format("iceberg")
   .saveAsTable(table_name)
)

iceberg フォーマットのディレクトリが生成されました。

image.png

テーブルは既定では、<レイクハウスの名称>のNamespaceに作成されることがわかります。

image.png

SQL での書き込み

Namespace の生成をします。

sql

-- データベース(Namespace)を作成する
CREATE NAMESPACE IF NOT EXISTS iceberg_demo;

Namespace を作成すると、ディレクトリが生成されていることがわかります。このあたりはカタログをどのように構成したかで変わるようです。

image.png

テーブルも作成します。

sql

CREATE TABLE iceberg_demo.sample (
    id bigint NOT NULL COMMENT 'unique id',
    data string)
USING iceberg;

sql

INSERT INTO iceberg_demo.sample (id, data) VALUES
    (1, 'first entry'),
    (2, 'second entry'),
    (3, 'third entry');
    

image.png

指定したNamespace のディレクトリにデータが生成されました。

Spark SQL での確認

SQL アクセスを確認します。

image.png

pyspark によるフォーマット直接アクセスによる確認

相対パスを使用して、カタログを使用せず、ファイルメタデータだけでアクセスします。

image.png

pyspark

df = spark.read.format("iceberg").load("Files/hadoop/iceberg_demo/sample")
display(df)

image.png

過去バージョンへのクエリ

データの追加を行い、バージョンの履歴を確認します。

image.png

sparksql

SELECT snapshot_id, committed_at FROM iceberg_demo.sample.snapshots

image.png

sparksql


SELECT * FROM iceberg_demo.sample FOR SYSTEM_VERSION AS OF 1621798013750238808;

image.png

過去バージョンも取得可能できています。

Delta Lake 相互運用

OneLake 上に Iceberg がある場合、Iceberg ショートカットを使用することで Delta Lake として読み取ることができます。

Iceberg ショートカットの生成

Tables にてショートカットを作成

image.png

対象のテーブルディレクトリを選択します。

image.png

iceberg データを含むディレクトリをショートカットするとTables内で_delta_log が生成されます。

image.png

Power BI での確認

Power BI でデータの確認をしてみます。

セマンティックモデルを作成します。

image.png

image.png

レポートで表示を確認します。

image.png

Databricks でのアクセス

Tables フォルダでは、delta_log が生成されているので、Databricksから参照することができます。

パススルークラスターでアクセスするのが楽です。

image.png

このクラスター構成は現在非推奨です。詳しくは Azure Databricks から OneLake 上のデータにアクセスする方法 2024/12 版 で

Tablesのabfss パスを取得します。

image.png

Databricks 上で以下実行していきます。

pyspark

table_path = "abfss://<取得したパス>"

pyspark

# SQL APIでアクセス

sql_df = spark.sql(f"select * from delta.`{table_path}`")
display(sql_df)

image.png

pyspark

# pyspark APIでアクセス

pyspark_df = spark.read.format("delta").load(table_path)
display(pyspark_df)

image.png

また、タイムトラベルも機能することがわかります。(対象テーブルは sample に変えています)

pyspark

sql_df = spark.sql(f"select * from delta.`{table_path}` version as of 0 ")
display(sql_df)

image.png

pyspark

sql_df = spark.sql(f"select * from delta.`{table_path}` version as of 1 ")
display(sql_df)

image.png

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?