  - name: col1
    type: STRING
    mode: NULLABLE
  - name: col2
    type: INTEGER
    mode: NULLABLE    
  - name: col3
    type: RECORD
    mode: REPEATED
      - name: col4
        type: INTEGER
        mode: NULLABLE
      - name: col5
        type: INTEGER
        mode: NULLABLE

  - col1: 'abcd'
    col2: 1234
      - col4: 5678
        col5: 9012
  - col1: 'efgh'
    col2: 3456
      - col4: 7890
        col5: 1234


実際に使うときにはterraform applyをしているCI/CDパイプラインの直前に入れておくと良いです。

import json
import yaml
import sys
import re

def validate_value(name, _type, mode, fields, value):
    if mode == "NULLABLE" and value is None:

    match _type, mode:
        case "INTEGER", "NULLABLE" | "REQUIRED":
            if type(value) is not int:
                raise RuntimeError(f"cloumn {name}: {value} is not integer")
        case "FLOAT", "NULLABLE" | "REQUIRED":
            if type(value) is not float:
                raise RuntimeError(f"cloumn {name}: {value} is not float")
        case "STRING", "NULLABLE" | "REQUIRED":
            if type(value) is not str:
                raise RuntimeError(f"cloumn {name}: {value} is not string")
        case "TIMESTAMP", "NULLABLE" | "REQUIRED":
            if type(value) is not str:
                raise RuntimeError(f"cloumn {name}: {value} is not string")
            if not re.match(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', value): #TODO: 正規表現を厳密にする
                raise RuntimeError(f"cloumn {name}: {value} is not valid timestamp format")
        case "DATETIME", "NULLABLE" | "REQUIRED":
            if type(value) is not str:
                raise RuntimeError(f"cloumn {name}: {value} is not string")
            if not re.match(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', value): #TODO: 正規表現を厳密にする
                raise RuntimeError(f"cloumn {name}: {value} is not valid datetime format")
        case "DATE", "NULLABLE" | "REQUIRED":
            if type(value) is not str:
                raise RuntimeError(f"cloumn {name}: {value} is not string")
            if not re.match(r'\d{4}-\d{2}-\d{2}', value):
                raise RuntimeError(f"cloumn {name}: {value} is not valid timestamp format")
        case "RECORD" | "STRUCT", "NULLABLE" | "REQUIRED":
            if type(value) is not dict:
                raise RuntimeError(f"cloumn {name}: {value} is not hash")

            for column in fields:
                field_name = column['name']
                _type = column['type']
                mode = column['mode']
                fields = column.get('fields')

                name_in_hash = f"{name}['{field_name}']"
                value_in_hash = value[field_name]

                validate_value(name_in_hash, _type, mode, fields, value_in_hash)
        case _, "REPEATED":
            if type(value) is not list:
                raise RuntimeError(f"cloumn {name}: {value} is not array")

            for i, v in enumerate(value):
                name_in_array = f"{name}[{i}]"
                validate_value(name_in_array, _type, "NULLABLE", fields, v)
        case _, _:
            raise RuntimeError(f"Unknown schema name: {name}, type: {_type}, mode: {mode}")

def schema_validation(schema, row):
    for column in schema:
        name = column['name']
        _type = column['type']
        mode = column['mode']
        fields = column.get('fields')
        value = row[name]

        validate_value(name, _type, mode, fields, value)

yaml_filename = sys.argv[1]
jsonl_filename = yaml_filename.replace('.yaml', '.jsonl')
schema_filename = yaml_filename.replace('.yaml', '.schema.yaml')

with open(yaml_filename) as f:
    static_data_table = yaml.safe_load(f)

schema = static_data_table['schema']
rows = static_data_table['data']

converted_rows = []
for row in rows:
    schema_validation(schema, row)
    converted_rows.append(json.dumps(row, ensure_ascii=False))

with open(schema_filename, 'w') as f:
    yaml.dump(schema, f, sort_keys=False)

with open(jsonl_filename, 'w') as jsonl_file:
    for converted_row in converted_rows:

そして、以下のterraformでテーブルの中のデータであるJSONLファイルをGCSにアップロードするとともに、そのファイルを参照する外部表を作成します。BigLakeテーブル形式の外部表にしたほうが権限管理が楽なので、最初にBigQuery Connectionを作成し、ConnectionのService Accountに対してBucketの読み出し権限を与えています。

resource "google_bigquery_connection" "gcs-resource" {
  connection_id = "gcs_resource"
  location      = "US"
  description   = "Connection for GCS Resource"
  cloud_resource {}

resource "google_storage_bucket" "static-data-table" {
  name     = "${local.project}-static-data-table"
  location = "US-CENTRAL1"

  uniform_bucket_level_access = true
  public_access_prevention    = "enforced"

resource "google_storage_bucket_iam_member" "static-data-table-connection-grant" {
  bucket = google_storage_bucket.static-data-table.name
  role   = "roles/storage.objectViewer"
  member = format("serviceAccount:%s", google_bigquery_connection.gcs-resource.cloud_resource[0].service_account_id)

resource "google_storage_bucket_object" "static-data-table" {
  for_each = fileset("${path.module}/static_data_tables", "*/*.jsonl")

  name   = each.value
  bucket = google_storage_bucket.static-data-table.name
  source = "${path.module}/static_data_tables/${each.value}"

resource "google_bigquery_table" "static-data-table" {
  for_each = fileset("${path.module}/static_data_tables", "*/*.jsonl")

  project    = local.project
  dataset_id = split("/", each.value)[0]
  table_id   = trimsuffix(split("/", each.value)[1], ".jsonl")
  schema     = jsonencode(yamldecode(file(replace("${path.module}/static_data_tables/${each.value}", ".jsonl", ".schema.yaml"))))

  deletion_protection = false

  external_data_configuration {
    source_format = "NEWLINE_DELIMITED_JSON"
    compression   = "NONE"
    autodetect    = false
    connection_id = google_bigquery_connection.gcs-resource.id

    source_uris = [
      join("/", [google_storage_bucket.static-data-table.url, google_storage_bucket_object.static-data-table[each.value].name])

  # Ignore connection_id because the difference will appear even though the connection_id has not changed.
  # https://github.com/hashicorp/terraform-provider-google/issues/12386
  lifecycle {
    ignore_changes = [external_data_configuration[0].connection_id]



