1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【完全攻略】TreasureData x dbt part3

Last updated at Posted at 2024-09-23

前回までの記事

はじめに

これまでの記事の内容でdbtを使った中間テーブル生成を実運用していく際に必要な情報の9割くらいはカバーできているかなと思います。
そこで今回の内容は稀に発生するケースに関してどうやって対応していくかについて書いていこうと思います。

具体的には、

  • 差分更新テーブルにカラムを追加 or 変更する
  • 差分更新のテーブルを新規作成 or 全更新する

をどうやって実装したかについて説明していきたいと思います。

背景

この章では、

  • 差分更新テーブルにカラムを追加 or 変更する
  • 差分更新のテーブルを新規作成 or 全更新する

に関してもう少し詳しく説明していきます。

前回までの実装で差分更新(≠全更新)のテーブルに対してカラムの追加やロジックの変更を行うとどうなるかというと、更新範囲のレコードに関しては変更が適応されますが、更新範囲にないレコードに関しては従来の定義のままです(追加されたカラムに関してはNULLが入る)。
カラム追加に関しては経験上timeカラムにunixtimeを設定してtime partitionを効かせるために使うパターンが多いです。

差分更新のテーブルの新規作成に関してはそもそも実行エラーになります(テーブルが存在していることが前提の実装になっているから)。
新規作成に関しては「一度全更新でテーブル作成してから、後から差分更新にすればこれまでの実装だけで完結できるのでは?」と思われた方もいるかもしれませんが、そのご指摘は正しいです。
ただし、開発プロセスが2段階になるのと、詳細は後述しますが、メモリリミットでエラーになりテーブルを作れないパターンが存在する(特に差分更新にしたいテーブルは重いことが多いので)という2点の問題が存在します。

実装方針

dbtが提供している差分更新用のmateialize方法であるincrementalでは、sql内で更新範囲に相当する部分を{% if is_incremental() %} where hogehoge {% endif %}と記載した上で、--full_refreshオプション付きでdbtコマンドを実行すると、更新範囲を無視して全更新できます(詳細は下記)。

prestoエンジンでは上記のincrementalが(裏側でmerge処理をしているから)使えないわけですが、似せた実装をすればいいかというとそれだけでは不十分です。
tdのprestoは他のDWHとは違いメモリリミットが存在して動的にコンピュートリソースを確保するようにはなっていないため、大きなテーブルを一括で処理することはできない可能性があります。
よって、tdの特性を踏まえた上での実装を検討する必要があります。

具体実装

結論から言ってしまいますが、ここからはdbtだけではなくpythonとtdコマンド(tdのapi)を使って機能を実装していきます(保守観点で可能な限りdbtに寄せたかったのですが、dbtだけではどう足掻いても不可能でした)。
そもそもで言うとprestoではなくメモリリミットがないhiveを使えばいいのでは?という意見もあるかと思うのですが、prestoとはsql記法が変わってきたりそれ用のdbt_project.ymlファイル等を用意するなど、それはそれで運用負担が重いので見送っております。

差分更新テーブルにカラムを追加 or 変更する

_dbt/scripts/append_diff_column_edit.py

import os
import glob
import subprocess
import pytd
import pandas as pd
import shutil
import sys

