7
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Amazon S3 / Google Cloud Storageから、イベントドリブンでTROCCOを実行する(Terraform Module付き)

Last updated at Posted at 2025-08-30

はじめに

この記事は株式会社primeNumberの夏の自由研究企画、AI Native Summer Calendar 2025の8/30分の記事です。

データ処理の最適化を考えたときに、「TROCCOのジョブ実行をファイル格納を起点としたイベントドリブンにしたい」というご要望を、これまでにいただくことがありました。

「その気持ちは分かります、すみません・・・」と思いながらも、現時点では機能的に提供できていません。そこで、今回プロダクトを補完するものとして、Claude Codeとともにサンプルコードを作成してみました。

実装はTerraform Moduleの形式になっていますが、Terraformはあくまでリソースを作成するためのツールなので、対象になっているリソースをGUIで作成すればTerraformを利用しなくとも実装可能です。

今回の実装はあくまでサンプルなため、

  • APIキーをTerraform経由で登録している
  • 実運用で細かいテストをしているわけではない

などの課題があるものになります。実運用の際には設定を調整&検証の上、ご利用ください。

こんな方におすすめ

  • TROCCOをファイル格納のイベントドリブンで実行したい方
  • Amazon S3 / Google Cloud Storageのファイル格納を起点にしたLambda / Cloud Run Functions実行の仕組みについて参考になる実装を見たい方

できるようになったこと

今回作成したTerraform ModuleはGitHubで公開しています。

ModuleとしてはAWS環境用およびGoogle Cloud環境用の2種類を用意していますが、いずれも実装内容はほぼ同一で、

  • オブジェクトストレージにファイルが格納される
  • ファイル格納を起点にメッセージが作成される
  • メッセージを受け取って、FaaSを起動する
  • FaaSからTROCCO APIを実行する

といった流れになっています。

AWSの例

S3にファイルを格納します。このとき、完了日時は2025/08/30 13:47:45になっています。
(手動格納が面倒なのでとりあえずTROCCOで突っ込んでいます)

image.png

すると、S3からの取り込みの転送ジョブがAPI経由で実行されます。転送ジョブの起動日時(=カスタム変数展開時刻)は2025/08/30 13:47:47なので、ほぼ即時ですね。

なお、このときパスプレフィックスやファイルのURL、更新日時をカスタム変数の値として入力しています。

image.png

Google Cloudの例

Google Cloud Storageにファイルを格納します。このとき、完了日時は2025/08/30 13:50:09になっています。

image.png

すると、Google Cloud Storageからの取り込みの転送ジョブがAPI経由で実行されます。転送ジョブの起動日時は2025/08/30 13:50:10なので、こちらもほぼ即時です。

image.png

処理概要

それぞれの処理の概要については以下のようになっています。

AWSの場合

Google Cloudの場合

注意点やハマりどころなど

利用する際の注意点や、実装していたときのハマりどころについてもまとめておきます。

AWSの場合

Terraformの実装

  • 対象は./modules/event_driven_ingestion/aws/main.tf
  • SQSが適切に処理されずに、デッドレターキューに入った場合の処理は実装されていません。それほど発生はしないだろうと思いつつ・・・。
  • Lambdaのスクリプトをデプロイするにあたり、ローカル側で依存関係を含めてzip圧縮しないといけないため、以下のような形で実装しています。そこで、Terraformの実行環境でPythonを利用できる必要があります。
./modules/event_driven_ingestion/aws/main.tf
resource "local_file" "job_definition_mappings" {
  content  = jsonencode(var.module_config.job_definition_mappings)
  filename = "${local.lambda_source_dir}/job_definition_mappings.json"
}

resource "null_resource" "install_dependencies" {
  triggers = {
    requirements_hash = filemd5("${local.lambda_source_dir}/requirements.txt")
    source_code_hash  = filemd5("${local.lambda_source_dir}/main.py")
  }

  provisioner "local-exec" {
    command = "cd ${local.lambda_source_dir} && pip install -r requirements.txt -t packages/"
  }
}

