はじめに
BigQueryへのデータ連携パイプラインでGoogle Cloud Storage (GCS) を活用するケースは多い。
「どれだけのデータが連携されたか」「どのタイミングで連携されたか」のモニタリングを手軽にやりたい。
そこで、Pub/SubのBigQuery Subscriptionを活用して、GCSへのファイル格納時に発生するPub/Sub通知をBigQueryに直接連携してみた。
Pub/SubのBigQuery Subscriptionとは?
Pub/SubのBigQuery Subscriptionは、Pub/SubのメッセージをBigQueryに直接書き込む仕組み。
BigQuery Subscriptionで作成されるテーブルのスキーマ指定には3種類ある。
- トピックスキーマ
あらかじめ Pub/Sub トピックにスキーマ(メッセージの形式)を定義する。Apache Avro と Protocol Buffer の2つの形式に対応。 - テーブルスキーマ
Pub/Sub に投入された JSON メッセージのキー名と対応する列に、データが書き込まれる。あらかじめ Pub/Sub 側にスキーマを設定しておく必要はない。 - スキーマなし
データは対象 BigQuery テーブルの data という名称の列にメッセージを書き込まれる。
今回はGCSからのイベント通知を取り込むので、テーブルスキーマ、スキーマなしそれぞれを試してみた。
構築手順
# 事前準備
export PROJECT_NAME=aa
export PROJECT_NUMBER=11
export BQ_DATASET_NAME=bb
export BQ_TABLE_NAME_WITH_TABLE_SCHEMA=gcs_notification_table_schema
export BQ_TABLE_NAME_LESS_SCHEMA=gcs_notification_less_schema
export GCS_BUCKET_NAME=dd
export PUBSUB_TOPIC_NAME=ee
export PUBSUB_SUBSCRIPTION_NAME_WITH_TABLE_SCHEMA=ff
export PUBSUB_SUBSCRIPTION_NAME_LESS_SCHEMA=gg
# Pub/Sub Topic作成(省略可)
gcloud pubsub topics create ${PUBSUB_TOPIC_NAME}
# バケットを作成
gsutil mb gs://${GCS_BUCKET_NAME}
# 通知設定を作成
gcloud storage buckets notifications create gs://${GCS_BUCKET_NAME} --topic=${PUBSUB_TOPIC_NAME}
# Pub/SubのサービスエージェントにBigQueryデータ編集者の権限を付与
gcloud projects add-iam-policy-binding ${PROJECT_NAME} \
--member="serviceAccount:service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com" \
--role="roles/bigquery.dataEditor"
# BigQueryにテーブルを事前作成
## テーブルスキーマ
bq mk --table ${BQ_DATASET_NAME}.${BQ_TABLE_NAME_WITH_TABLE_SCHEMA} schema.json
## スキーマなし
bq mk --table ${BQ_DATASET_NAME}.${BQ_TABLE_NAME_LESS_SCHEMA} data:JSON
# BigQuery Subscriptionを作成
## テーブルスキーマ
gcloud pubsub subscriptions create ${PUBSUB_SUBSCRIPTION_NAME_WITH_TABLE_SCHEMA} \
--topic ${PUBSUB_TOPIC_NAME} \
--bigquery-table ${PROJECT_NAME}:${BQ_DATASET_NAME}.${BQ_TABLE_NAME_WITH_TABLE_SCHEMA} \
--use-table-schema
## スキーマなし
gcloud pubsub subscriptions create ${PUBSUB_SUBSCRIPTION_NAME_LESS_SCHEMA} \
--topic ${PUBSUB_TOPIC_NAME} \
--bigquery-table ${PROJECT_NAME}:${BQ_DATASET_NAME}.${BQ_TABLE_NAME_LESS_SCHEMA}
schama.jsonの中身
https://cloud.google.com/storage/docs/json_api/v1/objects#resource-representations
を参考に。
[
{"name": "kind", "type": "STRING"},
{"name": "id", "type": "STRING"},
{"name": "selfLink", "type": "STRING"},
{"name": "mediaLink", "type": "STRING"},
{"name": "name", "type": "STRING"},
{"name": "bucket", "type": "STRING"},
{"name": "generation", "type": "INTEGER"},
{"name": "metageneration", "type": "INTEGER"},
{"name": "contentType", "type": "STRING"},
{"name": "storageClass", "type": "STRING"},
{"name": "size", "type": "INTEGER"},
{"name": "softDeleteTime", "type": "TIMESTAMP"},
{"name": "restoreToken", "type": "STRING"},
{"name": "hardDeleteTime", "type": "TIMESTAMP"},
{"name": "md5Hash", "type": "STRING"},
{"name": "contentEncoding", "type": "STRING"},
{"name": "contentDisposition", "type": "STRING"},
{"name": "contentLanguage", "type": "STRING"},
{"name": "cacheControl", "type": "STRING"},
{"name": "crc32c", "type": "STRING"},
{"name": "componentCount", "type": "INTEGER"},
{"name": "etag", "type": "STRING"},
{"name": "kmsKeyName", "type": "STRING"},
{"name": "temporaryHold", "type": "BOOLEAN"},
{"name": "eventBasedHold", "type": "BOOLEAN"},
{"name": "retentionExpirationTime", "type": "TIMESTAMP"},
{"name": "retention", "type": "RECORD", "fields": [
{"name": "retainUntilTime", "type": "TIMESTAMP"},
{"name": "mode", "type": "STRING"}
]},
{"name": "timeCreated", "type": "TIMESTAMP"},
{"name": "updated", "type": "TIMESTAMP"},
{"name": "timeDeleted", "type": "TIMESTAMP"},
{"name": "timeStorageClassUpdated", "type": "TIMESTAMP"},
{"name": "customTime", "type": "TIMESTAMP"},
{"name": "metadata", "type": "RECORD", "mode": "REPEATED", "fields": [
{"name": "key", "type": "STRING"},
{"name": "value", "type": "STRING"}
]},
{"name": "acl", "type": "RECORD", "mode": "REPEATED", "fields": [
{"name": "entity", "type": "STRING"},
{"name": "role", "type": "STRING"}
]},
{"name": "owner", "type": "RECORD", "fields": [
{"name": "entity", "type": "STRING"},
{"name": "entityId", "type": "STRING"}
]},
{"name": "customerEncryption", "type": "RECORD", "fields": [
{"name": "encryptionAlgorithm", "type": "STRING"},
{"name": "keySha256", "type": "STRING"}
]}
]
動作確認
GCSにファイルをアップロードしてみる。
gcloud storage cp test.csv gs://ckfm0211-bucket-notification-test/2024/12/5/test_table/datafile.csv
テーブルを確認。
bq head ${PROJECT_NAME}:${BQ_DATASET_NAME}.${BQ_TABLE_NAME_WITH_TABLE_SCHEMA}

