明けましておめでとうございます。今年もよろしくお願いします。今年初めの記事はDelta Live Tables(DLT)からです。
Delta Live Tablesとは
Databricksにおけるデータパイプライン開発・運用のフレームワークです。背後ではSparkの構造化ストリーミングやDelta Lakeが動いており、チェンジデータキャプチャやストリーミングを必要とするようなデータパイプラインを低工数で実装でき、運用のために必要なオーケストレーション、自動テスト、オートスケール、イベントログ管理などの機能も提供してます。
で、タイトルに戻ります。一昨年にこちらを書きました。
この時点では、パイプラインの画面からしかパイプラインを実行できませんでした。何を言っているのかと思われるかもしれませんが、Delta Live Tablesにおけるパイプラインの実装はDatabricksノートブックで行います。リリース当時はノートブックでロジックを実装、パイプラインの画面で処理を実行という形になっており、画面の行ったり来たりが発生していて、正直イケていませんでした。その後こちらの機能改善がなされました。
これによって、ノートブックから直接パイプラインを実行できるようになり、QoLが上がった訳です。そして、マニュアルを改めて見てみるとチュートリアルの流れも更新されていました。ということで、改めてチュートリアルをウォークスルーします。
CSVファイルのダウンロード
適当なノートブックを作成して、以下を実行します。カタログやスキーマ、ボリューム名はアクセス権のあるものに適宜変更してください。しかし、dbutils.fs.cp
で、URLからボリュームに直接コピーできるの知りませんでした。
my_catalog = "users"
my_schema = "takaaki_yayoi"
my_volume = "dlt_tutorial"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
ファイルがボリュームに保存されました。
ステップ 1: パイプラインを作成する
この部分でも知らない機能がありました。昔はパイプラインを作成する前にパイプラインの実装のノートブックを準備しておいて、作成時に選択する必要がありました。今では、事前のノートブックの準備が不要になっています。
パイプライン作成画面で名前やモードを指定していき、ソースコードの部分は空でもパイプラインを作成できます。この場合、空のノートブックが作成されるので、そこにパイプラインの実装を記述する形になります。
Delta Live Tablesパイプラインを実装する際には、処理結果を保持するマテリアライズドビューを作成します。配信先では、これらのマテリアライズドビューを保存するカタログとスキーマを指定します。
処理の成功や失敗を通知することができますが、ここではスキップします。Advancedではパイプラインのパラメーターをキーバリュー方式で指定します。これで、開発時と運用時で挙動を変更できるようにパイプラインをパラメーター化することができます。このパラメーターは後のステップで参照します。
今回はソースコードでノートブックを指定していないので、作成ボタンを押すと以下のようなダイアログが表示されます。これも以前は無かったフローです。
ステップ 2: Python または SQL を使用してノートブックでマテリアライズド ビューとストリーミング テーブルを宣言する
パイプラインが作成されたのに合わせて、実装用のノートブックが作成されます。こちらを編集してパイプラインを実装します。
ただ、この時点ではこのノートブックは単なるノートブックです。Delta Live Tablesとの連携機能を活用するには、画面右上の接続をクリックして、上のステップで作成したパイプラインを選択、接続をクリックします。
すると、画面下にDLT... と言ったペインが表示されるようになります。DLTはDelta Live Tablesの略です。また、画面の右上のボタンも変化し、検証、起動というものに変わります。これはDLTパイプラインの検証や起動のためのボタンとなります。機能の詳細はこちらをご覧ください。
これで実装の準備ができたので、画面上のセルに以下のロジックを記載します。DLTではPythonあるいはSQLでパイプラインを実装できますが、以下ではPython(PySpark)で実装しています。詳細はPythonやSQLのマニュアルをご覧ください。
# モジュールのインポート
import dlt
from pyspark.sql.functions import *
# パイプラインパラメータを変数に割り当てる
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# ソースデータのパスを定義する
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# ボリュームからデータを取り込むストリーミングテーブルを定義する
@dlt.table(
comment="ニューヨークの人気のある赤ちゃんのファーストネーム。このデータはニューヨーク州保健局から取り込まれました。"
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# データを検証し、列名を変更するマテリアライズドビューを定義する
@dlt.table(
comment="分析のためにクリーンアップおよび準備されたニューヨークの人気のある赤ちゃんのファーストネームデータ。"
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("LIVE.baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# データのフィルタリング、集計、およびソートされたビューを持つマテリアライズドビューを定義する
@dlt.table(
comment="2021年のニューヨークのトップ赤ちゃんの名前のカウントを要約したテーブル。"
)
def top_baby_names_2021():
return (
spark.read.table("LIVE.baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
ここでいきなり実行せずに、ロジックの検証を行ってみます。画面右上の検証をクリックします。この場合、データの処理は行わず、パイプラインのロジックの検証のみを行います。開発過程では頻繁に使うことになると思います。
ロジックに問題がなければ、DLTグラフタブにはパイプラインの依存関係(リネージ)が表示されます。
ステップ 3: パイプラインの更新を開始する
これでパイプラインの準備ができました。今度は起動ボタンを押して実際にパイプラインの処理を実行します。パイプラインが起動すると、DLTグラフリアルタイムで進捗を確認することができます。
また、DLTイベントログにはパイプラインのイベントログが表示されます。
全ての処理が完了したら、カタログエクスプローラでテーブルを確認します。生データを保持するブロンズテーブル、クレンジング結果を保持するシルバーテーブル、集計結果を保持するゴールドテーブルが作成されていることを確認できます。これらのテーブルはメダリオンアーキテクチャに即したものとなっています。
カタログエクスプローラでも、これらのテーブルのリネージを確認することができます。
最後になりますが、今回のウォークスルーはノートブック中心に行いましたが、パイプラインの実行、運用にフォーカスする場合には、Delta Live TablesパイプラインのUIを活用することをお勧めします。パイプラインの状況をより俯瞰できるようになっています。