7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

はじめに

この記事では、Apache Iceberg の代表的な特徴である以下の4つを、OCI上でできるだけシンプルに体験します。

1. 安全な schema evolution
2. hidden partitioning
3. partition evolution
4. snapshot ベースの time travel / rollback

Iceberg は、Object Storage 上の Parquet ファイル群を、単なるファイルの集まりではなく、テーブルとして管理するための table format です。

Parquet だけでもデータを保存することはできます。
しかし、データが増えたり、運用が長くなったりすると、次のような問題が出てきます。

  • 列を追加したら、古いファイルと新しいファイルで schema がずれる
  • partition 用の列を、利用者が意識しないといけない
  • 途中で partition 設計を変えると、フォルダ構成が混ざって管理しづらい
  • 誤ったデータを入れたときに、テーブルとして過去状態に戻しづらい

Iceberg は、こうした問題を metadata によって管理します。

この記事では、OCI Object Storage と OCI Data Flow / Spark を使って、Iceberg がどのようにこれらの問題を解決するのかを確認します。

この記事でやること

今回の検証では、次の構成を使います。

OCI Object Storage
  ├─ Iceberg table のデータとメタデータ
  └─ plain Parquet の比較用データ

OCI Data Flow / Spark
  ├─ Iceberg table を作成する
  ├─ Iceberg table にデータを書き込む
  ├─ schema evolution を確認する
  ├─ hidden partitioning を確認する
  ├─ partition evolution を確認する
  └─ snapshot time travel / rollback を確認する

今回は、Iceberg と plain Parquet を同じようなデータで比較してみます。

Iceberg だけを見ると、「便利そうだけど、何がそんなに違うのか」が分かりにくいです。
そこでこの記事では、同じような操作を Parquetファイル でも行い、Iceberg では自然にできることが、Parquet だけだとどこで困るのかを見ていきます。

進め方

この記事では、次の流れで進めます。

  1. OCI Object Storage に検証用バケットを作成する
  2. OCI Data Flow が Object Storage を読み書きできるように IAM を設定する
  3. PySpark スクリプトを Object Storage にアップロードする
  4. OCI Data Flow で PySpark スクリプトを実行する
  5. Data Flow のログを見ながら、Iceberg の4つの特徴を確認する

Iceberg の各機能を Data Flow のログで確認します。

事前準備編

Step 1: Object StorageとIAMの準備

1-1. Object Storage バケットを作る

OCI コンソールで Object Storage バケットを作成します。

バケット名:iceberg-lab

image.png

以下の用途で使用します。

iceberg-lab/
  apps/
    → Data Flow で実行する PySpark スクリプト置き場

 iceberg_warehouse/
   demo/
     adb_sales/
        → schema evolution と rollback を試す Iceberg table

      web_events/
        → hidden partitioning / partition evolution を試す Iceberg table

  plain_demo/
    → plain Parquet の比較データ置き場

1-2. IAMを設定する

Data Flow から Object Storage にアクセスできるように、動的グループとポリシーを作成します。

まず動的グループを作成します。dg-dataflow-iceberg-lab

一致ルールの例です。

ALL {resource.type='dataflowrun', resource.compartment.id='<your_compartment_ocid>'}

image.png

ポリシーを作成します。policy-dataflow-iceberg-lab

Allow dynamic-group dg-dataflow-iceberg-lab to read buckets in compartment <compartment-name>
Allow dynamic-group dg-dataflow-iceberg-lab to manage objects in compartment <compartment-name>

image.png

Step 2: PySpark スクリプトを Object Storage に置く

実行するPySpark のサンプルスクリプトを以下から保存して、Obect Storageのバケットにアップロードします。

iceberg_feature_demo.py をダウンロード

分かりやすく、接頭辞をapps/を付けて保存します。

image.png

Step 3: Data Flowを実行する

3-1. アプリケーションの作成

ナビゲーションメニュー > アナリティクスと AI > **Data Flow **を開きます。

