LoginSignup
3
2

More than 1 year has passed since last update.

Delta Lakeにダイビング:スキーマの強制、進化

Last updated at Posted at 2021-06-28

Schema Evolution & Enforcement on Delta Lake - Databricksの翻訳です。

サンプルノートブックをこちらからダウンロードできます。

我々の経験では、データは常に進化し蓄積していくものだと言えます。これについていくためには、我々の世界に対するメンタルモデルを、これまで見たことがないような概念に対する新たな観点をもって、新たな次元を含む新たなデータに適合させる必要があります。これらのメンタルモデルはテーブルのスキーマのようなものではなく、新たな情報をどのようにカテゴリ分けして処理するのかを定義するものです。

これによってスキーマ管理が必要となります。ビジネスの課題や要件は時とともに進化し、データもまた同様です。Delta Lakeを活用することで、データの変更に合わせて、新たな次元を取り込むことが容易となります。ユーザーはテーブルのスキーマを制御するためのシンプルなセマンティクスにアクセスできます。これらのツールには、ユーザーの間違いや、ゴミのデータを登録することによってテーブルを汚してしまうことを防ぐスキーマ強制(schema enforcement)と、新たなデータを追加した際に自動でカラムを追加するスキーマ進化(schema evolution)が含まれます。このブログでは、これらのツールの使用法にダイビングします。

テーブルのスキーマを理解する

Apache Spark™におけるすべてのデータフレームには、データタイプ、カラム、メタデータのようなデータの形状を定義する設計図であるスキーマが含まれます。Delta Lakeにおいては、テーブルスキーマはトランザクションログ内にJSONフォーマットとして保存されます。

スキーマ強制とは何か?

スキーマ強制、あるいはスキーマ検証(schema validation)は、テーブルスキーマに合致しないテーブルへの書き込みを拒否することでデータ品質を確保するDelta Lakeにおける安全装置です。予約している方のみを受け付ける忙しいレストランのフロントデスクマネージャーのように、テーブルにインサートされるデータのそれぞれのカラムが、期待されるカラムのリストにあるか(言い換えれば、それぞれが"予約"しているか)をチェックし、リストにないカラムへの書き込みは拒否します。

スキーマ強制はどのように動作するのか?

Delta Lakeはスキーマ強制を書き込み時に行います。これは、新たなデータをテーブルに書き込む際に、ターゲットとなるテーブルスキーマに対する互換性をチェックすると言うことを意味します。スキーマに互換性がない場合、Delta Lakeは全てのトランザクションをキャンセルし(データは書き込まれません)、ユーザーに対してミスマッチであることを知らせる例外を発生させます。

テーブルに互換性があるかどうかを判断するために、Delta Lakeは以下のルールを適用します。書き込まれるデータフレームは:

  • ターゲットテーブルのスキーマに存在しない追加カラムを含んではいけません。逆に、追加するデータにターゲットテーブルにある全てのカラムが存在する必要はありません。存在しないカラムにはnull値が割り当てられます。
  • ターゲットテーブルと異なるデータタイプを持ってはなりません。ターゲットテーブルがStringTypeデータを有しているのに、データフレームで対応するカラムがIntegerTypeであった場合、スキーマ強制は例外を発生させ、書き込み処理をキャンセルします。
  • 大文字小文字のみが異なるカラムを含んではいけません。これは、同じテーブルにおいて、fooと言う列とFooと言う列を持つことはできないことを意味します。Sparkにおいては大文字小文字を区別するモードと大文字小文字を区別しないモード(デフォルト)がある一方で、Delta Lakeは大文字小文字を区別しますが、スキーマを保存する際には大文字小文字を区別しません。Parquetはカラム情報を保存、返却する際には大文字小文字を区別します。データ破損や損失など潜在的な間違い(実際Databricksにおいて体験しました)を避けるために、我々はこの制限を追加する決断をしました。

