今回の課題
dbtでテーブルの増分更新をする際、冪等性が担保されるように実装してみた。
作成したクエリや、その解説、
また、つまずいてしまった箇所を記録に残そうと思う。
※参考:【dbt小ネタ】ログの集計:incremental モデルの実運用
作成したクエリ
テーブルの増分更新をするためのモデルを作成してみた。
※ドキュメントでは、unique_key
の指定方法は'-'
で複合主キーを結合して指定する方法が推奨されていた。
しかし、上記のクエリのように配列で指定しないと、Cloumn name user_type is ambiguous
というエラーが発生したので注意する。
/* materializedにて、増分更新の設定 */
/* unique_keyにて、mergeの時のkeyを設定 */
{{
config(
materialized='incremental',
unique_key = ['date', 'user_type', 'page_path', 'page_title']
) }}
/* dateをdate型に変換 */
/* page_pathが「/contents/」で始まるURLに絞る */
/* page_pathのデータの揺れの調整 */
with ga_aggregation as (
select
date(date) as date --date型に変換
, page_title
, user_type
, page_path
, regexp_replace(normalize(page_path, NFKC), r'(index|&|\?|https).*|[^a-zA-Z0-9 -/:-@-~_]|[ +:\?#]', '') as page_path_fix --データの揺れを調整
, pageviews
, users
, exits
, entrances
, bounces
, sessions
, view_id
from
`ソーステーブル名`
where --URLが「/contents/」で始まる、前日のデータに絞る。
1=1
and regexp_contains(page_path, r'^/contents/')
{%- if is_incremental() %}
and {%- set target_date = var('target_date', '') %}
{%- if target_date != '' %}
{{ dbt.date_trunc('day', 'date') }} = {{ dbt.date_trunc('day', "'"~target_date~"'") }}
{%- else %}
{{ dbt.date_trunc('day', 'date') }} = {{ dbt.date_trunc('day', dbt.current_timestamp())}}
{%- endif %}
{%- endif %}
)
--page_pathが「/」で終了するように調整をして、必要なカラムを抽出
select
date
, page_title
, user_type
, pageviews
, users
, exits
, entrances
, bounces
, sessions
, page_path
, if(regexp_contains(page_path_fix, r'.*/$'), concat(page_path_fix,""), concat(page_path_fix, "/")) as page_path_fix --page_pathカラムがすべて「/」で終わるように調整
from
ga_aggregation
クエリの解説
1)増分更新や冪等性担保のための設定
{{
config(
materialized='incremental',
unique_key = ['date', 'user_type', 'page_path', 'page_title']
) }}
-
materialized='incremental'
と設定することで、増分更新でテーブルを生成できる。
1回目のモデル実行では、create or replace table
にコンパイルされ、全量洗い替え更新。
2回目以降のモデル実行ではmerge
にコンパイルされ、増分更新がされるというイメージ。 -
unique_key
を設定することで、冪等性が担保できるようになる。
unique_key
に主キーを指定しておくことで、設定した主キーが存在しない場合はinsert、存在する場合そのデータはupdateで処理が走るようになるため。
(merge
文のキーを指定するイメージ。)
2)増分更新(2回目以降のクエリの実行)の場合に機能する述語を作成
where --URLが「/contents/」で始まる、前日のデータに絞る。
1=1
and regexp_contains(page_path, r'^/contents/')
{%- if is_incremental() %}
and {%- set target_date = var('target_date', '') %}
{%- if target_date != '' %}
{{ dbt.date_trunc('day', 'date') }} = {{ dbt.date_trunc('day', "'"~target_date~"'") }}
{%- else %}
{{ dbt.date_trunc('day', 'date') }} = {{ dbt.date_trunc('day', dbt.current_timestamp())}}
{%- endif %}
{%- endif %}
-
{% if is_incremental() %}{% endif %}
内の述語は、
configの引数にてmaterialized='incremental'
と設定されていて、
モデルの実行が2回目以降の場合(増分更新として処理が走るとき)。
という条件下で機能するようになる -
{% set target_date = var('target_date', '') %}
└set
・・・指定した変数に値を入れることができる。ここでは、左辺のtarget_date
に値を代入している。
└var()
・・・デフォルトでは第一引数を返し、第一引数に値が定義されていなかった場合。第二引数を返す。
上記のように記述すると、dbt build --select model名 --var'target_date: [日付]
というコマンドを実行した際に、target_data
変数に[日付]
を代入したうえで、dbt build
の処理が走るようになる。
以上、dbtでの増分更新の際の冪等性の担保の方法や、
モデルを日付を遡って実行する方法などについて解説させていただきました。
おまけ
1)日時を指定し、過去に遡って増分更新を実施する
dbt build --select model名 --var'target_date: 2023-01-01
と実行することで、
dbt内で下記のようにコンパイルされた。
※2023年1月1日に遡って、merge
ステートメントでupsert処理が走り、増分更新が実施された。
merge into `更新先のテーブル名` as DBT_INTERNAL_DEST
using (
/* dateをdate型に変換 */
/* page_pathが「/contents/」で始まるURLに絞る */
/* page_pathのデータの揺れの調整 */
with ga_aggregation as (
select
date(date) as date --date型に変換
, page_title
, user_type
, page_path
, regexp_replace(normalize(page_path, NFKC), r'(index|&|\?|https).*|[^a-zA-Z0-9 -/:-@-~_]|[ +:\?#]', '') as page_path_fix --データの揺れを調整
, pageviews
, users
, exits
, entrances
, bounces
, sessions
, view_id
from
`データソース名`
where --URLが「/contents/」で始まる、前日のデータに絞る。
1=1
and regexp_contains(page_path, r'^/contents/')
and
timestamp_trunc(
cast(date as timestamp),
day
) = timestamp_trunc(
cast('2023-01-01' as timestamp),
day
)
)
--page_pathが「/」で終了するように調整をして、必要なカラムを抽出
select
date
, page_title
, user_type
, pageviews
, users
, exits
, entrances
, bounces
, sessions
, page_path
, if(regexp_contains(page_path_fix, r'.*/$'), concat(page_path_fix,""), concat(page_path_fix, "/")) as page_path_fix --page_pathカラムがすべて「/」で終わるように調整
from
ga_aggregation
) as DBT_INTERNAL_SOURCE
on (
DBT_INTERNAL_SOURCE.date = DBT_INTERNAL_DEST.date
) and (
DBT_INTERNAL_SOURCE.user_type = DBT_INTERNAL_DEST.user_type
) and (
DBT_INTERNAL_SOURCE.page_path = DBT_INTERNAL_DEST.page_path
) and (
DBT_INTERNAL_SOURCE.page_title = DBT_INTERNAL_DEST.page_title
)
when matched then update set
`date` = DBT_INTERNAL_SOURCE.`date`,`page_title` = DBT_INTERNAL_SOURCE.`page_title`,`user_type` = DBT_INTERNAL_SOURCE.`user_type`,`pageviews` = DBT_INTERNAL_SOURCE.`pageviews`,`users` = DBT_INTERNAL_SOURCE.`users`,`exits` = DBT_INTERNAL_SOURCE.`exits`,`entrances` = DBT_INTERNAL_SOURCE.`entrances`,`bounces` = DBT_INTERNAL_SOURCE.`bounces`,`sessions` = DBT_INTERNAL_SOURCE.`sessions`,`page_path` = DBT_INTERNAL_SOURCE.`page_path`,`page_path_fix` = DBT_INTERNAL_SOURCE.`page_path_fix`
when not matched then insert
(`date`, `page_title`, `user_type`, `pageviews`, `users`, `exits`, `entrances`, `bounces`, `sessions`, `page_path`, `page_path_fix`)
values
(`date`, `page_title`, `user_type`, `pageviews`, `users`, `exits`, `entrances`, `bounces`, `sessions`, `page_path`, `page_path_fix`)
ちなみに変数に値を代入する方法としては、下記の2つが存在する。
-
dbt_project.yml
のvarsに値を設定しておくパターン -
dbt run --vars [定義したい変数]
というかたちで処理を走らせるパターン
※参考:dbt入門-変数の参照方法
2)モデルとソースデータとの関係をリセットする。
dbtの公式ドキュメントのbest practiceにて、テーブルを作り直す方法が書かれていた。
dbt build --full-refresh -s [モデル名]
というように、--full-refresh
フラグを指定してモデルを実行する。
こうすることによって、テーブルを作り直すことができるので、incrementalで増分更新する過程で欠損してしまっているデータを補完する場合などに便利。
※公式ドキュメントにて、定期的に実施することが推奨されていた。