アプリケーションの作成をクリックします。

名前:iceberg-feature-demo
他の設定は、デフォルトのままにします。

image.png

Sparkバージョンは、後で指定する Iceberg Runtime JAR と合わせる必要があります。この記事の例では Spark 3.5 系を前提にしているため、Iceberg の runtime package も Spark 3.5 用の iceberg-spark-runtime-3.5_2.12 を使います。

アプリケーション構成で Spark-Submit オプションの仕様を有効にします。

テキストボックスに以下を入力します。<namespace>はご自身の環境の値に入れ替えます。

--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.1
--conf dataflow.auth=resource_principal
--conf spark.executorEnv.OCI_JAVASDK_JERSEY_CLIENT_DEFAULT_CONNECTOR_ENABLED=true
--conf spark.driverEnv.OCI_JAVASDK_JERSEY_CLIENT_DEFAULT_CONNECTOR_ENABLED=true
oci://iceberg-lab@<namespace>/apps/iceberg_feature_demo.py iceberg-lab <namespace> ap-tokyo-1

image.png

アプリケーション・ログの場所を指定します。iceberg-lab バケットをログ出力先として使います。

image.png

設定出来たら、作成をクリックします。

3-2. アプリケーションの実行

作成した Application の詳細画面で、実行を押します。

image.png

「アプリケーションの実行」パネルが表示されます。
基本的には、作成時に設定した内容が入っているため、そのままで構いません。
確認する項目は次です。

  • Spark-submit オプション: 先ほど入力した内容になっていること
  • ログ出力先: oci://iceberg-lab@ になっていること
  • リソース構成: ドライバ、エグゼキュータ、エグゼキュータ数が想定通りであること

image.png

確認の上、実行をクリックします。
ステータスが、「受け入れ済み」から「成功」になるまで待ちます。

失敗」する場合は、実行の詳細画面からログを確認します。まず探すべきエラーは次のあたりです。

AccessDenied / NotAuthorized
→ IAM Policy または Resource Principal 設定が不足
Cannot download package / Maven
→ --packages が依存ライブラリを取得できていない
No FileSystem for scheme oci
→ OCI HDFS connector まわり、または Data Flow runtime 設定の問題
AnalysisException
→ SQL構文、Iceberg拡張、Spark/Iceberg version の不整合

成功」しました。今回は4分弱くらいの実行時間で完了しました。
image.png

モニタリングからログを確認します。

image.png

spark_application_stdout.log.gzを確認すると以下のような結果になりました。

image.png

ログの中に、次のような区切りが出ていれば、スクリプトは想定通り動いています。

================================================================================
schema evolution 前の Iceberg table
================================================================================

================================================================================
schema evolution 後。古い行の coupon_code は NULL になる
================================================================================

================================================================================
hidden partitioning: 利用者は event_day ではなく event_ts で検索する
================================================================================
・
・
・

検証編

ここからは、Data Flow のログを見ながら Iceberg の4つの特徴を確認します。

今回の検証で大事なのは、Iceberg の結果だけを見ることではありません。
同じようなことを plain Parquet で行った場合に、どこで困るのかもあわせて確認します。

デモ1: 安全な schema evolution を確認する

1-1. このデモで見たいこと

ここでは、すでにデータが入っている Iceberg テーブル に、あとから列を追加しても安全に読めることを確認します。

たとえば、最初は次の4つの列が売上テーブルにあるIcebergテーブルがあるとします。

order_id
customer_id
amount
order_ts

運用しているうちに、あとから次の列を追加したくなりました。

coupon_code

よくある業務変更です。
例えば

今まではクーポンを記録していなかった
でも今日から、購入時に使われたクーポンコードも保存したい

というような事情です。
このときに問題になるのは、古いデータには coupon_code が存在しないという点です。

Iceberg では、列追加などの schema 更新は metadata の変更として扱われ、既存のデータファイルをすぐに書き換える必要はありません。
また、Iceberg は列を一意の ID で追跡するため、追加・削除・並び替えなどによって別の列の値を誤って読む問題を避けやすい設計になっています。

