はじめに
BigQueryへのデータ連携パイプラインでGoogle Cloud Storage (GCS) を活用するケースは多い。
「どれだけのデータが連携されたか」「どのタイミングで連携されたか」のモニタリングを手軽にやりたい。
そこで、Pub/SubのBigQuery Subscriptionを活用して、GCSへのファイル格納時に発生するPub/Sub通知をBigQueryに直接連携してみた。
Pub/SubのBigQuery Subscriptionとは?
Pub/SubのBigQuery Subscriptionは、Pub/SubのメッセージをBigQueryに直接書き込む仕組み。
BigQuery Subscriptionで作成されるテーブルのスキーマ指定には3種類ある。
- トピックスキーマ
あらかじめ Pub/Sub トピックにスキーマ(メッセージの形式)を定義する。Apache Avro と Protocol Buffer の2つの形式に対応。 - テーブルスキーマ
Pub/Sub に投入された JSON メッセージのキー名と対応する列に、データが書き込まれる。あらかじめ Pub/Sub 側にスキーマを設定しておく必要はない。 - スキーマなし
データは対象 BigQuery テーブルの data という名称の列にメッセージを書き込まれる。 
今回はGCSからのイベント通知を取り込むので、テーブルスキーマ、スキーマなしそれぞれを試してみた。
構築手順
# 事前準備
export PROJECT_NAME=aa
export PROJECT_NUMBER=11
export BQ_DATASET_NAME=bb
export BQ_TABLE_NAME_WITH_TABLE_SCHEMA=gcs_notification_table_schema
export BQ_TABLE_NAME_LESS_SCHEMA=gcs_notification_less_schema
export GCS_BUCKET_NAME=dd
export PUBSUB_TOPIC_NAME=ee
export PUBSUB_SUBSCRIPTION_NAME_WITH_TABLE_SCHEMA=ff
export PUBSUB_SUBSCRIPTION_NAME_LESS_SCHEMA=gg
# Pub/Sub Topic作成(省略可)
gcloud pubsub topics create ${PUBSUB_TOPIC_NAME}
# バケットを作成
gsutil mb gs://${GCS_BUCKET_NAME}
# 通知設定を作成
gcloud storage buckets notifications create gs://${GCS_BUCKET_NAME} --topic=${PUBSUB_TOPIC_NAME}
# Pub/SubのサービスエージェントにBigQueryデータ編集者の権限を付与
gcloud projects add-iam-policy-binding ${PROJECT_NAME} \
--member="serviceAccount:service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com" \
--role="roles/bigquery.dataEditor"
# BigQueryにテーブルを事前作成
## テーブルスキーマ
bq mk --table ${BQ_DATASET_NAME}.${BQ_TABLE_NAME_WITH_TABLE_SCHEMA} schema.json
## スキーマなし
bq mk --table ${BQ_DATASET_NAME}.${BQ_TABLE_NAME_LESS_SCHEMA} data:JSON
# BigQuery Subscriptionを作成
## テーブルスキーマ
gcloud pubsub subscriptions create ${PUBSUB_SUBSCRIPTION_NAME_WITH_TABLE_SCHEMA} \
  --topic ${PUBSUB_TOPIC_NAME} \
  --bigquery-table ${PROJECT_NAME}:${BQ_DATASET_NAME}.${BQ_TABLE_NAME_WITH_TABLE_SCHEMA} \
  --use-table-schema
## スキーマなし
gcloud pubsub subscriptions create ${PUBSUB_SUBSCRIPTION_NAME_LESS_SCHEMA} \
  --topic ${PUBSUB_TOPIC_NAME} \
  --bigquery-table ${PROJECT_NAME}:${BQ_DATASET_NAME}.${BQ_TABLE_NAME_LESS_SCHEMA}
