2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

プロダクションワークロード向けのAuto Loaderの設定

Last updated at Posted at 2022-10-24

Configure Auto Loader for production workloads | Databricks on AWS [2022/10/17時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

プロダクション環境でAuto Loaderを実行する際にはストリーミングのベストプラクティスに従うことをお勧めします。

インクリメンタルなデータ取り込みには、Delta Live TablesでAuto Loaderを使用することをお勧めします。Delta Live TablesはApache Sparkの構造化ストリーミングを拡張し、数行の宣言型PythonあるいはSQLを記述することで、以下の機能を持つプロダクション品質のデータパイプラインをデプロイすることができます。

Auto Loaderのモニタリング

Auto Loaderによって検知されたファイルへのクエリー

注意
cloud_files_state関数はDatabricksランタイム10.5以降で利用できます。

Auto Loaderは、ストリームの状態を調査するためのSQL APIを提供しています。cloud_files_state関数を用いることで、Auto Loaderのストリームによって検知されたファイルに関するメタデータを取得することができます。Auto Loaderのストリームに関連づけられたチェックポイントのロケーションを指定して、シンプルにcloud_files_stateにクエリーを行います。

SQL
SELECT * FROM cloud_files_state('path/to/checkpoint');

ストリームアップデートのリッスン

Auto Loaderのストリームをさらにモニタリングするには、Apache SparkのStreaming Query Listenerインタフェースを使うことをお勧めします。

Auto Loaderはそれぞれのバッチにおいて、Streaming Query Listenerにメトリクスをレポートします。ストリーミングクエリーの進捗ダッシュボードRaw Dataタブの下のnumFilesOutstandingnumBytesOutstandingメトリクスでバックログにどれだけのファイルが存在するのか、バックログがどれだけ大きいのかを確認することができます。

JSON
{
  "sources" : [
    {
      "description" : "CloudFilesSource[/path/to/source]",
      "metrics" : {
        "numFilesOutstanding" : "238",
        "numBytesOutstanding" : "163939124006"
      }
    }
  ]
}

AWSやAzureでDatabricksランタイム10.1以降でファイル通知モードを使う際、メトリクスにはクラウドキューにおおよそどれだけのファイルイベントが存在するのかを示すapproximateQueueSizeが含まれます。

コストの検討

Auto Loaderを実行する際、コストの主要因となるのは計算資源とファイル検知のコストになるでしょう。

計算コストを削減するためには、低レーテンシーの要件がない限り、連続的に稼働させるのではなく、Trigger.AvailableNow(Databricksランタイム10.1以降)やTrigger.Onceを用いたバッチジョブとしてAuto LoaderをスケジュールするためにDatabricksジョブを使用することをお勧めします。

ファイル検知は、ディレクトリ一覧モードでお使いのストレージアカウントに対するLISTオペレーションの実行、サブスクリプションサービスに対するAPIリクエスト、ファイル通知モードのキューサービスによってコストを発生させることがあります。以下のことをお勧めします。

  • ディレクトリ一覧モードでAuto Loaderを継続的に実行する際にはProcessingTimeトリガーを指定します。
  • 可能であればインクリメンタルな一覧を活用できる様に、語順でファイルをストレージアカウントにアップロードする様に設計します。
  • 特に深くネストされたディレクトリに対しては、Databricksランタイム9.0やディレクトリ一覧モードを使用します。
  • インクリメンタル一覧が利用できない場合には、ファイル通知を利用します。
  • コストを追跡するために、Auto Loaderによって作成されたリソースをタグ付けするためにリソースタグを使用します。

Trigger.AvailableNowとレート制限の使用

注意
Databricksランタイム10.1ではScalaのみ利用できます。

Databricksランタイム10.2以降ではPythonとScalaで利用できます。

Trigger.AvailableNowを用いることで、バッチジョブとしてDatabricksジョブの中でAuto Loaderを実行する様にスケジュールすることができます。AvailableNowトリガーは、クエリー開始時間の前に到着したすべてのファイルを処理する様にAuto Loaderに指示します。ストリームが起動した後にアップロードされた新規ファイルは次のトリガーまで無視されます。

Trigger.AvailableNowを用いることで、データ処理とは非同期にファイル検知が行われ、レート制限を用いた複数のマイクロバッチでデータを処理することができます。デフォルトではAuto Loaderはマイクロバッチごとに最大1000ファイルを処理します。マイクロバッチでどれだけのファイル数やバイト数を処理すべきかを設定するために、cloudFiles.maxFilesPerTriggercloudFiles.maxBytesPerTriggerを設定することができます。ファイルの制限はハードリミットですが、バイトの制限はソフトリミットであり、指定されたmaxBytesPerTriggerよりも多いバイト数が処理される場合があることを意味します。両方のオプションが指定された場合には、Auto Loaderはいずれかの制限に達するまでに可能な限り多くのファイルを処理します。

イベントの保持

注意
Databricksランタイム8.4以降で利用できます。

Auto Loaderは確実に一度(exactly-once)の取り込みの保証をするために、RocksDBを用いたチェックポイントの場所に検知したファイルの履歴を保持します。ボリュームの大きいデータセットにおいては、ストレージコストとAuto Loader起動時間を削減するために、チェックポイントの場所からイベントを廃棄するためにcloudFiles.maxFileAgeオプションを使用することができます。cloudFiles.maxFileAgeに設定できる最小値は"14 days"です。RocksDBにおける削除はトゥームストーンのエントリーとして表示されるので、安定する前にイベントが廃棄されるので、一時的にストレージの利用量が一時的に増加する場合があります。

警告!
大容量のデータセットに対するコストコントロールの機構として、cloudFiles.maxFileAgeが指定され、毎時数百万のファイルが取り込まれます。cloudFiles.maxFileAgeを適切にチューニングしないと、データ品質問題につながる場合があります。このため、本当に必要な場合を除いてこのパラメーターをチューニングすることはお勧めしません。

cloudFiles.maxFileAgeオプションをチューニングしようとすると、Auto Loaderによって無視されることで未処理のファイルが生じる場合があり、処理済みのファイルの廃棄され、再処理によって重複データが生じる場合があります。cloudFiles.maxFileAgeを選択する際に検討すべき事柄を以下に示します。

  • 長時間経過したあとにストリームを再スタートする場合には、cloudFiles.maxFileAgeより古いキューからプルされるファイル通知イベントは無視されます。同様に、ディレクトリ一覧を使用している場合には、cloudFiles.maxFileAgeより古い、ダウンタイムの期間に出現したファイルは無視されます。
  • ディレクトリ一覧モードを使用しており、cloudFiles.maxFileAgeを使う際には、例えば"1 month"を設定し、ストリームを停止した後にcloudFiles.maxFileAge"2 months"に設定して再スタートさせ、1ヶ月より古く2ヶ月よりも新しいすべてのファイルが再処理されます。

cloudFiles.maxFileAgeをチューニングするベストプラクティスは、"1 year"のような緩い廃棄期間からスタートし、"9 months"のように引き下げていくというものです。ストリームを初めてスタートする際にこのオプションを設定するのであれば、cloudFiles.maxFileAgeより古いデータは取り込まれないので、古いデータを取り込みたいのであればストリームを開始する際にこのオプションを設定すべきではありません。

Databricks 無料トライアル

Databricks 無料トライアル

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?