AWS Step Functionsの簡単な説明
AWS Step Functionsは、アプリケーションの複数のAWSサービス間の作業を視覚的に構成し、実行できるようにするサービスでございます。 各状態は開発作業を示します。 これにより、複雑なビジネスロジックを単純化し、ワークフローを管理することができます。
今回の例はAWS Consoleではなく、Serverless Frameworkでテストを行います。
https://www.serverless.com/plugins/serverless-step-functions
主要概念
-
ステートマシン (State Machine): ワークフローを定義したJSONファイルで構成されるAWS Step Functionsの基本単位です。
-
ステート (State): ステートマシン内の各段階です。Task、Wait、Branchなど多様なタイプがあります。
Stateの種類
-
タスクステート (Task State)
- 外部サービスまたはLambda関数など特定の作業を実行します。
‘Resource’属性で実行する作業を指定します{ "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:MyFunction", "Next": "NextState" }
- 外部サービスまたはLambda関数など特定の作業を実行します。
-
チョイスステート (Choice State)
- 入力に応じて複数の経路の中から一つを選択します。
‘Choices’属性で条件を定義します。{ "Type": "Choice", "Choices": [ { "Variable": "$.type", "StringEquals": "TypeA", "Next": "StateA" }, { "Variable": "$.type", "StringEquals": "TypeB", "Next": "StateB" } ], "Default": "DefaultState" }
- 入力に応じて複数の経路の中から一つを選択します。
-
パラレルステート (Parallel State)
- 複数のブランチを並列に実行し、すべてのブランチが完了するまで待機します。
‘Branches’属性で並列に実行するステートを定義します。{ "Type": "Parallel", "Branches": [ { "StartAt": "ParallelState1", "States": { "ParallelState1": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:ParallelFunction1", "End": true } } }, { "StartAt": "ParallelState2", "States": { "ParallelState2": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:ParallelFunction2", "End": true } } } ], "Next": "NextState" }
- 複数のブランチを並列に実行し、すべてのブランチが完了するまで待機します。
-
ウェイトステート (Wait State)
- 指定された時間の間待機します。
SecondsまたはTimestamp属性で待機時間を定義します。{ "Type": "Wait", "Seconds": 10, "Next": "NextState" }
- 指定された時間の間待機します。
-
サクシードステート (Succeed State)
- ステートマシンが成功裏に完了したことを示します。
追加属性なしで‘Type’だけで定義します。{ "Type": "Succeed" }
- ステートマシンが成功裏に完了したことを示します。
-
フェイルステート (Fail State)
- ステートマシンが失敗したことを示し、エラーを返します。
‘Error’と‘Cause’属性でエラー情報を定義できます。{ "Type": "Fail", "Error": "ErrorCode", "Cause": "Error description" }
- ステートマシンが失敗したことを示し、エラーを返します。
-
パスステート (Pass State)
- 入力を出力に渡し、データ変換作業を行うことができます。
‘Result’属性で出力データを定義します。
{ "Type": "Pass", "Result": { "key": "value" }, "Next": "NextState" }
- 入力を出力に渡し、データ変換作業を行うことができます。
ステートマシンの例
{
"StartAt": "InitialTask",
"States": {
"InitialTask": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:InitialFunction",
"Next": "ChoiceState"
},
"ChoiceState": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "success",
"Next": "SuccessState"
},
{
"Variable": "$.status",
"StringEquals": "failure",
"Next": "FailState"
}
],
"Default": "DefaultTask"
},
"SuccessState": {
"Type": "Succeed"
},
"FailState": {
"Type": "Fail",
"Error": "TaskFailed",
"Cause": "The task has failed"
},
"DefaultTask": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:DefaultFunction",
"End": true
}
}
}
ステートマシンの順序
- InitialTaskステートでLambda関数を呼び出します。
- 呼び出し結果に応じてChoiceStateステートで経路を選択します。
- statusが“success”であればSuccessStateステートに移動し、ステートマシンを成功裏に終了します。
- statusが“failure”であればFailStateステートに移動し、エラーを返します。
- その他の場合はDefaultTaskステートに移動し、別のLambda関数を呼び出して終了します。
MapReduce
Map Stateを使用してMapReduce形式のステートマシンも作成できます。さらに、MaxConcurrencyを使用して同時に実行するLambdaタスクの数を設定することも可能です。
Structure of MapReduce StateMachine
Map作業を行うLambda
文字列を入力として受け取り、文字列の長さを返します。
// mapFunction.js
exports.handler = async (event) => {
const length = event.word.length;
return {
length: length
};
};
Reduce作業を行うLambda
文字列の長さのリストを入力として受け取り、合計を計算します。
// reduceFunction.js
exports.handler = async (event) => {
const lengths = event.map(item => item.length);
const totalLength = lengths.reduce((acc, length) => acc + length, 0);
return {
totalLength: totalLength
};
};
Serverless.yml
service: map-reduce-step-functions
provider:
name: aws
runtime: nodejs18.x
region: us-east-1
functions:
mapFunction:
handler: mapFunction.handler
reduceFunction:
handler: reduceFunction.handler
stepFunctions:
stateMachines:
mapReduceStateMachine:
definition:
Comment: "A simple AWS Step Functions example for MapReduce using Map state"
StartAt: MapTask
States:
MapTask:
Type: Map
MaxConcurrency: 2
Iterator:
StartAt: CalculateLength
States:
CalculateLength:
Type: Task
Resource: arn:aws:lambda:us-east-1:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-mapFunction
End: true
ItemsPath: $.data
ResultPath: $.mappedResults
Next: ReduceTask
ReduceTask:
Type: Task
Resource: arn:aws:lambda:us-east-1:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-reduceFunction
End: true
plugins:
- serverless-step-functions
入力例
{
"data": [
{"word": "apple"},
{"word": "banana"},
{"word": "cherry"},
{"word": "date"},
{"word": "elderberry"},
{"word": "fig"},
{"word": "grape"}
]
}
結果値
{
"totalLength": 39
}
Summary
私は業務ではMapReduceを利用して大容量のデータを並列処理する方式でstep functionを使いました。 step functionは無限の可能性がある技術だと思います。もしこの文を読んで知りたいことがあればコメント残してください。:)