def main():
    # 実行環境を受け取る
    arr = sys.argv
    if len(arr) > 2:
        raise Exception("Too Many Args")
    elif len(arr) == 2:
        target = arr[1]
        if target not in ("dev","prod"):
            raise ValueError("Target is Not Dev or Prod")
    else:
        target = 'dev'

    API_KEY = os.environ["TREASURE_DATA_API_KEY"]

    # カレントディレクトリを_dbtに設定
    os.chdir(os.path.dirname(__file__))
    os.chdir('..')

    sql_folder_path = "analyses/append_diff_column_edit_queries"
    removed_folder_path = "old/append_diff_column_edited_queries"
    compiled_folder_path = "target/compiled/_dbt/analyses/append_diff_column_edit_queries"

    # delete target_folder
    if os.path.isdir(compiled_folder_path):
        shutil.rmtree(compiled_folder_path)
    # dbt compileコマンドを実行
    command = f"""dbt compile --target={target}"""
    subprocess.run(command, shell=True)

    # append_diff_column_edit_queriesにsqlがなかったら正常終了
    if not os.path.isdir(compiled_folder_path):
        print("【FINISH】No column edit file.")
        sys.exit()
    
    # sql_fileを元にテーブルを更新
    for compiled_file_name in os.listdir(compiled_folder_path):
        compiled_file_path  = os.path.join(compiled_folder_path,compiled_file_name)

        # dbt compile後のファイルをread
        with open(compiled_file_path) as f:
            compiled_query = f.read()

        # modelsのpathからdatabase_nameを復元
        model_file_path = glob.glob('models/**/' + compiled_file_name, recursive=True)[0]
        database_name = model_file_path.split("/")[-3] if target == 'prod' else 'test_' + model_file_path.split("/")[-3]
        table_name = os.path.basename(model_file_path).split(".")[0]
        replace_table_name = table_name + "_replace"
        temp_file_path = "temp.sql"

        # カラム変更した新しいテーブルを生成
        ## 一度実行したいクエリをsqlファイルに書き込み(ファイルに書き込まないとクオーテーションで実行できない)
        query = f"""DROP TABLE IF EXISTS "{replace_table_name}"; CREATE TABLE "{replace_table_name}" AS {compiled_query}""" 
        with open(temp_file_path,"w") as o:
            print(query, file = o)
        ## 生成したファイルを使ってtdコマンドを実行
        command = f"""td -k {API_KEY} -e https://api.treasuredata.co.jp query -T presto -w -d {database_name} -q {temp_file_path}"""
        subprocess.run(command, shell=True)
        ## tmpファイル破棄
        if os.path.isfile(temp_file_path):
            os.remove(temp_file_path)

        # record数テスト
        ## record countするクエリを実行してdfとして保存
        client = pytd.Client(apikey=API_KEY, endpoint='https://api.treasuredata.co.jp/', database=database_name, default_engine='presto')
        res = client.query(f"""select count(*) as cnt from "{table_name}" union all select count(*) as cnt from "{replace_table_name}"  """)
        df = pd.DataFrame(**res)
        ## cntカラムの最初のレコード(元テーブルのレコード数)とcntカラムの2番めのレコード(新テーブルのレコード数)を比較
        if df.cnt[0] != df.cnt[1]:
            raise Exception("New table row count not equal old table.")
        
        # column数テスト
        ## それぞれのテーブルのcolumnを取得
        client = client = pytd.Client(apikey=API_KEY, endpoint='https://api.treasuredata.co.jp/', database=database_name, default_engine='presto')
        res_1 = client.query(f"""show columns from "{table_name}" """)
        old_df = pd.DataFrame(**res_1)
        res_2 = client.query(f"""show columns from "{replace_table_name}" """)
        new_df = pd.DataFrame(**res_2)
        ## column数を比較して新テーブルの方がカラムが少なかったらエラー
        if len(old_df.Column) > len(new_df.Column):
            raise Exception("New table column num is less than old table.")
        
        # 旧テーブルを破棄 or 名前を変更
        replaced_table_name = table_name + "_replaced"
        if target =='dev':
            ## devは何かあった時のためにテーブルを残しておく
            query = f"""drop table if exists "{replaced_table_name}"; alter table "{table_name}" rename to "{replaced_table_name}"; alter table "{replace_table_name}" rename to "{table_name}" """
        else:
            query = f"""drop table if exists "{table_name}"; alter table "{replace_table_name}" rename to "{table_name}" """
        command = f"""td -k {API_KEY} -e https://api.treasuredata.co.jp query -T presto -w -d {database_name} '{query}' """
        subprocess.run(command, shell=True)

        # tmp_sql_fileの破棄とappend_diff_column_edit_queriesのsqlファイルを移動(2回以上実行しないように)
        ## 本番環境では一回実行にしたいためファイルを移動
        if target == 'prod':
            if os.path.isfile(os.path.join(sql_folder_path, compiled_file_name)):
                shutil.move(os.path.join(sql_folder_path, compiled_file_name), os.path.join(removed_folder_path, compiled_file_name))

