0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS CDK で Step Functions に入門する

Last updated at Posted at 2024-10-17

はじめに

仕事で Step Functions を使うことになったので、いい加減ちゃんと触ってみる。

想定読者

  • Step Functionsに入門したい人
  • CDKを使ったことがある人

Step Functions

概要

Step Functions は、ワークフローを作成して分散アプリケーションの構築、プロセスの自動化などが可能。

  • 全体の一連の流れ = ワークフロー = ステートマシン
  • 一連の流れの中の各ステップ = 状態(ステート)

ワークフロータイプは2種類あり、標準ワークフローとExpressワークフロー(ストリーミング処理など使う)がある。

状態の種類

フィールド情報は、私見で使いそうだなと思ったもののみ抜粋した。すべて確認したい場合は、公式を参照ください。

タスク

Task状態は、単一の作業単位を表す。タスクはLambda関数やアクティビティを使用したり、ほかのAWSサービスと統合したりして実行される。

AWSサービスと最適化された統合をした場合のResourceフィールドの指定は以下の通り。

arn:${partition}:states:${region}:${account}:${servicename}:${APIname}
  • partition: 一般的にはawsを指定
  • region/account/servicename: リージョン / アカウントID / AWSサービス名
  • APIname: 例:Lambda Invokeの場合はinvoke ※参考

最適化された統合を用いると、例えばLambda Invokeの場合、レスポンスのPayloadフィールドはエスケープされたJSONからプレーンなJSONに変換される。

以下は、Lambda関数の最適化された統合を用いた場合のTask状態定義である。

example_lambda.json
"LambdaState": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke",
  "OutputPath": "$.Payload",
  "Parameters": {
    "Payload.$": "$",
    "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:HelloWorld:$LATEST"
  },
  "Next": "NextState"
}
  • OutputPath: 状態出力の一部を選択して次の状態に渡す。上記の例ではPayloadのみを渡している。未指定の場合、$となりJSON全体が渡される
  • Parameters: Lambda InvokeAPIに渡す引数。上記の例では、Payload配下にこのTask状態の入力JSON全体を渡し、FunctionNameに実行したいLambda関数のARNを渡している

選択

Choice状態は、条件付きロジック。

  • Choices (必須): ステートマシンが次に移行する状態を決定する選択ルールの配列
  • Default (オプション): Choicesのいずれにもマッチしないときの移行先を指定する
Choice状態サンプル(Not,And,Default)
example_choice.json
{
  "ChoiceStateX": {
    "Type": "Choice",
    "Choices": [
      {
        "Not": {
          "Variable": "$.type",
          "StringEquals": "Private"
        },
        "Next": "Public"
      },
      {
        "Variable": "$.value",
        "NumericEquals": 0,
        "Next": "ValueIsZero"
      },
      {
        "And": [
          {
            "Variable": "$.value",
            "NumericGreaterThanEquals": 20
          },
          {
            "Variable": "$.value",
            "NumericLessThan": 30
          }
        ],
        "Next": "ValueInTwenties"
      }
    ],
    "Default": "DefaultState"
  },
  
  "Public": {
    "Type" : "Task",
    "Resource": "arn:aws:lambda:us-east-1:123456789012:function:Foo",
    "Next": "NextState"
  },
  
  "ValueIsZero": {
    "Type" : "Task",
    "Resource": "arn:aws:lambda:us-east-1:123456789012:function:Zero",
    "Next": "NextState"
  },
  
  "ValueInTwenties": {
    "Type" : "Task",
    "Resource": "arn:aws:lambda:us-east-1:123456789012:function:Bar",
    "Next": "NextState"
  },
  
  "DefaultState": {
    "Type": "Fail",
    "Cause": "No Matches!"
  }
}

上記Choice状態には、以下INPUTを与える想定。

choice_input.json
{
  "type": "Private",
  "value": 22
}

並行

Parallel状態は、枝分かれ(ブランチ)を作って並列処理を行うための状態。

  • Branches (必須): 並列実行する状態を指定するオブジェクトの配列
  • ResultPath (オプション): Resultで指定した内容を出力のどこに配置するかを指定
  • ResultSelector (オプション): 状態の出力から必要なものを取り出したりして加工する
  • Retry (オプション): リトライポリシーを配列指定。失敗したものだけでなく全反復操作に適用されるため注意。エラーだけに適用したい場合は、ErrorEqualsなどを使用して適用対象を限定する
  • Catch (オプション): フォールバック状態(どういうエラーのときどの状態に遷移させるかなど)を配列指定
