この記事はケーシーエスキャロット Advent Calendar 2022の4日目の記事です。
StepFunctions
では parallel ステートを使用すると並列処理が行えますが、受け渡し方法でハマってしまったのでメモを残します。
以下のように、lambda aaa
と lambda bbb
を並行に処理した後、受け取った値を DnamoDB API を使用して登録する流れだったとします。
(諸事情により、パワポで図を作成している為、細部は異なる可能性があります)
{
"StartAt": "Parallel lambda",
"States": {
"Parallel lambda": {
"Type": "Parallel",
"Next": "DynamoDB record",
"Branches": [
{
"StartAt": "lambda aaa",
"States": {
"lambda aaa": {
"Type": "Task",
"Resource": "arn:aws:lambda:regin:account_id:function:lambda_aaa",
"End": true
}
}
},
{
"StartAt": "lambda bbb",
"States": {
"lambda bbb": {
"Type": "Task",
"Resource": "arn:aws:lambda:regin:account_id:function:lambda_bbb",
"End": true
}
}
}
]
},
"DynamoDB record": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:putItem",
"Parameters": {
"TableName": "lambda_parallel",
"Item": {
"aaa": {
"S.$": "$.ret_a"
},
"bbb": {
"S.$": "$.ret_b"
},
}
},
"ResultPath": "$.DynamoDB",
"End": true
}
}
}
補足として、並行処理に入る際には
{"input": "data"}
というデータが渡ってきており、並行処理を行う各 lambda
では以下のような値を次ステートへ渡しています。
def handler(event, context):
return {"ret_a": "AAA"}
def handler(event, context):
return {"ret_b": "BBB"}
resultPath にはデフォルトで $
が適用されており、parallel
でない場合は上記 ASL のように "$.ret_a"
で受け取れているのですが、実行してみると以下のようなエラーとなりました。
An error occurred while executing the state 'Record DynamoDB'.
The JSONPath '$.ret_a' specified for the field 'S.$' could not be found in the input '[{"ret_a": "AAA"},{"ret_b": "BBB"}]'
どうやら parallel
の場合、出力値の最上位が {}
でなく []
形式で受け取っているようです。
じゃあ、$
部分をリストのような形にすれば良いのかなと思って、以下のようにしてみました。
"Parameters": {
"TableName": "lambda_parallel",
"Item": {
"aaa": {
"S.$": "$.[0].ret_a"
},
"bbb": {
"S.$": "$.[1].ret_b"
},
}
}
…ダメでした。orz
Unable to apply step "DynamoDB" to input [{"ret_a": "AAA"},{"ret_b": "BBB"}]
以下のようなパターンも試してみましたが、上記と同じエラー。
"Parameters": {
"TableName": "lambda_parallel",
"Item": {
"aaa": {
"S.$": "$[0].ret_a"
},
"bbb": {
"S.$": "$[1].ret_b"
},
}
}
何となく []
形式で受け取っているのがダメなのか?と思い、parallel
ステートで"ResultPath": "$.ret"
を指定した上で、以下のように変更して実行。
"Parallel lambda": {
"Type": "Parallel",
"ResultPath": "$.ret",
"Next": "DynamoDB record",
...(snip)...
"DynamoDB record": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:putItem",
"Parameters": {
"TableName": "lambda_parallel",
"Item": {
"aaa": {
"S.$": "$.ret[0].ret_a"
},
"bbb": {
"S.$": "$.ret[1].ret_b"
},
}
}
通った!(下図もパワポで図を作成している為、細部が異なる可能性があります)
DynamoDB
へも登録されていたのですが、Parallel lambda
の出力を確認すると、以下のように入力値も含まれた状態となっていました。
{
"input": "data",
"ret": [
{
"ret_a": "AAA"
},
{
"ret_b": "BBB"
}
]
}
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/input-output-resultpath.html
上記サイトによると、$
以外を指定すると出力値に入力値も含まれてしまう模様。
含まれていても良いのだけれど、出来れば出力値だけにしたいと思っていたところ、OutputPath というものを見つけました。
どうやらフィルタリングが行えるらしいので、
"Parallel lambda": {
"Type": "Parallel",
"ResultPath": "$.parallel.ret",
"OutputPath": "$.parallel",
"Next": "DynamoDB record",
とした上で実行してみました。
{
"ret": [
{
"ret_a": "AAA"
},
{
"ret_b": "BBB"
}
]
}
出力値だけになりました!
以上、実現は出来ましたが、ASL
はややこしいですね。
なお、今回は parallel
後にそのまま DnamoDB API
を呼び出す流れとなっていますが、間に lambda
を入れて
[
{
"ret_a": "AAA"
},
{
"ret_b": "BBB"
}
]
を以下のように整形させれば、ResultPath
等の設定は不要です。
{
{
"ret_a": "AAA"
},
{
"ret_b": "BBB"
}
}
ということで lambda
稼働時間を許容出来るのであれば、ASL
自体はシンプルに書けます。
(もしくは、ResultPath
等を指定しなくても実現出来る方法があるのかも?)