StepFunctions は、並行して実行するなどワ-クフロ-を構築・管理する際によく使用します。StepFunctions内のLambdaなどが実行失敗した際には、StepFunctionsが終了しますが、後続するStepFunctionsは実行されてしまいます。言い換えれば、StepFunctoins内のエラーは伝搬するが、StepFunctions間のエラーは伝搬しません。
今回は、StepFunctions間でエラーを伝搬させる構成を紹介します。
解決したい課題
StepFunctions間でエラーを伝播させる
StepFunction内だけではなく、StepFunctions間でエラーを伝播したい。
具体的には、1つのStepFunctionsが失敗すると、後続するStepFunctionsを実行させないようにします。
StepFunctionsのエラー伝播
ここから、具体例を用いて、StepFunctionsでのエラー伝搬について説明します。
StepFunctions内のエラー伝播
下図のように、複数のLambdaなどを順番に実行することができるので、バッチ処理などによく使われています。
また、一部のLambdaでエラ-で異常終了した場合には、エラ-が伝搬しますので、Stepfunction内の全Lambdaが実行されずに終了し、リカバリの範囲も少なく済ます。
このように、エラ-ハンドリングによるリトライを含め1つのLambdaでも失敗すると、StepFunctionsが終了しますので、StepFunctions内のエラ-伝搬ができていると言えます。
StepFunctions間のエラー伝播
バッチ処理などを目的とした場合に、依存関係などにより、複数のStepFunctionsを異なる時間に実行する構成を取る場合があります。下図では、CloudWatch Eventによって、1つ目のStepFunctions①は18時に起動し、2つ目のStepfunction②は19時に起動します。
リカバリが必要とする範囲を広げないために、StepFunctions①が失敗した場合に、StepFunctions②を実行しないような仕組みが必要となります。
この場合、AWSマネ-ジメントコンソ-ルからStepfunction②のCloudWatch Eventを無効化するなど手動オペレーションは非常に手間がかかり、現実的な解決法ではありません。
課題を解決する技術、手法
StepFunctions間のエラー伝搬を実現するために、CloudWatch EventとLambdaを使用します。
具体的には、下図の構成になります。
まず、StepFunctionsが失敗状態に変化すると、起動するCloudWatch Eventを準備します。
CloudWatch EventからLambdaを起動し、StepFunctionsの実行を無効化します。
最後に、全てのStepFunctionsの実行時刻が過ぎた後に、上記と同じ構成で、CloudWatch EventからLambdaを起動し、StepFunctionsの実行を有効化します。
CloudWatch EventとLambdaで、StepFunctionsを起動するCloudWatch Eventを制御することで、StepFunctions間のエラーを伝搬させます。
実装
ここから、TerraformとGoでの実装を下記の流れで説明します。
- Stepfunctions①が失敗するイベントで起動する CloudWatch Event を作成
- Stepfunctions①を起動させるCloudWatch Eventを有効化・無効化するLambda を作成
Terraformのソ-スコ-ド例を示しますが、Terraform の操作を省略します。
CloudWatch Event
StepFunctionsが失敗状態に変化すると、Lambdaを起動させるCloudWatch Eventを作成します。
まず、CloudWatch Eventが起動するルールを作成します。
event_pattern
でStepFunctionsが失敗状態に変化するイベントを指定します。
resource "aws_cloudwatch_event_rule" "demo_for_disable_CWE" {
name = "demo-for-disable-CWE"
event_pattern = <<EOF
{
"source": ["aws.states"],
"detail-type": ["Step Functions Execution Status Change"],
"detail": {
"status": ["FAILED"],
"stateMachineArn": [
"arn:aws:states:${region}:${accountId}:stateMachine:${stepfunctionsName}"
]
}
}
EOF
次に、CloudWatch Eventが起動するターゲットとして、Lambdaを指定します。
Lambdaは無効化と有効化のどちらも行うので、input
で無効化なのか有効化なのかを教えます。
resource "aws_cloudwatch_event_target" "demo_for_disable_CWE" {
rule = aws_cloudwatch_event_rule.demo_for_disable_CWE.name
arn = aws_lambda_function.demo_for_disable_enable_CWE.arn
input = <<EOF
{
"detail": "disable"
}
EOF
}
Lambda
では、Stepfunctionsを起動するCloudWatch Eventを有効化・無効化するLambdaを作成します。
まずは、Lambdaを定義します。
ここで、どのStepFunctionを起動させるCloudWatch Eventの状態(有効化・無効化)を教える必要がありますので、環境変数TARGET
で指定します。
resource "aws_lambda_function" "demo_for_disable_enable_CWE" {
filename = "lambda_initial_script.zip"
function_name = "demo-for-disable-enable-CWE"
role = aws_iam_role.demo_for_disable_enable_CWE.arn
handler = "lambda"
runtime = "go1.x"
memory_size = 512
timeout = 900
environment {
variables = {
TARGET : aws_cloudwatch_event_rule.${cloudWatchEventInvokeStepfunctions}.name
}
}
}
また、CloudWatch Eventの状態を変更するため、下記のポリシ-が必要となります
aws_iam_role
とaws_iam_role_policy_attachment
も定義する必要がありますが、ここでは割愛します。
resource "aws_iam_role_policy" "demo_for_disable_enable_CWE" {
name = "demo-for-disable-enable-CWE"
role = aws_iam_role.demo_for_disable_enable_CWE.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = [
"events:EnableRule",
"events:DisableRule",
]
Effect = "Allow"
Resource = "*"
},
]
})
}
最後に、Lambdaで実行するソースコードを準備します。
import (
"context"
"fmt"
"os"
"strings"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchevents"
)
var (
target = os.Getenv("TARGET")
sess, err = session.NewSession(&aws.Config{
Region: aws.String("ap-northeast-1"),
})
)
func Handle(ctx context.Context, event events.CloudWatchEvent) error {
// CloudWatch Events Client を生成
svc := cloudwatchevents.New(sess)
// CloudWatch Eventから受け取ったメッセージを取得
e := strings.Trim(string(event.Detail), "\"")
switch e {
// CloudWatch Event を無効化
case "disable":
if _, err := svc.DisableRule(&cloudwatchevents.DisableRuleInput{
Name: aws.String(target),
}); err != nil {
return fmt.Errorf("disable CloudWatch Events error: %v", err)
}
// CloudWatch Event を有効化
case "enable":
if _, err := svc.EnableRule(&cloudwatchevents.EnableRuleInput{
Name: aws.String(target),
}); err != nil {
return fmt.Errorf("enable CloudWatch Events error: %v", err)
}
default:
return fmt.Errorf("CloudWatch Events input error: invalid input. Please input enable or disable")
}
return nil
}
最後に、これらのリソースをアプライ、デプロイすることで、StepFunctions間のエラー伝搬が実現します。
まとめ
今回は、CloudWatch EventとLambdaを使用して、StepFunctions間のエラー伝搬を実現しました。