example_parallel.json
{
  "Comment": "Parallel Example.",
  "StartAt": "LookupCustomerInfo",
  "States": {
    "LookupCustomerInfo": {
      "Type": "Parallel",
      "End": true,
      "Branches": [
        {
         "StartAt": "LookupAddress",
         "States": {
           "LookupAddress": {
             "Type": "Task",
             "Resource":
               "arn:aws:lambda:us-east-1:123456789012:function:AddressFinder",
             "End": true
           }
         }
       },
       {
         "StartAt": "LookupPhone",
         "States": {
           "LookupPhone": {
             "Type": "Task",
             "Resource":
               "arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder",
             "End": true
           }
         }
       }
      ]
    }
  }
}

マッピング

Map状態は、反復操作を並行して実行する(動的並列処理)。インラインモード(デフォルト)と分散モード(高並列向け。詳細は割愛)。インラインモードでは、同時反復は40回まで。

  • ItemProcessor (必須): 処理内容を定義
    • ProcessorConfig: モードの指定(デフォルト:インライン)
    • StartAt: 反復処理を行う各INPUTを最初にどの状態に渡すかを指定する
    • States: 反復処理を行うTask状態などを定義する
  • ItemsPath (オプション): 反復処理を行う各INPUTはMap状態の入力JSONのどこから取得するか指定
  • ItemSelector (オプション): これを挟むことで各反復処理に渡す前に各INPUTを加工できる
  • MaxConcurrency (オプション): 並列数の上限(デフォルト: 0=上限なし。可能な限り並列実行)
example_input.json
{
  "ship-date": "2016-03-14T01:59:00Z",
  "detail": {
    "delivery-partner": "UQS",
    "shipped": [
      { "prod": "R31", "dest-code": 9511, "quantity": 1344 },
      { "prod": "S39", "dest-code": 9511, "quantity": 40 },
      { "prod": "R40", "dest-code": 9511, "quantity": 1220 }
    ]
  }
}

上記入力が以下のMap状態に渡されるとき

example_map.json
"Validate-All": {
  "Type": "Map",
  "InputPath": "$.detail",
  "ItemsPath": "$.shipped",
  "MaxConcurrency": 0,
  "ResultPath": "$.detail.shipped",
  "ItemSelector": {
    "parcel.$": "$$.Map.Item.Value",
    "courier.$": "$.delivery-partner"
  },
  "ItemProcessor": {
    "StartAt": "Validate",
    "States": {
      "Validate": {
        "Type": "Task",
        "Resource": "arn:aws:lambda:us-east-1:123456789012:function:ship-val",
        "End": true
      }
    }
  },
  "End": true
}

起動した各Lambdaには、以下のように入力が渡される。

example_lambda_input.json
{
  "parcel": {
    "prod": "R31",
    "dest-code": 9511,
    "quantity": 1344
   },
   "courier": "UQS"
}

パス

何もせずに次の状態に渡す。デバッグに便利。
このPass状態をはさむことで、強制的に入力変換を行うこともできる。

  • Result (オプション): 次の状態にわたす内容を指定
example_pass.json
"No-op": {
  "Type": "Pass",
  "Result": {
    "x-datum": 0.381018,
    "y-datum": 622.2269926397355
  },
  "ResultPath": "$.coords",
  "End": true
}

をというPass状態を作ったとき、

example_input.json
{
  "georefOf": "Home"
}

という入力がくると

example_output.json
{
  "georefOf": "Home",
  "coords": {
    "x-datum": 0.381018,
    "y-datum": 622.2269926397355
  }
}

と出力される。

待機

sleep。何秒待機するかを指定することも、特定日時まで待機させることも可能。

example_wait.json
"wait_ten_seconds": {
  "Type": "Wait",
  "Seconds": 10,
  "Next": "NextState"
}

成功

ワークフローの正常終了地点。

example_succeed.json
"SuccessState": {
  "Type": "Succeed"
}

失敗

