LoginSignup
5
6

More than 1 year has passed since last update.

AWS Step Functions 調査メモ

Last updated at Posted at 2021-08-09

はじめに

AWS Step Functions の調査メモ。深堀りはせず概要を把握することが目標です。

Step Functions で Lambda 実行

Step FunctionsでCloudWatch logsにログを書き込むLambdaを実行します。

以下はログを書き込むLambda(Python3.8)です。
Lambda実行時に受け取ったJSONデータをログに記録します。
また、return で受け取ったJSONデータを返しています。

test001
import json

def lambda_handler(event, context):
    print("test desu {0}".format(event) )
    return (event)
  • 必要なIAMロールは、CluodWatch Logs の以下3つ
    • CreateLogGroup
    • CreateLogStream
    • PutLogEvents

作成したStepFunctionsのjsonコードです。

MyStateMachine001
{
  "Comment": "This is your state machine",
  "StartAt": "Lambda Test001",
  "States": {
    "Lambda Test001": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:test001:$LATEST",
        "Payload.$": "$"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "End": true,
      "Comment": "あいうえお"
    }
  }
}

Step FunctionsのWorkflow Studioです。

image.png

実行時のペイロードです

image.png

実行ステータスが成功と表示されればStep Functionsとしては成功しています

image.png

Lambda test001 の実行ログにStep Functions実行時のJSONデータが記録されています(期待した挙動)

image.png

ペイロードとは

ペイロードとは、StepFunctions 実行時に渡すJSON形式のデータのこと。
どうやら引数みたいなものらしい。

ペイロードには3つの設定があります。

設定 概要
Use state input as payload StepFuncitons 実行時に設定
Enter payload StepFuncitonsコード内に設定
No payload ペイロード不要

フローとは

フローとは、Step Functions に用意された条件分岐などのプログラム処理のことです。
2021年8月時点では7種類のフローがあります。

フロー 概要
Choice 条件分岐 If文みたいなもの。
Parallel 並行処理
Map ループ処理 For文に似てる。同時実行数を設定できる。
Pass 情報を渡すだけ。入力値をもとに出力値を作成できる。デバッグする際に便利
Wait 指定した時間(秒)処理と止める。指定した日時まで処理を止めることもできる。
Success,Fail Step Functions の実行結果を設定できる。

フロー使ってみる

7種類のフローを使ってみます


【Choice】使ってみる

Choiceは条件分岐の処理です。実行結果によって、実行する処理を変えることができます。

図1.png

作成したStep Functionsのjsonデータです。

{
  "Comment": "This is your state machine",
  "StartAt": "Lambda Test001",
  "States": {
    "Lambda Test001": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:test001:$LATEST",
        "Payload.$": "$"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Comment": "あいうえお",
      "Next": "Choice"
    },
    "Choice": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.HOGE",
          "StringEquals": "HOGE 200",
          "Next": "Lambda HOGE 200"
        }
      ],
      "Default": "Lambda Not HOGE 200"
    },
    "Lambda HOGE 200": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:test001:$LATEST",
        "Payload": {
          "FUGA": "FUGA 100"
        }
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "End": true
    },
    "Lambda Not HOGE 200": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:test001:$LATEST",
        "Payload": {
          "FUGA": "FUGA 200"
        }
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "End": true
    }
  }
}

実行時のペイロード①

image.png

<実行結果①>

image.png

実行時のペイロード②

image.png

<実行結果②>

image.png


【Parallel】使ってみる

平行処理ができる様子

image.png


【Map】【Wait】使ってみる

lambda test001 を指定した回数実行する処理を作成します。

image.png

項目配列へのパスに、JSONデータの中の配列情報をもつ箇所を指定します。
最大同時実行を 1 に設定し、1回のループで1つ処理を行うようにしました。

lambda test001に S3バケットにファイル出力する仕組みを追加

(Lambda)test001
import json
import boto3
from datetime import datetime

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    bucket = 'tmp-hokdvhvwzw'    # S3バケット名
    key = 'test_' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + '.txt'
    file_contents = str(event)  # ファイルの内容

    obj = s3.Object(bucket,key)     # S3バケット名とパスを指定
    obj.put( Body=file_contents )   # S3バケットにファイル出力

    print("key = {0}, event = {1}".format(key, event) )
    return (event)
  • 必要なIAMロール
    • CluodWatch Logs
      • CreateLogGroup
      • CreateLogStream
      • PutLogEvents
    • S3バケット
      • AmazonS3FullAccess
      • ↑ ホントはここまで強い権限は必要ない

