1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Step Functions Parallelのエラーハンドリング

Last updated at Posted at 2026-01-26

はじめに

AWS Step FunctionsのParallelステートを使って、複数のLambda関数を並列に実行する仕組みを実装しました。

その中で学んだ、並列実行時のエラーハンドリングをAWS CDK(TypeScript)で実装する例を、簡単な例を用いて紹介します。

エラーハンドリングについて実現したかったこと

Parallelステートでは、どれか1つのブランチがFailすると、
Parallel 全体がFail扱いとなり、他のブランチは
「キャンセル済み(Canceled)」の状態になります。

そのため、すべてのブランチの処理結果を確認する前に
ステートマシンの実行が失敗として終了してしまいます。

image.png

しかし今回は、どれか1つのLambda関数が失敗しても、ほかのLambda関数の処理を進め、最後に1つでも失敗があればステートマシン全体を失敗にしたいという要件がありました。

こちらを実現するために行った実装の実装例を紹介します。

処理の流れ

全体の流れは次のようになります。

  1. Parallelステートで複数のブランチを同時に実行する
  2. 各ブランチではLambdaを実行し、成功/失敗に関わらず、結果を統一フォーマットに整形する
  3. Parallelの出力を評価する
  4. 1つでも失敗が含まれていればステートマシン全体をFailにする

重要なのは、ブランチ内ではFailにしないという点です。
Failにするかどうかは、あくまでParallel実行がすべて完了したあとに判断します。

実装のポイント

ここからは、Step Functionsの構成を理解しやすいように、実装の要点となる部分を抜粋して解説していきます。

全体のAWS CDKコードは、記事の最後に参考として掲載しています。

■ Parallel ステートの定義

const parallel = new sfn.Parallel(this, "Parallel", {
    resultPath: "$.parallelResults",
})

ここで指定しているresultPathにより、各ブランチの出力はparallelResultsという配列としてまとめて格納されます。

この配列を後段で評価するのが今回のポイントです。

■ LambdaInvokeとCatch

各ブランチでは、Lambdaを実行するLambdaInvokeステートを作成します。

const invoke = new tasks.LambdaInvoke(this, `Invoke${branchId}`, {
     lambdaFunction: fn,
     outputPath: "$.Payload"
})

このままLambdaが失敗すると、Parallel全体が即Failしてしまいます。
今回はそれを避けたいので、Catchを追加します。

const failed = new sfn.Pass(this, `MarkFailed${branchId}`, {
    parameters: {
        id: branchId,
        status: "FAILED",
        error: sfn.JsonPath.objectAt("$.error")
    },
    resultPath: "$"
})

invoke.addCatch(failed, {
    resultPath: "$.error"
});

Lambdaの実行が失敗した場合はCatchでエラーを受け取り、Passステート(failed)に遷移させます。

Passステートは、入力データを加工・整形するためのステートです。
ここでは失敗したブランチの結果を{id, status: "FAILED", error}という形式に変換しています。

■ ブランチの出力フォーマット統一

Passステートで、成功・失敗に関わらずブランチの出力フォーマットを統一しています。
出力にstatusフィールドを持たせることで、Parallel後に成功・失敗を簡単に判定できるようにしています。

Parallelステートの出力は各ブランチの最終出力をそのまま配列にまとめるため、ブランチ側で出力形式をそろえておくことが重要です。

  • 失敗時
const failed = new sfn.Pass(this, `MarkFailed${branchId}`, {
    parameters: {
        id: branchId,
        status: "FAILED",
        error: sfn.JsonPath.objectAt("$.error")
    },
    resultPath: "$"
})

invoke.addCatch(failed, {
    resultPath: "$.error"
});
  • 成功時
const succeeded = new sfn.Pass(this, `MarkSucceeded${branchId}`, {
    parameters: {
        id: branchId,
        status: "SUCCEEDED",
        payload: sfn.JsonPath.objectAt("$")
    },
    resultPath: "$"
})

parallel.branch(invoke.next(succeeded))

■ Parallel後の結果評価

Parallelステートの出力は、各ブランチの結果を要素とする配列になります。
ここでは Pass ステートを使い、その配列の中にstatus: "FAILED"が含まれているかを評価しています。
評価結果はhasFailedとして保持し、後段のChoiceステートで最終的な成否判定に利用します。

Choiceステートでは配列の集計処理ができないため、事前にPassステートで評価結果を作成しています。

 const evaluation = new sfn.Pass(this, "EvaluateParallelResults", {
    parameters: {
        hasFailed: sfn.JsonPath.arrayContains(
            sfn.JsonPath.listAt("$.parallelResults[*].status"),
            "FAILED"
        ),
    },
    resultPath: "$.parallelEvaluation"
 })

