1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

[備忘録]dbtで愚直な新旧比較をするためのスクリプト

Last updated at Posted at 2024-07-09

これはなに

dbtのバージョンアップを行なった際に、バージョンアップの前後でビルド結果が全く同じなんだっけ? という確認を行うための方法を記載します。
PR間で結果の差分をチェックするような便利なものもあるようですが、今回はとにかく愚直な方法でやります。
データセットを変えてビルドを行い。データセット間でテーブルの差分がないかをチェックします。
チェックでは、行数の一致チェックと、データの内容の一致チェックを行います。

流れ

  1. dbtのバージョンアップを行った後と行う前の二つの状態のテーブルを作成する
  2. ローカルのリポジトリをコピーし、一方はバージョンアップする
  3. 片方のprofile.ymlを編集しdatasetの文字列を変更する
    (弊環境では、datasetの文字列を元にビルド先のデータセット名が決定されるので、この文字列を変更することでビルド先を変更しています。)
  4. チェック用のスクリプトを実行する

スクリプト

適宜書き換えてください。参考コードです。
ChatGPT先生に書いてもらってます。

import os
import json
from google.cloud import bigquery
import warnings

# 警告を無視する設定
warnings.filterwarnings('ignore', category=UserWarning, module='google.cloud.bigquery.table')


# チェックしたいモデル名のリスト
# manifest.jsonにあるモデル名で指定します。
model_names = """
model.hoge_project.hoge_model
model.hoge_project.fuga_model
"""

# BigQueryクライアントの初期化
client = bigquery.Client()

# manifest.jsonファイルのパス
manifest_path = 'target/manifest.json'

def load_manifest(manifest_path):
    with open(manifest_path, 'r') as file:
        manifest = json.load(file)
    return manifest

def build_table_names_from_manifest(manifest, model_name):
    node_key = model_name
    if node_key in manifest['nodes']:
        relation_name = manifest['nodes'][node_key]['relation_name']
        table1 = relation_name
        # 差分比較するため、一方のデータセット名を置換によって変更します。 環境に応じて書き換えてください。
        table2 = table1.replace('dbt_sandbox', 'dbt_sandbox_tmp')
        return table1, table2
    else:
        raise KeyError(f"{node_key} not found in manifest.")

def compare_tables(table1, table2):
    print(f"Comparing {table1} and {table2}...")

    # 行数の取得
    query1 = f"SELECT COUNT(*) as count FROM {table1}"
    query2 = f"SELECT COUNT(*) as count FROM {table2}"
    count1 = client.query(query1).result().to_dataframe().iloc[0]['count']
    count2 = client.query(query2).result().to_dataframe().iloc[0]['count']

    if count1 != count2:
        print(f"Row count mismatch: {count1} (table1) vs {count2} (table2)")
    else:
        print(f"Row count matches: {count1} rows")

    # 内容の比較
    query_compare = f"""
    SELECT * FROM (
        SELECT * FROM {table1}
        EXCEPT DISTINCT
        SELECT * FROM {table2}
    )
    UNION ALL
    SELECT * FROM (
        SELECT * FROM {table2}
        EXCEPT DISTINCT
        SELECT * FROM {table1}
    )
    """
    differences = client.query(query_compare).result().to_dataframe()

    if not differences.empty:
        print(f"Differences found:\n{differences}")
    else:
        print("No differences found")

def main():
    model_list = [model.strip() for model in model_names.strip().split('\n')]
    total_models = len(model_list)
    manifest = load_manifest(manifest_path)

    for idx, model in enumerate(model_list, 1):
        try:
            table1, table2 = build_table_names_from_manifest(manifest, model)
            compare_tables(table1, table2)
        except KeyError as e:
            print(e)
        print(f"Processed {idx} / {total_models} tables")

if __name__ == "__main__":
    main()
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?