最近、Step Functionsをさわることが増えてきたのでその時の対応を書こうかと思います
Step Functionsはワークフローを定義してLambdaやAWS Batchとかのサービスを順に実行するといったことができるサービスです
簡易なワークフローならいいのですが、ちょっと複雑なことをしようとするとなかなかつらかったです(個人的に)
Step Functionsで並列処理を行うのにはParallelとMapの2つがありますが、Parallelでのエラー処理をいい感じにできないかと試行錯誤したときの内容です
##Parallelステート
まず、こんな感じのワークフローを作成
2つの処理が並行で実行される単純なワークフローです
{
"Comment": "Example",
"StartAt": "Parallel",
"States": {
"Parallel": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "Job1-1",
"States": {
"Job1-1": {
"Type": "Task",
"Resource": arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_1:$LATEST",
"Next": "Job1-2"
},
"Job1-2": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_2:$LATEST",
"End": true
}
}
},
{
"StartAt": "Job2-1",
"States": {
"Job2-1": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_2_1:$LATEST",
"Next": "Job2-2"
},
"Job2-2": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_2_2:$LATEST",
"End": true
}
}
}
],
"End": true
}
}
}
上記のワークフローではJob1-1でエラーとなった場合、並行で実行されている処理はそこで中断されてしまいます
##エラー時に並行処理を継続実行させる
並行処理をキャンセルさせずにそのまま継続させたいので Catch でエラーを捕捉します
{
"Comment": "Example",
"StartAt": "Parallel",
"States": {
"Parallel": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "Job1-1",
"States": {
"Job1-1": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_1:$LATEST",
"Next": "Job1-2",
"Catch": [{
"ErrorEquals": [ "States.ALL" ],
"Next": "Error1"
}]
},
"Job1-2": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_2:$LATEST",
"End": true,
"Catch": [{
"ErrorEquals": [ "States.ALL" ],
"Next": "Error1"
}]
},
"Error1": {
"Type": "Pass",
"End": true
}
}
},
{
"StartAt": "Job2-1",
"States": {
"Job2-1": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_2_1:$LATEST",
"Next": "Job2-2",
"Catch": [{
"ErrorEquals": [ "States.ALL" ],
"Next": "Error2"
}]
},
"Job2-2": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_2_2:$LATEST",
"End": true,
"Catch": [{
"ErrorEquals": [ "States.ALL" ],
"Next": "Error2"
}]
},
"Error2": {
"Type": "Pass",
"End": true
}
}
}
],
"End": true
}
}
}
こうすることで並行処理がキャンセルされることなく実行されるようになりました
##StateMahineのステータスをFailedにする
上記ではStateMachineのステータスはSucceededとなってしまうのでStateMachineのステータスはFailedとなるようにしたいと思います
まず、1つ目のLambdaのoutputは $.lambda にセットし、2つ目のInputPathに**$.lambdaを渡します
2つ目のLambdaのoutputは不要なのでOutputPathをnull**とします
"Job1-1": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_1:$LATEST",
"ResultPath": "$.lambda",
"Next": "Job1-2",
"Catch": [{
"ErrorEquals": [ "States.ALL" ],
"Next": "Error1"
}]
},
"Job1-2": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_2:$LATEST",
"InputPath": "$.lambda",
"OutputPath": null,
"End": true,
"Catch": [{
"ErrorEquals": [ "States.ALL" ],
"Next": "Error1"
}]
},
さらにエラー処理を以下のようにします
"Error1": {
"Type": "Pass",
"Result": {
"error": "Job1"
},
"End": true
}
次にParallel の次の処理として以下を追加します
"Results": {
"Type": "Pass",
"Parameters": {
"results.$": "States.JsonToString($)"
},
"Next": "Check"
},
"Check": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.results",
"StringMatches": "*error*",
"Next": "Failed"
}
],
"Default": "Success"
},
"Failed": {
"Type": "Fail",
"Error": "States.TaskFailed"
},
"Success": {
"Type": "Pass",
"End": true
}
Results でのinputとして以下のJSONが渡ります
"input": [
{},
{}
]
"input": [
{
"error": "Job1"
},
{}
]
これを States.JsonToString() で以下のような文字列に変換します
"output": {
"results": "[{},{}]"
}
"output": {
"results": "[{\"error\":\"Job1\"},{}]"
}
Choiceで error という文字が含まれているかどうかをチェックし、含まれていればFailステートに遷移させます
ASL全体は以下のようになります
{
"Comment": "Example",
"StartAt": "Parallel",
"States": {
"Parallel": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "Job1-1",
"States": {
"Job1-1": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_1:$LATEST",
"ResultPath": "$.lambda",
"Next": "Job1-2",
"Catch": [{
"ErrorEquals": [ "States.ALL" ],
"Next": "Error1"
}]
},
"Job1-2": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_2:$LATEST",
"InputPath": "$.lambda",
"OutputPath": null,
"End": true,
"Catch": [{
"ErrorEquals": [ "States.ALL" ],
"Next": "Error1"
}]
},
"Error1": {
"Type": "Pass",
"Result": {
"error": "Job1"
},
"End": true
}
}
},
{
"StartAt": "Job2-1",
"States": {
"Job2-1": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_2_1:$LATEST",
"ResultPath": "$.lambda",
"Next": "Job2-2",
"Catch": [{
"ErrorEquals": [ "States.ALL" ],
"Next": "Error2"
}]
},
"Job2-2": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_2_2:$LATEST",
"InputPath": "$.lambda",
"OutputPath": null,
"End": true,
"Catch": [{
"ErrorEquals": [ "States.ALL" ],
"Next": "Error2"
}]
},
"Error2": {
"Type": "Pass",
"Result": {
"error": "Job2"
},
"End": true
}
}
}
],
"Next": "Results"
},
"Results": {
"Type": "Pass",
"Parameters": {
"results.$": "States.JsonToString($)"
},
"Next": "Check"
},
"Check": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.results",
"StringMatches": "*error*",
"Next": "Failed"
}
],
"Default": "Success"
},
"Failed": {
"Type": "Fail",
"Error": "States.TaskFailed"
},
"Success": {
"Type": "Pass",
"End": true
}
}
}
##おわりに
これでStateMachineのステータスはFailedとすることができ、EventBridgeでFailedであればChatbotでSlackに通知できるようになりました
上記のサンプルは簡単なワークフローですが、実際に作成したものは並列処理がネスト・完了するまでに数時間かかるようなワークフローになっており、途中でキャンセルされてしまうと再実行に時間がかかってしまうのでこのような形での対応としています