2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DatabricksでミニAI基盤を想像で作ってみた

Posted at

はじめに

こんにちは、とあるSIer企業で働いている threebottles です。
初めてQiitaに投稿してみます。
私は普段仕事では Snowflake や AWS を使ったデータウェアハウスなどの構築に関わっています。

最近、Databricks という言葉をよく聞くようになり、そろそろちゃんと触ってみようかなと思い今回の記事を作っています。
Databricks のホームページを覗いてみると「データインテリジェンスプラットフォーム」と紹介されており、ただのデータ基盤ではなく、データ分析や AI 活用も含めた統合的なプラットフォームであることがわかります。
そこで、勝手な想像でミニ AI 基盤を作ってみました。
私はデータエンジニアリング領域の人間で、AI/ML の実務経験はありませんので、あくまで「想像で作ってみた」体験記です。

また、今回は無料で使える Databricks の Free Edition というサービスを使ってみました。

Databricks Free Edition とは?

Databricks の Free Edition は通常の Databricks 商用プランと基本的には同じ機能を試すことができる無料の環境です。
コンピュートは最小限なサーバレスのみ使用可能なため、大規模な処理は無理ですが、ちょっとお試しで触ってみたり、学習目的だったら費用のことを考えずに使うことができます。

使ったデータ

今回ミニ AI 基盤を作る上で何かデータセットを実際に使ってみようと思い、クレジットカードの不正利用検知のデータを利用しました。
CSV ファイルの先頭3レコードを参考として以下に記載します。

Time,"V1","V2","V3","V4","V5","V6","V7","V8","V9","V10","V11","V12","V13","V14","V15","V16","V17","V18","V19","V20","V21","V22","V23","V24","V25","V26","V27","V28","Amount","Class"
0,-1.3598071336738,-0.0727811733098497,2.53634673796914,1.37815522427443,-0.338320769942518,0.462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.0257905801985591,0.403992960255733,0.251412098239705,-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,"0"
0,1.19185711131486,0.26615071205963,0.16648011335321,0.448154078460911,0.0600176492822243,-0.0823608088155687,-0.0788029833323113,0.0851016549148104,-0.255425128109186,-0.166974414004614,1.61272666105479,1.06523531137287,0.48909501589608,-0.143772296441519,0.635558093258208,0.463917041022171,-0.114804663102346,-0.183361270123994,-0.145783041325259,-0.0690831352230203,-0.225775248033138,-0.638671952771851,0.101288021253234,-0.339846475529127,0.167170404418143,0.125894532368176,-0.00898309914322813,0.0147241691924927,2.69,"0"
1,-1.35835406159823,-1.34016307473609,1.77320934263119,0.379779593034328,-0.503198133318193,1.80049938079263,0.791460956450422,0.247675786588991,-1.51465432260583,0.207642865216696,0.624501459424895,0.066083685268831,0.717292731410831,-0.165945922763554,2.34586494901581,-2.89008319444231,1.10996937869599,-0.121359313195888,-2.26185709530414,0.524979725224404,0.247998153469754,0.771679401917229,0.909412262347719,-0.689280956490685,-0.327641833735251,-0.139096571514147,-0.0553527940384261,-0.0597518405929204,378.66,"0"

V1 〜 V28 は PCA 分析で次元削減された特徴量、Amount はクレジットカードの取引額を表します。
Class が不正取引を分類する目的変数で、1が不正利用を表します。
全284,807件の取引のうち492件が不正利用で、陽性クラス(不正利用)は全取引の0.172%ととても不均一なデータになります。
普段、金融業界のシステムに関わっているので、面白いなと思ってこのデータを選んでみました。

実装内容

全体感

