18
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Step FunctionsのParallelのエラー処理をいい感じにする

Posted at

最近、Step Functionsをさわることが増えてきたのでその時の対応を書こうかと思います

Step Functionsはワークフローを定義してLambdaやAWS Batchとかのサービスを順に実行するといったことができるサービスです
簡易なワークフローならいいのですが、ちょっと複雑なことをしようとするとなかなかつらかったです(個人的に)
Step Functionsで並列処理を行うのにはParallelとMapの2つがありますが、Parallelでのエラー処理をいい感じにできないかと試行錯誤したときの内容です

##Parallelステート
まず、こんな感じのワークフローを作成
2つの処理が並行で実行される単純なワークフローです

{
  "Comment": "Example",
  "StartAt": "Parallel",
  "States": {
    "Parallel": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Job1-1",
          "States": {
            "Job1-1": {
              "Type": "Task",
              "Resource": arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_1:$LATEST",
              "Next": "Job1-2"
            },
            "Job1-2": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_2:$LATEST",
              "End": true
            }
          }
        },
        {
          "StartAt": "Job2-1",
          "States": {
            "Job2-1": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_2_1:$LATEST",
              "Next": "Job2-2"
            },
            "Job2-2": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_2_2:$LATEST",
              "End": true
            }
          }
        }
      ],
      "End": true
    }
  }
}

stepfunctions_graph (3).png

上記のワークフローではJob1-1でエラーとなった場合、並行で実行されている処理はそこで中断されてしまいます
stepfunctions_graph (4).png

##エラー時に並行処理を継続実行させる
並行処理をキャンセルさせずにそのまま継続させたいので Catch でエラーを捕捉します

{
  "Comment": "Example",
  "StartAt": "Parallel",
  "States": { 
    "Parallel": {
      "Type": "Parallel",
        "Branches": [
          {
            "StartAt": "Job1-1",
            "States": {
               "Job1-1": {
                 "Type": "Task",
                 "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_1:$LATEST",
                 "Next": "Job1-2",
                 "Catch": [{
                   "ErrorEquals": [ "States.ALL" ],
                   "Next": "Error1"
                 }]
               },
               "Job1-2": {
                 "Type": "Task",
                 "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_2:$LATEST",
                 "End": true,
                 "Catch": [{
                   "ErrorEquals": [ "States.ALL" ],
                   "Next": "Error1"
                 }]
               },
               "Error1": {
                 "Type": "Pass",
                 "End": true
               }
             }
          },
          {
            "StartAt": "Job2-1",
            "States": {
              "Job2-1": {
                "Type": "Task",
                "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_2_1:$LATEST",
                "Next": "Job2-2",
                 "Catch": [{
                   "ErrorEquals": [ "States.ALL" ],
                   "Next": "Error2"
                 }]
              },
              "Job2-2": {
                "Type": "Task",
                "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_2_2:$LATEST",
                "End": true,
                 "Catch": [{
                   "ErrorEquals": [ "States.ALL" ],
                   "Next": "Error2"
                 }]
              },
              "Error2": {
                "Type": "Pass",
                "End": true
              }
            }
          }
       ],
       "End": true
     }
  }
}

stepfunctions_graph (5).png

こうすることで並行処理がキャンセルされることなく実行されるようになりました
stepfunctions_graph (6).png

##StateMahineのステータスをFailedにする
上記ではStateMachineのステータスはSucceededとなってしまうのでStateMachineのステータスはFailedとなるようにしたいと思います

まず、1つ目のLambdaのoutputは $.lambda にセットし、2つ目のInputPathに**$.lambdaを渡します
2つ目のLambdaのoutputは不要なので
OutputPathnull**とします

               "Job1-1": {
                 "Type": "Task",
                 "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_1:$LATEST",
                 "ResultPath": "$.lambda",
                 "Next": "Job1-2",
                 "Catch": [{
                   "ErrorEquals": [ "States.ALL" ],
                   "Next": "Error1"
                 }]
               },
               "Job1-2": {
                 "Type": "Task",
                 "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_2:$LATEST",
                 "InputPath": "$.lambda",
                 "OutputPath": null,
                 "End": true,
                 "Catch": [{
                   "ErrorEquals": [ "States.ALL" ],
                   "Next": "Error1"
                 }]
               },

