Delta Live Tables(DLT)を用いることで、複雑なデータパイプラインであっても簡単かつ、信頼性高く構築、運用が行えるようになります。
DLT単体でもパイプラインを実行できますが、これとDatabricks Jobsとを組み合わせると、さらに複雑な処理を自動化することができます。
本書では、Databricks JobsからどのようにDLTを呼び出すのか、また、その際の注意点を説明します。
Delta Live Tablesでパイプラインを定義する
プレビュー
この機能はパブリックプレビューです。アクセスする際にはDatabricks担当者にお問い合わせください。
JSONを読み込むシンプルなパイプラインを定義します。
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
@dlt.table(
comment="Jobsテスト用DLT"
)
def bronze():
return (spark.read.json(json_path))
サイドバーからJobs > Delta Live Tablesにアクセスし、Create Pipelineをクリックします。上記パイプラインを定義したノートブックを選択し、パイプラインの名前をつけてCreateをクリックしてパイプラインを作成します。
パイプラインの詳細画面が表示されます。ジョブから呼び出すので、ここでパイプラインの実行はしません。ただ、一点注意が必要です。画面右上にあるDevelopment/Productionでパイプラインのモードを指定できるのですが、ジョブから実行する際にはProductionモードにすることを忘れないでください。ジョブからDLTパイプラインを呼び出した際、このモードによってDLTクラスターの挙動が変わります。
- Developmentモード: ジョブ終了後、2時間クラスターが稼働し続けます。
- Productionモード: ジョブ終了後、即座に(約5分後)クラスターが終了します。
Developmentモードは名前の通り、パイプライン開発時に選択するモードで試行錯誤、デバッグを行うため、パイプラインの処理が終了しても即座にDLTクラスターは停止しません。
ジョブからパイプラインを実行する
- サイドバーからJobsにアクセスします。
- Create Jobをクリックします。
- ジョブは複数のタスクから構成することができます。ここでは1つのみのタスクを作成します。タスクのTypeではDelta Live Tables pipelineを選択し、Pipelineでは上で作成したDLTパイプラインを選択します。
- これでDLTパイプラインを呼び出すジョブを定義することができました。
- 右上のRun nowで即時実行することもできますし、右側のScheduleを指定してスケジュール実行することもできます。
なお、ジョブの実行中にDLTのパイプラインにアクセスすると、処理状況をリアルタイムで確認できます。