実行時に設定するJSONデータ

{
  "detail": {
    "shipped": [
      { "prod": "E01", "dest-code": 1001, "note": "AAA" },
      { "prod": "E02", "dest-code": 1002, "note": "BBB" },
      { "prod": "E03", "dest-code": 1003, "note": "CCC" },
      { "prod": "E04", "dest-code": 1004, "note": "DDD" },
      { "prod": "E05", "dest-code": 1005, "note": "EEE" }
    ]
  }
}

S3バケットに出力されたファイル

PS C:\Users\user01> cat .\test_2021-08-08-23-18-06.txt
{'prod': 'E01', 'dest-code': 1001, 'note': 'AAA'}
PS C:\Users\user01> cat .\test_2021-08-08-23-18-36.txt
{'prod': 'E02', 'dest-code': 1002, 'note': 'BBB'}
PS C:\Users\user01> cat .\test_2021-08-08-23-19-06.txt
{'prod': 'E03', 'dest-code': 1003, 'note': 'CCC'}
PS C:\Users\user01> cat .\test_2021-08-08-23-19-36.txt
{'prod': 'E04', 'dest-code': 1004, 'note': 'DDD'}
PS C:\Users\user01> cat .\test_2021-08-08-23-20-07.txt
{'prod': 'E05', 'dest-code': 1005, 'note': 'EEE'}
PS C:\Users\user01>

【Pass】使ってみる

Passの次に実行するlambda test001 に渡すJSONデータを加工します。
lambdaはこれ ↓ を使用しました。

test001
import json

def lambda_handler(event, context):
    print("test desu {0}".format(event) )
    return (event)

Passの設定で、HOGE,FUGA の値をもつJSONデータを作成します。FUGAの値は、ペイロードのJSONデータから取得します。

image.png

実行時に設定するJSONデータ

{
  "FUGA": "FUGA 123"
}

Step Functions 実行後のLambda test001 のログ

2021-08-09T09:50:24.212+09:00   test desu {'HOGE': 'HOGE-1234567890', 'FUGA': 'FUGA 123'}

【Success】【Fail】使ってみる

ペイロードの中のHOGEの値が 100 の場合にSuccessとなり、それ以外はFailになる処理を作りました。

image.png

<実行結果>

image.png


作ってみた(1):S3バケット上のJSONデータをもとにループ処理する

S3バケットに格納したJSONデータを取得し、そのJSONデータの情報をもとにループ処理を実行します。

image.png

Lambda testLoadJSONfroms3

testLoadJSONfroms3
import boto3
import json

def lambda_handler(event, context):
    print(event)
    s3 = boto3.resource("s3")
    bucket = s3.Bucket( event["bucketName"] )
    jsonFile = bucket.Object(event["prefix"] + "/" + event["json"])
    jsonData = jsonFile.get()
    jsonInfo = jsonData['Body'].read()

    return jsonInfo
test001
import json

def lambda_handler(event, context):
    print("test desu {0}".format(event) )
    return (event)

※IAMロールの見直し※
作成したStep Functionsに2種類(testLoadJSONfroms3,test001)のLambdaを実行する権限があるかIAMロールを要確認

StepFunctions
{
  "Comment": "This is your state machine",
  "StartAt": "Lambda LoadJSONfromS3",
  "States": {
    "Lambda LoadJSONfromS3": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:testLoadJSONfromS3:$LATEST"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Next": "Pass (Debug)"
    },
    "Pass (Debug)": {
      "Type": "Pass",
      "Next": "Pass (Filter)"
    },
    "Pass (Filter)": {
      "Type": "Pass",
      "InputPath": "$.Payload",
      "Next": "Map"
    },
    "Map": {
      "Type": "Map",
      "End": true,
      "Iterator": {
        "StartAt": "Lambda TEST001",
        "States": {
          "Lambda TEST001": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "OutputPath": "$.Payload",
            "Parameters": {
              "Payload.$": "$",
              "FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:test001:$LATEST"
            },
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException"
                ],
                "IntervalSeconds": 2,
                "MaxAttempts": 6,
                "BackoffRate": 2
              }
            ],
            "Next": "Wait"
          },
          "Wait": {
            "Type": "Wait",
            "Seconds": 5,
            "End": true
          }
        }
      },
      "MaxConcurrency": 1
    }
  }
}

