はじめに
Apache Hudi とか Apache Iceberg とか、新しい Data Lake フォーマットが浸透してきた感じがしますが、私が OCI Data Flow で Delta Lake テーブルを操作してみた記事を書いたのが 2022 年なんですね、時間が経つのは早い...
OCI Data Flow は Iceberg テーブルを扱えます。下記の GitHub のサイトにサンプルが載っていますので、まずはこちらを見ていただくのが良いと思います(記事投稿時から時間が経っているので記載されているバージョンがちょっと古いかも...)。
ということで、今回は Iceberg テーブルの特徴的な操作を試します。
まず、素の Spark 環境を OCI Compute 上に作って、テーブルの作成・更新・検索をやった後に、OCI Data Flow 上で全く同じソースコードを実行させてみたいと思います。
Spark で実行する言語は Python で。
OCI Object Storage に読み書きできる Spark 環境を作る
OCI Compute インスタンス上に Spark の実行環境を構築します。
Java 17 or 21 環境は事前にセットアップしておいて下さい。
Iceberg テーブルは Object Storage 上に作成し操作しますので、"OCI HDFS Connector" の導入&設定も同時に必要となります。
ドキュメンテーションはこちら
GitHub からバイナリを入手できます。
一から Spark と HDFS Connector をダウンロード&設定するのは面倒なので、一発で Spark 環境を構築できる bash スクリプトを作りました。
Spark 3.5.6 / Scala 2.12 / Hadoop 3.3 + OCI HDFS Connector 3.3.4.1.5.1
#!/bin/bash
set -ex
SPARK_BASE=spark-3.5.6-bin-hadoop3
SPARK_HOME=$(pwd)/$SPARK_BASE
spark_dl_url=https://dlcdn.apache.org/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz
hdfs_connector_dl_url=https://github.com/oracle/oci-hdfs-connector/releases/download/v3.3.4.1.5.1/oci-hdfs.zip
# download spark
wget -q --no-check-certificate ${spark_dl_url}
mkdir $SPARK_HOME
tar xzvf *.tgz -C $SPARK_HOME --strip-components 1
rm $SPARK_BASE.tgz
# download oci hdfs connector
wget -q --no-check-certificate ${hdfs_connector_dl_url}
unzip *.zip -d oci-hdfs
cp oci-hdfs/lib/*.jar $SPARK_HOME/jars
cp oci-hdfs/third-party/lib/*.jar $SPARK_HOME/jars
rm $SPARK_HOME/jars/jsr305-3.0.0.jar # HDFS connector includes 3.0.2
# clean-up hdfs connector
rm oci-hdfs.zip
rm -rf oci-hdfs
# generate log4j2.properties
cat $SPARK_HOME/conf/log4j2.properties.template \
| sed -e 's/rootLogger.level = info/rootLogger.level = warn/' \
| sed -e '$a\\nlogger.oci.name = com.oracle.bmc' \
| sed -e '$a\logger.oci.level = error' \
| sed -e '$a\\nlogger.nativecodeloader.name = org.apache.hadoop.util.NativeCodeLoader' \
| sed -e '$a\logger.nativecodeloader.level = error' \
| sed -e '$a\\nlogger.metrics.name = org.apache.spark.metrics' \
| sed -e '$a\logger.metrics.level = error' \
> $SPARK_HOME/conf/log4j2.properties
# generate spark-defaults.conf - change object storage endpoint as needed
cat $SPARK_HOME/conf/spark-defaults.conf.template \
| sed -e '$a\\nspark.sql.hive.metastore.sharedPrefixes=shaded.oracle,com.oracle.bmc' \
| sed -e '$a\spark.hadoop.fs.oci.client.hostname=https://objectstorage.us-ashburn-1.oraclecloud.com' \
| sed -e '$a\#spark.hadoop.fs.oci.client.hostname.BUCKET.NAMESPACE=https://objectstorage.REGION.oraclecloud.com' \
| sed -e '$a\spark.hadoop.fs.oci.client.custom.authenticator=com.oracle.bmc.hdfs.auth.InstancePrincipalsCustomAuthenticator' \
> $SPARK_HOME/conf/spark-defaults.conf
このスクリプトでは、必要なバイナリをダウンロード&展開・配置して、
- $SPARK_HOME/conf/log4j2.properties
- $SPARK_HOME/conf/spark-defaults.conf
の二つのファイルを作成します。spark-defaults.conf には HDFS Connector の設定が入っています。Object Storage にアクセスするための IAM 関連の設定方法は複数種類あるのですが、このスクリプトでは Instance Principal を使っています。動的グループを使ってザクっとポリシーを設定するのであれば、これ位の権限付与で動作可能です。
Allow dynamic-group <dynamic-group-name> to read objectstorage-namespaces in compartment <compartment-name>
Allow dynamic-group <dynamic-group-name> to manage objects in compartment <compartment-name>
Allow dynamic-group <dynamic-group-name> to manage buckets in compartment <compartment-name>
このスクリプトでは、Ashburn の Object Storage にアクセスする設定になっていますが、適宜修正して下さい。
HDFS Connector の設定まわりの話題は、こちらを参照していただけるといいと思います。
環境変数 SPARK_HOME を作成したディレクトリのパスに設定して下さい。
Iceberg テーブルを操作する
では、PySpark のスクリプトを作って、Iceberg テーブルを操作してみましょう。
コメントに解説を入れています。
# iceberg-test.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when
# Iceberg カタログの設定
# spark.sql.catalog.<catalog-name> - ここでは "dev" という名前のカタログを設定する
# カタログタイプ "hadoop" はファイルシステム ベースでメタデータを管理する
# "warehouse" で Object Storage の場所を指定 oci://spark@NAMESPACE/iceberg/
# -> NAMESPACE ネームスペース、spark バケットの iceberg フォルダ配下にデータが格納される
spark = SparkSession.builder \
.appName("MyIcebergApp") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.dev", "org.apache.iceberg.spark.SparkCatalog")\
.config("spark.sql.catalog.dev.type", "hadoop") \
.config("spark.sql.catalog.dev.warehouse", "oci://spark@NAMESPACE/iceberg/") \
.getOrCreate()
# テーブルは dev ネームスペースに属するので、Iceberg 形式と認識できる
table_name = "dev.db.test"
# テーブルが既に存在する場合は削除
if spark.catalog.tableExists(table_name):
spark.sql(f"DROP TABLE {table_name}") # ここだけは SQL ...
# 2種類の DataFrame を作成
df1 = spark.range(5).withColumn("data", lit("df1"))
df2 = spark.range(5,10).withColumn("data", lit("df2"))
# テーブルを新規作成して df1 を書き込み
df1.writeTo(f"{table_name}").create()
# テーブルにデータ df2 を追加
df2.writeTo(f"{table_name}").append()
# id が 3 の倍数の行のデータを更新
updated_df = spark.read.table(f"{table_name}").withColumn(
"data",
when(col("id") % 3 == 0, "mod3") # data を更新
.otherwise(col("data")) # その他は元の値を保持
)
updated_df.writeTo(f"{table_name}").overwritePartitions()
# 現在のテーブルを表示
print(f"{table_name}")
spark.read.table(f"{table_name}").show()
# 履歴テーブル(Icebergのメタデータ)を読み込む
history = spark.read.table(f"{table_name}.history")
# 履歴テーブルを表示
print(f"{table_name}.history")
history.show()
# 一番最初のスナップショットのIDを取得して表示 (parent_id が NULL のレコード)
snapshot_id = history.where(col("parent_id").isNull()).head()["snapshot_id"]
print(f"snapshot_id: {snapshot_id}")
# 一番最初のスナップショットの内容を表示
spark.read.option("snapshot-id", snapshot_id).table(table_name).show()
# 二番目に古いスナップショット (parent_id = 一番最初のsnapshot_id)を timestamp 指定で取得
ts = history.select(col("made_current_at").alias("ts")).where(col("parent_id") == snapshot_id).head()["ts"]
print(f"timestamp: {ts}")
spark.read.option("as-of-timestamp", int(ts.timestamp() * 1000)).table(table_name).show()
spark.stop()
スクリプトの冒頭、Spark セッションの作成のところで Iceberg 拡張機能の設定を行なっています。
Iceberg のカタログのタイプにはオプションがあるのですが、OCI Data Flow で扱う場合は、hadoop タイプを使います。メタデータ自体も Object Storage で管理されます。
その後、
- テーブルの作成(最初のデータを書き込み)
- データの追加(2回目のデータ書き込み)
- データの更新(id が 3 の倍数の行のデータを更新)
を行なっています。
Iceberg テーブルには、以下のようなメタデータを管理するテーブルがあります。
historymetadata_log_entriessnapshotsfilespartitions
Iceberg テーブルは更新履歴を持っていますので、任意の時点のスナップショットを取り出すことができます。PySparkのスクリプトの後半では、スナップショットID やタイムスタンプを使って過去のスナップショットを取得しています。
- spark.read.option("snapshot-id", snapshot_id).table(table_name)
- spark.read.option("as-of-timestamp", timestamp).table(table_name)
Spark Submit コマンドで PySpark 実行
それでは spark-submit でスクリプトを実行してみましょう。
引数に、
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2
を付加するのがミソです。Spark & Scala のバージョンが一致したパッケージを指定して下さい。このパッケージを追加することによって、Iceberg 拡張機能が使えるようになります。
# 標準エラー出力は表示しない
$ $SPARK_HOME/bin/spark-submit \
--master local[*] \
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2 \
iceberg-test.py 2>/dev/null
:: loading settings :: url = jar:file:/home/opc/opt/spark-3.5.6-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
dev.db.test
+---+----+
| id|data|
+---+----+
| 0|mod3|
| 5| df2|
| 1| df1|
| 2| df1|
| 6|mod3|
| 3|mod3|
| 4| df1|
| 7| df2|
| 8| df2|
| 9|mod3|
+---+----+
dev.db.test.history
+--------------------+-------------------+-------------------+-------------------+
| made_current_at| snapshot_id| parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2025-09-23 10:04:...| 58715541349848653| NULL| true|
|2025-09-23 10:04:...|5048860556409992145| 58715541349848653| true|
|2025-09-23 10:04:...| 618497776373035265|5048860556409992145| true|
+--------------------+-------------------+-------------------+-------------------+
snapshot_id: 58715541349848653
+---+----+
| id|data|
+---+----+
| 0| df1|
| 1| df1|
| 2| df1|
| 3| df1|
| 4| df1|
+---+----+
timestamp: 2025-09-23 10:04:34.993000
+---+----+
| id|data|
+---+----+
| 5| df2|
| 6| df2|
| 7| df2|
| 8| df2|
| 9| df2|
| 0| df1|
| 1| df1|
| 2| df1|
| 3| df1|
| 4| df1|
+---+----+
dev.db.test.history テーブルに3つのエントリー(= スナップショット)があり、このメタデータを使って、任意の更新時点のデータにアクセスしました。
一般的に「タイムトラベル機能」と呼ばれていますが、これは監査用途や、(任意の時点への)データのロールバック、(任意の時点から)実験やレポートの再現、などの目的で利用することができます。
この状態で Object Storage を確認してみると、こうなっています。
dev.db.test という名前のテーブルを作成しましたが、dev というネームスペースのカタログの場所を oci://spark@NAMESPACE/iceberg/にしたので、spark バケットの /iceberg フォルダ配下に、/db/test というサブフォルダが作られ、そこに data と metadata というフォルダが作成されています。実際のデータやメタデータはこれらのフォルダの中で管理されています。
デーブル名は、冒頭のカタログのネームスペース部分以外は任意に指定できます( . で区切らなくても、複数の . で区切っても動作する)が、"(namespace名).(database名).(table名)" で階層化するのが良いようです。
SQL文で同じ操作を行う
もう一つ、SQLベースのスクリプトも掲載しておきましょう。
APIベースと同じテーブル操作を行なっています。
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MyIcebergApp") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.dev", "org.apache.iceberg.spark.SparkCatalog")\
.config("spark.sql.catalog.dev.type", "hadoop") \
.config("spark.sql.catalog.dev.warehouse", "oci://spark@NAMESPACE/iceberg/") \
.getOrCreate()
# テーブルは dev ネームスペースに属するので、Iceberg 形式と認識できる
table_name = "dev.db.test"
# テーブルが既に存在する場合は削除
spark.sql(f"DROP TABLE IF EXISTS {table_name};")
# 2種類の Data を作成
data1 = []
for n in range(5):
data1.append(f"({n}, 'df1')")
values1 = ",".join(data1)
print(f"values1: {values1}")
data2 = []
for n in range(5, 10):
data2.append(f"({n}, 'df2')")
values2 = ",".join(data2)
print(f"values2: {values2}")
# テーブルを新規作成
spark.sql(f"CREATE TABLE {table_name} (id int, data string) USING iceberg")
# valaues1 を追加
spark.sql(f"INSERT INTO {table_name} VALUES {values1}")
# valaues2 を追加
spark.sql(f"INSERT INTO {table_name} VALUES {values2}")
# id が 3 の倍数の行のデータを更新
spark.sql(f"UPDATE {table_name} SET data = 'mod3' WHERE id % 3 = 0")
# 現在のテーブルの内容を表示
print(f"{table_name}")
spark.sql(f"SELECT * FROM {table_name}").show()
# 履歴テーブル(Icebergのメタデータ)を読み込む
history = spark.sql(f"SELECT * FROM {table_name}.history")
history.printSchema()
# 一番最初のスナップショットのIDを取得して表示 (parent_id が NULL のレコード)
snapshot_id = spark.sql(f"SELECT * FROM {table_name}.history WHERE parent_id IS NULL;").head()["snapshot_id"]
print(f"snapshot_id: {snapshot_id}")
# 一番最初のスナップショットの内容を表示
spark.sql(f"SELECT * FROM {table_name} VERSION AS OF {snapshot_id}").show()
# 二番目に古いスナップショット (parent_id = 一番最初のsnapshot_id)を timestamp 指定で取得
ts = spark.sql(f"SELECT made_current_at as ts FROM {table_name}.history WHERE parent_id = {snapshot_id}").head()["ts"]
print(f"timestamp: {ts}")
spark.sql(f"SELECT * FROM {table_name} TIMESTAMP AS OF '{ts}'").show()
spark.stop()
SQL の場合は、テーブルの SELECT 文で VERSION AS OF や TIMESTAMP AS OF を使ってスナップショットを取得することができます。
OCI Data Flow で実行する
では、作成したスクリプトをOCI Data Flow で実行してみます。
前提条件として、Data Flow が Object Storage にアクセスするためには、Resource Principal を使ったポリシーの設定が必要になります。詳しくはこちらを参照して下さい。
それでは、新しく作成するアプリケーションの設定で、まずバージョンを確認

Spark-Submit のオプションを使用するチェックボックスを有効にして、Compute上で実行した時と同じように Iceberg のパッケージと .py ファイルの場所を指定します。ここでもパッケージのバージョンが Sparkの実行環境のバージョンと合っていることを確認。
作成した Python スクリプトは Object Storage にアップロードして Data Flow が実行時にアクセスできるようにして下さい。
アプリケーションの作成が完了したら、実行します(しばらく待ちます)...
成功しました!
実行結果は、アプリケーションの標準出力ログ・ファイル(先頭行のもの)で確認できます。

まとめ
Iceberg テーブルを扱う場合、Catalog の概念や扱い方が最初に引っかかるところかもしれません。
今回は、Hadoop(ファイルシステム型)を使いましたが、Hive Metastore にも対応していますので、既に Hive Metastore を利用している場合は、そちらにメタデータ管理を寄せることもできます。
Object Storage 上でメタデータを管理する前提であれば(OCI Data Flow はそれを想定しています) Hadoop 型を使うことになります。
この書籍が非常に参考になりました。
次は、今回 Object Storage に作った Iceberg テーブルを Autonomous Database から外部表としてクエリをかけてみようと思いますので、乞うご期待!


