株式会社Nateeの中坊です。
4月頃にGAになったCloud Run jobsを使ってスケール可能なバッチ処理を作る方法を紹介します。
Cloud Run jobs
Cloud Run jobsはCloud Runの拡張版のような位置付けで、長時間を要するバッチ処理を実現するためのソリューションです。
https://cloud.google.com/run/docs/create-jobs?hl=ja
当然こちらもサーバレスであり、コンテナイメージと実行コマンドさえ用意すれば実現できます。
前提としてすでにコンテナイメージベースのアプリケーションを作成していて、imageを使い回せる状況であると仮定します。
実現したいこと
下記の要件で作りました
- Cloud Run jobsをTerraformで構成管理する
- バッチ処理毎に並列数を制御する
- 処理を分散させる
- タスク1つ毎の所要時間は1時間以内に納める
Cloud Run jobsをTerraformで構成管理する
Terraformでjobを作成するためにはgoogle_cloud_run_v2_jobを使います。
job毎に可変にしたいパラメータはlocalsに配列で定義し、for文を使って繰り返し利用します。
その他のパラメータは実装の都合でvariableを利用していますがlocalsに入れても良いです。
メモリやCPUもコスト最適化の観点で可変にしています。
以下がサンプルコード
locals {
batch_jobs = [
{
app_name = "what-you-want"
command = ["your", "command"]
args = null
cpu = 2.0
memory = "8Gi"
task_count = 20
parallelism = 5
},
]
}
variable "location" {
type = string
description = "リージョン"
}
variable "image_url" {
type = string
description = "CloudRunJobsで実行するコンテナイメージのURL"
}
variable "vpc_connector_id" {
type = string
description = "VPCアクセスコネクタのID"
}
variable "service_account_email" {
type = string
description = "CloudRunJobsを実行するためのサービスアカウント"
}
resource "google_cloud_run_v2_job" "batch_job" {
for_each = { for job in local.batch_jobs : job.app_name => job }
name = each.value.app_name
location = var.location
template {
task_count = each.value.task_count
parallelism = each.value.parallelism
template {
service_account = var.service_account_email
containers {
image = var.image_url
command = each.value.command
args = each.value.args
resources {
limits = {
cpu = each.value.cpu
memory = each.value.memory
}
}
}
vpc_access {
connector = var.vpc_connector_id
egress = "PRIVATE_RANGES_ONLY"
}
timeout = "3600s" # 1時間
}
}
}
vpc_accessはCloud SQLに接続するために設定しています。
バッチ処理毎に並列数を制御する
上記のTerraformのコードでパラメータを配列で定義しているため、ジョブ毎に並列数の制御ができるようになりました。
バッチ処理を作る際に気を付けるべきパラメータは下記の3つです。
パラメータ名 | 役割 |
---|---|
task_count | jobの中で起動するtaskの数(起動するインスタンス数) |
parallelism | 同時に実行するタスクの最大数 |
timeout | タイムアウトまでの時間 |
parallelismが同時に実行する最大並列数で、task_countに達するまで最大限並列化して処理を進めます。
タスクの規模によってパラメータを調整して必要な数だけ並列化して実行することが可能です。
処理を分散させる
Cloud Run jobsの task_count
を1より大きい値にすることで、分散処理を実現できます。
タスクとは処理の実行単位で、task_count=20とすると、20個分指定したコマンドを実行します。
実際にCloud Run jobsを実行すると、CLOUD_RUN_TASK_INDEX
という環境変数から実行中のタスクの番号を取得することができるため、この値を使って分散処理を記述します。
例えば、SQLのlimitとoffsetを動的に変更するように実装するなら下記の様に記述することで実現できます。
※CLOUD_RUN_TASK_INDEXは0始まりの連番
index := strconv.Atoi(os.GetEnv("CLOUD_RUN_TASK_INDEX"))
limit := 30
offset := index * limit
sql := fmt.Sprintf("SELECT * FROM sample ORDER by created_at LIMIT %d OFFSET %d", limit, offset)
タスク1つ毎の所要時間は1時間以内に納める
これはCloud Run jobsの制約によるもので、1時間を超えるものはまだプレビュー機能だったので
タスク数を増やして1つあたりの所要時間を減らす方向で検討しました。
デフォルトでは、各タスクは最大 10 分間実行されます。この時間を短くしたり、長くすることができます(最長 24 時間まで)。ただし、1 時間を超える時間はプレビュー機能です。
https://cloud.google.com/run/docs/create-jobs?hl=ja
最後に
バッチ処理で利用するタスク数をterraformの定義で簡単に定義し、複製がしやすくなることは開発体験としては良いものだと感じています。
Goを使っている場合だとgoroutineを使って並列処理をすることも可能ですが、1つのCloud Runインスタンスではマシンリソースに限界もあるのでこのようにスケールさせる方法があると大規模な処理を比較的容易に実現できるので、できることの幅が広がりました。