data "archive_file" "lambda" {
  type        = "zip"
  source_dir  = local.lambda_source_dir
  output_path = local.lambda_zip_path
  excludes = [
    "requirements.txt"
  ]
  depends_on = [
    local_file.job_definition_mappings,
    null_resource.install_dependencies,
  ]
}
  • Parameter StoreへのTROCCO API Key、Slack Webhook URLの登録をTerraform経由で行っていますが、stateファイルにキーが残ってしまうので、実運用にはこの形は非推奨です。

Lambdaのスクリプト

  • 対象は./modules/event_driven_ingestion/aws/src/main.py
  • Google Cloudと違ってメッセージ自体にファイルの更新日時が入ってこないので、別途スクリプトで情報を取得しています。
  • 改めて見て思ったのですが、Slack通知のエラーハンドリングが適切ではないかもしれません。ステータス200ok: falseで返ってくるような気がする・・・??
./modules/event_driven_ingestion/aws/src/main.py
def send_slack_notification(error_message: str):
    """Send error notification to Slack."""
    try:
        webhook_url = get_slack_webhook_url()
        payload = {"text": error_message}
        response = requests.post(webhook_url, json=payload)
        if response.status_code != 200:
            logger.error(
                f"Failed to send Slack notification: {response.status_code} - {response.text}"
            )
    except Exception as e:
        logger.error(f"Error sending Slack notification: {str(e)}")

Google Cloudの場合

Terraformの実装

  • 対象は./modules/event_driven_ingestion/google_cloud/main.tf
  • Secret ManagerへのTROCCO API Key、Slack Webhook URLの登録をTerraform経由で行っていますが、stateファイルにキーが残ってしまうので、実運用にはこの形は非推奨です。

Cloud Run Functionsのスクリプト

  • 対象は./modules/event_driven_ingestion/google_cloud/src/main.py
  • 実は、Cloud StorageトリガーによるCloud Run FunctionsのTerraform構成については、Google Cloud側でサンプルコードが公開されています。

  • しかし、やたらデフォルトサービスアカウントを利用せざるを得ない上、設定内容としても大してラップされておらず全体像が掴みにくい形になっていたので、今回はPub/Subトピックトリガーの形式で実装しています。