if __name__ == "__main__":
    main()

使い方としては、_dbt/analyses/append_diff_column_edit_queriesに変更したいクエリ名のsqlを作成して変更したいロジックを記述してappend_diff_column_edit.py実行する形になります。
例えば、timeカラムが設定されていないtable_aに新しくtimeカラムを設定する場合は下記のようなsqlを用意します。

_dbt/analyses/append_diff_column_edit_queries/table_a.sql

select *, td_time_parse(timestamp_column, 'JST') as time --timestamp_columnはtimestamp形式のデータ
from {{ ref("table_a") }}

これを見てお気づきかと思いますが、append_diff_column_edit.pyはテーブルに存在するカラムを使う前提なので、そうでない場合は後述の新規作成用のappend_diff_refresh.pyを使う形になります。
ポイントとしてはdbt compileが適応されるanalysesの中に変更用のsqlファイルを格納することで、開発環境も本番環境にも対応できるようにしたことと、複数ファイルを一括で処理できるようにしているところになります(特にtimeカラムの設定は複数テーブルを一括で処理したいケースが多いと思われるため)。
本番環境(prod)で実行後はanalysesにあるファイルをold(使わなくなったファイルの格納場所)に移動して一度しか実行されないようにしています(開発環境では移動に伴うミスの方が怖いので移動させていません)。

蛇足ですが、dbtのanalyses(詳細は下記)はdbt-power-userの拡張機能でcompile結果をサクッと見ることができる環境だと使い所がいまいちよくわかっておらず、この処理のためにしか使っていません。

実運用に関してですが、このpythonはgithub actionsでプルリクがマージされた時に実行される運用にしています。

.github/workflows/pull_request_merged.yml

name: workflow when pull request merged

on:
  push:
    branches:
      - master

defaults:
  run:
    working-directory: _dbt

env:
  TREASURE_DATA_API_KEY: ${{ secrets.TREASURE_DATA_API_KEY }}
  target: prod

jobs:
  append_diff_column_edit:
    runs-on: ubuntu-latest
    permissions:
      contents: write 

    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: 3.9

      - name: Install poetry and td
        run: |
          pip install poetry
          poetry install
          curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-focal-td-agent4.sh | sh
          poetry run dbt deps

      - name: Debug dbt
        run: poetry run dbt debug --profile _dbt

      - name: run python
        run: |
          poetry run python append_diff_column_edit.py prod

      - uses: stefanzweifel/git-auto-commit-action@v5

※本当はこのworkflowの後続処理でdbt docsをs3に上げている処理もしているが割愛

ポイントとしては実行後analysesにあるファイルを移動するので、write権限を与えてcomitさせる必要があるところです(本質じゃないけどやらないといけない系が一行で実装できるのがgithub actionsの便利なところ)。

差分更新のテーブルを新規作成 or 全更新する

_dbt/scripts/append_diff_refresh.py

import os
import sys
import glob
import subprocess
import pandas as pd
import datetime
import numpy as np
from concurrent.futures import ThreadPoolExecutor