ペイロードには、jsonデータがあるS3バケットとプレフィックス(ディレクトリ)を指定します。

ペイロード
{
    "bucketName": {{S3バケット名}}
    "prefix": {{プレフィックス}}
    "json": {{jsonファイル名}}
}

S3バケットにあるJSONデータ

S3バケットにあるJSONデータ
[
    {"StackName": "test001-vpc","Code": "vpc.yml", "PJPrefix": "Project1","VPCCIDR": "10.11.0.0/16"},
    {"StackName": "test001-subnet1","Code": "subnet-public.yml", "PJPrefix": "Project1","NetworkName": "Net001","PublicSubnetCIDR": "10.11.1.0/24", "AZName": "ap-northeast-1a"},
    {"StackName": "test001-sg","Code": "sg.yml", "PJPrefix": "Project1","ServiceName": "ServiceA","SGNo": "001"}
]

作ってみた(2):RunCommandを実行し結果を取得する

EC2に対しRunCommandでOSコマンドを実行し、その結果取得するStep Functionsを作ります。

image.png

Step Functions のjsonコードはこちら。

StepFunctions
{
  "Comment": "This is your state machine",
  "StartAt": "Lambda RunCommand",
  "States": {
    "Lambda RunCommand": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:testRunCommand:$LATEST",
        "Payload.$": "$"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Next": "Wait"
    },
    "Wait": {
      "Type": "Wait",
      "Seconds": 1,
      "Next": "Lambda RunCommand2"
    },
    "Lambda RunCommand2": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:testRunCommand2:$LATEST"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Next": "Choice"
    },
    "Choice": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.com_status",
          "StringEquals": "Success",
          "Next": "Lambda RunCommand3"
        },
        {
          "Variable": "$.com_status",
          "StringEquals": "InProgress",
          "Next": "Wait"
        }
      ],
      "Default": "Fail"
    },
    "Lambda RunCommand3": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:testRunCommand3:$LATEST"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "End": true
    },
    "Fail": {
      "Type": "Fail"
    }
  }
}

Lambda RunCommand はRunCommandを実行します

RunCommand
import json
import boto3

ssm = boto3.client('ssm')

def lambda_handler(event, context):
    command = event["command"]
    instance_id = event["instance_id"]

    r = ssm.send_command(
        InstanceIds = [instance_id],
        DocumentName = "AWS-RunShellScript",
        Parameters = {
            "commands": [ command ]    
        }
    )
    command_id = r['Command']['CommandId']
    print("command_id = {0}".format(command_id) )

    return { 'command_id': command_id }

Lambda RunCommand2 RunCommandの実行結果を取得します。
invocations[0]['Status']は、処理中なら InProgress 実行成功なら Success 実行失敗なら Failedとなります。

RunCommand2
import json
import boto3

ssm = boto3.client('ssm')

def lambda_handler(event, context):
    command_id = event["command_id"]
    res = ssm.list_command_invocations(
              CommandId = command_id,
              Details = True
          )
    invocations = res['CommandInvocations']
    com_status = invocations[0]['Status']
    output = "output={0}".format(invocations[0]['CommandPlugins'][0]['Output'])

    return { 'command_id': command_id, 'com_status': com_status, 'output': output }

Lambda RunCommand3 は特になにも処理していません。

RunCommand3
import json

def lambda_handler(event, context):
    output = event["output"]
    return { 'statusCode': 200 }

StepFunctions実行時のペイロード
ペイロードには、実行するコマンドと実行するEC2のインスタンスIDを設定します。

ペイロード(コマンド成功)
{
    "command": "ifconfig",
    "instance_id": "i-XXXXXXXXXXX"
}
ペイロード(コマンド失敗)
{
    "command": "BADcommand",
    "instance_id": "i-XXXXXXXXXXX"
}

<実行結果>

image.png

さいごに

AWS Step Functionsの使い方がちょっとわかった気がします。

5
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
6