以下がボツにした初期実装(開いて表示)
  • やたらデフォルトサービスアカウントを利用している
  • ちなみに権限周りが適切になっているかは怪しいです(ややこしすぎて死にそうでした
/*
- Enable APIs
*/

resource "google_project_service" "apis" {
  for_each = toset([
    "cloudbuild.googleapis.com",
    "cloudfunctions.googleapis.com",
    "eventarc.googleapis.com",
    "pubsub.googleapis.com",
    "run.googleapis.com",
  ])
  project            = var.module_config.project.id
  service            = each.value
  disable_on_destroy = false
}


/*
- Publish Pub/Sub messages from GCS
*/

data "google_storage_project_service_account" "default" {
  project = var.module_config.project.id
}

resource "google_project_iam_member" "service_account__gcs_default" {
  project = var.module_config.project.id
  role    = "roles/pubsub.publisher"
  member  = data.google_storage_project_service_account.default.member
}

resource "google_project_iam_member" "service_account_pubsub_default" {
  project = var.module_config.project.id
  role    = "roles/iam.serviceAccountTokenCreator"
  member  = local.default_service_account.pub_sub.member
}


/*
- Receive Pub/Sub messages and trigger Cloud Run Functions
*/

resource "google_project_iam_member" "service_account__eventarc" {
  for_each = toset([
    "roles/eventarc.eventReceiver",
    "roles/eventarc.serviceAgent",
    "roles/run.invoker",
  ])
  project = var.module_config.project.id
  role    = each.key
  member  = local.default_service_account.eventarc.member
  depends_on = [
    google_project_service.apis
  ]
}

resource "google_storage_bucket_iam_member" "service_account__eventarc" {
  bucket = var.module_config.bucket_name
  role   = "projects/${var.module_config.project.id}/roles/storageBucketsGet"
  member = local.default_service_account.eventarc.member
}


/*
- Create and Execute Cloud Run Functions
*/

resource "google_service_account" "function" {
  project      = var.module_config.project.id
  account_id   = local.function_name
  display_name = "Service Account for TROCCO Event Driven Ingestion"
}

resource "google_project_iam_member" "service_account__function" {
  for_each = toset([
    "roles/eventarc.eventReceiver",
    "roles/run.invoker",
    "roles/logging.logWriter",
  ])
  project = var.module_config.project.id
  role    = each.key
  member  = google_service_account.function.member
}

resource "google_storage_bucket_iam_member" "service_account__function" {
  for_each = toset([
    "roles/storage.objectViewer",
  ])
  bucket = var.module_config.bucket_name
  role   = each.key
  member = google_service_account.function.member
}

resource "google_cloudfunctions2_function" "function" {
  project     = var.module_config.project.id
  name        = local.function_name
  description = "Trigger TROCCO Jobs on File Uploads to GCS"
  location    = var.module_config.location
  build_config {
    runtime     = "python311"
    entry_point = "handle_gcs_event"

    source {
      storage_source {
        bucket = var.module_config.bucket_name
        object = google_storage_bucket_object.function.name
      }
    }
  }

  service_config {
    available_memory               = "256M"
    timeout_seconds                = 60
    min_instance_count             = 1
    max_instance_count             = 10
    service_account_email          = google_service_account.function.email
    ingress_settings               = "ALLOW_INTERNAL_ONLY"
    all_traffic_on_latest_revision = true
    environment_variables = {
      PROJECT_ID = var.module_config.project.id
      SECRET_ID  = google_secret_manager_secret.trocco_api_key.secret_id
    }
  }

  event_trigger {
    trigger_region        = var.module_config.location
    event_type            = "google.cloud.storage.object.v1.finalized"
    retry_policy          = "RETRY_POLICY_DO_NOT_RETRY"
    service_account_email = google_service_account.function.email
    event_filters {
      attribute = "bucket"
      value     = var.module_config.bucket_name
    }
  }
  depends_on = [
    google_project_service.apis,
    google_storage_bucket_object.function,
    google_secret_manager_secret_iam_member.service_account__secret_accessor
  ]
}

resource "local_file" "job_definition_mappings" {
  content  = jsonencode(var.module_config.job_definition_mappings)
  filename = "${local.function_source_dir}/job_definition_mappings.json"
}

data "archive_file" "function" {
  type        = "zip"
  source_dir  = local.function_source_dir
  output_path = local.function_zip_path
  depends_on = [
    local_file.job_definition_mappings,
  ]
}

resource "google_storage_bucket_object" "function" {
  name   = "dist/${local.function_name}-${data.archive_file.function.output_md5}.zip"
  bucket = var.module_config.bucket_name
  source = data.archive_file.function.output_path
}

resource "google_secret_manager_secret" "trocco_api_key" {
  project   = var.module_config.project.id
  secret_id = "trocco-api-key-${local.function_name}"
  replication {
    auto {}
  }
}

resource "google_secret_manager_secret_version" "trocco_api_key" {
  secret      = google_secret_manager_secret.trocco_api_key.id
  secret_data = var.trocco_api_key
}

resource "google_secret_manager_secret_iam_member" "service_account__secret_accessor" {
  project   = var.module_config.project.id
  secret_id = google_secret_manager_secret.trocco_api_key.id
  role      = "roles/secretmanager.secretAccessor"
  member    = google_service_account.function.member
}

  • 認知負荷の要因としては、そもそもEventarcの挙動がややこしいのと、Cloud FunctionsがCloud Run Functionsになり裏側としてCloud Runをデプロイするようになっているが、そのあたりが絶妙にラップされていないからだと感じました。

  • ロギングのスクリプトを書いたのが初めてだったので、これで適切かどうかはよくわかっていないです・・・。
  • 改めて見て思ったのですが、Slack通知のエラーハンドリングが適切ではないかもしれません。ステータス200ok: falseで返ってくるような気がする・・・??(Lambdaと同様)

おわりに

公開できるレベルにするためには結構手直しをしてはいるのですが、設計~実装までClaude Codeを活用したことで、最低限動くサンプル実装を作成することができました。よろしければ参考にしてみてください。

参考

7
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?