The article below covered managing only the table schema definition with Terraform, preparing the data itself with DML such as INSERT statements.
This article explains how to manage the data inside the tables with Terraform as well.
First, prepare a YAML file that holds both the schema definition and the data, like this:
schema:
- name: col1
  type: STRING
  mode: NULLABLE
- name: col2
  type: INTEGER
  mode: NULLABLE
- name: col3
  type: RECORD
  mode: REPEATED
  fields:
  - name: col4
    type: INTEGER
    mode: NULLABLE
  - name: col5
    type: INTEGER
    mode: NULLABLE
data:
- col1: 'abcd'
  col2: 1234
  col3:
  - col4: 5678
    col5: 9012
- col1: 'efgh'
  col2: 3456
  col3:
  - col4: 7890
    col5: 1234
All that remains is to pull the schema part and the data part out of the YAML file above and pass them to Terraform.
As explained in the previous article, the schema part only needs yamldecode followed by jsonencode; the data part, however, needs a bit more work.
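If you want to see exactly what Terraform will hand to BigQuery, the schema conversion is easy to reproduce outside Terraform. A minimal sketch in Python that mirrors yamldecode followed by jsonencode on the schema part of the YAML above (the file path is a hypothetical example):

import json

import yaml

# Mirrors Terraform's jsonencode(yamldecode(file(...))) for the schema part,
# so you can inspect the JSON schema the table resource will receive.
with open("static_data_tables/my_dataset/my_table.yaml") as f:  # hypothetical path
    schema = yaml.safe_load(f)['schema']
print(json.dumps(schema, indent=2))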
For the data part, the script below first checks that the YAML actually conforms to the schema and then converts it to JSONL.
In practice, it is a good idea to run it in your CI/CD pipeline right before terraform apply.
import json
import re
import sys

import yaml


def validate_value(name, _type, mode, fields, value):
    """Check one value against its declared BigQuery type and mode."""
    if mode == "NULLABLE" and value is None:
        return
    match _type, mode:
        case "INTEGER", "NULLABLE" | "REQUIRED":
            if type(value) is not int:
                raise RuntimeError(f"column {name}: {value} is not integer")
        case "FLOAT", "NULLABLE" | "REQUIRED":
            if type(value) is not float:
                raise RuntimeError(f"column {name}: {value} is not float")
        case "STRING", "NULLABLE" | "REQUIRED":
            if type(value) is not str:
                raise RuntimeError(f"column {name}: {value} is not string")
        case "TIMESTAMP", "NULLABLE" | "REQUIRED":
            if type(value) is not str:
                raise RuntimeError(f"column {name}: {value} is not string")
            if not re.match(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', value):  # TODO: make the regex stricter
                raise RuntimeError(f"column {name}: {value} is not valid timestamp format")
        case "DATETIME", "NULLABLE" | "REQUIRED":
            if type(value) is not str:
                raise RuntimeError(f"column {name}: {value} is not string")
            if not re.match(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', value):  # TODO: make the regex stricter
                raise RuntimeError(f"column {name}: {value} is not valid datetime format")
        case "DATE", "NULLABLE" | "REQUIRED":
            if type(value) is not str:
                raise RuntimeError(f"column {name}: {value} is not string")
            if not re.match(r'\d{4}-\d{2}-\d{2}', value):
                raise RuntimeError(f"column {name}: {value} is not valid date format")
        case "RECORD" | "STRUCT", "NULLABLE" | "REQUIRED":
            if type(value) is not dict:
                raise RuntimeError(f"column {name}: {value} is not a dict")
            # Recurse into each sub-field of the record.
            for column in fields:
                field_name = column['name']
                field_type = column['type']
                field_mode = column['mode']
                subfields = column.get('fields')
                name_in_hash = f"{name}['{field_name}']"
                value_in_hash = value[field_name]
                validate_value(name_in_hash, field_type, field_mode, subfields, value_in_hash)
        case _, "REPEATED":
            # A REPEATED column is an array; each element is validated as NULLABLE.
            if type(value) is not list:
                raise RuntimeError(f"column {name}: {value} is not array")
            for i, v in enumerate(value):
                name_in_array = f"{name}[{i}]"
                validate_value(name_in_array, _type, "NULLABLE", fields, v)
        case _, _:
            raise RuntimeError(f"Unknown schema name: {name}, type: {_type}, mode: {mode}")


def schema_validation(schema, row):
    """Validate every column of a row against the schema."""
    for column in schema:
        name = column['name']
        _type = column['type']
        mode = column['mode']
        fields = column.get('fields')
        value = row[name]  # raises KeyError if the row omits a declared column
        validate_value(name, _type, mode, fields, value)


yaml_filename = sys.argv[1]
jsonl_filename = yaml_filename.replace('.yaml', '.jsonl')
schema_filename = yaml_filename.replace('.yaml', '.schema.yaml')

with open(yaml_filename) as f:
    static_data_table = yaml.safe_load(f)

schema = static_data_table['schema']
rows = static_data_table['data']

# Validate each row against the schema, then serialize it as one JSON line.
converted_rows = []
for row in rows:
    schema_validation(schema, row)
    converted_rows.append(json.dumps(row, ensure_ascii=False))

# Write the schema out on its own so Terraform can feed it to the table resource.
with open(schema_filename, 'w') as f:
    yaml.dump(schema, f, sort_keys=False)

with open(jsonl_filename, 'w') as jsonl_file:
    for converted_row in converted_rows:
        jsonl_file.write(converted_row)
        jsonl_file.write("\n")
Then the Terraform below uploads the JSONL files, i.e. the tables' data, to GCS and creates external tables that reference them. Making them BigLake external tables simplifies permission management, so we first create a BigQuery Connection and grant the connection's service account read access to the bucket.
resource "google_bigquery_connection" "gcs-resource" {
connection_id = "gcs_resource"
location = "US"
description = "Connection for GCS Resource"
cloud_resource {}
}
resource "google_storage_bucket" "static-data-table" {
name = "${local.project}-static-data-table"
location = "US-CENTRAL1"
uniform_bucket_level_access = true
public_access_prevention = "enforced"
}
resource "google_storage_bucket_iam_member" "static-data-table-connection-grant" {
bucket = google_storage_bucket.static-data-table.name
role = "roles/storage.objectViewer"
member = format("serviceAccount:%s", google_bigquery_connection.gcs-resource.cloud_resource[0].service_account_id)
}
resource "google_storage_bucket_object" "static-data-table" {
for_each = fileset("${path.module}/static_data_tables", "*/*.jsonl")
name = each.value
bucket = google_storage_bucket.static-data-table.name
source = "${path.module}/static_data_tables/${each.value}"
}
resource "google_bigquery_table" "static-data-table" {
for_each = fileset("${path.module}/static_data_tables", "*/*.jsonl")
project = local.project
dataset_id = split("/", each.value)[0]
table_id = trimsuffix(split("/", each.value)[1], ".jsonl")
schema = jsonencode(yamldecode(file(replace("${path.module}/static_data_tables/${each.value}", ".jsonl", ".schema.yaml"))))
deletion_protection = false
external_data_configuration {
source_format = "NEWLINE_DELIMITED_JSON"
compression = "NONE"
autodetect = false
connection_id = google_bigquery_connection.gcs-resource.id
source_uris = [
join("/", [google_storage_bucket.static-data-table.url, google_storage_bucket_object.static-data-table[each.value].name])
]
}
# Ignore connection_id because the difference will appear even though the connection_id has not changed.
# https://github.com/hashicorp/terraform-provider-google/issues/12386
lifecycle {
ignore_changes = [external_data_configuration[0].connection_id]
}
}
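After terraform apply, you can check that the external table is actually queryable. A minimal sketch, assuming the google-cloud-bigquery client library and the hypothetical my_dataset/my_table layout used earlier:

# Query the BigLake external table created above (project and table names are hypothetical).
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # hypothetical project id
for row in client.query("SELECT col1, col2, col3 FROM my_dataset.my_table"):
    print(dict(row))  # iterating the query job waits for and yields result rows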
With this in place, simply adding a YAML file like the one at the top of this article is enough to have the corresponding BigQuery external table created automatically.