概要
Databricks Delta Live Tables (DLT)にて Bronze テーブルから Silver テーブルに書き込む処理方法を共有します。DLT にて Change Data Capture を実施できる APPLY CHANGES API 機能を利用することで、簡単に実装できました。
引用元:APPLY CHANGES APIs : Delta Live Tablesを使用してチェンジデータキャプチャを簡素化 | Databricks on AWS
Bronze テーブルから Silver テーブルに対する処理方法としては、PySpark によるデータエンジニアリング実践
という長い記事における3-2. Silver テーブルへのデータ書き込み例
という章にて投稿しています。
引用元:PySpark によるデータエンジニアリング実践 #Python - Qiita
その処理の中では下記のステップを実行しています。
- Silver テーブルに反映すべきデータを Bronze テーブルから取得
- データ型を変換
- データの一意性を保証
- Silver テーブルに対して Upsert 処理
それらのステップを DLT で実装する方法として、Change Data Capture 機能とビュー作成の機能により実装できました。
# | ステップ | DLT での実装方法 |
---|---|---|
1 | Silver テーブルに反映すべきデータを Bronze テーブルから取得 | APPLY CHANGES API 機能により実装 |
2 | データ型を変換 | ビュー作成により実装 |
3 | データの一意性を保証 | APPLY CHANGES API 機能により実装 |
4 | Silver テーブルに対して Upsert 処理 | APPLY CHANGES API 機能により実装 |
本記事では、下記の Delta Live Tables パイプラインを構築する手順を紹介します。
事前準備
1. カタログとスキーマを作成
%sql
CREATE CATALOG IF NOT EXISTS manabian_test;
CREATE SCHEMA IF NOT EXISTS manabian_test.dlt_brz2slv;
2. Volumes の作成とソースファイルの配置
%sql
CREATE VOLUME IF NOT EXISTS manabian_test.dlt_brz2slv.src_01;
dbutils.fs.cp(
"/databricks-datasets/tpch/data-001/part/part.tbl",
"/Volumes/manabian_test/dlt_brz2slv/src_01/part.tbl",
True,
)
Delta Live Tables のプログラム作成と登録
Delta Live Tables のプログラムをノートブックに記述
import dlt
from pyspark.sql.functions import col
@dlt.table(
name="part_raw",
)
def bronze():
# ソースファイルから読み込み
src_file_path = "/Volumes/manabian_test/dlt_brz2slv/src_01/part.tbl"
df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "False")
.option("inferSchema", "False")
.option("sep", "|")
.load(src_file_path)
)
# ソースファイルにヘッダーがないため、カラム名を変更
renamed_cols_names = {
'_c0':'p_partkey',
'_c1':'p_name',
'_c2':'p_mfgr',
'_c3':'p_brand',
'_c4':'p_type',
'_c5':'p_size',
'_c6':'p_container',
'_c7':'p_retailprice',
'_c8':'p_comment',
}
for existing_col,new_col in renamed_cols_names.items():
df = df.withColumnRenamed(existing_col, new_col)
# 最後のカラムを削除
df = df.drop('_c9')
# 監査列として、`_datasource`列と`_ingest_timestamp`列を追加
df = df.select("*", col("_metadata").alias("_metadata"))
df = (
df.withColumn("_datasource", col("_metadata.file_path"))
.withColumn("_ingest_timestamp", col("_metadata.file_modification_time"))
.drop("_metadata")
)
return df
@dlt.view(
name="_tmp_part",
)
def temp_view_for_silver():
df = spark.readStream.table("LIVE.part_raw")
temp_view_name = "_tmp_part_raw"
df.createOrReplaceTempView(temp_view_name)
brz_to_slv_sql = f'''
SELECT
p_partkey::long,
p_name,
p_mfgr,
p_brand,
p_type,
p_size::int,
p_container,
p_retailprice::decimal(12, 2),
p_comment,
_datasource,
_ingest_timestamp
FROM
{temp_view_name}
'''
df = spark.sql(brz_to_slv_sql)
return df
dlt.create_streaming_table("part")
dlt.apply_changes(
target = "part",
source = "_tmp_part",
keys = ["p_partkey"],
sequence_by = col("_ingest_timestamp"),
stored_as_scd_type = "1",
)
2. Delta Live Tables として登録
実行結果の確認
1. Delta Live Tables の実行
2. 書き込み先テーブルのデータを確認
df = spark.table("manabian_test.dlt_brz2slv.part_raw")
df.display()
df = spark.table("manabian_test.dlt_brz2slv.part")
df.display()