0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

データ基盤の設計思想:Databricksのスキーマ推論・進化

0
Posted at

1. はじめに:なぜスキーマ推論・進化が重要なのか

データエンジニアリングの現場では、「データの形が毎回少しずつ違う」 という問題がつきまといます。
特に、IoTデバイスのログや業務アプリからのイベントデータ、API経由で取得するJSONなど、構造が頻繁に変わるデータを扱うとき、この課題は顕著です。

こうした変化に都度手作業で対応していると、パイプラインのメンテナンスコストが跳ね上がり、運用が回らなくなります。
そこで登場するのが、Databricksの「スキーマ推論(Schema Inference)」と「スキーマ進化(Schema Evolution)」 です。
これは、データの構造を自動で理解し、必要に応じて変化に対応する仕組みです。

スキーマ推論・スキーマ進化とは

「スキーマ」とは、データの列名や型、階層構造など、データの設計図のようなものです。

image.png

Databricksではデータを取り込むと列名・それぞれのデータ型を勝手に定義してくれる

ETLやデータ基盤の設計では、このスキーマをどのタイミングで決めるかが重要になります。

ここでよく出てくるのが次の2つの考え方です。

モデル 説明
スキーマ・オン・ライト(Schema-on-Write) データを保存する前に、スキーマを明示的に定義して書き込む方式。RDBやDWHなどが代表例。
スキーマ・オン・リード(Schema-on-Read) データを読み込むときにスキーマを適用する方式。データレイクや半構造化データで多用される。

データレイクのように柔軟な構造を扱う環境では、スキーマ・オン・リードが主流です。
ただし、自由度が高い分「スキーマをどう解釈するか」という問題が発生します。
たとえば、同じキーでも日によってデータ型が違ったり、列が増えたり減ったりするケースです。
このような状況で役立つのが、スキーマ推論
ファイルの中身をサンプリングして、自動的に列名とデータ型を判断する機能です。

Databricksが「自動でスキーマを理解・進化させる」仕組みを持つ理由

Databricksは、レイクハウスアーキテクチャを支えるプラットフォームとして、
構造化・半構造化・非構造化データをすべて統一的に扱うことを目指しています。

このために、

  • Autoloader によるリアルタイム取込+自動スキーマ検出
  • Delta Lake によるスキーマ進化(ALTER TABLEを使わずに列を追加)
  • Rescue Data Column による未知フィールドの自動保存

といった仕組みが備わっています。

つまりDatabricksでは、「スキーマを固定して守る」よりも
“変化を前提に、安全に受け入れる” という考え方が基本になっています。
この設計思想こそが、データレイクハウス時代の運用負荷を大きく減らすポイントです。

2. Autoloaderによるスキーマ推論の仕組み

Databricksの Autoloader は、クラウドストレージ上の新規データを自動で検知・取り込みできる仕組みです。
特徴は、単なるバッチ取り込みではなく、「増分処理」+「スキーマ自動検出」 を同時に実現していること。
データが増えても構造が変わっても止まらないこと がAutoloaderの強みです。

Autoloaderの概要

Autoloaderは内部的にストレージイベント通知やディレクトリスキャンを用い、
「まだ読み込まれていないファイルだけ」をインクリメンタルに取り込みます。

df = (spark.readStream
      .format("cloudFiles")                     # AutoLoader(増分読み込み用フォーマット)
      .option("cloudFiles.format", "json")      # 入力データのフォーマット
      .option("cloudFiles.schemaLocation",      # スキーマ定義を保存するパス(推論結果のキャッシュ)
              "/mnt/schema/")
      .load("/mnt/input/"))                     # 入力元のディレクトリ(クラウドストレージなど)

.option("cloudFiles.schemaLocation", "/mnt/schema/") が重要です。
Autoloaderが推論したスキーマ情報を保存し、次回の読み込み時に再利用するキャッシュとして機能します。
これがないと、毎回全ファイルをスキャンして推論し直すため、コストも時間もかかります。

サンプリング方式

スキーマ推論は、最初のデータ読み込み時に「サンプルデータ」をもとに行われます。
公式ドキュメントによると、Autoloaderは次のようなサンプリング戦略を取ります。

  • 1,000ファイルまたは50GBまでを対象にサンプリング(超過した場合はそこで打ち切り)
  • ファイル形式によって推論戦略が異なる(JSONやCSVは中身をスキャン、ParquetやAvroはメタ情報から抽出)
  • 設定値は内部オプション
    (例:spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles)で調整可能

これにより、データのボリュームが大きくても効率的にスキーマを判断できます。

schemaLocation によるスキーマキャッシュ

schemaLocation に指定したパスには、以下のようなメタデータが保存されます。

  • 最新のスキーマ(推論済みの構造)
  • 変更履歴(スキーマ進化時に追加された列の情報)
  • rescueDataColumnなどの例外フィールド情報

この仕組みにより、次回以降の実行では 「スキーマを再推論せずに」前回の構造を再利用 できます。

Autoloaderには、inferColumnTypes, schemaHints, cloudFiles.schemaInference など、
推論を制御するためのいくつかのオプションがあります。

オプション名 役割
cloudFiles.inferColumnTypes 推論時に数値やブール型を自動識別する(falseの場合すべてstring) "true"
cloudFiles.schemaHints 列ごとに型を事前指定できる "age INT, price DOUBLE"
cloudFiles.schemaInference.mode 推論の実行タイミング(rescue, addNewColumns, noneなど) "addNewColumns"