1-2. スクリプトがやっていること

スクリプトは、まず adb_sales という Iceberg table を作ります。

CREATE TABLE oci.demo.adb_sales (
  order_id BIGINT,
  customer_id BIGINT,
  amount DOUBLE,
  order_ts TIMESTAMP
)
USING iceberg

この時点では、coupon_code はありません。

次に、3件の売上データを入れます。

order_id = 1
order_id = 2
order_id = 3

ここまでが、古い schema の状態です。

そのあとで、列を追加します。

ALTER TABLE oci.demo.adb_sales
ADD COLUMNS (
  coupon_code STRING COMMENT 'coupon used at purchase time'
)

そして、追加された列を使って新しいデータを1件入れます。

order_id    = 4
amount      = 150.0
coupon_code = WELCOME

つまり、テーブルの中には次の2種類のデータが混ざります。

古いデータ:
  coupon_code 列がなかった時代のデータ

新しいデータ:
  coupon_code 列がある時代のデータ

この状態のデータがどうなっているか確認してみましょう。

1-3. ログで見る場所

Data Flow のログから、次の見出しを探します。

================================================================================
schema evolution 後。古い行の coupon_code は NULL になる
================================================================================

ここで期待する結果は、次のような形です。

order_id | amount | coupon_code
---------+--------+------------
1        | 120.0  | NULL
2        | 80.0   | NULL
3        | 200.0  | NULL
4        | 150.0  | WELCOME

image.png

見るポイントはここです。

order_id = 1, 2, 3
  → coupon_code は NULL

order_id = 4
  → coupon_code は WELCOME

古い行には、もともと coupon_code が存在しません。
そのため Iceberg は、古い行の coupon_codeNULL として自然に見せます。

1-4. Iceberg だと何が嬉しいのか

嬉しい点は、テーブルとして schema の変化を管理できることです。

単なる Parquet ファイルの集まりではなく、Iceberg table は metadata を持っています。 その metadata の中で、今回の例でいくと次のような情報を管理します。

  • 最初の schema はこれ
  • 途中で coupon_code が追加された
  • この snapshot から新しい schema になった
  • 各列には一意の ID がある

そのため、読み手は途中でスキーマ変更があったような場合にも、「古いファイルと新しいファイルで列が違うかもしれない」と意識しなくても、Iceberg table として自然に扱えます。

1-5. Iceberg ではない場合に何が困るのか

比較用に、スクリプトでは Parquetファイル でも似たことをしています。

古い Parquet ファイル:
  order_id, amount だけ

新しい Parquet ファイル:
  order_id, amount, coupon_code がある

このように schema が違う Parquet ファイルが同じ場所に混ざると、読み取り側が schema merge を意識する必要があります。

Data Flow のログには、次の2つの比較が出ます。

plain Parquet: mergeSchema なしで読む
plain Parquet: mergeSchema ありで読む

mergeSchema は、Spark が Parquet ファイルを読むときのオプションです。
複数の Parquet ファイルに異なる schema がある場合、それらを見比べて1つの schema にまとめる機能です。

image.png

mergeSchema を使わない場合、Spark は代表的な schema だけを見て読みます。
その結果、今回はcoupon_code が出てきていません。

一方、mergeSchema を有効にすると、Spark は複数ファイルの schema を確認し、列を統合して表示をしています。

小さな検証や単純な列追加なら、mergeSchemaで対応できていますが、実運用では困る点が出てきます。
読み手が mergeSchema をしないと、新しい列が見えない、または期待と違う schema で読まれる可能性があったり、mergeSchema する場合もファイル数が多いデータレイクの環境では処理が重くなります。