def main():
    arr = sys.argv
    # 変数定義
    target = arr[1]
    model_name = arr[2]
    start_date = arr[3]
    period = int(arr[4])
    threads = int(arr[5])

    API_KEY = os.environ["TREASURE_DATA_API_KEY"]

    # append_diff_startとappend_diff_endのvarを使っているテーブルを指定
    append_diff_start_end_tables = []

    # カレントディレクトリを_dbtに設定
    os.chdir(os.path.dirname(__file__))
    os.chdir('..')

    model_file_path = glob.glob('models/**/' + model_name + '.sql', recursive=True)[0]
    compiled_file_path = 'target/compiled/_dbt/' + model_file_path
    database_name = model_file_path.split("/")[-3] if target == 'prod' else 'test_' + model_file_path.split('/')[-3]
    refresh_model_name = model_name + '_refresh'
    
    # 期間からforで回す変数を生成
    delta = (datetime.datetime.now() - pd.to_datetime(start_date)).days-1 # start_dateから実行日の前日までの日数
    starts = np.arange(0,delta,period) # 各実行クエリの期間のstartを算出
    starts_last = starts[0] # 0は最新だから最後に別で処理
    starts_first = starts[-1] # 最後は一番古いから最初に処理
    starts = np.delete(starts,[0,-1])[::-1] # 他は同じロジックで処理。逆順にしておく

    # 初回実行用のdbt comopileをする(first_fun_flag=true)
    # delete target_file
    if os.path.isfile(compiled_file_path):
        os.remove(compiled_file_path)
    # dbt compileコマンドを実行(変数をわかりやすい形にして後で差し替えれるようにする)
    if model_name not in append_diff_start_end_tables:
        command = f"""dbt compile -s {model_name} --target={target} """ + "--vars '{append_diff_range: __r__, append_diff_existing_table_prefix: __etp__, append_diff_existing_table_suffix: __ets__, append_diff_first_run_flag: true}'"
    else:
        command = f"""dbt compile -s {model_name} --target={target} """ + "--vars '{append_diff_start: __s__, append_diff_end: __e__}'"
    subprocess.run(command, shell=True)

    # 生成されたcompile後のクエリを変数として保存
    with open(compiled_file_path) as f:
        compiled_query_first = f.read()
    
    # 増分実行用のdbt comopileをする
    # delete target_file
    if os.path.isfile(compiled_file_path):
        os.remove(compiled_file_path)
    # dbt compileコマンドを実行(変数をわかりやすい形にして後で差し替えれるようにする)
    if model_name not in append_diff_start_end_tables:
        command = f"""dbt compile -s {model_name} --target={target} """ + "--vars '{append_diff_range: __r__, append_diff_existing_table_prefix: __etp__, append_diff_existing_table_suffix: __ets__}'"
    else:
        command = f"""dbt compile -s {model_name} --target={target} """ + "--vars '{append_diff_start: __s__, append_diff_end: __e__}'"
    subprocess.run(command, shell=True)

    # 生成されたcompile後のクエリを変数として保存
    with open(compiled_file_path) as f:
        compiled_query = f.read()
    
    # 自己参照している時にthreadsが1以外だとエラーを吐く
    if (threads != 1) and (model_name in compiled_query):
        raise Exception("自己参照があるモデルはthreads = 1 を指定してください")

    # 最初の処理
    temp_file_path = 'temp.sql'
    main_query = compiled_query_first.replace('__r__',str(-1*(delta - starts_first + 1))+'d'+'/'+str(-1*starts_first)+'d').replace('__etp__','').replace('__ets__','_refresh') if model_name not in append_diff_start_end_tables else compiled_query_first.replace('__s__',str(-1*(delta+1))).replace('__e__',str(-1*starts_first))
    with open(temp_file_path,"w") as o:
        # 最初はテーブル生成
        print(f'''drop table if exists "{refresh_model_name}"; create table "{refresh_model_name}" as ''' + main_query, file = o)
    command = f"""td -k {API_KEY} -e https://api.treasuredata.co.jp query -T presto -w -d {database_name} -q {temp_file_path}"""
    subprocess.run(command, shell=True)
    # delete temp_file
    if os.path.isfile(temp_file_path):
        os.remove(temp_file_path)
    
    # マルチスレッドでクエリを回す
    with ThreadPoolExecutor() as thread:
        futures = []
        for i in range(threads):
            # 後述のfunctionを実行
            futures.append(thread.submit(function, *(compiled_query, starts[i::threads], period, f'''temp_{i}.sql''', refresh_model_name + '_' + str(i), model_name, refresh_model_name, API_KEY, database_name, append_diff_start_end_tables)))

    # 最後の処理
    temp_table_name = refresh_model_name + '_0'
    # 最後は計算対象期間の調整が必要(start_dateを超えて計算しないように)
    main_query = compiled_query.replace('__r__',str(-1*period)+'d'+'/'+str(-1*starts_last)+'d').replace('__etp__','').replace('__ets__','_refresh') if model_name not in append_diff_start_end_tables else compiled_query.replace('__s__',str(-1*(starts_last+period-1))).replace('__e__',str(-1*starts_last))
    # temp_table_nameをcreateしてからrefresh_model_nameにinsertする
    query = f'''
    drop table if exists "{temp_table_name}"; 
    create table "{temp_table_name}" as
    '''  \
    + main_query \
    + f'''
    ; 
    insert into "{refresh_model_name}" select * from "{temp_table_name}";
    drop table if exists "{temp_table_name}"
    '''
    with open(temp_file_path,"w") as o:
        print(query, file = o)
    ## fileを元にinsertする
    command = f"""td -k {API_KEY} -e https://api.treasuredata.co.jp query -T presto -w -d {database_name} -q {temp_file_path}"""
    subprocess.run(command, shell=True)
    # delete temp_file
    if os.path.isfile(temp_file_path):
        os.remove(temp_file_path)
    
    replaced_model_name = model_name + '_replaced'
    # テーブルをrename
    if target =='dev':
        # devは何かあった時のためにテーブルを残しておく
        query = f"""drop table if exists "{replaced_model_name}"; alter table if exists "{model_name}" rename to "{replaced_model_name}"; alter table "{refresh_model_name}" rename to "{model_name}" """
    else:
        query = f"""drop table if exists "{model_name}"; alter table "{refresh_model_name}" rename to "{model_name}" """
    command = f"""td -k {API_KEY} -e https://api.treasuredata.co.jp query -T presto -w -d {database_name} '{query}' """
    subprocess.run(command, shell=True)
    