実際に説明するために以下のコードを用いて、Delta Lakeテーブルで受け付けられる設定がなされていない場合、新たに計算したカラムを追加しようとした際に、何が起きるのかをみてみましょう。

# Generate a DataFrame of loans that we'll append to our Delta Lake table
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Show original DataFrame's schema
original_loans.printSchema()

"""
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
"""

# Show new DataFrame's schema
loans.printSchema()

"""
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
"""

# Attempt to append new DataFrame (with new column) to existing table
loans.write.format("delta") \
           .mode("append") \
           .save(DELTALAKE_PATH)

""" Returns:

A schema mismatch detected when writing to the Delta table.

To enable schema migration, please set:
'.option("mergeSchema", "true")\'

Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)


Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)

If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

"""

新たなカラムを自動で追加するのではなく、Delta Lakeはスキーマを強制し、書き込みを停止します。どのカラムがミスマッチだったのかを特定できるように、Sparkはスタックトレースに両方のスキーマを表示し比較できるようにします。

なぜスキーマ強制は有用なのか?

このような厳密なチェックによって、実運用での利用に即したクリーンかつ完全に変換されたデータに対する門番としてスキーマ強制は機能します。通常、以下のユースケースで用いられるテーブルでスキーマ強制は行われます。

  • 機械学習アルゴリズム
  • BIダッシュボード
  • データ分析、可視化ツール
  • 高度に構造化され、型付け、セマンティックスキーマが求められる実運用システム

この最後のハードルに向けてデータを準備するために、多くのユーザーは、順次テーブルに構造を追加するシンプルな"マルチホップ"アーキテクチャを採用します。詳細に関してはブログ記事Productionizing Machine Learning With Delta Lakeをご覧ください。

もちろん、スキーマ強制はパイプラインのどこでも利用できるものですが、例えば、テーブルに対するストリーミングによる書き込みを行う際、一つのカラムを追加し忘れた場合にエラーとなることは少々フラストレーションになることにご注意ください。

データの希薄化を防ぐ

ここまで来て、自身に問い掛けるかもしれません。「この騒ぎはなんだ?」と。結局、特にDelta Lakeを使い始めた際には、予期しない「スキーマミスマッチ」エラーがワークフローに現れ、驚くことになるかもしれません。なぜ、どのようなデータフレームも記述できるように必要なスキーマを変化させないのか?

「1オンスの防御は、1ポンドの回復に値します」と古語にあります。ある時点で、スキーマ強制を行わなかったとしたら、データ型互換性に関する問題は、その醜い頭をもたげることになります。ひと目では均質に見える生データには、夜中に問題を引き起こす、極端なケース、破損したカラム、間違ったマッピング、その他の恐ろしいことが含まれています。より良いアプローチは、スキーマ強制を使って、入り口でこれらの敵を止めることです。先送りするのではなく、日中にこれらに対応して、かれらが実運用のコードに密かに忍び込むのを防ぐべきです。

スキーマ強制は、明示的に変更しない限り、テーブルのスキーマは変化しないと言う心の平安をもたらします。これはデータの希薄化(dilution)、以前はリッチで簡潔だったテーブルが頻繁にカラムが追加されることで、データの洪水によって意味と有用性を失ってしまう現象を防ぐことができます。意図的に高い標準を設け、高い品質を期待することで、スキーマ強制は意図した通りに動作し、テーブルをクリーンな状態に維持できます。

もし、更なるレビューを経て、本当に新たなカラムを追加することを決断したのであれば、以下に示すように簡単に一行の修正で対応できます。このソリューションがスキーマ進化です!

スキーマ進化とは何か?

スキーマ進化によって、データの変更に合わせて、ユーザーが簡単にテーブルの現状のスキーマを容易に変更できます。もっとも一般的なケースとしては、追加、上書きオペレーションを行う際に、一つ以上の新たなカラムを追加する際にスキーマ進化が用いられます。

スキーマ進化はどのように動作するのか?

