LoginSignup
4
4

Bronze テーブルから Silver テーブルへのデータローディングパターン

Posted at

概要

この記事では、Bronze テーブルから Silver テーブルへのデータローディングパターンを整理します。データエンジニアリングを実施する際には、全件連携、差分連携、履歴連携の3つの主要な連携方法があります。特に、履歴連携は一般的に Slowly Changing Dimension Type 2 (SCD type 2) 相当の処理を含みます。

# 連携方法 概要
1 全件連携 ソースシステムから連携された全件データを反映する方法
2 差分連携 ソースシステムから連携された差分データを反映する方法
3 履歴連携 ソースシステムから連携されたデータに基づき履歴データとしして反映する方法

これらの連携方法に応じて、いくつかのデータローディングパターンが存在します。SCD Type 2 を実装する方法としては、バージョン番号やタイムスタンプで管理する方法などがありますが、ここでは他のテーブルと結合が可能なタイムスタンプ(レコードごとに開始日と終了日を付与する方法)で管理する SCD Type 2 のみを取り上げます。

以下の表は、各連携方法とそれに対応するデータローディングパターン、および主なユースケースを示しています。

# 連携方法 データローディングパターン 主なユースケース
1 全件連携 最後に連携されたファイルにより上書きする処理 全件データが連携されるケース
2 差分連携 差分データ により Upsert する処理 差分データが連携されるケース
3 差分連携 特定のカラム値により選択的上書きする処理 会計データのように特定の年度データの洗替を行うケース
4 差分連携 連携されたファイルを Append する処理 IoT データなど書き込み頻度が多いデータを連携するケース
5 履歴連携 タイムスタンプにより管理する SCD Type 2 として差分履歴データを連携する処理 履歴テーブルを実装するケース
6 履歴連携 ソースシステムの主キー列と INGEST_TIMESTAMP に基づき UPSERT 処理連携する処理 ソースシステムで物理削除されたデータを特定するケース

本記事では、下記表の監査列を保持していることを前提として説明を行います。

# カラム名 概要
1 INGEST_TIMESTAMP 列 データ分析基盤にデータが連携された日時を保持する列
2 UPDATE_TIMESTAMP 列 データ分析基盤にてデータが処理された日時を保持する列
3 DATASOURCE 列 ソースファイル名を保持する列

また、Bronze テーブルからデータを抽出する際には、指定したUPDATE_TIMESTAMP列以降のデータから求めたINGEST_TIMESTAMP列の最小の値以降のレコードを抽出する方法を実施しますが、下記の記事で詳細を説明しています。

データローディングパターンの処理ステップ

前提事項

データローディングパターンの処理ステップでは、以下の前提条件を満たすことを前提としています。

  • Bronze テーブルにおいてインタフェースファイルごとのデータがすべて保持されていること
  • 連携されたファイル内での主キー制約が保証されていること
  • ingest_timestampが連携されたファイルごとに異なること

1. 最後に連携されたファイルにより上書きする処理

  1. Bronze テーブルからデータを抽出
    1. 処理日に応じてUPDATE_TIMESTAMP列が指定したタイムスタンプ以降のデータを取得
    2. そのデータにおけるingest_timestamp列の最大値を取得
    3. テーブルからingest_timestamp列がその最大値以降のデータを取得
  2. Silver テーブルと同等のスキーマに変換
    1. そのデータにて連携先 Silver テーブルで保証すべき主キー制約を保証
    2. ソースデータのカラム値を連携先 Silver テーブルのデータ型に変換
  3. Silver テーブルへの連携前準備
    1. 監査列を更新
  4. 連携処理
    1. Silver テーブルへOVERWRITEモードで上書きを実施

2. 差分データ により Upsert する処理

  1. Bronze テーブルからデータを抽出
    1. 処理日に応じてUPDATE_TIMESTAMP列が指定したタイムスタンプ以降のデータを取得
    2. そのデータにおけるINGEST_TIMESTAMP列の最小値を算出
    3. テーブルからINGEST_TIMESTAMP列がその最小値以降のデータを取得
    4. Silver テーブルにおける主キーのカラムごとにINGEST_TIMESTAMP列の最大のデータを取得
    5. そのデータとテーブルを内部結合したデータ(主キーごとに最後に連携されたデータ)を取得
  2. Silver テーブルと同等のスキーマに変換
    1. そのデータにて連携先 Silver テーブルで保証すべき主キー制約を保証
    2. ソースデータのカラム値を連携先 Silver テーブルのデータ型に変換
  3. Silver テーブルへの連携前準備
    1. 監査列を更新
  4. 連携処理
    1. Silver テーブルへ Merge 文により Upsert 処理を実施

