
ZOZO Advent Calendar 2023

Day 4

Managing BigQuery static data tables with terraform


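A previous article managed only the table's schema definition with terraform and prepared the data with DML such as INSERT statements.

This article explains how to manage the data inside the table with terraform as well.

First, prepare a YAML file that holds both the schema definition and the data, as shown below.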

schema:
  - name: col1
    type: STRING
    mode: NULLABLE
  - name: col2
    type: INTEGER
    mode: NULLABLE
  - name: col3
    type: RECORD
    mode: REPEATED
    fields:
      - name: col4
        type: INTEGER
        mode: NULLABLE
      - name: col5
        type: INTEGER
        mode: NULLABLE

data:
  - col1: 'abcd'
    col2: 1234
    col3:
      - col4: 5678
        col5: 9012
  - col1: 'efgh'
    col2: 3456
    col3:
      - col4: 7890
        col5: 1234

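All that remains is to extract the schema part and the data part from the YAML file above and pass each of them to terraform.
The schema part needs nothing more than yamldecode followed by jsonencode, as explained in the previous article, but the data part takes a little more work.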
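
To make the target format concrete: a table whose source format is NEWLINE_DELIMITED_JSON reads one JSON object per line, so each entry under `data` has to end up on its own line. The following is a minimal sketch using only the standard library, with the two sample rows written out directly as Python dicts. The helper name `rows_to_jsonl` is hypothetical and is not part of the script in this article.

```python
import json

# The two rows from the `data` section of the sample YAML, as Python dicts.
rows = [
    {"col1": "abcd", "col2": 1234, "col3": [{"col4": 5678, "col5": 9012}]},
    {"col1": "efgh", "col2": 3456, "col3": [{"col4": 7890, "col5": 1234}]},
]


def rows_to_jsonl(rows):
    """Serialize each row as one JSON object per line (hypothetical helper)."""
    # ensure_ascii=False keeps non-ASCII text (e.g. Japanese) readable in the file.
    return "".join(json.dumps(row, ensure_ascii=False) + "\n" for row in rows)


jsonl = rows_to_jsonl(rows)
print(jsonl, end="")
```

The first line printed is `{"col1": "abcd", "col2": 1234, "col3": [{"col4": 5678, "col5": 9012}]}`; the repeated RECORD column stays a nested array inside the single-line object.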

First, the following script checks that the YAML data conforms to the schema and converts it to JSONL.
In practice, it is a good idea to run it in the CI/CD pipeline immediately before terraform apply.

import json
import yaml
import sys
import re

def validate_value(name, _type, mode, fields, value):
    # NULL is always acceptable for a NULLABLE column.
    if mode == "NULLABLE" and value is None:
        return

    match _type, mode:
        case "INTEGER", "NULLABLE" | "REQUIRED":
            if type(value) is not int:
                raise RuntimeError(f"column {name}: {value} is not integer")
        case "FLOAT", "NULLABLE" | "REQUIRED":
            if type(value) is not float:
                raise RuntimeError(f"column {name}: {value} is not float")
        case "STRING", "NULLABLE" | "REQUIRED":
            if type(value) is not str:
                raise RuntimeError(f"column {name}: {value} is not string")
        case "TIMESTAMP", "NULLABLE" | "REQUIRED":
            if type(value) is not str:
                raise RuntimeError(f"column {name}: {value} is not string")
            if not re.match(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', value): # TODO: make the regular expression stricter
                raise RuntimeError(f"column {name}: {value} is not valid timestamp format")
        case "DATETIME", "NULLABLE" | "REQUIRED":
            if type(value) is not str:
                raise RuntimeError(f"column {name}: {value} is not string")
            if not re.match(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', value): # TODO: make the regular expression stricter
                raise RuntimeError(f"column {name}: {value} is not valid datetime format")
        case "DATE", "NULLABLE" | "REQUIRED":
            if type(value) is not str:
                raise RuntimeError(f"column {name}: {value} is not string")
            if not re.match(r'\d{4}-\d{2}-\d{2}', value):
                raise RuntimeError(f"column {name}: {value} is not valid date format")
        case "RECORD" | "STRUCT", "NULLABLE" | "REQUIRED":
            if type(value) is not dict:
                raise RuntimeError(f"column {name}: {value} is not hash")

            # Validate each nested field recursively, without shadowing the arguments.
            for column in fields:
                field_name = column['name']
                name_in_hash = f"{name}['{field_name}']"
                value_in_hash = value[field_name]

                validate_value(name_in_hash, column['type'], column['mode'], column.get('fields'), value_in_hash)
        case _, "REPEATED":
            if type(value) is not list:
                raise RuntimeError(f"column {name}: {value} is not array")

            # Each element is validated as a NULLABLE value of the same type.
            for i, v in enumerate(value):
                name_in_array = f"{name}[{i}]"
                validate_value(name_in_array, _type, "NULLABLE", fields, v)
        case _, _:
            raise RuntimeError(f"Unknown schema name: {name}, type: {_type}, mode: {mode}")

def schema_validation(schema, row):
    for column in schema:
        name = column['name']
        _type = column['type']
        mode = column['mode']
        fields = column.get('fields')
        value = row[name]

        validate_value(name, _type, mode, fields, value)

yaml_filename = sys.argv[1]
jsonl_filename = yaml_filename.replace('.yaml', '.jsonl')
schema_filename = yaml_filename.replace('.yaml', '.schema.yaml')

with open(yaml_filename) as f:
    static_data_table = yaml.safe_load(f)

schema = static_data_table['schema']
rows = static_data_table['data']

converted_rows = []
for row in rows:
    schema_validation(schema, row)
    converted_rows.append(json.dumps(row, ensure_ascii=False))

with open(schema_filename, 'w') as f:
    yaml.dump(schema, f, sort_keys=False)

with open(jsonl_filename, 'w') as jsonl_file:
    for converted_row in converted_rows:
        jsonl_file.write(converted_row)
        jsonl_file.write("\n")
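
The TODO in the script notes that the regular expressions are loose: `re.match` only anchors the start of the string and `\d{2}` accepts any two digits, so a value such as `2023-13-45 99:99:99` passes. One possible tightening, offered as a sketch and not as a drop-in replacement, combines a full-string match with `datetime.strptime`, which rejects impossible dates. The helper names are hypothetical, and the sketch assumes values are written exactly as `YYYY-MM-DD HH:MM:SS` or `YYYY-MM-DD`; BigQuery itself accepts more variants (fractional seconds, time zones), which this check would reject.

```python
import re
from datetime import datetime

# Full-string patterns; [0-9] avoids matching non-ASCII digits.
_DATETIME_RE = re.compile(r"[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}")
_DATE_RE = re.compile(r"[0-9]{4}-[0-9]{2}-[0-9]{2}")


def _parses(value, pattern, fmt):
    """True if value is a str that matches pattern entirely and is a real date/time."""
    if not isinstance(value, str) or not pattern.fullmatch(value):
        return False
    try:
        datetime.strptime(value, fmt)  # raises ValueError for e.g. month 13
    except ValueError:
        return False
    return True


def is_valid_datetime(value):
    """Hypothetical stricter check for TIMESTAMP/DATETIME strings."""
    return _parses(value, _DATETIME_RE, "%Y-%m-%d %H:%M:%S")


def is_valid_date(value):
    """Hypothetical stricter check for DATE strings."""
    return _parses(value, _DATE_RE, "%Y-%m-%d")
```

With these helpers, `is_valid_date("2023-02-30")` returns `False` because February 30 does not exist, while the original pattern would let it through.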

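Next, the terraform below uploads the JSONL file that holds the table's data to GCS and creates an external table that references that file. Making the external table a BigLake table simplifies permission management, so the configuration first creates a BigQuery Connection and then grants the Connection's Service Account read access to the Bucket.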

resource "google_bigquery_connection" "gcs-resource" {
  connection_id = "gcs_resource"
  location      = "US"
  description   = "Connection for GCS Resource"
  cloud_resource {}
}

resource "google_storage_bucket" "static-data-table" {
  name     = "${local.project}-static-data-table"
  location = "US-CENTRAL1"

  uniform_bucket_level_access = true
  public_access_prevention    = "enforced"
}

resource "google_storage_bucket_iam_member" "static-data-table-connection-grant" {
  bucket = google_storage_bucket.static-data-table.name
  role   = "roles/storage.objectViewer"
  member = format("serviceAccount:%s", google_bigquery_connection.gcs-resource.cloud_resource[0].service_account_id)
}

resource "google_storage_bucket_object" "static-data-table" {
  for_each = fileset("${path.module}/static_data_tables", "*/*.jsonl")

  name   = each.value
  bucket = google_storage_bucket.static-data-table.name
  source = "${path.module}/static_data_tables/${each.value}"
}

resource "google_bigquery_table" "static-data-table" {
  for_each = fileset("${path.module}/static_data_tables", "*/*.jsonl")

  project    = local.project
  dataset_id = split("/", each.value)[0]
  table_id   = trimsuffix(split("/", each.value)[1], ".jsonl")
  schema     = jsonencode(yamldecode(file(replace("${path.module}/static_data_tables/${each.value}", ".jsonl", ".schema.yaml"))))

  deletion_protection = false

  external_data_configuration {
    source_format = "NEWLINE_DELIMITED_JSON"
    compression   = "NONE"
    autodetect    = false
    connection_id = google_bigquery_connection.gcs-resource.id

    source_uris = [
      join("/", [google_storage_bucket.static-data-table.url, google_storage_bucket_object.static-data-table[each.value].name])
    ]
  }

  # Ignore connection_id because the difference will appear even though the connection_id has not changed.
  # https://github.com/hashicorp/terraform-provider-google/issues/12386
  lifecycle {
    ignore_changes = [external_data_configuration[0].connection_id]
  }
}
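
The `for_each` expressions above rely on a directory convention: every file is expected at `static_data_tables/<dataset>/<table>.jsonl`, and the dataset ID, table ID, and schema file path are all derived from that relative path. As an illustration only, here is the same derivation mirrored in Python. The function name and the sample path `master/shops.jsonl` are hypothetical; in the real configuration terraform's `split`, `trimsuffix`, and `replace` do this work.

```python
def table_location(relative_path):
    """Mirror the terraform expressions for a '<dataset>/<table>.jsonl' path."""
    parts = relative_path.split("/")
    dataset_id = parts[0]                       # split("/", each.value)[0]
    table_id = parts[1].removesuffix(".jsonl")  # trimsuffix(split("/", each.value)[1], ".jsonl")
    # replace(..., ".jsonl", ".schema.yaml") locates the schema written by the script
    schema_path = relative_path.replace(".jsonl", ".schema.yaml")
    return dataset_id, table_id, schema_path


print(table_location("master/shops.jsonl"))
# ('master', 'shops', 'master/shops.schema.yaml')
```

Because the dataset ID comes from the directory name, the dataset itself has to exist already; the configuration above only creates tables.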

With this in place, simply adding a YAML file like the one shown at the top of this article automatically creates the corresponding BigQuery external table.
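
For the CI step that runs before terraform apply, one detail is easy to miss: the conversion script writes `<table>.schema.yaml` next to each source file, so a plain `*/*.yaml` glob would also pick up generated schema files if they are still present on a later run. Below is a sketch of the discovery step under that assumption. The function name is hypothetical, and the directory name `static_data_tables` is taken from the terraform configuration.

```python
from pathlib import Path


def find_table_definitions(root):
    """List '<dataset>/<table>.yaml' sources under root, skipping generated '*.schema.yaml' files."""
    return sorted(
        path
        for path in Path(root).glob("*/*.yaml")
        if not path.name.endswith(".schema.yaml")
    )
```

A CI job could call `find_table_definitions("static_data_tables")` and pass each returned path to the conversion script as its first argument.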
