LoginSignup
5
4

More than 1 year has passed since last update.

step functions Mapを使用して動的に並列処理してみた

Last updated at Posted at 2021-08-20

DMMデータインフラ部に所属しているyuuaです
DMMデータインフラ部では、一部プロダクトにてaws step functions を活用してプロダクト開発を行っております。

背景

複数のサイトを運営しており、その際のデータを収集するため一部APIリクエストを使用しています
その際に正しい値がPOSTされているかを毎回手動でチェックしていたため
こちらをStep functionsを使用し並列に自動で行うための仕組みを作成しました。

構成

今回はStep functionsのMapがどういうものなのかをわかりやすくするため
数値のListを生成し偶数か奇数かを判定する簡易的なLambda関数を実装し、step functionsで並列処理します

architecture.png

step functions map概要

Map 状態 ("Type": "Map") を使用して、入力配列の要素ごとに一連のステップを実行できます。   
Parallel 状態は同じ入力を使用して複数のステップのブランチを実行しますが、
Map 状態は状態入力の配列の複数のエントリに対して同じステップを実行します。 

こちらのaws公式ドキュメントの説明通り、入力配列の要素ごとにステップを実行することができ、
Parallelは同じデータに対する実行ですが、Mapは動的な個々(異なる)の値に対してステップを実行することができます。

aws resources作成

terraformで構成を管理書くことが多いのでterraformで作成します

step functionsの内容を定義するjsonファイル

  • Statesで実行するlambda関数を定義しそれを実行します
  • Nextでは次のステップで実行するtask名を定義します
  • Mapの定義自体は通常のTaskと同じように定義します
  • Mapのパラメータに関してはlambdaのeventで渡されます
  • concurrencyはMapの最大並列実行数です、Lambdaを指定する場合は関数の最大実行数を考慮して設定してください
  • Retry/Catchに関してはMap内の処理が一つでも失敗した場合ステートマシン自体が失敗になり処理が中止されてしまうため定義しています。(通常ではここでエラー内容の通知を行っていますが、今回はエラー処理自体をパスします)
  • 当然ですがRetryに関してはMap内のみRetryされます
definition.json
{
    "Comment": "state machine",
    "StartAt": "MakePatternArray",
    "States": {
        "MakePatternArray": {
            "Type": "Task",
            "Resource": "${list_lambda_arn}",
            "Next": "ExecuteMap"
        },
        "ExecuteMap": {
            "Type": "Map",
            "MaxConcurrency": ${concurrency},
            "Iterator": {
                "StartAt": "ExecuteMaps",
                "States": {
                    "ExecuteMaps": {
                        "Type": "Task",
                        "Resource": "${map_lambda_arn}",
                        "Retry": [ {
                            "ErrorEquals": [ "States.ALL" ],
                            "IntervalSeconds": 2,
                            "BackoffRate": 2.0,
                            "MaxAttempts": 3
                        }],
                        "Catch": [{
                            "ErrorEquals": ["States.ALL"],
                            "Next": "MapException"
                        }],
                        "End": true
                    },
                    "MapException": {
                       "Type": "Pass",
                       "Result": "Error",
                       "End": true
                    }
                }
            },
            "End": true
        }
    }
}

  • lambdaやroleなどは別途作成してある想定
stpfnc.tf
data "template_file" "definition_json" {
  template = file("definition.json")
  vars = {
    list_lambda_arn      = var.list_lambda_arn
    map_lambda_arn       = var.map_lambda_arn
    concurrency          = 10
  }
}

resource "aws_sfn_state_machine" "sfn_state_machine" {
  name       = "${var.step_function_name}-state-machine"
  role_arn   = aws_iam_role.state_machine_role.arn
  definition = data.template_file.definition_json.rendered
}

Lambda関数の定義

数値の配列を返すlambda

list.js
exports.handler = async () => {
    const response = [291,394,290,111,2,8,10]
    return response;
};

上では数値の配列を生成していますがObject自体を返すこともできます。
下記のようにObjectを生成してlambdaのeventをMap側で受け取ることも可能
ただし、渡せるデータ容量には制限があるので注意してください

object.js
exports.handler = async () => {
    const response = [
     {
       'aaa': 123
       `bbb`: 333
     },
     {
       'aaa': 1231
       `bbb`: 3333
     }
    ]
    return response;
};

判定するLambda

整数値以外は許容していない、非常に適当な関数ですが...

map.js
exports.handler = async (event) => {
    if (event % 2 == 0) {
      return '偶数'
    }
    return '奇数'
};

今回はstep functionsのDSL内にて Catch を使用してエラーが発生したものは Passするようになっているため
一部が失敗してもstate machine自体は成功します

state machineを起動する

起動方法はCloudWatch eventやCliなどいろいろありますが、ここではコンソールで起動/実行します

作成したstate machine

statemachine.png

作成したstate machineから 実行の開始 を選択肢(パラメータが必要であれば入力/json)すると起動されます。
起動後の画面は上記のグラフインスペクターとLogなどが表示されます。

成功時

成功時はstate machine内のtaskが緑色になります
success.png

taskの選択もしくは実行イベント履歴からOutputされたデータの確認も可能です
log.png

エラー発生時

失敗時は下記の画像のように失敗したtaskが赤くなり、Catch を定義していない場合は
state machine全体が失敗になります。

error.png

失敗したtaskを選択もしくは
実行イベント履歴にあるLogからエラー内容を確認することが可能です

まとめ

step functionsのMapを使用することで動的に生成した異なるデータに対して簡単に並列にアプローチをすることができます。一連のworkflowの中で、動的データを並列に処理することができれば、処理の高速化などといったメリットになると思います。
step functionsには他にもいろいろなStatesや様々なawsサービスとの連携も簡単に行うことができ非常に便利なので
workflowエンジンをお考えの際は有力な選択肢としてぜひ一度使ってみてはいかがでしょうか。

データインフラ部では、AWSでのビッグデータ基盤関連の運用を行っており、
Step Functions なども活用したプロダクト開発を行っています。
中途採用などもおこなっておりますので、興味のある方は、一度弊社HPなどから
カジュアル面談など、是非ご応募ください。

5
4
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
4