Part.3のおさらい
Part3では、主に以下のことを行いました。
- Lambdaでランダムな結果を返す
- 結果に応じて、次に実行するタスクを変える
- リトライ時は、しばらく時間が経ってから再実行する
今回は、その続きからはじめてみます。
ここから始める方は、
https://github.com/hito-psv/sam-demo-005
のコードをgit clone
してもらっても大丈夫ですし、Part3からやってみてもらっても構いません。
今回のターゲット
- Lamda関数の戻り値をSQSに投げる
をターゲットにしてみたいと思います。
SQSを定義する
まずは、template.yml
を修正し、
- SQSの定義そのものを追加
- SQSのポリシーを追加(Step Functionsから呼べるように)
- Step FunctionsのポリシーにSQSアクセスを追加
です。
変更後のtemplate.yml
は、こんな感じです。
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
sam-app
Sample SAM Template for sam-app
# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
Function:
Timeout: 3
Resources:
HelloWorldFunction:
Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
Properties:
CodeUri: hello_world/
Handler: app.lambda_handler
Runtime: python3.8
Role: !GetAtt HelloWorldFunctionRole.Arn
HelloWorldFunctionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Path: /
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
- arn:aws:iam::aws:policy/AWSStepFunctionsReadOnlyAccess
StateMachine:
Type: AWS::Serverless::StateMachine
Properties:
DefinitionUri: step_functions/state_machine.json
DefinitionSubstitutions:
HelloWorldFunction: !GetAtt HelloWorldFunction.Arn
SqsQueue: !Ref SqsQueue
Role: !GetAtt StateMachineRole.Arn
StateMachineRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service:
- states.amazonaws.com
Action:
- sts:AssumeRole
Path: /
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaRole
- arn:aws:iam::aws:policy/AmazonSQSFullAccess
SqsQueue:
Type: AWS::SQS::Queue
SqsQueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
PolicyDocument:
Id: !Ref SqsQueue
Statement:
- Effect: Allow
Principal: "*"
Action: "SQS:*"
Resource: !GetAtt SqsQueue.Arn
Queues:
- !Ref SqsQueue
SAMのテンプレートは、基本的にCloud Formationの定義と同等ですので、作成したリソースの情報は、!Ref
や!GetAtt
関数で置き換えることができます。
あと、重要な点が一つあり、StateMachine
のProperties
にあるDefinitionSubstitutions
に、SqsQueue
の追加をお忘れなく。
これは、Step Functionsのステートマシン定義のファイルで参照する変数名に、値を渡してあげるために必要となります。
念のため、この部分です。
StateMachine:
Type: AWS::Serverless::StateMachine
Properties:
DefinitionUri: step_functions/state_machine.json
DefinitionSubstitutions:
HelloWorldFunction: !GetAtt HelloWorldFunction.Arn
SqsQueue: !Ref SqsQueue
Role: !GetAtt StateMachineRole.Arn
ステートマシンの定義を追加する
続いては、ステートマシンの定義です。
Part.3では、最後に「hello world 2」を呼び出し、終了していましたが、終了せずにSQSに返却値をSendMessageで送信するように修正してみたいと思います。
{
"StartAt": "hello world 1",
"States": {
"hello world 1": {
"Type": "Task",
"Resource": "${HelloWorldFunction}",
"Parameters": {
"p1.$": $.p1,
"p2.$": "$.p2",
"p3": {
"p3-1": 20,
"p3-2": "xyz"
},
"all.$": "$"
},
"ResultPath": "$.hello_world_result",
"OutputPath": "$",
"Next": "check state"
},
"retry lambda": {
"Type": "Task",
"Resource": "${HelloWorldFunction}",
"InputPath": "$",
"ResultPath": "$.hello_world_result",
"OutputPath": "$",
"Next": "check state"
},
"check state": {
"Type" : "Choice",
"Choices": [
{
"Variable": "$.hello_world_result.body.message",
"StringEquals": "hello world",
"Next": "hello world 2"
},
{
"Variable": "$.hello_world_result.body.message",
"StringEquals": "retry",
"Next": "wait state"
}
],
"Default": "fail state"
},
"wait state": {
"Type": "Wait",
"Seconds": 5,
"Next": "retry lambda"
},
"hello world 2": {
"Type": "Task",
"Resource": "${HelloWorldFunction}",
"InputPath": "$",
"ResultPath": "$.hello_world_result",
"OutputPath": "$",
"Next": "send queue"
},
"send queue": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage",
"Parameters": {
"QueueUrl": "${SqsQueue}",
"MessageBody.$": "$"
},
"End": true
},
"fail state": {
"Type": "Fail",
"Cause": "No Matches!"
}
}
}
変更部分のみの抜粋は、以下の場所です。
"hello world 2": {
"Type": "Task",
"Resource": "${HelloWorldFunction}",
"InputPath": "$",
"ResultPath": "$.hello_world_result",
"OutputPath": "$",
"Next": "send queue"
},
"send queue": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage",
"Parameters": {
"QueueUrl": "${SqsQueue}",
"MessageBody.$": "$"
},
"End": true
},
hello world 2
では、ResultPath、OutputPathに戻り値を入れ直して、次のステートをsend queue
にします。
send queue
では、作成したSQSのキューURLに対して、直前の出力内容をメッセージとして送信し、終了するように定義しています。
ステートマシンを実行してみる
マネジメントコンソールから、ステートマシンを実行してみましょう。
Part.3からの修正のため、入力値のp1、p2を使用するため、入力蘭を以下のように指定してください。
- p1に、9999数値を指定
- p2に「p2 strings.」という文字列を指定
結果をみてみる
Step Functions
まずは、ビジュアルフローを確認してみましょう。
成功していることが確認できます。
もちろん「send queue」も実行されています。
SQS
続いて、SQSにメッセージが送信されているかを確認してみましょう。
SQSの管理画面からキューを選択します。
続いて「メッセージを送受信」ボタンを押します。
キューにメッセージが1件あるので「メッセージをポーリング」ボタンを押して取り出してみましょう。
IDをクリックしてメッセージの中身をみてみましょう。
本文タブを選択すると、メッセージの中身が確認できます。
Lambdaからの戻り値がSQSに送信されてきたことが確認できました!
まとめ
今回は、SQSをタスク実行してみましたが、他にも多数のAWSサービスガStep Functionsからタスク実行できます。
いずれも気をつけないといけないのは
- 実行ロール(Step Functions側、サービス側(必要に応じて))の設定
- ステートマシン定義ファイルへの、必要な情報の引き渡し(
DefinitionSubstitutions
)
です。それ以外は、サービスによってパラメータは異なりますが、考え方はそう大きく違いはありません。
Step Functionsをうまく活用することで、サーバーレスなサービスをワークフロー的に流すことがとても簡単にできます。
Step Functions自体は、まだまだ進化が続いているので、これからますます利用機会が増えるかもしれませんね。
サンプルコードリポジトリ