3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Auto Loaderにおけるスキーマ推定とスキーマ進化の設定

Last updated at Posted at 2022-07-01

Configuring schema inference and evolution in Auto Loader | Databricks on AWS [2022/5/23時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Auto Loaderでは、ご自身のデータに新たなカラムに追加されたことを自動で検知して再起動するので、自分でスキーマ変更への対応を行う必要はありません。また、Auto Loaderは予期しないデータをJSONのblobカラムに「レスキュー」することができるので、準構造化データアクセスAPIを用いて後でアクセスすることもできます。

以下のフォーマットでスキーマ推定とスキーマ進化をサポートしています。

ファイルフォーマット サポートバージョン
JSON Databricksランタイム8.2以降
CSV Databricksランタイム8.3以降
Avro Databricksランタイム10.2以降
Parquet 開発中
ORC 未サポート
Text 適用されません(固定スキーマ)
Binaryfile 適用されません(固定スキーマ)

スキーマ推定

スキーマを推定するためには、Auto Loaderは最初の50GBあるいは1000ファイル、どちらか先にリミットに到達する方をサンプリングします。ストリームが起動するたびにこの推定のコストが発生することを避け、あるいは、安定版のスキーマをストリームの再起動の際に指定できるようにするには、オプションcloudFiles.schemaLocationを指定しなくてはなりません。Auto Loaderは入力データにおけるスキーマ変更を追跡するために、この場所に_schemasという隠しディレクトリを作成します。データを取り込むためにお使いのストリームに単一のcloudFilesが存在する場合には、cloudFiles.schemaLocationとして、チェックポイントの場所を指定することができます。そうではない場合、このオプション固有のディレクトリを指定します。入力データがストリームが想定しないスキーマを返した際には、指定したスキーマの格納場所が単一のAuto Loaderソースによってのみ使用されていることを確認してください。

注意
サンプリングに使用するデータのサイズを変更するために、SQL設定を行うことができます。

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(バイト文字列、例えば10gb)

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(integer)

デフォルトでは、CSVやJSONのようなテキストベースのファイルフォーマットでは、カラムをstringカラムと推定します。JSONデータセットにおいては、ネストされたカラムもstringカラムと推定されます。JSONやCSVデータは自己説明型であり、多くのデータ型をサポートしているので、データを文字列として推定することで、数値型のミスマッチ(integer、long、float)のようなスキーマ進化の問題を回避する役に立ちます。オリジナルのSparkのスキーマ推定の挙動を維持したいのであれば、オプションcloudFiles.inferColumnTypestrueに設定します。

注意
ケースセンシティブが有効化されていない限り、カラムabcAbcABCは、スキーマ推定においては同じカラムとみなされます。どのケースが選択されるかは任意であり、サンプルデータに依存します。どのケースが使用されるべきかを強制するためにスキーマヒントを使用することができます。選択が行われスキーマが推定されると、Auto Loaderは選択されなかったケースのバリエーションはスキーマと生合成があるとは見做しません。これらのカラムはレスキューデータカラムから見つけ出す必要があるかもしれません。

また、Auto LoaderはデータがHiveスタイルのパーティショニングに従って配置されている場合、ディレクトリの構造に基づいてパーティションカラムを推定します。例えば、base_path/event=click/date=2021-04-01/f0.jsonのようなファイルパスは、パーティションカラムとしてdateカラムとeventカラムを推定することになります。cloudFiles.inferColumnTypesをtrueに設定しない限り、これらのカラムのデータ型は文字列となります。ディレクトリ構成にHiveパーティションと競合する内容が含まれていたり、Hiveスタイルのパーティションが含まれていない場合、パーティションカラムは無視されます。ディレクトリ構成にkey=valueペアとしてカラムが存在する場合、ファイルパスから特定のカラムを常にパースしようとするように、カンマ区切りのカラム名のリストをcloudFiles.partitionColumnsオプションに指定することができます。

注意
バイナリーファイル(binaryFile)とtextファイルフォーマットでは固定のデータスキーマがありますが、パーティションカラムの推定もサポートされています。cloudFiles.schemaLocationを指定しない限り、ストリームの再起動ごとにパーティションカラムが推定されます。これらのフォーマットではcloudFiles.schemaLocationは必須のオプションでは無いので、潜在的なエラーや情報のロスを避けるために、cloudFiles.schemaLocationあるいはcloudFiles.partitionColumnsオプションを設定することをお勧めします。

スキーマヒント

推定されたデータ型は、常にあなたが期待しているものになるとは限りません。スキーマヒントを用いることで、既知の情報を強制したり、推定されるスキーマに対する期待値を強制することができます。

デフォルトでは、Apache Sparkはデータカラムの型を推定するために標準的なアプローチを取ります。例えば、ネストされたJSONを文字列、integerをlongとして推定します。一方、Auto Loaderは全てのカラムを文字列と考えます。あるカラムが特定のデータタイプであることを知っている場合、あるいは、より汎用的なデータ型を選択したいと考えた場合(例えば、integerではなくdouble)、以下のようにカラムのデータ型に関して任意の数のヒントを指定することができます。

Python
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

サポートされているデータ型の一覧に関してはdata typesのドキュメントを参照ください。

ストリーム起動時にカラムが存在していない場合、推定されるスキーマに当該カラムを追加するためにスキーマヒントを使うこともできます。

スキーマヒントを用いた際の挙動を確認するための推定スキーマの例を示します。推定されたスキーマが以下のようなものだとします。

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

以下のスキーマヒントを指定します。

Python
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

以下のようになります。

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

注意
ArrayとMapのスキーマヒントは、Databricksランタイム9.1LTS以降で利用できます。

スキーマヒントの挙動を確認するための、複雑なデータ型の推定スキーマの例を示します。

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

以下のスキーマヒントを指定します。

Python
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

以下のようになります。

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

注意
スキーマヒントはAuto Loaderにスキーマを指定しない場合にのみ利用できます。cloudFiles.inferColumnTypesが有効化されていても無効化されていてもスキーマヒントを使用することができます。

スキーマ進化

Auto Loaderはデータを処理している際に新たなカラムの追加を検知します。デフォルトでは、新規カラムの追加はストリームのUnknownFieldExceptionによる停止を引き起こします。ストリームがこのエラーを生じさせる前に、Auto Loaderは最新のマイクロバッチのデータに対してスキーマ推定を実施し、最新のスキーマを用いてスキーマの格納場所を更新します。新規カラムはスキーマの最後に追加されます。既存カラムのデータ型は変更されません。DatabricksジョブでAuto Loaderのストリームを設定することで、このようなスキーマ変更があった際にストリームは自動で再起動します。

Auto Loaderでは、cloudFiles.schemaEvolutionModeオプションで指定できる以下のモードのスキーマ進化モードをサポートしています。

  • addNewColumns: Auto Loaderにスキーマが提供されていない場合のデフォルトのモード。ストリーミングジョブはUnknownFieldExceptionで失敗します。スキーマに新規カラムが追加されます。既存カラムのデータ型は進化しません。ストリームのスキーマが提供されている場合にはaddNewColumnsは許可されません。このモードを使いたい場合には、スキーマヒントとしてスキーマを提供します。
  • failOnNewColumns: Auto Loaderが新規カラムを検知した際にストリームは失敗します。違反しているデータが削除されるか、指定されているスキーマが更新されない限り、ストリームを再起動することはできません。
  • rescue: ストリームは一番最初に指定されたスキーマあるいは指定されたスキーマで動作します。データ型が変化した、あるいは新規カラムが追加された際、_rescued_dataとしてストリームのスキーマに自動的に追加されるレスキューデータカラムに救助されます。このモードでは、スキーマ変更でストリームが失敗しません。
  • none: スキーマが指定されている際のデフォルトモード。スキーマは進化せず、新規カラムは無視され、オプションとしてレスキューデータカラムが指定されない限りデータは救助されません。

スキーマ進化ではパーティションカラムは考慮されません。初期のディレクトリ構成がbase_path/event=click/date=2021-04-01/f0.jsonのようなものであり、新たなファイルをase_path/event=click/date=2021-04-01/hour=01/f1.jsonという形で受け取り始めた際、hourカラムは無視されます。新たなパーティションカラムの情報を捕捉するためには、cloudFiles.partitionColumnsevent,date,hourを設定します。

レスキューデータカラム

レスキューデータカラムを用いることで、ETLの間にデータを失うことや見逃すことを避けることができます。レスキューデータカラムには、指定されたスキーマに含まれていない、型のミスマッチがあった、レコードのカラムの大文字小文字が異なった、ファイルがスキーマにマッチしなかったなどの理由でパースできなかった全てのデータが格納されます。レスキューデータカラムは救助されたカラム、レコードのソースファイルパス(ソースファイルパスはDatabricksランタイム8.3以降で利用できます)を含むJSONのblobとして返却されます。レスキューデータカラムからソースファイルパスを除外するには、SQL設定spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")を設定します。レスキューデータカラムは、スキーマが推定された際に、デフォルトでは_rescued_dataとしてAuto Loaderによって返却されるスキーマに含まれます。rescuedDataColumnオプションを設定することで、カラム名を変更したり、スキーマを指定した場合にレスキューデータカラムを含めることができます。

Python
spark.readStream.format("cloudFiles").option("cloudFiles.rescuedDataColumn", "_rescued_data").option("cloudFiles.format", <format>).schema(<schema>).load(<path>).

cloudFiles.inferColumnTypesのデフォルト値はfalseであり、スキーマが推定された際のcloudFiles.schemaEvolutionModeのデフォルトはaddNewColumnsなので、rescuedDataColumnにはスキーマと異なるカラムのみが捕捉されます。

JSONとCSVのパーサーはレコードをパースする際に3つのモードをサポートしています: PERMISSIVE, DROPMALFORMED, FAILFASTです。rescuedDataColumnと一緒に使用した場合、データ型のミスマッチはDROPMALFORMEDモードでのレコード削除を行いませんし、FAILFASTモードでエラーになりません。破損したレコード、不完全あるいは不正なJSON、CSVのみが削除されたりエラーを引き起こしたりします。JSONやCSVをパースする際にbadRecordsPathを使用している場合、rescuedDataColumnを使っている場合にはデータ型のミスマッチは不正レコードとみなされません。不完全あるいは不正なJSON、CSVレコードのみがbadRecordsPathに格納されます。

制限

  • foreachBatchを使用するDatabricksランタイム8.2、8.3が稼働しているPythonアプリケーションではスキーマ進化はサポートされていません。代わりにScalaのforeachBatchを使用することができます。

Databricks 無料トライアル

Databricks 無料トライアル

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?