LoginSignup
2
1

More than 1 year has passed since last update.

ちゃんとしていなかったStepFunctionsワークフローの実行状態管理をちゃんとする

Last updated at Posted at 2021-12-01

この記事はコープさっぽろ オープン社内報 Advent Calendar 2021の4日目の記事です。

この記事は?

動いているStepFunctionsワークフローを「失敗しても簡単に再実行できる」ように修正した記録。

何がしたいの?

いくつかのLambdaや外部APIを呼び出しているStepFunctions ワークフロー(以下「ワークフロー」)があります。
これを定期的に動かしているのですが、外部APIの都合でしばしば異常終了してしまいます。

image.png

ワークフローは以下のような状況です。

  • 呼び出している外部APIは複数あり、この内のいくつかでたまにエラーが発生します。
  • 基幹システムを呼び出すPutMemberは 同じ条件で2回呼び出すとエラーになります
  • 可能なものはLambdaの中で自力でリトライを仕掛けていますが、リトライ回数の制限を超えてLambdaがエラーになる場合があります。
  • こんなにちょいちょい落ちると思っていなかった

ワークフローがエラーになった場合、AWS Management Consoleから簡単に再実行できますが、先頭から実行したときに
途中のタスクから再実行はできないので、
Lambdaをワークフローから呼ばれたときと同じパラメーターで手動で一つずつ起動するということをやっていました。

上の図の例の場合、2番めのLambdaから10このLambdaを手動で実行する必要があります。

手動でLambdaを1つずつ起動するのがいい加減面倒になったので、ワークフローの再実行でもエラーにならないように「一度正常終了したタスクは実行をスキップする」をワークフローに組み込みたいと思います。

方針

  • 実行状況を保存するDynamoDB Table(以下「実行状態保存テーブル」)を用意する。
  • ワークフローの各Lambdaの実行が正常終了したら実行状態保存テーブルに記録する。
  • ワークフローは各Lambdaの実行前に実行状態保存テーブルを見て、実行済みであればLambdaを呼ばずに次のタスクを実行する。
  • これらを新しくLambda Functionなどを作ることなく実現する。

image.png

これを各Lambdaでやればうまくいくはずです。

この方式を実現したあと、ワークフローがエラーになった時にはこのような運用ができるようになるはずです。

  • ワークフローでエラーになったLambdaを把握する。
  • エラーになったLambdaのエラーの原因を取り除く。
  • エラーの原因を取り除いたあと
    • そのLambdaを再実行できるのであればそのまま ワークフロー先頭から実行する
    • そのLambdaをワークフローから実行しなくてよいのであれば実行状態保存テーブルに「そのLambdaは実行済み」と設定してワークフローを先頭から実行する

いずれのケースでもワークフローは先頭から実行できるようになります。

まずは準備

実行状態保存テーブルの作成とワークフローのロールの更新

実行状態保存テーブルをDynamoDB Tableとして作成します。

  • テーブル名 : MemberRegistrationExecution
  • パーティションキー : key(string)

ワークフローからDynamoDBにアクセスするので、ワークフローのロールに実行状態保存テーブルへの操作を許可します。
(Cloud Formationテンプレートの抜粋)

             - Effect: Allow
                Action:
                  - dynamodb:Query
                  - dynamodb:GetItem
                  - dynamodb:UpdateItem
                  - dynamodb:DeleteItem
                Resource:
                  - !Sub "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${MemberRegistrationExecutionManagementTableName}"

実装開始

StateMachineを考える

この修正を入れる前のワークフローのStateMachineはこのように定義しています。

{
  "member": {
    "memberCode": "登録データのユニークキー",
     : 
    (もろもろの属性)
  }
}

ワークフローで実行状態保存テーブルから取得した前回実行状態を持ち回りたいので、StateMachineは以下のように定義します。
(定義しますと書きましたが、このような定義がどこかにあるわけではなく、こうなるように各処理を設定していきます)

{
  "member": {
    "memberCode": "登録データのユニークキー",
     : 
  },
  "MemberRegistrationExecution" : {
    "Item": {
      "key": { "S": "登録データのユニークキー"},
      "Lambda Function名": { "S": "終了時刻"}
    }
  }
}

実行状態保存テーブルから取得したデータを MemberRegistrationExecution というキーで持ち回ります。

実行状態保存テーブルから前回の実行結果を取得する

StepFunctionsで用意されているGetItemアクションを使います。

    "GetExecutionStateItem": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:getItem",
      "Parameters": {
        "TableName": "実行状態保存テーブル名",
        "Key": {
          "key": {
            "S.$": "$.member.memberCode"
          }
        }
      },
      "Next": "IsExecutionStateExists",
      "ResultPath": "$.MemberRegistrationExecution"
    },

この定義は

  • GetExecutionStateItemという名前のStateは
  • TableName で指定したDynamoDB Tableの
  • key という名前のキーが
  • StateMachineの member.memberCodeと一致する項目をGetItemして
  • 結果をStateMachineの MemberRegistrationExecution に設定する

という意味になります。

ちなみに、

 "S.$": "$.member.memberCode"

プロパティ名に .$ をつけると、StepFunctionsが $.member.memberCode を実際の値に展開してくれます。

