前回までの記事
はじめに
これまでの記事の内容でdbtを使った中間テーブル生成を実運用していく際に必要な情報の9割くらいはカバーできているかなと思います。
そこで今回の内容は稀に発生するケースに関してどうやって対応していくかについて書いていこうと思います。
具体的には、
- 差分更新テーブルにカラムを追加 or 変更する
- 差分更新のテーブルを新規作成 or 全更新する
をどうやって実装したかについて説明していきたいと思います。
背景
この章では、
- 差分更新テーブルにカラムを追加 or 変更する
- 差分更新のテーブルを新規作成 or 全更新する
に関してもう少し詳しく説明していきます。
前回までの実装で差分更新(≠全更新)のテーブルに対してカラムの追加やロジックの変更を行うとどうなるかというと、更新範囲のレコードに関しては変更が適応されますが、更新範囲にないレコードに関しては従来の定義のままです(追加されたカラムに関してはNULLが入る)。
カラム追加に関しては経験上timeカラムにunixtimeを設定してtime partitionを効かせるために使うパターンが多いです。
差分更新のテーブルの新規作成に関してはそもそも実行エラーになります(テーブルが存在していることが前提の実装になっているから)。
新規作成に関しては「一度全更新でテーブル作成してから、後から差分更新にすればこれまでの実装だけで完結できるのでは?」と思われた方もいるかもしれませんが、そのご指摘は正しいです。
ただし、開発プロセスが2段階になるのと、詳細は後述しますが、メモリリミットでエラーになりテーブルを作れないパターンが存在する(特に差分更新にしたいテーブルは重いことが多いので)という2点の問題が存在します。
実装方針
dbtが提供している差分更新用のmateialize方法であるincrementalでは、sql内で更新範囲に相当する部分を{% if is_incremental() %} where hogehoge {% endif %}
と記載した上で、--full_refresh
オプション付きでdbtコマンドを実行すると、更新範囲を無視して全更新できます(詳細は下記)。
prestoエンジンでは上記のincrementalが(裏側でmerge処理をしているから)使えないわけですが、似せた実装をすればいいかというとそれだけでは不十分です。
tdのprestoは他のDWHとは違いメモリリミットが存在して動的にコンピュートリソースを確保するようにはなっていないため、大きなテーブルを一括で処理することはできない可能性があります。
よって、tdの特性を踏まえた上での実装を検討する必要があります。
具体実装
結論から言ってしまいますが、ここからはdbtだけではなくpythonとtdコマンド(tdのapi)を使って機能を実装していきます(保守観点で可能な限りdbtに寄せたかったのですが、dbtだけではどう足掻いても不可能でした)。
そもそもで言うとprestoではなくメモリリミットがないhiveを使えばいいのでは?という意見もあるかと思うのですが、prestoとはsql記法が変わってきたりそれ用のdbt_project.ymlファイル等を用意するなど、それはそれで運用負担が重いので見送っております。
差分更新テーブルにカラムを追加 or 変更する
_dbt/scripts/append_diff_column_edit.py
import os
import glob
import subprocess
import pytd
import pandas as pd
import shutil
import sys
def main():
# 実行環境を受け取る
arr = sys.argv
if len(arr) > 2:
raise Exception("Too Many Args")
elif len(arr) == 2:
target = arr[1]
if target not in ("dev","prod"):
raise ValueError("Target is Not Dev or Prod")
else:
target = 'dev'
API_KEY = os.environ["TREASURE_DATA_API_KEY"]
# カレントディレクトリを_dbtに設定
os.chdir(os.path.dirname(__file__))
os.chdir('..')
sql_folder_path = "analyses/append_diff_column_edit_queries"
removed_folder_path = "old/append_diff_column_edited_queries"
compiled_folder_path = "target/compiled/_dbt/analyses/append_diff_column_edit_queries"
# delete target_folder
if os.path.isdir(compiled_folder_path):
shutil.rmtree(compiled_folder_path)
# dbt compileコマンドを実行
command = f"""dbt compile --target={target}"""
subprocess.run(command, shell=True)
# append_diff_column_edit_queriesにsqlがなかったら正常終了
if not os.path.isdir(compiled_folder_path):
print("【FINISH】No column edit file.")
sys.exit()
# sql_fileを元にテーブルを更新
for compiled_file_name in os.listdir(compiled_folder_path):
compiled_file_path = os.path.join(compiled_folder_path,compiled_file_name)
# dbt compile後のファイルをread
with open(compiled_file_path) as f:
compiled_query = f.read()
# modelsのpathからdatabase_nameを復元
model_file_path = glob.glob('models/**/' + compiled_file_name, recursive=True)[0]
database_name = model_file_path.split("/")[-3] if target == 'prod' else 'test_' + model_file_path.split("/")[-3]
table_name = os.path.basename(model_file_path).split(".")[0]
replace_table_name = table_name + "_replace"
temp_file_path = "temp.sql"
# カラム変更した新しいテーブルを生成
## 一度実行したいクエリをsqlファイルに書き込み(ファイルに書き込まないとクオーテーションで実行できない)
query = f"""DROP TABLE IF EXISTS "{replace_table_name}"; CREATE TABLE "{replace_table_name}" AS {compiled_query}"""
with open(temp_file_path,"w") as o:
print(query, file = o)
## 生成したファイルを使ってtdコマンドを実行
command = f"""td -k {API_KEY} -e https://api.treasuredata.co.jp query -T presto -w -d {database_name} -q {temp_file_path}"""
subprocess.run(command, shell=True)
## tmpファイル破棄
if os.path.isfile(temp_file_path):
os.remove(temp_file_path)
# record数テスト
## record countするクエリを実行してdfとして保存
client = pytd.Client(apikey=API_KEY, endpoint='https://api.treasuredata.co.jp/', database=database_name, default_engine='presto')
res = client.query(f"""select count(*) as cnt from "{table_name}" union all select count(*) as cnt from "{replace_table_name}" """)
df = pd.DataFrame(**res)
## cntカラムの最初のレコード(元テーブルのレコード数)とcntカラムの2番めのレコード(新テーブルのレコード数)を比較
if df.cnt[0] != df.cnt[1]:
raise Exception("New table row count not equal old table.")
# column数テスト
## それぞれのテーブルのcolumnを取得
client = client = pytd.Client(apikey=API_KEY, endpoint='https://api.treasuredata.co.jp/', database=database_name, default_engine='presto')
res_1 = client.query(f"""show columns from "{table_name}" """)
old_df = pd.DataFrame(**res_1)
res_2 = client.query(f"""show columns from "{replace_table_name}" """)
new_df = pd.DataFrame(**res_2)
## column数を比較して新テーブルの方がカラムが少なかったらエラー
if len(old_df.Column) > len(new_df.Column):
raise Exception("New table column num is less than old table.")
# 旧テーブルを破棄 or 名前を変更
replaced_table_name = table_name + "_replaced"
if target =='dev':
## devは何かあった時のためにテーブルを残しておく
query = f"""drop table if exists "{replaced_table_name}"; alter table "{table_name}" rename to "{replaced_table_name}"; alter table "{replace_table_name}" rename to "{table_name}" """
else:
query = f"""drop table if exists "{table_name}"; alter table "{replace_table_name}" rename to "{table_name}" """
command = f"""td -k {API_KEY} -e https://api.treasuredata.co.jp query -T presto -w -d {database_name} '{query}' """
subprocess.run(command, shell=True)
# tmp_sql_fileの破棄とappend_diff_column_edit_queriesのsqlファイルを移動(2回以上実行しないように)
## 本番環境では一回実行にしたいためファイルを移動
if target == 'prod':
if os.path.isfile(os.path.join(sql_folder_path, compiled_file_name)):
shutil.move(os.path.join(sql_folder_path, compiled_file_name), os.path.join(removed_folder_path, compiled_file_name))
if __name__ == "__main__":
main()
使い方としては、_dbt/analyses/append_diff_column_edit_queries
に変更したいクエリ名のsqlを作成して変更したいロジックを記述してappend_diff_column_edit.py実行する形になります。
例えば、timeカラムが設定されていないtable_aに新しくtimeカラムを設定する場合は下記のようなsqlを用意します。
_dbt/analyses/append_diff_column_edit_queries/table_a.sql
select *, td_time_parse(timestamp_column, 'JST') as time --timestamp_columnはtimestamp形式のデータ
from {{ ref("table_a") }}
これを見てお気づきかと思いますが、append_diff_column_edit.pyはテーブルに存在するカラムを使う前提なので、そうでない場合は後述の新規作成用のappend_diff_refresh.pyを使う形になります。
ポイントとしてはdbt compile
が適応されるanalysesの中に変更用のsqlファイルを格納することで、開発環境も本番環境にも対応できるようにしたことと、複数ファイルを一括で処理できるようにしているところになります(特にtimeカラムの設定は複数テーブルを一括で処理したいケースが多いと思われるため)。
本番環境(prod)で実行後はanalysesにあるファイルをold(使わなくなったファイルの格納場所)に移動して一度しか実行されないようにしています(開発環境では移動に伴うミスの方が怖いので移動させていません)。
蛇足ですが、dbtのanalyses(詳細は下記)はdbt-power-userの拡張機能でcompile結果をサクッと見ることができる環境だと使い所がいまいちよくわかっておらず、この処理のためにしか使っていません。
実運用に関してですが、このpythonはgithub actionsでプルリクがマージされた時に実行される運用にしています。
.github/workflows/pull_request_merged.yml
name: workflow when pull request merged
on:
push:
branches:
- master
defaults:
run:
working-directory: _dbt
env:
TREASURE_DATA_API_KEY: ${{ secrets.TREASURE_DATA_API_KEY }}
target: prod
jobs:
append_diff_column_edit:
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: 3.9
- name: Install poetry and td
run: |
pip install poetry
poetry install
curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-focal-td-agent4.sh | sh
poetry run dbt deps
- name: Debug dbt
run: poetry run dbt debug --profile _dbt
- name: run python
run: |
poetry run python append_diff_column_edit.py prod
- uses: stefanzweifel/git-auto-commit-action@v5
※本当はこのworkflowの後続処理でdbt docsをs3に上げている処理もしているが割愛
ポイントとしては実行後analysesにあるファイルを移動するので、write権限を与えてcomitさせる必要があるところです(本質じゃないけどやらないといけない系が一行で実装できるのがgithub actionsの便利なところ)。
差分更新のテーブルを新規作成 or 全更新する
_dbt/scripts/append_diff_refresh.py
import os
import sys
import glob
import subprocess
import pandas as pd
import datetime
import numpy as np
from concurrent.futures import ThreadPoolExecutor
def main():
arr = sys.argv
# 変数定義
target = arr[1]
model_name = arr[2]
start_date = arr[3]
period = int(arr[4])
threads = int(arr[5])
API_KEY = os.environ["TREASURE_DATA_API_KEY"]
# append_diff_startとappend_diff_endのvarを使っているテーブルを指定
append_diff_start_end_tables = []
# カレントディレクトリを_dbtに設定
os.chdir(os.path.dirname(__file__))
os.chdir('..')
model_file_path = glob.glob('models/**/' + model_name + '.sql', recursive=True)[0]
compiled_file_path = 'target/compiled/_dbt/' + model_file_path
database_name = model_file_path.split("/")[-3] if target == 'prod' else 'test_' + model_file_path.split('/')[-3]
refresh_model_name = model_name + '_refresh'
# 期間からforで回す変数を生成
delta = (datetime.datetime.now() - pd.to_datetime(start_date)).days-1 # start_dateから実行日の前日までの日数
starts = np.arange(0,delta,period) # 各実行クエリの期間のstartを算出
starts_last = starts[0] # 0は最新だから最後に別で処理
starts_first = starts[-1] # 最後は一番古いから最初に処理
starts = np.delete(starts,[0,-1])[::-1] # 他は同じロジックで処理。逆順にしておく
# 初回実行用のdbt comopileをする(first_fun_flag=true)
# delete target_file
if os.path.isfile(compiled_file_path):
os.remove(compiled_file_path)
# dbt compileコマンドを実行(変数をわかりやすい形にして後で差し替えれるようにする)
if model_name not in append_diff_start_end_tables:
command = f"""dbt compile -s {model_name} --target={target} """ + "--vars '{append_diff_range: __r__, append_diff_existing_table_prefix: __etp__, append_diff_existing_table_suffix: __ets__, append_diff_first_run_flag: true}'"
else:
command = f"""dbt compile -s {model_name} --target={target} """ + "--vars '{append_diff_start: __s__, append_diff_end: __e__}'"
subprocess.run(command, shell=True)
# 生成されたcompile後のクエリを変数として保存
with open(compiled_file_path) as f:
compiled_query_first = f.read()
# 増分実行用のdbt comopileをする
# delete target_file
if os.path.isfile(compiled_file_path):
os.remove(compiled_file_path)
# dbt compileコマンドを実行(変数をわかりやすい形にして後で差し替えれるようにする)
if model_name not in append_diff_start_end_tables:
command = f"""dbt compile -s {model_name} --target={target} """ + "--vars '{append_diff_range: __r__, append_diff_existing_table_prefix: __etp__, append_diff_existing_table_suffix: __ets__}'"
else:
command = f"""dbt compile -s {model_name} --target={target} """ + "--vars '{append_diff_start: __s__, append_diff_end: __e__}'"
subprocess.run(command, shell=True)
# 生成されたcompile後のクエリを変数として保存
with open(compiled_file_path) as f:
compiled_query = f.read()
# 自己参照している時にthreadsが1以外だとエラーを吐く
if (threads != 1) and (model_name in compiled_query):
raise Exception("自己参照があるモデルはthreads = 1 を指定してください")
# 最初の処理
temp_file_path = 'temp.sql'
main_query = compiled_query_first.replace('__r__',str(-1*(delta - starts_first + 1))+'d'+'/'+str(-1*starts_first)+'d').replace('__etp__','').replace('__ets__','_refresh') if model_name not in append_diff_start_end_tables else compiled_query_first.replace('__s__',str(-1*(delta+1))).replace('__e__',str(-1*starts_first))
with open(temp_file_path,"w") as o:
# 最初はテーブル生成
print(f'''drop table if exists "{refresh_model_name}"; create table "{refresh_model_name}" as ''' + main_query, file = o)
command = f"""td -k {API_KEY} -e https://api.treasuredata.co.jp query -T presto -w -d {database_name} -q {temp_file_path}"""
subprocess.run(command, shell=True)
# delete temp_file
if os.path.isfile(temp_file_path):
os.remove(temp_file_path)
# マルチスレッドでクエリを回す
with ThreadPoolExecutor() as thread:
futures = []
for i in range(threads):
# 後述のfunctionを実行
futures.append(thread.submit(function, *(compiled_query, starts[i::threads], period, f'''temp_{i}.sql''', refresh_model_name + '_' + str(i), model_name, refresh_model_name, API_KEY, database_name, append_diff_start_end_tables)))
# 最後の処理
temp_table_name = refresh_model_name + '_0'
# 最後は計算対象期間の調整が必要(start_dateを超えて計算しないように)
main_query = compiled_query.replace('__r__',str(-1*period)+'d'+'/'+str(-1*starts_last)+'d').replace('__etp__','').replace('__ets__','_refresh') if model_name not in append_diff_start_end_tables else compiled_query.replace('__s__',str(-1*(starts_last+period-1))).replace('__e__',str(-1*starts_last))
# temp_table_nameをcreateしてからrefresh_model_nameにinsertする
query = f'''
drop table if exists "{temp_table_name}";
create table "{temp_table_name}" as
''' \
+ main_query \
+ f'''
;
insert into "{refresh_model_name}" select * from "{temp_table_name}";
drop table if exists "{temp_table_name}"
'''
with open(temp_file_path,"w") as o:
print(query, file = o)
## fileを元にinsertする
command = f"""td -k {API_KEY} -e https://api.treasuredata.co.jp query -T presto -w -d {database_name} -q {temp_file_path}"""
subprocess.run(command, shell=True)
# delete temp_file
if os.path.isfile(temp_file_path):
os.remove(temp_file_path)
replaced_model_name = model_name + '_replaced'
# テーブルをrename
if target =='dev':
# devは何かあった時のためにテーブルを残しておく
query = f"""drop table if exists "{replaced_model_name}"; alter table if exists "{model_name}" rename to "{replaced_model_name}"; alter table "{refresh_model_name}" rename to "{model_name}" """
else:
query = f"""drop table if exists "{model_name}"; alter table "{refresh_model_name}" rename to "{model_name}" """
command = f"""td -k {API_KEY} -e https://api.treasuredata.co.jp query -T presto -w -d {database_name} '{query}' """
subprocess.run(command, shell=True)
# subprocessを関数化
def function(compiled_query, starts, period, temp_file_path, temp_table_name, model_name, refresh_model_name, API_KEY, database_name, append_diff_start_end_tables):
for start in starts:
main_query = compiled_query.replace('__r__',str(-1*(period))+'d'+'/'+str(-1*start)+'d').replace('__etp__','').replace('__ets__','_refresh') if model_name not in append_diff_start_end_tables else compiled_query.replace('__s__',str(-1*(start+period-1))).replace('__e__',str(-1*start))
# temp_table_nameをcreateしてからrefresh_model_nameにinsertする
query = f''' \
drop table if exists "{temp_table_name}";
create table "{temp_table_name}" as
''' \
+ main_query \
+ f'''
;
insert into "{refresh_model_name}" select * from "{temp_table_name}";
drop table if exists "{temp_table_name}"
'''
with open(temp_file_path,"w") as o:
print(query, file = o)
command = f"""td -k {API_KEY} -e https://api.treasuredata.co.jp query -T presto -w -d {database_name} -q {temp_file_path}"""
subprocess.run(command, shell=True)
if model_name not in append_diff_start_end_tables:
print('finish:' + str(-1*(period))+'d'+'/'+str(-1*start)+'d')
else:
print('finish:' + str(-1*(start+period-1)) + 'd to ' + str(-1*start) + 'd')
if os.path.isfile(temp_file_path):
os.remove(temp_file_path)
if __name__ == "__main__":
main()
append_diff_column_edit.pyよりはだいぶわかりにくくなってるので丁寧めに説明したいと思います。
まず基本的な思想としては、メモリリミットしないように期間を分割した上でそれぞれクエリ実行して一つのテーブルにinsertすることで全期間処理するという内容になります。
入力としてdbtで使うのと同じsqlファイルを利用することにしていますが、一つのsqlファイルを使って異なる期間の計算をするための方法としてdbtのvarで初期値あり変数を使う形にしています(varについては下記参照)。
今の所下記2つの軸についてそれぞれ2パターン対応できるように変数を用意しています。
- 期間指定の方法がtd_intervalの記法か or 開始日と終了日か
- sql内で自己参照する or 自己参照しない
実際の割合で言うと、期間をtd_intervalで指定する & sql内で自己参照しないパターンがほとんどになっています。
まず一つ目の期間指定の方法として用意している変数が、append_diff_range
とappend_diff_start
とappend_diff_end
になります。
具体例で言うと下記のような使い方です。
select *
from hogehoge
where td_interval(time, '{{ var("append_diff_range","-7d/0d") }}', 'JST')
こちらはシンプルで前回の記事でも説明したtd_intervalでtime partitionを効かせる形に今回のrefresh用の変数を組み込んでいる形になります。
with
target_dates as (
select target_date
from
unnest(
sequence(
date(
td_time_format(
td_time_add(
td_scheduled_time(),
concat('{{ var("append_diff_start","-7") }}', 'd')
),
'yyyy-MM-dd',
'JST'
)
),
date(
td_time_format(
td_time_add(
td_scheduled_time(),
concat('{{ var("append_diff_end","-0") }}', 'd')
),
'yyyy-MM-dd',
'JST'
)
),
interval '1' day
)
) t(target_date)
),
...
こちらは日付範囲ではなく期間の始まりと終わりをそれぞれ変数で指定しています。
主に例にあるような日付マスターを生成する時に使うことが多いです。
蛇足ですが、はじめappend_diff_range
は使わないでappend_diff_start
とappend_diff_end
を引き算したりconcatしたりでtd_interval用の文字列を生成してみたのですが、このやり方ではtime partitionが適応されなかったため大人しく変数を用意しています。
次にsqlで自己参照するか否かに関してですが、自己参照しない場合は追加変数がないため上記の変数だけで完結されます。
一方で自己参照をする場合は変数が追加で3つ増えます(実装を工夫すれば2つですむのですがやっていない)。
まずそもそも自己参照について説明すると、例えば「これまでの自分のテーブルにユニークキーが存在していないレコードは追加する」のような実装を想定しています。
ここから具体sqlを使って説明すると下記のようになるかと思います。
first_purchase.sql
delete from first_purchase WHERE td_interval(time,'-7d/0d', 'JST');
insert into first_purchase(
with recent as(
select user_id, min(purchase_datetime) as first_purchase_datetime, min(time) as time
from purchase_log
where td_interval(time, '-7d/0d', 'JST')
group by 1
)
select user_id, first_purchase_datetime, time
from recent
left join (
select user_id, 1 as exist_flag
from first_purchase
)using(user_id)
where exist_flag is null
)
この例は直近7日分のpurchase_logの中で初めに購入したユーザーだけ(既存のfirst_purchaseにレコードが存在しない)がinsertされる処理になります。
クエリの中でfirst_purchaseを呼び出すような実装なため自己参照と呼んでいます。
このような自己参照型のクエリを変数を使った記載に直すと以下のようになります。
{{ config(pre_hook=['{{ pre_hook_append_diff_in_sql("-7d/0d") }}']) }}
with recent as(
select user_id, min(purchase_datetime) as first_purchase_datetime, min(time) as time
from purchase_log
where td_interval(time, '{{ var("append_diff_range","-7d/0d") }}', 'JST')
group by 1
)
select user_id, first_purchase_datetime, time
from recent
{% if var("append_diff_first_run_flag", false) == false %}
left join (
select user_id, 1 as exist_flag
from "{{ this.schema }}"."{{ var('append_diff_existing_table_prefix', 'tmp__') }}{{ this.table }}{{ var( 'append_diff_existing_table_suffix', '') }}"
)using(user_id)
where exist_flag is null
{% endif %}
ご覧いただくとappend_diff_first_run_flag
とappend_diff_existing_table_prefix
、append_diff_existing_table_suffix
の3つの変数が追加されていることがわかるかと思います。
まず、append_diff_first_run_flag
は初回実行かどうかを識別するフラグになっていて、初回実行の場合は既存テーブルが存在しないことから、left joinを行わないようにするために利用します。具体的には記載の通りですが、初期値としてfalseを設定した上でif文の条件分岐の範囲に初回実行時で実行できないクエリを入れています。
append_diff_existing_table_prefix
、append_diff_existing_table_suffix
に関しては自己参照する際にテーブル名が変化することに対応するための変数になります。
まず基本的なdbt build(run)
実行時には、クエリに記載の通りpre_hook_append_diff_in_sql
がprehookとして実行されるわけですが、(前回記事でも説明済みですが)下記の通り処理過程で依存データから直近レコードを除いたテーブルが一時的にtmp__{{ this.table }}
に保存されます。
{% macro pre_hook_append_diff_in_sql(append_diff_range) -%}
DELETE FROM {{ this }} WHERE td_interval(time,'{{ append_diff_range }}', 'JST');
CREATE TABLE IF NOT EXISTS "{{ this.schema }}".tmp__{{ this.table }} AS SELECT * FROM {{ this }};
DROP TABLE IF EXISTS {{ this }};
DROP TABLE IF EXISTS "{{ this.schema }}".{{ this.table }}__dbt_tmp;
{%- endmacro %}
また、後述しますがappend_diff_refresh.py実行時には、{{ this.table }}_refresh
というテーブルにinsertしていく実装になっています。
上記を踏まえて、dbtコマンド時にはtmp__{{ this.table }}
、append_diff_refresh.py実行時には{{ this.table }}_refresh
とできるように変数を利用して{{ var('append_diff_existing_table_prefix', 'tmp__') }}{{ this.table }}{{ var( 'append_diff_existing_table_suffix', '') }}
としております。
以上で初期値あり変数を利用することで色々なパターンに関して通常実行だけでくappend_diff_refresh.pyを実行する準備ができることを説明しました。
pythonの具体処理に関しては例のように変数を使って記載したsqlを
# dbt compileコマンドを実行(変数をわかりやすい形にして後で差し替えれるようにする)
if model_name not in append_diff_start_end_tables:
command = f"""dbt compile -s {model_name} --target={target} """ + "--vars '{append_diff_range: __r__, append_diff_existing_table_prefix: __etp__, append_diff_existing_table_suffix: __ets__, append_diff_first_run_flag: true}'"
else:
command = f"""dbt compile -s {model_name} --target={target} """ + "--vars '{append_diff_start: __s__, append_diff_end: __e__}'"
のような形で、後からfor文内で書き換えやすい形の変数にコンパイルしてから
main_query = compiled_query_first.replace('__r__',str(-1*(delta - starts_first + 1))+'d'+'/'+str(-1*starts_first)+'d').replace('__etp__','').replace('__ets__','_refresh') if model_name not in append_diff_start_end_tables else compiled_query_first.replace('__s__',str(-1*(delta+1))).replace('__e__',str(-1*starts_first))
のような形で、日付を書き換えたりクエリの前後の処理を記述して、tdコマンドでTreasureDataにクエリとして発行する形です。
なお、上記に関してマルチスレッドで実行できるようにしておりますが、マルチスレッドの場合実行順番が保証されないため、自己参照モデルに関してはマルチスレッドで実行しないように制御を入れております。
長くなりましたが、以上でappend_diff_refresh.pyに関する説明を終わりたいと思います。
続きまして、このappend_diff_refresh.pyは引数を受け取る形での実行になるのでそれをgithub actionsでどう実装しているかについて説明したいと思います。
.github/workflows/append_diff_refresh_manual_run.yml
name: append_diff_refresh_manual_run
on:
workflow_dispatch:
inputs:
model_name:
description: 'テーブル名 ex. first_purchase'
required: true
type: string
start_date:
description: 'ここで指定した日付以降のデータが格納される ex. 2024-01-01'
required: true
type: string
period:
description: '一回のクエリで更新する日付の範囲 重いクエリは小さくした方が良い ex. 10'
required: true
type: string
threads:
description: '同時に何スレッドで更新クエリを実行するか(実行順番を保証したい場合は1を指定) ex. 5'
required: true
type: string
env:
TREASURE_DATA_API_KEY: ${{ secrets.TREASURE_DATA_API_KEY }}
target: prod
defaults:
run:
working-directory: _dbt
jobs:
append_diff_refresh:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: 3.9
- name: Install poetry and td
run: |
pip install poetry
poetry install
curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-focal-td-agent4.sh | sh
poetry run dbt deps
- name: Debug dbt
run: poetry run dbt debug --profile _dbt
- name: run python
run: |
poetry run python append_diff_refresh.py prod ${{ github.event.inputs.model_name }} ${{ github.event.inputs.start_date }} ${{ github.event.inputs.period }} ${{ github.event.inputs.threads }}
ポイントは
on:
workflow_dispatch:
inputs:
model_name:
description: 'テーブル名 ex. first_purchase'
required: true
type: string
start_date:
description: 'ここで指定した日付以降のデータが格納される ex. 2024-01-01'
required: true
type: string
period:
description: '一回のクエリで更新する日付の範囲 重いクエリは小さくした方が良い ex. 10'
required: true
type: string
threads:
description: '同時に何スレッドで更新クエリを実行するか(実行順番を保証したい場合は1を指定) ex. 5'
required: true
type: string
一番最初の部分でこの指定をすると下記のようにgithub actionsの手動実行時に変数を指定できるようになり、その変数を引数としてappend_diff_refresh.pyが実行される運用にしています。
また、vscodeのローカル環境でもgithub actionsと似たuxで挙動確認できるように、下記のような設定をlaunch.jsonに記載しています。
.vscode/launch.json
{
"version": "0.2.0",
"configurations": [
{
"name": "config_for_append_diff_refresh",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"args": ["dev", "${input:input_model_name}", "${input:input_start_date}", "${input:input_period}", "${input:input_threads}"],
}
],
"inputs": [
{
"id": "input_model_name",
"type": "promptString",
"description": "テーブル名 ex. first_purchase"
},
{
"id": "input_start_date",
"type": "promptString",
"description": "ここで指定した日付以降のデータが格納される ex. 2024-01-01"
},
{
"id": "input_period",
"type": "promptString",
"description": "一回のクエリで更新する日付の範囲 重いクエリは小さくした方が良い ex. 10"
},
{
"id": "input_threads",
"type": "promptString",
"description": "同時に何スレッドで更新クエリを実行するか(実行順番を保証したい場合は1を指定) ex. 5",
},
]
}
上記を記載した上でappend_diff_refresh.pyをデバッグ実行すると
変数の入力受付状態になり、入力後その変数を元にpythonが実行されるため、github actionsと比較的近い状態で実行を再現できます(ローカル実行時はもちろんdev環境に対して実行されます)。
まとめ
今回はだいぶ文字が多めになってしまった & pythonのエラーハンドリングとか最低限しかやってなくてだいぶお見苦しいかもしれないですが、ここまでやると実運用に必要なパーツは揃ったのではないかなと思います。
次回はここまで準備してきた諸々を使って、実際にtdのworkflowをdbtに移行するにあたってどんなプロセスで作業したかと、チップス的なところを共有できればと思います。