3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

S3オブジェクトを動的抽出してLambda並列処理するStep Functions Distributed Mapを実装してみた

Last updated at Posted at 2023-01-25

こんにちは。ユビキタス言語1年生の人です。

先日、S3バケットの特定prefixを持つ(同じフォルダ配下にある)オブジェクトをそれぞれLambdaで処理したい・・・と有識者の人に相談したら、テンション高めで「Step Functions Distributed Mapが適しています!」とおススメされたので早速使ってみました。

使ってから知ったのですが、AWSのグローバルイベントre:Invent 2022で発表されたばかりのアップデート機能でした。それまで、Step Functionsを使ったことも触ったこともなかったのですが、ものすごく便利だったのでご紹介します。

Step Functionsの初心者が知ったこと

Step FunctionsはWorkflow StudioというGUIの視覚的にドラッグ&ドロップで作れるワークフローのモードと、jsonで定義されたAmazon States Language (ASL) というコードでワークフローを作るモードがあります。最初にフロー全体の骨組みを作るときはWorkflow Studioが作りやすいですが、ちょこっと更新するだけの場合はASLを修正する方が楽でした。

例:Lambda関数を1回実行するステートマシン

Workflow Studio

stepfunctions_graph1.png

Amazon States Language (ASL)

{
  "Comment": "A description of my state machine",
  "StartAt": "Test Lambda",
  "States": {
    "Test Lambda": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:test-lambda:$LATEST"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "End": true
    }
  }
}

本題の「特定のprefixを持つ(同じフォルダ配下にある)複数オブジェクトを入力にLambda並列実行」するステートマシン

前提

  • S3バケットへのオブジェクト作成(Object Created)のイベントを元にStep Functionsのステートマシンを起動する想定です。

概要

  • Pass 入力情報の変換を行います
    • Parameters 組み込み関数のStringSplitとArrayGetItemを使い、オブジェクトのKeyから '/'で区切ってフォルダ名を取得します(prefixという変数にセット)
  • Map S3オブジェクトを取得してLambdaを並列実行します
    • ItemReader バケット名(\$.myinput.bucket)、プレフィックス(\$.myinput.prefix)に合致するオブジェクト情報を取得
    • ItemSelector Lambdaに各オブジェクトの情報を動的に渡す
    • Test Lambda 各オブジェクトの情報をinputとした、Lambda関数(Test Lambda)を並列実行。入力フォーマットはS3イベント通知のフォーマットと同じにしています。

Workflow Studio

stepfunctions_graph2.png

Amazon States Language (ASL)

{
  "Comment": "A description of my state machine",
  "StartAt": "Pass",
  "States": {
    "Pass": {
      "Type": "Pass",
      "Next": "Map",
      "Parameters": {
        "bucket.$": "$.detail.bucket.name",
        "prefix.$": "States.ArrayGetItem(States.StringSplit($.detail.object.key,'/'),0)"
      },
      "ResultPath": "$.myinput"
    },
    "Map": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "DISTRIBUTED",
          "ExecutionType": "STANDARD"
        },
        "StartAt": "Test Lambda",
        "States": {
          "Test Lambda": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "OutputPath": "$.Payload",
            "Parameters": {
              "Payload.$": "$",
              "FunctionName": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:test-lambda"
            },
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException",
                  "Lambda.TooManyRequestsException"
                ],
                "IntervalSeconds": 2,
                "MaxAttempts": 6,
                "BackoffRate": 2
              }
            ],
            "End": true
          }
        }
      },
      "End": true,
      "Label": "Map",
      "MaxConcurrency": 1000,
      "ItemReader": {
        "Resource": "arn:aws:states:::s3:listObjectsV2",
        "Parameters": {
          "Bucket.$": "$.myinput.bucket",
          "Prefix.$": "$.myinput.prefix"
        }
      },
      "ItemSelector": {
        "objectkey.$": "$$.Map.Item.Value.Key",
        "Records": [
          {
            "s3": {
              "bucket": {
                "name.$": "$.myinput.bucket"
              },
              "object": {
                "key.$": "$$.Map.Item.Value.Key"
              }
            }
          }
        ]
      }
    }
  }
}

実際に試せるテスト用jsonも記載しておきます。

{
  "detail": {
    "bucket": {
      "name": "<バケット名>"
    },
    "object": {
      "key": "<フォルダ>/<ファイル名>"
    }
  }
}

最後に

並列処理のテストもStep Functionsを使うとデバッグしやすく、Map内でのエラーもキャッチもでき、非常に便利でした。これからもStep Functionsを勉強して使いこなしていきたいと思います!

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?