はじめに
この記事は株式会社primeNumberの夏の自由研究企画、AI Native Summer Calendar 2025の8/30分の記事です。
データ処理の最適化を考えたときに、「TROCCOのジョブ実行をファイル格納を起点としたイベントドリブンにしたい」というご要望を、これまでにいただくことがありました。
「その気持ちは分かります、すみません・・・」と思いながらも、現時点では機能的に提供できていません。そこで、今回プロダクトを補完するものとして、Claude Codeとともにサンプルコードを作成してみました。
実装はTerraform Moduleの形式になっていますが、Terraformはあくまでリソースを作成するためのツールなので、対象になっているリソースをGUIで作成すればTerraformを利用しなくとも実装可能です。
今回の実装はあくまでサンプルなため、
- APIキーをTerraform経由で登録している
- 実運用で細かいテストをしているわけではない
などの課題があるものになります。実運用の際には設定を調整&検証の上、ご利用ください。
こんな方におすすめ
- TROCCOをファイル格納のイベントドリブンで実行したい方
- Amazon S3 / Google Cloud Storageのファイル格納を起点にしたLambda / Cloud Run Functions実行の仕組みについて参考になる実装を見たい方
できるようになったこと
今回作成したTerraform ModuleはGitHubで公開しています。
ModuleとしてはAWS環境用およびGoogle Cloud環境用の2種類を用意していますが、いずれも実装内容はほぼ同一で、
- オブジェクトストレージにファイルが格納される
- ファイル格納を起点にメッセージが作成される
- メッセージを受け取って、FaaSを起動する
- FaaSからTROCCO APIを実行する
といった流れになっています。
AWSの例
S3にファイルを格納します。このとき、完了日時は2025/08/30 13:47:45になっています。
(手動格納が面倒なのでとりあえずTROCCOで突っ込んでいます)
すると、S3からの取り込みの転送ジョブがAPI経由で実行されます。転送ジョブの起動日時(=カスタム変数展開時刻)は2025/08/30 13:47:47なので、ほぼ即時ですね。
なお、このときパスプレフィックスやファイルのURL、更新日時をカスタム変数の値として入力しています。
Google Cloudの例
Google Cloud Storageにファイルを格納します。このとき、完了日時は2025/08/30 13:50:09になっています。
すると、Google Cloud Storageからの取り込みの転送ジョブがAPI経由で実行されます。転送ジョブの起動日時は2025/08/30 13:50:10なので、こちらもほぼ即時です。
処理概要
それぞれの処理の概要については以下のようになっています。
AWSの場合
Google Cloudの場合
注意点やハマりどころなど
利用する際の注意点や、実装していたときのハマりどころについてもまとめておきます。
AWSの場合
Terraformの実装
- 対象は
./modules/event_driven_ingestion/aws/main.tf
- SQSが適切に処理されずに、デッドレターキューに入った場合の処理は実装されていません。それほど発生はしないだろうと思いつつ・・・。
- Lambdaのスクリプトをデプロイするにあたり、ローカル側で依存関係を含めてzip圧縮しないといけないため、以下のような形で実装しています。そこで、Terraformの実行環境でPythonを利用できる必要があります。
resource "local_file" "job_definition_mappings" {
content = jsonencode(var.module_config.job_definition_mappings)
filename = "${local.lambda_source_dir}/job_definition_mappings.json"
}
resource "null_resource" "install_dependencies" {
triggers = {
requirements_hash = filemd5("${local.lambda_source_dir}/requirements.txt")
source_code_hash = filemd5("${local.lambda_source_dir}/main.py")
}
provisioner "local-exec" {
command = "cd ${local.lambda_source_dir} && pip install -r requirements.txt -t packages/"
}
}
data "archive_file" "lambda" {
type = "zip"
source_dir = local.lambda_source_dir
output_path = local.lambda_zip_path
excludes = [
"requirements.txt"
]
depends_on = [
local_file.job_definition_mappings,
null_resource.install_dependencies,
]
}
- Parameter StoreへのTROCCO API Key、Slack Webhook URLの登録をTerraform経由で行っていますが、stateファイルにキーが残ってしまうので、実運用にはこの形は非推奨です。
Lambdaのスクリプト
- 対象は
./modules/event_driven_ingestion/aws/src/main.py
- Google Cloudと違ってメッセージ自体にファイルの更新日時が入ってこないので、別途スクリプトで情報を取得しています。
- 改めて見て思ったのですが、Slack通知のエラーハンドリングが適切ではないかもしれません。ステータス
200
のok: false
で返ってくるような気がする・・・??
def send_slack_notification(error_message: str):
"""Send error notification to Slack."""
try:
webhook_url = get_slack_webhook_url()
payload = {"text": error_message}
response = requests.post(webhook_url, json=payload)
if response.status_code != 200:
logger.error(
f"Failed to send Slack notification: {response.status_code} - {response.text}"
)
except Exception as e:
logger.error(f"Error sending Slack notification: {str(e)}")
Google Cloudの場合
Terraformの実装
- 対象は
./modules/event_driven_ingestion/google_cloud/main.tf
- Secret ManagerへのTROCCO API Key、Slack Webhook URLの登録をTerraform経由で行っていますが、stateファイルにキーが残ってしまうので、実運用にはこの形は非推奨です。
Cloud Run Functionsのスクリプト
- 対象は
./modules/event_driven_ingestion/google_cloud/src/main.py
- 実は、Cloud StorageトリガーによるCloud Run FunctionsのTerraform構成については、Google Cloud側でサンプルコードが公開されています。
- しかし、やたらデフォルトサービスアカウントを利用せざるを得ない上、設定内容としても大してラップされておらず全体像が掴みにくい形になっていたので、今回はPub/Subトピックトリガーの形式で実装しています。
以下がボツにした初期実装(開いて表示)
- やたらデフォルトサービスアカウントを利用している
- ちなみに権限周りが適切になっているかは怪しいです(
ややこしすぎて死にそうでした)
/*
- Enable APIs
*/
resource "google_project_service" "apis" {
for_each = toset([
"cloudbuild.googleapis.com",
"cloudfunctions.googleapis.com",
"eventarc.googleapis.com",
"pubsub.googleapis.com",
"run.googleapis.com",
])
project = var.module_config.project.id
service = each.value
disable_on_destroy = false
}
/*
- Publish Pub/Sub messages from GCS
*/
data "google_storage_project_service_account" "default" {
project = var.module_config.project.id
}
resource "google_project_iam_member" "service_account__gcs_default" {
project = var.module_config.project.id
role = "roles/pubsub.publisher"
member = data.google_storage_project_service_account.default.member
}
resource "google_project_iam_member" "service_account_pubsub_default" {
project = var.module_config.project.id
role = "roles/iam.serviceAccountTokenCreator"
member = local.default_service_account.pub_sub.member
}
/*
- Receive Pub/Sub messages and trigger Cloud Run Functions
*/
resource "google_project_iam_member" "service_account__eventarc" {
for_each = toset([
"roles/eventarc.eventReceiver",
"roles/eventarc.serviceAgent",
"roles/run.invoker",
])
project = var.module_config.project.id
role = each.key
member = local.default_service_account.eventarc.member
depends_on = [
google_project_service.apis
]
}
resource "google_storage_bucket_iam_member" "service_account__eventarc" {
bucket = var.module_config.bucket_name
role = "projects/${var.module_config.project.id}/roles/storageBucketsGet"
member = local.default_service_account.eventarc.member
}
/*
- Create and Execute Cloud Run Functions
*/
resource "google_service_account" "function" {
project = var.module_config.project.id
account_id = local.function_name
display_name = "Service Account for TROCCO Event Driven Ingestion"
}
resource "google_project_iam_member" "service_account__function" {
for_each = toset([
"roles/eventarc.eventReceiver",
"roles/run.invoker",
"roles/logging.logWriter",
])
project = var.module_config.project.id
role = each.key
member = google_service_account.function.member
}
resource "google_storage_bucket_iam_member" "service_account__function" {
for_each = toset([
"roles/storage.objectViewer",
])
bucket = var.module_config.bucket_name
role = each.key
member = google_service_account.function.member
}
resource "google_cloudfunctions2_function" "function" {
project = var.module_config.project.id
name = local.function_name
description = "Trigger TROCCO Jobs on File Uploads to GCS"
location = var.module_config.location
build_config {
runtime = "python311"
entry_point = "handle_gcs_event"
source {
storage_source {
bucket = var.module_config.bucket_name
object = google_storage_bucket_object.function.name
}
}
}
service_config {
available_memory = "256M"
timeout_seconds = 60
min_instance_count = 1
max_instance_count = 10
service_account_email = google_service_account.function.email
ingress_settings = "ALLOW_INTERNAL_ONLY"
all_traffic_on_latest_revision = true
environment_variables = {
PROJECT_ID = var.module_config.project.id
SECRET_ID = google_secret_manager_secret.trocco_api_key.secret_id
}
}
event_trigger {
trigger_region = var.module_config.location
event_type = "google.cloud.storage.object.v1.finalized"
retry_policy = "RETRY_POLICY_DO_NOT_RETRY"
service_account_email = google_service_account.function.email
event_filters {
attribute = "bucket"
value = var.module_config.bucket_name
}
}
depends_on = [
google_project_service.apis,
google_storage_bucket_object.function,
google_secret_manager_secret_iam_member.service_account__secret_accessor
]
}
resource "local_file" "job_definition_mappings" {
content = jsonencode(var.module_config.job_definition_mappings)
filename = "${local.function_source_dir}/job_definition_mappings.json"
}
data "archive_file" "function" {
type = "zip"
source_dir = local.function_source_dir
output_path = local.function_zip_path
depends_on = [
local_file.job_definition_mappings,
]
}
resource "google_storage_bucket_object" "function" {
name = "dist/${local.function_name}-${data.archive_file.function.output_md5}.zip"
bucket = var.module_config.bucket_name
source = data.archive_file.function.output_path
}
resource "google_secret_manager_secret" "trocco_api_key" {
project = var.module_config.project.id
secret_id = "trocco-api-key-${local.function_name}"
replication {
auto {}
}
}
resource "google_secret_manager_secret_version" "trocco_api_key" {
secret = google_secret_manager_secret.trocco_api_key.id
secret_data = var.trocco_api_key
}
resource "google_secret_manager_secret_iam_member" "service_account__secret_accessor" {
project = var.module_config.project.id
secret_id = google_secret_manager_secret.trocco_api_key.id
role = "roles/secretmanager.secretAccessor"
member = google_service_account.function.member
}
- 認知負荷の要因としては、そもそもEventarcの挙動がややこしいのと、Cloud FunctionsがCloud Run Functionsになり裏側としてCloud Runをデプロイするようになっているが、そのあたりが絶妙にラップされていないからだと感じました。
- ロギングのスクリプトを書いたのが初めてだったので、これで適切かどうかはよくわかっていないです・・・。
- 改めて見て思ったのですが、Slack通知のエラーハンドリングが適切ではないかもしれません。ステータス
200
のok: false
で返ってくるような気がする・・・??(Lambdaと同様)
おわりに
公開できるレベルにするためには結構手直しをしてはいるのですが、設計~実装までClaude Codeを活用したことで、最低限動くサンプル実装を作成することができました。よろしければ参考にしてみてください。
参考