LoginSignup
3

More than 3 years have passed since last update.

BigQueryのテーブルのスキーマが変更された時のDataflowの挙動まとめ

Last updated at Posted at 2020-12-15

BigQueryにデータをリアルタイムにインサートする時のパターンとしてCloud Pub/Sub→Cloud Dataflow→BigQueryという定番があります。

この定番パターンを使った場合に、スキーマ変更に対してどのような挙動をするのかを確認してみました。

最初にこのような検証用環境を作ります。
カラムが2つだけのBigQueryテーブルと、そこにデータを流し込むためのDataflowジョブを用意します。

terraform {
  required_version = "0.13.5"

  required_providers {
    google      = "3.45.0"
    google-beta = "3.45.0"
  }
}

locals {
  project = "プロジェクトID"
}

provider "google" {
  project = local.project
}

resource "google_storage_bucket" "temp_bucket" {
  name          = "dataflow_schema_change_temp_bucket"
  location      = "US"
}

resource "google_pubsub_topic" "test" {
  name = "test"
}

resource "google_pubsub_subscription" "pubsub2dataflow-test" {
  name  = "pubsub2dataflow-test"
  topic = google_pubsub_topic.test.name
}

resource "google_bigquery_dataset" "dataflow_test" {
  dataset_id    = "dataflow_test"
  friendly_name = "dataflow_test"
  location      = "US"
}

resource "google_bigquery_table" "test" {
  dataset_id = google_bigquery_dataset.dataflow_test.dataset_id
  table_id   = "test"
  schema     = <<EOF
  [
      {
          "name": "col1",
          "type": "STRING",
          "mode": "NULLABLE"
      },
      {
          "name": "col2",
          "type": "STRING",
          "mode": "NULLABLE"
      }
  ]
  EOF
}

resource "google_dataflow_job" "test" {
  name              = "test"
  template_gcs_path = "gs://dataflow-templates-us-central1/2020-05-27-00_RC00/PubSub_Subscription_to_BigQuery"
  temp_gcs_location = google_storage_bucket.temp_bucket.url
  on_delete         = "drain"
  zone              = "us-central1-a"
  machine_type      = "n1-standard-1"
  parameters = {
    inputSubscription = google_pubsub_subscription.pubsub2dataflow-test.path
    outputTableSpec   = "${google_bigquery_table.test.project}:${google_bigquery_table.test.dataset_id}.${google_bigquery_table.test.table_id}"
  }
}

BigQueryのテーブルスキーマに一致するJSONをpublishする時のコマンドは以下のものです。

gcloud pubsub topics publish projects/プロジェクト名/topics/test --message '{"col1":"hoge","col2":"fuga"}'

では、この環境でPubSubにpublishしたJSONのスキーマとBigQueryのテーブルのスキーマを変えて検証していきます。

テーブルそのままで、PubSubにpublishするJSONのカラムを増やしたとき

以下のコマンドでpublishしてみます。
col3というカラムがBigQueryにはないカラムです。

gcloud pubsub topics publish projects/プロジェクト名/topics/test --message '{"col1":"hoge","col2":"fuga", "col3":"piyo"}'

このとき、BigQueryの目的のテーブルにはインサートされず、代わりに test_error_records テーブルにインサートされます。
このテーブルは正常にインサートできなかったデータが入っているテーブルです。
errorMessageフィールドを見るとこのようなJSONが入っているため、publishするJSON側に余分なカラムがあるとダメなようです。

{"errors":[{"debugInfo":"","location":"col3","message":"no such field.","reason":"invalid"}],"index":0}

テーブルそのままで、PubSubにpublishするJSONのカラムを減らしたとき

先程の例とは逆にpublishするJSONのカラムを減らすとどうでしょうか。

gcloud pubsub topics publish projects/プロジェクト名/topics/test --message '{"col1":"hoge"}'

この場合は目的のテーブルにインサートがされます。
そして、不足しているカラムであるcol2の値はNULLになっています。

PubSubにpublishするJSONのカラムはそのままで、テーブルのカラムを増やしたとき

次にDataflowジョブを動かしたままBigQueryのテーブルのスキーマ変更をしてみます。
テーブルのカラムにNULLABLEなcol3を追加します。

resource "google_bigquery_table" "test" {
  dataset_id = google_bigquery_dataset.dataflow_test.dataset_id
  table_id   = "test"
  schema     = <<EOF
  [
      {
          "name": "col1",
          "type": "STRING",
          "mode": "NULLABLE"
      },
      {
          "name": "col2",
          "type": "STRING",
          "mode": "NULLABLE"
      },
      {
          "name": "col3",
          "type": "STRING",
          "mode": "NULLABLE"
      }
  ]
  EOF
}

この場合はDataflowは問題なくテーブルへのインサートを続けます。
新規に追加したカラムであるcol3に対してはNULLがインサートされています。

PubSubにpublishするJSONのカラムはそのままで、テーブルのカラムを減らしたとき

では最後にテーブルのカラムを減らす場合を検証してみます。

resource "google_bigquery_table" "test" {
  dataset_id = google_bigquery_dataset.dataflow_test.dataset_id
  table_id   = "test"
  schema     = <<EOF
  [
      {
          "name": "col1",
          "type": "STRING",
          "mode": "NULLABLE"
      }
  ]
  EOF
}

今度はインサートがエラーになり、代わりに test_error_records テーブルにインサートがされます。
errorMessageには以下のように書かれており、やはりBigQueryのテーブルのカラムが不足しているという内容がかかれています。

{"errors":[{"debugInfo":"","location":"col2","message":"no such field.","reason":"invalid"}],"index":0}

まとめ

この検証を通して、以下のことが分かりました。

  • BigQueryのテーブルのカラムに対して、JSONのカラムが不足しているときにはインサートは成功し不足していたカラムのデータはNULLになる
  • BigQueryのテーブルのカラムに対して、JSONのカラムが過剰なときにはインサートは失敗し テーブル名_error_records に書き込まれる
  • 上記の2つはDataflowジョブを動かしたままBigQueryのテーブルのスキーマを変更しても成り立つ

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3