実装例:

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")       # CSV形式を読み込む
      .option("cloudFiles.inferColumnTypes",    # 数値や日時を自動推論(falseなら全てstring)
              "true")
      .option("cloudFiles.schemaHints",         # 列単位で型を明示指定(誤推論防止)
              "id INT, created_at TIMESTAMP")
      .option("cloudFiles.schemaLocation",      # スキーマを保存するディレクトリ
              "/mnt/schema/")
      .load("/mnt/input/"))

このように設定することで、誤推論(例:数字をstring扱いするなど)を防ぎながら、柔軟なスキーマ制御が可能になります。

フォーマットごとの違い

フォーマット 推論方法 特徴
JSON / CSV ファイル内容をサンプリングして列名・型を解析 柔軟だがサンプリングコスト高
Parquet / Avro メタ情報からスキーマを直接抽出 高速かつ正確だが柔軟性は低い
Binary / Image 固定スキーマまたはメタ情報のみ ML用途などに限定される

たとえば、JSONを扱う場合は半構造化データのため推論コストが高くなります。
このため、頻繁にスキーマが変わる場合は Parquet 変換してから扱う方が安定的です。

3. スキーマ進化の対応

スキーマ推論は「いまのデータ構造を理解する」機能ですが、
スキーマ進化(Schema Evolution) は「変化を受け入れて更新する」仕組みです。
現場では、新しい列が追加されたり、型が変わったりといった変更が頻繁に起こります。
DatabricksのAutoloaderは、そうした変更を自動的に取り込む設計になっています。

addNewColumns / rescueDataColumn モードの動作

Autoloaderは、スキーマの変化を検知した際にいくつかの動作モードを選べます。

モード名 動作内容
addNewColumns 新しい列を自動でスキーマに追加し、テーブルを更新する(最も一般的)
rescue 予期しない列を _rescued_data という構造体列にまとめて保存(データ損失防止)
none / failOnNewColumns 新しい列があるとエラーを発生させ、手動対応を促す

この設定は .option("cloudFiles.schemaEvolutionMode", "addNewColumns") のように指定します。

実装例:Autoloaderのスキーマ進化設定

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.schemaLocation", "/mnt/schema/")
      .option("cloudFiles.inferColumnTypes", "true")
      .option("cloudFiles.schemaEvolutionMode", # 新しい列が追加されたときの挙動を制御
              "addNewColumns")                  # 自動で新しい列を追加
      .load("/mnt/input/"))

上記設定では、新しい列が出現してもジョブが停止せず、自動的にDeltaテーブルへ追加されます。
一方で、スキーマ変更を無制限に許すと構造が不安定になるため、
開発環境では addNewColumns、本番では rescue モード のように環境別運用を推奨されています。

Delta Lakeとの関係:スキーマ変更をどう反映させるか

Autoloaderが推論・進化させたスキーマは、通常Delta Lakeテーブルに保存されます。
このとき、Delta Lake側でもスキーマの整合性を保つために
mergeSchema オプションを設定する必要があります。

実装例:

(df.writeStream
   .option("mergeSchema", "true")               # Delta側でもスキーマ変更を許可
   .option("checkpointLocation",                # ストリーミング状態を保存(必須)
           "/mnt/checkpoints/")
   .trigger(availableNow=True)                  # すぐに1回分を処理(バッチ的な動作)
   .table("bronze.events"))                     # 書き込み先のDeltaテーブル名

これにより、Autoloaderが新しい列を追加してもDeltaテーブルに自動反映されます。
逆にこの設定がないと、スキーマ不一致エラーが発生します。

実運用での注意点

スキーマ進化を本番で使う場合、次の3点には注意が必要です。

  1. 型変更(int → stringなど)は非互換
    → 自動では変換されず、明示的なCASTや中間処理が必要。
  2. チェックポイントとの整合性
    → ストリーミングは過去のスキーマをチェックポイントに保存しているため、
    スキーマ変更後に再起動するときは再推論・再設定が必要になることがあります。
  3. statefulな処理との相性
    groupByaggregate など状態を保持する処理では、
    スキーマ進化がトリガーでジョブが再起動するケースもあります。

AutoloaderとDelta Lakeのスキーマ進化機能を組み合わせることで、
従来のETLのように「スキーマ変更=障害」ではなく、
変化を前提とした柔軟なパイプラインを構築できます。

ただし、自動化しすぎるとスキーマドリフト(想定外の構造変化)を招くため、
「ガイド付きの自動化(ヒント・契約・監視)」を組み合わせることが重要です。

まとめ

本記事では、AutoLoader/Delta Lake を活用した Databricks における スキーマ推論(Schema Inference)スキーマ進化(Schema Evolution) の仕組みと実装を整理しました。

  • データレイク環境では、データ構造が時間とともに変化するため、手動でスキーマを固定していくと運用コスト・障害リスクが高まるという背景があります。
  • Auto Loader によるスキーマ推論機能により、初回取り込み時にサンプリングして構造を自動的に判断し、以降はスキーマキャッシュ(schemaLocation)を活用することで処理の安定化を図れます。
  • スキーマ進化にも対応しており、例えば addNewColumns モードや rescue モードを設定することで「新しい列が来ても止まらない」設計が可能です。
  • ただし、スキーマ変更には運用上の注意点があります。チェックポイント、状態管理、ストリーミング再起動など設計を誤ると障害につながります。
  • 実運用では、完全自動化ではなく「ヒントを活用する」「スキーマ契約を設ける」など、制御された自動化が鍵となります。
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?