ワークフローの異常状態。Catchされない場合は障害としてマークされる。

  • Cause (オプション): エラーの原因を説明する文字列。運用・診断目的で指定。
  • Error (オプション): Retry または Catch フィールドを使用してエラー処理を実行する際に指定できるエラー名
example_fail.json
"FailState": {
  "Type": "Fail",
  "Cause": "Invalid response.",
  "Error": "ErrorA"
}

【基本】チュートリアル

下記に従い、CDKで作ってみる。

以下のワークフローができて、動かすことに成功!
image.png

ステートマシンには以下のようにちゃんとLambdaを実行できる権限が付与されてた。

  • State Machine
    • 以下ポリシーのIAMロールを割り当て
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Action": "lambda:InvokeFunction",
			"Resource": [
				"arn:aws:lambda:us-east-1:123456789012:function:StepStack-MyLambdaFunction67CCA873-aSAaLthmaS2m",
				"arn:aws:lambda:us-east-1:123456789012:function:StepStack-MyLambdaFunction67CCA873-aSAaLthmaS2m:*"
			],
			"Effect": "Allow"
		}
	]
}

【応用】アルバイト月収計算ステートマシン

チュートリアルを踏まえて、何か作ってみようと思い、以下のようなアルバイト月収計算ステートマシンを作ってみた。本プログラムはあくまで勉強用で、実利用とはかけ離れているのご容赦ください。また、できるだけ冒頭で説明した様々な状態を使うようにしています。

仕様・処理内容

前提:

  • 前月以前の月収は従業員ごとにDB管理されている
  • 従業員ごとの単価(時給)はDB管理されている
  • 今月分の月収は、その月の勤務時間とその従業員の時給を掛けて算出する

下記いずれかのINPUTを与える。

{
  "partTimeWorkerId": "123456",
  "yearMonth": "202409"
}

OR

{
  "partTimeWorkerId": "123456",
  "workHours": [
    {
      "start": "20241010 10:00",
      "end": "20241010 12:00"
    },
    {
      "start": "20241010 16:00",
      "end": "20241010 17:00"
    }
  ]
}
  • 前月以前の月収を参照したい場合、yearMonthに年月を指定して月収を出力する
  • 今月以降の月収を参照したい場合、workHoursに勤務予定を指定して月収を算出する

実装

前述した通り、できるだけ多様な状態を扱うようにしたかったので、以下のような処理フローとした。

image.png

Step Function および Lambda のCDKは以下のようになる。
Lambda関数の内容含めた全体はGitを参照してください。

step-stack.ts
import * as cdk from 'aws-cdk-lib'
import * as lambda from 'aws-cdk-lib/aws-lambda'
import * as sfn from 'aws-cdk-lib/aws-stepfunctions'
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks'
import { Construct } from 'constructs'

