1
2

More than 1 year has passed since last update.

【データ基盤構築/dbt】増分更新をする際の、冪等性を担保する方法や便利な機能など

Last updated at Posted at 2023-02-22

今回の課題

dbtでテーブルの増分更新をする際、冪等性が担保されるように実装してみた。
作成したクエリや、その解説、
また、つまずいてしまった箇所を記録に残そうと思う。

※参考:【dbt小ネタ】ログの集計:incremental モデルの実運用

作成したクエリ

テーブルの増分更新をするためのモデルを作成してみた。

ドキュメントでは、unique_keyの指定方法は'-'で複合主キーを結合して指定する方法が推奨されていた。
しかし、上記のクエリのように配列で指定しないと、Cloumn name user_type is ambiguousというエラーが発生したので注意する。

/* materializedにて、増分更新の設定 */
/* unique_keyにて、mergeの時のkeyを設定 */
{{
    config(
        materialized='incremental',
        unique_key = ['date', 'user_type', 'page_path', 'page_title']
) }}

/* dateをdate型に変換 */
/* page_pathが「/contents/」で始まるURLに絞る */
/* page_pathのデータの揺れの調整 */
with ga_aggregation as (
    select
        date(date) as date --date型に変換
        , page_title
        , user_type
        , page_path
        , regexp_replace(normalize(page_path, NFKC), r'(index|&|\?|https).*|[^a-zA-Z0-9 -/:-@-~_]|[ +:\?#]', '') as page_path_fix --データの揺れを調整
        , pageviews
        , users
        , exits
        , entrances
        , bounces
        , sessions
        , view_id
    from
        `ソーステーブル名`
    where  --URLが「/contents/」で始まる、前日のデータに絞る。
        1=1
        and regexp_contains(page_path, r'^/contents/')
        {%- if is_incremental() %}
        and {%- set target_date = var('target_date', '') %}
            {%- if target_date != '' %}
            {{ dbt.date_trunc('day', 'date') }} = {{ dbt.date_trunc('day', "'"~target_date~"'") }}
            {%- else %}
            {{ dbt.date_trunc('day', 'date') }} = {{ dbt.date_trunc('day', dbt.current_timestamp())}}
            {%- endif %}
        {%- endif %}
)
--page_pathが「/」で終了するように調整をして、必要なカラムを抽出
select
    date
    , page_title
    , user_type
    , pageviews
    , users
    , exits
    , entrances
    , bounces
    , sessions
    , page_path
    , if(regexp_contains(page_path_fix, r'.*/$'), concat(page_path_fix,""), concat(page_path_fix, "/")) as page_path_fix --page_pathカラムがすべて「/」で終わるように調整
from
    ga_aggregation

クエリの解説

1)増分更新や冪等性担保のための設定

{{
    config(
        materialized='incremental',
        unique_key = ['date', 'user_type', 'page_path', 'page_title']
) }}
  • materialized='incremental'と設定することで、増分更新でテーブルを生成できる。
    1回目のモデル実行では、create or replace tableにコンパイルされ、全量洗い替え更新。
    2回目以降のモデル実行ではmergeにコンパイルされ、増分更新がされるというイメージ。

  • unique_keyを設定することで、冪等性が担保できるようになる。
    unique_keyに主キーを指定しておくことで、設定した主キーが存在しない場合はinsert、存在する場合そのデータはupdateで処理が走るようになるため。
    merge文のキーを指定するイメージ。)

2)増分更新(2回目以降のクエリの実行)の場合に機能する述語を作成

    where  --URLが「/contents/」で始まる、前日のデータに絞る。
        1=1
        and regexp_contains(page_path, r'^/contents/')
        {%- if is_incremental() %}
        and {%- set target_date = var('target_date', '') %}
            {%- if target_date != '' %}
            {{ dbt.date_trunc('day', 'date') }} = {{ dbt.date_trunc('day', "'"~target_date~"'") }}
            {%- else %}
            {{ dbt.date_trunc('day', 'date') }} = {{ dbt.date_trunc('day', dbt.current_timestamp())}}
            {%- endif %}
        {%- endif %}
  • {% if is_incremental() %}{% endif %}内の述語は、
    configの引数にてmaterialized='incremental'と設定されていて、
    モデルの実行が2回目以降の場合(増分更新として処理が走るとき)。

    という条件下で機能するようになる

  • {% set target_date = var('target_date', '') %}
    set・・・指定した変数に値を入れることができる。ここでは、左辺のtarget_dateに値を代入している。
    var()・・・デフォルトでは第一引数を返し、第一引数に値が定義されていなかった場合。第二引数を返す。
    上記のように記述すると、dbt build --select model名 --var'target_date: [日付]というコマンドを実行した際に、target_data変数に[日付]を代入したうえで、dbt buildの処理が走るようになる。

