search
LoginSignup
1

More than 1 year has passed since last update.

posted at

updated at

Organization

CloudWatch EventとLambdaでStepFunctions間のエラ-を伝搬する

StepFunctions は、並行して実行するなどワ-クフロ-を構築・管理する際によく使用します。StepFunctions内のLambdaなどが実行失敗した際には、StepFunctionsが終了しますが、後続するStepFunctionsは実行されてしまいます。言い換えれば、StepFunctoins内のエラーは伝搬するが、StepFunctions間のエラーは伝搬しません。

今回は、StepFunctions間でエラーを伝搬させる構成を紹介します。

解決したい課題

StepFunctions間でエラーを伝播させる

StepFunction内だけではなく、StepFunctions間でエラーを伝播したい。
具体的には、1つのStepFunctionsが失敗すると、後続するStepFunctionsを実行させないようにします。

StepFunctionsのエラー伝播

ここから、具体例を用いて、StepFunctionsでのエラー伝搬について説明します。

StepFunctions内のエラー伝播

下図のように、複数のLambdaなどを順番に実行することができるので、バッチ処理などによく使われています。
image.png

また、一部のLambdaでエラ-で異常終了した場合には、エラ-が伝搬しますので、Stepfunction内の全Lambdaが実行されずに終了し、リカバリの範囲も少なく済ます。

image.png

このように、エラ-ハンドリングによるリトライを含め1つのLambdaでも失敗すると、StepFunctionsが終了しますので、StepFunctions内のエラ-伝搬ができていると言えます。

StepFunctions間のエラー伝播

バッチ処理などを目的とした場合に、依存関係などにより、複数のStepFunctionsを異なる時間に実行する構成を取る場合があります。下図では、CloudWatch Eventによって、1つ目のStepFunctions①は18時に起動し、2つ目のStepfunction②は19時に起動します。

image.png

リカバリが必要とする範囲を広げないために、StepFunctions①が失敗した場合に、StepFunctions②を実行しないような仕組みが必要となります。

この場合、AWSマネ-ジメントコンソ-ルからStepfunction②のCloudWatch Eventを無効化するなど手動オペレーションは非常に手間がかかり、現実的な解決法ではありません。

課題を解決する技術、手法

StepFunctions間のエラー伝搬を実現するために、CloudWatch EventとLambdaを使用します。
具体的には、下図の構成になります。

image.png

まず、StepFunctionsが失敗状態に変化すると、起動するCloudWatch Eventを準備します。
CloudWatch EventからLambdaを起動し、StepFunctionsの実行を無効化します。
最後に、全てのStepFunctionsの実行時刻が過ぎた後に、上記と同じ構成で、CloudWatch EventからLambdaを起動し、StepFunctionsの実行を有効化します。

CloudWatch EventとLambdaで、StepFunctionsを起動するCloudWatch Eventを制御することで、StepFunctions間のエラーを伝搬させます。

実装

ここから、TerraformとGoでの実装を下記の流れで説明します。

  • Stepfunctions①が失敗するイベントで起動する CloudWatch Event を作成
  • Stepfunctions①を起動させるCloudWatch Eventを有効化・無効化するLambda を作成

Terraformのソ-スコ-ド例を示しますが、Terraform の操作を省略します。

CloudWatch Event

StepFunctionsが失敗状態に変化すると、Lambdaを起動させるCloudWatch Eventを作成します。

まず、CloudWatch Eventが起動するルールを作成します。
event_pattern でStepFunctionsが失敗状態に変化するイベントを指定します。

resource "aws_cloudwatch_event_rule" "demo_for_disable_CWE" {
  name = "demo-for-disable-CWE"

  event_pattern = <<EOF
{
  "source": ["aws.states"],
  "detail-type": ["Step Functions Execution Status Change"],
  "detail": {
    "status": ["FAILED"],
    "stateMachineArn": [
      "arn:aws:states:${region}:${accountId}:stateMachine:${stepfunctionsName}"
    ]
  }
}
EOF

次に、CloudWatch Eventが起動するターゲットとして、Lambdaを指定します。
Lambdaは無効化と有効化のどちらも行うので、input で無効化なのか有効化なのかを教えます。

resource "aws_cloudwatch_event_target" "demo_for_disable_CWE" {
  rule = aws_cloudwatch_event_rule.demo_for_disable_CWE.name
  arn  = aws_lambda_function.demo_for_disable_enable_CWE.arn

  input = <<EOF
{
  "detail": "disable"
}
EOF
}

Lambda

では、Stepfunctionsを起動するCloudWatch Eventを有効化・無効化するLambdaを作成します。

まずは、Lambdaを定義します。
ここで、どのStepFunctionを起動させるCloudWatch Eventの状態(有効化・無効化)を教える必要がありますので、環境変数TARGETで指定します。

resource "aws_lambda_function" "demo_for_disable_enable_CWE" {
  filename      = "lambda_initial_script.zip"
  function_name = "demo-for-disable-enable-CWE"
  role          = aws_iam_role.demo_for_disable_enable_CWE.arn
  handler       = "lambda"
  runtime       = "go1.x"
  memory_size   = 512
  timeout       = 900

  environment {
    variables = {
      TARGET : aws_cloudwatch_event_rule.${cloudWatchEventInvokeStepfunctions}.name
    }
  }  
}

また、CloudWatch Eventの状態を変更するため、下記のポリシ-が必要となります
aws_iam_roleaws_iam_role_policy_attachmentも定義する必要がありますが、ここでは割愛します。

resource "aws_iam_role_policy" "demo_for_disable_enable_CWE" {
  name = "demo-for-disable-enable-CWE"
  role = aws_iam_role.demo_for_disable_enable_CWE.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = [
          "events:EnableRule",
          "events:DisableRule",
        ]
        Effect   = "Allow"
        Resource = "*"
      },
    ]
  })
}

最後に、Lambdaで実行するソースコードを準備します。

import (
    "context"
    "fmt"
    "os"
    "strings"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/cloudwatchevents"
)

var (
    target = os.Getenv("TARGET")

    sess, err = session.NewSession(&aws.Config{
        Region: aws.String("ap-northeast-1"),
    })
)

func Handle(ctx context.Context, event events.CloudWatchEvent) error {

    // CloudWatch Events Client を生成
    svc := cloudwatchevents.New(sess)

    // CloudWatch Eventから受け取ったメッセージを取得
    e := strings.Trim(string(event.Detail), "\"")

    switch e {
    // CloudWatch Event を無効化
    case "disable":
        if _, err := svc.DisableRule(&cloudwatchevents.DisableRuleInput{
            Name: aws.String(target),
        }); err != nil {
            return fmt.Errorf("disable CloudWatch Events error: %v", err)
        }

    // CloudWatch Event を有効化
    case "enable":
        if _, err := svc.EnableRule(&cloudwatchevents.EnableRuleInput{
            Name: aws.String(target),
        }); err != nil {
            return fmt.Errorf("enable CloudWatch Events error: %v", err)
        }

    default:
        return fmt.Errorf("CloudWatch Events input error: invalid input. Please input enable or disable")
    }

    return nil
}

最後に、これらのリソースをアプライ、デプロイすることで、StepFunctions間のエラー伝搬が実現します。

まとめ

今回は、CloudWatch EventとLambdaを使用して、StepFunctions間のエラー伝搬を実現しました。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
1