schama.jsonの中身
https://cloud.google.com/storage/docs/json_api/v1/objects#resource-representations
を参考に。
[
  {"name": "kind", "type": "STRING"},
  {"name": "id", "type": "STRING"},
  {"name": "selfLink", "type": "STRING"},
  {"name": "mediaLink", "type": "STRING"},
  {"name": "name", "type": "STRING"},
  {"name": "bucket", "type": "STRING"},
  {"name": "generation", "type": "INTEGER"},
  {"name": "metageneration", "type": "INTEGER"},
  {"name": "contentType", "type": "STRING"},
  {"name": "storageClass", "type": "STRING"},
  {"name": "size", "type": "INTEGER"},
  {"name": "softDeleteTime", "type": "TIMESTAMP"},
  {"name": "restoreToken", "type": "STRING"},
  {"name": "hardDeleteTime", "type": "TIMESTAMP"},
  {"name": "md5Hash", "type": "STRING"},
  {"name": "contentEncoding", "type": "STRING"},
  {"name": "contentDisposition", "type": "STRING"},
  {"name": "contentLanguage", "type": "STRING"},
  {"name": "cacheControl", "type": "STRING"},
  {"name": "crc32c", "type": "STRING"},
  {"name": "componentCount", "type": "INTEGER"},
  {"name": "etag", "type": "STRING"},
  {"name": "kmsKeyName", "type": "STRING"},
  {"name": "temporaryHold", "type": "BOOLEAN"},
  {"name": "eventBasedHold", "type": "BOOLEAN"},
  {"name": "retentionExpirationTime", "type": "TIMESTAMP"},
  {"name": "retention", "type": "RECORD", "fields": [
    {"name": "retainUntilTime", "type": "TIMESTAMP"},
    {"name": "mode", "type": "STRING"}
  ]},
  {"name": "timeCreated", "type": "TIMESTAMP"},
  {"name": "updated", "type": "TIMESTAMP"},
  {"name": "timeDeleted", "type": "TIMESTAMP"},
  {"name": "timeStorageClassUpdated", "type": "TIMESTAMP"},
  {"name": "customTime", "type": "TIMESTAMP"},
  {"name": "metadata", "type": "RECORD", "mode": "REPEATED", "fields": [
    {"name": "key", "type": "STRING"},
    {"name": "value", "type": "STRING"}
  ]},
  {"name": "acl", "type": "RECORD", "mode": "REPEATED", "fields": [
    {"name": "entity", "type": "STRING"},
    {"name": "role", "type": "STRING"}
  ]},
  {"name": "owner", "type": "RECORD", "fields": [
    {"name": "entity", "type": "STRING"},
    {"name": "entityId", "type": "STRING"}
  ]},
  {"name": "customerEncryption", "type": "RECORD", "fields": [
    {"name": "encryptionAlgorithm", "type": "STRING"},
    {"name": "keySha256", "type": "STRING"}
  ]}
]
動作確認
GCSにファイルをアップロードしてみる。
gcloud storage cp test.csv gs://ckfm0211-bucket-notification-test/2024/12/5/test_table/datafile.csv
テーブルを確認。
bq head ${PROJECT_NAME}:${BQ_DATASET_NAME}.${BQ_TABLE_NAME_WITH_TABLE_SCHEMA}

