5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ZOZOAdvent Calendar 2024

Day 2

dbt が実行している BigQuery のクエリを見ていく

Last updated at Posted at 2024-12-01

dbt では、Model configurations を使い分けることで、dbt run 実行時の Model の更新内容(SQLの内容)を変更することができる。
具体的には、設定内容に沿ってクエリをコンパイルし、作成されたクエリを実行することで Model を更新する。
そこで、この Model configuration ごとに、どのようなクエリが作成され実行されるのかを見ていくことで、各設定における更新内容を理解していく。
なお、使用する data platform は BigQuery に限定し、dbt は dbt Core の v1.7 を使用する。

公式ドキュメント

前準備

最低限 dbt で BigQuery を操作するための準備をしていく。
ざっくり書くので、詳細な手順は公式の Quickstart を参照。
一点、今回 dbt は pipenv を使ってインストールするため、dbt コマンド実行時、本記事では以下のように実行している箇所については、

pipenv run dbt run ~~~

公式ドキュメント内の以下のコマンドと同義となる。

dbt run ~~~

dbt-core, dbt-bigquery をインストール

pipenv 環境を作成して dbt-core, dbt-bigquery をインストール

# pipenv環境作成(Pythonバージョンは3.11とする)
pipenv --python 3.11
# dbt-core, dbt-bigquery をインストール
pipenv install dbt-core dbt-bigquery

dbt プロジェクト作成

dbt プロジェクトを作成(プロジェクト名は blog_dbt とした)

pipenv run dbt init blog_dbt

こちらのコマンド実行後、インラインで設定値に関する質問が出てくるので適宜答えていく。
今回は以下のように設定。

  • Which database would you like to use?bigquery
  • Desired authentication method optionoauth
  • project (GCP project id) → 普段自分が使っている GCP プロジェクト
  • dataset (the name of your dbt dataset)dbt_sample
  • threads (1 or more) → 8
  • Desired location optionUS

materialized='table' だけを指定したモデルのクエリ

前準備が終わったので、本題に入っていく。
まずは、materialized='table' のみを、config に設定したシンブルなモデルについて見ていく。
モデル名は replace とし、models/replace.sql を以下のように作成する。

replace.sql
{{
    config(
        materialized="table",
    )
}}
select 1 as col1, 2 as col2

dbt run を実行してモデルを作成する。

pipenv run dbt run --select "replace"

BigQuery 上にテーブル dbt_sample.replace が作成された。

スクリーンショット 2024-06-17 16.39.18.png

実際に実行されたクエリを見るには、BigQuery のコンソール上のプロジェクト履歴を見たり、JOBS ビュー を確認する方法もあるが、今回は dbt によるクエリのコンパイル結果を見ていく。
dbt によって実際に実行されたクエリの内容は target/run ディレクトリ配下に置かれている。
今回の場合、target/run/blog_dbt/models/replace.sql の中にクエリが保管されており、中身は以下。

  • ※ プロジェクト名の秘匿化と見やすいようにコード整形をしている
target/run/blog_dbt/models/replace.sql
create or replace table `${プロジェクト名}`.`dbt_sample`.`replace`
OPTIONS()
as (
  select 1 as col1, 2 as col2
);

つまり、モデルの集計定義の内容で、dbt_sample.replacecreate or replace table で置換している。

materialized="incremental" のモデルのクエリ

dbt の materializationsには、table 以外にも incremental というモデルが存在する。
これは、モデル更新の初回実行時は materialized="table" と同じ処理内容になるが、2回目以降の更新では、既存テーブルの置換ではなく、積み上げが行われる。
積み上げによる更新方法は materialized 以外の設定内容によって変わるので1つずつ見ていく。

materialized="incremental" だけを指定している場合

まずは、materialized="incremental" だけを指定したら実行クエリがどうなるかを見ていく。
モデル名は incremental とし、models/incremental.sql を以下のように作成する。

increnebtal.sql
{{
    config(
        materialized="incremental",
    )
}}

