みなさん、こんにちは!
Databricks には Delta Live Tables という機能があります。データ品質管理も含めたテーブル作成パイプラインを簡単に実現できる Delta Live Tables ですが、通常のテーブル作成との違いなどイメージが沸きにくい部分もあるかと思います。
本記事では、そんな Delta Live Tables の基本と使い方について、シンプルなデータ処理例をもとに解説していきます。
※本記事では Azure Databricks を使用しています。細かい設定などクラウドプロバイダーによって違う可能性があるためご注意ください。
Delta Live Tables とは?
Delta Live Tables は、ファイルや外部データソースからデータを取り込み、テーブルを作成するパイプラインを構築するためのフレームワークです。
名前からテーブルを想像してしまいますが、あくまでもフレームワークであり、そういう種類のテーブルがあるわけではありません。
通常のテーブル作成との違い
Delta Live Tables には以下のような特徴があります。
- Python または SQL で、Delta Live Tables 専用コマンドを使ってテーブル定義を書く。Python では
name
でテーブル名を明示的に指定しない場合、関数名=テーブル名 となる。 - 定義したテーブルは パイプラインを実行しない限り実体を持つテーブルとして作成されない。 パイプライン実行時にマテリアライズドビューが作成され、カタログにも登録される。
- 通常のテーブルと違い、データロード時の制約を指定することが可能。制約に違反し、取り込みに失敗したレコードはイベントログに記録される。
- データのストリーミング取り込みをサポートしている。
データロード時の制約適用による品質管理 や ストリーミングテーブルの作成 など、通常のテーブル作成では実現できない機能をサポートしている点が特徴です。
注意点として、Delta Live Tables は Premium 以上のプランでのみ利用可能です。
使ってみる
さっそく、具体的な使い方について見ていきましょう。
ノートブック作成
人口統計データを DBFS の sample フォルダにアップロードした後、ノートブックで以下を実行します。
import dlt
from pyspark.sql.types import *
from pyspark.sql.functions import *
# 入力ファイルパス
path = "dbfs:/FileStore/tables/sample/c01.csv"
# スキーマ指定
schema = StructType([
StructField("都道府県コード", StringType(), False),
StructField("都道府県名", StringType(), False),
StructField("元号", StringType(), False),
StructField("和暦(年)", StringType(), False),
StructField("西暦(年)", StringType(), False),
StructField("注", StringType(), False),
StructField("人口(総数)", IntegerType(), False),
StructField("人口(男)", IntegerType(), False),
StructField("人口(女)", IntegerType(), False),
])
# CSVデータ読み込み
@dlt.table(
comment="The raw population data."
)
def population_data_raw():
return (
spark.read\
.format("csv")\
.options(header="true")\
.option('charset', 'shift-jis')\
.load(path, schema=schema)
)
# カラム名変更
@dlt.table(
comment="The population data with renamed columns."
)
@dlt.expect("valid_population", "population > 0")
def population_data_renamed():
return (
dlt.read("population_data_raw")\
.withColumnRenamed("都道府県コード", "prefectures_code")\
.withColumnRenamed("都道府県名", "prefectures")\
.withColumnRenamed("元号", "era")\
.withColumnRenamed("和暦(年)", "year_jp")\
.withColumnRenamed("西暦(年)", "year")\
.withColumnRenamed("注", "note")\
.withColumnRenamed("人口(総数)", "population")\
.withColumnRenamed("人口(男)", "man_population")\
.withColumnRenamed("人口(女)", "woman_population")
)
# 分析用データ作成
@dlt.table(
comment="The population data cleaned and prepared for analysis."
)
def population_data_prepared():
return (
dlt.read("population_data_renamed")\
.filter(col("prefectures_code").rlike("^\d{2}$"))\
.select("prefectures_code", "prefectures", "year", "population")
)
入力ファイル(CSV)を読み込み、データをそのまま格納した populationdataraw、populationdataraw から population が0以上のデータのみ取り込んでカラム名を変更した populationdatarenamed、分析用に必要なデータのみ抽出した populationdataprepared の3つを定義しています。
Python の場合、Delta Live Tables の処理対象となるテーブルの定義関数には以下のルールがあります。
-
@dlt.table
デコレータを付けて宣言する - 関数の返り値は pyspark.DataFrame 型にする
途中で出てきている@dlt.expect
というのは、データロード時にレコードが特定の条件を満たしているかをチェックするためのものです。今回の場合、valid_population という名前で population > 0
という条件を設定しています。こちらに関して、詳しくはこちらの記事をご参照ください。
上記を通常のクラスターで実行した場合、ビューが定義されるだけで、実体を持ったテーブルは作成されません。実体を持ったテーブルを作成するには、以降の手順でパイプラインとして実行する必要があります。
パイプライン設定
メニューの「Delta Live Tables」を開き、「パイプラインを作成」をクリックします。
各項目について、以下のように設定を行います。
- 一般
・パイプライン名:任意の名前
・製品エディション:Advanced
・パイプラインモード:トリガー - ソースコード
・パス:実行対象のノートブックを選択 - 配信先
・ストレージオプション:Hive メタストア(Unity Catalog を利用する場合はそちらを指定)
・ストレージの場所:パイプライン情報の格納場所を指定。デフォルトはdbfs:/pipelines/<pipeline-id>
・デフォルトスキーマ:マテリアライズドビューが作成されるスキーマ。存在しないスキーマを指定した場合、新規作成される。 - クラスター
・クラスターポリシー:なし
・クラスターモード:固定サイズ
・ワーカー:0(ワーカーを使用しない場合、ワーカーの数は0に設定可能) - Advanced
・Driverのタイプ:Standard_F4(クォータに余裕があるものを設定する)
※上記以外はデフォルト値です。あくまで検証用パイプラインの設定であるため、クラスターやドライバーのタイプは実際のユースケースに応じて変更する必要があります。
設定が完了したら「作成」ボタンをクリックします。
注意
パイプライン作成時のクラスターの設定に注意する必要があります。
対象VMのクォータが十分にない場合、以下のようなエラーでパイプライン起動に失敗します。
com.databricks.pipelines.common.errors.deployment.DeploymentException: Failed to launch pipeline cluster 0217-145622-c0vfbs0j: Encountered Quota Exhaustion issue in your account: azureerrorcode: QuotaExceeded, azureerrormessage: Operation could not be completed as it results in exceeding approved Total Regional Cores ... This error is likely due to a misconfiguration in the pipeline. Check the pipeline cluster configuration and associated cluster policy.
デフォルトでは強化オートスケーリングが設定されていたり、ある程度パワフルな構成になっています。
そのため、すべてデフォルト設定のままだと料金が高くつく可能性があるため注意が必要です。
実行
「Delta Live Tables」の一覧から対象のパイプラインを選択し、起動します。
結果確認
作成されたテーブル(マテリアライズドビュー)はカタログから確認できます。
イベントログ確認
パイプラインの実行結果詳細をイベントログで照会することが可能です。
Unity Catalog の場合
SELECT * FROM event_log("")
Hive メタストアの場合
SELECT * FROM delta.`/pipelines//system/events`;
※ストレージの場所がデフォルト設定の場合
レコード分析用SQL
以下のSQLを実行すると、パイプライン実行時に制約チェックの結果が合格/不合格となったレコードを確認できます。
-- イベントログの一時ビュー作成
CREATE OR REPLACE TEMP VIEW event_log_raw
AS SELECT * FROM delta.`/pipelines//system/events`;
-- レコードのメトリクス情報抽出
WITH all_expectations AS(
SELECT
explode(from_json(details:flow_progress:data_quality:expectations, schema_of_json("[{'name':'str', 'dataset':'str',
'passed_records':'int', 'failed_records':'int'}]"))) AS expectation
FROM
event_log_raw
WHERE
details:flow_progress.metrics IS NOT NULL
)
SELECT
expectation_name,
X_Axis,
SUM(Y_Axis) AS Y_Axis
FROM
(
SELECT
expectation.name AS expectation_name,
'Passed' AS X_Axis,
expectation.passed_records AS Y_Axis
FROM
all_expectations
UNION ALL
SELECT
expectation.name AS expectation_name,
'Failed' AS X_Axis,
expectation.failed_records AS Y_Axis
FROM
all_expectations
)
GROUP BY
expectation_name,
X_Axis
;
実際にSQL Editorで実行すると、以下のような結果が得られます。
結果の右側に可視化タブを追加し、以下のように設定することで視覚的に確認することも可能です。
- 可視化タイプ:Pie
- X column:X_Axis
- Y columns:Y_Axis Sum
- Group by:expectation_name