# subprocessを関数化 
def function(compiled_query, starts, period, temp_file_path, temp_table_name, model_name, refresh_model_name, API_KEY, database_name, append_diff_start_end_tables):
    for start in starts:
        main_query = compiled_query.replace('__r__',str(-1*(period))+'d'+'/'+str(-1*start)+'d').replace('__etp__','').replace('__ets__','_refresh') if model_name not in append_diff_start_end_tables else compiled_query.replace('__s__',str(-1*(start+period-1))).replace('__e__',str(-1*start))
        # temp_table_nameをcreateしてからrefresh_model_nameにinsertする
        query = f''' \
        drop table if exists "{temp_table_name}";
        create table "{temp_table_name}" as
        '''  \
        + main_query \
        + f'''
        ; 
        insert into "{refresh_model_name}" select * from "{temp_table_name}";
        drop table if exists "{temp_table_name}"
        '''
        with open(temp_file_path,"w") as o:
            print(query, file = o)
        command = f"""td -k {API_KEY} -e https://api.treasuredata.co.jp query -T presto -w -d {database_name} -q {temp_file_path}"""
        subprocess.run(command, shell=True)
        if model_name not in append_diff_start_end_tables:
            print('finish:' + str(-1*(period))+'d'+'/'+str(-1*start)+'d')
        else:
            print('finish:' + str(-1*(start+period-1)) + 'd to ' + str(-1*start) + 'd')
        if os.path.isfile(temp_file_path):
            os.remove(temp_file_path)

if __name__ == "__main__":
    main()

append_diff_column_edit.pyよりはだいぶわかりにくくなってるので丁寧めに説明したいと思います。
まず基本的な思想としては、メモリリミットしないように期間を分割した上でそれぞれクエリ実行して一つのテーブルにinsertすることで全期間処理するという内容になります。
入力としてdbtで使うのと同じsqlファイルを利用することにしていますが、一つのsqlファイルを使って異なる期間の計算をするための方法としてdbtのvarで初期値あり変数を使う形にしています(varについては下記参照)。

