dbt では、Model configurations を使い分けることで、dbt run 実行時の Model の更新内容(SQLの内容)を変更することができる。
具体的には、設定内容に沿ってクエリをコンパイルし、作成されたクエリを実行することで Model を更新する。
そこで、この Model configuration ごとに、どのようなクエリが作成され実行されるのかを見ていくことで、各設定における更新内容を理解していく。
なお、使用する data platform は BigQuery
に限定し、dbt は dbt Core の v1.7 を使用する。
公式ドキュメント
前準備
最低限 dbt で BigQuery を操作するための準備をしていく。
ざっくり書くので、詳細な手順は公式の Quickstart を参照。
一点、今回 dbt は pipenv
を使ってインストールするため、dbt コマンド実行時、本記事では以下のように実行している箇所については、
pipenv run dbt run ~~~
公式ドキュメント内の以下のコマンドと同義となる。
dbt run ~~~
dbt-core, dbt-bigquery をインストール
pipenv
環境を作成して dbt-core
, dbt-bigquery
をインストール
# pipenv環境作成(Pythonバージョンは3.11とする)
pipenv --python 3.11
# dbt-core, dbt-bigquery をインストール
pipenv install dbt-core dbt-bigquery
dbt プロジェクト作成
dbt プロジェクトを作成(プロジェクト名は blog_dbt
とした)
pipenv run dbt init blog_dbt
こちらのコマンド実行後、インラインで設定値に関する質問が出てくるので適宜答えていく。
今回は以下のように設定。
-
Which database would you like to use?
→bigquery
-
Desired authentication method option
→oauth
-
project (GCP project id)
→ 普段自分が使っている GCP プロジェクト -
dataset (the name of your dbt dataset)
→dbt_sample
-
threads (1 or more)
→ 8 -
Desired location option
→US
materialized='table' だけを指定したモデルのクエリ
前準備が終わったので、本題に入っていく。
まずは、materialized='table'
のみを、config
に設定したシンブルなモデルについて見ていく。
モデル名は replace
とし、models/replace.sql
を以下のように作成する。
{{
config(
materialized="table",
)
}}
select 1 as col1, 2 as col2
dbt run
を実行してモデルを作成する。
pipenv run dbt run --select "replace"
BigQuery 上にテーブル dbt_sample.replace
が作成された。
実際に実行されたクエリを見るには、BigQuery のコンソール上のプロジェクト履歴を見たり、JOBS ビュー を確認する方法もあるが、今回は dbt によるクエリのコンパイル結果を見ていく。
dbt によって実際に実行されたクエリの内容は target/run
ディレクトリ配下に置かれている。
今回の場合、target/run/blog_dbt/models/replace.sql
の中にクエリが保管されており、中身は以下。
- ※ プロジェクト名の秘匿化と見やすいようにコード整形をしている
create or replace table `${プロジェクト名}`.`dbt_sample`.`replace`
OPTIONS()
as (
select 1 as col1, 2 as col2
);
つまり、モデルの集計定義の内容で、dbt_sample.replace
を create or replace table
で置換している。
materialized="incremental" のモデルのクエリ
dbt の materializationsには、table
以外にも incremental というモデルが存在する。
これは、モデル更新の初回実行時は materialized="table"
と同じ処理内容になるが、2回目以降の更新では、既存テーブルの置換ではなく、積み上げが行われる。
積み上げによる更新方法は materialized
以外の設定内容によって変わるので1つずつ見ていく。
materialized="incremental" だけを指定している場合
まずは、materialized="incremental"
だけを指定したら実行クエリがどうなるかを見ていく。
モデル名は incremental
とし、models/incremental.sql
を以下のように作成する。
{{
config(
materialized="incremental",
)
}}
select
1 AS col1,
2 AS col2,
union all
select
cast(rand() * 100 as INT64) as col1,
cast(rand() * 100 as INT64) as col2,
以下のクエリを2回実行していく。
pipenv run dbt run --select "incremental"
1回目の実行後、実際に実行されたクエリを確認。
create or replace table `${プロジェクト名}`.`dbt_sample`.`incremental`
OPTIONS()
as (
select
1 AS col1,
2 AS col2,
union all
select
cast(rand() * 100 as INT64) as col1,
cast(rand() * 100 as INT64) as col2,
);
1回目に関しては、モデル replace
と同じように、新たに dbt_sample.incremental
を作成するクエリが実行される。
そして2回目の実行後、実際に実行されたクエリを確認。
merge into `${プロジェクト名}`.`dbt_sample`.`incremental` as DBT_INTERNAL_DEST
using (
select
1 AS col1,
2 AS col2,
union all
select
cast(rand() * 100 as INT64) as col1,
cast(rand() * 100 as INT64) as col2,
) as DBT_INTERNAL_SOURCE
on (FALSE)
when not matched then insert
(`col1`, `col2`)
values
(`col1`, `col2`)
MERGE 文 が実行されている。
MERGE 文
とは、既存テーブルとクエリ実行結果(using
句の中身)を比較し、on
句内に記述した条件で一致したレコードについて、クエリ結果の内容で既存テーブルのレコードを更新する。
しかし、今回は on (FALSE)
となっているため、既存テーブルで更新・削除されるレコードはないということになる。
つまり、既存テーブルにクエリ結果を追加するだけなので、上記 SQL は以下と同義となる。
insert into `${プロジェクト名}`.`dbt_sample`.`incremental`
select
1 AS col1,
2 AS col2,
union all
select
cast(rand() * 100 as INT64) as col1,
cast(rand() * 100 as INT64) as col2,
つまり、materialized="incremental"
だけを指定している場合、dbt run
を実行するたびにクエリ結果がただ積み上がっていくので、モデルの冪等性が担保されない。
障害発生時などに備えて dbt run
の再実行を容易にしたい場合、この後紹介する unique_key
や insert_overwrite
を設定することをオススメする。
※ pipenv run dbt run --select "incremental"
を4回実行した時の dbt_sample.incremental
の中身
- どんどん積み上がってしまっている
unique_key を設定している場合
続いて、materialized="incremental"
以外に unique_key を設定した場合について見ていく。
モデル名は incremental_unique_key
とし、models/incremental_unique_key.sql
を以下のように作成する。
{{
config(
materialized="incremental",
unique_key=["col1"],
)
}}
select
1 AS col1,
2 AS col2
union all
select
cast(rand() * 100 as INT64) as col1,
cast(rand() * 100 as INT64) as col2
dbt run
を実行してモデルを作成。
初回実行時のクエリについては、モデル replace
, incremental
と同じなので割愛する。
そのため、いきなり dbt run
コマンドを2回実行する。
# 2回実行する
pipenv run dbt run --select "incremental_unique_key"
pipenv run dbt run --select "incremental_unique_key"
実際に実行されたクエリを確認。
merge into `${プロジェクト名}`.`dbt_sample`.`incremental_unique_key` as DBT_INTERNAL_DEST
using (
select
1 AS col1,
2 AS col2,
union all
select
cast(rand() * 100 as INT64) as col1,
cast(rand() * 100 as INT64) as col2,
) as DBT_INTERNAL_SOURCE
on (
DBT_INTERNAL_SOURCE.col1 = DBT_INTERNAL_DEST.col1
)
when matched then update set
`col1` = DBT_INTERNAL_SOURCE.`col1`,`col2` = DBT_INTERNAL_SOURCE.`col2`
when not matched then insert
(`col1`, `col2`)
values
(`col1`, `col2`)
MERGE
文が実行され、既存テーブルとクエリ結果との間で、ユニークキー( col1
)が同じレコードはクエリ結果で既存テーブルを更新し、それ以外のクエリ結果はインサートされている。
テーブルの中身を見てみる。
クエリ結果はレコードを2件持つが、2回 dbt run
した後のレコードは3件となっている。
これは、col1
が1のレコードに関しては、インサートではなく更新がかかったため。
incremental_strategy="insert_overwrite" を設定している場合
incremental
のモデルには、incremental_strategy という設定項目が存在する。
これは、更新対象となるレコードを決める際のロジックの設定値であり、モデルが BigQuery テーブルの場合、以下2つのどちらから選択する。
merge
insert_overwrite
デフォルトは merge
となるため、先ほどの「unique_key を設定している場合」での、モデル incremental_unique_key
は incremental_strategy="merge"
だったことになる。
insert_overwrite
を設定した場合は partition_by
の設定が必須となるので注意。
では、incremental_strategy="insert_overwrite"
を設定した場合の実行クエリを見ていく。
モデル名は insert_overwrite
とし、models/insert_overwrite.sql
を以下のように作成する。
{{
config(
materialized="incremental",
partition_by={
"field": "ymd",
"data_type": "DATE",
"granularity": "day",
},
incremental_strategy="insert_overwrite",
)
}}
select date('2024-06-10') as ymd, 1 as col1
union all
select date('2024-06-09') as ymd, 2 as col1
union all
select date('2024-06-08') - cast(rand() * 100 as INT64) as ymd, 3 as col1
union all
select date('2024-06-08') - cast(rand() * 100 as INT64) as ymd, 4 as col1
以下のクエリを2回実行していく。
pipenv run dbt run --select "insert_overwrite"
1回目の実行後、実際に実行されたクエリを確認。
create or replace table `${プロジェクト名}`.`dbt_sample`.`insert_overwrite`
partition by ymd
OPTIONS()
as (
select date('2024-06-10') as ymd, 1 as col1
union all
select date('2024-06-09') as ymd, 2 as col1
union all
select date('2024-06-08') - cast(rand() * 100 as INT64) as ymd, 3 as col1
union all
select date('2024-06-08') - cast(rand() * 100 as INT64) as ymd, 4 as col1
);
1回目に関しては、モデル replace
や incremental
と同じように、新たに dbt_sample.insert_overwrite
を作成しているが、partition by ymd
によってパーティションを切っている点が異なる。
そして2回目の実行後、実際に実行されたクエリを確認。
-- generated script to merge partitions into `${プロジェクト名}`.`dbt_sample`.`insert_overwrite`
declare dbt_partitions_for_replacement array<date>;
-- 1. create a temp table with model data
create or replace table `${プロジェクト名}`.`dbt_sample`.`insert_overwrite__dbt_tmp`
partition by ymd
OPTIONS(
expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
)
as (
select date('2024-06-10') as ymd, 1 as col1
union all
select date('2024-06-09') as ymd, 2 as col1
union all
select date('2024-06-08') - cast(rand() * 100 as INT64) as ymd, 3 as col1
union all
select date('2024-06-08') - cast(rand() * 100 as INT64) as ymd, 4 as col1
);
-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
select as struct
-- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
array_agg(distinct date(ymd) IGNORE NULLS)
from `${プロジェクト名}`.`dbt_sample`.`insert_overwrite__dbt_tmp`
);
-- 3. run the merge statement
merge into `${プロジェクト名}`.`dbt_sample`.`insert_overwrite` as DBT_INTERNAL_DEST
using (
select
* from `${プロジェクト名}`.`dbt_sample`.`insert_overwrite__dbt_tmp`
) as DBT_INTERNAL_SOURCE
on FALSE
when not matched by source
and date(DBT_INTERNAL_DEST.ymd) in unnest(dbt_partitions_for_replacement)
then delete
when not matched then insert
(`ymd`, `col1`)
values
(`ymd`, `col1`)
;
-- 4. clean up the temp table
drop table if exists `${プロジェクト名}`.`dbt_sample`.`insert_overwrite__dbt_tmp`
4つのクエリが実行されていることがわかる。
dbt 側がよしなにコメントで説明をつけてくれているのでそれに沿って見ていく。
1. create a temp table with model data
クエリ結果からまず tmp テーブルを作成(${モデル名}__dbt_tmp
となる模様)
2. define partitions to update
tmp テーブル内のパーティションキー(ymd
カラム)の値を DISTICNT
して取得。
3. run the merge statement
MERGE
文を実行し、既存テーブルと2で取得したパーティションキーとの間で、同じパーティションキーの値を持つ既存テーブルのレコードはクエリ結果で更新し、それ以外のクエリ結果はインサートする。
4. clean up the temp table
tmp テーブルを削除。
テーブルの中身を見てみる。
クエリ結果はレコードを4件持つが、2回 dbt run
した後のレコードは6件となっている。
これは、ymd
が 2024-06-10
, 2024-06-09
の2件のレコードに関しては、インサートではなく更新がかかったため。
まとめ
これまで紹介した、4パターンの Model configurations によって、実際に実行される BigQuery のクエリを以下にまとめる。
留意点:
- テーブルはすでに存在している状態での2回目以降の
dbt run
実行時のクエリを記載する - 更新対象の BigQuery テーブルを
${dbtモデル}
と記載する -
models
ディレクトリ配下の SQL ファイルに書かれているクエリを${集計クエリ}
と記載する
materialized='table' だけを指定
create or replace table `${dbtモデル}`
OPTIONS()
as (
${集計クエリ}
);
materialized="incremental" だけを指定
merge into `${dbtモデル}` as DBT_INTERNAL_DEST
using (
${集計クエリ}
) as DBT_INTERNAL_SOURCE
on (FALSE)
when not matched then insert
(${全カラム})
values
(${全カラム})
materialized="incremental" + unique_key
merge into `${dbtモデル}` as DBT_INTERNAL_DEST
using (
${集計クエリ}
) as DBT_INTERNAL_SOURCE
on (
DBT_INTERNAL_SOURCE.${ユニークキー} = DBT_INTERNAL_DEST.${ユニークキー}
)
when not matched then insert
(${全カラム})
values
(${全カラム})
materialized="incremental" + incremental_strategy="insert_overwrite"
-- generated script to merge partitions into ${dbtモデル}
declare dbt_partitions_for_replacement array<date>;
-- 1. create a temp table with model data
create or replace table `${dbtモデル}__dbt_tmp`
partition by ${パーティションキー}
OPTIONS(
expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
)
as (
${集計クエリ}
);
-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
select as struct
-- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
array_agg(distinct date(${パーティションキー}) IGNORE NULLS)
from `${dbtモデル}__dbt_tmp`
);
-- 3. run the merge statement
merge into `${dbtモデル}` as DBT_INTERNAL_DEST
using (
select
* from `${dbtモデル}__dbt_tmp`
) as DBT_INTERNAL_SOURCE
on FALSE
when not matched by source
and date(DBT_INTERNAL_DEST.${パーティションキー}) in unnest(dbt_partitions_for_replacement)
then delete
when not matched then insert
(${全カラム})
values
(${全カラム})
;
-- 4. clean up the temp table
drop table if exists `${dbtモデル}__dbt_tmp`