select
  1 AS col1,
  2 AS col2,
union all
select
  cast(rand() * 100 as INT64) as col1,
  cast(rand() * 100 as INT64) as col2,

以下のクエリを2回実行していく。

pipenv run dbt run --select "incremental"

1回目の実行後、実際に実行されたクエリを確認。

target/run/blog_dbt/models/incremental.sql
create or replace table `${プロジェクト名}`.`dbt_sample`.`incremental`
OPTIONS()
as (
  select
    1 AS col1,
    2 AS col2,
  union all
  select
    cast(rand() * 100 as INT64) as col1,
    cast(rand() * 100 as INT64) as col2,
);

1回目に関しては、モデル replace と同じように、新たに dbt_sample.incremental を作成するクエリが実行される。
そして2回目の実行後、実際に実行されたクエリを確認。

target/run/blog_dbt/models/incremental.sql
merge into `${プロジェクト名}`.`dbt_sample`.`incremental` as DBT_INTERNAL_DEST
using (
  select
    1 AS col1,
    2 AS col2,
  union all
  select
    cast(rand() * 100 as INT64) as col1,
    cast(rand() * 100 as INT64) as col2,
) as DBT_INTERNAL_SOURCE
on (FALSE)
when not matched then insert
  (`col1`, `col2`)
values
  (`col1`, `col2`)

MERGE 文 が実行されている。
MERGE 文 とは、既存テーブルとクエリ実行結果(using 句の中身)を比較し、on 句内に記述した条件で一致したレコードについて、クエリ結果の内容で既存テーブルのレコードを更新する。
しかし、今回は on (FALSE) となっているため、既存テーブルで更新・削除されるレコードはないということになる。
つまり、既存テーブルにクエリ結果を追加するだけなので、上記 SQL は以下と同義となる。

target/run/blog_dbt/models/incremental.sqlと同義のクエリ
insert into `${プロジェクト名}`.`dbt_sample`.`incremental`
select
  1 AS col1,
  2 AS col2,
union all
select
  cast(rand() * 100 as INT64) as col1,
  cast(rand() * 100 as INT64) as col2,

つまり、materialized="incremental" だけを指定している場合、dbt run を実行するたびにクエリ結果がただ積み上がっていくので、モデルの冪等性が担保されない
障害発生時などに備えて dbt run の再実行を容易にしたい場合、この後紹介する unique_keyinsert_overwrite を設定することをオススメする。

pipenv run dbt run --select "incremental"を4回実行した時の dbt_sample.incremental の中身

  • どんどん積み上がってしまっている

スクリーンショット 2024-06-17 20.01.19.png

unique_key を設定している場合

続いて、materialized="incremental" 以外に unique_key を設定した場合について見ていく。

モデル名は incremental_unique_key とし、models/incremental_unique_key.sql を以下のように作成する。

incremental_unique_key.sql
{{
    config(
        materialized="incremental",
        unique_key=["col1"],
    )
}}
select
  1 AS col1,
  2 AS col2
union all
select
  cast(rand() * 100 as INT64) as col1,
  cast(rand() * 100 as INT64) as col2

dbt run を実行してモデルを作成。
初回実行時のクエリについては、モデル replace, incremental と同じなので割愛する。
そのため、いきなり dbt run コマンドを2回実行する。

# 2回実行する
pipenv run dbt run --select "incremental_unique_key"
pipenv run dbt run --select "incremental_unique_key"

実際に実行されたクエリを確認。

target/run/blog_dbt/models/incremental_unique_key.sql
merge into `${プロジェクト名}`.`dbt_sample`.`incremental_unique_key` as DBT_INTERNAL_DEST
using (
  select
    1 AS col1,
    2 AS col2,
  union all
  select
    cast(rand() * 100 as INT64) as col1,
    cast(rand() * 100 as INT64) as col2,
) as DBT_INTERNAL_SOURCE
on (
  DBT_INTERNAL_SOURCE.col1 = DBT_INTERNAL_DEST.col1
)
when matched then update set
  `col1` = DBT_INTERNAL_SOURCE.`col1`,`col2` = DBT_INTERNAL_SOURCE.`col2`