実行状態保存テーブルに項目がなければ作る

Lambdaの実行結果の実行状態保存テーブルへの書き込みはUpdateItemで行う予定なので、GetExecutionStateItemで値を取れなかった場合は項目を事前に作成しておく必要があります。

    "IsExecutionStateExists": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.MemberRegistrationExecution.Item.key",
          "IsPresent": true,
          "Next": "IsSkipPutRegistrationStartedAt"
        }
      ],
      "Default": "PutExecutionStateItem",
      "Comment": "DynamoDBからExecutionStateが取得できない場合は初回実行としてPutItem"
    },

このChoice Stateは以下の意味です。

  • StateMachineの MemberRegistrationExecution.Item.key がある場合は次の処理(IsSkipPutRegistrationStartedAt)に行き
  • それ以外の場合は PutExecutionStateItemに行く

PutExecutionStateItem StateはDynamoDBに項目をPutItemします。

    "PutExecutionStateItem": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:putItem",
      "Parameters": {
        "TableName": "実行状態保存テーブル名",
        "Item": {
          "key": {
            "S.$": "$.member.memberCode"
          }
        }
      },
      "Next": "IsSkipPutRegistrationStartedAt",
      "ResultPath": null
    },

ResultPathにnullを設定しないと、PutItemの結果でStateMachineが書き換わってしまうので忘れずに設定しましょう。後述のUpdateItem、DeleteItemでも同様です。

ワークフローに上記を追加してWorkflow Studioで表示するとこのようになります。

image.png

実行状態に次のLambdaの完了が記録されていればLambdaをスキップし、無ければLambdaを実行する

前のステップでStateMachineのMemberRegistrationExecutionに実行状態が保存されているはずなのでそれをChoiseステートで確認します。

    "IsSkipPutRegistrationStartedAt": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.MemberRegistrationExecution.Item.PutRegistrationStartedAt",
          "IsPresent": true,
          "Next": "IsSkipIsDuplicatedBeforeReg"
        }
      ],
      "Default": "PutRegistrationStartedAt"
    },

このChoice Stateは以下の意味です。

  • StateMachineの MemberRegistrationExecution.Item.PutRegistrationStartedAt がある場合は次の処理(IsSkipIsDuplicatedBeforeReg)に行き
  • それ以外の場合は PutRegistrationStartedAtに行く

次にLambdaの実行が完了したら実行状態保存テーブルに書き込むStateを作ります。

   "UpdateStatePutRegistrationStartedAt": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:updateItem",
      "Parameters": {
        "TableName": "テーブル名",
        "Key": {
          "key": {
            "S.$": "$.member.memberCode"
          }
        },
        "UpdateExpression": "SET #key = :val",
        "ExpressionAttributeNames": {
          "#key": "PutRegistrationStartedAt"
        },
        "ExpressionAttributeValues": {
          ":val": {
            "S.$": "$$.State.EnteredTime"
          }
        }
      },
      "Next": "IsSkipIsDuplicatedBeforeReg",
      "ResultPath": null
    },

このStateは以下の意味です。

  • テーブル名 にupdateItemを実行する。
  • KeyはStateMachineのmember.memberCode
  • プロパティPutRegistrationStartedAtに値としてこのStateの開始時刻を設定する
  • updateItemの結果はStateMachineに反映しない

これを追加したWorkflow Studioの定義は以下のようになります。

image.png

これをスキップしたいLambdaに順次組み込んでいきます。

最後に実行状態保存テーブルから状態を削除する

ワークフローが最後まで実行されたら実行状態保存テーブルから項目を削除します。

    "DeleteExecutionStateItem": {
      "End": true,
      "Parameters": {
        "Key": {
          "key": {
            "S.$": "$.member.memberCode"
          }
        },
        "TableName": "dev-deli-websubscription-MemberRegistrationExecution"
      },
      "Resource": "arn:aws:states:::dynamodb:deleteItem",
      "Type": "Task",
      "ResultPath": null
    },

image.png

ここについては、実行結果を保存するテーブルなので残しておいてもよいかもしれませんが、削除するタイミングがないでここで削除してしまいます。
残していたとして、ワークフローを再実行したら全部スキップされるから意味もないですし。
実行結果のエビデンスはStepFunctions自身のステータスを見るということにします。

完成!

image.png

試してみる

image.png

いい感じにエラーになりました。
4つ目のLambda RefreshAuth0Tokens でエラーになっています。

実行状態保存テーブルを見てみます。

image.png

成功した3つのLambdaだけ書き込まれています。

では何も手を加えずにワークフローを再実行してみます。

image.png

前回成功している最初の3つのLambda PutRegistrationStartedAt,IsDuplicatedBeforeReg, PutMemberが想定通りスキップされています!

image.png

ワークフローが全て完了したら実行状態保存テーブルから項目が削除されているのも確認できたので作戦大成功です。

まとめ

  • 深く考えずにワークフローの途中から実行が実現できたので運用負荷が軽くなったはず。
  • ロジック本体(Lambda)と実行状態の管理を分離して定義することができた。
  • JSONPath構文むずい。
  • ワークフローのテストむずい。
2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1