ここでParquetとの違いを整理します。

  • Parquet:
    ファイルごとに schema を持っている
    新しい列を持つ Parquet ファイルを追加で置くことはできる
    ただし「テーブル全体の正式な schema 履歴」を管理してくれるわけではない

  • Iceberg:
    テーブル全体の schema を metadata として管理する
    いつ、どの列が追加されたかを table format が管理する
    読み手は Iceberg table として一貫した schema で読める

新しい Parquet ファイルを、次の schema で追加することはできます。
同じフォルダ配下に次のようなファイルが混ざる状態になります。

plain_demo/schema_evolution_parquet/
  part-00001.parquet   ← order_id, amount だけ
  part-00002.parquet   ← order_id, amount, coupon_code

ただし、この時点でそのフォルダは同じデータセットのように見えるけど、ファイルごとに schema が違う状態になります。

ポイントは、Parquet はファイルフォーマットであるということです。
それぞれの Parquet ファイルの中に schema 情報はあります。
しかし、Parquet ファイル群だけでは、次のような「テーブル全体のルール」は持ちません。
そのため、「このデータセットの現在の正式な schema は何か」、「coupon_code はいつ追加されたのか」、「古いファイルに coupon_code がない場合どう扱うのか」、「列名変更と列削除をどう区別するのか」、「列の順番が変わった場合どう安全に読むのか」という問題が発生します。その際に、読み取る側のエンジンが「複数ファイルの schema をどう解釈するか」を決める必要があります。

デモ2: hidden partitioning を確認する

2-1. このデモで見たいこと

このデモでは、利用者が partition のためだけに作った列を意識しなくても、Iceberg が裏側で partition を使ってくれることを確認します。

たとえば、Webイベントログには次のような時刻列があるとします。

event_ts

実際の分析では、よく次のような検索をします。

2026年4月1日のイベントだけ見たい

普通に考えると、日付単位でデータを分けておくと便利です。

2026-04-01 のデータ
2026-04-02 のデータ
2026-04-03 のデータ

この「日付ごとに分けて保存する」ための値が、partition です。

2-2. 普通の Parquet partition で起きやすいこと

Parquet では、日付ごとに partition したい場合、よく event_day という列を作ります。

event_ts              event_day
-------------------   ----------
2026-04-01 10:00:00   2026-04-01
2026-04-01 10:05:00   2026-04-01
2026-04-02 09:00:00   2026-04-02

この event_day は、業務データというより 保存場所を分けるための列です。

フォルダ構成で見ると、次のようになります。

events/
  event_day=2026-04-01/
    part-0001.parquet
  event_day=2026-04-02/
    part-0002.parquet

この場合、効率よく読むには、利用者が event_day を使って検索する必要があります。

SELECT *
FROM events
WHERE event_day = DATE '2026-04-01';

2-3. event_day が見えると何が困るのか

event_day が見えること自体が絶対に悪いわけではありませんが、初心者や分析利用者にとっては、次のような迷いやミスが起きやすくなります。

  • 困りごと1: 似た列が2つあって迷う

利用者から見ると、似た意味の列が2つあります。

event_ts   = 実際のイベント発生日時
event_day  = partition 用の日付

すると、次のような迷いが生じます。

  • 日付で検索するときは event_ts を使うの?
  • それとも event_day を使うの?
  • 両方指定したほうがいいの?

本当は利用者は「2026年4月1日のイベントが見たい」だけです。
でも、物理設計の都合で event_day という別の列を意識する必要が出てきます。

  • 困りごと2: 条件を書き忘れると無駄に読むことがある

たとえば利用者が自然にこう書いたとします。

SELECT *
FROM events
WHERE event_ts >= TIMESTAMP '2026-04-01 00:00:00'
  AND event_ts <  TIMESTAMP '2026-04-02 00:00:00';

業務的には正しい SQL です。
でも、plain Parquet の partition が event_day で作られている場合、環境によっては event_day の条件を書かないと partition pruning が効きにくいことがあります。

効率よく読むために、次のように書く必要が出ます。