以前の例ではスキーマミスマッチでリジェクトされましたが、新たなカラムを追加するために開発者は容易にスキーマ進化を活用できます。Sparkコマンドwriteあるいは.writeStreamoption('mergeSchema', 'true')を追加することでスキーマ進化が有効化されます。

# Add the mergeSchema option
loans.write.format("delta") \
           .option("mergeSchema", "true") \
           .mode("append") \
           .save(DELTALAKE_SILVER_PATH)

プロットを参照するには、以下のSpark SQLを実行します。

# Create a plot with the new column to confirm the write was successful
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

あるいは、Sparkの設定にspark.databricks.delta.schema.autoMerge = Trueを追加することで、Sparkセッション全体で上のオプションを適用することができます。この設定を行うと、スキーマ強制は意図しないスキーマミスマッチに対して警告を出さなくなるので注意して使用してください。

クエリーにmergeSchemaオプションを含めることで、データフレームに存在するが、ターゲットテーブルに存在しないすべてのカラムは、書き込みトランザクションの際に自動的にスキーマの末尾に追加されます。ネストされたフィールドの追加も可能で、これらのフィールドは対応するstructカラムの末尾に追加されます。

データエンジニア、データサイエンティストは、新たなカラム(新たにトラッキングされたメトリックや今月のセールスなど)を旧来のカラムに依存する既存のモデルの動作を壊すことなしに、既存の機械学習プロダクションテーブルに追加する際にこのオプションを使用できます。

以下のスキーマ変更ではテーブル追加や上書きの際にスキーマ進化を利用できます。

  • 新たなカラムの追加(最も一般的なシナリオです)
  • データタイプをNullTypeから他のタイプに変更、あるいはByteType -> ShortType -> IntegerTypeへのアップキャスト

スキーマ進化を利用できない他の変更では、.option("overwriteSchema", "true")を追加して、スキーマとデータを上書きする必要があります。例えば、元々はinteger型であった"Foo"カラムを、新たスキーマではstring型にする場合、全てのParquet(データ)ファイルは再書き込みされる必要があります。このような変更には以下が含まれます。

  • カラムのドロップ
  • 既存カラムタイプの変更(インプレースで)
  • 大文字小文字が異なる場合にはカラム名を変更("Foo"と"foo")

最後に、もう直ぐリリースされるSpark3.0(リリース済み)では、明示的DLL(ALTER TABLEを使用)が完全にサポートされ、ユーザーはテーブルスキーマに対して以下の操作を行えるようになります:

  • カラムの追加
  • カラムコメントの変更
  • トランザクションログの保持期間など、テーブルの振る舞いを定義するテーブルプロパティの設定

なぜスキーマ進化は有用なのか?

スキーマ進化は、(データフレームに意図せず存在してはならないカラムを追加してしまうケースとは逆に)あなたがテーブルを進化させたいと考えた際にいつでも利用できます。明示的に宣言を行うことなしに、適切なカラム名とデータタイプを自動で追加するので、スキーマを移行するには最も簡単な方法となります。

まとめ

スキーマ強制は、あなたのテーブルと互換性のないカラムの追加やスキーマ変更を拒否します。これの高い標準を掲げ、設定することで、分析者やエンジニアは使っているデータを信頼でき、高い一貫性、明確さに関する理由付けを行うことができ、より優れたビジネス上の意思決定を行えるようになります。

コインの反対側として、スキーマ進化は、意図したスキーマ変更を自動的に行うことで、スキーマ強制を補完します。結果的に、カラムの追加が容易になります。

スキーマ強制はスキーマ進化の陽に対する陰です。一緒に活用することで、これらの機能はノイズをブロックするのが容易になるだけでなく、シグナルに集中できるようになります。

この記事を執筆するに際して、Mukul MurthyとPranav Anandに感謝の意を評します。

関連記事

その他のリソース

Databricks 無料トライアル

Databricks 無料トライアル

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2