今の所下記2つの軸についてそれぞれ2パターン対応できるように変数を用意しています。

  • 期間指定の方法がtd_intervalの記法か or 開始日と終了日か
  • sql内で自己参照する or 自己参照しない

実際の割合で言うと、期間をtd_intervalで指定する & sql内で自己参照しないパターンがほとんどになっています。

まず一つ目の期間指定の方法として用意している変数が、append_diff_rangeappend_diff_startappend_diff_endになります。
具体例で言うと下記のような使い方です。

select *
from hogehoge
where td_interval(time, '{{ var("append_diff_range","-7d/0d") }}', 'JST')

こちらはシンプルで前回の記事でも説明したtd_intervalでtime partitionを効かせる形に今回のrefresh用の変数を組み込んでいる形になります。

with
    target_dates as (
        select target_date
        from
            unnest(
                sequence(
                    date(
                        td_time_format(
                            td_time_add(
                                td_scheduled_time(),
                                concat('{{ var("append_diff_start","-7") }}', 'd')
                            ),
                            'yyyy-MM-dd',
                            'JST'
                        )
                    ),
                    date(
                        td_time_format(
                            td_time_add(
                                td_scheduled_time(),
                                concat('{{ var("append_diff_end","-0") }}', 'd')
                            ),
                            'yyyy-MM-dd',
                            'JST'
                        )
                    ),
                    interval '1' day
                )
            ) t(target_date)
    ),
    ...

こちらは日付範囲ではなく期間の始まりと終わりをそれぞれ変数で指定しています。
主に例にあるような日付マスターを生成する時に使うことが多いです。

蛇足ですが、はじめappend_diff_rangeは使わないでappend_diff_startappend_diff_endを引き算したりconcatしたりでtd_interval用の文字列を生成してみたのですが、このやり方ではtime partitionが適応されなかったため大人しく変数を用意しています。

次にsqlで自己参照するか否かに関してですが、自己参照しない場合は追加変数がないため上記の変数だけで完結されます。
一方で自己参照をする場合は変数が追加で3つ増えます(実装を工夫すれば2つですむのですがやっていない)。
まずそもそも自己参照について説明すると、例えば「これまでの自分のテーブルにユニークキーが存在していないレコードは追加する」のような実装を想定しています。
ここから具体sqlを使って説明すると下記のようになるかと思います。

first_purchase.sql

delete from first_purchase WHERE td_interval(time,'-7d/0d', 'JST');
insert into first_purchase(
    with recent as(
        select user_id, min(purchase_datetime) as first_purchase_datetime, min(time) as time
        from purchase_log
        where td_interval(time, '-7d/0d', 'JST')
        group by 1
    )
    select user_id, first_purchase_datetime, time
    from recent
    left join (
        select user_id, 1 as exist_flag 
        from first_purchase
    )using(user_id)
    where exist_flag is null
)

この例は直近7日分のpurchase_logの中で初めに購入したユーザーだけ(既存のfirst_purchaseにレコードが存在しない)がinsertされる処理になります。
クエリの中でfirst_purchaseを呼び出すような実装なため自己参照と呼んでいます。
このような自己参照型のクエリを変数を使った記載に直すと以下のようになります。

{{ config(pre_hook=['{{ pre_hook_append_diff_in_sql("-7d/0d") }}']) }}

with recent as(
    select user_id, min(purchase_datetime) as first_purchase_datetime, min(time) as time
    from purchase_log
    where td_interval(time, '{{ var("append_diff_range","-7d/0d") }}', 'JST')
    group by 1
)
select user_id, first_purchase_datetime, time
from recent
{% if var("append_diff_first_run_flag", false) == false %}
    left join (
        select user_id, 1 as exist_flag 
        from "{{ this.schema }}"."{{ var('append_diff_existing_table_prefix', 'tmp__') }}{{ this.table }}{{ var( 'append_diff_existing_table_suffix', '') }}"
    )using(user_id)
