はじめに
オブジェクトストレージへのファイルアップロードをトリガーに後続の処理を実行したい、というケースは多々あると思います。
今回はGoogle CloudのオブジェクトストレージであるGCSへファイルをアップロードしたことをトリガーに、後続処理としてWorkflowsを起動する方法を検討しました。尚、要件としてGCSの prefix指定 ができることをゴールとしています。
事前準備
検討を始める前に、本筋から外れる以下リソースを事前作成しています。
- GCSバケット: workflow-trigger-test-bucket
- Workflow: workflow-trigger-test
- サービスアカウント: sa-workflow-trigger-test@.iam.gserviceaccount.com
- ロールは以下をバインド済み
- roles/workflows.invoker
- roles/pubsub.publisher
- roles/eventarc.developer
- roles/eventarc.eventReceiver
- ロールは以下をバインド済み
方式検討
Workflowsをイベントトリガーで実行したいという要件で真っ先に思いつくのが、Eventarcによる起動だと思います。実際、Workflowの作成、編集時に設定できるトリガーにもEventarcが用意されており、その中で今回ソースとしたいGCSも指定することが可能となっています。
なんだ、簡単にできるじゃないか!となればよいのですが、今回のゴールである prefix指定 をする項目が見つかりません。そうなんです、Eventarcのトリガーを作成するだけではバケット指定までが可能で、prefix指定はできません。今回は特定のprefixでファイルがアップロードされたことを条件としたいので、バケット全体を条件にしてしまうと、該当バケットに対する全てのアップロード操作でWorkflowが起動してしまうことになります。どこかでフィルタをかけたい。。
コンソールでは指定できないことは分かったので、gcloudコマンドでそれっぽいオプションを指定してみましたが、prefix指定はできませんでした。
$ gcloud eventarc triggers create test-trigger \
--location=asia-northeast1 \
--destination-workflow=workflow-trigger-test \
--event-filters="type=google.cloud.storage.object.v1.finalized" \
--event-filters="bucket=workflow-trigger-test-bucket" \
--event-filters-path-pattern="prefix=test001/*" \
--service-account=sa-workflow-trigger-test@<project_id>.iam.gserviceaccount.com \
--project=<project_id>
ERROR: (gcloud.eventarc.triggers.create) INVALID_ARGUMENT: The request was invalid: invalid argument: event type google.cloud.storage.object.v1.finalized not supported: attribute prefix not found within event type
調べると、確かにtype=google.cloud.storage.object.v1.finalized
はpath-pattern
がサポートされていないようでした。
# サポートされているケース
$ gcloud eventarc providers describe cloudaudit.googleapis.com --location=asia-northeast1
displayName: Cloud Audit Logs
eventTypes:
- description: An audit log is created that matches the trigger's filter criteria.
filteringAttributes:
- attribute: methodName
description: The identifier of the service's operation.
required: true
- attribute: resourceName
description: The complete path to a resource. Used to filter events for a specific
resource.
pathPatternSupported: true
- attribute: serviceName
description: The identifier of the Google Cloud service.
required: true
- attribute: type
required: true
type: google.cloud.audit.log.v1.written
name: projects/<project_id>/locations/asia-northeast1/providers/cloudaudit.googleapis.com
※"pathPatternSupported: true"となっている。
# サポートされていないケース
$ gcloud eventarc providers describe storage.googleapis.com --location=asia-northeast1
displayName: Cloud Storage
eventTypes:
- description: The live version of an object has become a noncurrent version.
filteringAttributes:
- attribute: bucket
description: The bucket name being watched.
required: true
- attribute: type
required: true
type: google.cloud.storage.object.v1.archived
- description: An object has been permanently deleted.
filteringAttributes:
- attribute: bucket
description: The bucket name being watched.
required: true
- attribute: type
required: true
type: google.cloud.storage.object.v1.deleted
- description: A new object is successfully created in the bucket.
filteringAttributes:
- attribute: bucket
description: The bucket name being watched.
required: true
- attribute: type
required: true
type: google.cloud.storage.object.v1.finalized
- description: The metadata of an existing object changes.
filteringAttributes:
- attribute: bucket
description: The bucket name being watched.
required: true
- attribute: type
required: true
type: google.cloud.storage.object.v1.metadataUpdated
name: projects/<project_id>/locations/asia-northeast1/providers/storage.googleapis.com
※"pathPatternSupported" が存在しない。
参考: パスパターンについて
Eventarcトリガー作成時には、バケットのprefixを指定できそうにないことは分かりました。
ただどこかでフィルターをかけたい・・・ということで、そもそもEventarcトリガーを作成した際に、裏でどのようなリソースが作成されているのかを調べてみることにしました。
$ gcloud eventarc triggers create test-trigger \
--location=asia-northeast1 \
--destination-workflow=workflow-trigger-test \
--event-filters="type=google.cloud.storage.object.v1.finalized" \
--event-filters="bucket=workflow-trigger-test-bucket" \
--service-account=sa-workflow-trigger-test@<project_id>.iam.gserviceaccount.com \
--project=<project_id>
上記コマンドでWorkflowsのトリガーを作成すると、合わせてCloud StorageのNotification、Pub/Sub Topicが作成されていることが分かりました。
# Eventarc Triggerの確認
$ gcloud eventarc triggers list
NAME TYPE DESTINATION ACTIVE LOCATION
test-trigger google.cloud.storage.object.v1.finalized Workflows: workflow-trigger-test Yes asia-northeast1
# Pub/Sub Topicの確認
$ gcloud pubsub topics list
---
labels:
goog-drz-eventarc-location: asia-northeast1
goog-drz-eventarc-uuid: 84e15887-4ca8-49a3-84d3-505223b737fd
goog-eventarc: ''
messageStoragePolicy:
allowedPersistenceRegions:
- asia-northeast1
name: projects/<project_id>/topics/eventarc-asia-northeast1-test-trigger-120
# Cloud Storage Notificationの確認
$ gcloud storage buckets notifications list
---
Bucket URL: gs://workflow-trigger-test-bucket/
Notification Configuration:
etag: eventarc-<project_id>-asia-northeast1-test-trigger
event_types:
- OBJECT_FINALIZE
id: eventarc-<project_id>-asia-northeast1-test-trigger
kind: storage#notification
payload_format: JSON_API_V1
selfLink: https://www.googleapis.com/storage/v1/b/workflow-trigger-test-bucket/notificationConfigs/eventarc-<project_id>-asia-northeast1-test-trigger
topic: //pubsub.googleapis.com/projects/<project_id>/topics/eventarc-asia-northeast1-test-trigger-120
図で表すと、以下のような構成であることが分かります。
検証
このように整理すると、大本となるCloud StorageのNotificationの時点でフィルタできればよいのでは?という仮説を考えまして、gcloud storage buckets notifications create
のオプションで--object-prefix
が指定できそう!ということで、実際にNotificationを作成してみます。
参考: gcloud storage buckets notifications create
gcloud storage buckets notifications create gs://workflow-trigger-test-bucket \
--event-types=OBJECT_FINALIZE \
--object-prefix=test001/ \
--topic=gcs-topic-workflow-trigger-test \
--payload-format=json \
--project=<project_id>
Notificationの作成コマンドでは、Pub/Sub Topicを新たに指定しています。
指定した名前で新たにTopicも作成されています。
# Cloud Storage Notificationの確認
$ gcloud storage buckets notifications list
Bucket URL: gs://workflow-trigger-test-bucket/
Notification Configuration:
etag: '2'
event_types:
- OBJECT_FINALIZE
id: '2'
kind: storage#notification
object_name_prefix: test001/
payload_format: JSON_API_V1
selfLink: https://www.googleapis.com/storage/v1/b/workflow-trigger-test-bucket/notificationConfigs/2
topic: //pubsub.googleapis.com/projects/<project_id>/topics/gcs-topic-workflow-trigger-test
# Pub/Sub Topicの確認
$ gcloud pubsub topics list
---
name: projects/<project_id>/topics/gcs-topic-workflow-trigger-test
object_name_prefixが指定されたNotificationとTopicが紐づけられていることが確認できましたので、次はEventarc TriggerとTopic、Workflowを紐づけます。
gcloud eventarc triggers create eventarc-trigger-test \
--location=asia-northeast1 \
--destination-workflow=workflow-trigger-test \
--destination-workflow-location=asia-northeast1 \
--event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
--service-account=sa-workflow-trigger-test@<project_id>.iam.gserviceaccount.com \
--transport-topic=gcs-topic-workflow-trigger-test \
--project=<project_id>
EventarcとPub/Subの状況を見ると、確かに紐づいている様子が確認できます。
# Eventarc Triggerの確認
$ gcloud eventarc triggers list
NAME TYPE DESTINATION ACTIVE LOCATION
eventarc-trigger-test google.cloud.pubsub.topic.v1.messagePublished Workflows: workflow-trigger-test Yes asia-northeast1
$ gcloud eventarc triggers describe eventarc-trigger-test --location asia-northeast1
createTime: '2024-11-25T10:10:18.611202525Z'
destination:
workflow: projects/<project_id>/locations/asia-northeast1/workflows/workflow-trigger-test
eventFilters:
- attribute: type
value: google.cloud.pubsub.topic.v1.messagePublished
name: projects/<project_id>/locations/asia-northeast1/triggers/eventarc-trigger-test
serviceAccount: workflow-trigger-test@<project_id>.iam.gserviceaccount.com
transport:
pubsub:
subscription: projects/<project_id>/subscriptions/eventarc-asia-northeast1-eventarc-trigger-test-sub-613
topic: projects/<project_id>/topics/gcs-topic-workflow-trigger-test
uid: 0c0d5d0e-8610-4ffc-80d5-5c63d12dfa89
updateTime: '2024-11-25T10:10:36.342382396Z'
# Pub/Sub Topicの確認
$ gcloud pubsub topics list-subscriptions gcs-topic-workflow-trigger-test
---
projects/<project_id>/subscriptions/eventarc-asia-northeast1-eventarc-trigger-test-sub-613
実際に、対象Workflowのほうを確認すると、トリガーが作成されていることが分かります。
動作確認
設定上はprefix指定でgs://workflow-trigger-test-bucket/test001
にファイルがアップロードされたことをトリガーにworkflow-trigger-test
が実行される状態となりました。実際に意図した挙動となるか、動作確認してみます。
確認前の状態
動作確認前のWorkflow実行状態は以下の通りです。
$ gcloud workflows executions list --location=asia-northeast1 workflow-trigger-test
NAME STATE START_TIME END_TIME
projects/111122223333/locations/asia-northeast1/workflows/workflow-trigger-test/executions/e6717791-a6d1-4654-80c9-5c73aec25960 SUCCEEDED 2024-11-25T02:56:31.294579208Z 2024-11-25T02:56:31.598102493Z
projects/111122223333/locations/asia-northeast1/workflows/workflow-trigger-test/executions/dc1b3f75-8300-4102-937f-aaf6790b55d7 SUCCEEDED 2024-11-25T02:55:13.141437820Z 2024-11-25T02:55:13.797743089Z
ファイルのアップロード
まずWorkflowが実行されない想定のgs://workflow-trigger-test-bucket/
直下にファイルをアップロードしてみます。
# 直下にファイルをアップロード
$ gcloud storage cp testfile.txt gs://workflow-trigger-test-bucket/
Copying file://testfile.txt to gs://workflow-trigger-test-bucket/testfile.txt
Completed files 1/1 | 8.0B/8.0B
# Workflowの実行状態を確認
$ gcloud workflows executions list --location=asia-northeast1 workflow-trigger-test
NAME STATE START_TIME END_TIME
projects/111122223333/locations/asia-northeast1/workflows/workflow-trigger-test/executions/e6717791-a6d1-4654-80c9-5c73aec25960 SUCCEEDED 2024-11-25T02:56:31.294579208Z 2024-11-25T02:56:31.598102493Z
projects/111122223333/locations/asia-northeast1/workflows/workflow-trigger-test/executions/dc1b3f75-8300-4102-937f-aaf6790b55d7 SUCCEEDED 2024-11-25T02:55:13.141437820Z 2024-11-25T02:55:13.797743089Z
対象バケット直下にファイルをアップロードしましたが、想定通りworkflow-trigger-test
は実行されていませんでした。
次はWorkflowsが実行される想定のgs://workflow-trigger-test-bucket/test001/
にファイルをアップロードしてみます。
# test001/にファイルをアップロード
$ gcloud storage cp testfile.txt gs://workflow-trigger-test-bucket/test001/
Copying file://testfile.txt to gs://workflow-trigger-test-bucket/test001/testfile.txt
Completed files 1/1 | 8.0B/8.0B
$ gcloud workflows executions list --location=asia-northeast1 workflow-trigger-test
NAME STATE START_TIME END_TIME
projects/19452606482/locations/asia-northeast1/workflows/workflow-trigger-test/executions/51194d7b-a010-483e-9666-e4e499c5143b SUCCEEDED 2024-11-25T13:29:35.944051280Z 2024-11-25T13:29:36.634987306Z
projects/19452606482/locations/asia-northeast1/workflows/workflow-trigger-test/executions/a6d3211c-581b-44fc-9b08-e34a10a81316 SUCCEEDED 2024-11-25T13:29:35.930985060Z 2024-11-25T13:29:36.388231421Z
projects/19452606482/locations/asia-northeast1/workflows/workflow-trigger-test/executions/e6717791-a6d1-4654-80c9-5c73aec25960 SUCCEEDED 2024-11-25T02:56:31.294579208Z 2024-11-25T02:56:31.598102493Z
バケット直下にファイルをアップロードした際は実行されなかったWorkflowが、実行されていることが分かります。意図した通り、prefixを指定した制御ができるようになりました!
まとめ
今回はprefix指定によるWorkflowの実行を制御する方式を実現しました。試行錯誤しましたが、まとめると以下2点がポイントとなります。
- Cloud StorageのNotificationをprefix指定で作成する
- NotificationとPub/Sub Topicが作成される
- 先ほど作成されたPub/Sub Topic、実行するWorkflowを指定しEventarc Triggerを作成する
- TopicのSubscriptionにEventarc Triggerが登録される
パッと思いつく方法ですとなかなか今回の方式に辿り着くことはできなかったので、同様の要件をお持ちの皆様の一助になれば幸いです。
また、よりシンプルな実装方法をご存知の方がいらっしゃれば、是非コメントよろしくお願いします!