まずは今回作ったミニ AI 基盤の全体フローです。

  1. データ投入
    通常のシステムでは上流システムからデータを受領しますが、今回はそのフローを省略し、手動でアップロードしてデータを用意しました。

    • CSV ファイルデータセット(クレジットカード不正利用検知 CSV)を Databricks にアップロード
    • creditcard_stage テーブルに格納

  2. データ分割
    取り込んだデータを、用途に応じて4つに分割しました。
    INCR は夜間のバッチ処理をイメージし、追加で連携されたデータの想定で、モデルの追加学習に使います。
    Predict Input も同様に夜間のバッチ処理をイメージし、最新のクレジットカード利用データが連携されてきたと想定して、不正利用かどうかを実際に予測するためのデータとしました。

    • RAW : 初回モデル構築用
    • VAL : モデル評価専用(固定)
    • INCR : 追加学習用
    • Predict Input : 予測対象データ(Class 列なし)

  3. 初回学習
    まずはベースとなるモデルを構築します。
    構築したモデルは Databricks 上の Volume に保存しておき、追加学習時に利用します。
    また、モデルの情報や性能評価結果をテーブルへ保村しておきます。

    • RAW を使ってモデルを学習
    • VAL を使って性能評価(Precision / Recall / AUC など)
    • モデルファイル(v0001)を Volume に保存
    • 評価結果を model_state に記録

  4. 追加学習
    保存されている最新モデルを取得し、追加学習を行います。
    初回学習時と同じで、モデルの保存とテーブルへの記録を行います。

    • 最新モデルをロード
    • INCR で追加学習
    • VAL で再評価し、結果を model_state に追記
    • 新しいモデルファイルを v0002, v0003… として保存

  5. 推論
    学習したモデルを使って実際に予測を行います。

    • 最新モデルを使って Predict Input のデータを推論
    • 推論結果を Predict Output に保存

テーブル設計

まずはステージ用テーブル creditcard_stage を作成し、CSV ファイルを UI からアップロードしました。
そこから SQL で分割して以下のテーブルを用意しました。

  • creditcard_raw: 初回学習用データ(約70%)
  • creditcard_val: 評価専用データ(約20%、常に固定)
  • creditcard_incr: 追加学習用データ(約10%)
  • creditcard_predict_input: 予測用データ(ラベルなし)

インプットデータ以外のテーブルとしては以下を作成しました。

  • creditcard_predict_output: 予測結果の保存先
  • model_state: モデルのメタ情報と評価メトリクス

ちなみにcreditcard_stage, creditcard_predict_output, model_stateのテーブルを作成したSQLはこちらです。他のテーブルは同じレイアウトなので省略します。

CREATE TABLE IF NOT EXISTS creditcard_stage (
  Time DOUBLE,
  V1 DOUBLE,  V2 DOUBLE,  V3 DOUBLE,  V4 DOUBLE,  V5 DOUBLE,  V6 DOUBLE,  V7 DOUBLE,
  V8 DOUBLE,  V9 DOUBLE,  V10 DOUBLE, V11 DOUBLE, V12 DOUBLE, V13 DOUBLE, V14 DOUBLE,
  V15 DOUBLE, V16 DOUBLE, V17 DOUBLE, V18 DOUBLE, V19 DOUBLE, V20 DOUBLE, V21 DOUBLE,
  V22 DOUBLE, V23 DOUBLE, V24 DOUBLE, V25 DOUBLE, V26 DOUBLE, V27 DOUBLE, V28 DOUBLE,
  Amount DOUBLE,
  Class INT
) USING delta;

CREATE TABLE IF NOT EXISTS creditcard_predict_output (
  req_id BIGINT,
  predict_time TIMESTAMP,
  y_pred INT,
  score DOUBLE,
  model_id STRING,
  model_version BIGINT
) USING delta;

CREATE TABLE IF NOT EXISTS model_state (
  model_id STRING,
  version BIGINT,
  model_path STRING,
  algo STRING,
  created_at TIMESTAMP,
  note STRING,
  test_ratio DOUBLE,
  threshold DOUBLE,
  eval_rows BIGINT,
  train_rows BIGINT,
  precision_pos DOUBLE,
  recall_pos DOUBLE,
  f1_pos DOUBLE,
  roc_auc DOUBLE,
  pr_auc DOUBLE
) USING delta;

