概要
この記事では、Bronze テーブルから Silver テーブルへのデータローディングパターンを整理します。データエンジニアリングを実施する際には、全件連携、差分連携、履歴連携の3つの主要な連携方法があります。特に、履歴連携は一般的に Slowly Changing Dimension Type 2 (SCD type 2) 相当の処理を含みます。
# | 連携方法 | 概要 |
---|---|---|
1 | 全件連携 | ソースシステムから連携された全件データを反映する方法 |
2 | 差分連携 | ソースシステムから連携された差分データを反映する方法 |
3 | 履歴連携 | ソースシステムから連携されたデータに基づき履歴データとしして反映する方法 |
これらの連携方法に応じて、いくつかのデータローディングパターンが存在します。SCD Type 2 を実装する方法としては、バージョン番号やタイムスタンプで管理する方法などがありますが、ここでは他のテーブルと結合が可能なタイムスタンプ(レコードごとに開始日と終了日を付与する方法)で管理する SCD Type 2 のみを取り上げます。
以下の表は、各連携方法とそれに対応するデータローディングパターン、および主なユースケースを示しています。
# | 連携方法 | データローディングパターン | 主なユースケース |
---|---|---|---|
1 | 全件連携 | 最後に連携されたファイルにより上書きする処理 | 全件データが連携されるケース |
2 | 差分連携 | 差分データ により Upsert する処理 | 差分データが連携されるケース |
3 | 差分連携 | 特定のカラム値により選択的上書きする処理 | 会計データのように特定の年度データの洗替を行うケース |
4 | 差分連携 | 一定期間のデータを上書きする処理 | 主キーがないことや物理削除が実施される場合にデータの洗替を行うケース |
5 | 差分連携 | 連携されたファイルを Append する処理 | IoT データなど書き込み頻度が多いデータを連携するケース |
6 | 履歴連携 | タイムスタンプにより管理する SCD Type 2 として差分履歴データを連携する処理 | 履歴テーブルを実装するケース |
7 | 履歴連携 | ソースシステムの主キー列と INGEST_TIMESTAMP に基づき UPSERT 処理連携する処理 | ソースシステムで物理削除されたデータを特定するケース |
本記事では、下記表の監査列を保持していることを前提として説明を行います。
# | カラム名 | 概要 |
---|---|---|
1 | INGEST_TIMESTAMP 列 | データ分析基盤にデータが連携された日時を保持する列 |
2 | UPDATE_TIMESTAMP 列 | データ分析基盤にてデータが処理された日時を保持する列 |
3 | DATASOURCE 列 | ソースファイル名を保持する列 |
また、Bronze テーブルからデータを抽出する際には、指定したUPDATE_TIMESTAMP
列以降のデータから求めたINGEST_TIMESTAMP
列の最小の値以降のレコードを抽出する方法を実施しますが、下記の記事で詳細を説明しています。
データローディングパターンの処理ステップ
前提事項
データローディングパターンの処理ステップでは、以下の前提条件を満たすことを前提としています。
- Bronze テーブルにおいてインタフェースファイルごとのデータがすべて保持されていること
- 連携されたファイル内での主キー制約が保証されていること
-
ingest_timestamp
が連携されたファイルごとに異なること
1. 最後に連携されたファイルにより上書きする処理
- Bronze テーブルからデータを抽出
- 処理日に応じて
UPDATE_TIMESTAMP
列が指定したタイムスタンプ以降のデータを取得 - そのデータにおける
ingest_timestamp
列の最大値を取得 - テーブルから
ingest_timestamp
列がその最大値以降のデータを取得
- 処理日に応じて
- Silver テーブルと同等のスキーマに変換
- そのデータにて連携先 Silver テーブルで保証すべき主キー制約を保証
- ソースデータのカラム値を連携先 Silver テーブルのデータ型に変換
- Silver テーブルへの連携前準備
- 監査列を更新
- 連携処理
- Silver テーブルへ
OVERWRITE
モードで上書きを実施
- Silver テーブルへ
2. 差分データ により Upsert する処理
-
Bronze テーブルからデータを抽出
- 処理日に応じて
UPDATE_TIMESTAMP
列が指定したタイムスタンプ以降のデータを取得 - そのデータにおける
INGEST_TIMESTAMP
列の最小値を算出 - テーブルから
INGEST_TIMESTAMP
列がその最小値以降のデータを取得 - Silver テーブルにおける主キーのカラムごとに
INGEST_TIMESTAMP
列の最大のデータを取得 - そのデータとテーブルを内部結合したデータ(主キーごとに最後に連携されたデータ)を取得
- 処理日に応じて
-
Silver テーブルと同等のスキーマに変換
- ソースデータのカラム値を連携先 Silver テーブルのデータ型に変換
-
Silver テーブルへの連携前準備
- 監査列を更新
-
連携処理
- Silver テーブルへ Merge 文により Upsert 処理を実施
3. 特定のカラム値により選択的上書きする処理
- Bronze テーブルからデータを抽出
- 処理日に応じて
UPDATE_TIMESTAMP
列が指定したタイムスタンプ以降のデータを取得 - そのデータにおける
INGEST_TIMESTAMP
列の最小値を算出 - テーブルから
INGEST_TIMESTAMP
列がその最小値以降のデータを取得 - 選択的上書きの基準となるカラムごとに
INGEST_TIMESTAMP
列の最大のデータを取得 - そのデータにに基づきテーブルをフィルタリングしたデータ(選択的上書きの基準となるカラムごとに最後に連携されたデータ)を取得
- 処理日に応じて
- Silver テーブルと同等のスキーマに変換
- そのデータにて連携先 Silver テーブルで保証すべき主キー制約を保証
- ソースデータのカラム値を連携先 Silver テーブルのデータ型に変換
- Silver テーブルへの連携前準備
- 監査列を更新
- 連携処理
- Silver テーブルへ選択的上書きを実施 *1
*1 Databricks (Spark + Delte Lake)にて選択的上書きを実施する際に、replaceWhere
オプションを指定してOVERWRITE
モードで書き込みを実施する。
4. 一定期間のデータを上書きする処理
- Bronze テーブルからデータを抽出
- 処理日に応じて
UPDATE_TIMESTAMP
列が指定したタイムスタンプ以降のデータを取得 - そのデータにおける
INGEST_TIMESTAMP
列の最小値を算出 - テーブルから
INGEST_TIMESTAMP
列がその最小値以降のレコードを取得 -
INGEST_TIMESTAMP
列ごとに連携対象の期間のレコードを取得 *1-
INGEST_TIMESTAMP
列を降順でソートする -
INGEST_TIMESTAMP
列ごとにウォーターマーク列の最大とウォーターマーク列の最小を求める -
INGEST_TIMESTAMP
列が最も新しい日付の場合には、ウォーターマーク列の最大を最大日として、INGEST_TIMESTAMP
列の日付からにデータ連携期間の日付を減算した日付を最小日として保持 - それ以降の日付の場合には、前の
INGEST_TIMESTAMP
列の最小日から 1秒、あるいは、1日減算した日付を最大を最大日として、INGEST_TIMESTAMP
列の日付からにデータ連携期間の日付を減算した日付を最小日として保持
-
-
INGEST_TIMESTAMP
列がその最小値以降のレコードとINGEST_TIMESTAMP
列ごとに連携対象の期間のレコードを内部結合したレコードを取得
- 処理日に応じて
- Silver テーブルと同等のスキーマに変換
- ソースデータのカラム値を連携先 Silver テーブルのデータ型に変換
- Silver テーブルへの連携前準備
- 監査列を更新
- 連携処理
-
INGEST_TIMESTAMP
列の最小日にデータ連携期間の日付を減算した日付を取得 - 取得した日付以降の条件でSilver テーブルへ選択的上書きを実施 *2
-
*1 処理方法に関する下記の補足記事を投稿。
*2 Databricks (Spark + Delte Lake)にて選択的上書きを実施する際に、replaceWhere
オプションを指定してOVERWRITE
モードで書き込みを実施する。
5. 連携されたファイルを Append する処理
- Bronze テーブルからデータを抽出
- 処理日に応じて
UPDATE_TIMESTAMP
列が指定したタイムスタンプ以降のデータを取得
- 処理日に応じて
- Silver テーブルと同等のスキーマに変換
- そのデータにて連携先 Silver テーブルで保証すべき主キー制約を保証
- ソースデータのカラム値を連携先 Silver テーブルのデータ型に変換
- Silver テーブルへの連携前準備
- 監査列を更新
- 連携処理
- Silver テーブルへ Append を実施
6. タイムスタンプにより管理する SCD Type 2 として差分履歴データを連携する処理
- Bronze テーブルからデータを抽出
- 処理日に応じて
UPDATE_TIMESTAMP
列が指定したタイムスタンプ以降のデータを取得 - そのデータにおける
ingest_timestamp
列の最大値を取得 - テーブルから
ingest_timestamp
列がその最大値以降のデータを取得 - 処理日に応じて
UPDATE_TIMESTAMP
列が指定したタイムスタンプ以降のデータを取得 - そのデータにおける
INGEST_TIMESTAMP
列の最小値を算出 - テーブルから
INGEST_TIMESTAMP
列が最小値以降のデータを取得
- 処理日に応じて
- Silver テーブルと同等のスキーマに変換
-
INGEST_TIMESTAMP
列の値を開始日の列と NULL を保持させた終了日の列を追加 - そのデータにて連携先 Silver テーブルで保証すべき主キー制約を保証
- ソースデータのカラム値を連携先 Silver テーブルのデータ型に変換
-
- Silver テーブルから終了日を更新すべきデータ
- そのデータにより開始日以外の主キー列と
INGEST_TIMESTAMP
が開始日と終了日に含まれるデータ(終了日を更新すべきデータ)を取得 - そのデータとBronze テーブルから抽出したデータを UNION ALL したデータを取得
- そのデータにより開始日以外の主キー列と
- 終了日の更新
- そのデータにて連携先 Silver テーブルで保証すべき主キー制約を保証
- ウィンドウ(分析)関数により開始日を基準として開始日以外の主キーごとの1つ後の開始日を取得し、その値(取得できない場合はタイムスタンプの最後の日時)を開始日列として追加
- Silver テーブルへの連携前準備
- 監査列を更新
- 連携処理
- Silver テーブルへ Merge 文により Upsert 処理を実施
7. ソースシステムの主キー列と INGEST_TIMESTAMP に基づき UPSERT 処理連携する処理
- Bronze テーブルからデータを抽出
- 処理日に応じて
UPDATE_TIMESTAMP
列が指定したタイムスタンプ以降のデータを取得
- 処理日に応じて
- Silver テーブルと同等のスキーマに変換
-
INGEST_TIMESTAMP
列の値を開始日として追加 - そのデータにて連携先 Silver テーブルで保証すべき主キー制約を保証
- ソースデータのカラム値を連携先 Silver テーブルのデータ型に変換
-
- Silver テーブルへの連携前準備
- 監査列を更新
- 連携処理
- Silver テーブルへ Merge 文により Upsert 処理を実施
まとめ
この記事では、Bronze テーブルから Silver テーブルへのデータローディングパターンを詳細に解説しました。全件連携、差分連携、履歴連携の3つの主要な連携方法を中心に、それぞれの連携方法に対応するデータローディングパターンと主なユースケースを示しました。
また、各連携方法に対応するデータローディングパターンの具体的な処理ステップを説明し、それぞれのステップで必要となる前提条件や処理内容を明示しました。これらの情報は、データエンジニアリングを実施する際の参考となるでしょう。
これらの情報を通じて、読者がデータローディングパターンの理解を深め、自身のデータエンジニアリングの業務に活かすことができることを期待しています。
修正履歴
2024/05/22
初回リリース。
2024/07/03
一定期間のデータを上書きする処理を追加。