10
1

More than 1 year has passed since last update.

CloudWatch EventとLambdaでStepFunctions間のエラ-を伝搬する

Last updated at Posted at 2021-12-05

StepFunctions は、並行して実行するなどワ-クフロ-を構築・管理する際によく使用します。StepFunctions内のLambdaなどが実行失敗した際には、StepFunctionsが終了しますが、後続するStepFunctionsは実行されてしまいます。言い換えれば、StepFunctoins内のエラーは伝搬するが、StepFunctions間のエラーは伝搬しません。

今回は、StepFunctions間でエラーを伝搬させる構成を紹介します。

解決したい課題

StepFunctions間でエラーを伝播させる

StepFunction内だけではなく、StepFunctions間でエラーを伝播したい。
具体的には、1つのStepFunctionsが失敗すると、後続するStepFunctionsを実行させないようにします。

StepFunctionsのエラー伝播

ここから、具体例を用いて、StepFunctionsでのエラー伝搬について説明します。

StepFunctions内のエラー伝播

下図のように、複数のLambdaなどを順番に実行することができるので、バッチ処理などによく使われています。
image.png

また、一部のLambdaでエラ-で異常終了した場合には、エラ-が伝搬しますので、Stepfunction内の全Lambdaが実行されずに終了し、リカバリの範囲も少なく済ます。

image.png

このように、エラ-ハンドリングによるリトライを含め1つのLambdaでも失敗すると、StepFunctionsが終了しますので、StepFunctions内のエラ-伝搬ができていると言えます。

StepFunctions間のエラー伝播

バッチ処理などを目的とした場合に、依存関係などにより、複数のStepFunctionsを異なる時間に実行する構成を取る場合があります。下図では、CloudWatch Eventによって、1つ目のStepFunctions①は18時に起動し、2つ目のStepfunction②は19時に起動します。

image.png

リカバリが必要とする範囲を広げないために、StepFunctions①が失敗した場合に、StepFunctions②を実行しないような仕組みが必要となります。

この場合、AWSマネ-ジメントコンソ-ルからStepfunction②のCloudWatch Eventを無効化するなど手動オペレーションは非常に手間がかかり、現実的な解決法ではありません。

課題を解決する技術、手法

StepFunctions間のエラー伝搬を実現するために、CloudWatch EventとLambdaを使用します。
具体的には、下図の構成になります。

image.png

まず、StepFunctionsが失敗状態に変化すると、起動するCloudWatch Eventを準備します。
CloudWatch EventからLambdaを起動し、StepFunctionsの実行を無効化します。
最後に、全てのStepFunctionsの実行時刻が過ぎた後に、上記と同じ構成で、CloudWatch EventからLambdaを起動し、StepFunctionsの実行を有効化します。

CloudWatch EventとLambdaで、StepFunctionsを起動するCloudWatch Eventを制御することで、StepFunctions間のエラーを伝搬させます。

実装

ここから、TerraformとGoでの実装を下記の流れで説明します。

  • Stepfunctions①が失敗するイベントで起動する CloudWatch Event を作成
  • Stepfunctions①を起動させるCloudWatch Eventを有効化・無効化するLambda を作成

Terraformのソ-スコ-ド例を示しますが、Terraform の操作を省略します。

CloudWatch Event

StepFunctionsが失敗状態に変化すると、Lambdaを起動させるCloudWatch Eventを作成します。

まず、CloudWatch Eventが起動するルールを作成します。
event_pattern でStepFunctionsが失敗状態に変化するイベントを指定します。

resource "aws_cloudwatch_event_rule" "demo_for_disable_CWE" {
  name = "demo-for-disable-CWE"

  event_pattern = <<EOF
{
  "source": ["aws.states"],
  "detail-type": ["Step Functions Execution Status Change"],
  "detail": {
    "status": ["FAILED"],
    "stateMachineArn": [
      "arn:aws:states:${region}:${accountId}:stateMachine:${stepfunctionsName}"
    ]
  }
}
EOF

次に、CloudWatch Eventが起動するターゲットとして、Lambdaを指定します。
Lambdaは無効化と有効化のどちらも行うので、input で無効化なのか有効化なのかを教えます。

resource "aws_cloudwatch_event_target" "demo_for_disable_CWE" {
  rule = aws_cloudwatch_event_rule.demo_for_disable_CWE.name
  arn  = aws_lambda_function.demo_for_disable_enable_CWE.arn

  input = <<EOF
{
  "detail": "disable"
}
EOF
}

Lambda

では、Stepfunctionsを起動するCloudWatch Eventを有効化・無効化するLambdaを作成します。

まずは、Lambdaを定義します。
ここで、どのStepFunctionを起動させるCloudWatch Eventの状態(有効化・無効化)を教える必要がありますので、環境変数TARGETで指定します。

resource "aws_lambda_function" "demo_for_disable_enable_CWE" {
  filename      = "lambda_initial_script.zip"
  function_name = "demo-for-disable-enable-CWE"
  role          = aws_iam_role.demo_for_disable_enable_CWE.arn
  handler       = "lambda"
  runtime       = "go1.x"
  memory_size   = 512
  timeout       = 900

  environment {
    variables = {
      TARGET : aws_cloudwatch_event_rule.${cloudWatchEventInvokeStepfunctions}.name
    }
  }  
}

また、CloudWatch Eventの状態を変更するため、下記のポリシ-が必要となります
aws_iam_roleaws_iam_role_policy_attachmentも定義する必要がありますが、ここでは割愛します。

resource "aws_iam_role_policy" "demo_for_disable_enable_CWE" {
  name = "demo-for-disable-enable-CWE"
  role = aws_iam_role.demo_for_disable_enable_CWE.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = [
          "events:EnableRule",
          "events:DisableRule",
        ]
        Effect   = "Allow"
        Resource = "*"
      },
    ]
  })
}

最後に、Lambdaで実行するソースコードを準備します。

import (
    "context"
    "fmt"
    "os"
    "strings"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/cloudwatchevents"
)

var (
    target = os.Getenv("TARGET")

    sess, err = session.NewSession(&aws.Config{
        Region: aws.String("ap-northeast-1"),
    })
)

func Handle(ctx context.Context, event events.CloudWatchEvent) error {

    // CloudWatch Events Client を生成
    svc := cloudwatchevents.New(sess)

    // CloudWatch Eventから受け取ったメッセージを取得
    e := strings.Trim(string(event.Detail), "\"")

    switch e {
    // CloudWatch Event を無効化
    case "disable":
        if _, err := svc.DisableRule(&cloudwatchevents.DisableRuleInput{
            Name: aws.String(target),
        }); err != nil {
            return fmt.Errorf("disable CloudWatch Events error: %v", err)
        }

    // CloudWatch Event を有効化
    case "enable":
        if _, err := svc.EnableRule(&cloudwatchevents.EnableRuleInput{
            Name: aws.String(target),
        }); err != nil {
            return fmt.Errorf("enable CloudWatch Events error: %v", err)
        }

    default:
        return fmt.Errorf("CloudWatch Events input error: invalid input. Please input enable or disable")
    }

    return nil
}

最後に、これらのリソースをアプライ、デプロイすることで、StepFunctions間のエラー伝搬が実現します。

まとめ

今回は、CloudWatch EventとLambdaを使用して、StepFunctions間のエラー伝搬を実現しました。

10
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
1