こんにちは。ユビキタス言語1年生の人です。
先日、S3バケットの特定prefixを持つ(同じフォルダ配下にある)オブジェクトをそれぞれLambdaで処理したい・・・と有識者の人に相談したら、テンション高めで「Step Functions Distributed Mapが適しています!」とおススメされたので早速使ってみました。
使ってから知ったのですが、AWSのグローバルイベントre:Invent 2022で発表されたばかりのアップデート機能でした。それまで、Step Functionsを使ったことも触ったこともなかったのですが、ものすごく便利だったのでご紹介します。
Step Functionsの初心者が知ったこと
Step FunctionsはWorkflow StudioというGUIの視覚的にドラッグ&ドロップで作れるワークフローのモードと、jsonで定義されたAmazon States Language (ASL) というコードでワークフローを作るモードがあります。最初にフロー全体の骨組みを作るときはWorkflow Studioが作りやすいですが、ちょこっと更新するだけの場合はASLを修正する方が楽でした。
例:Lambda関数を1回実行するステートマシン
Workflow Studio
Amazon States Language (ASL)
{
"Comment": "A description of my state machine",
"StartAt": "Test Lambda",
"States": {
"Test Lambda": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:test-lambda:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"End": true
}
}
}
本題の「特定のprefixを持つ(同じフォルダ配下にある)複数オブジェクトを入力にLambda並列実行」するステートマシン
前提
- S3バケットへのオブジェクト作成(Object Created)のイベントを元にStep Functionsのステートマシンを起動する想定です。
概要
- Pass 入力情報の変換を行います
- Parameters 組み込み関数のStringSplitとArrayGetItemを使い、オブジェクトのKeyから '/'で区切ってフォルダ名を取得します(prefixという変数にセット)
- Map S3オブジェクトを取得してLambdaを並列実行します
- ItemReader バケット名(\$.myinput.bucket)、プレフィックス(\$.myinput.prefix)に合致するオブジェクト情報を取得
- ItemSelector Lambdaに各オブジェクトの情報を動的に渡す
- Test Lambda 各オブジェクトの情報をinputとした、Lambda関数(Test Lambda)を並列実行。入力フォーマットはS3イベント通知のフォーマットと同じにしています。
Workflow Studio
Amazon States Language (ASL)
{
"Comment": "A description of my state machine",
"StartAt": "Pass",
"States": {
"Pass": {
"Type": "Pass",
"Next": "Map",
"Parameters": {
"bucket.$": "$.detail.bucket.name",
"prefix.$": "States.ArrayGetItem(States.StringSplit($.detail.object.key,'/'),0)"
},
"ResultPath": "$.myinput"
},
"Map": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "STANDARD"
},
"StartAt": "Test Lambda",
"States": {
"Test Lambda": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:test-lambda"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"End": true
}
}
},
"End": true,
"Label": "Map",
"MaxConcurrency": 1000,
"ItemReader": {
"Resource": "arn:aws:states:::s3:listObjectsV2",
"Parameters": {
"Bucket.$": "$.myinput.bucket",
"Prefix.$": "$.myinput.prefix"
}
},
"ItemSelector": {
"objectkey.$": "$$.Map.Item.Value.Key",
"Records": [
{
"s3": {
"bucket": {
"name.$": "$.myinput.bucket"
},
"object": {
"key.$": "$$.Map.Item.Value.Key"
}
}
}
]
}
}
}
}
実際に試せるテスト用jsonも記載しておきます。
{
"detail": {
"bucket": {
"name": "<バケット名>"
},
"object": {
"key": "<フォルダ>/<ファイル名>"
}
}
}
最後に
並列処理のテストもStep Functionsを使うとデバッグしやすく、Map内でのエラーもキャッチもでき、非常に便利でした。これからもStep Functionsを勉強して使いこなしていきたいと思います!