2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Step Functionsの並行(Parallel)状態における入出力処理

Last updated at Posted at 2022-07-29

この記事は何?

StepFunctionsを利用して、並行状態の入出力処理について得た知見についてこのメモにまとめます。

Step Functionsとは?

step Functions は、AWS Lambda 関数およびその他のビジネスクリティカルなアプリケーションを構築するための AWS のサービスを組み合わせることができるサーバーレスオーケストレーションサービスです。Step Functions のグラフィカルコンソールでは、アプリケーションのワークフローを一連のイベント駆動型ステップとして確認できます。
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/welcome.html

サーバレスでワークフローを実装することができるAWSのサービスです。特にAWSのサービスとは相性がよく、AWSのAPIの呼び出しはStepFunctionsの機能として組み込まれているため、AWSを管理するワークフローであればローコードでの実装が可能です。

類似ツールとしてはApache Airflowがあります。

使用してみた所感としては、GUIで直感的にワークフローを組み立てることができるので、少ない学習コストで利用できると感じました。

入出力処理

StepFunctionsはJSONデータを入力として受け取り、ワークフローのそれぞれの処理(状態)にてそのJSONを入力として利用します。また出力もJSONとして、次の状態に渡します。

例えば
image.png

というようなワークフローがある場合、各状態における入出力は以下の通りになります。

開始時および状態1の入力
{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {}        
}
状態1の出力および状態2の入力
{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {
        "state1" : "状態1の出力"    
    }        
}
plaintext:状態2の出力および終了時
{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {
        "state1" : "状態1の出力",
        "state2" : "状態2の出力"    
    }        
}

上記はあくまで一例です。どのように入出力処理を制御するかもStepFunctionsを利用する上で重要になる点だと思います。

並行

ワークフローの中で並行処理を実装するための部品です。

これを使うと簡単に、並行処理を実装することが可能です。
並行状態においては、枝分かれした状態の流れをブランチと呼びます。

image.png

上記の並行状態では、ブランチが2つあるということなります。

各ブランチは独立したワークフローとなります。
並行状態内の全てのブランチの処理が完了(ブランチの最後の状態が完了)したら、その並行状態が完了したと見なされます。

並行状態における入出力処理

先ほど紹介したシーケンシャルなワークフロー

image.png

では、JSONが入出力としてワークフローの上から下に流れていくだけでイメージしやすいと思います。

では以下のような並行なワークフロー
image.png

こちらにおいて、入出力はどのようになるでしょうか?

並行状態の入力

並行状態への入力は各ブランチに自身(並行状態)のコピーを渡します。

Parallel 状態は各ブランチに自身の入力データ (InputPath フィールドによって変更される場合があります) のコピーを提供します。そのブランチからの出力を含む各ブランチの 1 つの要素から成る配列である出力が生成されます。すべての要素が同じタイプである必要はありません。出力配列は、通常の方法で ResultPath を使用して入力データに挿入し、全部を Parallel 状態の出力として送信できます (入力および出力処理を参照)。
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/amazon-states-language-parallel-state.html#amazon-states-language-parallel-state-output

つまり状態1と状態2が開始から同じJSONを入力として受け取ることになります。

並行状態の出力

では出力はどうなるでしょうか?

並行状態からの出力は、各ブランチの出力が配列となって次の状態に渡されます。

(このことに関しては公式のドキュメントに記載なさそうです。)

並行状態における入出力のサンプル

開始時
{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {}        
}
状態1の入力
{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {}        
}
状態2の入力
{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {}        
}

状態1と状態2の入力は並行状態に入った際に、コピーして同一のものが渡されます。

状態1の出力
{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {
        "state1" : "状態1の出力"
    }        

}
状態2の出力
{
    "input":
        "state1" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
        "state2" {
            "hoge":"hoge",
            "fuga":"fuga"
        }
    "output": {
        "state2" : "状態2の出力"
    }        
}
終了時
[
    {
        "input":
            "state1" {
                "hoge":"hoge",
                "fuga":"fuga"
            }
            "state2" {
                "hoge":"hoge",
                "fuga":"fuga"
            }
        "output": {
            "state1" : "状態1の出力"
        }        
    },
    {
        "input":
            "state1" {
                "hoge":"hoge",
                "fuga":"fuga"
            }
            "state2" {
                "hoge":"hoge",
                "fuga":"fuga"
            }
        "output": {
            "state2" : "状態2の出力"
        }        
    }
]

並列状態の出力は上記のように、配列に各ブランチの出力が格納される形になる。もしこの並列状態の後にさらにワークフローが続くようであれば、入出力処理が煩雑になります。

並列状態の前では
json.input.state1.hoge
と指定していたところを
json[0].input.state1.hoge
といった形で配列の要素を指定する必要が出てきます。

もしワークフローに開始時のJSONに入っている値であれば、全てのブランチにコピーされるため、どの配列の要素を選択しても値は取得できます。ただ並列状態内の出力を利用したい場合は何番目のブランチであるかを選択しないと値を取得することができません。

例えば
image.png

のようなワークフローがあった場合、状態3に渡す入力が開始時に渡したJSONにあれば、json[0].input.state3.hogejson[1].input.state3.hogeどちらを選択しても状態3に入力を渡すことが可能です。

ただもし、状態2の出力を状態3に渡したいというケースにおいてはjson[1].output.state2を選択する必要があります。json[0].outputには状態1の出力のみが格納されているため)