さらにエラー処理を以下のようにします

               "Error1": {
                 "Type": "Pass",
                 "Result": {
                   "error": "Job1"
                 },
                 "End": true
               }

次にParallel の次の処理として以下を追加します

    "Results": {
      "Type": "Pass",
      "Parameters": {
        "results.$": "States.JsonToString($)"
      },
      "Next": "Check"
    },
    "Check": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.results",
          "StringMatches": "*error*",
          "Next": "Failed"
        }
      ],
      "Default": "Success"
    },
    "Failed": {
      "Type": "Fail",
      "Error": "States.TaskFailed"
    },
    "Success": {
      "Type": "Pass",
      "End": true
    }

stepfunctions_graph (7).png

Results でのinputとして以下のJSONが渡ります

成功時
  "input": [
    {},
    {}
  ]
エラー時
  "input": [
    {
      "error": "Job1"
    },
    {}
  ]

これを States.JsonToString() で以下のような文字列に変換します

成功時
  "output": {
    "results": "[{},{}]"
  }
エラー時
  "output": {
    "results": "[{\"error\":\"Job1\"},{}]"
  }

Choiceで error という文字が含まれているかどうかをチェックし、含まれていればFailステートに遷移させます
stepfunctions_graph (8).png

ASL全体は以下のようになります

{
  "Comment": "Example",
  "StartAt": "Parallel",
  "States": { 
    "Parallel": {
      "Type": "Parallel",
        "Branches": [
          {
            "StartAt": "Job1-1",
            "States": {
               "Job1-1": {
                 "Type": "Task",
                 "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_1:$LATEST",
                 "ResultPath": "$.lambda",
                 "Next": "Job1-2",
                 "Catch": [{
                   "ErrorEquals": [ "States.ALL" ],
                   "Next": "Error1"
                 }]
               },
               "Job1-2": {
                 "Type": "Task",
                 "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_1_2:$LATEST",
                 "InputPath": "$.lambda",
                 "OutputPath": null,
                 "End": true,
                 "Catch": [{
                   "ErrorEquals": [ "States.ALL" ],
                   "Next": "Error1"
                 }]
               },
               "Error1": {
                 "Type": "Pass",
                 "Result": {
                   "error": "Job1"
                 },
                 "End": true
               }
             }
          },
          {
            "StartAt": "Job2-1",
            "States": {
              "Job2-1": {
                 "Type": "Task",
                 "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_2_1:$LATEST",
                 "ResultPath": "$.lambda",
                 "Next": "Job2-2",
                 "Catch": [{
                   "ErrorEquals": [ "States.ALL" ],
                   "Next": "Error2"
                 }]
              },
              "Job2-2": {
                "Type": "Task",
                "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:job_2_2:$LATEST",
                "InputPath": "$.lambda",
                "OutputPath": null,
                "End": true,
                "Catch": [{
                  "ErrorEquals": [ "States.ALL" ],
                  "Next": "Error2"
                }]
              },
              "Error2": {
                "Type": "Pass",
                "Result": {
                  "error": "Job2"
                },
                "End": true
              }
            }
          }
       ],
       "Next": "Results"
     },
     "Results": {
       "Type": "Pass",
       "Parameters": {
         "results.$": "States.JsonToString($)"
       },
       "Next": "Check"
     },
     "Check": {
       "Type": "Choice",
       "Choices": [
         {
           "Variable": "$.results",
           "StringMatches": "*error*",
           "Next": "Failed"
         }
       ],
       "Default": "Success"
     },
     "Failed": {
       "Type": "Fail",
       "Error": "States.TaskFailed"
     },
     "Success": {
       "Type": "Pass",
       "End": true
     }
  }
}

##おわりに
これでStateMachineのステータスはFailedとすることができ、EventBridgeでFailedであればChatbotでSlackに通知できるようになりました

上記のサンプルは簡単なワークフローですが、実際に作成したものは並列処理がネスト・完了するまでに数時間かかるようなワークフローになっており、途中でキャンセルされてしまうと再実行に時間がかかってしまうのでこのような形での対応としています

18
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
18
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?