0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PySpark/Databricksデータエンジニアのための実践的Data Vault実装

Last updated at Posted at 2025-06-01

Delta Lake上でのSurrogate KeyとHashdiffの生成

Delta Lake × Data Vaultのプロジェクトで、PySparkによるSurrogate Key(代理キー)とHash生成を実装しました。

本記事では、その具体的な手法を紹介します。

データセットの準備

Databricks上で事前に用意されている TPC-H lineitem データセットを使用します。

path = 'dbfs:/databricks-datasets/tpch/delta-001/lineitem/'
df = spark.read.format("delta").load(path)

※ 本記事では、ビジネスキーのNULL値チェックはData Qualityフレームワーク(例:DQX)で事前に処理されている前提とします。

Data Vaultにおけるキーの取り扱い

Data Vaultでは、レコードの一意性や不変性を担保するために、SHA2-256ベースのハッシュによるキー生成がよく用いられます。

ここでは次の3種類のキーを生成します:

  • 複合Surrogate Key(1レコードを一意に表現)
  • 個別のビジネスキーごとのSurrogate Key(Hub接続用)
  • Hashdiff(SatelliteのSCD2変化検知用)

使用するカラム一覧は以下の通りです:

list_sks = [
    'l_orderkey', 'l_partkey', 'l_suppkey', 'l_linenumber',
    'l_extendedprice', 'l_discount', 'l_tax',
    'l_returnflag', 'l_linestatus', 'l_shipdate',
    'l_commitdate', 'l_receiptdate',
    'l_shipinstruct', 'l_shipmode', 'l_comment'
]

Surrogate Key / Hashdiffの生成クラス

以下のクラスを使えば、Surrogate KeyやHashdiffの生成ロジックを一元管理できます。

※ NULL処理(coalesce()など)はここでは省略していますが、必要に応じて実装可能です。

import pyspark.sql.functions as F

class KeyGenerator:
    """
    Surrogate Key / Hashdiff生成のためのPySparkユーティリティクラス
    """

    @staticmethod
    def generate_surrogate_key(df, cols, output_col=None):
        if output_col is None:
            output_col = "sk_" + "_".join(cols)
        return df.withColumn(
            output_col,
            F.sha2(F.concat_ws("||", *[F.col(c).cast("string") for c in cols]), 256)
        )

    @staticmethod
    def generate_individual_surrogate_keys(df, cols, prefix="sk"):
        for col in cols:
            df = df.withColumn(
                f"{prefix}_{col}",
                F.sha2(F.col(col).cast("string"), 256)
            )
        return df

    @staticmethod
    def generate_hashdiff(df, cols, output_col="hashdiff"):
        return df.withColumn(
            output_col,
            F.sha2(F.concat_ws("||", *[F.col(c).cast("string") for c in cols]), 256)
        )

実装例

from datetime import datetime

# バッチ開始時点のtimestampを全行に適用する
load_date_literal = datetime.now()
df = df.select("*", F.lit(load_date_literal).alias("load_date"))

# 複合Surrogate Keyの生成
df = KeyGenerator.generate_surrogate_key(df, cols=list_sks)

# 個別Surrogate Keyの生成
df = KeyGenerator.generate_individual_surrogate_keys(df, cols=list_sks)

# Hashdiffの生成(例:Satellite用に量とロード日も含める)
df = KeyGenerator.generate_hashdiff(df, cols=list_sks + ['l_quantity', 'load_date'])

実務でのポイント

開発初期段階では、生成されたSurrogate KeyやHashdiffだけでなく、元のビジネスカラムもDataFrameに残しておくと、SatelliteやHubへのロード結果や、BIツール側の期待値と乖離が出た際に、元データと照合して原因を特定する作業が楽になります。

ご意見やご感想、あるいはより良い実装方法のご提案などがあれば、ぜひコメントでお知らせください。
少しでも現場での実装の参考になればと思います。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?