はじめに
仕事で Step Functions を使うことになったので、いい加減ちゃんと触ってみる。
想定読者
- Step Functionsに入門したい人
- CDKを使ったことがある人
Step Functions
概要
Step Functions は、ワークフローを作成して分散アプリケーションの構築、プロセスの自動化などが可能。
- 全体の一連の流れ = ワークフロー = ステートマシン
- 一連の流れの中の各ステップ = 状態(ステート)
ワークフロータイプは2種類あり、標準ワークフローとExpressワークフロー(ストリーミング処理など使う)がある。
状態の種類
フィールド情報は、私見で使いそうだなと思ったもののみ抜粋した。すべて確認したい場合は、公式を参照ください。
タスク
Task
状態は、単一の作業単位を表す。タスクはLambda関数やアクティビティを使用したり、ほかのAWSサービスと統合したりして実行される。
AWSサービスと最適化された統合をした場合のResource
フィールドの指定は以下の通り。
arn:${partition}:states:${region}:${account}:${servicename}:${APIname}
-
partition
: 一般的にはaws
を指定 -
region
/account
/servicename
: リージョン / アカウントID / AWSサービス名 -
APIname
: 例:Lambda Invokeの場合はinvoke
※参考
最適化された統合を用いると、例えばLambda Invoke
の場合、レスポンスのPayload
フィールドはエスケープされたJSONからプレーンなJSONに変換される。
以下は、Lambda関数の最適化された統合を用いた場合のTask
状態定義である。
"LambdaState": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:HelloWorld:$LATEST"
},
"Next": "NextState"
}
-
OutputPath
: 状態出力の一部を選択して次の状態に渡す。上記の例ではPayload
のみを渡している。未指定の場合、$
となりJSON全体が渡される -
Parameters
:Lambda Invoke
APIに渡す引数。上記の例では、Payload
配下にこのTask
状態の入力JSON全体を渡し、FunctionName
に実行したいLambda関数のARNを渡している
選択
Choice
状態は、条件付きロジック。
- Choices (必須): ステートマシンが次に移行する状態を決定する選択ルールの配列
- Default (オプション):
Choices
のいずれにもマッチしないときの移行先を指定する
Choice状態サンプル(Not,And,Default)
{
"ChoiceStateX": {
"Type": "Choice",
"Choices": [
{
"Not": {
"Variable": "$.type",
"StringEquals": "Private"
},
"Next": "Public"
},
{
"Variable": "$.value",
"NumericEquals": 0,
"Next": "ValueIsZero"
},
{
"And": [
{
"Variable": "$.value",
"NumericGreaterThanEquals": 20
},
{
"Variable": "$.value",
"NumericLessThan": 30
}
],
"Next": "ValueInTwenties"
}
],
"Default": "DefaultState"
},
"Public": {
"Type" : "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:Foo",
"Next": "NextState"
},
"ValueIsZero": {
"Type" : "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:Zero",
"Next": "NextState"
},
"ValueInTwenties": {
"Type" : "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:Bar",
"Next": "NextState"
},
"DefaultState": {
"Type": "Fail",
"Cause": "No Matches!"
}
}
上記Choice
状態には、以下INPUTを与える想定。
{
"type": "Private",
"value": 22
}
並行
Parallel
状態は、枝分かれ(ブランチ)を作って並列処理を行うための状態。
-
Branches
(必須): 並列実行する状態を指定するオブジェクトの配列 -
ResultPath
(オプション): Resultで指定した内容を出力のどこに配置するかを指定 -
ResultSelector
(オプション): 状態の出力から必要なものを取り出したりして加工する -
Retry
(オプション): リトライポリシーを配列指定。失敗したものだけでなく全反復操作に適用されるため注意。エラーだけに適用したい場合は、ErrorEquals
などを使用して適用対象を限定する -
Catch
(オプション): フォールバック状態(どういうエラーのときどの状態に遷移させるかなど)を配列指定
{
"Comment": "Parallel Example.",
"StartAt": "LookupCustomerInfo",
"States": {
"LookupCustomerInfo": {
"Type": "Parallel",
"End": true,
"Branches": [
{
"StartAt": "LookupAddress",
"States": {
"LookupAddress": {
"Type": "Task",
"Resource":
"arn:aws:lambda:us-east-1:123456789012:function:AddressFinder",
"End": true
}
}
},
{
"StartAt": "LookupPhone",
"States": {
"LookupPhone": {
"Type": "Task",
"Resource":
"arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder",
"End": true
}
}
}
]
}
}
}
マッピング
Map
状態は、反復操作を並行して実行する(動的並列処理)。インラインモード(デフォルト)と分散モード(高並列向け。詳細は割愛)。インラインモードでは、同時反復は40回まで。
-
ItemProcessor
(必須): 処理内容を定義-
ProcessorConfig
: モードの指定(デフォルト:インライン) -
StartAt
: 反復処理を行う各INPUTを最初にどの状態に渡すかを指定する -
States
: 反復処理を行うTask
状態などを定義する
-
-
ItemsPath
(オプション): 反復処理を行う各INPUTはMap
状態の入力JSONのどこから取得するか指定 -
ItemSelector
(オプション): これを挟むことで各反復処理に渡す前に各INPUTを加工できる -
MaxConcurrency
(オプション): 並列数の上限(デフォルト: 0=上限なし。可能な限り並列実行)
{
"ship-date": "2016-03-14T01:59:00Z",
"detail": {
"delivery-partner": "UQS",
"shipped": [
{ "prod": "R31", "dest-code": 9511, "quantity": 1344 },
{ "prod": "S39", "dest-code": 9511, "quantity": 40 },
{ "prod": "R40", "dest-code": 9511, "quantity": 1220 }
]
}
}
上記入力が以下のMap
状態に渡されるとき
"Validate-All": {
"Type": "Map",
"InputPath": "$.detail",
"ItemsPath": "$.shipped",
"MaxConcurrency": 0,
"ResultPath": "$.detail.shipped",
"ItemSelector": {
"parcel.$": "$$.Map.Item.Value",
"courier.$": "$.delivery-partner"
},
"ItemProcessor": {
"StartAt": "Validate",
"States": {
"Validate": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:ship-val",
"End": true
}
}
},
"End": true
}
起動した各Lambdaには、以下のように入力が渡される。
{
"parcel": {
"prod": "R31",
"dest-code": 9511,
"quantity": 1344
},
"courier": "UQS"
}
パス
何もせずに次の状態に渡す。デバッグに便利。
このPass
状態をはさむことで、強制的に入力変換を行うこともできる。
-
Result
(オプション): 次の状態にわたす内容を指定
"No-op": {
"Type": "Pass",
"Result": {
"x-datum": 0.381018,
"y-datum": 622.2269926397355
},
"ResultPath": "$.coords",
"End": true
}
をというPass
状態を作ったとき、
{
"georefOf": "Home"
}
という入力がくると
{
"georefOf": "Home",
"coords": {
"x-datum": 0.381018,
"y-datum": 622.2269926397355
}
}
と出力される。
待機
sleep。何秒待機するかを指定することも、特定日時まで待機させることも可能。
"wait_ten_seconds": {
"Type": "Wait",
"Seconds": 10,
"Next": "NextState"
}
成功
ワークフローの正常終了地点。
"SuccessState": {
"Type": "Succeed"
}
失敗
ワークフローの異常状態。Catchされない場合は障害としてマークされる。
-
Cause
(オプション): エラーの原因を説明する文字列。運用・診断目的で指定。 -
Error
(オプション): Retry または Catch フィールドを使用してエラー処理を実行する際に指定できるエラー名
"FailState": {
"Type": "Fail",
"Cause": "Invalid response.",
"Error": "ErrorA"
}
【基本】チュートリアル
下記に従い、CDKで作ってみる。
ステートマシンには以下のようにちゃんとLambdaを実行できる権限が付与されてた。
- State Machine
- 以下ポリシーのIAMロールを割り当て
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "lambda:InvokeFunction",
"Resource": [
"arn:aws:lambda:us-east-1:123456789012:function:StepStack-MyLambdaFunction67CCA873-aSAaLthmaS2m",
"arn:aws:lambda:us-east-1:123456789012:function:StepStack-MyLambdaFunction67CCA873-aSAaLthmaS2m:*"
],
"Effect": "Allow"
}
]
}
【応用】アルバイト月収計算ステートマシン
チュートリアルを踏まえて、何か作ってみようと思い、以下のようなアルバイト月収計算ステートマシンを作ってみた。本プログラムはあくまで勉強用で、実利用とはかけ離れているのご容赦ください。また、できるだけ冒頭で説明した様々な状態を使うようにしています。
仕様・処理内容
前提:
- 前月以前の月収は従業員ごとにDB管理されている
- 従業員ごとの単価(時給)はDB管理されている
- 今月分の月収は、その月の勤務時間とその従業員の時給を掛けて算出する
下記いずれかのINPUTを与える。
{
"partTimeWorkerId": "123456",
"yearMonth": "202409"
}
OR
{
"partTimeWorkerId": "123456",
"workHours": [
{
"start": "20241010 10:00",
"end": "20241010 12:00"
},
{
"start": "20241010 16:00",
"end": "20241010 17:00"
}
]
}
- 前月以前の月収を参照したい場合、
yearMonth
に年月を指定して月収を出力する - 今月以降の月収を参照したい場合、
workHours
に勤務予定を指定して月収を算出する
実装
前述した通り、できるだけ多様な状態を扱うようにしたかったので、以下のような処理フローとした。
Step Function および Lambda のCDKは以下のようになる。
Lambda関数の内容含めた全体はGitを参照してください。
import * as cdk from 'aws-cdk-lib'
import * as lambda from 'aws-cdk-lib/aws-lambda'
import * as sfn from 'aws-cdk-lib/aws-stepfunctions'
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks'
import { Construct } from 'constructs'
export class StepStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props)
// 月収取得Lambda
const fetchMonthlyEarningsFunction = new lambda.Function(this, 'fetchMonthlyEarnings', {
code: lambda.Code.fromAsset('lambda'),
runtime: lambda.Runtime.PYTHON_3_12,
handler: 'fetchMonthlyEarnings.handler',
timeout: cdk.Duration.seconds(3)
})
// 単価取得Lambda
const fetchHourlyPayFunction = new lambda.Function(this, 'fetchHourlyPay', {
code: lambda.Code.fromAsset('lambda'),
runtime: lambda.Runtime.PYTHON_3_12,
handler: 'fetchHourlyPay.handler',
timeout: cdk.Duration.seconds(3)
})
// 勤務時間計算Lambda
const calculateWorkHoursFunction = new lambda.Function(this, 'calculateWorkHours', {
code: lambda.Code.fromAsset('lambda'),
runtime: lambda.Runtime.PYTHON_3_12,
handler: 'calculateWorkHours.handler',
timeout: cdk.Duration.seconds(3)
})
// 勤務時間と単価から給料計算するLambda
const calculateEarningsFunction = new lambda.Function(this, 'calculateMonthlyEarnings', {
code: lambda.Code.fromAsset('lambda'),
runtime: lambda.Runtime.PYTHON_3_12,
handler: 'calculateMonthlyEarnings.handler',
timeout: cdk.Duration.seconds(3)
})
// 年月を指定して過去の給料を取得する処理
// 月収取得Lambda呼び出し
const fetchMonthlyEarningsTask = new tasks.LambdaInvoke(this, 'fetchMonthlyEarningsTask', {
lambdaFunction: fetchMonthlyEarningsFunction,
payloadResponseOnly: true // レスポンスをPayloadのみにする
})
.addCatch(
new sfn.Fail(this, 'fetchMonthlyEarningsFail', { cause: 'fetchMonthlyEarnings Error' })
)
.addRetry({
errors: ['States.ALL'], // すべてのエラーが対象
maxAttempts: 1
})
.next(new sfn.Succeed(this, 'fetchMonthlyEarningsSucceed'))
// 勤務時間から給料を計算する処理
// 単価取得処理と勤務時間計算処理を並列呼び出し
const calculateEarningsByWorkHours = new sfn.Parallel(this, 'parallelProcess')
calculateEarningsByWorkHours.branch(
new tasks.LambdaInvoke(this, 'fetchHourlyPayTask', {
lambdaFunction: fetchHourlyPayFunction
})
)
calculateEarningsByWorkHours.branch(
// 勤務時間は時間帯を複数指定可能とするため、Mapにより指定した数だけ集計する
new sfn.Map(this, 'mapProcess', {
itemsPath: sfn.JsonPath.stringAt('$.workHours'),
maxConcurrency: 1,
resultPath: '$.mapOutput'
}).itemProcessor(
new tasks.LambdaInvoke(this, 'calculateWorkHoursTask', {
lambdaFunction: calculateWorkHoursFunction
})
)
)
calculateEarningsByWorkHours.addCatch(
new sfn.Fail(this, 'parallelProcessFail', { cause: 'Error in parallel process' })
)
calculateEarningsByWorkHours
.next(
new tasks.LambdaInvoke(this, 'calculateMonthlyEarningsTask', {
lambdaFunction: calculateEarningsFunction,
payloadResponseOnly: true // レスポンスをPayloadのみにする
}).addCatch(
new sfn.Fail(this, 'calculateEarningsByWorkHoursFail', {
cause: 'calculateEarning Error'
})
)
)
.next(new sfn.Succeed(this, 'calculateEarningsByWorkHoursSucceed'))
// アルバイト給料ステートマシン
new sfn.StateMachine(this, 'PartTimeWorkerEarningsStateMachine', {
definitionBody: sfn.DefinitionBody.fromChainable(
new sfn.Choice(this, 'isYearMonthSpecified')
// 年月が指定されている場合、給料情報をDB等から取得する
.when(sfn.Condition.isPresent('$.yearMonth'), fetchMonthlyEarningsTask)
// 年月が指定されていない場合、勤務時間の指定をもとに給料を計算する
.otherwise(calculateEarningsByWorkHours)
)
})
}
}
Step Functionsでは特別なエラー名というのがある。リトライポリシーやフォールバックポリシー記載時には注意が必要。
- States.ALL: ワイルドカード。すべてのエラーに相当
- States.Runtime: どのエラーも対象にしたくないとき
上記は一部なので、詳細は以下を参照ください。
https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_stepfunctions-readme.html#error-names
動かしてみる
例外処理やリトライ処理含めて、ちゃんと動いた!
INPUT
{
"partTimeWorkerId": "123456",
"yearMonth": "202409"
}
OUTPUT
{
"statusCode": 200,
"body": "{\"montlyEarnings\": 40000}"
}
INPUT
{
"partTimeWorkerId": "123456",
"workHours": [
{
"start": "20241010 10:00",
"end": "20241010 12:00"
},
{
"start": "20241010 16:00",
"end": "20241010 17:00"
}
]
}
OUTPUT
{
"statusCode": 200,
"body": "{\"earnings\": 3000.0}"
}
さいごに
グラフビューがみやすい!
States.Error
などはハマりポイントだと思った。
参考文献