where exist_flag is null
{% endif %}

ご覧いただくとappend_diff_first_run_flagappend_diff_existing_table_prefixappend_diff_existing_table_suffixの3つの変数が追加されていることがわかるかと思います。

まず、append_diff_first_run_flagは初回実行かどうかを識別するフラグになっていて、初回実行の場合は既存テーブルが存在しないことから、left joinを行わないようにするために利用します。具体的には記載の通りですが、初期値としてfalseを設定した上でif文の条件分岐の範囲に初回実行時で実行できないクエリを入れています。

append_diff_existing_table_prefixappend_diff_existing_table_suffixに関しては自己参照する際にテーブル名が変化することに対応するための変数になります。

まず基本的なdbt build(run)実行時には、クエリに記載の通りpre_hook_append_diff_in_sqlがprehookとして実行されるわけですが、(前回記事でも説明済みですが)下記の通り処理過程で依存データから直近レコードを除いたテーブルが一時的にtmp__{{ this.table }}に保存されます。

{% macro pre_hook_append_diff_in_sql(append_diff_range) -%}
    DELETE FROM {{ this }} WHERE td_interval(time,'{{ append_diff_range }}', 'JST');
    CREATE TABLE IF NOT EXISTS "{{ this.schema }}".tmp__{{ this.table }} AS SELECT * FROM {{ this }};
    DROP TABLE IF EXISTS {{ this }};
    DROP TABLE IF EXISTS "{{ this.schema }}".{{ this.table }}__dbt_tmp;
{%- endmacro %}

また、後述しますがappend_diff_refresh.py実行時には、{{ this.table }}_refreshというテーブルにinsertしていく実装になっています。
上記を踏まえて、dbtコマンド時にはtmp__{{ this.table }}、append_diff_refresh.py実行時には{{ this.table }}_refreshとできるように変数を利用して{{ var('append_diff_existing_table_prefix', 'tmp__') }}{{ this.table }}{{ var( 'append_diff_existing_table_suffix', '') }}としております。
以上で初期値あり変数を利用することで色々なパターンに関して通常実行だけでくappend_diff_refresh.pyを実行する準備ができることを説明しました。

pythonの具体処理に関しては例のように変数を使って記載したsqlを

# dbt compileコマンドを実行(変数をわかりやすい形にして後で差し替えれるようにする)
    if model_name not in append_diff_start_end_tables:
        command = f"""dbt compile -s {model_name} --target={target} """ + "--vars '{append_diff_range: __r__, append_diff_existing_table_prefix: __etp__, append_diff_existing_table_suffix: __ets__, append_diff_first_run_flag: true}'"
    else:
        command = f"""dbt compile -s {model_name} --target={target} """ + "--vars '{append_diff_start: __s__, append_diff_end: __e__}'"

のような形で、後からfor文内で書き換えやすい形の変数にコンパイルしてから

main_query = compiled_query_first.replace('__r__',str(-1*(delta - starts_first + 1))+'d'+'/'+str(-1*starts_first)+'d').replace('__etp__','').replace('__ets__','_refresh') if model_name not in append_diff_start_end_tables else compiled_query_first.replace('__s__',str(-1*(delta+1))).replace('__e__',str(-1*starts_first))

のような形で、日付を書き換えたりクエリの前後の処理を記述して、tdコマンドでTreasureDataにクエリとして発行する形です。
なお、上記に関してマルチスレッドで実行できるようにしておりますが、マルチスレッドの場合実行順番が保証されないため、自己参照モデルに関してはマルチスレッドで実行しないように制御を入れております。
長くなりましたが、以上でappend_diff_refresh.pyに関する説明を終わりたいと思います。

続きまして、このappend_diff_refresh.pyは引数を受け取る形での実行になるのでそれをgithub actionsでどう実装しているかについて説明したいと思います。

.github/workflows/append_diff_refresh_manual_run.yml

