4
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【完全攻略】TreasureData x dbt part2

Last updated at Posted at 2024-06-16

前回の記事

続きものなので、こちらから読んでいただけますと理解がスムーズかなと思います。

はじめに

今回の記事では前回記事で書いたTreasureData(以降、td)に対してdbtを使う際の問題点に関して、具体どういう実装をして解決しているかについて説明していきたいと思います。
前回記事で書いたハードルの中でも下記に対する回答になっています。

  • デフォのままだとdbt runの際に既存テーブルがあるとエラーを吐く
  • 裏側でmergeが発生する処理(ex. incremental, snapshot)は使えないため機能が制限される

具体的には、dbt(dbt core)からtd上のテーブルの全更新や差分更新ができる状態にするところまで解説したいと思います。

以降で書いているdbt-coreのversionは1.4.9、dbt-trinoのversionは1.4.0でちょっと(?)古めです。
言い訳としては実行環境としてpoetryを使っていて、環境がロックされているからです。
新しいversionだともしかしたらちょっと挙動違うかもしれないですが、いい感じに読み替えていただけると嬉しいです。

フォルダの構成(必要なところだけ)

repository
|-dbt
    |-models
        |-dataset_a
            |-delete_and_create
                |-delete_and_create_sample.sql
        |-dataset_b
            |-append_diff
                |-append_diff_sample.sql
        |-dataset_c
            |-delete_and_create
            |-append_diff
    |-seeeds
        |-dataset_z
            |-seed_sample.csv
    |-macros
        |-pre_hook_append_diff.sql
    |-dbt_project.yml
    |-profiles.yml

まず一番大事なクエリ周りのフォルダ構成について説明していこうと思います。
dbtではテーブル用のクエリをmodelsというfolder内に管理する構成になっているわけですが、models内のフォルダ構成は上記のようになっています。
基本はデータセット名ごとにdelete_and_createとappend_diffという二つのフォルダがありその中にテーブル用のクエリが格納されています。名前を見てお察しいただけるかと思いますが、deleta_and_createは全更新を表しており、append_diffは差分更新(最新のレコードだけ更新する)というテーブル更新方法を表しております。
つまりテーブル更新方法によってクエリの格納フォルダを分けているわけです。具体的にどうやって更新方法の違いを実現しているかについては後続の章で説明させてください。

クエリ以外に関しては接続情報と実行方法を管理しているymlファイルであるprofiles.ymlと実行時のdbtの設定をマクロレベルで指定しているdbt_project.yml、処理を関数化するためのmacrosフォルダ、csvからテーブルを生成するためのseedsフォルダが存在しています。

ファイルの中身

delete_and_create_sample.sql

select user_id, min(datetime) as min_datetime
from {{ ref("seed_sample") }}
group by 1

append_diff_sample.sql

{{ config(pre_hook=['{{ pre_hook_append_diff_in_sql("-7d/0d") }}']) }}

select
    substr(datetime, 1, 10) as date,
    td_date_trunc('day', time, 'JST') as time,
    sum(payment) as sum_payment,
    count(distinct user_id) as cnt_uu
from {{ ref("seed_sample") }}
where td_interval(time, '{{ var("append_diff_range","-7d/0d") }}', 'JST')
group by 1, 2

seed_sample.csv

time,datetime,user_id,payment
1717214520,2024-06-01 13:02:00,a,200
1717287720,2024-06-02 09:22:00,b,700
1717383240,2024-06-03 11:54:00,b,600
1717486740,2024-06-04 16:39:00,c,500
1717585200,2024-06-05 20:00:00,a,300
1717627620,2024-06-06 07:47:00,b,900
1717628580,2024-06-06 08:03:00,a,100

pre_hook_append_diff.sql

