1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

[2025年1月版] Delta Live Tablesチュートリアルのウォークスルー

Posted at

明けましておめでとうございます。今年もよろしくお願いします。今年初めの記事はDelta Live Tables(DLT)からです。

Delta Live Tablesとは

Databricksにおけるデータパイプライン開発・運用のフレームワークです。背後ではSparkの構造化ストリーミングやDelta Lakeが動いており、チェンジデータキャプチャやストリーミングを必要とするようなデータパイプラインを低工数で実装でき、運用のために必要なオーケストレーション、自動テスト、オートスケール、イベントログ管理などの機能も提供してます。

で、タイトルに戻ります。一昨年にこちらを書きました。

この時点では、パイプラインの画面からしかパイプラインを実行できませんでした。何を言っているのかと思われるかもしれませんが、Delta Live Tablesにおけるパイプラインの実装はDatabricksノートブックで行います。リリース当時はノートブックでロジックを実装、パイプラインの画面で処理を実行という形になっており、画面の行ったり来たりが発生していて、正直イケていませんでした。その後こちらの機能改善がなされました。

これによって、ノートブックから直接パイプラインを実行できるようになり、QoLが上がった訳です。そして、マニュアルを改めて見てみるとチュートリアルの流れも更新されていました。ということで、改めてチュートリアルをウォークスルーします。

CSVファイルのダウンロード

適当なノートブックを作成して、以下を実行します。カタログやスキーマ、ボリューム名はアクセス権のあるものに適宜変更してください。しかし、dbutils.fs.cpで、URLからボリュームに直接コピーできるの知りませんでした。

my_catalog = "users"
my_schema = "takaaki_yayoi"
my_volume = "dlt_tutorial"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

dbutils.fs.cp(download_url, volume_path + filename)

ファイルがボリュームに保存されました。

Screenshot 2025-01-02 at 19.46.40.png

ステップ 1: パイプラインを作成する

この部分でも知らない機能がありました。昔はパイプラインを作成する前にパイプラインの実装のノートブックを準備しておいて、作成時に選択する必要がありました。今では、事前のノートブックの準備が不要になっています。

パイプライン作成画面で名前やモードを指定していき、ソースコードの部分は空でもパイプラインを作成できます。この場合、空のノートブックが作成されるので、そこにパイプラインの実装を記述する形になります。

Delta Live Tablesパイプラインを実装する際には、処理結果を保持するマテリアライズドビューを作成します。配信先では、これらのマテリアライズドビューを保存するカタログとスキーマを指定します。

Screenshot 2025-01-02 at 19.16.47.png

処理の成功や失敗を通知することができますが、ここではスキップします。Advancedではパイプラインのパラメーターをキーバリュー方式で指定します。これで、開発時と運用時で挙動を変更できるようにパイプラインをパラメーター化することができます。このパラメーターは後のステップで参照します。

Screenshot 2025-01-02 at 19.26.34.png

今回はソースコードでノートブックを指定していないので、作成ボタンを押すと以下のようなダイアログが表示されます。これも以前は無かったフローです。

Screenshot 2025-01-02 at 19.18.16.png

ステップ 2: Python または SQL を使用してノートブックでマテリアライズド ビューとストリーミング テーブルを宣言する

パイプラインが作成されたのに合わせて、実装用のノートブックが作成されます。こちらを編集してパイプラインを実装します。

Screenshot 2025-01-02 at 19.18.25.png

Screenshot 2025-01-02 at 19.19.02.png

ただ、この時点ではこのノートブックは単なるノートブックです。Delta Live Tablesとの連携機能を活用するには、画面右上の接続をクリックして、上のステップで作成したパイプラインを選択、接続をクリックします。

Screenshot 2025-01-02 at 19.19.28.png

Screenshot 2025-01-02 at 19.19.34.png

すると、画面下にDLT... と言ったペインが表示されるようになります。DLTはDelta Live Tablesの略です。また、画面の右上のボタンも変化し、検証起動というものに変わります。これはDLTパイプラインの検証や起動のためのボタンとなります。機能の詳細はこちらをご覧ください。

Screenshot 2025-01-02 at 19.19.40.png

これで実装の準備ができたので、画面上のセルに以下のロジックを記載します。DLTではPythonあるいはSQLでパイプラインを実装できますが、以下ではPython(PySpark)で実装しています。詳細はPythonSQLのマニュアルをご覧ください。

# モジュールのインポート

import dlt
from pyspark.sql.functions import *

# パイプラインパラメータを変数に割り当てる

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# ソースデータのパスを定義する

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# ボリュームからデータを取り込むストリーミングテーブルを定義する

@dlt.table(
  comment="ニューヨークの人気のある赤ちゃんのファーストネーム。このデータはニューヨーク州保健局から取り込まれました。"
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# データを検証し、列名を変更するマテリアライズドビューを定義する

@dlt.table(
  comment="分析のためにクリーンアップおよび準備されたニューヨークの人気のある赤ちゃんのファーストネームデータ。"
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("LIVE.baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# データのフィルタリング、集計、およびソートされたビューを持つマテリアライズドビューを定義する

@dlt.table(
  comment="2021年のニューヨークのトップ赤ちゃんの名前のカウントを要約したテーブル。"
)
def top_baby_names_2021():
  return (
    spark.read.table("LIVE.baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

ここでいきなり実行せずに、ロジックの検証を行ってみます。画面右上の検証をクリックします。この場合、データの処理は行わず、パイプラインのロジックの検証のみを行います。開発過程では頻繁に使うことになると思います。

Screenshot 2025-01-02 at 19.20.31.png
Screenshot 2025-01-02 at 19.20.43.png

ロジックに問題がなければ、DLTグラフタブにはパイプラインの依存関係(リネージ)が表示されます。

Screenshot 2025-01-02 at 19.27.43.png

ステップ 3: パイプラインの更新を開始する

これでパイプラインの準備ができました。今度は起動ボタンを押して実際にパイプラインの処理を実行します。パイプラインが起動すると、DLTグラフリアルタイムで進捗を確認することができます。

Screenshot 2025-01-02 at 19.31.39.png
Screenshot 2025-01-02 at 19.31.52.png
Screenshot 2025-01-02 at 19.31.57.png

また、DLTイベントログにはパイプラインのイベントログが表示されます。

Screenshot 2025-01-02 at 20.06.24.png

全ての処理が完了したら、カタログエクスプローラでテーブルを確認します。生データを保持するブロンズテーブル、クレンジング結果を保持するシルバーテーブル、集計結果を保持するゴールドテーブルが作成されていることを確認できます。これらのテーブルはメダリオンアーキテクチャに即したものとなっています。

Screenshot 2025-01-02 at 19.33.53.png
Screenshot 2025-01-02 at 19.34.12.png
Screenshot 2025-01-02 at 19.34.25.png

カタログエクスプローラでも、これらのテーブルのリネージを確認することができます。

Screenshot 2025-01-02 at 19.34.39.png

最後になりますが、今回のウォークスルーはノートブック中心に行いましたが、パイプラインの実行、運用にフォーカスする場合には、Delta Live TablesパイプラインのUIを活用することをお勧めします。パイプラインの状況をより俯瞰できるようになっています。

Screenshot 2025-01-02 at 20.12.18.png

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?