AWS Step Functions#
ワークフローを構築することができるサービスです。
ワークフローを構築することで、各ステップの機能を小さくてシンプルなものにすることができます。
また、AWS Step Functionsは分岐や並列実行といった基本的なロジックの実装を標準機能として提供するため、開発負荷が軽減されます。
たとえば、Lambdaですが、Lambda関数から別なLambda関数を呼ぶ場合にはプログラムコードで次に呼び出すLambda関数名を設定したり、何らかのアクションを間に挟む必要があり、プログラムのカスタマイズが必要になります。AWS Step Functionsを利用すれば、その必要がなくなります。
ステートマシンとその構成#
Step Functionsで作成するワークフローはステートマシンと呼ばれます。
Amazonmステートメント言語と呼ばれるJSON形式のファイルを記述することで定義します。
ステートマシンの各要素はステートと呼ばれています。ステートのタイプには以下のようなものがあります。
状態
注意点#
-
タイムアウトの設定
ステートマシンのタスクにはデフォルトではタイムアウトが設定されておらず、タスクでエラーが発生した場合、ステートマシン上の次のステートは、タスクの完了を待ち続けます。そのため、タスク定義にTimeoutSecondsという項目を追加し、タイムアウトを設定することで実行のスタックを回避します、 -
ステート間のデータの受け渡し
入出力としてステート間で受け渡すデータのサイズには上限があり、データのサイズが32KBを越えると、ステートマシンの実行は失敗します。
32KB以上のデータの受け渡しを行う場合は、S3にオブジェクトとしてデータを保存し、ステート間では保存したオブジェクトのARNを受け渡します。 -
Lambdaサービス例外の処理
ステートマシンに含まれるLambda関数の実行が失敗すると、その時点でステートマシン全体の実行が終了されてしまいます。
Lambda関数を含むステートマシンでは、Lambda例外のリトライ処理または例外のキャッチ処理を実装します。
Demo1#
Step Functions を使用し、複数の AWS Lambda 関数を連携させたサーバーレスワークフローを設計し実行します。
1.IAMロールを作成する##
AWS サービス:Step Functions
ロール:AWSLambdaRole
2.AWS Lambda 関数を作成する##
Node.js8.10で5つの関数を作成します。
IAMロールはすべて、「lambda_basic_execution」を指定。
exports.handler = (event, context, callback) => {
// Create a support case using the input as the case ID, then return a confirmation message
var myCaseID = event.inputCaseID;
var myMessage = "Case " + myCaseID + ": opened...";
var result = {Case: myCaseID, Message: myMessage};
callback(null, result);
};
exports.handler = (event, context, callback) => {
// Assign the support case and update the status message
var myCaseID = event.Case;
var myMessage = event.Message + "assigned...";
var result = {Case: myCaseID, Message: myMessage};
callback(null, result);
};
exports.handler = (event, context, callback) => {
// Generate a random number to determine whether the support case has been resolved, then return that value along with the updated message.
var min = 0;
var max = 1;
var myCaseStatus = Math.floor(Math.random() * (max - min + 1)) + min;
var myCaseID = event.Case;
var myMessage = event.Message;
if (myCaseStatus == 1) {
// Support case has been resolved
myMessage = myMessage + "resolved...";
} else if (myCaseStatus == 0) {
// Support case is still open
myMessage = myMessage + "unresolved...";
}
var result = {Case: myCaseID, Status : myCaseStatus, Message: myMessage};
callback(null, result);
};
exports.handler = (event, context, callback) => {
// Close the support case
var myCaseStatus = event.Status;
var myCaseID = event.Case;
var myMessage = event.Message + "closed.";
var result = {Case: myCaseID, Status : myCaseStatus, Message: myMessage};
callback(null, result);
};
exports.handler = (event, context, callback) => {
// Escalate the support case
var myCaseID = event.Case;
var myCaseStatus = event.Status;
var myMessage = event.Message + "escalating.";
var result = {Case: myCaseID, Status : myCaseStatus, Message: myMessage};
callback(null, result);
};
3.サーバーレスワークフローとステートマシンを作成する##
「コードスニペットで作成」で以下のとおり作成し、Resourceは先程作成したLambdaのarnを入れる。
{
"Comment": "A simple AWS Step Functions state machine that automates a call center support session.",
"StartAt": "Open Case",
"States": {
"Open Case": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME",
"Next": "Assign Case"
},
"Assign Case": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME",
"Next": "Work on Case"
},
"Work on Case": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME",
"Next": "Is Case Resolved"
},
"Is Case Resolved": {
"Type" : "Choice",
"Choices": [
{
"Variable": "$.Status",
"NumericEquals": 1,
"Next": "Close Case"
},
{
"Variable": "$.Status",
"NumericEquals": 0,
"Next": "Escalate Case"
}
]
},
"Close Case": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME",
"End": true
},
"Escalate Case": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME",
"Next": "Fail"
},
"Fail": {
"Type": "Fail",
"Cause": "Engage Tier 2 Support." }
}
}
# Task ("Type": "Task") は、ステートマシンによって実行される単一の作業単位を表します。
# Resource (必須)接続されたリソースの API アクションに情報を渡すのに使用します。
# Resource フィールドに指定された Lambda 関数が完了すると、その出力が Next フィールド (「NextState」) で指定された状態に送信されます。
「IAMロールARNを指定する」で「1.IAMロールを作成する」で作成したものを入力。
4.ワークフローを実行する##
以下の内容を入力し「実行の開始」ここではケース002を実行しています。
{
"inputCaseID": "002"
}
実行結果が現れます。それぞれクリックしていき[入力] と [出力]も確認して見ましょう。
ステートマシンに入力したケース002が各ステップから次のステップに受け渡されていること、Lambda 関数が動作を完了するたびにメッセージが更新されていることが確認できると思います。
Demo2#
複数の Lambda 関数を連携させて、AWS Step Functions を使用してワークフローのランタイムエラーに対処できるようにします。
1.Lambd関数の作成####
Python 3.6で以下の通り作成します。
IAMロールは「lambda_basic_execution」を指定します。
class TooManyRequestsException(Exception): pass
class ServerUnavailableException(Exception): pass
class UnknownException(Exception): pass
def lambda_handler(event, context):
statuscode = event["statuscode"]
if statuscode == "429":
raise TooManyRequestsException('429 Too Many Requests')
elif statuscode == "503":
raise ServerUnavailableException('503 Server Unavailable')
elif statuscode == "200":
return '200 OK'
else:
raise UnknownException('Unknown error')
2.IAMロールを作成する##
AWS サービス:Step Functions
ロール:AWSLambdaRole
3.API を呼び出して例外を処理する Step Functions のステートマシンを作成する##
Resourceは、「#1.Lambd関数の作成」で作成したarnに置き換えます。
IAMロールは、「2.IAMロールを作成する」で作成したものを指定します。
{
"Comment": "An example of using retry and catch to handle API responses",
"StartAt": "Call API",
"States": {
"Call API": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME",
"Next" : "OK",
"Comment": "Catch a 429 (Too many requests) API exception, and resubmit the failed request in a rate-limiting fashion.",
"Retry" : [ {
"ErrorEquals": [ "TooManyRequestsException" ],
"IntervalSeconds": 1,
"MaxAttempts": 2
} ],
"Catch": [
{
"ErrorEquals": ["TooManyRequestsException"],
"Next": "Wait and Try Later"
}, {
"ErrorEquals": ["ServerUnavailableException"],
"Next": "Server Unavailable"
}, {
"ErrorEquals": ["States.ALL"],
"Next": "Catch All"
}
]
},
"Wait and Try Later": {
"Type": "Wait",
"Seconds" : 1,
"Next" : "Change to 200"
},
"Server Unavailable": {
"Type": "Fail",
"Error":"ServerUnavailable",
"Cause": "The server is currently unable to handle the request."
},
"Catch All": {
"Type": "Fail",
"Cause": "Unknown error!",
"Error": "An error of unknown type occurred"
},
"Change to 200": {
"Type": "Pass",
"Result": {"statuscode" :"200"} ,
"Next": "Call API"
},
"OK": {
"Type": "Pass",
"Result": "The request has succeeded.",
"End": true
}
}
}
4.テストする##
ステータスコード200でテストする。###
{
"statuscode": "200"
}
"The request has succeeded."とPassであることがわかります。
ステータスコード503でテストする。###
{
"statuscode": "503"
}
"The server is currently unable to handle the request."とFailであることがわかります。
ステータスコード429でテストする。###
{
"statuscode": "429"
}
Wait and Try Later → Change to 200 になり完了していることがわかります。
ステータスコード999でテストする。###
{
"statuscode": "999"
}
"An error of unknown type occurred"でFailであることがわかります。
Demo3#
Step Functions を使用して CloudWatch イベントに応答するサーバーレスワークフローをスケジューリングします。
1.IAMロールを作成する##
AWS サービス:Step Functions
ロール:AWSLambdaRole
2.AWS Step Functions でステートマシンを作成する##
[テンプレート] の [Hello world] を利用し、コードは以下の通り置き換える。フロー図を更新すると書き換わるのがわかります。
{
"Comment": "A Hello World example of the Amazon States Language using a Pass state",
"StartAt": "Do Something",
"States": {
"Do Something": {
"Type": "Pass",
"Result": "Work complete!",
"End": true
}
}
}
## Pass 状態 ("Type": "Pass") は、何も作業せずに入力を出力に渡します。
## Result:次の状態に渡される仮想タスクの出力として扱われ、ResultPath フィールド (ある場合) に従ってフィルタリングされます。
[IAM ロール] は「1.IAMロールを作成する。」で作成したものを選択する。
3.Amazon CloudWatch Events のルールを作成する##
ステートマシンが 1 分間隔で実行されるようにCloudWatch Eventsで以下の通りスケジュール設定する。
イベントソース:1分
ターゲット:Step Functions ステートマシン「2.AWS Step Functions でステートマシンを作成する」で指定したものを選択。
4.ステートマシンの入出力の処理方法をカスタマイズする##
状態の入力に結果を追加するには、行 7 の後に以下の新しい行を追加します。
"ResultPath": "$.taskresult",
これによって、ステートマシンの実行を呼び出した CloudWatch イベントの詳細に、Pass 状態の出力 (この場合は「Work complete!」) が追加されます。
{
"Comment": "A Hello World example of the Amazon States Language using a Pass state",
"StartAt": "Do Something",
"States": {
"Do Something": {
"Type": "Pass",
"Result": "Work complete!",
"ResultPath": "$.taskresult",
"End": true
}
}
}
## ResultPath:Result で指定された仮想タスクの「出力」を配置する場所 (入力内) を指定します。その後、入力は OutputPath フィールド (ある場合) に従ってフィルタリングされてから状態の出力に使用されます。
5.ワークフローが意図したとおりに動作しているか確認する##
CloudWatch Eventsで設定したとおり1分置きにステートマシンが実行されていることがわかります。
また、出力後、"taskresult": "Work complete!" が追加されていることがわかります。
{
"version": "0",
"id": "776e5f6e-594b-a577-fded-8185979e7562",
"detail-type": "Scheduled Event",
"source": "aws.events",
"account": "",
"time": "2019-08-05T07:44:08Z",
"region": "ap-northeast-1",
"resources": [
"arn:aws:events:ap-northeast-1::rule/execute_state_machine"
],
"detail": {}
}
{
"version": "0",
"id": "776e5f6e-594b-a577-fded-8185979e7562",
"detail-type": "Scheduled Event",
"source": "aws.events",
"account": "",
"time": "2019-08-05T07:44:08Z",
"region": "ap-northeast-1",
"resources": [
"arn:aws:events:ap-northeast-1::rule/execute_state_machine"
],
"detail": {},
"taskresult": "Work complete!"
}