記事を書いた背景
- この記事はdbt Advent Calendar 2023の21日目の記事です
- dbtを用いてモデリングを行う際に冪等性を担保する場合は、通常Incrementalモデルを使用します
- しかし、以下のような場合にはIncrementalモデルを用いた冪等性の担保を実現することは難しいです
- レイヤアーキテクチャを採用している基盤で、Rawやデータレイクに蓄積されたデータからMartなどのレイヤにロードする際に、必要なデータだけに絞り込むようにモデリングしていたとします。例えばautomateDV(dbtVault)を使用している場合、フレームワークの仕様に合わせてraw_stageレイヤにて、データを以降のレイヤで使用するデータだけに絞り込む必要があります
- dbtプロジェクトを日次で実行される場合はソースデータの抽出条件を変数で指定することにより冪等性を担保することが可能ですが、一定時間毎に実行する場合でソースデータから増分データをロードするような実装をしている場合、冪等性を担保することが困難になる場合があります
- そこでこの記事では、前述のようなシチュエーションで冪等性を担保する方法を考えます
想定読者
- dbtとは何かを理解している人
- データウェアハウスの構築・運用を担当している人
当記事で扱うデータセットについて
- 当記事ではTPC-Hというデータベースの処理性能に関する指標を集計する際に使用するデータモデルを使います。TPC-Hをソースデータとして分析基盤に取り込むものとします
- 「TPC-H」はデータベースの性能を測るものさしのようなもの 「TPC-H」とは、トランザクション処理性能評議会(Transaction Processing Performance Council:以下TPC)が運営するデータベースの処理性能の指標(ものさし)の1つです
- TPC-Hのデータモデルに関しては、こちらのURLを参照してください
- TPC-Hの中でもORDERSテーブルを使用して説明します。
- サンプルモデルはautomateDVのDemoプロジェクトを使用します
- automateDV-Demo
- レイヤ構造は次の通りです
- ソースデータ:ORDERS、raw_stage:raw_ordersに注目して、今回のソリューションを検討していきます
冪等性とは何か
- 冪等性(べきとうせい)とは、同じ操作を何度繰り返しても、同じ結果が得られるという性質です。冪等性がある操作は、1回操作した場合の結果と、2回以上操作した場合の結果は同じになります
- 加工パイプラインにおいて冪等性が担保されていれば、パイプラインの途中で障害が発生した場合でも、パイプラインの最初からやり直すことが出来るため運用負荷を下げることが出来ます
dbt incrementalを使用した冪等性の担保について
- 当節ではdbtを用いた冪等性について見ていきます
日次での加工パイプラインに関して
- 日次で加工パイプラインを実行しているとします
- ソースからのデータのロードに関して、スキャンコストを抑えるために増分の取込を行う必要があることとします
- 初回実行時は全件ロードし、二回目以降は変数で指定された日に注文されたデータをロードします
{{ config( materialized="incremental" ) }} ・・・ {%- if is_incremental() %} WHERE ORDERDATE = TO_DATE('{{ var('load_date') }}') {%- endif %}
- このモデルを何回繰り返してもソースデータの状態が変わらない限り、変数で渡された日付を元にソースデータを絞り込むことで実行結果は同一となり冪等性が担保されています
- 後述する「スキャンコストを抑えるためにソースデータから絞り込んだデータをロードし、そのデータを元に加工パイプラインを実施するようなアーキテクチャを採用している場合」でも、冪等性を担保することが可能です
- 次に、一日に複数回加工パイプラインを実施する必要がある場合についてみてみましょう
マイクロバッチでの加工パイプラインに関して
-
一定時間毎に加工パイプラインを実行しているとします
-
ソースからのデータのロードに関して、スキャンコストを抑えるために増分の取込を行う必要があることとします
-
増分データを抽出するために、ソースデータ側に更新日時(updated_at)項目を追加します
-
初回実行時は全件ロードし、二回目以降はまだロードされていないレコードを更新日時を元にソースデータから絞り込んで参照し、ロードします
{{ config( materialized="incremental" ) }} ・・・ {%- if is_incremental() %} WHERE ORDERDATE = TO_DATE('{{ var('load_date') }}') and updated_at > ( select max( updated_at ) from {{ this }} ) {%- endif %}
-
raw_stageレイヤのモデルを実行した際にソースデータの増分をraw_stageレイヤにロードしたあと、stageレイヤのモデルを実行する前に再度raw_stageのモデルを実行したとします。この場合、ソースデータには増分データが無いため、raw_stageレイヤには新しいデータは挿入されず、前回実行時のままとなります。よって、モデルの冪等性は保たれていると言えるでしょう
-
このようにIncrementalモデルを使用すれば一日に複数回のロードが必要であった場合でも対応は可能です
-
しかし、スキャンコストを抑えるためにソースデータから絞り込んだデータをロードし、そのデータを元に加工パイプラインを実施するようなアーキテクチャを採用している場合、Incrementalモデルを採用することは出来ません。この場合、以下の二つの方式が考えられます
- 案1:差分抽出のロジックをそのまま使用し、Incrementalモデルは使用せずに永続化方法にtableを使用する
{{ config( materialized="table" ) }} ・・・ {%- if is_incremental() %} WHERE ORDERDATE = TO_DATE('{{ var('load_date') }}') and updated_at > ( select max( updated_at ) from {{ this }} ) {%- endif %}
- この方法には以下の二つの問題点があります
- 冪等性が失われる。ソースデータに変化が無い状態でこのモデルを二回実行すると増分データが無いために、このモデルによって生成されるテーブルが空になってしまう。その結果、次に実行される際に初回実行と同等の状態となり、全件ロードされてしまう
- 加工パイプラインの起動の間にソースデータに増分が無かった場合、増分データがが無いために、このモデルによって生成されるテーブルが空になってしまう。その結果、次に実行される際に初回実行と同等の状態となり、全件ロードされてしまう
- この二つの問題は発生する原因は違いますが、同じ事象が引き起こされます
- この方法には以下の二つの問題点があります
- 案2:ソースデータの絞り込み範囲を変数で指定するようにする
- この方法の場合、日次で処理する場合に変数で絞り込み条件を指定する方法と同様に冪等性を担保することが可能です
- しかし、スケジューラとの連携にて変数を引き渡す部分の作り込みが必要になるなどdbt以外の対応も必要となるため、元々日毎のバッチしか基盤が対応していない状況で一日複数回の収集・加工処理が必要になった場合に対応までの時間を要する為、ビジネス側の要求するスピードに間に合わないリスクなどがありえます
{{ config( materialized="table" ) }} ・・・ {%- if is_incremental() %} WHERE ORDERDATE = PARSE_DATE("%Y-%m-%d", cast( {{ var('load_date') }} as string ) ) and PARSE_DATETIME('%Y-%m-%d %H:%M:%S', cast( {{ var('start_updated_at') }} as string ) ) <= updated_at and updated_at <= PARSE_DATETIME('%Y-%m-%d %H:%M:%S', cast( {{ var('end_updated_at') }} as string ) ) {%- endif %}
- そこで次節では、Incrementalモデルを使わない、且つ変数でソースデータの範囲を指定しない方法で、一日に複数回の加工パイプライン実施における冪等性担保について見ていきましょう
- 案1:差分抽出のロジックをそのまま使用し、Incrementalモデルは使用せずに永続化方法にtableを使用する
Incrementalモデルが使えない状況での冪等性担保に関する実装方法
- Incrementalモデルを使わずに、且つ変数によるソースデータの範囲を指定しない方法の手順は次の通りとなります
- raw_stageレイヤに未ロードのデータがソースデータに存在するかチェックします
1-1. raw_stageレイヤのテーブルから最大の更新日時を取得します
1-2. 1-1. で取得した更新日時より更新日時が大きいレコード数がソースデータに存在するか、チェックします - 1-2. で取得した件数が1件以上の場合(=未ロードのデータがソースデータに存在する場合)、ソースデータから未ロードのデータを抽出し、raw_stageのテーブルにCreate / Insertします
- 1-2. で取得した件数が0件の場合(=未ロードのデータがソースデータに存在しない場合)、raw_stageレイヤのテーブルから全レコードを抽出し、raw_stageのテーブルにCreate / Insertします(再作成します)
- raw_stageレイヤに未ロードのデータがソースデータに存在するかチェックします
- 手順のイメージはこちらです
- 実装イメージは以下の通りです
{{ config( materialized='table' ) }} -- 更新先から最終取込日時を取得 {%- set get_last_created_at_sql %} select max( updated_at ) from {{ this }} {%- endset %} {% set relation_exists = load_relation( this ) is not none %} {% if relation_exists %} {% set results = run_query( get_last_created_at_sql ) %} {% if execute %} {% set last_ymd = results.columns[0].values() | first %} {% else %} {% set last_ymd = none %} {% endif %} {% else %} {% set last_ymd = none %} {% endif %} -- 加工対象のレコード数の取得 {%- set get_count_sql %} select count( * ) from {{ source('tpch_sample', 'ORDERS') }} {%- if last_ymd is not none %} where updated_at > "{{ last_ymd }}" {% endif %} {%- endset %} {% set results = run_query( get_count_sql ) %} {% if execute %} {% set count = results.columns[0].values() | first %} {% else %} {% set count = 0 %} {% endif %} with source as ( -- ソースデータに差分があるか、初回実行の場合 {% if count > 0 or last_ymd is none %} select * from {{ source('tpch_sample', 'ORDERS') }} {%- if last_ymd is not none %} where updated_at > "{{ last_ymd }}" {% endif %} -- ソースデータに差分データがない場合 {% else %} select * from {{ this }} {% endif %} )
- このモデルでは、load_relationというマクロを使っています。dbtの場合、テーブルの生成も行う運用を行うことが多いので、初回実行時にはテーブルが無いので、テーブル自体の存在を確認する必要があります
前述のソリューションの問題点
- 前述のソリューションは、冪等性は担保できるものの本当に差分データが無い場合もraw_stageレイヤにデータが残ります。raw_stageレイヤから既にstageレイヤにロードされている場合、stageレイヤのモデルを再実行すると同じデータを再度ロードしようとします
- その対応として、Incrementalモデルで蓄積するレイヤのの手前のレイヤまで、増分データが無い状態でもデータを維持し、Incrementalモデルにて重複排除を行います。具体的には以下の方法が考えられます
- 案1:Data Vault、DWHやMartなど、過去分のデータをIncrementalで蓄積するレイヤの手前のレイヤまでViewで構成します。当記事のプロジェクト構成ではstageレイヤもViewを構成します(raw_vaultはincrementalで構成する)
- 案2:Data Vault、DWHやMartなど、過去分のデータをIncrementalで蓄積するレイヤの手前のレイヤまで、当記事のソリューションを適用します。当記事のプロジェクト構成ではraw_stageに対してソリューションを適用することを前提に説明してきましたが、stageレイヤにも適用します
- 案1 / 2ともに、Incrementalモデルに対してunique_keyを設定することで、重複データがロードされることを防ぎます。ただし、増分データに対してMergeをするモデルの場合はこの方式を採用することが出来ますが、在庫数などのように増分データを加算してUpdateを行う場合には適用できないことに注意してください
最後に
- 当記事は、少し特殊な状況下でのみ発生する課題かもしれません。前述の通り、本来はスケジューラとの連携方法を見直したほうが、Incrementalモデルに対してunique_keyを指定する必要が無くなり、Mergeを使わない分スキャンコストは下がる可能性はあります
- 一方で、スケジューラとの連携部分をシンプルにすることで、その分ほかの機能の開発にリソースを充てることが出来るかもしれませんし、基盤側の開発完了時期がビジネス側の要求スピードと合わないかもしれません
- コストに関して何を優先するかはそれぞれの会社ごとに違います。同じようなシチュエーションで困っている方の参考になれば幸いです