要約
Workflows のコールバック関数を使って、Cloud Run Jobs とやり取りすると良いぞ。
問題
Workflows には Standard library が用意されており、簡単な処理であれば Workflows 内で完結できます。
しかし、少し複雑な処理をやろうとすると、Cloud Run Service/Functions に任せたいと考えます。
ただ、それらには最大呼び出しが 30 分という制約があります。
また、Workflows と Web API 間の HTTP 接続が切れることが無視できない頻度で発生します。
解決策
今回は、Cloud Run Jobs の利用してみようと思います。
Cloud Run Jobs とは
Cloud Run のインスタンスを使って、長時間のバッチ処理などの実行を目的にした、PaaS サービスです。
Cloud Run が提供するサービスには、主に3種類があります。
- Cloud Run Service: Web バックエンド等での利用を目的にしたサービス
- Cloud Run Functions: イベント起動などを目的にした単一関数のサービス (いわゆる FaaS)
- Cloud Run Jobs: マルチタスクによるバッチ処理などを目的にしたサービス
Workflows と Cloud Run Jobs の連携
Cloud Run Jobs は Web からの呼び出しを想定していないので、要求と応答の受け渡しに工夫が必要です。
まず、Workflows から Cloud Run Jobs への呼び出しは、公式にコネクタが用意されているので、そちらを使います。
要求の送り方
Cloud Run Jobs は Web API のような HTTP リクエストを受け取れません。
ただし、起動引数という考え方があるので、これを活用します。上記ヘルプの body 項目を参照されたし。
イメージとしては、shell のコマンドにパラメータを渡す形に近いです。
応答の受け方
Cloud Run Jobs は Web API のような HTTP レスポンスを返しません。
このため、Clour Run Jobs の処理上でなんらか外部にデータを出力させる方法を取ります。
外部出力には、DB や GCS など選択肢がありますが、Workflows ならコールバック機能を用いると便利でしょう。
サンプルコード
Workflows
ワークフロー実行サービスアカウントには、以下の IAM ロールが必要です。(2025/11/17 時点)
- roles/workflows.invoker
- roles/logging.logWriter
- roles/run.jobsExecutorWithOverrides
- roles/run.viewer
main:
steps:
- create_callback:
call: events.create_callback_endpoint
args:
http_callback_method: "POST"
result: callback_endpoint
- run_job:
call: googleapis.run.v2.projects.locations.jobs.run
args:
name: "projects/{プロジェクトID or 番号}/locations/{ジョブのロケーション}/jobs/{Cloud Run ジョブ名}"
body:
overrides:
containerOverrides:
args:
- "--arg1=argument1"
- ${"--callback=" + callback_endpoint.url}
- get_response:
call: events.await_callback
args:
callback: ${callback_endpoint}
result: jobs_response
- print_response:
call: sys.log
args:
text: ${jobs_response.http_request.body}
引数は文字列として設定するので、JSON などの構造化データを送りたい場合には、base64 化して送るのと良いでしょう。
現時点では、Workflows から Cloud Run Jobs は同期呼び出しなので、ジョブが終わるまで run_job ステップは終わらないです。
Cloud Run Jobs
import click
import json
import requests
import google.auth.transport.requests
def send_response(url: str):
auth_req = google.auth.transport.requests.Request()
credential, _ = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
credential.refresh(auth_req)
token = credential.token
_r: requests.Response = requests.post(url,
json.dumps({
"status": 200,
"message": "my job ok"
}),
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
},
)
print(f"callback: {_r.status_code}")
@click.command()
@click.option("--arg1", type=str)
@click.option("--callback", type=str)
def main(arg1: str, callback: str):
print(arg1)
send_response(callback)
if __name__ == "__main__":
main()