when not matched then insert
  (`col1`, `col2`)
values
  (`col1`, `col2`)

MERGE 文が実行され、既存テーブルとクエリ結果との間で、ユニークキー( col1 )が同じレコードはクエリ結果で既存テーブルを更新し、それ以外のクエリ結果はインサートされている。
テーブルの中身を見てみる。
クエリ結果はレコードを2件持つが、2回 dbt run した後のレコードは3件となっている。

スクリーンショット 2024-06-17 17.26.04.png

これは、col1 が1のレコードに関しては、インサートではなく更新がかかったため。

incremental_strategy="insert_overwrite" を設定している場合

incremental のモデルには、incremental_strategy という設定項目が存在する。
これは、更新対象となるレコードを決める際のロジックの設定値であり、モデルが BigQuery テーブルの場合、以下2つのどちらから選択する。

  • merge
  • insert_overwrite

デフォルトは merge となるため、先ほどの「unique_key を設定している場合」での、モデル incremental_unique_keyincremental_strategy="merge" だったことになる。

insert_overwrite を設定した場合は partition_by の設定が必須となるので注意

では、incremental_strategy="insert_overwrite" を設定した場合の実行クエリを見ていく。
モデル名は insert_overwrite とし、models/insert_overwrite.sql を以下のように作成する。

insert_overwrite.sql
{{
    config(
        materialized="incremental",
        partition_by={
          "field": "ymd",
          "data_type": "DATE",
          "granularity": "day",
        },
        incremental_strategy="insert_overwrite",
    )
}}
select date('2024-06-10') as ymd, 1 as col1
union all
select date('2024-06-09') as ymd, 2 as col1
union all
select date('2024-06-08') - cast(rand() * 100 as INT64) as ymd, 3 as col1
union all
select date('2024-06-08') - cast(rand() * 100 as INT64) as ymd, 4 as col1

以下のクエリを2回実行していく。

pipenv run dbt run --select "insert_overwrite"

1回目の実行後、実際に実行されたクエリを確認。

target/run/blog_dbt/models/insert_overwrite.sql
create or replace table `${プロジェクト名}`.`dbt_sample`.`insert_overwrite`
partition by ymd
OPTIONS()
as (
  select date('2024-06-10') as ymd, 1 as col1
  union all
  select date('2024-06-09') as ymd, 2 as col1
  union all
  select date('2024-06-08') - cast(rand() * 100 as INT64) as ymd, 3 as col1
  union all
  select date('2024-06-08') - cast(rand() * 100 as INT64) as ymd, 4 as col1
);

1回目に関しては、モデル replaceincremental と同じように、新たに dbt_sample.insert_overwrite を作成しているが、partition by ymd によってパーティションを切っている点が異なる。

そして2回目の実行後、実際に実行されたクエリを確認。

target/run/blog_dbt/models/insert_overwrite.sql
-- generated script to merge partitions into `${プロジェクト名}`.`dbt_sample`.`insert_overwrite`
declare dbt_partitions_for_replacement array<date>;
-- 1. create a temp table with model data
create or replace table `${プロジェクト名}`.`dbt_sample`.`insert_overwrite__dbt_tmp`
partition by ymd
OPTIONS(
  expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
)
as (
  select date('2024-06-10') as ymd, 1 as col1
  union all
  select date('2024-06-09') as ymd, 2 as col1
  union all
  select date('2024-06-08') - cast(rand() * 100 as INT64) as ymd, 3 as col1
  union all
  select date('2024-06-08') - cast(rand() * 100 as INT64) as ymd, 4 as col1
);
  
-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
  select as struct
    -- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
    array_agg(distinct date(ymd) IGNORE NULLS)
  from `${プロジェクト名}`.`dbt_sample`.`insert_overwrite__dbt_tmp`
);

