背景・目的
DatabricksのAuto Loaderについて、触れる機会がありましたので基本的な仕様などを整理します。
まとめ
下記に特徴をまとめます。
特徴 | 説明 |
---|---|
Auto Loader | 新しいファイルが到着すると、段階的かつ効率的に処理する。 |
クラウドストレージ | ・AWS S3 ・Azure Data Lake Storage Gen2 ・GCPS ・Azure Blob Storage ・ADSL |
ファイル形式 | ・JSON ・CSV ・XML ・PARQUET ・AVRO ・ORC ・TEXT ・BINARYFILE |
仕様 | ・cloudFilesという構造化ストリーミングソースを提供する ・新しいファイルが到着すると自動的に処理する ・オプションでディレクトリ内の既存ファイルも処理する ・DLTでPythonとSQLの両方をサポートする ・何十億者ファイルを処理し、テーブルを移行、または埋め戻すことが可能 |
パフォーマンス | 一時間あたりに数百万ものファイルをニアリアルタイムで取り込む事が可能 |
進行状況の追跡方法 | ・ファイル検出すると、メタデータは、RocksDBに永続化される ・このKey&Valueにより、データが一回だけ処理されることが保証される ・障害発生時には、チェックポイントの場所に保存されている情報を使用して再開できる |
Schema 推論 の仕組み | 検出した最初の下記のどちらかをサンプリングする(デフォルトサイズ) ・50GB ・1000ファイル |
Schema Evolutionのモード | ・addNewColumns ・rescue ・failOnNewColumns ・none |
Auto Loaderでのパーティションの動作 | Hive形式のディレクトリの場合、推論する |
ファイル検出 | 新しいファイルを検出するために、下記の2つのモードがサポートされている ・ディレクトリ一覧 ・ファイル通知 |
ディレクトリリストによる検出 | ・入力ディレクトリをリスト化し、新規ファイルを識別する クラウドストレージ上のデータへのアクセス以外の権限設定無しですぐに利用可能 |
ファイル通知 | ・クラウドインフラのファイル通知サービスとキューサービスを利用する ・入力ディレクトリからファイルイベントをサブスクライブする通知サービスとキューサービスを自動で設定できる |
概要
Auto Loaderとはを元に整理します。
Auto Loaderは、新しいデータファイルがクラウドストレージに到着すると、追加設定なしで段階的かつ効率的に処理します。
- Auto Loaderとは
- 新しいファイルが到着すると、段階的かつ効率的に処理する
Auto Loaderの仕組み
Auto Loader は、新しいデータ ファイルがクラウド ストレージに到着すると、段階的かつ効率的に処理します。 Auto Loaderは、AWS S3 (s3://)、Azure Data Lake Storage Gen2 (ADLS Gen2、abfss://)、Google Cloud Storage (GCS、gs://)、Azure Blob Storage (wasbs://)、ADLS からデータ ファイルをロードできます。 Gen1 (adl://)、および Databricks ファイル システム (DBFS、dbfs:/)。 Auto Loader はJSON 、 CSV 、 XML 、 PARQUET 、 AVRO 、 ORC 、 TEXT 、およびBINARYFILEファイル形式を取り込むことができます。
- Auto Loaderは下記からファイルを取り込む
- AWS S3
- Azure Data Lake Storage Gen2
- GCPS
- Azure Blob Storage
- ADSL
- 下記のファイルの形式を取り込むことが可能
- JSON
- CSV
- XML
- PARQUET
- AVRO
- ORC
- TEXT
- BINARYFILE
Auto Loaderは、cloudFilesという構造化ストリーミングソースを提供します。クラウドファイルストレージ上の入力ディレクトリパスが与えられると、cloudFilesソースは新しいファイルが到着すると自動的に処理し、オプションでそのディレクトリ内の既存のファイルも処理します。Auto Loaderは、デルタ・ライブ・テーブルでPythonとSQLの両方をサポートしています。
Auto Loaderを使用して、何十億ものファイルを処理し、テーブルを移行または埋め戻すことができます。Auto Loaderは、1時間あたり数百万ものファイルをほぼリアルタイムで取り込むことができます。
- cloudFilesという構造化ストリーミングソースを提供する
- 新しいファイルが到着すると自動的に処理する
- オプションでディレクトリ内の既存ファイルも処理する
- DLTでPythonとSQLの両方をさポーチする
- 何十億者ファイルを処理し、テーブルを移行、または埋め戻すことが可能
- 一時間あたりに数百万ものファイルをニアリアルタイムで取り込む事が可能
Auto Loaderは取り込みの進行状況をどのように追跡しますか?
ファイルが検出されると、そのメタデータはAuto Loaderパイプラインのチェックポイントの場所にあるスケーラブルなキーバリューストア(RocksDB)に永続化されます。このキーと値のストアにより、データが1回だけ処理されることが保証されます。
障害が発生した場合、Auto Loaderはチェックポイントの場所に保存されている情報によって中断されたところから再開でき、Delta Lakeへのデータの書き込み時に1回限りの保証を提供し続けます。フォールトトレランスや1回限りのセマンティクスを実現するために、自分で状態を維持または管理する必要はありません。
- ファイル検出すると、メタデータは、RocksDBに永続化される
- このKey&Valueにより、データが一回だけ処理されることが保証される
- 障害発生時には、チェックポイントの場所に保存されている情報を使用して再開できる
Delta Live TablesでAuto Loaderを使用した増分インジェスト
Databricksは、増分データの取り込みにDelta Live TablesのAuto Loaderを推奨しています。Delta Live Tablesは、Apache Spark Structured Streamingの機能を拡張し、わずか数行の宣言的なPythonまたはSQLを書くだけで、本番品質のデータパイプラインを導入することができます。これによって、以下のことが可能になります。
- コンピュートインフラストラクチャのオートスケーリングによるコスト削減
- 期待値を使用したデータ品質チェック
- 自動スキーマ進化処理
- イベントログのメトリクスによるモニタリング
Delta Live Tablesがパイプラインの設定を自動的に管理するため、スキーマやチェックポイントの場所を指定する必要はありません。「Delta Live Tablesでデータを読み込む」を参照してください。
Databricks では、Apache Spark 構造化ストリーミングを使用してクラウドオブジェクトストレージからデータを取り込む場合は常に Auto Loader ことをお勧めします。 APIs は Python と Scala で利用できます。
- DLTは、増分データの取り込みに推奨している
- DLTは、Spart Streamingの機能を拡張している
ファイルに直接構造化ストリーミングを使用するよりも、Auto Loaderが優れている点
Apache Sparkでは、spark.readStream.format(fileFormat).load(directory)を使用してファイルを段階的に読み取ることができます。Auto Loaderには、ファイルソースに比べて次のような利点があります。
- スケーラビリティ:Auto Loaderは数十億のファイルを効率的に検出できます。バックフィルは非同期的に実行できるため、コンピューティングリソースの無駄を避けることができます。
- パフォーマンス:Auto Loaderによるファイル検出のコストは、ファイルが置かれる可能性のあるディレクトリの数ではなく、取り込まれるファイルの数によって変化します。「 Auto Loaderのディレクトリリストモードとは」を参照してください。
- スキーマの推論と進化のサポート: Auto Loader スキーマのドリフトを検出し、スキーマの変更が発生したときに通知し、他の方法では無視または失われたデータをレスキューできます。 「 Auto Loader スキーマ推論のしくみ」を参照してください。
- コスト:Auto Loaderは、ネイティブクラウドAPIを使用して、ストレージに存在するファイルの一覧を取得します。さらに、Auto Loaderのファイル通知モードは、ディレクトリのリストを完全に回避することで、クラウドのコストをさらに削減するのに役立ちます。Auto Loaderは、ストレージにファイル通知サービスを自動的に設定して、ファイル検出を大幅に低コストにすることができます。
- Auto Loaderには、ファイルソースと比較し下記の利点がある
- スケーラビリティ
- 数十億のファイルを効率的に検出できる
- バックフィルは非同期に実行できる
- 無駄なリソースの使用を避けられる
- パフォーマンス
- ファイル検出のコストは、ファイルが置かれている可能性のあるディレクトリ数ではない
- 取り込まれるファイルの数によって変化する
- スキーマの推論と進化のサポート
- ドリフトを検出し、スキーマの変更が生じたときに、通知し他の方法では無視または失われたデータをレスキューできる
- コスト
- Auto LoaderはネイティブクラウドAPIを使用して、ストレージに存在するファイルの一覧を取得する
- Auto Loaderのファイル通知モードは、ディレクトリのリストを完全に回避することが可能。それによりコストが更に削減できる
- スケーラビリティ
Auto Loaderのオプションを設定する
Configure schema inference and evolution in Auto Loaderを元に整理します。
読み込まれたデータのスキーマを自動的に検出するようにAuto Loaderを構成すると、データスキーマを明示的に宣言せずにテーブルを初期化し、新しい列の導入に応じてテーブルスキーマを進化させることができます。これにより、時間の経過とともにスキーマの変更を手動で追跡して適用する必要がなくなります。
- スキーマを自動的に検出するよう構成できる
- スキーマを明示的に宣言せずにテーブルを初期化子、新しい列の導入に応じてテーブルスキーマを進化させる事が可能
- 時間の経過とともに、スキーマの変更を手動で追跡して適用する必要はない
Auto Loaderスキーマ推論の仕組み
最初にデータを読み取るときにスキーマを推測するために、Auto Loaderは、検出した最初の50 GBまたは1000ファイル(最初に制限を超えた方)をサンプリングします。Auto Loaderは、入力データに対するスキーマの変更を経時的に追跡するために、設定されたcloudFiles.schemaLocationのディレクトリ_schemasにスキーマ情報を保存します。
- 検出した最初の下記のどちらかをサンプリングする
- 50GB
- 1000ファイル
- 設定されたcloudFiles.schemaLocationディレクトリ _schemasにスキーマ情報を保存する
- 使用されるサンプルのサイズを変更する方法は、下記の通り
- サンプルサイズ
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
- ファイル数
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
- サンプルサイズ
デフォルトでは、Auto Loader スキーマ推論は、型の不一致によるスキーマ進化の問題を回避しようとします。 データ型をエンコードしない形式 (JSON、CSV、および XML) の場合、 Auto Loaderすべての列を文字列として推論します (JSON ファイル内のネストされたフィールドを含む)。 型指定されたスキーマを持つ形式 (Parquet および Avro) の場合、Auto Loader はファイルのサブセットをサンプリングし、個々のファイルのスキーマをマージします。 この動作を次の表にまとめます。
- デフォルトでは、型の不一致によるスキーマ Evolutuonの問題を回避しようとする
- データ型をエンコードしない形式(JSON,CSV,XML)は、すべての列を文字列として推論する
- JSONのネストされたフィールドを含む
- 型指定されたスキーマを持つ形式(Parquet、Avro)は、ファイルのサブセットをサンプリングする
Apache Spark DataFrameReader は、スキーマ推論に異なる動作を使用し、サンプル データに基づいて JSON、CSV、および XML ソースの列のデータ型を選択します。 Auto Loaderでこの動作を有効にするには、オプション cloudFiles.inferColumnTypes を true に設定します。
- サンプルデータに基づいて、推論するには、
cloudFiles.inferColumnTypes を true
にする
Auto Loaderスキーマ進化の仕組み
Auto Loaderは、データを処理するときに新しい列の追加を検出します。Auto Loaderが新しい列を検出すると、ストリームはUnknownFieldExceptionで停止します。ストリームがこのエラーをスローする前に、Auto Loaderはデータの最新のマイクロバッチに対してスキーマ推論を実行し、新しい列をスキーマの末尾にマージすることでスキーマの場所を最新のスキーマで更新します。既存の列のデータ型は変更されません。
- Auto Loaderはデータを処理するときに新しい列の追加を検出する
- 新しい列を検出すると、ストリームは、UnknownFieldExceptionで停止する
- ストリームがこのエラーをスローする前に、Auto Loaderはデータの最新のマイクロバッチに対してスキーマ推論を実行する
- 新しい列のスキーマの末尾にマージすることで スキーマの場所を最新のスキーマで更新する
- 既存の列のデータ型は変更されない
Databricksでは、このようなスキーマの変更後に自動的に再起動するように、ワークフローを使用してAuto Loaderストリームを構成することをお勧めしています。
Auto Loaderは、以下のスキーマ進化モードをサポートします。これは、オプションcloudFiles.schemaEvolutionModeで設定します。
- addNewColumns
- デフォルト
- ストリームが失敗する
- 新しい列がスキーマに追加される
- 既存の列は、データ型は変更されない
- rescue
- schema evolutionは行われない
- スキーマの変更によりストリームが失敗することは無い
- すべての新しい列は、レスキューされたデータ列に記録される
- failOnNewColumns
- ストリームが失敗する
- 提供されたスキーマが更新されるか、問題のあるデータファイルが削除されない限りストリームは再起動されない
- none
- schema evolutionは行われない
- 新しい列は無視される
- rescuedDataColumnオプションが設定されていない限りデータはレスキューされない
- スキーマの変更によりストリームが失敗することはない
Auto Loaderでのパーティションの動作
データがHiveスタイルのパーティショニングでレイアウトされている場合、Auto Loaderはデータの基礎となるディレクトリ構造からパーティション列を推測しようとします。たとえば、ファイルパスbase_path/event=click/date=2021-04-01/f0.jsonでは、パーティション列としてdateとeventが推論されます。基礎となるディレクトリ構造に競合するHiveパーティションが含まれているか、Hiveスタイルのパーティショニングが含まれていない場合、パーティション列は無視されます。
- Hive形式の場合、パーティション列を推測しようとする
バイナリファイル(binaryFile)およびtextファイル形式には固定のデータスキーマがありますが、パーティション列推論がサポートされています。Databricksでは、これらのファイル形式にcloudFiles.schemaLocationを設定することをお勧めしています。これにより、潜在的なエラーや情報損失が回避され、Auto Loaderが起動するたびにパーティション列が推論されることがなくなります。
スキーマの進化ではパーティション列は考慮されません。base_path/event=click/date=2021-04-01/f0.jsonのような初期ディレクトリ構造があり、その後base_path/event=click/date=2021-04-01/hour=01/f1.jsonとして新しいファイルの受信を開始した場合、Auto Loaderは時間列を無視します。新しいパーティション列の情報を取得するには、cloudFiles.partitionColumnsをevent,date,hourに設定します。
- Schema Evolutionではパーティション列は考慮されない
レスキューされたデータ列とは
Auto Loaderがスキーマを推論すると、レスキューされたデータ列が_rescued_dataとしてスキーマに自動的に追加されます。オプションrescuedDataColumnを設定することで、列の名前を変更したり、スキーマを指定する場合に列を含めたりすることができます。
- スキーマ推論の結果、レスキューされたデータ列が、resucued_dataとしてスキーマに自動的に追加される
レスキューされたデータ列は、スキーマに一致しない列を削除せずレスキューします。レスキューされたデータ列には、以下の理由で解析されなかったデータが含まれます。
- 列がスキーマにない
- 型が一致しない
- 大文字小文字が一致しない
- レスキューされたデータ列には、レスキューされた列とレコードのソースファイルパスを含むJSONが含まれます。
スキーマヒントを使用してスキーマ推論をオーバーライドする
スキーマヒントを使用すると、推論されたスキーマに対して、自分が知っていて期待しているスキーマ情報を適用できます。列が特定のデータ型であることが分かっている場合や、より一般的なデータ型(例えば、integerの代わりにdoubleなど)を選択したい場合は、SQLスキーマ指定構文を使用して、列のデータ型に関する任意の数のヒントを以下のような文字列として提供することができます:
- スキーマヒントにより、推論されたスキーマに対して、自分が知っていて期待しているスキーマ情報を提供できる
Auto Loaderのファイル検出モードを設定する
Auto Loader ファイル検出モードの比較を元に整理します。
Auto Loader では、新しいファイルを検出するために、ディレクトリ一覧とファイル通知の 2 つのモードがサポートされています。 ストリームの再起動間でファイル検出モードを切り替えても、正確に一度のデータ処理が保証されます。
- 新しいファイルを検出するために、下記の2つのモードがサポートされている
- ディレクトリ一覧
- ファイル通知
- ストリーム再起動間でファイル検出モードを切り替えても、正確に一度のデータ処理が保証される
ディレクトリリストモード
ディレクトリー・リスト・モードでは、 Auto Loader は入力ディレクトリーをリストすることによって新規ファイルを識別します。 ディレクトリリストモードでは、クラウドストレージ上のデータへのアクセス以外の権限設定なしで Auto Loader ストリームをすばやく開始できます。
- 入力ディレクトリをリスト化し、新規ファイルを識別する
- クラウドストレージ上のデータへのアクセス以外の権限設定無しですぐに利用可能
Databricks Runtime 9.1 以降では、Auto Loader はファイルが字句順でクラウド ストレージに到着しているかどうかを自動的に検出し、新しいファイルの検出に必要な API 呼び出しの量を大幅に削減できます。詳細については、「 Auto Loader ディレクトリリストモードとは」を参照してください。
- Runtime 9.1以降では、ファイルが字句順で到着しているか自動で検出する
ディレクトリリストモードはどのように機能しますか?
Databricks では、 Auto Loader が他の Apache Spark オプションよりも効率的にクラウド ストレージ内のファイルを検出できるように、ディレクトリ一覧モードが最適化されています。
たとえば、 /some/path/YYYY/MM/DD/HH/fileNameとして 5 分ごとにファイルをアップロードする場合、これらのディレクトリ内のすべてのファイルを検索するために、Apache Spark ファイル ソースにはすべてのサブディレクトリが並列に一覧表示されます。 次のアルゴリズムは、オブジェクト・ストレージへのAPI LIST ディレクトリ呼び出しの総数を推定します:
1 (ベース ディレクトリ) + 365 (1 日あたり) * 24 (1 時間あたり) = 8761 コール
ストレージからフラット化された応答を受信することで、 Auto Loader は API 呼び出しの数をストレージ内のファイルの数で割った値に減らし、各 API 呼び出しによって返される結果の数を減らし、クラウドのコストを大幅に削減します。 次の表は、共通オブジェクト・ストレージの各APIコールによって返されるファイルの数を示しています:
- ディレクトリモードが最適化されている
- ストレージからフラット化された応答を受信し、APIコールの数をストレージ内のファイルの数で割った値に減らす
ファイル通知モード
ファイル通知モードでは、クラウドインフラストラクチャアカウントのファイル通知サービスとキューサービスを利用します。 Auto Loader 、入力ディレクトリからファイル イベントをサブスクライブする通知サービスとキュー サービスを自動的に設定できます。
- クラウドインフラのファイル通知サービスとキューサービスを利用する
- 入力ディレクトリからファイルイベントをサブスクライブする通知サービスとキューサービスを自動で設定できる
ファイル通知モードは、大きな入力ディレクトリや大量のファイルに対してよりパフォーマンスとスケーラビリティがありますが、設定するには追加のクラウド権限が必要です。 詳細については、「 Auto Loader ファイル通知モードとは」を参照してください。
- 大きな入力ファイル、大量ファイル、パフォーマスとスケーラビリティがある
- 設定にはクラウドの権限が必要
Auto Loader ファイル通知モードで使用されるクラウドリソース
ファイル通知イベント
AWS S3 は、ファイルがプットアップロードまたはマルチパートアップロードのどちらでアップロードされたかに関係なく、ファイルが S3 バケットにアップロードされたときに ObjectCreated イベントを提供します。
ADLS Gen2 では、Gen2 コンテナーに表示されるファイルに対してさまざまなイベント通知が提供されます。
- Auto Loader は、ファイルを処理するための FlushWithClose イベントをリッスンします。
- Auto Loader ストリームは、ファイルを検出するための RenameFile アクションをサポートします。 RenameFile アクションでは、名前が変更されたファイルのサイズを取得するために、ストレージ・システムへのAPIリクエストが必要です。
- Databricks Runtime 9.0 以降で作成された Auto Loader ストリームでは、ファイルを検出するための RenameDirectory アクションがサポートされています。RenameDirectory アクションでは、名前を変更したディレクトリの内容を一覧表示するために、ストレージ・システムへのAPI要求が必要です。
Google Cloud ストレージでは、ファイルのアップロード時に上書きやファイルのコピーなどの OBJECT_FINALIZE イベントが提供されます。 アップロードに失敗しても、このイベントは生成されません。
クラウドプロバイダーは、非常にまれな条件下ですべてのファイルイベントの100%配信を保証するものではなく、ファイルイベントの待機時間に関する厳格なSLAを提供していません。 Databricks では、データの完全性が要件である場合、 cloudFiles.backfillInterval オプションを使用して定期的なバック Auto Loader フィルをトリガーし、特定の SLA 内ですべてのファイルが検出されるようにすることをお勧めします。 通常のバックフィルをトリガーしても、重複は発生しません。
- クラウドプロバイダーは、ファイルイベントの100%配信を保証しない
- SLAも提供しない
- Databricksとしては、データの完全性が要件の場合に、バックフィルオプションを使用して定期的なバックフィルをトリガーし、特定のSLA内ですべてのファイルが検出されるようにすることを推奨している
- バックフィルをトリガーしても重複は発生しない
実践
Databricksで最初のETLワークロードを実行する
- Databricksにサインインします
ステップ1:クラスターを作成する
ステップ2:Databricksノートブックを作成する
ステップ3:データをDelta Lakeに取り込むようにAuto Loaderを構成する
- 下記のコードを貼り付けて、実行します
# Import functions from pyspark.sql.functions import col, current_timestamp # Define variables used in code below file_path = "/databricks-datasets/structured-streaming/events" username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] table_name = f"{username}_etl_quickstart" checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart" # Clear out data from previous demo execution spark.sql(f"DROP TABLE IF EXISTS {table_name}") dbutils.fs.rm(checkpoint_path, True) # Configure Auto Loader to ingest JSON data to a Delta table (spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.schemaLocation", checkpoint_path) .load(file_path) .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time")) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .toTable(table_name))
ステップ4:データを処理し、操作する
ステップ5:ジョブをスケジュールする
ファイルを配置し、AutoLoaderの挙動を確認する
事前準備
データの用意
Storage Credentialの用意
- こちらで作成したIAMロールに、対象のバケットを追加します
- ナビゲーションペインで、Catalogを選択します
- 「Add a new storage credential」をクリックします
- IAMのarnを指定し、「Create」をクリックします
ca6ef543-b396-4253-91b3-ec1114a4e55d
外部ロケーションの作成
- 「Add an external location」をクリックします
- Manualを選択し、Nextをクリックします
- S3パス、External location name、Storage Credentialを指定し、「Create」をクリックします
- できました。「Test connection」をクリックします
- 成功しました
実装
-
下記のコードをノートブックに用意します
# Import functions from pyspark.sql.functions import col, current_timestamp # Define variables used in code below file_path = "s3://XXXX/test1" table_name = f"autoloader_test2" checkpoint_path = f"s3://XXXX/etl_quickstartautoloader_test2" # Clear out data from previous demo execution spark.sql(f"DROP TABLE IF EXISTS {table_name}") dbutils.fs.rm(checkpoint_path, True) # Configure Auto Loader to ingest JSON data to a Delta table (spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.schemaLocation", checkpoint_path) .option("inferSchema","true") .load(file_path) .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time")) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .toTable(table_name))
ファイルを追加し実行
チェックポイントを確認
- 下記のコードでチェックポイントを確認します
checkpoint_path = "s3://XXXXX/etl_quickstartautoloader_test2" df = spark.sql(f"SELECT * FROM cloud_files_state('{checkpoint_path}')") display(df)
- 表示されました
考察
今回は、AutoLoaderの挙動について確認しました。次回はAutoLoaderのファイル通知を試してみます
参考