はじめに
AWS Step Functions の調査メモ。深堀りはせず概要を把握することが目標です。
Step Functions で Lambda 実行
Step FunctionsでCloudWatch logsにログを書き込むLambdaを実行します。
以下はログを書き込むLambda(Python3.8)です。
Lambda実行時に受け取ったJSONデータをログに記録します。
また、return で受け取ったJSONデータを返しています。
import json
def lambda_handler(event, context):
print("test desu {0}".format(event) )
return (event)
- 必要なIAMロールは、CluodWatch Logs の以下3つ
- CreateLogGroup
- CreateLogStream
- PutLogEvents
作成したStepFunctionsのjsonコードです。
{
"Comment": "This is your state machine",
"StartAt": "Lambda Test001",
"States": {
"Lambda Test001": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:test001:$LATEST",
"Payload.$": "$"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"End": true,
"Comment": "あいうえお"
}
}
}
Step FunctionsのWorkflow Studioです。
実行時のペイロードです
実行ステータスが成功と表示されればStep Functionsとしては成功しています
Lambda test001
の実行ログにStep Functions実行時のJSONデータが記録されています(期待した挙動)
ペイロードとは
ペイロードとは、StepFunctions 実行時に渡すJSON形式のデータのこと。
どうやら引数みたいなものらしい。

ペイロードには3つの設定があります。
設定 | 概要 |
---|---|
Use state input as payload | StepFuncitons 実行時に設定 |
Enter payload | StepFuncitonsコード内に設定 |
No payload | ペイロード不要 |
フローとは
フローとは、Step Functions に用意された条件分岐などのプログラム処理のことです。
2021年8月時点では7種類のフローがあります。