export class StepStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props)

    // 月収取得Lambda
    const fetchMonthlyEarningsFunction = new lambda.Function(this, 'fetchMonthlyEarnings', {
      code: lambda.Code.fromAsset('lambda'),
      runtime: lambda.Runtime.PYTHON_3_12,
      handler: 'fetchMonthlyEarnings.handler',
      timeout: cdk.Duration.seconds(3)
    })

    // 単価取得Lambda
    const fetchHourlyPayFunction = new lambda.Function(this, 'fetchHourlyPay', {
      code: lambda.Code.fromAsset('lambda'),
      runtime: lambda.Runtime.PYTHON_3_12,
      handler: 'fetchHourlyPay.handler',
      timeout: cdk.Duration.seconds(3)
    })

    // 勤務時間計算Lambda
    const calculateWorkHoursFunction = new lambda.Function(this, 'calculateWorkHours', {
      code: lambda.Code.fromAsset('lambda'),
      runtime: lambda.Runtime.PYTHON_3_12,
      handler: 'calculateWorkHours.handler',
      timeout: cdk.Duration.seconds(3)
    })

    // 勤務時間と単価から給料計算するLambda
    const calculateEarningsFunction = new lambda.Function(this, 'calculateMonthlyEarnings', {
      code: lambda.Code.fromAsset('lambda'),
      runtime: lambda.Runtime.PYTHON_3_12,
      handler: 'calculateMonthlyEarnings.handler',
      timeout: cdk.Duration.seconds(3)
    })

    // 年月を指定して過去の給料を取得する処理
    // 月収取得Lambda呼び出し
    const fetchMonthlyEarningsTask = new tasks.LambdaInvoke(this, 'fetchMonthlyEarningsTask', {
      lambdaFunction: fetchMonthlyEarningsFunction,
      payloadResponseOnly: true // レスポンスをPayloadのみにする
    })
      .addCatch(
        new sfn.Fail(this, 'fetchMonthlyEarningsFail', { cause: 'fetchMonthlyEarnings Error' })
      )
      .addRetry({
        errors: ['States.ALL'], // すべてのエラーが対象
        maxAttempts: 1
      })
      .next(new sfn.Succeed(this, 'fetchMonthlyEarningsSucceed'))

    // 勤務時間から給料を計算する処理
    // 単価取得処理と勤務時間計算処理を並列呼び出し
    const calculateEarningsByWorkHours = new sfn.Parallel(this, 'parallelProcess')
    calculateEarningsByWorkHours.branch(
      new tasks.LambdaInvoke(this, 'fetchHourlyPayTask', {
        lambdaFunction: fetchHourlyPayFunction
      })
    )
    calculateEarningsByWorkHours.branch(
      // 勤務時間は時間帯を複数指定可能とするため、Mapにより指定した数だけ集計する
      new sfn.Map(this, 'mapProcess', {
        itemsPath: sfn.JsonPath.stringAt('$.workHours'),
        maxConcurrency: 1,
        resultPath: '$.mapOutput'
      }).itemProcessor(
        new tasks.LambdaInvoke(this, 'calculateWorkHoursTask', {
          lambdaFunction: calculateWorkHoursFunction
        })
      )
    )
    calculateEarningsByWorkHours.addCatch(
      new sfn.Fail(this, 'parallelProcessFail', { cause: 'Error in parallel process' })
    )
    calculateEarningsByWorkHours
      .next(
        new tasks.LambdaInvoke(this, 'calculateMonthlyEarningsTask', {
          lambdaFunction: calculateEarningsFunction,
          payloadResponseOnly: true // レスポンスをPayloadのみにする
        }).addCatch(
          new sfn.Fail(this, 'calculateEarningsByWorkHoursFail', {
            cause: 'calculateEarning Error'
          })
        )
      )
      .next(new sfn.Succeed(this, 'calculateEarningsByWorkHoursSucceed'))

    // アルバイト給料ステートマシン
    new sfn.StateMachine(this, 'PartTimeWorkerEarningsStateMachine', {
      definitionBody: sfn.DefinitionBody.fromChainable(
        new sfn.Choice(this, 'isYearMonthSpecified')
          // 年月が指定されている場合、給料情報をDB等から取得する
          .when(sfn.Condition.isPresent('$.yearMonth'), fetchMonthlyEarningsTask)
          // 年月が指定されていない場合、勤務時間の指定をもとに給料を計算する
          .otherwise(calculateEarningsByWorkHours)
      )
    })
  }
}

Step Functionsでは特別なエラー名というのがある。リトライポリシーやフォールバックポリシー記載時には注意が必要。

  • States.ALL: ワイルドカード。すべてのエラーに相当
  • States.Runtime: どのエラーも対象にしたくないとき

上記は一部なので、詳細は以下を参照ください。
https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_stepfunctions-readme.html#error-names

動かしてみる

例外処理やリトライ処理含めて、ちゃんと動いた!

■前月以前の月収を取得するとき
image.png

INPUT

{
  "partTimeWorkerId": "123456",
  "yearMonth": "202409"
}

OUTPUT

{
  "statusCode": 200,
  "body": "{\"montlyEarnings\": 40000}"
}

■今月以降の月収を取得するとき
image.png

INPUT

{
  "partTimeWorkerId": "123456",
  "workHours": [
    {
      "start": "20241010 10:00",
      "end": "20241010 12:00"
    },
    {
      "start": "20241010 16:00",
      "end": "20241010 17:00"
    }
  ]
}

OUTPUT

{
  "statusCode": 200,
  "body": "{\"earnings\": 3000.0}"
}

さいごに

グラフビューがみやすい!
States.Errorなどはハマりポイントだと思った。

参考文献

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?