Delta Live Tables concepts | Databricks on AWS [2022/8/1時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
本書では、Delta Live Tablesを効果的に使用するために理解すべき基本的なコンセプトを紹介します。
パイプライン
Delta Live Tablesにおける処理実行のメインユニットはパイプラインとなります。パイプラインは、データソースとターゲットデータセットを接続する有向非循回グラフ(DAG)です。SQLクエリーあるいはSpark SQLあるいはKoalasデータフレームを返却するPython関数を用いて、Delta Live Tablesのデータセットのコンテンツを定義することができます。パイプラインを実行するのに必要な設定をパイプラインに定義することができます。オプションとして、データセットを定義する際にデータ品質の制約を指定することができます。
DatabricksノートブックでDelta Live Tablesのパイプラインを実装します。1つのノートブックあるいは複数のノートブックでパイプラインを実装することができます。1つのノートブックでは、全てのクエリーはPythonあるいはSQLで実装される必要がありますが、Pythonのノートブック、SQLのノートブックを組み合わせた複数のノートブックでパイプラインを設定することが可能です。それぞれのノートブックは出力データの格納場所を共有し、パイプラインの他のノートブックからデータセットを参照することができます。
Delta Live Tablesのノートブックを格納、管理するためにDatabricks Reposを使用することができます。パイプラインを作成する際にDatabricks Reposで管理できるようにするには、以下の設定を行います。
- SQLノートブックの先頭にコメント
-- Databricks notebook source
を追加します。 - Pythonノートブックの先頭にコメント
# Databricks notebook source
を追加します。
パイプラインの作成、実行についてはDelta Live Tablesパイプラインの作成、実行、管理をご覧ください。複数ノートブックによるパイプラインの設定例については、パイプラインの複数のノートブックの設定をご覧ください。
クエリー
クエリーでは、データソースとターゲットデータセットを定義することでデータ変換を定義します。Delta Live TablesのクエリーはPythonあるいはSQLで実装できます。
エクスペクテーション
データセットのコンテンツに対するデータ品質のコントロールを指定するためにエクスペクテーションを使用します。制約に違反するレコードの追加を防ぐ従来型のデータベースのCHECK
制約とは異なり、エクスペクテーションはデータ品質要件に違反するデータの処理に対する柔軟性を提供します。この柔軟性によって、綺麗でないと予測されるデータ、厳密な品質要件に合致すべきデータを処理、格納することが可能となります。
検証に失敗したレコードを保持するか、レコードを削除するか、パイプラインを停止するようにエクスペクテーションを定義することができます。
パイプライン設定
パイプライン設定はJSONで定義され、以下のようにパイプラインの実行に必要なパラメーターを含めることができます。
- Delta Lakeでターゲットデータセットを作成するためのテーブルとビューを定義するクエリーを含む(ノートブック形式の)ライブラリ
- 処理に必要となるテーブルとビューが格納されるクラウドストレージ上の格納場所。この格納場所には、DBFSあるいは指定する場所を設定できます。
- データの処理を行うSparkクラスターの追加設定
詳細はDelta Live Tablesの設定をご覧ください。
データセット
Delta Live Tablesでは、ビューとテーブルという二つのタイプのデータセットが存在します。
- ビューはSQLにおける一時ビューと類似のものであり、何かしらの計算処理のエイリアスとなります。ビューを用いることで、複雑なクエリーをより小さく理解しやすいクエリーに分割することができます。また、1つ以上のテーブルに対するソースとして特定の変換処理を再利用することができます。パイプライン内でのみビューを利用することができ、インタラクティブにクエリーを実行することはできません。
- テーブルは従来のマテリアライズドビューと似たようなものです。Delta Live Tablesランタイムは自動でテーブルをDelta形式で作成し、テーブルを作成するクエリーの最新の結果でこれらのテーブルが更新されることを保証します。
ビューやテーブルはライブあるいはストリーミングライブビュー、テーブルとして定義することができます。
ライブテーブルやライブビューは常に、それらを定義したクエリーの結果を反映します。これには、テーブルやビューを定義するクエリーが更新されたとき、入力データソースが更新された時を含みます。従来のマテリアライズドビューのように、計算リソースと時間を最適化できる際にはライブテーブルやライブビューのすべてが計算されます。
ストリーミングライブテーブルやストリーミングライブビューは、最後のパイプラインアップデート以降に追加されたデータのみを処理します。ストリーミングテーブルやストリーミングビューはステートフルです。テーブルを定義するクエリーが変更された場合、新規のクエリーに基づいて新規データは処理されますが、既存のデータは再計算されません。
ストリーミングライブテーブルは、以下のようなユースケースで価値を発揮します。
- データの保持: ストリーミングライブテーブルは、例えば、Apache KafkaやAmazon Kinesisのような入力データソースでのデータ保持期間が短かったとしても、無限にデータを保持することができます。
- データソースの進化: KafkaからKinesisにデータソースが移行したとしても、データを保持することができます。
後段のコンシューマーで検索、クエリーできるようにテーブルを公開することができます。
パイプラインのアップデート
パイプラインを作成したら実行することができます。アップデートを起動します。アップデートは以下の処理を実行します。
- 適切な設定でクラスターを起動します。
- 定義されたすべてのテーブルとビューを特定し、不正なカラム名、依存関係の欠如、構文エラーなどの解析エラーをチェックします。
- 最新のデータを用いてすべてのテーブルとビューを作成、更新します。
アップデートされるテーブル、ビュー、そして、それらのテーブルやビューがどのようにアップデートされるのかはアップデートのタイプに依存します。
- Refresh all(全てをリフレッシュ): それぞれの入力データソースの現在状態を反映するために、すべてのライブテーブルが更新されます。すべてのストリーミングライブテーブルでは、新規の行がテーブルに追加されます。
- Full refresh all(全てをフルリフレッシュ): それぞれの入力データソースの現在状態を反映するために、すべてのライブテーブルが更新されます。すねてのストリーミングライブテーブルでは、Delta Live Tablesはすべてのテーブルの全データをクリアーし、ストリーミングソースからすべてのデータをロードしようとします。
-
Refresh selection(選択してリフレッシュ):
refresh selection
の挙動はrefresh all
と同じですが、選択したテーブルのみにリフレッシュを実行することができます。選択されたライブテーブルは、それぞれの入力データソースの現在状態を反映するために更新されます。選択されたストリーミングライブテーブルでは、テーブルに新規行が追加されます。 -
Full refresh selection(選択してフルリフレッシュ):
full refresh selection
の挙動はfull refresh all
と同じですが、選択したテーブルのみにフルリフレッシュを実行することができます。選択されたライブテーブルの入力データソースの現在の状態を反映するために更新されます。選択されたストリーミングライブテーブルでは、それぞれのデータオブの全データをクリアし、ストリーミングソースからすべてのデータをロードしようとします。
既存のライブテーブルにおいて、アップデートはマテリアライズドビューにおけるSQLのREFRESH
と同じ挙動をします。新規のライブテーブルでは、この挙動はSQLのCREATE
オペレーションと同じものになります。
パイプラインがトリガーモードの場合、パイプラインのすべてのテーブルの更新後にシステムは処理を停止します。
トリガーモードのアップデートが成功すると、それぞれのテーブルはアップデートがスタートした際のデータに基づいて更新されたことが保証されます。
低レーテンシーの要件があるユースケースでは、連続モードでパイプラインをアップデートするように設定できます。お使いのパイプラインの実行モードの詳細については連続、トリガーパイプラインをご覧ください。
連続、トリガーパイプライン
Delta Live Tablesでは、2つの実行モードをサポートしています。
- トリガー(triggered)パイプラインは、現時点で利用可能なデータを用いてそれぞれのテーブルをアップデートし、パイプラインを実行しているクラスターを停止します。Delta Live Tablesは自動でテーブル間の依存関係を解析し、外部ソースからの読み込みを行う計算処理を起動します。パイプライン内のテーブルは、依存しているデータソースが更新された後に更新されます。
- 連続(continuous)パイプラインは入力データの変更に合わせて継続的にテーブルをアップデートします。アップデートが起動すると、手動で停止されるまで処理を継続します。連続パイプラインには常時稼働のクラスターが必要となりますが、後段のコンシューマーが最新のデータを利用できることを保証します。
トリガーパイプラインでは、クラスターはパイプラインを実行するのに必要な期間のみ稼働するのでリソース消費とコストを削減できます。しかし、パイプラインがトリガーされるまでは、新規データは処理されません。連続パイプラインでは常時稼働のクラスターが必要となり、追加のコストが発生しますが処理のレーテンシーを削減することができます。
パイプライン設定のcontinuous
フラグで実行モードを制御します。デフォルトではトリガー実行モードでパイプラインは実行されます。パイプラインのテーブルに対して低レーテンシーのアップデートが必要な場合には、continuous
をtrue
に設定します。
{
...
"continuous": true,
...
}
実行モードと計算するテーブルのタイプは独立しています。どちらの実行モードでもコンプリートテーブル、インクリメンタルテーブルの両方をアップデートすることができます。
お使いのパイプラインのいくつかのテーブルでレーテンシーの要件が強くない場合には、設定pipelines.trigger.interval
で更新頻度を独立に設定することができます。
spark_conf={"pipelines.trigger.interval", "1 hour"}
このオプションはパイプラインのアップデートの合間にクラスターを停止させませんが、お使いのパイプラインの他のテーブルをアップデートに使用するリソースを解放することができます。
連続パイプラインにおけるテーブルとビュー
連続的に実行されるパイプラインにライブテーブル、ライブビュー、ストリーミングライブテーブル、ストリーミングライブビューを含めることができます。不要な処理を避けるために、パイプラインは依存するDeltaテーブルを自動でモニタリングし、依存するテーブルのコンテンツが変化した時のみアップデートを実行します。
Delta Live Tablesランタイムは非Deltaのデータソースの変更を検知することはできません。テーブルは定期的に更新されますが、デフォルトトリガー周期に大きな値を設定することで、クラスターで生じるインクリメンタルな処理をスローダウンさせるような過度な再計算を回避することができます。
開発、プロダクションモード
開発(development)モードとプロダクション(production)モードを切り替えることで、パイプライン実行を最適化することができます。パイプラインを開発モードで実行すると、Delta Live Tablesシステムは以下の処理を行います。
- 再起動のオーバーヘッドを回避するためにクラスターを再利用します。
- エラーを即座に検知し修正できるように、パイプラインのリトライを無効化します。
プロダクションモードでは、Delta Live Tablesシステムは以下の処理を行います。
- メモリーリークや古い認証情報のような回復可能な特定のエラーに対してクラスターを再起動します。
- クラスターの起動失敗など特定のエラーイベントに対しては、処理の実行をリトライします。
開発モード、プロダクションモードを切り替えるにはパイプラインUIのボタンを使用します。デフォルトでは開発モードとなっています。
開発モード、プロダクションモードの切り替えは、クラスターとパイプライン実行の挙動のみを制御します。ストレージの場所はパイプライン設定の一部として設定されるものであり、このモードを切り替えても影響は受けません。
Databricksの強化オートスケーリング
プレビュー
本機能はパブリックプレビューです。
Databricksの強化オートスケーリング(enhanced autoscaling)は、パイプラインの処理レーテンシーへのインパクトを最小化しつつ、ワークロードの規模に基づいて自動でクラスターリソースを配置することで、クラスターの使用率を最適化します。
強化オートスケーリングは、既存のクラスターオートスケーリングの機能に以下の機能を追加します。
- 強化オートスケーリングは、ストリーミングワークロードの最適化機能を実装しており、バッチワークロードのパフォーマンスを改善するための機能強化も行われています。これらの最適化によって、より効率的にクラスターを使用し、リソースの使用量を削減することでコストを節約します。
- 強化オートスケーリングは、シャットダウンの過程でタスクを失敗させないことを保証しつつ、積極的に使用率の低いノードをシャットダウンします。既存のクラスターオートスケーリング機能は、アイドル状態のノードのみをスケールダウンします。
要件
強化オートスケーリングを使うには、
-
パイプライン設定の
configuration
オブジェクトでpipelines.advancedAutoscaling.enabled
を"true"
に設定します。 - パイプラインの
default
クラスターにautoscale
設定を追加します。以下の例では、最小5台、最大10台のワーカーを持つ強化オートスケーリングクラスターを設定しています。max_workers
はmin_workers
よりも大きくなくてはなりません。
注意
- 強化オートスケーリングは
default
クラスターでのみ利用できます。メンテナンス用クラスター設定にautoscale
設定を追加したとしても、従来のクラスターオートスケーリング機能が使用されます。 -
pipelines.advancedAutoscaling.enabled
の設定なしに、autoscale
設定を追加した場合、Delta Live Tablesは従来のクラスターオートスケーリング機能を使用します。
{
"configuration": {
"pipelines.advancedAutoscaling.enabled": "true"
},
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 5,
"max_workers": 10
}
}
]
}
パイプラインが連続モードの場合、オートスケーリングの設定を変更した後に自動でパイプラインは再起動されます。再起動後の後は少しの間レーテンシーが大きくなることがあります。レーテンシーが増加する期間を過ぎた後で、指定したautoscale
の設定に基づいてクラスターサイズが更新され、パイプラインのレーテンシーは以前の状態に戻ります。
強化オートスケーリングが有効化されたパイプラインのモニタリング
強化オートスケーリングのメトリクスをモニタリングするために、Delta Live Tablesのイベントログを活用することができます。ユーザーインタフェースでこれらのメトリクスを参照することができます。強化オートスケーリングのイベントはイベントタイプautoscale
となります。イベントのサンプルを以下に示します。
イベント | メッセージ |
---|---|
クラスターのリサイズリクエストの送信 | Autoscale cluster to <X> executors while keeping alive <Y> executors and retiring <Z> executors |
クラスターマネージャがリサイズリクエストを許可 | Submitted request to resize cluster <cluster-id> to size <X>. |
リサイズが完了 | Achieved desired cluster size <X> for cluster <cluster-id>. |
また、直接イベントログをクエリーすることで強化オートスケーリングのイベントを参照することもできます。
- Sparkタスクのスロット使用率など、クラスターパフォーマンスのメトリクスをイベントログでクエリーするには、Cluster performance metricsをご覧ください。
- 強化オートスケーリングのオペレーションにおけるクラスターリサイズのリクエストとレスポンスをモニタリングするには、Databricks Enhanced Autoscaling eventsをご覧ください。
製品エディション
パイプラインの要件に適した機能を用いてパイプラインを実行できるように、Delta Live Tablesの製品エディションを選択することができます。以下に利用可能な製品エディションを示します。
-
core
はストリーミングのデータ取り込みのワークロードで使用します。パイプラインでチェンジデータキャプチャ(CDC)やDelta Live Tablesのエクスペクテーションのような高度な機能が不要であれば、core
エディションを選択してください。 -
pro
はストリーミングのデータ取り込みとCDCワークロードで使用します。pro
製品エディションはcore
のすべての機能をサポートしており、加えて、ソースデータの変更に基づいてテーブルを更新する必要があるワークロードもサポートしています。 -
advanced
はストリーミングのデータ取り込み、CDC、エクスペクテーションを必要とするワークロードで使用します。advanced
製品エディションは、core
とpro
の機能をサポートしており、Delta Live Tablesのエクスペクテーションによるデータ品質制約の強化もサポートしています。
パイプラインを作成、編集する際に製品エディションを選択することができます。それぞれのパイプラインで異なるエディションを選択することができます。
エクスペクテーションのように選択した製品エディションでサポートされていない機能をパイプラインが使おうとすると、エラーの理由を示すメッセージを受け取ります。その場合、適切なエディションを選択するためにパイプラインを編集することができます。