SELECT *
FROM events
WHERE event_day = DATE '2026-04-01'
  AND event_ts >= TIMESTAMP '2026-04-01 00:00:00'
  AND event_ts <  TIMESTAMP '2026-04-02 00:00:00';

これは、なぜ同じような日付条件を2回書くのか、event_day と event_ts がずれたらどちらを信じるのか、など分かりにくい構造になります。

  • 困りごと3: partition の作り方を変えにくくなる

最初は event_day で partition していたが、あとから月単位event_monthに変える変更が出てくるかもしれません。このとき、利用者の SQL に event_day がたくさん書かれていると、partition 設計を変えるのが大変になります。

2-4. スクリプトがやっていること

Iceberg の hidden partitioning ではどうなるかをログの結果を見て確認してみましょう。

スクリプトは、web_events という Iceberg table を作ります。

CREATE TABLE oci.demo.web_events (
  event_id BIGINT,
  user_id BIGINT,
  event_ts TIMESTAMP,
  event_type STRING
)
USING iceberg
PARTITIONED BY (days(event_ts))

重要なのは、この部分です。

PARTITIONED BY (days(event_ts))

これは、event_ts を日単位に変換して partition する、という意味です。

ただし、テーブルの列として event_day を作っているわけではありません。

テーブルの見た目は、あくまで次の4列です。

event_id
user_id
event_ts
event_type

Iceberg の hidden partitioning では、利用者は event_ts に対して自然に条件を書けばよく、Iceberg が裏側で partition 情報を使って不要なファイルを除外します。

2-5. ログで見る場所

ログから、次の見出しの部分を確認します。

================================================================================
hidden partitioning: 利用者は event_day ではなく event_ts で検索する
================================================================================

スクリプトは、次のような SQL を実行しています。

SELECT event_id, user_id, event_ts, event_type
FROM oci.demo.web_events
WHERE event_ts >= TIMESTAMP '2026-04-01 00:00:00'
  AND event_ts <  TIMESTAMP '2026-04-02 00:00:00'
ORDER BY event_id

見るポイントは、SQL に event_day が出てこないことです。

利用者が書く条件:

  event_ts >= ...
  event_ts < ...

利用者が書かない条件:

  event_day = ...

つまり、利用者は業務的に自然な列である event_ts だけを使っています。

Iceberg は裏側で、event_ts の条件から「2026-04-01 の partition を読めばよい」と判断できます。

2-6. 裏側の partition 情報を見る

次に、ログからこの見出しを探します。

================================================================================
Iceberg の files metadata table。partition 情報は裏側で管理されている
================================================================================

スクリプトは次の SQL を実行しています。

SELECT spec_id, partition, record_count, file_path
FROM oci.demo.web_events.files
ORDER BY spec_id, file_path

ここで出てくる .files は、Iceberg の メタデータテーブル です。
Iceberg では、通常のデータだけでなく、filessnapshots などの メタデータテーブル を Spark SQL から参照できます。files metadata table では、各データファイルの partition 情報やファイルパスなどを確認できます。

以下のような結果が出力されます。

spec_id | partition                  | record_count | file_path
--------+----------------------------+--------------+----------
0       | event_ts_day=2026-04-01    | 2            | ...
0       | event_ts_day=2026-04-02    | 2            | ...

image.png

見るポイントはここです。

  • 通常の SELECT では event_day は見えない
  • でも メタデータテーブル を見ると、裏側では日単位 partition が管理されている

2-7. Iceberg だと何が嬉しいのか

嬉しい点は、物理的なデータ配置を利用者に意識させなくてよいことです。

利用者はこう書けます。

WHERE event_ts >= TIMESTAMP '2026-04-01 00:00:00'
  AND event_ts <  TIMESTAMP '2026-04-02 00:00:00'

Iceberg は裏側で、event_ts から日単位 partition に変換して、読むべきファイルを判断できます。

2-8. Iceberg ではない場合に何が困るのか

比較用に、スクリプトでは plain Parquet でも日付 partition を作っています。