■ Choiceによる最終判定

Parallelの実行結果を評価したあとは、その結果をもとにステートマシン全体の成否を判定します。
Choiceステートでは、hasFailedtrueの場合はFailステートへ遷移し、すべてのブランチが成功していればSucceedステートへ遷移します。

const fail = new sfn.Fail(this, "ParallelHasFailure", {
    error: "ParallelHasFailure",
    cause: "One or more branches failed. Check parallelResults for details."
})

const succeed = new sfn.Succeed(this, "AllSucceeded");

const choice = new sfn.Choice(this, "CheckParallelResult")
    .when(sfn.Condition.booleanEquals("$.parallelEvaluation.hasFailed", true), fail)
    .otherwise(succeed)

const definition = parallel.next(evaluation).next(choice);

const stateMachine = new sfn.StateMachine(this, "ParallelLambdaStateMachine", {
    definitionBody: sfn.DefinitionBody.fromChainable(definition)
})

このように、最終的な成否判定をステートマシンの末尾で一元的に行っています。

実際にステートマシンを実行した際のグラフビューは以下のようになります。
1つのブランチが失敗しても、他のブランチは最後まで実行され、
最終的にステートマシン全体がFailになることが確認できます。

image.png

まとめ

Parallelステートでは、1つのブランチが失敗するとParallel全体がFail扱いとなり、未完了のブランチはCanceledになります。
そのままでは、すべての処理結果を確認する前にステートマシンが失敗してしまいます。

本記事では、ブランチ内で失敗をCatchして出力を整形し、Parallel実行後にまとめて成否を判定する実装例を紹介しました。

Parallelを使った設計の一例として、何かのヒントになればうれしいです。

全体の実装例(参考)

※ 本コードは解説用の簡易的な例です。実際のユースケースでは、リトライ設定やエラーハンドリング方針などを要件に応じて調整してください。

step-functions-lambda-parallel-stack.ts
import { Stack, StackProps } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';

export class StepfunctionsLambdaParallelStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    const branchIds = ["A", "B"]; // Parallelで実行するブランチの識別子

    const parallel = new sfn.Parallel(this, "RunInParallel", {
        resultPath: "$.parallelResults",
    })

    for (const branchId of branchIds) {
         const fn = new lambda.Function(this, `Lambda${branchId}`, {
             runtime: lambda.Runtime.PYTHON_3_11,
             handler: "index.lambda_handler",
             code: lambda.Code.fromAsset("lambda")
         })

        const invoke = new tasks.LambdaInvoke(this, `Invoke${branchId}`, {
             lambdaFunction: fn,
             outputPath: "$.Payload"
        })

        const failed = new sfn.Pass(this, `MarkFailed${branchId}`, {
            parameters: {
                id: branchId,
                status: "FAILED",
                error: sfn.JsonPath.objectAt("$.error")
            },
            resultPath: "$"
        })

        invoke.addCatch(failed, {
            resultPath: "$.error"
        });

        const succeeded = new sfn.Pass(this, `MarkSucceeded${branchId}`, {
            parameters: {
                id: branchId,
                status: "SUCCEEDED",
                payload: sfn.JsonPath.objectAt("$")
            },
            resultPath: "$"
        })

        parallel.branch(invoke.next(succeeded))
    }

    /**
     Parallel後の集計
     */
     const evaluation = new sfn.Pass(this, "EvaluateParallelResults", {
        parameters: {
            hasFailed: sfn.JsonPath.arrayContains(
                sfn.JsonPath.listAt("$.parallelResults[*].status"),
                "FAILED"
            ),
        },
        resultPath: "$.parallelEvaluation"
     })

    const fail = new sfn.Fail(this, "ParallelHasFailure", {
        error: "ParallelHasFailure",
        cause: "One or more branches failed. Check parallelResults for details."
    })

    const succeed = new sfn.Succeed(this, "AllSucceeded");

    const choice = new sfn.Choice(this, "CheckParallelResult")
        .when(sfn.Condition.booleanEquals("$.parallelEvaluation.hasFailed", true), fail)
        .otherwise(succeed)

    const definition = parallel.next(evaluation).next(choice);

    const stateMachine = new sfn.StateMachine(this, "ParallelLambdaStateMachine", {
        definitionBody: sfn.DefinitionBody.fromChainable(definition)
    })
  }
}

We Are Hiring!

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?