| kind | id | selfLink | mediaLink | name | bucket | generation | metageneration | contentType | storageClass | size | softDeleteTime | restoreToken | hardDeleteTime | md5Hash | contentEncoding | contentDisposition | contentLanguage | cacheControl | crc32c | componentCount | etag | kmsKeyName | temporaryHold | eventBasedHold | retentionExpirationTime | retention | timeCreated | updated | timeDeleted | timeStorageClassUpdated | customTime | metadata | acl | owner | customerEncryption |

| storage#object | ckfm0211-bucket-notification-test/2024/12/5/test_table/datafile.csv/1732520594602464 | https://www.googleapis.com/storage/v1/b/ckfm0211-bucket-notification-test/o/2024%2F12%2F5%2Ftest_table%2Fdatafile.csv | https://storage.googleapis.com/download/storage/v1/b/ckfm0211-bucket-notification-test/o/2024%2F12%2F5%2Ftest_table%2Fdatafile.csv?generation=1732520594602464&alt=media | 2024/12/5/test_table/datafile.csv | ckfm0211-bucket-notification-test | 1732520594602464 | 1 | text/csv | STANDARD | 10 | NULL | NULL | NULL | gkwcnrFC+35b4Lxf+5cyvQ== | NULL | NULL | NULL | NULL | uXIkYw== | NULL | CODz7ZT+9okDEAE= | NULL | NULL | NULL | NULL | NULL | 2024-11-25 07:43:14 | 2024-11-25 07:43:14 | NULL | 2024-11-25 07:43:14 | NULL | [] | [] | NULL | NULL |

bq head ${PROJECT_NAME}:${BQ_DATASET_NAME}.${BQ_TABLE_NAME_LESS_SCHEMA}

| data |

| {"kind":"storage#object","id":"ckfm0211-bucket-notification-test/2024/12/5/test_table/datafile.csv/1732520594602464","selfLink":"https://www.googleapis.com/storage/v1/b/ckfm0211-bucket-notification-test/o/2024%2F12%2F5%2Ftest_table%2Fdatafile.csv","name":"2024/12/5/test_table/datafile.csv","bucket":"ckfm0211-bucket-notification-test","generation":"1732520594602464","metageneration":"1","contentType":"text/csv","timeCreated":"2024-11-25T07:43:14.648Z","updated":"2024-11-25T07:43:14.648Z","storageClass":"STANDARD","timeStorageClassUpdated":"2024-11-25T07:43:14.648Z","size":"10","md5Hash":"gkwcnrFC+35b4Lxf+5cyvQ==","mediaLink":"https://storage.googleapis.com/download/storage/v1/b/ckfm0211-bucket-notification-test/o/2024%2F12%2F5%2Ftest_table%2Fdatafile.csv?generation=1732520594602464&alt=media","crc32c":"uXIkYw==","etag":"CODz7ZT+9okDEAE="} |

データが入った。
注意点
テーブルスキーマを採用する場合には、Google Cloud側の仕様変更等によりPub/Subに通知されるGCS通知のJSONスキーマが変更される可能性があるため、その際には
- テーブル定義の変更
- 事前に「Drop unknown fields」を有効化
が必要になる。また、前者の場合には失敗を検知するために「Dead lettering」を設定しておく必要がある。
まとめ
GCSにオブジェクトが格納された際のメタデータをBigQueryに自動連携する仕組みを、Pub/SubのBigQuery Subscriptionで構築できた。
データのスキーマが外部に依存(今回はGoogle Cloud)に依存するので、実際のシステムで構築するならスキーマなしでの利用が手軽かと思う。
ただし、このデータを活用する際にはJSON型のデータを取り扱うことになるので、それはそれで少し面倒か。JSON_VALUE
関数使えばいいだけなんだけど。
また、Subscription設定では通知のフィルタリングも可能なので、delete objectは無視する、だったり、特定のオブジェクトキーに一致するパスの通知だけを取得するなど、実際のシステムに合わせた変更は必要ではある。
なお、同様のデータはStorage Insights のインベントリ レポートでも取得が可能(CSV or Apache Parquet)。
こちらはGCSに定期的にストレージにあるオブジェクトの情報を出力してくれるので、それをまた自動的にスケジュールクエリなどでBigQueryへロードする、というのでも同じことはできる。
ただ、バケット内の全量が毎回出力されるので、今回の方式のほうがコスパはよさそう。