1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

DatabricksAdvent Calendar 2022

Day 14

Delta Live Tables(DLT)クイックスタートのウォークスルー

Posted at

Databricksで高品質、高パフォーマンスなETLパイプラインを構築できるソリューションがDelta Live Tables(略してDLT)です。

こちらの記事でクイックスタートを説明していますが、一年以上に書いたものなので改めてDLTの画面をウォークスルーしてみました。

Delta Live Tablesとは

Delta Live Tablesは、信頼性が高く、維持が容易でテスト可能なデータ処理パイプラインを構築するためのフレームワークです。データに対する変換処理を定義すると、Delta Live Tablesはタスクのオーケストレーション、クラスター管理、モニタリング、データ品質、エラー処理を行います。そして、Delta Live Tablesはそれぞれの処理ステップであなたが定義したターゲットスキーマに基づき、どのようにデータが変換されるのかを管理します。
Screen Shot 2022-12-13 at 10.36.43.png

また、Delta Live Tablesのエクスペクテーションを活用してデータ品質を維持することもできます。エクスペクテーションによって、期待されるデータ品質を定義し、エクスペクテーションに沿わないデータをどのように取り扱うのかを定義することができます。
Screen Shot 2022-12-13 at 10.38.13.png

Delta Live Tablesのパイプラインは、Databricksノートブックを使用して簡単に作成することができます。
Screen Shot 2022-12-13 at 10.39.26.png

なお、DLTはメダリオンアーキテクチャを前提としていますので、こちらのコンセプトを理解しておくことをお勧めします。

メダリオンアーキテクチャは、レイクハウスに格納されているデータの品質を示す一連のデータレイヤーを表現します。企業のデータプロダクトに対して信頼できる唯一の情報源(single source of truth)を構築するために、Databricksではマルチレイヤーのアプローチを取ることをお勧めしています。このアーキテクチャは、公立的な分析に最適化されたレイアウトでデータ格納される前の検証、変換を行う複数のレイヤーをデータが追加する際の、原子性、一貫性、分離性、耐久性を保証します。ブロンズ(生)、シルバー(検証済み)、ゴールド(拡張済み)という用語はそれぞれのレイヤーにおけるデータの品質を表現しています。

クイックスタートノートブックのウォークスルー

本記事ではDelta Live TablesパイプラインとWikipediaクリックストリームデータを用いてデモをお見せします。Wikipediaクリックストリームデータを用いて以下のことを行います。

  • 生のJSONクリックストリームデータをテーブルに読み込みます。
  • 生データテーブルからレコードを読み込み、Delta Live Tablesのエクスペクテーションを用いて、クレンジングされたデータを格納する新たなテーブルを作成します。
  • Delta Live Tablesのクエリーを行い、クレンジングデータテーブルのレコードから新たなデータセットを作成します。

以下のノートブックを使用します。なお、Pythonのノートブックでも同様のことが可能です。

パイプラインの準備

上のノートブックだけでもDLTを実行することは可能ですが、追加のパラメーターを設定することで、DLTの取り回しがさらに楽になります。

ヘルパーノートブックの作成

./Includes/setupというノートブックを作成し、以下のロジックを記述します。こちらを実行することで、DLTパイプラインで使用するデータベースやストレージロケーションを設定します。

Python
import pyspark.sql.functions as F
import re

course = "dlt"

username = spark.sql("SELECT current_user()").collect()[0][0]
userhome = f"dbfs:/user/{username}/{course}"
database = f"""{course}_{re.sub("[^a-zA-Z0-9]", "_", username)}_db"""

print(f"""
username: {username}
userhome: {userhome}
database: {database}""")

dbutils.widgets.text("mode", "setup")
mode = dbutils.widgets.get("mode")

if mode == "reset":
    spark.sql(f"DROP DATABASE IF EXISTS {database} CASCADE")
    dbutils.fs.rm(userhome, True)
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database}")
spark.sql(f"USE {database}")

if mode == "cleanup":
    spark.sql(f"DROP DATABASE IF EXISTS {database} CASCADE")
    dbutils.fs.rm(userhome, True)

storage_location = userhome + "/output"

準備用ノートブックの作成、実行

ワークスペースの現在フォルダにDLT-mainというPythonノートブックを作成します。名前は任意です。

%run ./Includes/setup $mode="reset"

上のコマンドを実行することで、ヘルパーノートブックが実行されます。
Screen Shot 2022-12-13 at 10.42.46.png

以下のコマンドを実行してストレージの場所を取得しておきます。

Python
storage_location.split(':')[1]

Screen Shot 2022-12-13 at 10.52.17.png

パイプラインのノートブックの作成

DLTでは、パイプラインの処理を記述するのではなく、パイプラインの各ステップを宣言するアプローチをとっています。これによって、各ステップの意味が明確になることに加え、間の処理はDLTが面倒を見てくれます。dlt-quickstart-sqlというSQLノートブックを作成し、以下のロジックを記述します。

ブロンズテーブル

ブロンズテーブルでは生のデータを保持します。

