はじめに
ECS schedule taskなどで動かしているバッチ処理はアプリケーションの方で冪等性を
担保していることが多いと思います。ただ、今回はアプリケーションの変更が
難しい際に、タスクの多重起動を防ぐことによって、問題を解消するアプローチ方法について書きます!
構成
StepFunctionsからECS taskとLambdaは起動可能なので
ECS taskとEventBridgeの間にStepFunctionsを挟み、そこで、Lambdaを
呼び出し、すでに起動しているタスク(ステートマシン)がないかを確認するという処理を行います。
StepFunctionsのフロー
ステートマシンは以下のように定義していきます!
{
"StartAt": "get execution id",
"States": {
// Lambdaに渡す引数の準備
"get execution id": {
"Type": "Pass",
"Parameters": {
"StateMachineId.$" : "$$.StateMachine.Id",
"ExecutionName.$" : "$$.Execution.Name"
},
"Next": "check running statemachine"
},
// Lambdaの呼び出し
"check running statemachine": {
"Type": "Task",
"Resource": "${LambdaFunction}",
"Next": "choice"
},
// 分岐
"choice": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.running_flg",
"BooleanEquals": true,
"Next": "fail" // すでに起動しているステートマシンがあれば終了する
}
],
"Default": "Run an ECS Task and wait for it to complete"
},
// ECS Task の起動
"Run an ECS Task and wait for it to complete": {
"Type": "Task",
"Resource": "arn:aws:states:::ecs:runTask.sync",
"Parameters": {
"Cluster": "cluster-arn",
"TaskDefinition": "job-id",
"Overrides": {
"ContainerOverrides": [
{
"Name": "container-name",
"Command.$": "$.commands"
}
]
}
},
"Next": "succeed"
},
"succeed": {
"Type": "Succeed"
},
"fail": {
"Type": "Fail",
"Error": "ErrorCode",
"Cause": "There is a running state machine."
}
}
}
フロー図はこんな感じです、、、
(自動で出てくるので分かりやすくてありがたいです)
Lambdaの実装
今回はgoで実装してみました!
goじゃないとやりにくいとかは無いので、どの言語でも特に問題ないと思います。
package main
import (
"context"
"fmt"
"log"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/stepfunctions"
)
type Event struct {
StateMachineId string `json:"StateMachineId"`
ExecutionName string `json:"ExecutionName"`
}
func handleRequest(ctx context.Context, event Event) (map[string]bool, error) {
// StepFunctionsのAPIを呼び出す
sess := session.Must(session.NewSession())
sfnClient := stepfunctions.New(sess)
log.Printf("EventData: %+v\n", event)
// 実行中のステートマシンを取得する
resp, err := sfnClient.ListExecutions(&stepfunctions.ListExecutionsInput{
StateMachineArn: aws.String(event.StateMachineId),
StatusFilter: aws.String("RUNNING"),
})
if err != nil {
log.Fatalf("Error listing executions: %v", err)
return nil, err
}
runningFlg := false
for _, exec := range resp.Executions {
log.Printf("Execution name of the running Statemachine: %s\n", *exec.Name)
// 実行名が同じものがないか確認する
if event.ExecutionName != *exec.Name {
runningFlg = true
log.Printf("There is a running state machine: %s", *exec.Name)
break
}
}
return map[string]bool{
"running_flg": runningFlg,
}, nil
}
func main() {
lambda.Start(handleRequest)
}
さいごに
ざっくりとした記事になってしまいましたが、今回は以上になります。
StepFunctionsはそのままAWS APIが呼び出せたり、フロー図が勝手に出てきたりと非常に便利ですね!
(フロー図からステートマシンを作成することもできるそうです、すごい、、、)
これを機にもっと活用方法見つけていきたいです。
最後まで読んでいただきありがとうございました!!