概要
課題・状況
- 大量データの処理を、決まった時間かつ短め(40分とか)に処理したい
- 並列処理のあとにも順番で動かしたいjobがある
- 並列化の仕組みの導入にあまり時間をかけたくない、
- AWS Batchは導入済みだった
結果
- AWS Batchの「並列job」と「job依存関係」を使ったらかんたんに解決できた
- 後述: 並列jobをAWS Batchで動かす
- 後述: 順序のあるjobをAWS Batchで動かす
- 前の並列jobが終わるまで起動しない、前のjobが失敗したら起動しない
- 落とし穴もあったので、その補足も
背景
- 70万件のフリーテキストを定期的に形態素解析したい。
- 形態素解析のユーザ辞書を定期的に更新するので、毎度更新したもので形態素解析を当て直したい
- できるだけ最新の情報を素早く反映したい、ただし最初から1分以内を目指す必要はない
- そこに手間をかけるより、早くリリースしたい。
内容
並列jobをAWS Batchで動かす
- 配列job queueにenqueueする
# 説明のため、主題以外の部分(credentialのロードなど)は簡略化した
aws_batch_credentials = {ecr_arn:... ,job_queue_arn:... ,job_queue_role_arn:... ,task_runner_arn:...}
client = Aws::Batch::Client.new(aws_batch_credentials)
enqueue_resp = client.submit_job({
job_name: {job名},
job_queue: {job_queue_arn名},
array_properties: {
size: {並列数:後述の「並列数を計算する」},
},
job_definition: aws_batch_credentials[:task_runner_arn],
container_overrides: {
command: ["{形態素解析の起動コマンド}"],
environment: [
{name: "RAILS_ENV",value: Rails.env},
],
resource_requirements: [
{type: "VCPU",value: "{必要なcpu数: 1など}"},
{type: "MEMORY",value: "{必要なメモリサイズ: 1024など、足りないとOutOfMemoryで処理が止まるので多めが良い}"}
],
},
retry_strategy: {
attempts: 1
},
timeout: {
attempt_duration_seconds: "{タイムアウト設定時間、長めの設定が良い}",
}
})
# jobのidを保存する: 依存jobを定義するときに参照する
parallel_job_id = enqueue_resp.job_id
- 並列数を計算する
- やりたいこと
- (処理対象件数 * 1件あたりの処理時間(秒))/並列数 + 1ジョブの起動時間 = 規定時間以内におさめる
- 計算方法
-
並列数 = (処理対象件数 * 1件あたりの処理時間(秒)) / (規定時間以内 - 1ジョブの起動時間)
.
-
- 注意
- 1件あたりの処理時間(秒)は毎回自動計測して算出が理想、最低限なら手動で計測して固定値をハードコード
- やりたいこと
注意点(落とし穴?)
- 並列jobの場合、メモリ、CPUの指定はcontainer_overrides.resource_requirementsで行うこと
- container_overrides.vcpusや、container_overrides.memoryは無視されるようです。
# うまく動かない例:
container_overrides: {
vcpus: 1,
memory: 1024
}
# うまく動く例:
container_overrides: {
resource_requirements: [
{type: "VCPU",value: "1"},
{type: "MEMORY",value: "1024"}
],
}
順序のあるjobをAWS Batchで動かす
これは単純、depends_on.job_idに依存関係(前の並列jobが終わるまで起動しない、前のjobが失敗したら起動しない)をもたせたいjobのidを入れるだけ
client.submit_job({
job_name: {job名},
job_queue: aws_batch_configs[:job_queue_arn],
job_definition: sws_batch_configs[:task_runner_arn],
container_overrides: {
vcpus: 1,
memory: 512,
command: ["{起動コマンド}"],
environment: [
{name: "RAILS_ENV",value: Rails.env},
],
},
depends_on: [
{job_id: "{parallel_job_id: このjobが開始する前にSUCCEEDするべきjobのid}",type: "N_TO_N"}
]
})