{# dbt_project.yml内のappend_diffのpre_hookとして行う処理 #}
{% macro pre_hook_append_diff() -%}
    {% if target.name != "prod" %}
        DROP TABLE IF EXISTS "{{this.schema}}"."{{this.table}}";
        CREATE TABLE IF NOT EXISTS "{{this.schema}}"."{{this.table}}" AS SELECT * FROM "{{ this.schema |replace('test_','') }}"."{{this.table}}";
    {% endif %}
    CREATE TABLE IF NOT EXISTS "{{ this.schema }}"."tmp__{{ this.table }}" AS SELECT 1;
    CREATE TABLE IF NOT EXISTS "{{ this.schema }}"."{{ this.table }}" AS SELECT * FROM "{{ this.schema }}"."tmp__{{ this.table }}";
    DROP TABLE IF EXISTS "{{ this.schema }}"."tmp__{{ this.table }}"
{%- endmacro %}

{# append_diffのsql内のpre_hookとして行う処理 #}
{% macro pre_hook_append_diff_in_sql(append_diff_range) -%}
    DELETE FROM {{ this }} WHERE td_interval(time,'{{ append_diff_range }}', 'JST');
    CREATE TABLE IF NOT EXISTS "{{ this.schema }}".tmp__{{ this.table }} AS SELECT * FROM {{ this }};
    DROP TABLE IF EXISTS {{ this }};
    DROP TABLE IF EXISTS "{{ this.schema }}".{{ this.table }}__dbt_tmp;
{%- endmacro %}

profiles.yml

dbt:
  outputs:
    dev:
      type: trino
      method: none # optional, one of {none | ldap | kerberos}
      user: "{{ env_var('TREASURE_DATA_API_KEY') }}" # ~/.td/td.conf
      password: dummy
      database: td-presto
      host: api-presto.treasuredata.co.jp
      port: 443
      schema: test_dbt
      threads: 5
      http_scheme: https
    prod:
      type: trino
      method: none # optional, one of {none | ldap | kerberos}
      user: "{{ env_var('TREASURE_DATA_API_KEY') }}" # ~/.td/td.conf
      password: dummy
      database: td-presto
      host: api-presto.treasuredata.co.jp
      port: 443
      schema: dbt
      threads: 5
      http_scheme: https
  target: dev

dbt_project.yml

name: "dbt"
version: "1.0.0"
config-version: 2
profile: "dbt"
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"
  
# 数字始まりの DB やテーブル名をクォートする
quoting:
  identifier: true
  schema: true
models:
  dbt:
    database: '"td-presto"'
    materialized: table
    dataset_a:
      +schema: "{{ 'test_dataset_a' if target.name != 'prod' else 'dataset_a' }}"
      delete_and_create:
        pre-hook:
          - DROP TABLE IF EXISTS "{{ this.schema }}"."{{ this.table }}__dbt_tmp"
          - DROP TABLE IF EXISTS "{{ this.schema }}"."{{ this.table }}"
      append_diff:
        pre-hook:
          - '{{pre_hook_append_diff()}}'
        post-hook:
          - 'INSERT INTO {{ this }} SELECT * FROM "{{ this.schema }}".tmp__{{ this.table }}'
          - 'DROP TABLE IF EXISTS "{{ this.schema }}".tmp__{{ this.table }}'
    dataset_b:
      +schema: "{{ 'test_dataset_b' if target.name != 'prod' else 'dataset_b' }}"
      delete_and_create:
        pre-hook:
          - DROP TABLE IF EXISTS "{{ this.schema }}"."{{ this.table }}__dbt_tmp"
          - DROP TABLE IF EXISTS "{{ this.schema }}"."{{ this.table }}"
      append_diff:
        pre-hook:
          - '{{pre_hook_append_diff()}}'
        post-hook:
          - 'INSERT INTO {{ this }} SELECT * FROM "{{ this.schema }}".tmp__{{ this.table }}'
          - 'DROP TABLE IF EXISTS "{{ this.schema }}".tmp__{{ this.table }}'
seeds:
  dbt:
    database: '"td-presto"'
    dataset_z:
      +schema: "{{ 'test_dataset_z' if target.name != 'prod' else 'dataset_z' }}"
      +column_types:
        datetime: varchar
      +pre-hook:
        - DROP TABLE IF EXISTS "{{ this.schema }}"."{{ this.table }}"

dbtの環境構成

前回の記事ではdbtはDWH等のデータを保存しているところに対して諸々指示を出しているということとテーブルをref参照にすることで参照先のデータセットを適切に読み替えてクエリ実行できると言うところまで書いたかと思います。
この特性を利用して開発用のデータセット(test_dataset)を参照先とする開発(dev)環境と本番用のデータセット(無印dataset)を参照先とする本番(prod)環境で運用しています。
これによって本番データセットに影響を与えずにテーブルの開発ができる状態になってます。
具体実装としてはprofile.ymlです。
これだけ見ても実行データセットを変える実装はありませんが、「devとprodという二つの実行方法を定義して、何も指定しない場合はdevで実行する」と指定していることに意味があります。
ちなみにdbtのコマンドをDWH側のapiに読み替えるためのドライバーとしてはdbt-trinoを使用しています。

なぜpresto用のドライバーでないかというと、prestoのドライバーが公式メンテ対象外になっているからという理由からです。
さらにちなみにいうと、trinoドライバーを使っていることによってprestoと違うところが生じて色々問題が発生するわけですが、そこの一部が前述している問題点につながっています。
代表的なところでいうと下記のdbt-prestoリポジトリを見た感じだと「incrementというmaterializeはできないよ」って言ってくれるみたいなのですが、trinoの場合は実行しようとしてエラーになります(trinoでは実行できるから)。

流石に脱線が過ぎたので本題に戻ります。
実行方法ごとの定義はdbt_project.yml内のschemaというところで指定しています。

    dataset_a:
      +schema: "{{ 'test_dataset_a' if target.name != 'prod' else 'dataset_a' }}"

ここでtarget.nameと書かれているのが実行方法を指定するjinja記法になっています。
つまり{{ ref(table名) }}(や後述する{{ this.schema }})として呼び出したときに「本番実行以外(開発環境)だったらdataset_aでそれ以外だったらtest_dataset_a」として読み込まれるわけです。
以上が本番と開発環境の指定方法とそれをデータセットに反映させる方法になります。

全更新(deleta_and_create)の実装

ここから前に説明を飛ばした実行方法の違いをどうやって実装しているのかについて説明していきたいと思います。
最初にシンプルな全更新(deleta_and_create)に関する説明です。

解説

まずdbt-trinoドライバーを使ってdbtのテーブル更新系のコマンドであるdbt run(build)を実行したときにtd上ではどういう処理が走るかについて説明したいと思います。
簡単な確認方法としてはtdのコンソールにログインして「Job Activities」のページを見ると実際に実行したjobが表示されます。(target/run/...からも確認できそうですがクエリに付随する部分しかわからないためおすすめしません)
実際に下記のsample.sqlを実行すると下記のような出力になっていることが確認できるかと思います。

-- dataset_a/sample.sql
select 0
  • 下記を実行
create table "td-presto"."test_dataset_a"."sample__dbt_tmp"
as(
select 0
)
  • 下記を実行
alter table "td-presto"."test_dataset_a"."sample__dbt_tmp" rename to "td-presto"."test_dataset_a"."sample"
  • 下記を実行
drop table if exists "td-presto"."test_dataset_a"."sample__dbt_tmp"

test_dataset_aになっているのは前に説明している通りで開発環境で実行しているからになります。処理に関しては大したことはしてないことがわかっていただけると思いますが、クエリ内容を元にtable名__dbt_tmpという名前のテーブルを作って、それをtable名という名前変換してテーブルを作っています。
この処理に関しては下記公式のFAQに書いているRedshiftでの処理と似た実装ですね(BigQueryやSnowflakeはシンプルで羨ましい)。

さて、実行される処理を理解したところで、素置きでこの処理のままで全更新ができるかというとそんなことはなく、下記問題が発生します。

  • 同名テーブルが存在するとエラーになる
  • 途中で異常終了した場合にtable名__dbt_tmpというテーブルが残る

これらの問題に対してprehook(posthook)をうまく活用していくことで対応していきます。
prehook(posthook)は名前の通りではあるのですがクエリを実行する前(後)に行う処理を指定することができる方法になります。
書き方としてはdbt_project.ymlに記載する書き方(以降、グローバルprehook)とsqlファイル内の先頭にconfigとして記載する書き方(以降、ローカルprehook)の2種類が存在していて、実行順番としてはグローバルprehookが先でローカルprehookが後になります。
一応公式の説明は下記になります。

さて、前置きの説明が済んだところで全更新を実現するためにグローバルprehookとしてdbt_project.ymlに下記を追加しています。

    delete_and_create:
        pre-hook:
          - DROP TABLE IF EXISTS "{{ this.schema }}"."{{ this.table }}__dbt_tmp"
          - DROP TABLE IF EXISTS "{{ this.schema }}"."{{ this.table }}"

{{ this.schema }}{{ this.table }}の解説を加えれば大したことないなと思っていただけるかと思います。
{{ this.schema }}は実行時のデータセット名にコンパイルされます。今回で言うと開発環境ならtest_dataset_aで本番環境ならdataset_aになります。
{{ this.table }}はテーブル名になります。
シンプルに{{ this }}という書き方もできてその場合全部ひっくるめて"td-presto"."データセット名"."テーブル名"とコンパイルされます(つまり二つ目に関しては DROP TABLE IF EXISTS {{ this }}でも問題ないはず)。
今回{{ this.schema }}{{ this.table }}を使っているのは一つ目で"{{ this.table }}__dbt_tmp"のようなtable名にprefixやsuffixがついてるタイプの処理したかったためです。
以上の解説を踏まえると、prehookでやってることとしては既存テーブルと既存テーブル生成時にできるテーブル両方を存在してたらdeleteする処理になります。
これによって、テーブル生成前に既存テーブルが存在しない環境を作りつつ、異常終了時に残るテーブルも綺麗にできるわけです。
以上の実装でクエリに問題なければdbt run(build)によって全更新(delete_and_create)を実行できるようになっているとお分かりいただけるかと思います。

ここから実際にtdに対してテーブル更新する例をスクショ多めで説明していきます。
何度も説明している通りdbtはELTのTを司るツールなので、前提としてリネージュを辿った時の一番上流のテーブルがELによってtdに存在してる状態じゃないと動作しません。
とはいえテーブルを入れておいてくださいというのも味気ないので、今回は参照元となるテーブルをdbtのseedという機能を使って作ることにします。
seedはcsvファイルからテーブルを作る機能になっています。実際の用途としては頻繁に更新が発生しないが、管理されてないマスターに相当するテーブル(ex. 都道府県と県庁所在地の紐付けなど)を管理するためのものです。詳細は下記をみてください。

今回は本来用途と違いますが、簡単なテーブルを0から作るのに都合がいいので利用することとします。
csvファイルとしては前述のseed_sample.csvを使います。

time,datetime,user_id,payment
1717214520,2024-06-01 13:02:00,a,200
1717287720,2024-06-02 09:22:00,b,700
1717383240,2024-06-03 11:54:00,b,600
1717486740,2024-06-04 16:39:00,c,500
1717585200,2024-06-05 20:00:00,a,300
1717627620,2024-06-06 07:47:00,b,900
1717628580,2024-06-06 08:03:00,a,100

日時ごとのユーザーの支払いをイメージした雑なテーブルです。
特徴としてtimeとしてdatetime(jst)のunixtimeを指定しています。
(ご存知かと思いますが)tdではtimeをこのような形にするしかpartitionできないので、実際のテーブル感を出すためにこのようにしています。
seedに関しても既存テーブルが存在する状態で実行すると、エラーになるので、下記のようにdbt_project.yml内に設定を追加しています。

seeds:
  dbt:
    database: '"td-presto"'
    dataset_z:
      +schema: "{{ 'test_dataset_z' if target.name != 'prod' else 'dataset_z' }}"
      +column_types:
        datetime: varchar
      +pre-hook:
        - DROP TABLE IF EXISTS "{{ this.schema }}"."{{ this.table }}"

seedに関してはtmpテーブルを作ってrenameするわけではないのでprehookではシンプルにテーブルを消してます。
columnの型を指定しないで実行するとdatetimeというカラムをtimestampとして読み込むのですが、tdはtimestamp型をテーブルにできないためvarchar型として読み込むような設定も追加しています。
seedをテーブルとして実体化するためにはdbt seedというコマンドが存在するのですが、dbt buildは実行すると上流にあるseedも実行してくれる便利コマンドなので、ここでは実行しないでおきます。

さて準備はこれくらいにして、本題の全更新でテーブルを作成する作業をしていきます。
クエリはdelete_and_create_sample.sqlです。

select user_id, min(datetime) as min_datetime
from {{ ref("seed_sample") }}
group by 1

ユーザーごとの最初の支払日時を出している風のクエリです。
全期間遡らないと最初の支払い日時はわからないため全更新しています(ちゃんと設計すれば差分更新で実装できるというツッコミは飲み込んでいただけますと)。

実行環境として前記事にしたvscode + dbt-power-userという私的最強環境を使ってるわけですが、UIは下記の画像の通りになってます。

右上の「Compiled dbt Preview」を押すと{{ ref }}をコンパイルした後のクエリが確認できます。

image.png

今は開発環境での実行なのでコンパイル後が"td-presto"."test_dataset_z"."seed_sample"になっていることが確認できるかと思います。
また下部では「LINEAGE」としてseed_sampleというテーブルを使ってdelete_and_create_sampleを生成しているというグラフが確認できるかと思います。

image.png

さて実際のdbt buildに関してはトンカチマークのプルダウンにある「Build Upstram Models」を押すと実行できます。文字通りではあるのですが、表示しているクエリ(今でいうとdelete_and_create_sample.sql)の上流含めbuildするという意味で、今回で言うとseed_sampleも実行してくれます。

image.png

実行後の出力はこんな感じ

08:27:06  Concurrency: 5 threads (target='dev')
08:27:06  
08:27:06  1 of 2 START seed file "td-presto".test_dataset_z.seed_sample .................. [RUN]
08:27:10  1 of 2 OK loaded seed file test_dataset_z.seed_sample .......................... [INSERT 7 in 3.99s]
08:27:10  2 of 2 START sql table model "td-presto".test_dataset_a.delete_and_create_sample  [RUN]
08:27:13  2 of 2 OK created sql table model "td-presto".test_dataset_a.delete_and_create_sample  [SUCCESS in 3.12s]
08:27:13  
08:27:13  Finished running 1 seed, 1 table model in 0 hours 0 minutes and 11.96 seconds (11.96s).
08:27:14  
08:27:14  Completed successfully
08:27:14  
08:27:14  Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2

target='dev'つまり開発環境で実行していることや、step1としてtest_dataset_z.seed_sampleが作られた後でsrep2としてtest_dataset_a.delete_and_create_sampleが作られていることがわかるかと思います。
実際にtdのコンソールの「Databases」から該当テーブルを確認すると、下記の通り意図通りのテーブルが作られていることが確認できます。
image.png

image.png

またtdで実行されたjobをコピペで一箇所に書くと以下のようになっており、プロセスも意図通りであったことが確認できます。(seedの部分の処理に関しては私も雰囲気でしか理解していない)

DROP TABLE IF EXISTS "test_dataset_z"."seed_sample"
create table "td-presto"."test_dataset_z"."seed_sample" ("time" INTEGER,"datetime" varchar,"user_id" VARCHAR,"payment" INTEGER)
PREPARE st_b149528974f947c49e88e25f57ac3eb9 FROM insert into "td-presto"."test_dataset_z"."seed_sample" ("time", "datetime", "user_id", "payment") values
          (?,?,?,?),(?,?,?,?),(?,?,?,?),(?,?,?,?),(?,?,?,?),(?,?,?,?),(?,?,?,?)
EXECUTE st_b149528974f947c49e88e25f57ac3eb9 USING DECIMAL '1717214520','2024-06-01 13:02:00','a',DECIMAL '200',DECIMAL '1717287720','2024-06-02 09:22:00','b',DECIMAL '700',DECIMAL '1717383240','2024-06-03 11:54:00','b',DECIMAL '600',DECIMAL '1717486740','2024-06-04 16:39:00','c',DECIMAL '500',DECIMAL '1717585200','2024-06-05 20:00:00','a',DECIMAL '300',DECIMAL '1717627620','2024-06-06 07:47:00','b',DECIMAL '900',DECIMAL '1717628580','2024-06-06 08:03:00','a',DECIMAL '100'
DEALLOCATE PREPARE st_b149528974f947c49e88e25f57ac3eb9
DROP TABLE IF EXISTS "test_dataset_a"."delete_and_create_sample__dbt_tmp"
DROP TABLE IF EXISTS "test_dataset_a"."delete_and_create_sample"
create table "td-presto"."test_dataset_a"."delete_and_create_sample__dbt_tmp"
    
  as (
    select user_id, min(datetime) as min_datetime
from "td-presto"."test_dataset_z"."seed_sample"
group by 1
  )
alter table "td-presto"."test_dataset_a"."delete_and_create_sample__dbt_tmp" rename to "td-presto"."test_dataset_a"."delete_and_create_sample"
drop table if exists "td-presto"."test_dataset_a"."delete_and_create_sample__dbt_backup"

以上でdbtを使ってtdにテーブルを全更新で作成できることが実例として示せたかなと思います。
全更新に関してはdbtの設定の仕方を理解していただければ大したことはしていないと思っていただけると思うのですが、次の章の差分更新(append_diff)に関してはこれより複雑になっていますので、ここまでの内容は完全理解した上で次に進んでいただけるとありがたいです。

差分更新(append_diff)の実装

ここから差分更新(append_diff)に関して説明していきます。

解説

まず現実を見るところから始めるのですが、dbtではmaterizedというどういう方法でテーブルを実体化するかを指定できるようにしていて、その中にincremmentalという差分更新の方法が存在しています。
とはいえ、軽く前述していますが、trinoではできる可能性がありますが、prestoだと実行できません。
具体的には裏側としてmerge処理をするらしく、mergeがないprestoでは実行できないということらしいです。
とはいえ差分更新ができないというのはメモリリミット(つまり計算リソース制限)があるprestoにおいては死活問題なので実装していきます。
これまでの説明から新しい要素は基本出てこないのですが、処理をmacroという機能を使ってモジュール化しているので、そこだけ説明します。
基本的にdbtはDRY(don't repeat yourself)にせよという思想なので、共通化できる処理はmacroとしてまとめましょうという良い子発言をしつつ、直接ymlファイルに書くとクオーテーションが3種類必要になったからmacroに避難させているというのが一番の理由だったりします(dryならdelete_and_createのprehookもmacroに寄せろって話ですし)。
実際のmacroは前述のmacros/pre_hook_append_diff.sqlのような記述方法になります。
処理の詳細についてはこの後macro外の処理も含めて説明するので割愛しますが、{% macro function_name(args) %} hogehoge {% endmacro %}で囲んで内部に処理を書く感じです(-%だったり%-だったりしてますが-の有無でコンパイル後の改行を制御できます)。
内部の処理に関してはjinja記法に従ってる以外はsqlを書くだけです。

ここから本題の処理について説明していきます。
グローバルprehookに関しては最後に説明するので、ローカルprehookとグローバルposthookに関して説明していきます。
まずローカルprehookはpre_hook_append_diff_in_sqlというmacroを実行しています。

{# append_diffのsql内のpre_hookとして行う処理 #}
{% macro pre_hook_append_diff_in_sql(append_diff_range) -%}
    DELETE FROM {{ this }} WHERE td_interval(time,'{{ append_diff_range }}', 'JST');
    CREATE TABLE IF NOT EXISTS "{{ this.schema }}".tmp__{{ this.table }} AS SELECT * FROM {{ this }};
    DROP TABLE IF EXISTS {{ this }};
    DROP TABLE IF EXISTS "{{ this.schema }}".{{ this.table }}__dbt_tmp;
{%- endmacro %}

この処理は

  • 環境内にあるテーブルの直近のデータだけ消して
  • それをtmp__table名として保存して
  • 環境のtable名を消す
  • 中間生成テーブルも消す

になっています。
つまりローカルprehookの実行が終わったタイミングで、table名というテーブルが消えて、tmp__table名という直近のデータが欠損したテーブルが存在している状況になります。
そこから前述している通常のdbt run(build)による

  • table名__dbt_tmpという中間生成テーブルをクエリに基づいて作り
  • table名にrenameし
  • table名__dbt_tmpを消す

が実行されてtable名がクエリに基づいて作られます。
最後にグローバルposthookによる

        post-hook:
          - 'INSERT INTO {{ this }} SELECT * FROM "{{ this.schema }}".tmp__{{ this.table }}'
          - 'DROP TABLE IF EXISTS "{{ this.schema }}".tmp__{{ this.table }}'
  • table名tmp__table名をinsertし
  • tmp__table名を消す

が実行されることで、直近のデータはクエリによって作られ、それ以外は既存テーブルの内容という差分更新が実現することになります。
なお、pre_hook_append_diff_in_sql内での直近の指定方法としてtdの固有関数であるtd_intervalを採用しています(引数もtd_intervalの期間指定の記法の通り書いてもらうことにしている)。
td利用者なら当たり前かと思いますが、tdはtimeカラム以外でpartitionできないため、timeを処理するtd_hogehogeという関数が充実しています。
td_intervalも(詳細は以下のページを見てください)その中の一つになっており、つまり前提としてtimeをフィルタしたい時刻のunixtimeに設定してpartitionを効かせる運用にしています。

これはそもそも差分更新するテーブルは全更新だと時間がかかる程度にレコード数が多い(or 今後多くなると見込まれる)ことからちゃんとtimeを設定してpartitionできるようにしようという設計思想に基づきます。
そして、当たり前ですが、prehookで指定した期間範囲とクエリで指定した期間範囲が異なると問題が発生するため、統一する必要があります(クエリで参照する側のテーブルもtimeに時刻のunixtimeが指定されている必要あり)。

割愛していたグローバルprehook(つまりローカルprehookより前に処理される)に関して説明していきます。
まず最初のif文からなのですが、

{% if target.name != "prod" %}
    DROP TABLE IF EXISTS "{{this.schema}}"."{{this.table}}";
    CREATE TABLE IF NOT EXISTS "{{this.schema}}"."{{this.table}}" AS SELECT * FROM "{{ this.schema |replace('test_','') }}"."{{this.table}}";
{% endif %}

これは端的に説明すると開発環境だったら開発環境のテーブルを消して、本番環境のテーブルを丸々コピーしてくるという処理になります。
なんでこんな処理をしているかというと、本番テーブルは毎日更新するworkflowを実行しているため最新のデータなのですが、開発環境は定期更新してないことから、dbt run(build)実行時に差分更新テーブルの場合は本番テーブルをコピーすることで定期実行していない期間の穴埋めをしています。
具体例で言うと、開発環境で20日更新してない状態で直近10日分の差分更新をした場合、直近20-10日分のデータが欠損します。
この状態でテーブルの下流のクエリも実行した場合、本番環境と結果が合わないという現象が起きるため、定期実行をしなくても実行時には本番環境に近い状態を再現すると言う目的での実装になります。
後半部分に関してですが、

    CREATE TABLE IF NOT EXISTS "{{ this.schema }}"."tmp__{{ this.table }}" AS SELECT 1;
    CREATE TABLE IF NOT EXISTS "{{ this.schema }}"."{{ this.table }}" AS SELECT * FROM "{{ this.schema }}"."tmp__{{ this.table }}";
    DROP TABLE IF EXISTS "{{ this.schema }}"."tmp__{{ this.table }}"

これは異常終了に対処するための処理になります。差分更新のテーブルが異常終了すると、過去レコードを保存しているtmp__table名が存在してtable名が存在しない状態になります(特に本番環境での実行の場合)。
この状態ではローカルprehook実行時にtable名が存在しないためエラーになるので、tmp__table名が存在してtable名が存在しない時はtmp__table名table名に戻してあげる処理をしています。
1行目はtmp__table名が存在しないと2行目がエラーになるため、形式的にテーブルを作ってる処理で、3行目は異常終了で残っていたもの(or 1行目でできたもの)を消す処理になります。
※1行目と2行目をシンプルにalterでかけるのではと思ったかもしれませんが、開発環境の場合前述の通り最初に本番コピーをするため、tmp__table名table名が両方存在してエラーになることから今回の方法を採用しています。
これによって異常終了した場合も同日中に再実行するだけで良くなります。
長くなりましたが、以上で差分更新を可能にしております。

ここから具体実行したらどうなるかについて説明していくのですが、やること自体は全更新の時と同じなので新しいところだけ説明していきます。
まず、クエリはappend_diff_sample.sqlになります。

{{ config(pre_hook=['{{ pre_hook_append_diff_in_sql("-7d/0d") }}']) }}

select
    substr(datetime, 1, 10) as date,
    td_date_trunc('day', time, 'JST') as time,
    sum(payment) as sum_payment,
    count(distinct user_id) as cnt_uu
from {{ ref("seed_sample") }}
where td_interval(time, '{{ var("append_diff_range","-7d/0d") }}', 'JST')
group by 1, 2

日毎の売り上げやユーザー数をカウントしている風のクエリです(td関数使っていて若干直感的じゃないですが、クエリを短くしたかっただけでやってることはたいしたことないです)。
日毎の集計なのでわざわざ過去分更新しなくてもいいよねと言うことで差分更新にしています。
timeに関しては前述の通り、partitionを効かせるために設定しています。
where句に関してもtimeでpartitionを効かせるための書き方になります。
{{ var("append_diff_range","-7d/0d") }}
の部分に関しては次回記事で説明予定なので今回は割愛させてください。
コンパイル後のクエリが以下のようになるということだけで以降の説明は理解できます。

select substr(datetime,1,10) as date, td_date_trunc('day',time,'JST') as time, sum(payment) as sum_payment, count(distinct user_id) as cnt_uu
from "td-presto"."test_dataset_z"."seed_sample"
where td_interval(time, '-7d/0d', 'JST')
group by 1,2

UIに関しても先ほどと変わっていないですが、今回新しいこととして右上の「Execute dbt SQL」を実行したいと思います。
これを実行すると(tdにテーブルは作成されず)vscode上でクエリ結果を確認できます。
※先ほどはseed_sampleがテーブルとして存在しなかったためできませんでした。

image.png

ページ下部の「QUERY RESULT」から結果が確認できます。
実行日が2024/06/11なので過去7日分のレコードだけが表示されていてそれより前に関しては存在しない形です。

image.png

あとはdelete_and_createと同様dbt buildをポチればいいんでしょと思われたかと思いますが、一つ今回のシナリオ特有の問題があります。
「開発環境での実行の際は本番環境から該当テーブルをコピーする」という差分更新用のテーブルが本番の運用プロセスに乗っている前提がクリアできていないことです。
このような差分更新のテーブルの初期化周りに関しては次回説明するので、今回は単純に下記コマンドをtdコンソール上から実行して本番環境(dataset_b)にテーブルを作ってしまいます。

create table if not exists "dataset_b".append_diff_sample as(
  select substr(datetime,1,10) as date, td_date_trunc('day',time,'JST') as time, sum(payment) as sum_payment, count(distinct user_id) as cnt_uu
  from "td-presto"."test_dataset_z"."seed_sample"
  group by 1,2
)

実行すると下記のように本番データセットにテーブルができました。
image.png

これで実行準備が整ったので「Build Upstram Models」をポチります(seed_sampleを更新する必要はないので単体だけ更新する「Build dbt Model」でも問題ない)。

以下が出力されて無事実行が完了します。

10:41:58  Concurrency: 5 threads (target='dev')
10:41:58  
10:41:58  1 of 2 START seed file "td-presto".test_dataset_z.seed_sample .................. [RUN]
10:42:01  1 of 2 OK loaded seed file test_dataset_z.seed_sample .......................... [INSERT 7 in 2.91s]
10:42:01  2 of 2 START sql table model "td-presto".test_dataset_b.append_diff_sample ..... [RUN]
10:42:11  2 of 2 OK created sql table model "td-presto".test_dataset_b.append_diff_sample  [SUCCESS in 10.92s]
10:42:11  
10:42:11  Finished running 1 seed, 1 table model in 0 hours 0 minutes and 18.47 seconds (18.47s).
10:42:12  
10:42:12  Completed successfully
10:42:12  
10:42:12  Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2

td上では下記のようにテーブルが無事できていることが確認できます。
ポイントとしては、先ほどvscode上でクエリを実行した時の出力は3レコードだったのに今回は過去分含め6レコード存在していることです。
全更新の場合は3レコードなはずなので正しく差分更新が実装できていることの証左です。
image.png

ついでにtdで実行されたjobはこんな感じです。

DROP TABLE IF EXISTS "test_dataset_b"."append_diff_sample"
CREATE TABLE IF NOT EXISTS "test_dataset_b"."append_diff_sample" AS SELECT * FROM "dataset_b"."append_diff_sample"
CREATE TABLE IF NOT EXISTS "test_dataset_b"."tmp__append_diff_sample" AS SELECT 1
CREATE TABLE IF NOT EXISTS "test_dataset_b"."append_diff_sample" AS SELECT * FROM "test_dataset_b"."tmp__append_diff_sample"
DROP TABLE IF EXISTS "test_dataset_b"."tmp__append_diff_sample"
DELETE FROM "td-presto"."test_dataset_b"."append_diff_sample" WHERE td_interval(time,'-7d/0d', 'JST')
CREATE TABLE IF NOT EXISTS "test_dataset_b".tmp__append_diff_sample AS SELECT * FROM "td-presto"."test_dataset_b"."append_diff_sample"
DROP TABLE IF EXISTS "td-presto"."test_dataset_b"."append_diff_sample"
DROP TABLE IF EXISTS "test_dataset_b".append_diff_sample__dbt_tmp
create table "td-presto"."test_dataset_b"."append_diff_sample__dbt_tmp"
    
  as (
    

select substr(datetime,1,10) as date, td_date_trunc('day',time,'JST') as time, sum(payment) as sum_payment, count(distinct user_id) as cnt_uu
from "td-presto"."test_dataset_z"."seed_sample"
where td_interval(time, '-7d/0d', 'JST')
group by 1,2
  )
alter table "td-presto"."test_dataset_b"."append_diff_sample__dbt_tmp" rename to "td-presto"."test_dataset_b"."append_diff_sample"
drop table if exists "td-presto"."test_dataset_b"."append_diff_sample__dbt_backup"
INSERT INTO "td-presto"."test_dataset_b"."append_diff_sample" SELECT * FROM "test_dataset_b".tmp__append_diff_sample
DROP TABLE IF EXISTS "test_dataset_b".tmp__append_diff_sample

解説のところで説明した内容の通りになっていることがお分かりいただけるかなと思います。

定期更新に関して

これまでの説明によってローカル環境から開発データセットの更新ができることはイメージしてもらえたかなと思います。
ここからは本番用のデータセットをgithub actionsを使って定期更新する方法について説明していきたいと思います(いろいな方法があると思うので軽めにします)。
権限管理が甘いとこれまで説明してきた内容はtargetをprodとすることでローカルからでも本番用データセットを更新できちゃうわけですが、ガバナンス的によろしくないかつスケジューリング実行はできないのでgithub actionsを本番更新用として採用しています。
github actionsはcicdサービスでdbtのリポジトリにymlファイルを記述するだけで簡単に実装できるので便利です(詳細は以下。私も都度調べながら実装してるのであまり詳しくない)。

特にdbt自体はtdに対してapiを叩いているだけで処理自体はtd側が回しているので、大きなリソースを必要としていないところもgithub actionsと相性がいいなと思っていたりします。
今回はスケジューリングでコンテナを立ち上げて、dbt buildを実行させるworkflowを作っています。
ポイントとしてはローカルでの作業環境と実行差分を発生させないことと、環境変数の取り扱いぐらいです。
.github/workflows/daily_update.ymlとして下記を作成します。

name: daily-update

on:
  schedule:
    - cron: "0 0 * * *" # JST 9:00
  workflow_dispatch:

env:
  TREASURE_DATA_API_KEY: ${{ secrets.TREASURE_DATA_API_KEY }}
  target: prod

defaults:
  run:
    working-directory: dbt

jobs:
  dbt_build:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: 3.9

      - name: Install Dependencies
        run: |
          pip install poetry
          poetry install
          poetry run dbt deps
          # td コマンドのインストール
          curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-focal-td-agent4.sh | sh

      - name: Debug dbt
        run: poetry run dbt debug --profile dbt

      - name: dbt build
        run: poetry run dbt build --target ${target}

  notify:
    runs-on: ubuntu-latest
    if: always()
    continue-on-error: true
    needs:
      [
        dbt_build,
      ]
    steps:
      - uses: actions/checkout@v4
      - name: Check job results
        id: check
        run: |
          if [[ "${{ needs.dbt_build.result }}" == "success" ]]; then
            echo "all_succeeded=true" >> $GITHUB_ENV
          else
          echo "all_succeeded=false" >> $GITHUB_ENV
          fi

      - name: Success notification
        if: env.all_succeeded == 'true'
        uses: 8398a7/action-slack@v3
        with:
          status: ${{ job.status }}
          fields: repo,commit,author,action
          author_name: dbt job notifier
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_SUCCESS_WEBHOOK_URL }}

      - name: Failure notification
        if: env.all_succeeded == 'false'
        uses: 8398a7/action-slack@v3
        with:
          status: ${{ job.status }}
          fields: repo,commit,author,action
          author_name: dbt job notifier
          text: "Job Failed :warning:"
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_FAILURE_WEBHOOK_URL }}

