0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Step Functions ステートマシンで並列処理を実装する

Last updated at Posted at 2024-11-25

目的

この記事は、AWS Step Functionsを使用して並列処理を行う際の設計例と注意点について解説します。特に状態監視とタイムアウト制御を組み込む方法を中心に、実装例を示します。初心者向けにも分かりやすく構成しています。

技術・サービス

AWS

Step Functions
DynamoDB
Lambda
IAM

構成

Step Functions ステートマシン

フローで扱うサービスの権限を付与する必要があることに注意。
今回では DynamoDB への putItem の権限が必要です。

stepfunctions_graph (20).png

サンプルコード(ステートマシン)
sample_state_machine.asl.json
{
  "Comment": "Parallelステートでどちらかの条件が満たされたらフローを進める",
  "StartAt": "入力パラメータ設定",
  "States": {
    "入力パラメータ設定": {
      "Type": "Pass",
      "Next": "Map",
      "Parameters": {
        "param1": "テスト",
        "param2": true,
        "items": [
          {
            "task_token_table_name": "test_task_token_table",
            "timeout_seconds.$": "$.timeout_seconds"
          }
        ]
      }
    },
    "Map": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "INLINE"
        },
        "StartAt": "条件をチェックする処理 (Parallel)",
        "States": {
          "条件をチェックする処理 (Parallel)": {
            "Type": "Parallel",
            "Branches": [
              {
                "StartAt": "タスクトークン登録(コールバック)",
                "States": {
                  "タスクトークン登録(コールバック)": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::aws-sdk:dynamodb:putItem.waitForTaskToken",
                    "Parameters": {
                      "TableName.$": "$.task_token_table_name",
                      "Item": {
                        "token_id": {
                          "N": "1"
                        },
                        "task_token": {
                          "S.$": "$$.Task.Token"
                        }
                      }
                    },
                    "Next": "中断 (Fail)"
                  },
                  "中断 (Fail)": {
                    "Type": "Fail",
                    "Error": "Callback",
                    "Cause": "コールバック処理が完了したため中断"
                  }
                }
              },
              {
                "StartAt": "タイムアウト待機",
                "States": {
                  "タイムアウト待機": {
                    "Type": "Wait",
                    "SecondsPath": "$.timeout_seconds",
                    "Next": "中断2 (Fail)"
                  },
                  "中断2 (Fail)": {
                    "Type": "Fail",
                    "Error": "Timeout",
                    "Cause": "タイムアウトにより中断"
                  }
                }
              }
            ],
            "ResultPath": "$.parallel_results",
            "Next": "中断を正常として扱う",
            "Catch": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "Next": "中断を正常として扱う",
                "ResultPath": "$"
              }
            ]
          },
          "中断を正常として扱う": {
            "Type": "Succeed"
          }
        }
      },
      "Next": "パラメータチェック",
      "MaxConcurrency": 1,
      "ResultPath": "$.map_results",
      "ItemsPath": "$.items"
    },
    "パラメータチェック": {
      "Type": "Pass",
      "Next": "Choice",
      "Parameters": {
        "param1.$": "$.param1",
        "param2.$": "$.param2",
        "timeout_seconds.$": "$.items[0].timeout_seconds",
        "Error.$": "$.map_results[0].Error"
      }
    },
    "Choice": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.Error",
          "StringEquals": "Callback",
          "Next": "Success",
          "Comment": "コールバックの場合"
        },
        {
          "Variable": "$.Error",
          "StringEquals": "Timeout",
          "Next": "Fail",
          "Comment": "タイムアウトの場合"
        }
      ],
      "Default": "Fail"
    },
    "Fail": {
      "Type": "Fail"
    },
    "Success": {
      "Type": "Succeed"
    }
  }
}

DynamoDB テーブル

コールバックを返すためのタスクトークンを保存します。
タスクトークンは実行ごとに異なる値を使用します。

test_task_token_table

id task_token
1 {タスクトークン}

Lambda 関数

テスト用関数。
入力パラメータにタスクトークンを渡すことでコールバックを返すことができます。
Step Functions の SendTaskSuccess 権限を付与する必要があります。

callback_sample.py
import os
import boto3

SFN = boto3.client('stepfunctions')

def lambda_handler(event, context):

    # 出力されたタスクトークンを取得する
    task_token = event.get("task_token", None)
    if task_token is None:
        return {
            'statusCode': 500,
            'body': 'No task_token'
        }

    # 何らかの処理
    # ...

    # ステートマシンにコールバックを返す
    output = {
        "message": "Callback task completed successfully.",
        "result": True,
    }
    params = {"output": json.dumps(output), "taskToken": TASK_TOKEN}

    SFN.send_task_success(**params)
    

    # レスポンスを作成して返す
    return {
        'statusCode': 200,
        'body': 'Data retrieved successfully'
    }

解説

1. 入力パラメータ設定

Passステートで初期パラメータを設定します。この例では、タイムアウト値を入力パラメータから取得します。

入力パラメータ例
{
    "timeout_seconds":60
}

2. Mapステート

