Configure Auto Loader for production workloads | Databricks on AWS [2022/10/17時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
プロダクション環境でAuto Loaderを実行する際にはストリーミングのベストプラクティスに従うことをお勧めします。
インクリメンタルなデータ取り込みには、Delta Live TablesでAuto Loaderを使用することをお勧めします。Delta Live TablesはApache Sparkの構造化ストリーミングを拡張し、数行の宣言型PythonあるいはSQLを記述することで、以下の機能を持つプロダクション品質のデータパイプラインをデプロイすることができます。
- コスト削減のためにオートスケーリングする計算インフラストラクチャ
- エクスペクテーションによるデータ品質チェック
- スキーマ進化の自動ハンドリング
- イベントログのメトリクスを通じたモニタリング
Auto Loaderのモニタリング
Auto Loaderによって検知されたファイルへのクエリー
注意
cloud_files_state
関数はDatabricksランタイム10.5以降で利用できます。
Auto Loaderは、ストリームの状態を調査するためのSQL APIを提供しています。cloud_files_state
関数を用いることで、Auto Loaderのストリームによって検知されたファイルに関するメタデータを取得することができます。Auto Loaderのストリームに関連づけられたチェックポイントのロケーションを指定して、シンプルにcloud_files_state
にクエリーを行います。
SELECT * FROM cloud_files_state('path/to/checkpoint');
ストリームアップデートのリッスン
Auto Loaderのストリームをさらにモニタリングするには、Apache SparkのStreaming Query Listenerインタフェースを使うことをお勧めします。
Auto Loaderはそれぞれのバッチにおいて、Streaming Query Listenerにメトリクスをレポートします。ストリーミングクエリーの進捗ダッシュボードのRaw Dataタブの下のnumFilesOutstanding
とnumBytesOutstanding
メトリクスでバックログにどれだけのファイルが存在するのか、バックログがどれだけ大きいのかを確認することができます。
{
"sources" : [
{
"description" : "CloudFilesSource[/path/to/source]",
"metrics" : {
"numFilesOutstanding" : "238",
"numBytesOutstanding" : "163939124006"
}
}
]
}
AWSやAzureでDatabricksランタイム10.1以降でファイル通知モードを使う際、メトリクスにはクラウドキューにおおよそどれだけのファイルイベントが存在するのかを示すapproximateQueueSize
が含まれます。
コストの検討
Auto Loaderを実行する際、コストの主要因となるのは計算資源とファイル検知のコストになるでしょう。
計算コストを削減するためには、低レーテンシーの要件がない限り、連続的に稼働させるのではなく、Trigger.AvailableNow
(Databricksランタイム10.1以降)やTrigger.Once
を用いたバッチジョブとしてAuto LoaderをスケジュールするためにDatabricksジョブを使用することをお勧めします。
ファイル検知は、ディレクトリ一覧モードでお使いのストレージアカウントに対するLISTオペレーションの実行、サブスクリプションサービスに対するAPIリクエスト、ファイル通知モードのキューサービスによってコストを発生させることがあります。以下のことをお勧めします。
- ディレクトリ一覧モードでAuto Loaderを継続的に実行する際には
ProcessingTime
トリガーを指定します。 - 可能であればインクリメンタルな一覧を活用できる様に、語順でファイルをストレージアカウントにアップロードする様に設計します。
- 特に深くネストされたディレクトリに対しては、Databricksランタイム9.0やディレクトリ一覧モードを使用します。
- インクリメンタル一覧が利用できない場合には、ファイル通知を利用します。
- コストを追跡するために、Auto Loaderによって作成されたリソースをタグ付けするためにリソースタグを使用します。
Trigger.AvailableNowとレート制限の使用
注意
Databricksランタイム10.1ではScalaのみ利用できます。
Databricksランタイム10.2以降ではPythonとScalaで利用できます。
Trigger.AvailableNow
を用いることで、バッチジョブとしてDatabricksジョブの中でAuto Loaderを実行する様にスケジュールすることができます。AvailableNow
トリガーは、クエリー開始時間の前に到着したすべてのファイルを処理する様にAuto Loaderに指示します。ストリームが起動した後にアップロードされた新規ファイルは次のトリガーまで無視されます。
Trigger.AvailableNow
を用いることで、データ処理とは非同期にファイル検知が行われ、レート制限を用いた複数のマイクロバッチでデータを処理することができます。デフォルトではAuto Loaderはマイクロバッチごとに最大1000ファイルを処理します。マイクロバッチでどれだけのファイル数やバイト数を処理すべきかを設定するために、cloudFiles.maxFilesPerTrigger
やcloudFiles.maxBytesPerTrigger
を設定することができます。ファイルの制限はハードリミットですが、バイトの制限はソフトリミットであり、指定されたmaxBytesPerTrigger
よりも多いバイト数が処理される場合があることを意味します。両方のオプションが指定された場合には、Auto Loaderはいずれかの制限に達するまでに可能な限り多くのファイルを処理します。
イベントの保持
注意
Databricksランタイム8.4以降で利用できます。
Auto Loaderは確実に一度(exactly-once)の取り込みの保証をするために、RocksDBを用いたチェックポイントの場所に検知したファイルの履歴を保持します。ボリュームの大きいデータセットにおいては、ストレージコストとAuto Loader起動時間を削減するために、チェックポイントの場所からイベントを廃棄するためにcloudFiles.maxFileAge
オプションを使用することができます。cloudFiles.maxFileAge
に設定できる最小値は"14 days"
です。RocksDBにおける削除はトゥームストーンのエントリーとして表示されるので、安定する前にイベントが廃棄されるので、一時的にストレージの利用量が一時的に増加する場合があります。
警告!
大容量のデータセットに対するコストコントロールの機構として、cloudFiles.maxFileAge
が指定され、毎時数百万のファイルが取り込まれます。cloudFiles.maxFileAge
を適切にチューニングしないと、データ品質問題につながる場合があります。このため、本当に必要な場合を除いてこのパラメーターをチューニングすることはお勧めしません。
cloudFiles.maxFileAge
オプションをチューニングしようとすると、Auto Loaderによって無視されることで未処理のファイルが生じる場合があり、処理済みのファイルの廃棄され、再処理によって重複データが生じる場合があります。cloudFiles.maxFileAge
を選択する際に検討すべき事柄を以下に示します。
- 長時間経過したあとにストリームを再スタートする場合には、
cloudFiles.maxFileAge
より古いキューからプルされるファイル通知イベントは無視されます。同様に、ディレクトリ一覧を使用している場合には、cloudFiles.maxFileAge
より古い、ダウンタイムの期間に出現したファイルは無視されます。 - ディレクトリ一覧モードを使用しており、
cloudFiles.maxFileAge
を使う際には、例えば"1 month"
を設定し、ストリームを停止した後にcloudFiles.maxFileAge
を"2 months"
に設定して再スタートさせ、1ヶ月より古く2ヶ月よりも新しいすべてのファイルが再処理されます。
cloudFiles.maxFileAge
をチューニングするベストプラクティスは、"1 year"
のような緩い廃棄期間からスタートし、"9 months"
のように引き下げていくというものです。ストリームを初めてスタートする際にこのオプションを設定するのであれば、cloudFiles.maxFileAge
より古いデータは取り込まれないので、古いデータを取り込みたいのであればストリームを開始する際にこのオプションを設定すべきではありません。