github actionsではymlファイルを作成してmasterにマージするだけで、今回でいうとスケジューリングで実行されるようになります(リポジトリのページの「Actions」というタブから実行状況等が確認可能)。
実際のgithub actionsのUIのイメージに関しては下記をご覧ください。

解説としては一番最初の部分で実行タイミングを指定したあとは、ローカルで作成したdevcontainerと同じセットアップをしてからdbt buildを実行してるだけです(targetとしてprodを指定しています)。
devcontainerのセットアップに関しては下記に書いてるのですが、poetryを採用しているのでworkflowでもそれを使って作成することで実行環境を同じにしています。
poetryを使わない場合はpip install -r requirements.txt等でローカルと実行条件をそろえていただけますと幸いです。

なお、devcontainerはcent osベースでgithub actionsはubuntuベースじゃないかというツッコミに関しては、同じlinux系なので許しておくれって感じです(windowsのローカルマシンでubuntu imageを使おうとする立ち上げ重いしエラー出るから諦めた)。
最後の部分は実行結果をslackに通知させてます。
さて、環境変数に関してですが、${{ secrets.TREASURE_DATA_API_KEY }}のような形にしていることがわかるかと思います。これはgithubのリポジトリページの「Settings」>「Secrets and variables」>「Actions」から設定できるのでそれを呼び出している形です(詳細は下記)。

終わりに

ここまでやるとdbtから基本的なテーブル生成はできるようになったかと思います。
想像よりボリューミーになってしまい私としてもうまく説明できているか自信がないのですが、コメントに質問いただければ回答させていただきますのでご容赦ください。

次回記事では、差分更新のテーブルに関してカラム追加する方法や全更新をかける方法について説明したいと思います。

4
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?