初回学習(モデル構築)

※コードは一部抜粋(参考)です。

まず、学習用・評価用データをテーブルからロードします。

def load_Xy(table):
    pdf = spark.table(table).na.drop().toPandas()
    X = pdf[[f"V{i}" for i in range(1,29)]].copy()
    X["log_amount"] = np.log1p(pdf["Amount"].astype(float))
    y = pdf["Class"].astype(int).values
    return X, y

X_tr, y_tr = load_Xy(RAW)
X_va, y_va = load_Xy(VAL)

ここで Amount を対数変換しているのは以下の図のように少額のデータが多いためです。
対数にすることで、分布がより均一なデータにしています。

スクリーンショット 2025-08-31 19.03.54.png

次に学習です。今回モデルについては適当に用意しました。

# ---------- 学習(RAW) ----------
clf = SGDClassifier(loss="log_loss", random_state=42, max_iter=5, tol=None)
clf.partial_fit(X_tr, y_tr, classes=classes, sample_weight=sw_tr)

# ---------- 評価(VAL) ----------
proba_va = clf.predict_proba(X_va)[:, 1]
yhat_va  = (proba_va >= THRESHOLD).astype(int)

prec1, rec1, f11, _ = precision_recall_fscore_support(y_va, yhat_va, labels=[1], zero_division=0)
prec1, rec1, f11 = float(prec1[0]), float(rec1[0]), float(f11[0])
roc = float(roc_auc_score(y_va, proba_va))
pr  = float(average_precision_score(y_va, proba_va))

この学習では以下のような評価結果になりました。

  • precision_pos = 0.005
  • recall_pos = 0.974
  • f1_pos = 0.010
  • ROC-AUC = 0.889
  • PR-AUC = 0.006

不正検知はほぼ全件拾えていることがわかります(Recall ≈ 97%)。
ただし Precision ≈ 0.5% と誤検知だらけで、疑わしものをあらかた不正と決めつけているような状態です。。。
今回のデータは不正利用データ(陽性)の割合が極端に小さいため、Accuracy だけを見てはダメということがよくわかります。

では、構築したモデルを保存します。

# ---------- モデル保存 v0001 ----------
version     = 1
ts          = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
model_path  = f"{BASE_DIR}/v{version:04d}_{ts}_{uuid.uuid4().hex[:8]}.joblib"
joblib.dump(clf, model_path)

# ---------- model_state へメトリクス込みで追記 ----------
spark.sql(f"""
INSERT INTO {STATE} (
  model_id, version, model_path, algo, created_at, note,
  test_ratio, threshold, eval_rows, train_rows,
  precision_pos, recall_pos, f1_pos, roc_auc, pr_auc
) VALUES (
  '{MODEL_ID}', {version}, '{model_path}',
  'SGDClassifier(log_loss)+partial_fit(sample_weight)',
  current_timestamp(), 'initial: train RAW, eval VAL (fixed)',
  {float(TEST_RATIO)}, {float(THRESHOLD)}, {int(len(y_va))}, {int(len(y_tr))},
  {prec1}, {rec1}, {f11}, {roc}, {pr}
)
""")

実際にモデルが保存されていることも確認できます。

スクリーンショット 2025-08-31 19.20.55.png
スクリーンショット 2025-08-31 19.20.32.png

追加学習

では、追加学習をやってみましょう。
まずは学習用・評価用のデータロードです。

def load_Xy(table):
    pdf = spark.table(table).na.drop().toPandas()
    X = pdf[[f"V{i}" for i in range(1,29)]].copy()
    X["log_amount"] = np.log1p(pdf["Amount"].astype(float))
    y = pdf["Class"].astype(int).values
    return X, y

