目的
この記事では、AWS Step Functionsを使用して長期的な状態監視を実現する方法を、実装例と共に紹介します。具体的には、Step Functionsの並列処理や再帰処理を活用し、機器の稼働・停止を監視し続けるシステムを構築します。
技術・サービス
AWS
Step Functions
DynamoDB
Lambda
IAM
構成
Step Functions ステートマシン
サンプルコード(ステートマシン)
{
"Comment": "永続的に稼働し続けるステートマシンデモ",
"StartAt": "入力パラメータ設定",
"States": {
"入力パラメータ設定": {
"Type": "Pass",
"Next": "Map",
"Parameters": {
"items": [
{
"task_token_table_name.$": "$.task_token_table_name",
"start_lambda_arn.$": "$.start_lambda_arn",
"stop_lambda_arn.$": "$.stop_lambda_arn",
"state_machine_arn.$": "$.state_machine_arn",
"timeout_seconds.$": "$.timeout_seconds"
}
]
}
},
"Map": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "条件をチェックする処理 (Parallel)",
"States": {
"条件をチェックする処理 (Parallel)": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "タスクトークン登録(コールバック)",
"States": {
"タスクトークン登録(コールバック)": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:dynamodb:putItem.waitForTaskToken",
"Parameters": {
"TableName.$": "$.task_token_table_name",
"Item": {
"token_id": {
"N": "1"
},
"task_token": {
"S.$": "$$.Task.Token"
}
}
},
"Next": "機器稼働1",
"Comment": "機器稼働1の実行待ち",
"ResultPath": "$.callback"
},
"機器稼働1": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:lambda:invoke",
"Parameters": {
"FunctionName.$": "States.Format('{}:$LATEST', $.start_lambda_arn)"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2,
"JitterStrategy": "FULL"
}
],
"Next": "タスクトークン登録(コールバック) (1)",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Comment": "何らかのエラー",
"Next": "中断 (Fail)",
"ResultPath": "$.lambda_error"
}
],
"ResultPath": "$.Payload"
},
"タスクトークン登録(コールバック) (1)": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:dynamodb:putItem.waitForTaskToken",
"Parameters": {
"TableName.$": "$.task_token_table_name",
"Item": {
"token_id": {
"N": "2"
},
"task_token": {
"S.$": "$$.Task.Token"
}
}
},
"Next": "機器停止1",
"Comment": "機器停止1の実行待ち",
"ResultPath": "$.callback"
},
"機器停止1": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:lambda:invoke",
"Parameters": {
"FunctionName.$": "States.Format('{}:$LATEST', $.stop_lambda_arn)"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2,
"JitterStrategy": "FULL"
}
],
"Next": "タスクトークン登録(コールバック)",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Comment": "何らかのエラー",
"Next": "中断 (Fail)",
"ResultPath": "$.lambda_error"
}
],
"ResultPath": "$.Payload"
},
"中断 (Fail)": {
"Type": "Fail",
"Cause": "機器1でエラーが発生しました",
"Error": "400"
}
}
},
{
"StartAt": "タスクトークン登録(コールバック) (2)",
"States": {
"タスクトークン登録(コールバック) (2)": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:dynamodb:putItem.waitForTaskToken",
"Parameters": {
"TableName.$": "$.task_token_table_name",
"Item": {
"token_id": {
"N": "3"
},
"task_token": {
"S.$": "$$.Task.Token"
}
}
},
"Next": "機器稼働2",
"Comment": "機器稼働2の実行待ち",
"ResultPath": "$.callback"
},
"機器稼働2": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:lambda:invoke",
"Parameters": {
"FunctionName.$": "States.Format('{}:$LATEST', $.start_lambda_arn)"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2,
"JitterStrategy": "FULL"
}
],
"Next": "タスクトークン登録(コールバック) (3)",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Comment": "何らかのエラー",
"Next": "中断2 (Fail)",
"ResultPath": "$.lambda_error"
}
],
"ResultPath": "$.Payload"
},
"タスクトークン登録(コールバック) (3)": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:dynamodb:putItem.waitForTaskToken",
"Parameters": {
"TableName.$": "$.task_token_table_name",
"Item": {
"token_id": {
"N": "4"
},
"task_token": {
"S.$": "$$.Task.Token"
}
}
},
"Next": "機器停止2",
"Comment": "機器停止2の実行待ち",
"ResultPath": "$.callback"
},
"機器停止2": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:lambda:invoke",
"Parameters": {
"FunctionName.$": "States.Format('{}:$LATEST', $.stop_lambda_arn)"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2,
"JitterStrategy": "FULL"
}
],
"Next": "タスクトークン登録(コールバック) (2)",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Comment": "何らかのエラー",
"Next": "中断2 (Fail)",
"ResultPath": "$.lambda_error"
}
],
"ResultPath": "$.Payload"
},
"中断2 (Fail)": {
"Type": "Fail",
"Error": "400",
"Cause": "機器2でエラーが発生しました"
}
}
},
{
"StartAt": "再帰処理タイマー",
"States": {
"再帰処理タイマー": {
"Type": "Wait",
"Next": "再帰処理",
"SecondsPath": "$.timeout_seconds"
},
"再帰処理": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:sfn:startExecution",
"Parameters": {
"StateMachineArn.$": "$.state_machine_arn",
"Input": {
"task_token_table_name.$": "$.task_token_table_name",
"start_lambda_arn.$": "$.start_lambda_arn",
"stop_lambda_arn.$": "$.stop_lambda_arn",
"state_machine_arn.$": "$.state_machine_arn",
"timeout_seconds.$": "$.timeout_seconds"
}
},
"Next": "中断3 (Fail)"
},
"中断3 (Fail)": {
"Type": "Fail",
"Error": "Recursived",
"Cause": "タイムアウトにより中断"
}
}
}
],
"ResultPath": "$.parallel_results",
"Next": "中断を正常として扱う",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "中断を正常として扱う",
"ResultPath": "$"
}
]
},
"中断を正常として扱う": {
"Type": "Succeed"
}
}
},
"Next": "パラメータチェック",
"MaxConcurrency": 1,
"ResultPath": "$.map_results",
"ItemsPath": "$.items"
},
"パラメータチェック": {
"Type": "Pass",
"Next": "Choice",
"Parameters": {
"Error.$": "$.map_results[0].Error"
}
},
"Choice": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.Error",
"StringEquals": "Recursived",
"Next": "Success",
"Comment": "再帰処理の場合"
},
{
"Variable": "$.Error",
"StringEquals": "400",
"Next": "Fail"
}
],
"Default": "Fail"
},
"Success": {
"Type": "Succeed",
"Comment": "成功で終了"
},
"Fail": {
"Type": "Fail"
}
}
}
DynamoDB テーブル
コールバックを返すためのタスクトークンを保存します。
タスクトークンは実行ごとに異なる値を使用します。
test_task_token_table
id | task_token |
---|---|
1 | {タスクトークン} |
2 | {タスクトークン} |
3 | {タスクトークン} |
4 | {タスクトークン} |
Lambda 関数
テスト用関数。
入力パラメータにタスクトークンを渡すことでコールバックを返すことができます。
import os
import boto3
SFN = boto3.client('stepfunctions')
def lambda_handler(event, context):
# 出力されたタスクトークンを取得する
task_token = event.get("task_token", None)
if task_token is None:
return {
'statusCode': 500,
'body': 'No task_token'
}
# ステートマシンにコールバックを返す
output = {
"message": "Callback task completed successfully.",
"result": True,
}
params = {"output": json.dumps(output), "taskToken": TASK_TOKEN}
SFN.send_task_success(**params)
# レスポンスを作成して返す
return {
'statusCode': 200,
'body': 'Data retrieved successfully'
}
メイン処理関数。
機器の稼働、停止処理を実行する。
例)test_start_lambad.py
import json
import logging
# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def start(event):
try:
# IDの取得とバリデーション
id = event.get("id", None)
if id is None:
raise ValueError("idが指定されていません。")
if not isinstance(id, int):
raise TypeError("idは整数である必要があります。")
# 機器稼働処理...
logger.info(f"機器 {id} を稼働します。")
return 200, True, f"機器 {id} が稼働しました。"
except ValueError as ve:
logger.error(f"バリデーションエラー: {ve}")
return 400, False, str(ve)
except TypeError as te:
logger.error(f"型エラー: {te}")
return 400, False, str(te)
except Exception as e:
logger.error(f"想定外のエラー: {e}")
return 500, False, "機器稼働処理中に予期せぬエラーが発生しました。"
def lambda_handler(event, context):
body = {}
try:
# 開始処理
logger.info(f"イベント受信: {event}")
statusCode, result, message = start(event)
# レスポンスボディ作成
body["message"] = message
body["result"] = result
response = {
'statusCode': statusCode,
'body': json.dumps(body)
}
logger.info(f"レスポンス: {response}")
return response
except Exception as err:
# トレース処理 (詳細なエラー情報を記録)
logger.error(f"予期しないエラー: {err}", exc_info=True)
body["message"] = "不明なエラーが発生しました。"
body["result"] = False
return {
'statusCode': 500,
'body': json.dumps(body)
}
解説
ステートマシンの最大起動時間は一年間です。
それ以降も継続して起動させ続けるには後続の実行ステートマシンを立てる必要があります。
そのため、タイムアウト時刻を半年後に設定し、後続の実行ステートマシンを立ててから正常終了をすることで処理を継続させています。
1. 入力パラメータ設定
Pass
ステートで初期パラメータを設定しています。
この例では、テーブル名、LambdaのARN、タイムアウト値を入力パラメータから取得します。
※AWSアカウントIDなどマスキングしている箇所があります。
{
"task_token_table_name": "test_task_token_table",
"start_lambda_arn": "arn:aws:lambda:{リージョン}:{アカウントID}:function:test_start_lambda",
"stop_lambda_arn": "arn:aws:lambda:{リージョン}:{アカウントID}:function:test_stop_lambda",
"state_machine_arn": "arn:aws:states:{リージョン}:{アカウントID}:stateMachine:test_recursive_conditional_parallel_processing",
"timeout_seconds": 15768000
}
2. Mapステート
パラレル内のFailによって処理を中断させないためのMap。同時実行数は1としています。
Mapステートは、並列処理のどちらかで中断した場合にそのエラーをキャッチし、正常なフローとして処理を続ける役割を持っています。これにより、次のパラメータチェックに進むことが可能になります。
参考:Step Functions ステートマシンで並列処理を実装する
3. 並列処理
Parallelステートで3つの処理を並列に実行しています。
中断された処理がそのままで良いか、後処理が必要かなどは考慮が必要です。
3-1. 状態監視 : コールバック待機
タスクトークンをDynamoDBテーブルに登録し、コールバックを待機します。
別でコールバックを返すLambdaを用意し、フローを進める必要があります。
3-2. 状態監視 : 稼働処理・停止処理
機器を稼働・停止する処理を実行する。
処理が完了したら次フローに進み、コールバックを待機する。
いずれかの処理でエラーが発生した場合、Failステートを使用して並列処理を明示的に中断させる。
3-3. 再帰処理
タイムアウト時刻に達した場合、再帰的にステートマシンを実行する。
ステートマシンの起動に成功した場合、Failステートを使用して並列処理を明示的に中断させる。
4. パラメータチェック
並列処理から渡された結果を検証する。
中断のErrorを取得できるかどうかを検証しています。
5. Choice
Errorの内容をもとにステートマシンの実行結果を判定します。
再帰処理による中断の場合は、成功で終了します。
それ以外のエラーによる中断の場合は、失敗で終了します。
実行結果
実行するとコールバック待機とタイムアウト待機が開始されます。
正常に稼働している状態であれば、コールバックを待機し続ける状態になります。
コールバックが返されるとフローが進み、以下画像のように処理フローをループし続けます。
Lambdaの処理でエラーが発生すると、エラーをキャッチして中断ステートへ遷移します。
ステートマシンは失敗で終了します。
タイムアウトに達すると、再帰処理として新たに実行ステートマシンが起動されます。
ステートマシンは成功で終了されます。
まとめ
本記事では、AWS Step Functionsを以下の手法を用いて永続的な状態監視を実現しました。
- 並列処理: 状態監視、稼働・停止処理、再帰処理を実行
- Mapステート: エラーを正常処理に統合
- タスクトークン: コールバック処理の実装
- 再帰処理: 長期間の実行を可能にする仕組み
AWS Step Functionsを利用することで、状態監視や処理フローの管理が容易になり、信頼性の高いシステムを構築できます。
※ この記事の例をカスタマイズする際は、特定のAWSサービスや制約(料金、リソースの上限など)を考慮してください。
あとがき
一年以上の長期稼働なんて推奨はされていないとは思いますが、少しでも参考になれば幸いです。