ログの次の見出しの部分を確認します。

================================================================================
plain Parquet: 物理 partition column の event_day が見えてしまう
================================================================================

この場合、データを読むと event_day が見えます。

event_id | user_id | event_ts            | event_day
---------+---------+---------------------+------------
1        | 201     | 2026-04-01 10:00:00 | 2026-04-01
2        | 202     | 2026-04-01 10:05:00 | 2026-04-01

つまり、物理的なデータ配置のために作った列が、利用者にも見えてしまいます。

ポイントは以下です。

  • Iceberg では、partition は裏側の設計として持てる。
  • 利用者の SQL に partition 用の列を意識させる必要がない。

デモ3: partition evolution を確認する

3-1. このデモで見たいこと

このデモでは、一度作った Iceberg table の partition 設計を、あとから変えられることを確認します。

最初は、Webイベントログを日単位「days(event_ts)」で partition していました。

これは、日付で絞る分析には便利です。

しかし、データが増えてくると、日単位だけでは1日分のデータが大きくなりすぎるかもしれません。

そこで、あとから次のような partition を追加したくなったとします。

  • user_id を bucket 分割する

つまり、以下のような partition設計に変更したくなったとします。

日付で分ける +
さらに user_id の bucket でも分ける

Iceberg では、partition spec をあとから変更できます。古いデータは古い partition layout のまま、新しいデータは新しい partition layout で書かれます。複数の partition layout は metadata で別々に管理されるため、既存データを即座に書き換える必要はありません。

3-2. スクリプトがやっていること

最初の web_events table は、日単位 partition だけです。

PARTITIONED BY (days(event_ts))

そのあと、次の SQL で partition field を追加します。

ALTER TABLE oci.demo.web_events
ADD PARTITION FIELD bucket(4, user_id) AS user_bucket

ここで、「これから新しく書き込むデータは、日付 partition に加えてuser_id を4つのbucketに分けて保存する」ことを指定しています。

その後、新しいデータを追加します。

event_id = 5
event_id = 6

つまり、テーブルの中には次の2種類のファイルが混ざります。

  • 古いファイル
    days(event_ts) だけで partition されたファイル

  • 新しいファイル
    days(event_ts) + bucket(4, user_id) で partition されたファイル

3-3. ログで見る場所

Data Flow のログから、次の見出しを確認します。

================================================================================
partition evolution 後も、利用者は event_ts / user_id で普通に検索する
================================================================================

スクリプトは次のような SQL を実行しています。

SELECT event_id, user_id, event_ts, event_type
FROM oci.demo.web_events
WHERE event_ts >= TIMESTAMP '2026-04-03 00:00:00'
  AND event_ts <  TIMESTAMP '2026-04-04 00:00:00'
  AND user_id = 201
ORDER BY event_id

ここで見るポイントは、partition 設計が変わっても、利用者の SQL は普通の条件のままだということです。

  • 利用者は user_bucket を書かない
  • 利用者は spec_id も書かない
  • event_ts と user_id だけで検索している

3-4. spec_id を見る

次に、ログからこの見出しを探します。

================================================================================
partition evolution: 古い file は spec_id 0、新しい file は spec_id 1 になる
================================================================================

スクリプトは次の SQL を実行しています。

SELECT spec_id, partition, record_count, file_path
FROM oci.demo.web_events.files
ORDER BY spec_id, file_path

期待するイメージは次のようなものです。

spec_id | partition
--------+--------------------------------------------
0       | event_ts_day=2026-04-01
0       | event_ts_day=2026-04-02
1       | event_ts_day=2026-04-03, user_bucket=...

image.png

ここでの重要ポイントは spec_id です。

  • spec_id = 0
    → 古い partition 設計で書かれたファイル

  • spec_id = 1
    → 新しい partition 設計で書かれたファイル

つまり、同じ Iceberg table の中に、古い partition 設計のファイルと新しい partition 設計のファイルが共存しています。

3-5. Iceberg だと何が嬉しいのか