X_in, y_in = load_Xy(INCR)
X_val, y_val = load_Xy(VAL)

学習済みモデルをロードします。

# 最新モデルロード
latest = spark.sql(f"SELECT version, model_path FROM {STATE} WHERE model_id='{MODEL_ID}' ORDER BY version DESC LIMIT 1").collect()
print(latest)
cur_version = int(latest[0]["version"]); cur_path=latest[0]["model_path"]
clf = joblib.load(cur_path)

ロードした学習済みモデルに対して、追加学習を行います。

# 追加学習
clf.partial_fit(X_in, y_in, classes=classes, sample_weight=sw_in)

# VAL評価
proba_val = clf.predict_proba(X_val)[:,1]
yhat_val  = (proba_val >= THRESHOLD).astype(int)
prec, rec, f1, _ = precision_recall_fscore_support(y_val, yhat_val, labels=[1], zero_division=0)
prec, rec, f1 = float(prec[0]), float(rec[0]), float(f1[0])
roc = float(roc_auc_score(y_val, proba_val))
pr  = float(average_precision_score(y_val, proba_val))

そして、モデルを最新バージョンとして保存します。

# 保存 v+1
new_version = cur_version+1
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
model_path=f"{BASE_DIR}/v{new_version:04d}_{ts}_{uuid.uuid4().hex[:8]}.joblib"
joblib.dump(clf, model_path)

spark.sql(f"""
INSERT INTO {STATE}
VALUES ('{MODEL_ID}', {new_version}, '{model_path}',
        'SGDClassifier(log_loss)+partial_fit', current_timestamp(),
        'incremental: latest model + INCR only, eval VAL (fixed)',
        0.0, {THRESHOLD}, {len(y_val)}, {len(y_in)},
        {prec}, {rec}, {f1}, {roc}, {pr})
""")

モデルが追加で保存されていることがわかります。

スクリーンショット 2025-08-31 19.31.40.png
スクリーンショット 2025-08-31 19.31.50.png

推論

追加学習したモデルを使って予測します。

# 予測(predict_input→predict_output)
pdf_pin = spark.table(P_IN).na.drop().toPandas()
X_pred = pdf_pin[[f"V{i}" for i in range(1,29)]].copy()
X_pred["log_amount"] = np.log1p(pdf_pin["Amount"].astype(float))
proba = clf.predict_proba(X_pred)[:,1]; yhat=(proba>=THRESHOLD).astype(int)

pred_out = pd.DataFrame({
  "req_id": pdf_pin["req_id"].astype("int64"),
  "predict_time": pd.Series([datetime.now(timezone.utc)]*len(yhat), dtype="datetime64[ns]"),
  "y_pred": yhat.astype("int32"),
  "score": proba.astype("float64"),
  "model_id": MODEL_ID,
  "model_version": new_version
})
spark.createDataFrame(pred_out).write.mode("append").saveAsTable(P_OUT)

ジョブ実行

追加学習と予測は1つの Notebook にまとめておき、ジョブを想定したスケジュール実行をやってみます。
Notebookの右上で簡単にスケジュール登録ができます。

スクリーンショット 2025-08-31 19.41.41.png

設定した時間を待って見てみると、成功したようです。

スクリーンショット 2025-08-31 19.43.20.png

詳細な結果も見ることができます。

スクリーンショット 2025-08-31 19.46.06.png

ちなみに、追加で最新モデルが保存されていました。

スクリーンショット 2025-08-31 19.47.07.png
スクリーンショット 2025-08-31 19.47.16.png

バッチ処理をイメージしたジョブ実行も試してみることができました。

おわりに

Databricks Free Edition を使って、ミニ AI 基盤を試作してみました。
今回はクレジットカード不正利用の題材を選んだことで、実務に近い「不均衡データの難しさ」を体感できたのも面白く感じました。

今後、仕事でもDatabricksに触れる機会が出てきたらいいなと感じた良い経験でした。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?