Configuring schema inference and evolution in Auto Loader | Databricks on AWS [2022/5/23時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Auto Loaderでは、ご自身のデータに新たなカラムに追加されたことを自動で検知して再起動するので、自分でスキーマ変更への対応を行う必要はありません。また、Auto Loaderは予期しないデータをJSONのblobカラムに「レスキュー」することができるので、準構造化データアクセスAPIを用いて後でアクセスすることもできます。
以下のフォーマットでスキーマ推定とスキーマ進化をサポートしています。
ファイルフォーマット | サポートバージョン |
---|---|
JSON | Databricksランタイム8.2以降 |
CSV | Databricksランタイム8.3以降 |
Avro | Databricksランタイム10.2以降 |
Parquet | 開発中 |
ORC | 未サポート |
Text | 適用されません(固定スキーマ) |
Binaryfile | 適用されません(固定スキーマ) |
スキーマ推定
スキーマを推定するためには、Auto Loaderは最初の50GBあるいは1000ファイル、どちらか先にリミットに到達する方をサンプリングします。ストリームが起動するたびにこの推定のコストが発生することを避け、あるいは、安定版のスキーマをストリームの再起動の際に指定できるようにするには、オプションcloudFiles.schemaLocation
を指定しなくてはなりません。Auto Loaderは入力データにおけるスキーマ変更を追跡するために、この場所に_schemas
という隠しディレクトリを作成します。データを取り込むためにお使いのストリームに単一のcloudFiles
が存在する場合には、cloudFiles.schemaLocation
として、チェックポイントの場所を指定することができます。そうではない場合、このオプション固有のディレクトリを指定します。入力データがストリームが想定しないスキーマを返した際には、指定したスキーマの格納場所が単一のAuto Loaderソースによってのみ使用されていることを確認してください。
注意
サンプリングに使用するデータのサイズを変更するために、SQL設定を行うことができます。
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(バイト文字列、例えば10gb
)
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(integer)
デフォルトでは、CSVやJSONのようなテキストベースのファイルフォーマットでは、カラムをstring
カラムと推定します。JSONデータセットにおいては、ネストされたカラムもstring
カラムと推定されます。JSONやCSVデータは自己説明型であり、多くのデータ型をサポートしているので、データを文字列として推定することで、数値型のミスマッチ(integer、long、float)のようなスキーマ進化の問題を回避する役に立ちます。オリジナルのSparkのスキーマ推定の挙動を維持したいのであれば、オプションcloudFiles.inferColumnTypes
をtrue
に設定します。
注意
ケースセンシティブが有効化されていない限り、カラムabc
、Abc
、ABC
は、スキーマ推定においては同じカラムとみなされます。どのケースが選択されるかは任意であり、サンプルデータに依存します。どのケースが使用されるべきかを強制するためにスキーマヒントを使用することができます。選択が行われスキーマが推定されると、Auto Loaderは選択されなかったケースのバリエーションはスキーマと生合成があるとは見做しません。これらのカラムはレスキューデータカラムから見つけ出す必要があるかもしれません。
また、Auto LoaderはデータがHiveスタイルのパーティショニングに従って配置されている場合、ディレクトリの構造に基づいてパーティションカラムを推定します。例えば、base_path/event=click/date=2021-04-01/f0.json
のようなファイルパスは、パーティションカラムとしてdate
カラムとevent
カラムを推定することになります。cloudFiles.inferColumnTypes
をtrueに設定しない限り、これらのカラムのデータ型は文字列となります。ディレクトリ構成にHiveパーティションと競合する内容が含まれていたり、Hiveスタイルのパーティションが含まれていない場合、パーティションカラムは無視されます。ディレクトリ構成にkey=value
ペアとしてカラムが存在する場合、ファイルパスから特定のカラムを常にパースしようとするように、カンマ区切りのカラム名のリストをcloudFiles.partitionColumns
オプションに指定することができます。
注意
バイナリーファイル(binaryFile
)とtext
ファイルフォーマットでは固定のデータスキーマがありますが、パーティションカラムの推定もサポートされています。cloudFiles.schemaLocation
を指定しない限り、ストリームの再起動ごとにパーティションカラムが推定されます。これらのフォーマットではcloudFiles.schemaLocation
は必須のオプションでは無いので、潜在的なエラーや情報のロスを避けるために、cloudFiles.schemaLocation
あるいはcloudFiles.partitionColumns
オプションを設定することをお勧めします。
スキーマヒント
推定されたデータ型は、常にあなたが期待しているものになるとは限りません。スキーマヒントを用いることで、既知の情報を強制したり、推定されるスキーマに対する期待値を強制することができます。
デフォルトでは、Apache Sparkはデータカラムの型を推定するために標準的なアプローチを取ります。例えば、ネストされたJSONを文字列、integerをlongとして推定します。一方、Auto Loaderは全てのカラムを文字列と考えます。あるカラムが特定のデータタイプであることを知っている場合、あるいは、より汎用的なデータ型を選択したいと考えた場合(例えば、integerではなくdouble)、以下のようにカラムのデータ型に関して任意の数のヒントを指定することができます。
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
サポートされているデータ型の一覧に関してはdata typesのドキュメントを参照ください。
ストリーム起動時にカラムが存在していない場合、推定されるスキーマに当該カラムを追加するためにスキーマヒントを使うこともできます。
スキーマヒントを用いた際の挙動を確認するための推定スキーマの例を示します。推定されたスキーマが以下のようなものだとします。
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
以下のスキーマヒントを指定します。
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
以下のようになります。
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
注意
ArrayとMapのスキーマヒントは、Databricksランタイム9.1LTS以降で利用できます。
スキーマヒントの挙動を確認するための、複雑なデータ型の推定スキーマの例を示します。
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
以下のスキーマヒントを指定します。
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
以下のようになります。
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
注意
スキーマヒントはAuto Loaderにスキーマを指定しない場合にのみ利用できます。cloudFiles.inferColumnTypes
が有効化されていても無効化されていてもスキーマヒントを使用することができます。
スキーマ進化
Auto Loaderはデータを処理している際に新たなカラムの追加を検知します。デフォルトでは、新規カラムの追加はストリームのUnknownFieldException
による停止を引き起こします。ストリームがこのエラーを生じさせる前に、Auto Loaderは最新のマイクロバッチのデータに対してスキーマ推定を実施し、最新のスキーマを用いてスキーマの格納場所を更新します。新規カラムはスキーマの最後に追加されます。既存カラムのデータ型は変更されません。DatabricksジョブでAuto Loaderのストリームを設定することで、このようなスキーマ変更があった際にストリームは自動で再起動します。
Auto Loaderでは、cloudFiles.schemaEvolutionMode
オプションで指定できる以下のモードのスキーマ進化モードをサポートしています。
-
addNewColumns
: Auto Loaderにスキーマが提供されていない場合のデフォルトのモード。ストリーミングジョブはUnknownFieldException
で失敗します。スキーマに新規カラムが追加されます。既存カラムのデータ型は進化しません。ストリームのスキーマが提供されている場合にはaddNewColumns
は許可されません。このモードを使いたい場合には、スキーマヒントとしてスキーマを提供します。 -
failOnNewColumns
: Auto Loaderが新規カラムを検知した際にストリームは失敗します。違反しているデータが削除されるか、指定されているスキーマが更新されない限り、ストリームを再起動することはできません。 -
rescue
: ストリームは一番最初に指定されたスキーマあるいは指定されたスキーマで動作します。データ型が変化した、あるいは新規カラムが追加された際、_rescued_data
としてストリームのスキーマに自動的に追加されるレスキューデータカラムに救助されます。このモードでは、スキーマ変更でストリームが失敗しません。 -
none
: スキーマが指定されている際のデフォルトモード。スキーマは進化せず、新規カラムは無視され、オプションとしてレスキューデータカラムが指定されない限りデータは救助されません。
スキーマ進化ではパーティションカラムは考慮されません。初期のディレクトリ構成がbase_path/event=click/date=2021-04-01/f0.json
のようなものであり、新たなファイルをase_path/event=click/date=2021-04-01/hour=01/f1.json
という形で受け取り始めた際、hourカラムは無視されます。新たなパーティションカラムの情報を捕捉するためには、cloudFiles.partitionColumns
にevent,date,hour
を設定します。
レスキューデータカラム
レスキューデータカラムを用いることで、ETLの間にデータを失うことや見逃すことを避けることができます。レスキューデータカラムには、指定されたスキーマに含まれていない、型のミスマッチがあった、レコードのカラムの大文字小文字が異なった、ファイルがスキーマにマッチしなかったなどの理由でパースできなかった全てのデータが格納されます。レスキューデータカラムは救助されたカラム、レコードのソースファイルパス(ソースファイルパスはDatabricksランタイム8.3以降で利用できます)を含むJSONのblobとして返却されます。レスキューデータカラムからソースファイルパスを除外するには、SQL設定spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
を設定します。レスキューデータカラムは、スキーマが推定された際に、デフォルトでは_rescued_data
としてAuto Loaderによって返却されるスキーマに含まれます。rescuedDataColumn
オプションを設定することで、カラム名を変更したり、スキーマを指定した場合にレスキューデータカラムを含めることができます。
spark.readStream.format("cloudFiles").option("cloudFiles.rescuedDataColumn", "_rescued_data").option("cloudFiles.format", <format>).schema(<schema>).load(<path>).
cloudFiles.inferColumnTypes
のデフォルト値はfalse
であり、スキーマが推定された際のcloudFiles.schemaEvolutionMode
のデフォルトはaddNewColumns
なので、rescuedDataColumn
にはスキーマと異なるカラムのみが捕捉されます。
JSONとCSVのパーサーはレコードをパースする際に3つのモードをサポートしています: PERMISSIVE
, DROPMALFORMED
, FAILFAST
です。rescuedDataColumn
と一緒に使用した場合、データ型のミスマッチはDROPMALFORMED
モードでのレコード削除を行いませんし、FAILFAST
モードでエラーになりません。破損したレコード、不完全あるいは不正なJSON、CSVのみが削除されたりエラーを引き起こしたりします。JSONやCSVをパースする際にbadRecordsPath
を使用している場合、rescuedDataColumn
を使っている場合にはデータ型のミスマッチは不正レコードとみなされません。不完全あるいは不正なJSON、CSVレコードのみがbadRecordsPath
に格納されます。
制限
-
foreachBatch
を使用するDatabricksランタイム8.2、8.3が稼働しているPythonアプリケーションではスキーマ進化はサポートされていません。代わりにScalaのforeachBatch
を使用することができます。