BigQueryにデータをリアルタイムにインサートする時のパターンとしてCloud Pub/Sub→Cloud Dataflow→BigQueryという定番があります。
この定番パターンを使った場合に、スキーマ変更に対してどのような挙動をするのかを確認してみました。
最初にこのような検証用環境を作ります。
カラムが2つだけのBigQueryテーブルと、そこにデータを流し込むためのDataflowジョブを用意します。
terraform {
required_version = "0.13.5"
required_providers {
google = "3.45.0"
google-beta = "3.45.0"
}
}
locals {
project = "プロジェクトID"
}
provider "google" {
project = local.project
}
resource "google_storage_bucket" "temp_bucket" {
name = "dataflow_schema_change_temp_bucket"
location = "US"
}
resource "google_pubsub_topic" "test" {
name = "test"
}
resource "google_pubsub_subscription" "pubsub2dataflow-test" {
name = "pubsub2dataflow-test"
topic = google_pubsub_topic.test.name
}
resource "google_bigquery_dataset" "dataflow_test" {
dataset_id = "dataflow_test"
friendly_name = "dataflow_test"
location = "US"
}
resource "google_bigquery_table" "test" {
dataset_id = google_bigquery_dataset.dataflow_test.dataset_id
table_id = "test"
schema = <<EOF
[
{
"name": "col1",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "col2",
"type": "STRING",
"mode": "NULLABLE"
}
]
EOF
}
resource "google_dataflow_job" "test" {
name = "test"
template_gcs_path = "gs://dataflow-templates-us-central1/2020-05-27-00_RC00/PubSub_Subscription_to_BigQuery"
temp_gcs_location = google_storage_bucket.temp_bucket.url
on_delete = "drain"
zone = "us-central1-a"
machine_type = "n1-standard-1"
parameters = {
inputSubscription = google_pubsub_subscription.pubsub2dataflow-test.path
outputTableSpec = "${google_bigquery_table.test.project}:${google_bigquery_table.test.dataset_id}.${google_bigquery_table.test.table_id}"
}
}
BigQueryのテーブルスキーマに一致するJSONをpublishする時のコマンドは以下のものです。
gcloud pubsub topics publish projects/プロジェクト名/topics/test --message '{"col1":"hoge","col2":"fuga"}'
では、この環境でPubSubにpublishしたJSONのスキーマとBigQueryのテーブルのスキーマを変えて検証していきます。
テーブルそのままで、PubSubにpublishするJSONのカラムを増やしたとき
以下のコマンドでpublishしてみます。
col3というカラムがBigQueryにはないカラムです。
gcloud pubsub topics publish projects/プロジェクト名/topics/test --message '{"col1":"hoge","col2":"fuga", "col3":"piyo"}'
このとき、BigQueryの目的のテーブルにはインサートされず、代わりに test_error_records
テーブルにインサートされます。
このテーブルは正常にインサートできなかったデータが入っているテーブルです。
errorMessageフィールドを見るとこのようなJSONが入っているため、publishするJSON側に余分なカラムがあるとダメなようです。
{"errors":[{"debugInfo":"","location":"col3","message":"no such field.","reason":"invalid"}],"index":0}
テーブルそのままで、PubSubにpublishするJSONのカラムを減らしたとき
先程の例とは逆にpublishするJSONのカラムを減らすとどうでしょうか。
gcloud pubsub topics publish projects/プロジェクト名/topics/test --message '{"col1":"hoge"}'
この場合は目的のテーブルにインサートがされます。
そして、不足しているカラムであるcol2の値はNULLになっています。
PubSubにpublishするJSONのカラムはそのままで、テーブルのカラムを増やしたとき
次にDataflowジョブを動かしたままBigQueryのテーブルのスキーマ変更をしてみます。
テーブルのカラムにNULLABLEなcol3を追加します。
resource "google_bigquery_table" "test" {
dataset_id = google_bigquery_dataset.dataflow_test.dataset_id
table_id = "test"
schema = <<EOF
[
{
"name": "col1",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "col2",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "col3",
"type": "STRING",
"mode": "NULLABLE"
}
]
EOF
}
この場合はDataflowは問題なくテーブルへのインサートを続けます。
新規に追加したカラムであるcol3に対してはNULLがインサートされています。
PubSubにpublishするJSONのカラムはそのままで、テーブルのカラムを減らしたとき
では最後にテーブルのカラムを減らす場合を検証してみます。
resource "google_bigquery_table" "test" {
dataset_id = google_bigquery_dataset.dataflow_test.dataset_id
table_id = "test"
schema = <<EOF
[
{
"name": "col1",
"type": "STRING",
"mode": "NULLABLE"
}
]
EOF
}
今度はインサートがエラーになり、代わりに test_error_records
テーブルにインサートがされます。
errorMessageには以下のように書かれており、やはりBigQueryのテーブルのカラムが不足しているという内容がかかれています。
{"errors":[{"debugInfo":"","location":"col2","message":"no such field.","reason":"invalid"}],"index":0}
まとめ
この検証を通して、以下のことが分かりました。
- BigQueryのテーブルのカラムに対して、JSONのカラムが不足しているときにはインサートは成功し不足していたカラムのデータはNULLになる
- BigQueryのテーブルのカラムに対して、JSONのカラムが過剰なときにはインサートは失敗し
テーブル名_error_records
に書き込まれる - 上記の2つはDataflowジョブを動かしたままBigQueryのテーブルのスキーマを変更しても成り立つ