これはなに
dbtのバージョンアップを行なった際に、バージョンアップの前後でビルド結果が全く同じなんだっけ? という確認を行うための方法を記載します。
PR間で結果の差分をチェックするような便利なものもあるようですが、今回はとにかく愚直な方法でやります。
データセットを変えてビルドを行い。データセット間でテーブルの差分がないかをチェックします。
チェックでは、行数の一致チェックと、データの内容の一致チェックを行います。
流れ
- dbtのバージョンアップを行った後と行う前の二つの状態のテーブルを作成する
- ローカルのリポジトリをコピーし、一方はバージョンアップする
- 片方のprofile.ymlを編集しdatasetの文字列を変更する
(弊環境では、datasetの文字列を元にビルド先のデータセット名が決定されるので、この文字列を変更することでビルド先を変更しています。) - チェック用のスクリプトを実行する
スクリプト
適宜書き換えてください。参考コードです。
ChatGPT先生に書いてもらってます。
import os
import json
from google.cloud import bigquery
import warnings
# 警告を無視する設定
warnings.filterwarnings('ignore', category=UserWarning, module='google.cloud.bigquery.table')
# チェックしたいモデル名のリスト
# manifest.jsonにあるモデル名で指定します。
model_names = """
model.hoge_project.hoge_model
model.hoge_project.fuga_model
"""
# BigQueryクライアントの初期化
client = bigquery.Client()
# manifest.jsonファイルのパス
manifest_path = 'target/manifest.json'
def load_manifest(manifest_path):
with open(manifest_path, 'r') as file:
manifest = json.load(file)
return manifest
def build_table_names_from_manifest(manifest, model_name):
node_key = model_name
if node_key in manifest['nodes']:
relation_name = manifest['nodes'][node_key]['relation_name']
table1 = relation_name
# 差分比較するため、一方のデータセット名を置換によって変更します。 環境に応じて書き換えてください。
table2 = table1.replace('dbt_sandbox', 'dbt_sandbox_tmp')
return table1, table2
else:
raise KeyError(f"{node_key} not found in manifest.")
def compare_tables(table1, table2):
print(f"Comparing {table1} and {table2}...")
# 行数の取得
query1 = f"SELECT COUNT(*) as count FROM {table1}"
query2 = f"SELECT COUNT(*) as count FROM {table2}"
count1 = client.query(query1).result().to_dataframe().iloc[0]['count']
count2 = client.query(query2).result().to_dataframe().iloc[0]['count']
if count1 != count2:
print(f"Row count mismatch: {count1} (table1) vs {count2} (table2)")
else:
print(f"Row count matches: {count1} rows")
# 内容の比較
query_compare = f"""
SELECT * FROM (
SELECT * FROM {table1}
EXCEPT DISTINCT
SELECT * FROM {table2}
)
UNION ALL
SELECT * FROM (
SELECT * FROM {table2}
EXCEPT DISTINCT
SELECT * FROM {table1}
)
"""
differences = client.query(query_compare).result().to_dataframe()
if not differences.empty:
print(f"Differences found:\n{differences}")
else:
print("No differences found")
def main():
model_list = [model.strip() for model in model_names.strip().split('\n')]
total_models = len(model_list)
manifest = load_manifest(manifest_path)
for idx, model in enumerate(model_list, 1):
try:
table1, table2 = build_table_names_from_manifest(manifest, model)
compare_tables(table1, table2)
except KeyError as e:
print(e)
print(f"Processed {idx} / {total_models} tables")
if __name__ == "__main__":
main()