3. 特定のカラム値により選択的上書きする処理

  1. Bronze テーブルからデータを抽出
    1. 処理日に応じてUPDATE_TIMESTAMP列が指定したタイムスタンプ以降のデータを取得
    2. そのデータにおけるINGEST_TIMESTAMP列の最小値を算出
    3. テーブルからINGEST_TIMESTAMP列がその最小値以降のデータを取得
    4. 選択的上書きの基準となるカラムごとにINGEST_TIMESTAMP列の最大のデータを取得
    5. そのデータにに基づきテーブルをフィルタリングしたデータ(選択的上書きの基準となるカラムごとに最後に連携されたデータ)を取得
  2. Silver テーブルと同等のスキーマに変換
    1. そのデータにて連携先 Silver テーブルで保証すべき主キー制約を保証
    2. ソースデータのカラム値を連携先 Silver テーブルのデータ型に変換
  3. Silver テーブルへの連携前準備
    1. 監査列を更新
  4. 連携処理
    1. Silver テーブルへ選択的上書きを実施 *1

*1 Databricks (Spark + Delte Lake)にて選択的上書きを実施する際に、replaceWhereオプションを指定してOVERWRITEモードで書き込みを実施する。

4. 連携されたファイルを Append する処理

  1. Bronze テーブルからデータを抽出
    1. 処理日に応じてUPDATE_TIMESTAMP列が指定したタイムスタンプ以降のデータを取得
  2. Silver テーブルと同等のスキーマに変換
    1. そのデータにて連携先 Silver テーブルで保証すべき主キー制約を保証
    2. ソースデータのカラム値を連携先 Silver テーブルのデータ型に変換
  3. Silver テーブルへの連携前準備
    1. 監査列を更新
  4. 連携処理
    1. Silver テーブルへ Append を実施

5. タイムスタンプにより管理する SCD Type 2 として差分履歴データを連携する処理

  1. Bronze テーブルからデータを抽出
    1. 処理日に応じてUPDATE_TIMESTAMP列が指定したタイムスタンプ以降のデータを取得
    2. そのデータにおけるingest_timestamp列の最大値を取得
    3. テーブルからingest_timestamp列がその最大値以降のデータを取得
    4. 処理日に応じてUPDATE_TIMESTAMP列が指定したタイムスタンプ以降のデータを取得
    5. そのデータにおけるINGEST_TIMESTAMP列の最小値を算出
    6. テーブルからINGEST_TIMESTAMP列が最小値以降のデータを取得
  2. Silver テーブルと同等のスキーマに変換
    1. INGEST_TIMESTAMP列の値を開始日の列と NULL を保持させた終了日の列を追加
    2. そのデータにて連携先 Silver テーブルで保証すべき主キー制約を保証
    3. ソースデータのカラム値を連携先 Silver テーブルのデータ型に変換
  3. Silver テーブルから終了日を更新すべきデータ
    1. そのデータにより開始日以外の主キー列とINGEST_TIMESTAMPが開始日と終了日に含まれるデータ(終了日を更新すべきデータ)を取得
    2. そのデータとBronze テーブルから抽出したデータを UNION ALL したデータを取得
  4. 終了日の更新
    1. そのデータにて連携先 Silver テーブルで保証すべき主キー制約を保証
    2. ウィンドウ(分析)関数により開始日を基準として開始日以外の主キーごとの1つ後の開始日を取得し、その値(取得できない場合はタイムスタンプの最後の日時)を開始日列として追加
  5. Silver テーブルへの連携前準備
    1. 監査列を更新
  6. 連携処理
    1. Silver テーブルへ Merge 文により Upsert 処理を実施

6. ソースシステムの主キー列と INGEST_TIMESTAMP に基づき UPSERT 処理連携する処理

  1. Bronze テーブルからデータを抽出
    1. 処理日に応じてUPDATE_TIMESTAMP列が指定したタイムスタンプ以降のデータを取得
  2. Silver テーブルと同等のスキーマに変換
    1. INGEST_TIMESTAMP列の値を開始日として追加
    2. そのデータにて連携先 Silver テーブルで保証すべき主キー制約を保証
    3. ソースデータのカラム値を連携先 Silver テーブルのデータ型に変換
  3. Silver テーブルへの連携前準備
    1. 監査列を更新
  4. 連携処理
    1. Silver テーブルへ Merge 文により Upsert 処理を実施

まとめ

この記事では、Bronze テーブルから Silver テーブルへのデータローディングパターンを詳細に解説しました。全件連携、差分連携、履歴連携の3つの主要な連携方法を中心に、それぞれの連携方法に対応するデータローディングパターンと主なユースケースを示しました。

また、各連携方法に対応するデータローディングパターンの具体的な処理ステップを説明し、それぞれのステップで必要となる前提条件や処理内容を明示しました。これらの情報は、データエンジニアリングを実施する際の参考となるでしょう。

これらの情報を通じて、読者がデータローディングパターンの理解を深め、自身のデータエンジニアリングの業務に活かすことができることを期待しています。

修正履歴

2024/05/22

初回リリース。

4
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
4