Stepfunctionsのワークフローを記述するとき
「あれ、InputPathとItemsPathって何を書くんだっけ?」
と、Mapでよくある部分をyamlで書いてみる。
値をそのまま使う場合
Map内のInputを単純に使う方法。
下記がFirstState
で整数を格納した配列を生成して、その各値をMap内で処理するもの。
yaml
StartAt: FirstState
States:
FirstState:
Type: Task
Resource: arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME_1
InputPath: $.input
ResultPath: $.parallel_input
OutputPath: $
Next: ParallelProcess
ParallelProcess:
Type: Map
InputPath: $
ItemsPath: $.parallel_input
ResultPath: $.parallel_result
Iterator:
StartAt: MapIterator
States:
MapIterator:
Type: Task
Resource: arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME_2
InputPath: $
OutputPath: $
End: true
End: true
End: true
Lambda
FirstState
import random
def some_process(x: int):
return [random.randint(1, 10) for i in range(x)]
def lambda_handler(event, context):
# 入力値の取得
input_value = event['x']
# 入力値の数だけ1~10の整数を生成
return some_process(input_value)
MapIterator
def some_process(x: int):
return x * 2
def lambda_handler(event, context):
# 入力値に2倍にする
return some_process(event)
入力
{
"input": {
"x": 3
}
}
最終出力
{
"output": {
"input": {
"x": 3
},
"parallel_input": [
5,
9,
2
],
"parallel_result": [
10,
18,
4
]
},
"outputDetails": {
"truncated": false
}
}
Key: Value方式
辞書型データで渡してなんらかの処理をする、よくあると思う。
下記はFirstState
で辞書型データを格納した配列を生成して、その各辞書型データをMap内で処理するもの。
StartAt: FirstState
States:
FirstState:
Type: Task
Resource: arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME_1
InputPath: $.input
ResultPath: $.parallel_input
OutputPath: $
Next: ParallelProcess
ParallelProcess:
Type: Map
InputPath: $
ItemsPath: $.parallel_input
Parameters:
# 「y」を除外して固定値「text」を追加する
x: $$.Map.Item.Value.x
z: $$.Map.Item.Value.z
text: hoge
ResultPath: $.parallel_result
Iterator:
StartAt: MapIterator
States:
MapIterator:
Type: Task
Resource: arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME_2
InputPath: $
OutputPath: $
End: true
End: true
End: true
Lambda
FirstState
import random
def some_process(x: int):
return [
{
'x': random.randint(1, 10),
'y': random.randint(1, 10),
'z': random.randint(1, 10),
} for i in range(x)
]
def lambda_handler(event, context):
# 入力値の取得
input_value = event['x']
# 入力値の数だけx,y,zのキーを持つ1~10の整数を生成
return some_process(input_value)
MapIterator
def some_process(input: dict):
return {key: value * 2 for key, value in input.items()}
def lambda_handler(event, context):
# 入力値の値だけ2倍にする
return some_process(event)
入力
{
"input": {
"x": 3
}
}
最終出力
{
"output": {
"input": {
"x": 3
},
"parallel_input": [
{
"x": 10,
"y": 2,
"z": 7
},
{
"x": 1,
"y": 10,
"z": 8
},
{
"x": 1,
"y": 2,
"z": 8
}
],
"parallel_result": [
{
"text": "hogehoge",
"x": 20,
"z": 14
},
{
"text": "hogehoge",
"x": 2,
"z": 16
},
{
"text": "hogehoge",
"x": 2,
"z": 16
}
]
},
"outputDetails": {
"truncated": false
}
}
補足
最終出力のoutput
部分がフロー内の$
に相当する。
Map処理後にMapの結果を参照したい場合は、ParallelProcess
で設定したResultPath
を指定する。