
dbt incremental updates cannot add nested columns even with on_schema_change = append_new_columns

Posted at 2025-02-05

Links

This has also been reported as a GitHub issue.

It is also stated explicitly in the official docs:
https://docs.getdbt.com/docs/build/incremental-models


Root cause: this is not so much a dbt problem as a BigQuery one; BigQuery DDL does not support adding nested columns (ref).
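
To make the limitation concrete, here is a minimal sketch using the BigQuery Python client (table and column names are hypothetical):

```python
# Minimal illustration of the BigQuery limitation (all names hypothetical).
from google.cloud import bigquery

client = bigquery.Client(project="my-project")

# Adding a top-level column via DDL works:
client.query("ALTER TABLE mydataset.mytable ADD COLUMN new_col STRING").result()

# But, as of this writing, there is no DDL for adding a field inside an
# existing RECORD column (e.g. payload.new_field) -- which is what dbt would
# have to emit for on_schema_change to pick up nested columns.
```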


In other words, this is difficult to solve within dbt; you have to use the bq command or the BigQuery API instead.

Solutions

Manual

If you do it manually, change the schema with the bq command:

  1. Get the current schema

    bq show \
    --schema \
    --format=prettyjson \
    mydataset.mytable > /tmp/myschema.json
    
  2. Add the new columns to the schema temp file /tmp/myschema.json (see the JSON example after these steps)

  3. Update the schema

    bq update mydataset.mytable /tmp/myschema.json
    
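
For illustration, suppose the table has a RECORD column payload and you want to add a nested field new_field to it (all names here are hypothetical). The edit in step 2 amounts to appending an entry to the RECORD's fields array in /tmp/myschema.json:

```json
[
  {"name": "id", "type": "INTEGER", "mode": "NULLABLE"},
  {
    "name": "payload",
    "type": "RECORD",
    "mode": "NULLABLE",
    "fields": [
      {"name": "existing_field", "type": "STRING", "mode": "NULLABLE"},
      {"name": "new_field", "type": "STRING", "mode": "NULLABLE"}
    ]
  }
]
```

Note that columns added to an existing table this way must be NULLABLE or REPEATED.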

Script

The same operation can be scripted with the BigQuery Python client (saved as manage_schema.py, to match the usage below):

```python
import json
import argparse
from google.auth import impersonated_credentials, default
from google.cloud import bigquery


def diff_schemas(old_schema, new_schema, prefix=""):
    """Recursively compare two schemas (lists of field dicts) and return a list of human-readable differences."""
    diffs = []

    old_fields = {field["name"]: field for field in old_schema}
    new_fields = {field["name"]: field for field in new_schema}

    for name, old_field in old_fields.items():
        full_field = f"{prefix}{name}"
        if name not in new_fields:
            diffs.append(f"Field removed: {full_field}")
        else:
            new_field = new_fields[name]

            if old_field.get("type") != new_field.get("type"):
                diffs.append(f"Type changed for field {full_field}: {old_field.get('type')} -> {new_field.get('type')}")

            if old_field.get("mode") != new_field.get("mode"):
                diffs.append(f"Mode changed for field {full_field}: {old_field.get('mode')} -> {new_field.get('mode')}")

            if old_field.get("type") == "RECORD":
                old_nested = old_field.get("fields", [])
                new_nested = new_field.get("fields", [])
                diffs.extend(diff_schemas(old_nested, new_nested, prefix=full_field + "."))

    for name in new_fields:
        full_field = f"{prefix}{name}"
        if name not in old_fields:
            diffs.append(f"Field added: {full_field}")

    return diffs


def get_bigquery_client(project, impersonate_service_account=None):
    """Get a BigQuery client, optionally using impersonated credentials.

    Args:
        project (str): GCP project ID.
        impersonate_service_account (str, optional): Service account email to impersonate.

    Returns:
        bigquery.Client: A BigQuery client.
    """
    if impersonate_service_account:
        # Use the default credentials to create impersonated credentials.
        source_credentials, _ = default()
        target_credentials = impersonated_credentials.Credentials(
            source_credentials=source_credentials,
            target_principal=impersonate_service_account,
            target_scopes=["https://www.googleapis.com/auth/cloud-platform"],
            lifetime=3600,
        )
        return bigquery.Client(project=project, credentials=target_credentials)
    else:
        return bigquery.Client(project=project)


def load_current_schema(
    project: str, dataset_id: str, table_id: str, schema_json: str, impersonate_service_account: str = ""
):
    """Load the current schema of a BigQuery table and save it to a JSON file.

    Args:
        project (str): GCP project ID.
        dataset_id (str): BigQuery dataset ID.
        table_id (str): BigQuery table ID.
        schema_json (str): Path to the JSON file where the schema will be saved.
        impersonate_service_account (str, optional): Service account email to impersonate.
    """
    # Initialize BigQuery client.
    client = get_bigquery_client(project, impersonate_service_account)

    # Get the table.
    table = client.get_table(f"{project}.{dataset_id}.{table_id}")

    # Convert current schema (list of SchemaField objects) into a list of dictionaries.
    current_schema = [field.to_api_repr() for field in table.schema]
    with open(schema_json, "w") as f:
        json.dump(current_schema, f, indent=2)


def update_table_schema(
    project: str,
    dataset_id: str,
    table_id: str,
    schema_json: str,
    dryrun: bool = True,
    impersonate_service_account: str = "",
):
    """Update table schema with the specified JSON file.

    Args:
        project (str): GCP project ID.
        dataset_id (str): BigQuery dataset ID.
        table_id (str): BigQuery table ID.
        schema_json (str): Path to the JSON file with the schema definition.
        dryrun (bool): Dry run (do not update the schema).
        impersonate_service_account (str, optional): Service account email to impersonate.

    Returns:
        None
    """

    # Initialize BigQuery client.
    client = get_bigquery_client(project, impersonate_service_account)

    # Get the table.
    table = client.get_table(f"{project}.{dataset_id}.{table_id}")

    # Convert current schema (list of SchemaField objects) into a list of dictionaries.
    current_schema = [field.to_api_repr() for field in table.schema]

    # Load the complete schema from the JSON file.
    with open(schema_json, "r") as f:
        new_schema_dict = json.load(f)

    # Convert new schema dictionaries to SchemaField objects.
    new_schema = [bigquery.SchemaField.from_api_repr(field) for field in new_schema_dict]

    # Compare the live schema with the schema from the JSON file.
    schema_diffs = diff_schemas(current_schema, new_schema_dict)

    if not schema_diffs:
        print(f"Schema is up-to-date for table {project}.{dataset_id}.{table_id}")
        return

    if dryrun:
        # Dry run: report the detected differences without updating the table.
        print(f"Schema changes detected: {schema_diffs}")
    else:
        # Assign the new schema to the table and update it.
        table.schema = new_schema
        client.update_table(table, ["schema"])
        print(f"Successfully updated schema for table {project}.{dataset_id}.{table_id}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Update a BigQuery table schema by adding nested fields using a patch JSON file."
    )
    parser.add_argument("--impersonate_service_account", help="Service account email to impersonate")
    parser.add_argument("--project", required=True, help="GCP project ID")
    parser.add_argument("--dataset", required=True, help="BigQuery dataset ID")
    parser.add_argument("--table", required=True, help="BigQuery table ID")
    parser.add_argument(
        "--action", required=True, choices=["update", "load"], help="Action to perform"
    )
    parser.add_argument("--schema_json", required=True, help="Path to the JSON file with the schema definition")
    parser.add_argument(
        "--no-dryrun", action="store_false", dest="dryrun", help="Perform actual update (default is dry run)"
    )

    args = parser.parse_args()
    if args.action == "load":
        load_current_schema(args.project, args.dataset, args.table, args.schema_json, args.impersonate_service_account)
    elif args.action == "update":
        update_table_schema(
            args.project, args.dataset, args.table, args.schema_json, args.dryrun, args.impersonate_service_account
        )
    else:
        raise ValueError("Invalid action. Use 'update' or 'load'.")
```

Running the script with --help prints:

```text
usage: manage_schema.py [-h] [--impersonate_service_account IMPERSONATE_SERVICE_ACCOUNT] --project PROJECT --dataset DATASET --table TABLE --action
                        {update,load} --schema_json SCHEMA_JSON [--no-dryrun]

Update a BigQuery table schema by adding nested fields using a patch JSON file.

options:
  -h, --help            show this help message and exit
  --impersonate_service_account IMPERSONATE_SERVICE_ACCOUNT
                        Service account email to impersonate
  --project PROJECT     GCP project ID
  --dataset DATASET     BigQuery dataset ID
  --table TABLE         BigQuery table ID
  --action {update,load}
                        Action to perform
  --schema_json SCHEMA_JSON
                        Path to the JSON file with the schema definition
  --no-dryrun           Perform actual update (default is dry run)
```

Run it as follows:

  1. Get the current schema file
    python manage_schema.py --project <project> --dataset <dataset> --table <table> --action load --schema_json tmp/schema.json
    
  2. Edit the schema file to add the new columns
  3. Check the changes with a dry run (the default; see the sample output after these steps)
    python manage_schema.py --project <project> --dataset <dataset> --table <table> --action update --schema_json tmp/schema.json
    
  4. Apply the schema change
    python manage_schema.py --project <project> --dataset <dataset> --table <table> --action update --schema_json tmp/schema.json --no-dryrun
    
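
With the hypothetical payload.new_field change from the JSON example above, the dry run in step 3 would print something along these lines:

```text
Schema changes detected: ['Field added: payload.new_field']
```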

dbt-python

If you really must handle this inside dbt, you can write the model as a dbt Python model; that makes it possible to build a model that is an incremental table and also updates its schema whenever the schema changes.
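
A minimal sketch, assuming dbt-bigquery's Python model support; upstream is a hypothetical model name, and the schema patching itself would reuse the same BigQuery API calls as update_table_schema above:

```python
# models/my_incremental_model.py -- sketch only, not a drop-in solution.
def model(dbt, session):
    dbt.config(
        materialized="incremental",
        on_schema_change="append_new_columns",  # still won't add nested columns
    )
    df = dbt.ref("upstream")  # hypothetical upstream model
    # Unlike a SQL model, a Python model can run arbitrary code before
    # returning, e.g. call the BigQuery API to patch nested columns the way
    # update_table_schema does above.
    return df
```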

How to run it

Manually

A manual operation using the bq command or the script above.

Airflow

If you run dbt on Airflow or a similar orchestrator, this step can be added to the Airflow DAG.
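
A minimal sketch, assuming Airflow's BashOperator; the DAG ID, schedule, project/dataset/table, and file paths are placeholders:

```python
# dags/bq_schema_patch_then_dbt.py -- sketch only.
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="bq_schema_patch_then_dbt",
    start_date=pendulum.datetime(2025, 2, 1, tz="UTC"),
    schedule=None,
) as dag:
    # Patch nested columns first, since dbt's on_schema_change cannot.
    patch_schema = BashOperator(
        task_id="patch_schema",
        bash_command=(
            "python manage_schema.py --project my-project --dataset my_dataset "
            "--table my_table --action update --schema_json tmp/schema.json "
            "--no-dryrun"
        ),
    )
    run_dbt = BashOperator(task_id="run_dbt", bash_command="dbt run")
    patch_schema >> run_dbt
```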

GitHub Actions

Considering real-world operations, it is convenient to be able to run this from GitHub Actions.
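
A minimal workflow sketch, assuming authentication with google-github-actions/auth via Workload Identity Federation; the trigger, secret names, and project/dataset/table are placeholders:

```yaml
# .github/workflows/update-schema.yml -- sketch only.
name: Update BigQuery schema
on: workflow_dispatch

jobs:
  update-schema:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      id-token: write  # needed for Workload Identity Federation
    steps:
      - uses: actions/checkout@v4
      - uses: google-github-actions/auth@v2
        with:
          workload_identity_provider: ${{ secrets.WIF_PROVIDER }}
          service_account: ${{ secrets.GCP_SERVICE_ACCOUNT }}
      - run: pip install google-cloud-bigquery
      - run: |
          python manage_schema.py \
            --project my-project --dataset my_dataset --table my_table \
            --action update --schema_json tmp/schema.json --no-dryrun
```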
