1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Tips: Workflows と Cloud Run Jobs の連携

Last updated at Posted at 2025-11-20

要約

Workflows のコールバック関数を使って、Cloud Run Jobs とやり取りすると良いぞ。

問題

Workflows には Standard library が用意されており、簡単な処理であれば Workflows 内で完結できます。

しかし、少し複雑な処理をやろうとすると、Cloud Run Service/Functions に任せたいと考えます。

ただ、それらには最大呼び出しが 30 分という制約があります。

また、Workflows と Web API 間の HTTP 接続が切れることが無視できない頻度で発生します。

解決策

今回は、Cloud Run Jobs の利用してみようと思います。

Cloud Run Jobs とは

Cloud Run のインスタンスを使って、長時間のバッチ処理などの実行を目的にした、PaaS サービスです。

Cloud Run が提供するサービスには、主に3種類があります。

  1. Cloud Run Service: Web バックエンド等での利用を目的にしたサービス
  2. Cloud Run Functions: イベント起動などを目的にした単一関数のサービス (いわゆる FaaS)
  3. Cloud Run Jobs: マルチタスクによるバッチ処理などを目的にしたサービス

Workflows と Cloud Run Jobs の連携

Cloud Run Jobs は Web からの呼び出しを想定していないので、要求と応答の受け渡しに工夫が必要です。

まず、Workflows から Cloud Run Jobs への呼び出しは、公式にコネクタが用意されているので、そちらを使います。

要求の送り方

Cloud Run Jobs は Web API のような HTTP リクエストを受け取れません。

ただし、起動引数という考え方があるので、これを活用します。上記ヘルプの body 項目を参照されたし。

イメージとしては、shell のコマンドにパラメータを渡す形に近いです。

応答の受け方

Cloud Run Jobs は Web API のような HTTP レスポンスを返しません。

このため、Clour Run Jobs の処理上でなんらか外部にデータを出力させる方法を取ります。

外部出力には、DB や GCS など選択肢がありますが、Workflows ならコールバック機能を用いると便利でしょう。

サンプルコード

Workflows

ワークフロー実行サービスアカウントには、以下の IAM ロールが必要です。(2025/11/17 時点)

  • roles/workflows.invoker
  • roles/logging.logWriter
  • roles/run.jobsExecutorWithOverrides
  • roles/run.viewer

main:
  steps:
    - create_callback:
      call: events.create_callback_endpoint
      args:
        http_callback_method: "POST"
      result: callback_endpoint
    - run_job:
      call: googleapis.run.v2.projects.locations.jobs.run
      args:
        name: "projects/{プロジェクトID or 番号}/locations/{ジョブのロケーション}/jobs/{Cloud Run ジョブ名}"
        body:
          overrides:
            containerOverrides:
              args:
              - "--arg1=argument1"
              - ${"--callback=" + callback_endpoint.url}
    - get_response:
      call: events.await_callback
      args:
        callback: ${callback_endpoint}
      result: jobs_response
    - print_response:
      call: sys.log
      args:
        text: ${jobs_response.http_request.body}

引数は文字列として設定するので、JSON などの構造化データを送りたい場合には、base64 化して送るのと良いでしょう。

現時点では、Workflows から Cloud Run Jobs は同期呼び出しなので、ジョブが終わるまで run_job ステップは終わらないです。

Cloud Run Jobs

import click
import json
import requests
import google.auth.transport.requests

def send_response(url: str):
  auth_req = google.auth.transport.requests.Request()
  credential, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
  )
  credential.refresh(auth_req)
  token = credential.token

  _r: requests.Response = requests.post(url,
    json.dumps({
      "status": 200,
      "message": "my job ok"
    }),
    headers={
      "Authorization": f"Bearer {token}",
      "Content-Type": "application/json"
      },
  )

  print(f"callback: {_r.status_code}")

@click.command()
@click.option("--arg1", type=str)
@click.option("--callback", type=str)
def main(arg1: str, callback: str):
  print(arg1)
  send_response(callback)

if __name__ == "__main__":
  main()
1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?