はじめに
AWS Step FunctionsのParallelステートを使って、複数のLambda関数を並列に実行する仕組みを実装しました。
その中で学んだ、並列実行時のエラーハンドリングをAWS CDK(TypeScript)で実装する例を、簡単な例を用いて紹介します。
エラーハンドリングについて実現したかったこと
Parallelステートでは、どれか1つのブランチがFailすると、
Parallel 全体がFail扱いとなり、他のブランチは
「キャンセル済み(Canceled)」の状態になります。
そのため、すべてのブランチの処理結果を確認する前に
ステートマシンの実行が失敗として終了してしまいます。
しかし今回は、どれか1つのLambda関数が失敗しても、ほかのLambda関数の処理を進め、最後に1つでも失敗があればステートマシン全体を失敗にしたいという要件がありました。
こちらを実現するために行った実装の実装例を紹介します。
処理の流れ
全体の流れは次のようになります。
-
Parallelステートで複数のブランチを同時に実行する - 各ブランチではLambdaを実行し、成功/失敗に関わらず、結果を統一フォーマットに整形する
-
Parallelの出力を評価する - 1つでも失敗が含まれていればステートマシン全体をFailにする
重要なのは、ブランチ内ではFailにしないという点です。
Failにするかどうかは、あくまでParallel実行がすべて完了したあとに判断します。
実装のポイント
ここからは、Step Functionsの構成を理解しやすいように、実装の要点となる部分を抜粋して解説していきます。
全体のAWS CDKコードは、記事の最後に参考として掲載しています。
■ Parallel ステートの定義
const parallel = new sfn.Parallel(this, "Parallel", {
resultPath: "$.parallelResults",
})
ここで指定しているresultPathにより、各ブランチの出力はparallelResultsという配列としてまとめて格納されます。
この配列を後段で評価するのが今回のポイントです。
■ LambdaInvokeとCatch
各ブランチでは、Lambdaを実行するLambdaInvokeステートを作成します。
const invoke = new tasks.LambdaInvoke(this, `Invoke${branchId}`, {
lambdaFunction: fn,
outputPath: "$.Payload"
})
このままLambdaが失敗すると、Parallel全体が即Failしてしまいます。
今回はそれを避けたいので、Catchを追加します。
const failed = new sfn.Pass(this, `MarkFailed${branchId}`, {
parameters: {
id: branchId,
status: "FAILED",
error: sfn.JsonPath.objectAt("$.error")
},
resultPath: "$"
})
invoke.addCatch(failed, {
resultPath: "$.error"
});
Lambdaの実行が失敗した場合はCatchでエラーを受け取り、Passステート(failed)に遷移させます。
Passステートは、入力データを加工・整形するためのステートです。
ここでは失敗したブランチの結果を{id, status: "FAILED", error}という形式に変換しています。
■ ブランチの出力フォーマット統一
Passステートで、成功・失敗に関わらずブランチの出力フォーマットを統一しています。
出力にstatusフィールドを持たせることで、Parallel後に成功・失敗を簡単に判定できるようにしています。
Parallelステートの出力は各ブランチの最終出力をそのまま配列にまとめるため、ブランチ側で出力形式をそろえておくことが重要です。
- 失敗時
const failed = new sfn.Pass(this, `MarkFailed${branchId}`, {
parameters: {
id: branchId,
status: "FAILED",
error: sfn.JsonPath.objectAt("$.error")
},
resultPath: "$"
})
invoke.addCatch(failed, {
resultPath: "$.error"
});
- 成功時
const succeeded = new sfn.Pass(this, `MarkSucceeded${branchId}`, {
parameters: {
id: branchId,
status: "SUCCEEDED",
payload: sfn.JsonPath.objectAt("$")
},
resultPath: "$"
})
parallel.branch(invoke.next(succeeded))
■ Parallel後の結果評価
Parallelステートの出力は、各ブランチの結果を要素とする配列になります。
ここでは Pass ステートを使い、その配列の中にstatus: "FAILED"が含まれているかを評価しています。
評価結果はhasFailedとして保持し、後段のChoiceステートで最終的な成否判定に利用します。
Choiceステートでは配列の集計処理ができないため、事前にPassステートで評価結果を作成しています。
const evaluation = new sfn.Pass(this, "EvaluateParallelResults", {
parameters: {
hasFailed: sfn.JsonPath.arrayContains(
sfn.JsonPath.listAt("$.parallelResults[*].status"),
"FAILED"
),
},
resultPath: "$.parallelEvaluation"
})
■ Choiceによる最終判定
Parallelの実行結果を評価したあとは、その結果をもとにステートマシン全体の成否を判定します。
Choiceステートでは、hasFailedがtrueの場合はFailステートへ遷移し、すべてのブランチが成功していればSucceedステートへ遷移します。
const fail = new sfn.Fail(this, "ParallelHasFailure", {
error: "ParallelHasFailure",
cause: "One or more branches failed. Check parallelResults for details."
})
const succeed = new sfn.Succeed(this, "AllSucceeded");
const choice = new sfn.Choice(this, "CheckParallelResult")
.when(sfn.Condition.booleanEquals("$.parallelEvaluation.hasFailed", true), fail)
.otherwise(succeed)
const definition = parallel.next(evaluation).next(choice);
const stateMachine = new sfn.StateMachine(this, "ParallelLambdaStateMachine", {
definitionBody: sfn.DefinitionBody.fromChainable(definition)
})
このように、最終的な成否判定をステートマシンの末尾で一元的に行っています。
実際にステートマシンを実行した際のグラフビューは以下のようになります。
1つのブランチが失敗しても、他のブランチは最後まで実行され、
最終的にステートマシン全体がFailになることが確認できます。
まとめ
Parallelステートでは、1つのブランチが失敗するとParallel全体がFail扱いとなり、未完了のブランチはCanceledになります。
そのままでは、すべての処理結果を確認する前にステートマシンが失敗してしまいます。
本記事では、ブランチ内で失敗をCatchして出力を整形し、Parallel実行後にまとめて成否を判定する実装例を紹介しました。
Parallelを使った設計の一例として、何かのヒントになればうれしいです。
全体の実装例(参考)
※ 本コードは解説用の簡易的な例です。実際のユースケースでは、リトライ設定やエラーハンドリング方針などを要件に応じて調整してください。
import { Stack, StackProps } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
export class StepfunctionsLambdaParallelStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
const branchIds = ["A", "B"]; // Parallelで実行するブランチの識別子
const parallel = new sfn.Parallel(this, "RunInParallel", {
resultPath: "$.parallelResults",
})
for (const branchId of branchIds) {
const fn = new lambda.Function(this, `Lambda${branchId}`, {
runtime: lambda.Runtime.PYTHON_3_11,
handler: "index.lambda_handler",
code: lambda.Code.fromAsset("lambda")
})
const invoke = new tasks.LambdaInvoke(this, `Invoke${branchId}`, {
lambdaFunction: fn,
outputPath: "$.Payload"
})
const failed = new sfn.Pass(this, `MarkFailed${branchId}`, {
parameters: {
id: branchId,
status: "FAILED",
error: sfn.JsonPath.objectAt("$.error")
},
resultPath: "$"
})
invoke.addCatch(failed, {
resultPath: "$.error"
});
const succeeded = new sfn.Pass(this, `MarkSucceeded${branchId}`, {
parameters: {
id: branchId,
status: "SUCCEEDED",
payload: sfn.JsonPath.objectAt("$")
},
resultPath: "$"
})
parallel.branch(invoke.next(succeeded))
}
/**
Parallel後の集計
*/
const evaluation = new sfn.Pass(this, "EvaluateParallelResults", {
parameters: {
hasFailed: sfn.JsonPath.arrayContains(
sfn.JsonPath.listAt("$.parallelResults[*].status"),
"FAILED"
),
},
resultPath: "$.parallelEvaluation"
})
const fail = new sfn.Fail(this, "ParallelHasFailure", {
error: "ParallelHasFailure",
cause: "One or more branches failed. Check parallelResults for details."
})
const succeed = new sfn.Succeed(this, "AllSucceeded");
const choice = new sfn.Choice(this, "CheckParallelResult")
.when(sfn.Condition.booleanEquals("$.parallelEvaluation.hasFailed", true), fail)
.otherwise(succeed)
const definition = parallel.next(evaluation).next(choice);
const stateMachine = new sfn.StateMachine(this, "ParallelLambdaStateMachine", {
definitionBody: sfn.DefinitionBody.fromChainable(definition)
})
}
}
We Are Hiring!

