0. 概要
本記事はIceberg Advent Calendar 2025の9記事目です。
【この記事を読むと分かること】
- Apache Iceberg x pysparkのスキーマ進化&タイムトラベルの実装法
- SynapseMLを使用したLighGBM構築法
- 上記をGoogle Colabで実行する方法
1. はじめに:特徴量ちゃんと管理できてますか?
機械学習をしていると遭遇する問題の一つとして、特徴量エンジニアリングまわりのお悩みがあるかと思います。例えば…
- 精度向上に効くと思って作った特徴量を入れたらなぜか精度が下がってしまい、泣く泣く削除…と思ったら、後からやっぱり必要になって作り直し
- 特徴量を付けたり消したりして実験しているうちに、中身不明のバージョン違いデータが大量に発生
- 最高精度を出したモデルがどのバージョンの特徴量を使っていたか見失ってしまい、バージョン違いデータを次々入れ替えて精度確認するという無駄な工程の発生
2. そこで、Apache Icebergの出番です
Apache Icebergの主要な特徴である
- タイムトラベル:スナップショットID指定することで過去バージョンのデータにアクセス可能
- スキーマ進化:後方互換性を保ちながら特徴量追加
- 差分更新:データを完全に作り変えるのではなく、差分のみ更新するという効率性
を活用することで
- 特徴量の段階的な追加:差分更新をかけながら効率的に特徴量を追加
- 実験の再現性の保証:同一のスナップショットIDを指定したデータを使用することで、モデルの実行結果を完全に再現
が可能になります。
※Apache Icebergの基本概念については『Apache Iceberg活用入門 オープンテーブルフォーマットによるデータレイク&データレイクハウス』や『実践Apache Iceberg —— 高効率・高生産性を実現するオープンなデータ基盤の構築と運用』等にてご確認ください。
3. それでは、実際に実験してみましょう
今回は、NYC Taxi Trip Data(2023年1月)データを用い、各種乗車情報からタクシー料金を予測するLighGBMモデルを、Google Colabでさくっと実装してみます。
3.1 環境構築
Google Colabでnotebookを新規作成し、notebook冒頭で必要なライブラリやデータを読み込みます。
# Javaのインストール
!apt-get update -qq
!apt-get install -y openjdk-11-jdk-headless -qq
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64'
# SynapseMLのインストール
!pip install -q synapseml
# Pysparkのインポート
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
# 機械学習用ライブラリのインポート
import pandas as pd
from synapse.ml.lightgbm import LightGBMRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
# SparkSessionの作成
spark = SparkSession.builder \
.appName('Iceberg-ML-PoC') \
.config('spark.jars.packages','com.microsoft.azure:synapseml_2.12:1.0.4,' 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.3') \
.config('spark.sql.extensions','org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.catalog.local','org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.local.type','hadoop') \
.config('spark.sql.catalog.local.warehouse','/content/iceberg-warehouse') \
.config('spark.sql.defaultCatalog','local') \
.getOrCreate()
# データ読み込み
url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet'
pdf = pd.read_parquet(url)
print(f'✅ Downloaded {len(pdf):,} records') # ✅ Downloaded 3,066,766 records
print(f'📜 Columns: {pdf.columns.tolist()}') # 📜 Columns: ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee']
# サンプルでいいので適当に軽量化
sampled_pdf = pdf.sample(frac=0.1, random_state=8)
print(len(sampled_pdf)) # 306677
# Sparkのdfに変換
sdf = spark.createDataFrame(sampled_pdf)
3.2 特徴量エンジニアリング
今回は下記の実装方針でデータの作成、及びモデルの構築・評価をしていきたいと思います。
【実装方針】
- オリジナルのデータに、まず基本の特徴量を足す(V1データ)
- 特徴量エンジニアリングしたデータを追加してスキーマ進化(V2データ)
- スナップショットIDからV1データとV2データをそれぞれ呼び出して、LighGBMモデルで学習・精度比較
3.2.1 V1データの作成
# とりあえずごく基本的な特徴量を足しつつ異常値を除去しておく
sdf_features_v1 = sdf.select(
# 元のdfの特徴量
F.col('tpep_pickup_datetime').alias('pickup_datetime'),
F.col('tpep_dropoff_datetime').alias('dropoff_datetime'),
F.col('PULocationID').alias('pickup_zone'),
F.col('DOLocationID').alias('dropoff_zone'),
F.col('passenger_count'),
F.col('trip_distance'),
F.col('fare_amount'),
# 時間帯別特徴量の追加
F.hour('tpep_pickup_datetime').alias('hour'),
F.dayofweek('tpep_pickup_datetime').alias('day_of_week'),
F.month('tpep_pickup_datetime').alias('month'),
# ラッシュアワーフラグ
F.when(
(F.hour('tpep_pickup_datetime').between(7,9)) |
(F.hour('tpep_pickup_datetime').between(17,19)),
True
).otherwise(False).alias('is_rush_hour'),
# 週末フラグ
F.when(
F.dayofweek('tpep_pickup_datetime').isin([1,7]),
True
).otherwise(False).alias('is_weekend')
).filter(
# ざっと異常値除去しておく
(F.col('fare_amount') > 0) &
(F.col('fare_amount') < 500) &
(F.col('trip_distance') > 0) &
(F.col('fare_amount') < 100) &
(F.col('passenger_count') > 0) &
(F.col('passenger_count') <= 6)
)
そしてここで、Icebergテーブルを作ります。
# データベースとテーブル作成
spark.sql('CREATE DATABASE IF NOT EXISTS taxi_features')
# Icebergテーブル作成
sdf_features_v1.writeTo('taxi_features.trip_features') \
.using('iceberg') \
.partitionedBy(F.days('pickup_datetime')) \ # 隠しパーティション(これもApache Icebergの便利機能!)
.create()
これで、データをIcebergテーブルとして保存することができました!
では続いて、データを追加してV1テーブルをV2テーブルにアップデートしていきす。
3.2.2 V2データを作成
Apache Icebergのスキーマ進化の機能を使いましょう。まず、追加するカラムの情報を渡してスキーマ進化させます。
今回はzone別の集約特徴量を作って追加します。
# スキーマ進化
## スキーマにカラムを追加
spark.sql(
"""
ALTER TABLE taxi_features.trip_features
ADD COLUMNS(
zone_trip_count BIGINT COMMENT 'Total trips from this zone',
zone_avg_fare DOUBLE COMMENT 'Average fare from this zone',
zone_avg_distance DOUBLE COMMENT 'Average trip distance from this zone',
zone_std_fare DOUBLE COMMENT 'Std dev of fare from this zone'
)
"""
)
特徴量を作成後、MERGEで既存データを更新することで、データ全体を書き直すのではなく、差分のみを更新することができます。
# zone別の集約特徴量の追加
sdf_existing = spark.table('taxi_features.trip_features')
zone_stats = sdf_existing.groupBy('pickup_zone').agg(
F.count('*').alias('zone_trip_count'),
F.avg('fare_amount').alias('zone_avg_fare'),
F.avg('trip_distance').alias('zone_avg_distance'),
F.stddev('fare_amount').alias('zone_std_fare')
)
zone_stats.createOrReplaceTempView('zone_stats_temp')
# データ更新
spark.sql("""
MERGE INTO taxi_features.trip_features AS target
USING zone_stats_temp AS source
ON target.pickup_zone = source.pickup_zone
WHEN MATCHED THEN UPDATE SET
target.zone_trip_count = source.zone_trip_count,
target.zone_avg_fare = source.zone_avg_fare,
target.zone_avg_distance = source.zone_avg_distance,
target.zone_std_fare = source.zone_std_fare
""")
3.2.3 V1とV2データの比較
ここで、作成したV1とV2データを比較してみましょう。
比較を可能にするために、taxi_features.trip_featuresのスナップショットIDを確認します。
# スナップショットの履歴を見る
spark.sql("""
SELECT
snapshot_id,
committed_at,
operation,
summary
FROM taxi_features.trip_features.snapshots
ORDER BY committed_at
""").show(truncate=False)
下記の画像のような出力が得られました。
最初にV1データを作成したときのスナップショットIDが4478211136779111607、次にV2データを作成したときのスナップショットIDが1034756331854009825となっています。
このスナップショットIDを指定することで、データのバージョンを指定して読み込むことができます。
# V1データ
df_v1 = spark.read \
.option('snapshot-id', 4478211136779111607) \ # V1のスナップショットIDを指定
.format('iceberg') \
.load('taxi_features.trip_features')
print(f'Columns in v1: {len(df_v1.columns)}') # Columns in v1: 12
print(f'v1 columns: {df_v1.columns}') # v1 columns: ['pickup_datetime', 'dropoff_datetime', 'pickup_zone', 'dropoff_zone', 'passenger_count', 'trip_distance', 'fare_amount', 'hour', 'day_of_week', 'month', 'is_rush_hour', 'is_weekend']
# V2データ
df_v2 = spark.read \
.option('snapshot-id',1034756331854009825) \ # V2のスナップショットIDを指定
.format('iceberg') \
.load('taxi_features.trip_features')
print(f'Columns in v2: {len(df_v2.columns)}') # Columns in v2: 16
print(f'v2 columns: {df_v2.columns}') # v2 columns: ['pickup_datetime', 'dropoff_datetime', 'pickup_zone', 'dropoff_zone', 'passenger_count', 'trip_distance', 'fare_amount', 'hour', 'day_of_week', 'month', 'is_rush_hour', 'is_weekend', 'zone_trip_count', 'zone_avg_fare', 'zone_avg_distance', 'zone_std_fare']
# カラムの違い
new_cols = set(df_v2.columns) - set(df_v1.columns)
print(new_cols) # {'zone_avg_distance', 'zone_trip_count', 'zone_avg_fare', 'zone_std_fare'}
V1データは12カラム、V2データは16カラムで、V2データではzone統計特徴量が増えていることを確認できます。
3.3 モデルの学習・評価
それではV1・V2データをそれぞれ用いて、fare_amountを予測するモデルを作ります。
# V1のデータを使ってモデル学習
## データの前処理
df_cleansed, feature_cols = preprocessor(df=df_v1, target='fare_amount')
## 学習・テストデータの分割
train_df, test_df = train_test_split(df=df_cleansed) # train: 230288, test: 57463
## モデル学習
model = train_model(train_df=train_df, feature_cols=feature_cols, target='fare_amount')
## 予測
train_pred, test_pred = predicter(model=model, train_df=train_df, test_df=test_df)
## 精度評価
evaluator(model=model, feature_cols=feature_cols, train_pred=train_pred, test_pred=test_pred)
※データの前処理・学習・評価関数の詳細
# データの前処理関数
def preprocessor(df, target):
df_cleansed = df.select(
'*'
).withColumnRenamed(
target, 'label' # 目的変数定義
).drop(
'pickup_datetime', 'dropoff_datetime' # timestamp型カラム除外
).na.drop() # null除外
# 特徴量一覧の取得
feature_cols = [col for col in df_cleansed.columns if col != 'label']
return df_cleansed, feature_cols
# 学習データとテストデータの分割関数
def train_test_split(df):
train_df, test_df = df.randomSplit([0.8,0.2], seed=8)
print(f'train: {train_df.count()}, test: {test_df.count()}')
return train_df, test_df
# モデル学習用関数
def train_model(train_df, feature_cols, target):
# 特徴量をベクトルに変換(Spark MLの仕様)
assembler = VectorAssembler(
inputCols = [col for col in feature_cols if col != target],
outputCol = 'features'
)
# モデル作成
lgbm = LightGBMRegressor(
featuresCol = 'features',
labelCol = 'label',
numLeaves = 31,
numIterations = 10,
learningRate = 0.1,
objective='regression'
)
# パイプラン作成
pipeline = Pipeline(stages=[assembler, lgbm])
# 学習
model = pipeline.fit(train_df)
return model
def predicter(model, train_df, test_df):
train_pred = model.transform(train_df)
test_pred = model.transform(test_df)
return train_pred, test_pred
def evaluator(model, feature_cols, train_pred, test_pred):
# RMSEとR^2の確認
evaluator_rmse = RegressionEvaluator(
labelCol = 'label',
predictionCol = 'prediction',
metricName = 'rmse'
)
evaluator_r2 = RegressionEvaluator(
labelCol = 'label',
predictionCol = 'prediction',
metricName = 'r2'
)
train_rmse = evaluator_rmse.evaluate(train_pred)
test_rmse = evaluator_rmse.evaluate(test_pred)
test_r2 = evaluator_r2.evaluate(test_pred)
print('\n📜 評価結果')
print(f' Train RMSE: ${train_rmse:.2f}')
print(f' Test RMSE: ${test_rmse:.2f}')
print(f' Test R2: ${test_r2:.4f}')
# feature importancesの確認
importances = model.stages[-1].getFeatureImportances()
importance_data = []
for idx, feature_name in enumerate(feature_cols):
importance_data.append({
'feature': feature_name,
'importance': importances[idx]
})
# pdfに変換
df_importance = pd.DataFrame(importance_data)
df_importance = df_importance.sort_values('importance', ascending=False).reset_index(drop=True)
print('\n 📊 Feature Importance')
display(df_importance)
モデルの学習結果は、RMSEが6.51 (train)、6.54 (test)、test R2が0.8281と、すでにそこそこ良い感じですね。
特徴量として重要度が高いのは、trip_distance、hour、dropoff_zoneなどのようです。
それでは同じようにV2のデータでモデル学習してみましょう。
# V2のデータを使ってモデル学習
## データの前処理
df_cleansed, feature_cols = preprocessor(df=df_v2, target='fare_amount')
## 学習・テストデータの分割
train_df, test_df = train_test_split(df=df_cleansed) # train: 230278, test: 57459
## モデル学習
model = train_model(train_df=train_df, feature_cols=feature_cols, target='fare_amount')
## 予測
train_pred, test_pred = predicter(model=model, train_df=train_df, test_df=test_df)
## 精度評価
evaluator(model=model, feature_cols=feature_cols, train_pred=train_pred, test_pred=test_pred)
モデルの学習結果は、RMSEが6.49 (train)、6.43 (test)、test R2が0.8339と、若干の精度向上を達成しています。
また、特徴量として最も重要度が高いのがtrip_distanceであることには変化がありませんが、新しく追加したzone_avg_fareとzone_std_fareがそれぞれ二番目と三番目にランクインしており、特徴量の追加がポジティブな効果をもたらしていることを確認できます。
【V1データとV2データで学習したモデルの精度の比較】
| データのバージョン | 特徴量数 | RMSE | R^2 |
|---|---|---|---|
| V1 | 9 | 6.54 | 0.8281 |
| V2 | 13 | 6.43 | 0.8339 |
Apache Icebergのタイムトラベル機能により、バージョン違いのデータにクイックにアクセスし、モデル構築・精度比較することができました。
4. まとめ:データもバージョン管理しよう
本記事では、Icebergテーブルを機械学習に利用することで
- スキーマ進化による効率的な特徴量追加
- タイムトラベルによるバージョン違いデータを用いたモデル構築
を実現する様子をデモしました。
上記のようなユースケース以外にも、データドリフト検知時にそれがいつ時点で発生したかをタイムトラベルしながら確認する、特定の過去時点のデータを指定することで過去データのみを用いてモデル構築する…といった用途も需要がある部分かと思います。
また、Apache Icebergでは他にも
- 特定のスナップショットにタグを付ける
- ブランチを切る
- 以前のスナップショットの状態にデータをロールバックする
といったきめ細やかなデータの履歴管理が可能な点は、機械学習パイプラインにおいてもメリットが大きい部分なのではないでしょうか。
📚参考
- 『Apache Iceberg活用入門 オープンテーブルフォーマットによるデータレイク&データレイクハウス』
- 『実践Apache Iceberg —— 高効率・高生産性を実現するオープンなデータ基盤の構築と運用』
- Apache Iceberg Time Travel Guide: Snapshots, Queries & Rollbacks
- Building an AI/ML Data Lake With Apache Iceberg
- Building Reproducible ML Systems with Apache Iceberg and SparkSQL: Open Source Foundations