嬉しい点は、partition 設計を変えるために、すべての古いデータをすぐに書き換えなくてよいことです。

たとえば、大量の過去データがある場合、partition 設計を変えるたびに全データを再配置するのは大変です。
以下のような作業が必要になります。

  • 全ファイルを読み直す
  • 新しい partition 構成で書き直す
  • 古いファイルを消す
  • メタストアや参照先を調整する

Iceberg では、partition 設計の変更を metadata として管理できます。

そのため、次のような運用ができます。

  • 古いデータ:
    古い layout のまま残す

  • 新しいデータ:
    新しい layout で書く

  • 読み取り:
    Iceberg が各ファイルの spec_id を見て判断する

3-6. Iceberg ではない場合に何が困るのか

比較用に、スクリプトでは plain Parquet でも途中から partition 構成を変える例を作っています。

最初はこうです。

event_day=2026-04-01/
event_day=2026-04-02/

途中からこうなります。

event_day=2026-04-03/user_bucket=1/
event_day=2026-04-03/user_bucket=2/

つまり、同じデータ置き場に次の2種類のフォルダ構造が混ざります。

  • 古い構造:
    event_day だけ

  • 新しい構造:
    event_day + user_bucket

出力結果は次のようになります。
image.png

Parquet では、同じディレクトリ配下の partition 構成が一貫していることが期待されます。
今回は、古いデータを event_day だけで partition し、新しいデータを event_day + user_bucket で partition したため、Spark は同じ Parquet データセットとして解釈できずエラーになりました。

これは、Parquet だけで partition 設計を途中変更すると、読み取り側や運用側が困る例です。

一方 Iceberg では、古い partition 設計と新しい partition 設計をそれぞれ partition spec として metadata に記録します。そのため、同じ table の中に複数の partition layout があっても、Iceberg が spec_id を見て正しく扱えます。

デモ4: snapshot time travel / rollback を確認する

4-1. このデモで見たいこと

このデモでは、Iceberg table が snapshot を持っているため、過去の状態を読んだり、過去の状態に戻したりできることを確認します。

分かりやすく言うと、Iceberg table には「履歴」があります。
例えば以下のような履歴を残しておくことができます。

  • 最初にデータを入れた状態
  • 列を追加した状態
  • さらにデータを入れた状態
  • 誤データを入れてしまった状態
  • rollback した状態

Iceberg では、各 write によって table の新しい snapshot、つまり バージョンが作られます。snapshot は time travel query に使うことができ、有効な snapshot に rollback することもできます。

4-2. スクリプトがやっていること

まず、正常な売上データがあります。

order_id | amount
---------+-------
1        | 120.0
2        | 80.0
3        | 200.0
4        | 150.0

合計は次のようになります。

120 + 80 + 200 + 150 = 550

この正常な状態の snapshot ID を、スクリプトが控えます。

ログには次のように出ます。

[SNAPSHOT_BEFORE_BAD] 1234567890123456789

この snapshot ID は、あとで「誤投入前に戻る」ための目印です。

そのあと、わざと悪いデータを入れます。

order_id    = 999
customer_id = 999
amount      = -9999.0
coupon_code = BAD

これにより、売上合計が壊れます。

正常な合計:
  550

誤投入後の合計:
  550 + (-9999) = -9449

4-3. ログで見る場所: snapshots

Data Flow のログから、次の見出しを探します。

================================================================================
現在の snapshots
================================================================================

スクリプトは次の SQL を実行しています。

SELECT committed_at, snapshot_id, operation
FROM oci.demo.adb_sales.snapshots
ORDER BY committed_at

誤データを入れる前の時点で snapshot 一覧を見ている結果です。
image.png

Iceberg では、データを書き込む操作によって snapshot が作られます。

  • INSERT した
  • ALTER TABLE で列を追加した
  • また INSERT した
  • 誤データを INSERT した
  • rollback した