|      kind      |                                          id                                          |                                                       selfLink                                                        |                                                                                mediaLink                                                                                 |               name                |              bucket               |    generation    | metageneration | contentType | storageClass | size | softDeleteTime | restoreToken | hardDeleteTime |         md5Hash          | contentEncoding | contentDisposition | contentLanguage | cacheControl |  crc32c  | componentCount |       etag       | kmsKeyName | temporaryHold | eventBasedHold | retentionExpirationTime | retention |     timeCreated     |       updated       | timeDeleted | timeStorageClassUpdated | customTime | metadata | acl | owner | customerEncryption |
+----------------+--------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------+-----------------------------------+------------------+----------------+-------------+--------------+------+----------------+--------------+----------------+--------------------------+-----------------+--------------------+-----------------+--------------+----------+----------------+------------------+------------+---------------+----------------+-------------------------+-----------+---------------------+---------------------+-------------+-------------------------+------------+----------+-----+-------+--------------------+
| storage#object | ckfm0211-bucket-notification-test/2024/12/5/test_table/datafile.csv/1732520594602464 | https://www.googleapis.com/storage/v1/b/ckfm0211-bucket-notification-test/o/2024%2F12%2F5%2Ftest_table%2Fdatafile.csv | https://storage.googleapis.com/download/storage/v1/b/ckfm0211-bucket-notification-test/o/2024%2F12%2F5%2Ftest_table%2Fdatafile.csv?generation=1732520594602464&alt=media | 2024/12/5/test_table/datafile.csv | ckfm0211-bucket-notification-test | 1732520594602464 |              1 | text/csv    | STANDARD     |   10 |           NULL | NULL         |           NULL | gkwcnrFC+35b4Lxf+5cyvQ== | NULL            | NULL               | NULL            | NULL         | uXIkYw== |           NULL | CODz7ZT+9okDEAE= | NULL       |          NULL |           NULL |                    NULL |      NULL | 2024-11-25 07:43:14 | 2024-11-25 07:43:14 |        NULL |     2024-11-25 07:43:14 |       NULL |       [] |  [] |  NULL |               NULL |

bq head ${PROJECT_NAME}:${BQ_DATASET_NAME}.${BQ_TABLE_NAME_LESS_SCHEMA}

|                                                                                                                                                                                                                                                                                                                                                                                                                                       data                                                                                                                                                                                                                                                                                                                                                                                                                                        |

| {"kind":"storage#object","id":"ckfm0211-bucket-notification-test/2024/12/5/test_table/datafile.csv/1732520594602464","selfLink":"https://www.googleapis.com/storage/v1/b/ckfm0211-bucket-notification-test/o/2024%2F12%2F5%2Ftest_table%2Fdatafile.csv","name":"2024/12/5/test_table/datafile.csv","bucket":"ckfm0211-bucket-notification-test","generation":"1732520594602464","metageneration":"1","contentType":"text/csv","timeCreated":"2024-11-25T07:43:14.648Z","updated":"2024-11-25T07:43:14.648Z","storageClass":"STANDARD","timeStorageClassUpdated":"2024-11-25T07:43:14.648Z","size":"10","md5Hash":"gkwcnrFC+35b4Lxf+5cyvQ==","mediaLink":"https://storage.googleapis.com/download/storage/v1/b/ckfm0211-bucket-notification-test/o/2024%2F12%2F5%2Ftest_table%2Fdatafile.csv?generation=1732520594602464&alt=media","crc32c":"uXIkYw==","etag":"CODz7ZT+9okDEAE="} |

データが入った。
注意点
テーブルスキーマを採用する場合には、Google Cloud側の仕様変更等によりPub/Subに通知されるGCS通知のJSONスキーマが変更される可能性があるため、その際には
- テーブル定義の変更
 - 事前に「Drop unknown fields」を有効化
 
が必要になる。また、前者の場合には失敗を検知するために「Dead lettering」を設定しておく必要がある。
まとめ
GCSにオブジェクトが格納された際のメタデータをBigQueryに自動連携する仕組みを、Pub/SubのBigQuery Subscriptionで構築できた。
データのスキーマが外部に依存(今回はGoogle Cloud)に依存するので、実際のシステムで構築するならスキーマなしでの利用が手軽かと思う。
ただし、このデータを活用する際にはJSON型のデータを取り扱うことになるので、それはそれで少し面倒か。JSON_VALUE関数使えばいいだけなんだけど。
また、Subscription設定では通知のフィルタリングも可能なので、delete objectは無視する、だったり、特定のオブジェクトキーに一致するパスの通知だけを取得するなど、実際のシステムに合わせた変更は必要ではある。
なお、同様のデータはStorage Insights のインベントリ レポートでも取得が可能(CSV or Apache Parquet)。
こちらはGCSに定期的にストレージにあるオブジェクトの情報を出力してくれるので、それをまた自動的にスケジュールクエリなどでBigQueryへロードする、というのでも同じことはできる。
ただ、バケット内の全量が毎回出力されるので、今回の方式のほうがコスパはよさそう。