LoginSignup
0
0

More than 1 year has passed since last update.

【Stepfunctions】yamlで記述する ~Map編~

Last updated at Posted at 2023-02-03

Stepfunctionsのワークフローを記述するとき
「あれ、InputPathとItemsPathって何を書くんだっけ?」
と、Mapでよくある部分をyamlで書いてみる。

値をそのまま使う場合

Map内のInputを単純に使う方法。
下記がFirstStateで整数を格納した配列を生成して、その各値をMap内で処理するもの。

yaml

StartAt: FirstState
States:
FirstState:
  Type: Task
  Resource: arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME_1
  InputPath: $.input
  ResultPath: $.parallel_input
  OutputPath: $
  Next: ParallelProcess
ParallelProcess:
  Type: Map
  InputPath: $
  ItemsPath: $.parallel_input
  ResultPath: $.parallel_result
  Iterator:
    StartAt: MapIterator
    States:
      MapIterator:
        Type: Task
        Resource: arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME_2
        InputPath: $
        OutputPath: $
        End: true
  End: true
End: true

Lambda

FirstState
import random

def some_process(x: int):
    return [random.randint(1, 10) for i in range(x)]

def lambda_handler(event, context):
    # 入力値の取得
    input_value = event['x']
    
    # 入力値の数だけ1~10の整数を生成
    return some_process(input_value)
MapIterator
def some_process(x: int):
    return x * 2

def lambda_handler(event, context):
    # 入力値に2倍にする
    return some_process(event)

入力

{
    "input": {
        "x": 3
    }
}

最終出力

{
  "output": {
    "input": {
      "x": 3
    },
    "parallel_input": [
      5,
      9,
      2
    ],
    "parallel_result": [
      10,
      18,
      4
    ]
  },
  "outputDetails": {
    "truncated": false
  }
}

Key: Value方式

辞書型データで渡してなんらかの処理をする、よくあると思う。
下記はFirstStateで辞書型データを格納した配列を生成して、その各辞書型データをMap内で処理するもの。

StartAt: FirstState
States:
FirstState:
  Type: Task
  Resource: arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME_1
  InputPath: $.input
  ResultPath: $.parallel_input
  OutputPath: $
  Next: ParallelProcess
ParallelProcess:
  Type: Map
  InputPath: $
  ItemsPath: $.parallel_input
  Parameters:
    # 「y」を除外して固定値「text」を追加する
    x: $$.Map.Item.Value.x
    z: $$.Map.Item.Value.z
    text: hoge
  ResultPath: $.parallel_result
  Iterator:
    StartAt: MapIterator
    States:
      MapIterator:
        Type: Task
        Resource: arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME_2
        InputPath: $
        OutputPath: $
        End: true
  End: true
End: true

Lambda

FirstState
import random

def some_process(x: int):
    return [
        {
            'x': random.randint(1, 10),
            'y': random.randint(1, 10),
            'z': random.randint(1, 10),
        } for i in range(x)
    ]

def lambda_handler(event, context):
    # 入力値の取得
    input_value = event['x']
    
    # 入力値の数だけx,y,zのキーを持つ1~10の整数を生成
    return some_process(input_value)
MapIterator
def some_process(input: dict):
    return {key: value * 2 for key, value in input.items()}

def lambda_handler(event, context):
    # 入力値の値だけ2倍にする
    return some_process(event)

入力

{
    "input": {
        "x": 3
    }
}

最終出力

{
  "output": {
    "input": {
      "x": 3
    },
    "parallel_input": [
      {
        "x": 10,
        "y": 2,
        "z": 7
      },
      {
        "x": 1,
        "y": 10,
        "z": 8
      },
      {
        "x": 1,
        "y": 2,
        "z": 8
      }
    ],
    "parallel_result": [
      {
        "text": "hogehoge",
        "x": 20,
        "z": 14
      },
      {
        "text": "hogehoge",
        "x": 2,
        "z": 16
      },
      {
        "text": "hogehoge",
        "x": 2,
        "z": 16
      }
    ]
  },
  "outputDetails": {
    "truncated": false
  }
}

補足

最終出力のoutput部分がフロー内の$に相当する。
Map処理後にMapの結果を参照したい場合は、ParallelProcessで設定したResultPathを指定する。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0