1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks Delta Live Tables にて Bronze テーブルから Silver テーブルに書き込む処理方法

Last updated at Posted at 2024-10-06

概要

Databricks Delta Live Tables (DLT)にて Bronze テーブルから Silver テーブルに書き込む処理方法を共有します。DLT にて Change Data Capture を実施できる APPLY CHANGES API 機能を利用することで、簡単に実装できました。

image.png

image.png

引用元:APPLY CHANGES APIs : Delta Live Tablesを使用してチェンジデータキャプチャを簡素化 | Databricks on AWS

Bronze テーブルから Silver テーブルに対する処理方法としては、PySpark によるデータエンジニアリング実践という長い記事における3-2. Silver テーブルへのデータ書き込み例という章にて投稿しています。

image.png

引用元:PySpark によるデータエンジニアリング実践 #Python - Qiita

その処理の中では下記のステップを実行しています。

  1. Silver テーブルに反映すべきデータを Bronze テーブルから取得
  2. データ型を変換
  3. データの一意性を保証
  4. Silver テーブルに対して Upsert 処理

それらのステップを DLT で実装する方法として、Change Data Capture 機能とビュー作成の機能により実装できました。

# ステップ DLT での実装方法
1 Silver テーブルに反映すべきデータを Bronze テーブルから取得 APPLY CHANGES API 機能により実装
2 データ型を変換 ビュー作成により実装
3 データの一意性を保証 APPLY CHANGES API 機能により実装
4 Silver テーブルに対して Upsert 処理 APPLY CHANGES API 機能により実装

本記事では、下記の Delta Live Tables パイプラインを構築する手順を紹介します。

image.png

事前準備

1. カタログとスキーマを作成

%sql
CREATE CATALOG IF NOT EXISTS manabian_test;
CREATE SCHEMA IF NOT EXISTS manabian_test.dlt_brz2slv;

image.png

2. Volumes の作成とソースファイルの配置

%sql
CREATE VOLUME IF NOT EXISTS manabian_test.dlt_brz2slv.src_01;
dbutils.fs.cp(
    "/databricks-datasets/tpch/data-001/part/part.tbl",
    "/Volumes/manabian_test/dlt_brz2slv/src_01/part.tbl",
    True,
)

image.png

Delta Live Tables のプログラム作成と登録

Delta Live Tables のプログラムをノートブックに記述

import dlt
from pyspark.sql.functions import col

image.png

@dlt.table(
    name="part_raw",
)
def bronze():
    # ソースファイルから読み込み
    src_file_path = "/Volumes/manabian_test/dlt_brz2slv/src_01/part.tbl"
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "False")
        .option("inferSchema", "False")
        .option("sep", "|")
        .load(src_file_path)
    )

    # ソースファイルにヘッダーがないため、カラム名を変更
    renamed_cols_names = {
        '_c0':'p_partkey',
        '_c1':'p_name',
        '_c2':'p_mfgr',
        '_c3':'p_brand',
        '_c4':'p_type',
        '_c5':'p_size',
        '_c6':'p_container',
        '_c7':'p_retailprice',
        '_c8':'p_comment',
    }
    for existing_col,new_col in renamed_cols_names.items():
        df = df.withColumnRenamed(existing_col, new_col)

    # 最後のカラムを削除
    df = df.drop('_c9')


    # 監査列として、`_datasource`列と`_ingest_timestamp`列を追加
    df = df.select("*", col("_metadata").alias("_metadata"))
    df = (
        df.withColumn("_datasource", col("_metadata.file_path"))
        .withColumn("_ingest_timestamp", col("_metadata.file_modification_time"))
        .drop("_metadata")
    )

    return df

image.png

@dlt.view(
    name="_tmp_part",
)
def temp_view_for_silver():
    df = spark.readStream.table("LIVE.part_raw")
    temp_view_name = "_tmp_part_raw"
    df.createOrReplaceTempView(temp_view_name)
    brz_to_slv_sql = f'''
    SELECT
        p_partkey::long,
        p_name,
        p_mfgr,
        p_brand,
        p_type,
        p_size::int,
        p_container,
        p_retailprice::decimal(12, 2),
        p_comment,
        _datasource,
        _ingest_timestamp
    FROM
        {temp_view_name}
    '''
    df = spark.sql(brz_to_slv_sql)
    return df

image.png

dlt.create_streaming_table("part")
dlt.apply_changes(
  target = "part",
  source = "_tmp_part",
  keys = ["p_partkey"],
  sequence_by = col("_ingest_timestamp"),
  stored_as_scd_type = "1",
)

image.png

2. Delta Live Tables として登録

image.png

実行結果の確認

1. Delta Live Tables の実行

image.png

2. 書き込み先テーブルのデータを確認

df = spark.table("manabian_test.dlt_brz2slv.part_raw")
df.display()

image.png

df = spark.table("manabian_test.dlt_brz2slv.part")
df.display()

image.png

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?