-- 3. run the merge statement
merge into `${プロジェクト名}`.`dbt_sample`.`insert_overwrite` as DBT_INTERNAL_DEST
  using (
  select
  * from `${プロジェクト名}`.`dbt_sample`.`insert_overwrite__dbt_tmp`
) as DBT_INTERNAL_SOURCE
on FALSE

when not matched by source
and date(DBT_INTERNAL_DEST.ymd) in unnest(dbt_partitions_for_replacement)
then delete

when not matched then insert
    (`ymd`, `col1`)
values
    (`ymd`, `col1`)
;

-- 4. clean up the temp table
drop table if exists `${プロジェクト名}`.`dbt_sample`.`insert_overwrite__dbt_tmp`

4つのクエリが実行されていることがわかる。
dbt 側がよしなにコメントで説明をつけてくれているのでそれに沿って見ていく。

1. create a temp table with model data

クエリ結果からまず tmp テーブルを作成(${モデル名}__dbt_tmpとなる模様)

2. define partitions to update

tmp テーブル内のパーティションキー(ymd カラム)の値を DISTICNT して取得。

3. run the merge statement

MERGE 文を実行し、既存テーブルと2で取得したパーティションキーとの間で、同じパーティションキーの値を持つ既存テーブルのレコードはクエリ結果で更新し、それ以外のクエリ結果はインサートする。

4. clean up the temp table

tmp テーブルを削除。

テーブルの中身を見てみる。
クエリ結果はレコードを4件持つが、2回 dbt run した後のレコードは6件となっている。

スクリーンショット 2024-06-17 19.07.47.png

これは、ymd2024-06-10, 2024-06-09 の2件のレコードに関しては、インサートではなく更新がかかったため。

まとめ

これまで紹介した、4パターンの Model configurations によって、実際に実行される BigQuery のクエリを以下にまとめる。

留意点:

  • テーブルはすでに存在している状態での2回目以降の dbt run 実行時のクエリを記載する
  • 更新対象の BigQuery テーブルを ${dbtモデル} と記載する
  • models ディレクトリ配下の SQL ファイルに書かれているクエリを ${集計クエリ} と記載する

materialized='table' だけを指定

create or replace table `${dbtモデル}`
OPTIONS()
as (
  ${集計クエリ}
);

materialized="incremental" だけを指定

merge into `${dbtモデル}` as DBT_INTERNAL_DEST
using (
  ${集計クエリ}
) as DBT_INTERNAL_SOURCE
on (FALSE)
when not matched then insert
  (${全カラム})
values
  (${全カラム})

materialized="incremental" + unique_key

merge into `${dbtモデル}` as DBT_INTERNAL_DEST
using (
  ${集計クエリ}
) as DBT_INTERNAL_SOURCE
on (
  DBT_INTERNAL_SOURCE.${ユニークキー} = DBT_INTERNAL_DEST.${ユニークキー}
)
when not matched then insert
  (${全カラム})
values
  (${全カラム})

materialized="incremental" + incremental_strategy="insert_overwrite"

-- generated script to merge partitions into ${dbtモデル}
declare dbt_partitions_for_replacement array<date>;
-- 1. create a temp table with model data
create or replace table `${dbtモデル}__dbt_tmp`
partition by ${パーティションキー}
OPTIONS(
  expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
)
as (
  ${集計クエリ}
);
  
-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
  select as struct
    -- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
    array_agg(distinct date(${パーティションキー}) IGNORE NULLS)
  from `${dbtモデル}__dbt_tmp`
);

-- 3. run the merge statement
merge into `${dbtモデル}` as DBT_INTERNAL_DEST
  using (
  select
  * from `${dbtモデル}__dbt_tmp`
) as DBT_INTERNAL_SOURCE
on FALSE

when not matched by source
and date(DBT_INTERNAL_DEST.${パーティションキー}) in unnest(dbt_partitions_for_replacement)
then delete

when not matched then insert
  (${全カラム})
values
  (${全カラム})
;

-- 4. clean up the temp table
drop table if exists `${dbtモデル}__dbt_tmp`
5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?