処理を中断させないためのMap。
同時実行数を1といるので、ここでは並列処理はしていません。
Mapステートは、並列処理のどちらかで中断した場合にそのエラーをキャッチし、正常なフローとして処理を続ける役割を持っています。これにより、次のパラメータチェックに進むことが可能になります。
今回はテストのため全エラーを受けています(エラーハンドリングを省略しています)。
正常系はSuccess、異常系はFailとして実行ステートマシンの結果を分けることもできます。
中断のErrorも定義しているので、Catchで受け取ったエラーを条件に応じて処理することも可能です。たとえば、タイムアウトエラーを無視して処理を続行する場合や、特定のエラーが発生した際にリトライを試みるなど、ChoiceやRetryステートを活用できます。

Mapがないと並列処理の出力がエラー情報に置き換わってしまい、入力パラメータが失われるため、パラメータチェックでエラーとなります。

stepfunctions_graph (18).png

エラー
An error occurred while executing the state 'パラメータチェック' (entered at the event id #16). The JSONPath '$.param1' specified for the field 'param1.$' could not be found in the input '{"Error":"Timeout","Cause":"タイムアウトにより中断"}'
並列処理の出力
{
  "Error": "Timeout",
  "Cause": "タイムアウトにより中断"
}
Mapなしのステートマシン
sample_state_machine.asl.json
{
  "Comment": "Parallelステートでどちらかの条件が満たされたらフローを進める",
  "StartAt": "入力パラメータ設定",
  "States": {
    "入力パラメータ設定": {
      "Type": "Pass",
      "Next": "条件をチェックする処理 (Parallel)",
      "Parameters": {
        "param1": "テスト",
        "param2": true,
        "task_token_table_name": "test_task_token_table",
        "timeout_seconds.$": "$.timeout_seconds"
      }
    },
    "条件をチェックする処理 (Parallel)": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "タスクトークン登録(コールバック)",
          "States": {
            "タスクトークン登録(コールバック)": {
              "Type": "Task",
              "Resource": "arn:aws:states:::aws-sdk:dynamodb:putItem.waitForTaskToken",
              "Parameters": {
                "TableName.$": "$.task_token_table_name",
                "Item": {
                  "token_id": {
                    "N": "1"
                  },
                  "task_token": {
                    "S.$": "$$.Task.Token"
                  }
                }
              },
              "Next": "中断 (Fail)"
            },
            "中断 (Fail)": {
              "Type": "Fail",
              "Error": "Callback",
              "Cause": "コールバック処理が完了したため中断"
            }
          }
        },
        {
          "StartAt": "タイムアウト待機",
          "States": {
            "タイムアウト待機": {
              "Type": "Wait",
              "SecondsPath": "$.timeout_seconds",
              "Next": "中断2 (Fail)"
            },
            "中断2 (Fail)": {
              "Type": "Fail",
              "Error": "Timeout",
              "Cause": "タイムアウトにより中断"
            }
          }
        }
      ],
      "ResultPath": "$.parallel_results",
      "Next": "パラメータチェック",
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "パラメータチェック",
          "ResultPath": "$"
        }
      ]
    },
    "パラメータチェック": {
      "Type": "Pass",
      "Next": "Success",
      "Parameters": {
        "param1.$": "$.param1",
        "param2.$": "$.param2",
        "timeout_seconds.$": "$.timeout_seconds"
      }
    },
    "Success": {
      "Type": "Succeed"
    }
  }
}

3. 並列処理

Parallelステートで2つの処理を並列に実行します。
いずれかの処理が条件を満たした場合に次のフローへ進むため、Failステートを使用して並列処理を明示的に中断させています。
中断された処理がそのままで良いか、後処理が必要かなどは考慮が必要です。

3-1. メイン処理

タスクトークンをDynamoDBテーブルに登録し、コールバックを待機します。
別でコールバックを返すLambdaを用意し、フローを進めます。

3-2. タイムアウト設定

指定した時間をWaitステートで待機します。

4. パラメータチェック

並列処理から渡された結果や前のステートから引き継いだパラメータを検証します。

5. Choice

全体の処理を成功もしくは失敗で終了するステートです。
Mapステートの出力に含まれているErrorを参照して判定しています。

実行結果

いずれかが完了した時点でパラレルの処理が中断され、次フローへと進むことができています。

メイン処理が完了した場合
stepfunctions_graph (22).png

タイムアウトした場合
stepfunctions_graph (21).png

まとめ

この記事では、AWS Step Functionsを使用して並列処理を実装する方法について説明しました。
特に以下の点について解説しました:

  1. Parallelステートを使用して状態監視とタイムアウト制御を並列に実装する方法
  2. Mapステートを活用してエラーハンドリングを行い、並列処理の中断後も正常にフローを継続させる方法
  3. タスクトークンを使用したコールバック処理の実装方法

実際の運用では、DynamoDBテーブルに格納されたタスクトークンを取得し、それに基づいてコールバックを返すLambda関数を設計する必要があります。
また、テーブルにコールバック待機中のフラグをもたせることで、コールバックを返す必要があるか判定できるようになります。さらに、EventBridgeルールを活用して定期的にLambda関数を実行することで、コールバック待機の状態を監視・更新する仕組みも構築できます。

※ この記事の例をカスタマイズする際は、特定のAWSサービスや制約(料金、リソースの上限など)を考慮してください。

あとがき

フローの中で並列に状態監視をして、そのどちらかが条件を満たした場合に次フローへ進む構成を実装する上で、その時の気づきなどを記しておこうと思い作成しました。少しでも参考になれば幸いです。

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?