ちなみに並行状態後の配列の順番ですが、ブランチの番号順とリンクします。

[{ブランチ1},{ブランチ2}.....{ブランチX}]
という形の配列になります。

検証

hello worldを返すシンプルなLambdaを使用して、並行状態を使用したワークフローを作成する。

lambdaの中身(python3)
import json

def lambda_handler(event, context):
    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "hello world",
        }),
    }

ワークフロー

image.png

state1とstate2はhello worldのlambdaを呼び出すものになっている。

ワークフローはSAMで管理しています。
https://github.com/keiusukematsuda/stepfunctions-parallel-test

では入出力がどうなっているかを確認しましょう。

以下のJSONをワークフローに入力として渡します。

{
  "input": {
    "state1": {
      "hoge": "hoge",
      "fuga": "fuga"
    },
    "state2": {
      "hoge": "hoge",
      "fuga": "fuga"
    }
  }
}

そしてワークフローを実行して、並行状態の後の出力は以下のようになります。

[
  {
    "input": {
      "state1": {
        "hoge": "hoge",
        "fuga": "fuga"
      },
      "state2": {
        "hoge": "hoge",
        "fuga": "fuga"
      }
    },
    "output": {
      "state1": {
        "ExecutedVersion": "$LATEST",
        "Payload": {
          "statusCode": 200,
          "body": "{\"message\": \"hello world\"}"
        },
        "SdkHttpMetadata": {
          "AllHttpHeaders": {
            "X-Amz-Executed-Version": [
              "$LATEST"
            ],
            "x-amzn-Remapped-Content-Length": [
              "0"
            ],
            "Connection": [
              "keep-alive"
            ],
            "x-amzn-RequestId": [
              "b156fdbc-d611-4363-ba1f-a1ddfeebf90d"
            ],
            "Content-Length": [
              "61"
            ],
            "Date": [
              "Fri, 29 Jul 2022 06:31:28 GMT"
            ],
            "X-Amzn-Trace-Id": [
              "root=1-62e37ec0-509f195236e45faf2d85a28c;sampled=0"
            ],
            "Content-Type": [
              "application/json"
            ]
          },
          "HttpHeaders": {
            "Connection": "keep-alive",
            "Content-Length": "61",
            "Content-Type": "application/json",
            "Date": "Fri, 29 Jul 2022 06:31:28 GMT",
            "X-Amz-Executed-Version": "$LATEST",
            "x-amzn-Remapped-Content-Length": "0",
            "x-amzn-RequestId": "b156fdbc-d611-4363-ba1f-a1ddfeebf90d",
            "X-Amzn-Trace-Id": "root=1-62e37ec0-509f195236e45faf2d85a28c;sampled=0"
          },
          "HttpStatusCode": 200
        },
        "SdkResponseMetadata": {
          "RequestId": "b156fdbc-d611-4363-ba1f-a1ddfeebf90d"
        },
        "StatusCode": 200
      }
    }
  },
  {
    "input": {
      "state1": {
        "hoge": "hoge",
        "fuga": "fuga"
      },
      "state2": {
        "hoge": "hoge",
        "fuga": "fuga"
      }
    },
    "output": {
      "state2": {
        "ExecutedVersion": "$LATEST",
        "Payload": {
          "statusCode": 200,
          "body": "{\"message\": \"hello world\"}"
        },
        "SdkHttpMetadata": {
          "AllHttpHeaders": {
            "X-Amz-Executed-Version": [
              "$LATEST"
            ],
            "x-amzn-Remapped-Content-Length": [
              "0"
            ],
            "Connection": [
              "keep-alive"
            ],
            "x-amzn-RequestId": [
              "7208079e-9ab3-4f7e-8608-243949f0ecb8"
            ],
            "Content-Length": [
              "61"
            ],
            "Date": [
              "Fri, 29 Jul 2022 06:31:28 GMT"
            ],
            "X-Amzn-Trace-Id": [
              "root=1-62e37ec0-0c7565263145ba8e6fea3d5a;sampled=0"
            ],
            "Content-Type": [
              "application/json"
            ]
          },
          "HttpHeaders": {
            "Connection": "keep-alive",
            "Content-Length": "61",
            "Content-Type": "application/json",
            "Date": "Fri, 29 Jul 2022 06:31:28 GMT",
            "X-Amz-Executed-Version": "$LATEST",
            "x-amzn-Remapped-Content-Length": "0",
            "x-amzn-RequestId": "7208079e-9ab3-4f7e-8608-243949f0ecb8",
            "X-Amzn-Trace-Id": "root=1-62e37ec0-0c7565263145ba8e6fea3d5a;sampled=0"
          },
          "HttpStatusCode": 200
        },
        "SdkResponseMetadata": {
          "RequestId": "7208079e-9ab3-4f7e-8608-243949f0ecb8"
        },
        "StatusCode": 200
      }
    }
  }
]

lambdaの出力を全て取得しているので、長くなってしまっていますが、入出力がブランチ数(2)の数分コピーされて、それが配列に格納されていることがわかります。

まとめ

Step Functionsの並行(Parallel)状態における入出力処理において

  • 並行状態への入力は各ブランチにコピーを渡す
  • 各ブランチの出力が配列に格納される
    • 配列に格納される順番はブランチの番号通り

StepFunctionsはあまりナレッジが落ちてないから、調べるのが大変。。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?