概要
前処理にて終了パラメータ個数分Lambdaを並列処理実行させることを実装するにはどうしたらいいかと調べつつできたのでメモしておく。
例えば、前処理にてS3へPUTしたファイルの複数のパスを後続へ渡し、各Lambdaがそれぞれの1つのファイルパスを受け取り処理をしたい場合に有効である。
今回はこちらを例にして説明する。
StepFunctionsのワークフロー
①Lambda Invokeで分割した複数ファイルをS3へPUTしたKeyをパラメータで後続に渡す
②並列処理を実行するパラメータをそれぞれLambda Invoke_2に渡す
③Lambda Invoke_2の処理を実行する
実装方法
1. test_Invokeの作成
import json
def lambda_handler(event, context):
# TODO implement
return {
'statusCode': 200,
'body': 'Hello from Lambda!',
'count_list': [
{"count":"3"},
{"count":"21"},
{"count":"39"},
{"count":"40"},
{"count":"52"},
{"count":"60"}
]
}
2. test_Invoke_2の作成
import json
def lambda_handler(event, context):
print(event["count"])
# return {
# 'statusCode': 200
# }
今回は前処理で渡ってきたパラメータを受け取り出力する。(6個のtest_Invoke_2が並列で起動する想定)
3. StepFunctionsのステートマシーンの作成と設定
今回は「test_function」という名前でステートマシーンを作成する。
①「ステートマシーンの作成」をクリック
②タイプは標準で「次へ」
③ワークフロー設計にて1つめのAWS Lambda InvokeをD&D
<設定>
状態名:Test Invoke
API パラメータのFunction nameの入力を選択してtest_InvokeのARNをC&P
その他の入力・出力・エラー処理設定はそのまま。
④ワークフロー設計にてTest Invokeの下にMapをD&D
<設定>
状態名:Map
項目ソースの項目配列へのパスを指定 - 省略可能にチェックを入れ以下を入力
💡 $.Payload.count_listその他の入力・出力・エラー処理設定はそのまま。
⑤ワークフロー設計にて2つめのAWS Lambda InvokeをD&D
<設定>
状態名:Test Invoke 2
API パラメータのFunction nameの入力を選択してtest_Invoke_2のARNをC&P
その他の入力・出力・エラー処理設定はそのまま。
実行
StepFunctionsのステートマシーンtest_functionの「実行の開始」をクリックして実行し、CloudWatchよりログを確認してみる。無事に受け取ったパラメータ個数分のLambdaが起動していて各パラメータを出力できていることが確認できるはず。