name: append_diff_refresh_manual_run

on:
  workflow_dispatch:
    inputs:
      model_name:
        description: 'テーブル名 ex. first_purchase'
        required: true
        type: string
      start_date:
        description: 'ここで指定した日付以降のデータが格納される ex. 2024-01-01'
        required: true
        type: string
      period:
        description: '一回のクエリで更新する日付の範囲 重いクエリは小さくした方が良い ex. 10'
        required: true
        type: string
      threads:
        description: '同時に何スレッドで更新クエリを実行するか(実行順番を保証したい場合は1を指定) ex. 5'
        required: true
        type: string

env:
  TREASURE_DATA_API_KEY: ${{ secrets.TREASURE_DATA_API_KEY }}
  target: prod

defaults:
  run:
    working-directory: _dbt

jobs:
  append_diff_refresh:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: 3.9

      - name: Install poetry and td
        run: |
          pip install poetry
          poetry install
          curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-focal-td-agent4.sh | sh
          poetry run dbt deps

      - name: Debug dbt
        run: poetry run dbt debug --profile _dbt

      - name: run python
        run: |
          poetry run python append_diff_refresh.py prod ${{ github.event.inputs.model_name }} ${{ github.event.inputs.start_date }} ${{ github.event.inputs.period }} ${{ github.event.inputs.threads }}

ポイントは

on:
  workflow_dispatch:
    inputs:
      model_name:
        description: 'テーブル名 ex. first_purchase'
        required: true
        type: string
      start_date:
        description: 'ここで指定した日付以降のデータが格納される ex. 2024-01-01'
        required: true
        type: string
      period:
        description: '一回のクエリで更新する日付の範囲 重いクエリは小さくした方が良い ex. 10'
        required: true
        type: string
      threads:
        description: '同時に何スレッドで更新クエリを実行するか(実行順番を保証したい場合は1を指定) ex. 5'
        required: true
        type: string

一番最初の部分でこの指定をすると下記のようにgithub actionsの手動実行時に変数を指定できるようになり、その変数を引数としてappend_diff_refresh.pyが実行される運用にしています。
image.png
また、vscodeのローカル環境でもgithub actionsと似たuxで挙動確認できるように、下記のような設定をlaunch.jsonに記載しています。

.vscode/launch.json

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "config_for_append_diff_refresh",
            "type": "debugpy",
            "request": "launch",
            "program": "${file}",
            "console": "integratedTerminal",
            "args": ["dev", "${input:input_model_name}", "${input:input_start_date}", "${input:input_period}", "${input:input_threads}"],
        }
    ],
    "inputs": [
        {
            "id": "input_model_name",
            "type": "promptString",
            "description": "テーブル名 ex. first_purchase"
        },
        {
            "id": "input_start_date",
            "type": "promptString",
            "description": "ここで指定した日付以降のデータが格納される ex. 2024-01-01"
        },
        {
            "id": "input_period",
            "type": "promptString",
            "description": "一回のクエリで更新する日付の範囲 重いクエリは小さくした方が良い ex. 10"
        },
        {
            "id": "input_threads",
            "type": "promptString",
            "description": "同時に何スレッドで更新クエリを実行するか(実行順番を保証したい場合は1を指定) ex. 5",
        },
    ]
}

上記を記載した上でappend_diff_refresh.pyをデバッグ実行すると

image.png

変数の入力受付状態になり、入力後その変数を元にpythonが実行されるため、github actionsと比較的近い状態で実行を再現できます(ローカル実行時はもちろんdev環境に対して実行されます)。

まとめ

今回はだいぶ文字が多めになってしまった & pythonのエラーハンドリングとか最低限しかやってなくてだいぶお見苦しいかもしれないですが、ここまでやると実運用に必要なパーツは揃ったのではないかなと思います。
次回はここまで準備してきた諸々を使って、実際にtdのworkflowをdbtに移行するにあたってどんなプロセスで作業したかと、チップス的なところを共有できればと思います。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?