フロー | 概要 |
---|---|
Choice | 条件分岐 If文みたいなもの。 |
Parallel | 並行処理 |
Map | ループ処理 For文に似てる。同時実行数を設定できる。 |
Pass | 情報を渡すだけ。入力値をもとに出力値を作成できる。デバッグする際に便利 |
Wait | 指定した時間(秒)処理と止める。指定した日時まで処理を止めることもできる。 |
Success,Fail | Step Functions の実行結果を設定できる。 |
フロー使ってみる
7種類のフローを使ってみます
【Choice】使ってみる
Choice
は条件分岐の処理です。実行結果によって、実行する処理を変えることができます。
作成したStep Functionsのjsonデータです。
{
"Comment": "This is your state machine",
"StartAt": "Lambda Test001",
"States": {
"Lambda Test001": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:test001:$LATEST",
"Payload.$": "$"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Comment": "あいうえお",
"Next": "Choice"
},
"Choice": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.HOGE",
"StringEquals": "HOGE 200",
"Next": "Lambda HOGE 200"
}
],
"Default": "Lambda Not HOGE 200"
},
"Lambda HOGE 200": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:test001:$LATEST",
"Payload": {
"FUGA": "FUGA 100"
}
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"End": true
},
"Lambda Not HOGE 200": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:test001:$LATEST",
"Payload": {
"FUGA": "FUGA 200"
}
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"End": true
}
}
}
実行時のペイロード①
<実行結果①>
実行時のペイロード②
<実行結果②>
【Parallel】使ってみる
平行処理ができる様子
【Map】【Wait】使ってみる
lambda test001
を指定した回数実行する処理を作成します。
項目配列へのパス
に、JSONデータの中の配列情報をもつ箇所を指定します。
最大同時実行
を 1 に設定し、1回のループで1つ処理を行うようにしました。
lambda test001
に S3バケットにファイル出力する仕組みを追加
import json
import boto3
from datetime import datetime
s3 = boto3.resource('s3')
def lambda_handler(event, context):
bucket = 'tmp-hokdvhvwzw' # S3バケット名
key = 'test_' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + '.txt'
file_contents = str(event) # ファイルの内容
obj = s3.Object(bucket,key) # S3バケット名とパスを指定
obj.put( Body=file_contents ) # S3バケットにファイル出力
print("key = {0}, event = {1}".format(key, event) )
return (event)
- 必要なIAMロール
- CluodWatch Logs
- CreateLogGroup
- CreateLogStream
- PutLogEvents
- S3バケット
- AmazonS3FullAccess
- ↑ ホントはここまで強い権限は必要ない
- CluodWatch Logs
実行時に設定するJSONデータ
{
"detail": {
"shipped": [
{ "prod": "E01", "dest-code": 1001, "note": "AAA" },
{ "prod": "E02", "dest-code": 1002, "note": "BBB" },
{ "prod": "E03", "dest-code": 1003, "note": "CCC" },
{ "prod": "E04", "dest-code": 1004, "note": "DDD" },
{ "prod": "E05", "dest-code": 1005, "note": "EEE" }
]
}
}
S3バケットに出力されたファイル
PS C:\Users\user01> cat .\test_2021-08-08-23-18-06.txt
{'prod': 'E01', 'dest-code': 1001, 'note': 'AAA'}
PS C:\Users\user01> cat .\test_2021-08-08-23-18-36.txt
{'prod': 'E02', 'dest-code': 1002, 'note': 'BBB'}
PS C:\Users\user01> cat .\test_2021-08-08-23-19-06.txt
{'prod': 'E03', 'dest-code': 1003, 'note': 'CCC'}
PS C:\Users\user01> cat .\test_2021-08-08-23-19-36.txt
{'prod': 'E04', 'dest-code': 1004, 'note': 'DDD'}
PS C:\Users\user01> cat .\test_2021-08-08-23-20-07.txt
{'prod': 'E05', 'dest-code': 1005, 'note': 'EEE'}
PS C:\Users\user01>
【Pass】使ってみる
Pass
の次に実行するlambda test001
に渡すJSONデータを加工します。
lambdaはこれ ↓ を使用しました。
import json
def lambda_handler(event, context):
print("test desu {0}".format(event) )
return (event)
Pass
の設定で、HOGE,FUGA の値をもつJSONデータを作成します。FUGAの値は、ペイロードのJSONデータから取得します。
実行時に設定するJSONデータ
{
"FUGA": "FUGA 123"
}
Step Functions 実行後のLambda test001 のログ
2021-08-09T09:50:24.212+09:00 test desu {'HOGE': 'HOGE-1234567890', 'FUGA': 'FUGA 123'}
【Success】【Fail】使ってみる
ペイロードの中のHOGEの値が 100 の場合にSuccessとなり、それ以外はFailになる処理を作りました。
<実行結果>
作ってみた(1):S3バケット上のJSONデータをもとにループ処理する
S3バケットに格納したJSONデータを取得し、そのJSONデータの情報をもとにループ処理を実行します。
Lambda testLoadJSONfroms3
import boto3
import json
def lambda_handler(event, context):
print(event)
s3 = boto3.resource("s3")
bucket = s3.Bucket( event["bucketName"] )
jsonFile = bucket.Object(event["prefix"] + "/" + event["json"])
jsonData = jsonFile.get()
jsonInfo = jsonData['Body'].read()
return jsonInfo
import json
def lambda_handler(event, context):
print("test desu {0}".format(event) )
return (event)
※IAMロールの見直し※
作成したStep Functionsに2種類(testLoadJSONfroms3
,test001
)のLambdaを実行する権限があるかIAMロールを要確認
{
"Comment": "This is your state machine",
"StartAt": "Lambda LoadJSONfromS3",
"States": {
"Lambda LoadJSONfromS3": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:testLoadJSONfromS3:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Next": "Pass (Debug)"
},
"Pass (Debug)": {
"Type": "Pass",
"Next": "Pass (Filter)"
},
"Pass (Filter)": {
"Type": "Pass",
"InputPath": "$.Payload",
"Next": "Map"
},
"Map": {
"Type": "Map",
"End": true,
"Iterator": {
"StartAt": "Lambda TEST001",
"States": {
"Lambda TEST001": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:test001:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Next": "Wait"
},
"Wait": {
"Type": "Wait",
"Seconds": 5,
"End": true
}
}
},
"MaxConcurrency": 1
}
}
}
ペイロードには、jsonデータがあるS3バケットとプレフィックス(ディレクトリ)を指定します。
{
"bucketName": {{S3バケット名}}
"prefix": {{プレフィックス}}
"json": {{jsonファイル名}}
}
S3バケットにあるJSONデータ
[
{"StackName": "test001-vpc","Code": "vpc.yml", "PJPrefix": "Project1","VPCCIDR": "10.11.0.0/16"},
{"StackName": "test001-subnet1","Code": "subnet-public.yml", "PJPrefix": "Project1","NetworkName": "Net001","PublicSubnetCIDR": "10.11.1.0/24", "AZName": "ap-northeast-1a"},
{"StackName": "test001-sg","Code": "sg.yml", "PJPrefix": "Project1","ServiceName": "ServiceA","SGNo": "001"}
]
作ってみた(2):RunCommandを実行し結果を取得する
EC2に対しRunCommandでOSコマンドを実行し、その結果取得するStep Functionsを作ります。
Step Functions のjsonコードはこちら。
{
"Comment": "This is your state machine",
"StartAt": "Lambda RunCommand",
"States": {
"Lambda RunCommand": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:testRunCommand:$LATEST",
"Payload.$": "$"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Next": "Wait"
},
"Wait": {
"Type": "Wait",
"Seconds": 1,
"Next": "Lambda RunCommand2"
},
"Lambda RunCommand2": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:testRunCommand2:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Next": "Choice"
},
"Choice": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.com_status",
"StringEquals": "Success",
"Next": "Lambda RunCommand3"
},
{
"Variable": "$.com_status",
"StringEquals": "InProgress",
"Next": "Wait"
}
],
"Default": "Fail"
},
"Lambda RunCommand3": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:ap-northeast-1:000000000000:function:testRunCommand3:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"End": true
},
"Fail": {
"Type": "Fail"
}
}
}
Lambda RunCommand
はRunCommandを実行します
import json
import boto3
ssm = boto3.client('ssm')
def lambda_handler(event, context):
command = event["command"]
instance_id = event["instance_id"]
r = ssm.send_command(
InstanceIds = [instance_id],
DocumentName = "AWS-RunShellScript",
Parameters = {
"commands": [ command ]
}
)
command_id = r['Command']['CommandId']
print("command_id = {0}".format(command_id) )
return { 'command_id': command_id }
Lambda RunCommand2
RunCommandの実行結果を取得します。
invocations[0]['Status']は、処理中なら InProgress
実行成功なら Success
実行失敗なら Failed
となります。
import json
import boto3
ssm = boto3.client('ssm')
def lambda_handler(event, context):
command_id = event["command_id"]
res = ssm.list_command_invocations(
CommandId = command_id,
Details = True
)
invocations = res['CommandInvocations']
com_status = invocations[0]['Status']
output = "output={0}".format(invocations[0]['CommandPlugins'][0]['Output'])
return { 'command_id': command_id, 'com_status': com_status, 'output': output }
Lambda RunCommand3
は特になにも処理していません。
import json
def lambda_handler(event, context):
output = event["output"]
return { 'statusCode': 200 }
StepFunctions実行時のペイロード
ペイロードには、実行するコマンドと実行するEC2のインスタンスIDを設定します。
{
"command": "ifconfig",
"instance_id": "i-XXXXXXXXXXX"
}
{
"command": "BADcommand",
"instance_id": "i-XXXXXXXXXXX"
}
<実行結果>
さいごに
AWS Step Functionsの使い方がちょっとわかった気がします。