以上、dbtでの増分更新の際の冪等性の担保の方法や、
モデルを日付を遡って実行する方法などについて解説させていただきました。

おまけ

1)日時を指定し、過去に遡って増分更新を実施する

dbt build --select model名 --var'target_date: 2023-01-01と実行することで、
dbt内で下記のようにコンパイルされた。
※2023年1月1日に遡って、mergeステートメントでupsert処理が走り、増分更新が実施された。

merge into `更新先のテーブル名` as DBT_INTERNAL_DEST
using (

/* dateをdate型に変換 */
/* page_pathが「/contents/」で始まるURLに絞る */
/* page_pathのデータの揺れの調整 */
with ga_aggregation as (
    select
        date(date) as date --date型に変換
        , page_title
        , user_type
        , page_path
        , regexp_replace(normalize(page_path, NFKC), r'(index|&|\?|https).*|[^a-zA-Z0-9 -/:-@-~_]|[ +:\?#]', '') as page_path_fix --データの揺れを調整
        , pageviews
        , users
        , exits
        , entrances
        , bounces
        , sessions
        , view_id
    from
        `データソース名`
    where  --URLが「/contents/」で始まる、前日のデータに絞る。
        1=1
        and regexp_contains(page_path, r'^/contents/')
        and
            timestamp_trunc(
        cast(date as timestamp),
        day
    ) = timestamp_trunc(
        cast('2023-01-01' as timestamp),
        day
    )
)
--page_pathが「/」で終了するように調整をして、必要なカラムを抽出
select
    date
    , page_title
    , user_type
    , pageviews
    , users
    , exits
    , entrances
    , bounces
    , sessions
    , page_path
    , if(regexp_contains(page_path_fix, r'.*/$'), concat(page_path_fix,""), concat(page_path_fix, "/")) as page_path_fix --page_pathカラムがすべて「/」で終わるように調整
from
    ga_aggregation
        ) as DBT_INTERNAL_SOURCE
        on (
                    DBT_INTERNAL_SOURCE.date = DBT_INTERNAL_DEST.date
                ) and (
                    DBT_INTERNAL_SOURCE.user_type = DBT_INTERNAL_DEST.user_type
                ) and (
                    DBT_INTERNAL_SOURCE.page_path = DBT_INTERNAL_DEST.page_path
                ) and (
                    DBT_INTERNAL_SOURCE.page_title = DBT_INTERNAL_DEST.page_title
                )

    
    when matched then update set
        `date` = DBT_INTERNAL_SOURCE.`date`,`page_title` = DBT_INTERNAL_SOURCE.`page_title`,`user_type` = DBT_INTERNAL_SOURCE.`user_type`,`pageviews` = DBT_INTERNAL_SOURCE.`pageviews`,`users` = DBT_INTERNAL_SOURCE.`users`,`exits` = DBT_INTERNAL_SOURCE.`exits`,`entrances` = DBT_INTERNAL_SOURCE.`entrances`,`bounces` = DBT_INTERNAL_SOURCE.`bounces`,`sessions` = DBT_INTERNAL_SOURCE.`sessions`,`page_path` = DBT_INTERNAL_SOURCE.`page_path`,`page_path_fix` = DBT_INTERNAL_SOURCE.`page_path_fix`
    

    when not matched then insert
        (`date`, `page_title`, `user_type`, `pageviews`, `users`, `exits`, `entrances`, `bounces`, `sessions`, `page_path`, `page_path_fix`)
    values
        (`date`, `page_title`, `user_type`, `pageviews`, `users`, `exits`, `entrances`, `bounces`, `sessions`, `page_path`, `page_path_fix`)

ちなみに変数に値を代入する方法としては、下記の2つが存在する。

  • dbt_project.ymlのvarsに値を設定しておくパターン
  • dbt run --vars [定義したい変数]というかたちで処理を走らせるパターン
    ※参考:dbt入門-変数の参照方法

2)モデルとソースデータとの関係をリセットする。

dbtの公式ドキュメントのbest practiceにて、テーブルを作り直す方法が書かれていた。
dbt build --full-refresh -s [モデル名]というように、--full-refreshフラグを指定してモデルを実行する。
こうすることによって、テーブルを作り直すことができるので、incrementalで増分更新する過程で欠損してしまっているデータを補完する場合などに便利。
※公式ドキュメントにて、定期的に実施することが推奨されていた。

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2