今回の例では、最初の INSERT と、coupon_code 列追加後の INSERT により、append の snapshot が2つ表示されています。

schema 変更も Iceberg metadata の変更として管理されますが、
この snapshots 一覧では、主にデータ追加などの snapshot を確認しています。

Iceberg の snapshots は metadata table として参照でき、snapshot ID や operation を確認できます。Spark では Iceberg の metadata table を通常の SQL のように参照できます。

4-4. ログで見る場所: 誤投入後の現在状態

次に、ログからこの見出しを探します。

================================================================================
誤投入後の現在状態
================================================================================

期待する結果は次のような形です。

row_count | total_amount
----------+-------------
5         | -9449.0
  • row_count が正常な4件 + 誤データ1件 = 5件 になっている
  • total_amount が 間違った値 -9449.0 になっている

4-5. ログで見る場所: time travel

次に、ログからこの見出しを探します。

================================================================================
time travel: 誤投入前の snapshot を VERSION AS OF で読む
================================================================================

スクリプトは、先ほど控えた snapshot ID を使って、過去の状態を読みます。

SELECT COUNT(*) AS row_count, SUM(amount) AS total_amount
FROM oci.demo.adb_sales VERSION AS OF 1234567890123456789

期待する結果は次です。

row_count | total_amount
----------+-------------
4         | 550.0

ここで重要なのは、現在の table は壊れているのに、過去 snapshot を指定すると正常な状態を読めることです。

Spark では、Iceberg table に対して VERSION AS OFTIMESTAMP AS OF を使った time travel query ができます。

4-6. ログで見る場所: rollback

最後に、スクリプトは table 自体を誤投入前の snapshot に戻します。

CALL oci.system.rollback_to_snapshot(
  'demo.adb_sales',
  1234567890123456789
)

rollback 後、もう一度現在状態を確認します。

ログから次の見出しを探します。

================================================================================
rollback 後の現在状態
================================================================================

期待する結果は次です。

row_count | total_amount
----------+-------------
4         | 550.0

つまり、現在の table そのものが、誤投入前の状態に戻りました。

Iceberg の Spark procedure には rollback_to_snapshot があり、指定した snapshot ID に table を rollback できます。

4-7. Iceberg だと何が嬉しいのか

嬉しい点は、データレイク上の table に、履歴管理と復旧の考え方を持ち込めることです。

たとえば、誤データを入れてしまったときに、次のような確認ができます。

  • 今の状態は壊れているか
  • 壊れる前の snapshot はどれか
  • 壊れる前の状態では集計値はいくつか
  • 問題なければ、その snapshot に戻す

単なる Parquet ファイルの集まりだけで運用している場合、こうした「table としての正しい過去状態」を自動で扱うのは簡単ではありません。

もちろん、Object Storage のバージョニングやバックアップ運用で戻す方法はあります。
ただしそれは、Iceberg table format の snapshot 機能とは別の、ストレージや運用で補う方法です。

ポイントは次のとおりです。

  • Iceberg では、table の状態を snapshot として管理できる。
  • だから、過去を読めるし、必要なら戻せる。

おわりに

今回の検証では、OCI Object Storage 上に Iceberg table と plain Parquet データを作成し、OCI Data Flow / Spark からそれらを操作しました。

Iceberg を使うことで、Object Storage 上の Parquet ファイル群を、単なるファイルの集まりではなく、schema、partition、snapshot を持つテーブルとして扱えることを確認しました。

特に、plain Parquet との比較により、次の違いが分かります。

  • schema evolution:
    Iceberg は schema 変更を table metadata として管理できる

  • hidden partitioning:
    Iceberg は partition 用の列を利用者に意識させずに済む

  • partition evolution:
    Iceberg は古い partition layout と新しい partition layout を共存させられる

  • time travel / rollback:
    Iceberg は snapshot により、過去状態の参照や rollback ができる

このように Iceberg は、データレイク上のデータを安全に運用するための
テーブル管理レイヤーとして機能します。

7
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?