Links
This has also been raised as a GitHub issue.
It is also stated explicitly in the official docs:
https://docs.getdbt.com/docs/build/incremental-models
Root cause: this is not really a dbt problem; BigQuery's DDL (ALTER TABLE ... ADD COLUMN) cannot add a field inside an existing RECORD column (ref).
That means it is hard to fix from within dbt itself; you have to use the bq command or the BigQuery API instead.
Solutions
Manual
For a manual fix, change the schema with the bq command:
- Get the current schema
  bq show --schema --format=prettyjson mydataset.mytable > /tmp/myschema.json
- Add the new column(s) to the schema temp file /tmp/myschema.json (see the sketch after this list)
- Update the schema
  bq update mydataset.mytable /tmp/myschema.json
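Editing the JSON by hand works, but a small Python sketch like the one below can append the nested field programmatically. The RECORD column "payload" and the new leaf "new_col" are placeholders, not names from the original setup:

import json

SCHEMA_PATH = "/tmp/myschema.json"

with open(SCHEMA_PATH) as f:
    schema = json.load(f)

# Append a new leaf field under the existing RECORD column "payload" (placeholder names).
for field in schema:
    if field["name"] == "payload" and field["type"] == "RECORD":
        field.setdefault("fields", []).append(
            {"name": "new_col", "type": "STRING", "mode": "NULLABLE"}
        )

with open(SCHEMA_PATH, "w") as f:
    json.dump(schema, f, indent=2)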
Script
import json
import argparse
from google.auth import impersonated_credentials, default
from google.cloud import bigquery


def diff_schemas(old_schema, new_schema, prefix=""):
    """Recursively diff two schemas (lists of field dicts) and return a list of human-readable differences."""
    diffs = []
    old_fields = {field["name"]: field for field in old_schema}
    new_fields = {field["name"]: field for field in new_schema}
    for name, old_field in old_fields.items():
        full_field = f"{prefix}{name}"
        if name not in new_fields:
            diffs.append(f"Field removed: {full_field}")
        else:
            new_field = new_fields[name]
            if old_field.get("type") != new_field.get("type"):
                diffs.append(f"Type changed for field {full_field}: {old_field.get('type')} -> {new_field.get('type')}")
            if old_field.get("mode") != new_field.get("mode"):
                diffs.append(f"Mode changed for field {full_field}: {old_field.get('mode')} -> {new_field.get('mode')}")
            if old_field.get("type") == "RECORD":
                old_nested = old_field.get("fields", [])
                new_nested = new_field.get("fields", [])
                diffs.extend(diff_schemas(old_nested, new_nested, prefix=full_field + "."))
    for name, new_field in new_fields.items():
        full_field = f"{prefix}{name}"
        if name not in old_fields:
            diffs.append(f"Field added: {full_field}")
    return diffs


def get_bigquery_client(project, impersonate_service_account=None):
    """Get a BigQuery client, optionally using impersonated credentials.

    Args:
        project (str): GCP project ID.
        impersonate_service_account (str, optional): Service account email to impersonate.

    Returns:
        bigquery.Client: A BigQuery client.
    """
    if impersonate_service_account:
        # Use the default credentials to create impersonated credentials.
        source_credentials, _ = default()
        target_credentials = impersonated_credentials.Credentials(
            source_credentials=source_credentials,
            target_principal=impersonate_service_account,
            target_scopes=["https://www.googleapis.com/auth/cloud-platform"],
            lifetime=3600,
        )
        return bigquery.Client(project=project, credentials=target_credentials)
    else:
        return bigquery.Client(project=project)


def load_current_schema(
    project: str, dataset_id: str, table_id: str, schema_json: str, impersonate_service_account: str = ""
):
    """Load the current schema of a BigQuery table and save it to a JSON file.

    Args:
        project (str): GCP project ID.
        dataset_id (str): BigQuery dataset ID.
        table_id (str): BigQuery table ID.
        schema_json (str): Path to the JSON file where the schema will be saved.
        impersonate_service_account (str, optional): Service account email to impersonate.
    """
    # Initialize BigQuery client.
    client = get_bigquery_client(project, impersonate_service_account)
    # Get the table.
    table_ref = client.dataset(dataset_id).table(table_id)
    table = client.get_table(table_ref)
    # Convert current schema (list of SchemaField objects) into a list of dictionaries.
    current_schema = [field.to_api_repr() for field in table.schema]
    with open(schema_json, "w") as f:
        json.dump(current_schema, f, indent=2)


def update_table_schema(
    project: str,
    dataset_id: str,
    table_id: str,
    schema_json: str,
    dryrun: bool = True,
    impersonate_service_account: str = "",
):
    """Update the table schema with the specified JSON file.

    Args:
        project (str): GCP project ID.
        dataset_id (str): BigQuery dataset ID.
        table_id (str): BigQuery table ID.
        schema_json (str): Path to the JSON file with the schema definition.
        dryrun (bool): Dry run (do not update the schema).
        impersonate_service_account (str, optional): Service account email to impersonate.

    Returns:
        None
    """
    # Initialize BigQuery client.
    client = get_bigquery_client(project, impersonate_service_account)
    # Get the table.
    table_ref = client.dataset(dataset_id).table(table_id)
    table = client.get_table(table_ref)
    # Convert current schema (list of SchemaField objects) into a list of dictionaries.
    current_schema = [field.to_api_repr() for field in table.schema]
    # Load the complete schema from the JSON file.
    with open(schema_json, "r") as f:
        new_schema_dict = json.load(f)
    # Convert new schema dictionaries to SchemaField objects.
    new_schema = [bigquery.SchemaField.from_api_repr(field) for field in new_schema_dict]
    schema_diffs = diff_schemas(current_schema, new_schema_dict)
    if not schema_diffs:
        print(f"Schema is up-to-date for table {project}.{dataset_id}.{table_id}")
        return
    if dryrun:
        # Report the detected differences without modifying the table.
        print(f"Schema changes detected: {schema_diffs}")
    else:
        # Assign the new schema to the table and update it.
        table.schema = new_schema
        client.update_table(table, ["schema"])
        print(f"Successfully updated schema for table {project}.{dataset_id}.{table_id}")
    return


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Update a BigQuery table schema by adding nested fields using a patch JSON file."
    )
    parser.add_argument("--impersonate_service_account", help="Service account email to impersonate")
    parser.add_argument("--project", required=True, help="GCP project ID")
    parser.add_argument("--dataset", required=True, help="BigQuery dataset ID")
    parser.add_argument("--table", required=True, help="BigQuery table ID")
    parser.add_argument(
        "--action", required=True, choices=["update", "load"], default="load", help="Action to perform (default: load)"
    )
    parser.add_argument("--schema_json", required=True, help="Path to the JSON file with the schema definition")
    parser.add_argument(
        "--no-dryrun", action="store_false", dest="dryrun", help="Perform actual update (default is dry run)"
    )
    args = parser.parse_args()

    if args.action == "load":
        load_current_schema(args.project, args.dataset, args.table, args.schema_json, args.impersonate_service_account)
    elif args.action == "update":
        update_table_schema(
            args.project, args.dataset, args.table, args.schema_json, args.dryrun, args.impersonate_service_account
        )
    else:
        raise ValueError("Invalid action. Use 'update' or 'load'.")
usage: manage_schema.py [-h] [--impersonate_service_account IMPERSONATE_SERVICE_ACCOUNT] --project PROJECT --dataset DATASET --table TABLE --action
                        {update,load} --schema_json SCHEMA_JSON [--no-dryrun]

Update a BigQuery table schema by adding nested fields using a patch JSON file.

options:
  -h, --help            show this help message and exit
  --impersonate_service_account IMPERSONATE_SERVICE_ACCOUNT
                        Service account email to impersonate
  --project PROJECT     GCP project ID
  --dataset DATASET     BigQuery dataset ID
  --table TABLE         BigQuery table ID
  --action {update,load}
                        Action to perform (default: load)
  --schema_json SCHEMA_JSON
                        Path to the JSON file with the schema definition
  --no-dryrun           Perform actual update (default is dry run)
- Get the current schema file
  python manage_schema.py --project <project> --dataset <dataset> --table <table> --action load --schema_json tmp/schema.json
- Update the schema file (add the new columns to tmp/schema.json)
- Check the changes (dry run, the default)
  python manage_schema.py --project <project> --dataset <dataset> --table <table> --action update --schema_json tmp/schema.json
- Apply the schema changes
  python manage_schema.py --project <project> --dataset <dataset> --table <table> --action update --schema_json tmp/schema.json --no-dryrun
dbt-python
If you really want to keep everything inside dbt, you can write the model in dbt Python: an incremental model that also updates the table schema when the schema has changed (see the sketch below).
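The following is a rough, untested sketch of that idea, not a verified pattern: it assumes google-cloud-bigquery is available on the runtime that executes the Python model, and the names my_upstream_model, payload, and new_col are hypothetical placeholders.

from google.cloud import bigquery


def _with_nested_field(schema, parent, new_field):
    # Return a copy of `schema` where `new_field` is appended under the RECORD column `parent`.
    patched = []
    for field in schema:
        if field.name == parent and new_field.name not in {f.name for f in field.fields}:
            api = field.to_api_repr()
            api.setdefault("fields", []).append(new_field.to_api_repr())
            patched.append(bigquery.SchemaField.from_api_repr(api))
        else:
            patched.append(field)
    return patched


def model(dbt, session):
    dbt.config(
        materialized="incremental",
        packages=["google-cloud-bigquery"],  # assumed to be installable on the Python runtime
    )

    df = dbt.ref("my_upstream_model")  # hypothetical upstream model

    if dbt.is_incremental:
        client = bigquery.Client()
        # dbt.this stringifies with backticks on BigQuery; strip them for get_table().
        table = client.get_table(str(dbt.this).replace("`", ""))
        # Hypothetical change: make sure payload.new_col exists before the incremental insert.
        patched = _with_nested_field(table.schema, "payload", bigquery.SchemaField("new_col", "STRING"))
        if patched != list(table.schema):
            table.schema = patched
            client.update_table(table, ["schema"])

    return df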
How to run
Manual
A manual operation using the bq command or the script above.
Airflow
If you already run dbt with Airflow (or similar), the schema update can be added as a task in the Airflow DAG, as sketched below.
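A minimal sketch, assuming manage_schema.py and the dbt project are available in the Airflow workers' environment; the DAG id, project/dataset/table names, file paths, and the dbt selector are hypothetical placeholders.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="bq_schema_update_then_dbt",  # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,  # Airflow >= 2.4; trigger manually when a schema change ships
    catchup=False,
) as dag:
    update_schema = BashOperator(
        task_id="update_schema",
        bash_command=(
            "python manage_schema.py --project my-project --dataset my_dataset "
            "--table my_table --action update --schema_json schemas/my_table.json --no-dryrun"
        ),
    )
    run_dbt = BashOperator(
        task_id="run_dbt",
        bash_command="dbt run --select my_incremental_model",  # hypothetical dbt invocation
    )

    # Patch the table schema first, then run the incremental model.
    update_schema >> run_dbt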
GitHub Actions
For day-to-day operations, it is convenient to be able to run this from GitHub Actions as well.