3
Help us understand the problem. What are the problem?

More than 1 year has passed since last update.

posted at

updated at

Organization

BigQueryのテーブルのスキーマが変更された時のDataflowの挙動まとめ

BigQueryにデータをリアルタイムにインサートする時のパターンとしてCloud Pub/Sub→Cloud Dataflow→BigQueryという定番があります。

この定番パターンを使った場合に、スキーマ変更に対してどのような挙動をするのかを確認してみました。

最初にこのような検証用環境を作ります。
カラムが2つだけのBigQueryテーブルと、そこにデータを流し込むためのDataflowジョブを用意します。

terraform {
  required_version = "0.13.5"

  required_providers {
    google      = "3.45.0"
    google-beta = "3.45.0"
  }
}

locals {
  project = "プロジェクトID"
}

provider "google" {
  project = local.project
}

resource "google_storage_bucket" "temp_bucket" {
  name          = "dataflow_schema_change_temp_bucket"
  location      = "US"
}

resource "google_pubsub_topic" "test" {
  name = "test"
}

resource "google_pubsub_subscription" "pubsub2dataflow-test" {
  name  = "pubsub2dataflow-test"
  topic = google_pubsub_topic.test.name
}

resource "google_bigquery_dataset" "dataflow_test" {
  dataset_id    = "dataflow_test"
  friendly_name = "dataflow_test"
  location      = "US"
}

resource "google_bigquery_table" "test" {
  dataset_id = google_bigquery_dataset.dataflow_test.dataset_id
  table_id   = "test"
  schema     = <<EOF
  [
      {
          "name": "col1",
          "type": "STRING",
          "mode": "NULLABLE"
      },
      {
          "name": "col2",
          "type": "STRING",
          "mode": "NULLABLE"
      }
  ]
  EOF
}

resource "google_dataflow_job" "test" {
  name              = "test"
  template_gcs_path = "gs://dataflow-templates-us-central1/2020-05-27-00_RC00/PubSub_Subscription_to_BigQuery"
  temp_gcs_location = google_storage_bucket.temp_bucket.url
  on_delete         = "drain"
  zone              = "us-central1-a"
  machine_type      = "n1-standard-1"
  parameters = {
    inputSubscription = google_pubsub_subscription.pubsub2dataflow-test.path
    outputTableSpec   = "${google_bigquery_table.test.project}:${google_bigquery_table.test.dataset_id}.${google_bigquery_table.test.table_id}"
  }
}

BigQueryのテーブルスキーマに一致するJSONをpublishする時のコマンドは以下のものです。

gcloud pubsub topics publish projects/プロジェクト名/topics/test --message '{"col1":"hoge","col2":"fuga"}'

では、この環境でPubSubにpublishしたJSONのスキーマとBigQueryのテーブルのスキーマを変えて検証していきます。

テーブルそのままで、PubSubにpublishするJSONのカラムを増やしたとき

以下のコマンドでpublishしてみます。
col3というカラムがBigQueryにはないカラムです。

gcloud pubsub topics publish projects/プロジェクト名/topics/test --message '{"col1":"hoge","col2":"fuga", "col3":"piyo"}'

このとき、BigQueryの目的のテーブルにはインサートされず、代わりに test_error_records テーブルにインサートされます。
このテーブルは正常にインサートできなかったデータが入っているテーブルです。
errorMessageフィールドを見るとこのようなJSONが入っているため、publishするJSON側に余分なカラムがあるとダメなようです。

{"errors":[{"debugInfo":"","location":"col3","message":"no such field.","reason":"invalid"}],"index":0}

テーブルそのままで、PubSubにpublishするJSONのカラムを減らしたとき

先程の例とは逆にpublishするJSONのカラムを減らすとどうでしょうか。

gcloud pubsub topics publish projects/プロジェクト名/topics/test --message '{"col1":"hoge"}'

この場合は目的のテーブルにインサートがされます。
そして、不足しているカラムであるcol2の値はNULLになっています。

PubSubにpublishするJSONのカラムはそのままで、テーブルのカラムを増やしたとき

次にDataflowジョブを動かしたままBigQueryのテーブルのスキーマ変更をしてみます。
テーブルのカラムにNULLABLEなcol3を追加します。

resource "google_bigquery_table" "test" {
  dataset_id = google_bigquery_dataset.dataflow_test.dataset_id
  table_id   = "test"
  schema     = <<EOF
  [
      {
          "name": "col1",
          "type": "STRING",
          "mode": "NULLABLE"
      },
      {
          "name": "col2",
          "type": "STRING",
          "mode": "NULLABLE"
      },
      {
          "name": "col3",
          "type": "STRING",
          "mode": "NULLABLE"
      }
  ]
  EOF
}

この場合はDataflowは問題なくテーブルへのインサートを続けます。
新規に追加したカラムであるcol3に対してはNULLがインサートされています。

PubSubにpublishするJSONのカラムはそのままで、テーブルのカラムを減らしたとき

では最後にテーブルのカラムを減らす場合を検証してみます。

resource "google_bigquery_table" "test" {
  dataset_id = google_bigquery_dataset.dataflow_test.dataset_id
  table_id   = "test"
  schema     = <<EOF
  [
      {
          "name": "col1",
          "type": "STRING",
          "mode": "NULLABLE"
      }
  ]
  EOF
}

今度はインサートがエラーになり、代わりに test_error_records テーブルにインサートがされます。
errorMessageには以下のように書かれており、やはりBigQueryのテーブルのカラムが不足しているという内容がかかれています。

{"errors":[{"debugInfo":"","location":"col2","message":"no such field.","reason":"invalid"}],"index":0}

まとめ

この検証を通して、以下のことが分かりました。

  • BigQueryのテーブルのカラムに対して、JSONのカラムが不足しているときにはインサートは成功し不足していたカラムのデータはNULLになる
  • BigQueryのテーブルのカラムに対して、JSONのカラムが過剰なときにはインサートは失敗し テーブル名_error_records に書き込まれる
  • 上記の2つはDataflowジョブを動かしたままBigQueryのテーブルのスキーマを変更しても成り立つ
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Sign upLogin
3
Help us understand the problem. What are the problem?