SQL
CREATE OR REFRESH LIVE TABLE clickstream_raw
COMMENT "/databricks-datasets から取り込まれるWikipediaの生のクリックストリームデータセット(ブロンズ)"
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

シルバーテーブル

ブロンズテーブルに対してクレンジングした結果をシルバーテーブルとして保持します。ここではエクスペクテーションを用いてデータの品質を担保しています。

SQL
CREATE OR REFRESH LIVE TABLE clickstream_prepared(
  -- current_page_titleは非NULL、click_countは1以上であるという制約
  CONSTRAINT valid_current_page EXPECT (current_page_title IS NOT NULL),
  CONSTRAINT valid_count EXPECT (click_count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "分析のためにクレンジング、準備されたWikipediaクリックストリームデータ(シルバー)"
AS SELECT
  curr_title AS current_page_title,
  CAST(n AS INT) AS click_count,
  prev_title AS previous_page_title
FROM live.clickstream_raw;

ゴールドテーブル

BIや機械学習など最終的にビジネスで活用するデータを格納するのがゴールドテーブルです。

Python
CREATE OR REFRESH LIVE TABLE top_spark_referers
COMMENT "Apacke Sparkページにリンクしているクリック数トップページを含むテーブル(ゴールド)"
AS SELECT
  previous_page_title as referrer,
  click_count
FROM live.clickstream_prepared
WHERE current_page_title = 'Apache_Spark'
ORDER BY click_count DESC
LIMIT 10;

このノートブックが完成したら、いよいよDLTパイプラインの作成と実行です。

DLTパイプラインの作成

  1. 左のサイドメニューのワークフローボタンをクリックするところからスタートします。ページの上部にDelta Live Tablesタブが表示されます。
  2. パイプラインを作成をクリックします。
  3. パイプライン名ノートブックライブラリは必須の値です。ノートブックセレクターを用いて、上で作成したdlt-quickstart-sqlのノートブックを選択してください。
    Screen Shot 2022-12-13 at 10.13.21.png
  4. 上で取得したstorage_locationをパイプラインのストレージの場所に貼り付けてください。
    • ストレージの場所は、パイプラインによって生成されるデータ、メタデータ、ログが記録される場所となります。実運用においては、セキュアな外部オブジェクトストレージを指定するようにしてください。
  5. 上のセルで表示されたdatabaseターゲットスキーマに貼り付けてください。
    • ターゲットスキーマは、パイプラインによって生成されたデータを格納するデータベースを保持します。データベースに格納することで、データに対して容易にクエリーを行えるようになります。
      Screen Shot 2022-12-13 at 10.13.29.png
  6. 作成をクリックします。これでDLTパイプラインが作成されました。

DLTパイプラインの実行

  1. DLTパイプラインが表示されます。
    Screen Shot 2022-12-13 at 10.14.53.png

  2. 可能であればパイプラインを開発モードで実行します。これによって、実行の合間で同じクラスターを再利用でき、ジョブ失敗時の自動リトライを無効化しますので、デバッグが容易となります。

  3. 開始をクリックしてテーブルに対する最初の更新をスタートします。Delta Live Tablesは、必要な全てのインフラストラクチャーを自動でデプロイし、全てのデータセット間の依存関係を解決します。

    注意
    依存関係の解決とインフラストラクチャのデプロイのため、最初のテーブルアップデートは数分かかる場合があります。

  4. 数分経つとクラスターが配備され、パイプラインのDAG(有効非巡回グラフ)が表示され、パイプラインが実行されます。
    Screen Shot 2022-12-13 at 10.17.29.png

  5. すべてのテーブルに完了と表示されたらパイプラインが完了したことを意味します。エラーが生じた際には、該当テーブルが赤くなります。
    Screen Shot 2022-12-13 at 10.20.04.png

  6. エクスペクテーションが設定されているテーブルをクリックすると、エクスペクテーションを手寄与した結果を確認することができます。
    Screen Shot 2022-12-13 at 10.20.12.png

テーブルの確認

DLTの処理結果はすべてDeltaテーブルとして保持されます。DLT-mainノートブックに以下のような関数を作成することで、クイックにDeltaテーブルにアクセスすることができます。

Python
def query_table(table_name):
    return spark.sql(f"SELECT * FROM delta.`{storage_location}/tables/{table_name}`")

ブロンズテーブル

Screen Shot 2022-12-13 at 10.27.29.png

シルバーテーブル

Screen Shot 2022-12-13 at 10.27.42.png

ゴールドテーブル

Screen Shot 2022-12-13 at 10.27.48.png

まとめ

Delta Live Tables(DLT)は、背後でSparkの構造化ストリーミングとDelta Lakeを活用しています。DLTを用いることで、ユーザーはSparkやDeltaの詳細を意識することなしに、容易にデータパイプラインを開発、運用できるようになります。本書では触れませんでしたが、DLTの処理結果はすべてイベントログとして、これらもDeltaテーブルとして保持されますので、これらを解析、監視することも可能です。データエンジニアリングに課題を感じられているのであれば、是非DLTを御試しください。この他にも以下のような機能をサポートしています。

Databricks 無料トライアル

Databricks 無料トライアル

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?