
BigQuery Advent Calendar 2024

Day 4

Forwarding GCS notifications to BigQuery with a Google Cloud Pub/Sub BigQuery Subscription

Posted at 2024-12-03

Introduction

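Data pipelines that load into BigQuery often stage the data in Google Cloud Storage (GCS).
I wanted an easy way to monitor how much data was delivered and when it arrived.
So I tried using a Pub/Sub BigQuery Subscription to write the Pub/Sub notifications that GCS emits when a file is stored directly into BigQuery.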

What is a Pub/Sub BigQuery Subscription?

A Pub/Sub BigQuery Subscription writes Pub/Sub messages directly to BigQuery.

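There are three ways to specify the schema of the table a BigQuery Subscription writes to:

  1. Topic schema
    Define a schema (the message format) on the Pub/Sub topic in advance. Apache Avro and Protocol Buffers are supported.
  2. Table schema
    Each key in a JSON message published to Pub/Sub is written to the column with the matching name. No schema needs to be configured on the Pub/Sub side.
  3. No schema
    Each message is written to a column named data in the target BigQuery table.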

Since the input this time is event notifications from GCS, I tried both the table schema and the no-schema options.

Setup steps

# Preparation
export PROJECT_NAME=aa
export PROJECT_NUMBER=11
export BQ_DATASET_NAME=bb
export BQ_TABLE_NAME_WITH_TABLE_SCHEMA=gcs_notification_table_schema
export BQ_TABLE_NAME_LESS_SCHEMA=gcs_notification_less_schema
export GCS_BUCKET_NAME=dd
export PUBSUB_TOPIC_NAME=ee
export PUBSUB_SUBSCRIPTION_NAME_WITH_TABLE_SCHEMA=ff
export PUBSUB_SUBSCRIPTION_NAME_LESS_SCHEMA=gg

# Create the Pub/Sub topic (optional: the notification command below creates the topic if it does not exist)
gcloud pubsub topics create ${PUBSUB_TOPIC_NAME}

# Create the bucket
gsutil mb gs://${GCS_BUCKET_NAME}

# Create the bucket notification configuration
gcloud storage buckets notifications create gs://${GCS_BUCKET_NAME} --topic=${PUBSUB_TOPIC_NAME}

# Grant the BigQuery Data Editor role to the Pub/Sub service agent
gcloud projects add-iam-policy-binding ${PROJECT_NAME} \
--member="serviceAccount:service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com" \
--role="roles/bigquery.dataEditor"

# Create the BigQuery tables in advance
## Table schema
bq mk --table ${BQ_DATASET_NAME}.${BQ_TABLE_NAME_WITH_TABLE_SCHEMA} schema.json

## No schema
bq mk --table ${BQ_DATASET_NAME}.${BQ_TABLE_NAME_LESS_SCHEMA} data:JSON

# Create the BigQuery Subscriptions
## Table schema
gcloud pubsub subscriptions create ${PUBSUB_SUBSCRIPTION_NAME_WITH_TABLE_SCHEMA} \
  --topic ${PUBSUB_TOPIC_NAME} \
  --bigquery-table ${PROJECT_NAME}:${BQ_DATASET_NAME}.${BQ_TABLE_NAME_WITH_TABLE_SCHEMA} \
  --use-table-schema

## No schema
gcloud pubsub subscriptions create ${PUBSUB_SUBSCRIPTION_NAME_LESS_SCHEMA} \
  --topic ${PUBSUB_TOPIC_NAME} \
  --bigquery-table ${PROJECT_NAME}:${BQ_DATASET_NAME}.${BQ_TABLE_NAME_LESS_SCHEMA}

Contents of schema.json, based on the object resource representation documented at
https://cloud.google.com/storage/docs/json_api/v1/objects#resource-representations

[
  {"name": "kind", "type": "STRING"},
  {"name": "id", "type": "STRING"},
  {"name": "selfLink", "type": "STRING"},
  {"name": "mediaLink", "type": "STRING"},
  {"name": "name", "type": "STRING"},
  {"name": "bucket", "type": "STRING"},
  {"name": "generation", "type": "INTEGER"},
  {"name": "metageneration", "type": "INTEGER"},
  {"name": "contentType", "type": "STRING"},
  {"name": "storageClass", "type": "STRING"},
  {"name": "size", "type": "INTEGER"},
  {"name": "softDeleteTime", "type": "TIMESTAMP"},
  {"name": "restoreToken", "type": "STRING"},
  {"name": "hardDeleteTime", "type": "TIMESTAMP"},
  {"name": "md5Hash", "type": "STRING"},
  {"name": "contentEncoding", "type": "STRING"},
  {"name": "contentDisposition", "type": "STRING"},
  {"name": "contentLanguage", "type": "STRING"},
  {"name": "cacheControl", "type": "STRING"},
  {"name": "crc32c", "type": "STRING"},
  {"name": "componentCount", "type": "INTEGER"},
  {"name": "etag", "type": "STRING"},
  {"name": "kmsKeyName", "type": "STRING"},
  {"name": "temporaryHold", "type": "BOOLEAN"},
  {"name": "eventBasedHold", "type": "BOOLEAN"},
  {"name": "retentionExpirationTime", "type": "TIMESTAMP"},
  {"name": "retention", "type": "RECORD", "fields": [
    {"name": "retainUntilTime", "type": "TIMESTAMP"},
    {"name": "mode", "type": "STRING"}
  ]},
  {"name": "timeCreated", "type": "TIMESTAMP"},
  {"name": "updated", "type": "TIMESTAMP"},
  {"name": "timeDeleted", "type": "TIMESTAMP"},
  {"name": "timeStorageClassUpdated", "type": "TIMESTAMP"},
  {"name": "customTime", "type": "TIMESTAMP"},
  {"name": "metadata", "type": "RECORD", "mode": "REPEATED", "fields": [
    {"name": "key", "type": "STRING"},
    {"name": "value", "type": "STRING"}
  ]},
  {"name": "acl", "type": "RECORD", "mode": "REPEATED", "fields": [
    {"name": "entity", "type": "STRING"},
    {"name": "role", "type": "STRING"}
  ]},
  {"name": "owner", "type": "RECORD", "fields": [
    {"name": "entity", "type": "STRING"},
    {"name": "entityId", "type": "STRING"}
  ]},
  {"name": "customerEncryption", "type": "RECORD", "fields": [
    {"name": "encryptionAlgorithm", "type": "STRING"},
    {"name": "keySha256", "type": "STRING"}
  ]}
]

Verification

Upload a file to GCS.

gcloud storage cp test.csv gs://${GCS_BUCKET_NAME}/2024/12/5/test_table/datafile.csv

Check the tables.

bq head ${PROJECT_NAME}:${BQ_DATASET_NAME}.${BQ_TABLE_NAME_WITH_TABLE_SCHEMA}
+----------------+--------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------+-----------------------------------+------------------+----------------+-------------+--------------+------+----------------+--------------+----------------+--------------------------+-----------------+--------------------+-----------------+--------------+----------+----------------+------------------+------------+---------------+----------------+-------------------------+-----------+---------------------+---------------------+-------------+-------------------------+------------+----------+-----+-------+--------------------+
|      kind      |                                          id                                          |                                                       selfLink                                                        |                                                                                mediaLink                                                                                 |               name                |              bucket               |    generation    | metageneration | contentType | storageClass | size | softDeleteTime | restoreToken | hardDeleteTime |         md5Hash          | contentEncoding | contentDisposition | contentLanguage | cacheControl |  crc32c  | componentCount |       etag       | kmsKeyName | temporaryHold | eventBasedHold | retentionExpirationTime | retention |     timeCreated     |       updated       | timeDeleted | timeStorageClassUpdated | customTime | metadata | acl | owner | customerEncryption |
+----------------+--------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------+-----------------------------------+------------------+----------------+-------------+--------------+------+----------------+--------------+----------------+--------------------------+-----------------+--------------------+-----------------+--------------+----------+----------------+------------------+------------+---------------+----------------+-------------------------+-----------+---------------------+---------------------+-------------+-------------------------+------------+----------+-----+-------+--------------------+
| storage#object | ckfm0211-bucket-notification-test/2024/12/5/test_table/datafile.csv/1732520594602464 | https://www.googleapis.com/storage/v1/b/ckfm0211-bucket-notification-test/o/2024%2F12%2F5%2Ftest_table%2Fdatafile.csv | https://storage.googleapis.com/download/storage/v1/b/ckfm0211-bucket-notification-test/o/2024%2F12%2F5%2Ftest_table%2Fdatafile.csv?generation=1732520594602464&alt=media | 2024/12/5/test_table/datafile.csv | ckfm0211-bucket-notification-test | 1732520594602464 |              1 | text/csv    | STANDARD     |   10 |           NULL | NULL         |           NULL | gkwcnrFC+35b4Lxf+5cyvQ== | NULL            | NULL               | NULL            | NULL         | uXIkYw== |           NULL | CODz7ZT+9okDEAE= | NULL       |          NULL |           NULL |                    NULL |      NULL | 2024-11-25 07:43:14 | 2024-11-25 07:43:14 |        NULL |     2024-11-25 07:43:14 |       NULL |       [] |  [] |  NULL |               NULL |
+----------------+--------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------+-----------------------------------+------------------+----------------+-------------+--------------+------+----------------+--------------+----------------+--------------------------+-----------------+--------------------+-----------------+--------------+----------+----------------+------------------+------------+---------------+----------------+-------------------------+-----------+---------------------+---------------------+-------------+-------------------------+------------+----------+-----+-------+--------------------+

bq head ${PROJECT_NAME}:${BQ_DATASET_NAME}.${BQ_TABLE_NAME_LESS_SCHEMA}
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                                                                                                                                                                                                                                                                                                       data                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| {"kind":"storage#object","id":"ckfm0211-bucket-notification-test/2024/12/5/test_table/datafile.csv/1732520594602464","selfLink":"https://www.googleapis.com/storage/v1/b/ckfm0211-bucket-notification-test/o/2024%2F12%2F5%2Ftest_table%2Fdatafile.csv","name":"2024/12/5/test_table/datafile.csv","bucket":"ckfm0211-bucket-notification-test","generation":"1732520594602464","metageneration":"1","contentType":"text/csv","timeCreated":"2024-11-25T07:43:14.648Z","updated":"2024-11-25T07:43:14.648Z","storageClass":"STANDARD","timeStorageClassUpdated":"2024-11-25T07:43:14.648Z","size":"10","md5Hash":"gkwcnrFC+35b4Lxf+5cyvQ==","mediaLink":"https://storage.googleapis.com/download/storage/v1/b/ckfm0211-bucket-notification-test/o/2024%2F12%2F5%2Ftest_table%2Fdatafile.csv?generation=1732520594602464&alt=media","crc32c":"uXIkYw==","etag":"CODz7ZT+9okDEAE="} |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

The data landed in both tables.
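Since the original goal was to see how much data arrived and when, here is a minimal sketch of a monitoring query against the table-schema table. The helper name `hourly_upload_stats` is hypothetical, and it assumes the variables from the setup steps are set. Because the notification configuration above does not filter event types, the table can also contain rows for deletions or metadata updates of the same object; the sketch collapses rows by `id` (bucket/name/generation, as in the sample row), so the numbers are approximate rather than an exact upload log.

```shell
# Hypothetical helper: hourly object count and total bytes from the table-schema table.
# Assumes PROJECT_NAME, BQ_DATASET_NAME and BQ_TABLE_NAME_WITH_TABLE_SCHEMA are set as in the setup steps.
hourly_upload_stats() {
  bq query --use_legacy_sql=false "
    SELECT
      TIMESTAMP_TRUNC(timeCreated, HOUR) AS upload_hour,
      COUNT(*) AS objects,
      SUM(size) AS total_bytes
    FROM (
      -- id is bucket/name/generation, so delete or metadata-update notifications
      -- for the same object generation collapse into a single row here
      SELECT id, ANY_VALUE(timeCreated) AS timeCreated, ANY_VALUE(size) AS size
      FROM \`${PROJECT_NAME}.${BQ_DATASET_NAME}.${BQ_TABLE_NAME_WITH_TABLE_SCHEMA}\`
      GROUP BY id
    )
    GROUP BY upload_hour
    ORDER BY upload_hour"
}
```

Run it with `hourly_upload_stats` after exporting the variables from the preparation step.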

Caveats

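When you use a table schema, the JSON schema of the GCS notifications published to Pub/Sub may change, for example because of a specification change on the Google Cloud side. To handle that, you need to either

  • update the table definition, or
  • enable "Drop unknown fields" in advance.

If you take the first approach, you should also configure "Dead lettering" so that write failures are detected.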
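A minimal sketch of applying these safeguards to the table-schema subscription with gcloud. The helper name, the dead-letter topic name `gcs-notification-dead-letter`, its subscription name and `--max-delivery-attempts=5` are assumptions for illustration. It grants the Pub/Sub service agent the roles that dead lettering needs, adds a subscription to the dead-letter topic so that forwarded messages are retained, and then updates the BigQuery Subscription. The table and `--use-table-schema` are passed again in the update so that the existing BigQuery settings are restated rather than left to defaults. If you keep the table definition in sync manually instead, you can drop `--drop-unknown-fields` and rely on the dead-letter topic alone to surface failed writes.

```shell
# Hypothetical helper: harden the table-schema subscription against GCS payload changes.
# DEAD_LETTER_TOPIC_NAME and the default topic name below are assumptions, not part of the setup above.
# Assumes PROJECT_NAME, PROJECT_NUMBER, BQ_DATASET_NAME, BQ_TABLE_NAME_WITH_TABLE_SCHEMA
# and PUBSUB_SUBSCRIPTION_NAME_WITH_TABLE_SCHEMA are set as in the setup steps.
harden_table_schema_subscription() {
  dlq_topic="${DEAD_LETTER_TOPIC_NAME:-gcs-notification-dead-letter}"
  pubsub_sa="serviceAccount:service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"

  # Dead-letter topic plus a subscription so that dead-lettered messages are kept
  gcloud pubsub topics create "${dlq_topic}"
  gcloud pubsub subscriptions create "${dlq_topic}-sub" --topic="${dlq_topic}"

  # The Pub/Sub service agent must be able to publish to the dead-letter topic
  # and to acknowledge messages on the source subscription
  gcloud pubsub topics add-iam-policy-binding "${dlq_topic}" \
    --member="${pubsub_sa}" --role="roles/pubsub.publisher"
  gcloud pubsub subscriptions add-iam-policy-binding "${PUBSUB_SUBSCRIPTION_NAME_WITH_TABLE_SCHEMA}" \
    --member="${pubsub_sa}" --role="roles/pubsub.subscriber"

  # Drop JSON fields that have no matching column, and dead-letter messages
  # that still cannot be written after 5 delivery attempts
  gcloud pubsub subscriptions update "${PUBSUB_SUBSCRIPTION_NAME_WITH_TABLE_SCHEMA}" \
    --bigquery-table="${PROJECT_NAME}:${BQ_DATASET_NAME}.${BQ_TABLE_NAME_WITH_TABLE_SCHEMA}" \
    --use-table-schema \
    --drop-unknown-fields \
    --dead-letter-topic="${dlq_topic}" \
    --max-delivery-attempts=5
}
```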

Summary

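Using a Pub/Sub BigQuery Subscription, I was able to build a mechanism that automatically forwards the metadata of objects stored in GCS to BigQuery.
Because the data schema depends on an external party (Google Cloud in this case), the no-schema option is probably the easier choice for a production system.
The catch is that you then have to handle JSON-typed data when using it, which is a bit tedious in its own way, although it only really takes the JSON_VALUE function.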
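For the no-schema table, a minimal sketch of the JSON_VALUE approach. The helper name `recent_objects_from_json` is hypothetical, and it assumes the variables from the setup steps are set. As the sample row in the data column shows, GCS sends 64-bit integer fields such as `size` and `generation` as JSON strings, so the sketch casts them explicitly.

```shell
# Hypothetical helper: latest objects read from the JSON data column of the no-schema table.
# Assumes PROJECT_NAME, BQ_DATASET_NAME and BQ_TABLE_NAME_LESS_SCHEMA are set as in the setup steps.
recent_objects_from_json() {
  bq query --use_legacy_sql=false "
    SELECT
      JSON_VALUE(data, '\$.name') AS object_name,
      -- GCS sends int64 fields such as size as JSON strings, so cast explicitly
      CAST(JSON_VALUE(data, '\$.size') AS INT64) AS size_bytes,
      -- timeCreated is an RFC 3339 string such as 2024-11-25T07:43:14.648Z
      TIMESTAMP(JSON_VALUE(data, '\$.timeCreated')) AS time_created
    FROM \`${PROJECT_NAME}.${BQ_DATASET_NAME}.${BQ_TABLE_NAME_LESS_SCHEMA}\`
    ORDER BY time_created DESC
    LIMIT 10"
}
```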

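Subscriptions also support message filtering, so in a real system you would adapt the setup to your needs, for example by ignoring object deletion notifications or by receiving only notifications for objects whose paths match a specific key.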
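A minimal sketch of such a filtered subscription, using the `eventType` and `objectId` attributes that GCS attaches to each notification message. The helper name and its two arguments (subscription name, object prefix) are hypothetical. A Pub/Sub filter cannot be changed on an existing subscription, so this creates a new one; if it writes to the same table as the no-schema subscription created earlier, both subscriptions will insert rows, so in practice you would replace the old subscription or point the new one at a separate table.

```shell
# Hypothetical helper: a filtered variant of the no-schema BigQuery Subscription.
# Usage: create_filtered_subscription SUBSCRIPTION_NAME OBJECT_PREFIX (both placeholders).
# Assumes PROJECT_NAME, BQ_DATASET_NAME, BQ_TABLE_NAME_LESS_SCHEMA and PUBSUB_TOPIC_NAME are set.
create_filtered_subscription() {
  sub_name="$1"
  object_prefix="$2"

  # Keep only OBJECT_FINALIZE events (new or overwritten objects), which drops
  # deletion and metadata-update notifications, and only objects under the prefix
  gcloud pubsub subscriptions create "${sub_name}" \
    --topic="${PUBSUB_TOPIC_NAME}" \
    --bigquery-table="${PROJECT_NAME}:${BQ_DATASET_NAME}.${BQ_TABLE_NAME_LESS_SCHEMA}" \
    --message-filter="attributes.eventType = \"OBJECT_FINALIZE\" AND hasPrefix(attributes.objectId, \"${object_prefix}\")"
}
```

For the upload in the verification step, `create_filtered_subscription finalize-only 2024/` would match the object `2024/12/5/test_table/datafile.csv`.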

The same kind of data can also be obtained from Storage Insights inventory reports (CSV or Apache Parquet).

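Inventory reports periodically write information about the objects in a bucket to GCS, so you could achieve the same result by loading them into BigQuery automatically, for example with a scheduled query.
However, each report contains every object in the bucket, so the approach in this article seems more cost-effective.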
