Delta Lake上でのSurrogate KeyとHashdiffの生成
Delta Lake × Data Vaultのプロジェクトで、PySparkによるSurrogate Key(代理キー)とHash生成を実装しました。
本記事では、その具体的な手法を紹介します。
データセットの準備
Databricks上で事前に用意されている TPC-H lineitem
データセットを使用します。
path = 'dbfs:/databricks-datasets/tpch/delta-001/lineitem/'
df = spark.read.format("delta").load(path)
※ 本記事では、ビジネスキーのNULL値チェックはData Qualityフレームワーク(例:DQX)で事前に処理されている前提とします。
Data Vaultにおけるキーの取り扱い
Data Vaultでは、レコードの一意性や不変性を担保するために、SHA2-256ベースのハッシュによるキー生成がよく用いられます。
ここでは次の3種類のキーを生成します:
- 複合Surrogate Key(1レコードを一意に表現)
- 個別のビジネスキーごとのSurrogate Key(Hub接続用)
- Hashdiff(SatelliteのSCD2変化検知用)
使用するカラム一覧は以下の通りです:
list_sks = [
'l_orderkey', 'l_partkey', 'l_suppkey', 'l_linenumber',
'l_extendedprice', 'l_discount', 'l_tax',
'l_returnflag', 'l_linestatus', 'l_shipdate',
'l_commitdate', 'l_receiptdate',
'l_shipinstruct', 'l_shipmode', 'l_comment'
]
Surrogate Key / Hashdiffの生成クラス
以下のクラスを使えば、Surrogate KeyやHashdiffの生成ロジックを一元管理できます。
※ NULL処理(coalesce()など)はここでは省略していますが、必要に応じて実装可能です。
import pyspark.sql.functions as F
class KeyGenerator:
"""
Surrogate Key / Hashdiff生成のためのPySparkユーティリティクラス
"""
@staticmethod
def generate_surrogate_key(df, cols, output_col=None):
if output_col is None:
output_col = "sk_" + "_".join(cols)
return df.withColumn(
output_col,
F.sha2(F.concat_ws("||", *[F.col(c).cast("string") for c in cols]), 256)
)
@staticmethod
def generate_individual_surrogate_keys(df, cols, prefix="sk"):
for col in cols:
df = df.withColumn(
f"{prefix}_{col}",
F.sha2(F.col(col).cast("string"), 256)
)
return df
@staticmethod
def generate_hashdiff(df, cols, output_col="hashdiff"):
return df.withColumn(
output_col,
F.sha2(F.concat_ws("||", *[F.col(c).cast("string") for c in cols]), 256)
)
実装例
from datetime import datetime
# バッチ開始時点のtimestampを全行に適用する
load_date_literal = datetime.now()
df = df.select("*", F.lit(load_date_literal).alias("load_date"))
# 複合Surrogate Keyの生成
df = KeyGenerator.generate_surrogate_key(df, cols=list_sks)
# 個別Surrogate Keyの生成
df = KeyGenerator.generate_individual_surrogate_keys(df, cols=list_sks)
# Hashdiffの生成(例:Satellite用に量とロード日も含める)
df = KeyGenerator.generate_hashdiff(df, cols=list_sks + ['l_quantity', 'load_date'])
実務でのポイント
開発初期段階では、生成されたSurrogate KeyやHashdiffだけでなく、元のビジネスカラムもDataFrameに残しておくと、SatelliteやHubへのロード結果や、BIツール側の期待値と乖離が出た際に、元データと照合して原因を特定する作業が楽になります。
ご意見やご感想、あるいはより良い実装方法のご提案などがあれば、ぜひコメントでお知らせください。
少しでも現場での実装の参考になればと思います。