この記事はコープさっぽろ オープン社内報 Advent Calendar 2021の4日目の記事です。
この記事は?
動いているStepFunctionsワークフローを「失敗しても簡単に再実行できる」ように修正した記録。
何がしたいの?
いくつかのLambdaや外部APIを呼び出しているStepFunctions ワークフロー(以下「ワークフロー」)があります。
これを定期的に動かしているのですが、外部APIの都合でしばしば異常終了してしまいます。
ワークフローは以下のような状況です。
- 呼び出している外部APIは複数あり、この内のいくつかでたまにエラーが発生します。
- 基幹システムを呼び出すPutMemberは 同じ条件で2回呼び出すとエラーになります。
- 可能なものはLambdaの中で自力でリトライを仕掛けていますが、リトライ回数の制限を超えてLambdaがエラーになる場合があります。
- こんなにちょいちょい落ちると思っていなかった
ワークフローがエラーになった場合、AWS Management Consoleから簡単に再実行できますが、先頭から実行したときに
途中のタスクから再実行はできないので、
Lambdaをワークフローから呼ばれたときと同じパラメーターで手動で一つずつ起動するということをやっていました。
上の図の例の場合、2番めのLambdaから10このLambdaを手動で実行する必要があります。
手動でLambdaを1つずつ起動するのがいい加減面倒になったので、ワークフローの再実行でもエラーにならないように「一度正常終了したタスクは実行をスキップする」をワークフローに組み込みたいと思います。
方針
- 実行状況を保存するDynamoDB Table(以下「実行状態保存テーブル」)を用意する。
- ワークフローの各Lambdaの実行が正常終了したら実行状態保存テーブルに記録する。
- ワークフローは各Lambdaの実行前に実行状態保存テーブルを見て、実行済みであればLambdaを呼ばずに次のタスクを実行する。
- これらを新しくLambda Functionなどを作ることなく実現する。
これを各Lambdaでやればうまくいくはずです。
この方式を実現したあと、ワークフローがエラーになった時にはこのような運用ができるようになるはずです。
- ワークフローでエラーになったLambdaを把握する。
- エラーになったLambdaのエラーの原因を取り除く。
- エラーの原因を取り除いたあと
- そのLambdaを再実行できるのであればそのまま ワークフロー先頭から実行する
- そのLambdaをワークフローから実行しなくてよいのであれば実行状態保存テーブルに「そのLambdaは実行済み」と設定してワークフローを先頭から実行する。
いずれのケースでもワークフローは先頭から実行できるようになります。
まずは準備
実行状態保存テーブルの作成とワークフローのロールの更新
実行状態保存テーブルをDynamoDB Tableとして作成します。
- テーブル名 :
MemberRegistrationExecution
- パーティションキー :
key(string)
ワークフローからDynamoDBにアクセスするので、ワークフローのロールに実行状態保存テーブルへの操作を許可します。
(Cloud Formationテンプレートの抜粋)
- Effect: Allow
Action:
- dynamodb:Query
- dynamodb:GetItem
- dynamodb:UpdateItem
- dynamodb:DeleteItem
Resource:
- !Sub "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${MemberRegistrationExecutionManagementTableName}"
実装開始
StateMachineを考える
この修正を入れる前のワークフローのStateMachineはこのように定義しています。
{
"member": {
"memberCode": "登録データのユニークキー",
:
(もろもろの属性)
}
}
ワークフローで実行状態保存テーブルから取得した前回実行状態を持ち回りたいので、StateMachineは以下のように定義します。
(定義しますと書きましたが、このような定義がどこかにあるわけではなく、こうなるように各処理を設定していきます)
{
"member": {
"memberCode": "登録データのユニークキー",
:
},
"MemberRegistrationExecution" : {
"Item": {
"key": { "S": "登録データのユニークキー"},
"Lambda Function名": { "S": "終了時刻"}
}
}
}
実行状態保存テーブルから取得したデータを MemberRegistrationExecution
というキーで持ち回ります。
実行状態保存テーブルから前回の実行結果を取得する
StepFunctionsで用意されているGetItemアクションを使います。
"GetExecutionStateItem": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
"TableName": "実行状態保存テーブル名",
"Key": {
"key": {
"S.$": "$.member.memberCode"
}
}
},
"Next": "IsExecutionStateExists",
"ResultPath": "$.MemberRegistrationExecution"
},
この定義は
-
GetExecutionStateItem
という名前のStateは -
TableName
で指定したDynamoDB Tableの -
key
という名前のキーが - StateMachineの
member.memberCode
と一致する項目をGetItemして - 結果をStateMachineの
MemberRegistrationExecution
に設定する
という意味になります。
ちなみに、
"S.$": "$.member.memberCode"
プロパティ名に .$
をつけると、StepFunctionsが $.member.memberCode
を実際の値に展開してくれます。
実行状態保存テーブルに項目がなければ作る
Lambdaの実行結果の実行状態保存テーブルへの書き込みはUpdateItemで行う予定なので、GetExecutionStateItemで値を取れなかった場合は項目を事前に作成しておく必要があります。
"IsExecutionStateExists": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.MemberRegistrationExecution.Item.key",
"IsPresent": true,
"Next": "IsSkipPutRegistrationStartedAt"
}
],
"Default": "PutExecutionStateItem",
"Comment": "DynamoDBからExecutionStateが取得できない場合は初回実行としてPutItem"
},
このChoice Stateは以下の意味です。
- StateMachineの
MemberRegistrationExecution.Item.key
がある場合は次の処理(IsSkipPutRegistrationStartedAt
)に行き - それ以外の場合は
PutExecutionStateItem
に行く
PutExecutionStateItem
StateはDynamoDBに項目をPutItemします。
"PutExecutionStateItem": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:putItem",
"Parameters": {
"TableName": "実行状態保存テーブル名",
"Item": {
"key": {
"S.$": "$.member.memberCode"
}
}
},
"Next": "IsSkipPutRegistrationStartedAt",
"ResultPath": null
},
ResultPath
にnullを設定しないと、PutItemの結果でStateMachineが書き換わってしまうので忘れずに設定しましょう。後述のUpdateItem、DeleteItemでも同様です。
ワークフローに上記を追加してWorkflow Studioで表示するとこのようになります。
実行状態に次のLambdaの完了が記録されていればLambdaをスキップし、無ければLambdaを実行する
前のステップでStateMachineのMemberRegistrationExecution
に実行状態が保存されているはずなのでそれをChoiseステートで確認します。
"IsSkipPutRegistrationStartedAt": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.MemberRegistrationExecution.Item.PutRegistrationStartedAt",
"IsPresent": true,
"Next": "IsSkipIsDuplicatedBeforeReg"
}
],
"Default": "PutRegistrationStartedAt"
},
このChoice Stateは以下の意味です。
- StateMachineの
MemberRegistrationExecution.Item.PutRegistrationStartedAt
がある場合は次の処理(IsSkipIsDuplicatedBeforeReg
)に行き - それ以外の場合は
PutRegistrationStartedAt
に行く
次にLambdaの実行が完了したら実行状態保存テーブルに書き込むStateを作ります。
"UpdateStatePutRegistrationStartedAt": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "テーブル名",
"Key": {
"key": {
"S.$": "$.member.memberCode"
}
},
"UpdateExpression": "SET #key = :val",
"ExpressionAttributeNames": {
"#key": "PutRegistrationStartedAt"
},
"ExpressionAttributeValues": {
":val": {
"S.$": "$$.State.EnteredTime"
}
}
},
"Next": "IsSkipIsDuplicatedBeforeReg",
"ResultPath": null
},
このStateは以下の意味です。
-
テーブル名
にupdateItemを実行する。 - KeyはStateMachineの
member.memberCode
- プロパティ
PutRegistrationStartedAt
に値としてこのStateの開始時刻を設定する-
$$
はStepFunctiosのコンテキストオブジェクトを示します
-
- updateItemの結果はStateMachineに反映しない
これを追加したWorkflow Studioの定義は以下のようになります。
これをスキップしたいLambdaに順次組み込んでいきます。
最後に実行状態保存テーブルから状態を削除する
ワークフローが最後まで実行されたら実行状態保存テーブルから項目を削除します。
"DeleteExecutionStateItem": {
"End": true,
"Parameters": {
"Key": {
"key": {
"S.$": "$.member.memberCode"
}
},
"TableName": "dev-deli-websubscription-MemberRegistrationExecution"
},
"Resource": "arn:aws:states:::dynamodb:deleteItem",
"Type": "Task",
"ResultPath": null
},
ここについては、実行結果を保存するテーブルなので残しておいてもよいかもしれませんが、削除するタイミングがないでここで削除してしまいます。
残していたとして、ワークフローを再実行したら全部スキップされるから意味もないですし。
実行結果のエビデンスはStepFunctions自身のステータスを見るということにします。
完成!
試してみる
いい感じにエラーになりました。
4つ目のLambda RefreshAuth0Tokens
でエラーになっています。
実行状態保存テーブルを見てみます。
成功した3つのLambdaだけ書き込まれています。
では何も手を加えずにワークフローを再実行してみます。
前回成功している最初の3つのLambda PutRegistrationStartedAt
,IsDuplicatedBeforeReg
, PutMember
が想定通りスキップされています!
ワークフローが全て完了したら実行状態保存テーブルから項目が削除されているのも確認できたので作戦大成功です。
まとめ
- 深く考えずにワークフローの途中から実行が実現できたので運用負荷が軽くなったはず。
- ロジック本体(Lambda)と実行状態の管理を分離して定義することができた。
- JSONPath構文むずい。
- ワークフローのテストむずい。