8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Serverless Framework を用いて Step Functions Distributed Map による Lambda の大規模並列処理を実現する

Last updated at Posted at 2022-12-13

Step Functions Distributed Map が発表されましたね!

先日の AWS re:Invent にて、Step Functions Distributed Map が発表(GA)されました!
(Distributed Map の公式リリースは以下。日本語でリリース内容を知りたい方はDevelopersIOによる速報を参照)

上のリンクの通り、公式リリースでは手動でAWSコンソール上から実現する手順が記載されています。
しかしながら、実際に商用で利用する場合には、AWS CDK や Serverless Framework などを用いて管理したいところです。

そこで本記事では、Serverless Framework を用いて、Step Functions Distributed Map を構成することを最終目標に、そもそもMapステートってなに?というところから順に書いていきます。

Step Functions の Mapステートとは

Step Functions とは

AWS Step Functions は、AWS の提供するワークフローサービスで、データ処理やマイクロサービスのオーケストレーションなどに活用されます(参考)。

参考に、AWS公式が提示する事例の画像を見てみます(以下)。AWSの様々なサービスを順序よく実行できることがなんとなくイメージできるかなと思います。
step-functions-example.png

Map ステートとは何か ・ 何が嬉しいか

ここで、こういったワークフローの中に 「1000個の要素を持つ配列のそれぞれについて処理を行うタスク」 があると考えてみましょう。
仮にこの処理が1つの要素あたり10秒かかるとすると、3時間程度 かかってしまうことになります(10秒 x 1,000要素 = 10,000秒)。

この部分を40並列で実行できれば、この3時間ほどかかっていた処理を 約4分 に短縮することができます(10,000秒 / 40 = 250秒)。
それを簡単に実現できるのが Map ステートという設定になります。

従来の Mapステートの問題点

従来の Mapステートでは、この 並列処理実行数に40程度の制限 がありました。
そのため、40並列以上を達成したい場合、並列処理をネストしてどうにかしろと公式がアナウンスしていました。
(英語版からは既に削除されている記述のため、日本語版ドキュメントをスクショしたものを以下に添付します。自動翻訳なので日本語があれですが)
step-functions-map-limit.png

ちなみにですが、英語版ドキュメントでは従来バージョンのことを Inlineモード と呼んでおり、将来的には日本語ドキュメントもその呼称になるものと思われます。

Distributed Mapの何がすごいか

もうおおよそ予想がついてることと思われますが、Distributed Map を用いると、最大で 10,000並列 まで可能になります。
すごい!!!!!!!

並列実行させるサービスでも並列実行可能な上限が変わります。Lambdaの場合はデフォルトの同時実行可能数が1000のため、上限の引き上げを行わなければ、1000より大きい値を設定するとエラーとなります

Serverless Framework で実現する方法

それでは、Distributed Map のすごさがわかったところで、Serverless Framework と Lambda を使って実際に動かす方法をステップ別に説明していきます(エラーハンドリングとかは今回は無視)。

※ 最近使い始めたTypeScriptで書いていきます(が、動けばいいだけのコードではあるのでご容赦を)
※ AWS CLI や Serverless Frameworkの細かな設定や解説まで書くと長くなりすぎるので要点だけ書いていきます
※ バージョンはこれで試してます

$ sls --version
Running "serverless" from node_modules
Framework Core: 3.25.1
Plugin: 6.2.2
SDK: 4.3.2

Step1: Lambda 関数の作成

とりあえず Hello, world してくれる関数を用意します。

handler.ts
export const hello = (_event, _context, callback) => {
  callback(null, { message: "hello, world" });
};

Serverless Frameworkでは、以下のように functions: の下に各 Lambda の定義を書いていきます。
(複数 Lambda を使用する場合は、 hello と同階層に複数設定が並ぶ形ですね)

serverless.yml(抜粋)
functions:
  hello:
    handler: handler.hello

諸々設定できたら、以下でAWS上にデプロイできます。

$ sls deploy

AWSコンソールから手動で Lambda をテスト実行するとちゃんと動いてくれます。
serverless-lambda-sample.png

Step2: Step Functions workflow の作成

それでは、同じ要領で複数の Lambda を作っていき、それを Step Functions workflow でつなげてみます。

今回は、以下の2つの処理を行うワークフローを作ることとします。

  1. 0 ~ 9 までの10個の要素を持つ配列を作成
  2. 1で作成した配列の各要素を2倍にした配列を作成

まずは、Step1と同様にして2つの Lambda を作ります。

handler.ts
/**
 * 0 ~ 9 までの10個の数値を持つ配列を作成
 */
export const firstFunction = (_event, _context, callback) => {
  const values = [...Array(10)].map((_, i) => i);
  callback(null, values);
};

/**
 * 受け取った配列の各要素を2倍した配列を作成
 */
export const secondFunction = async (event, _context, callback) => {
  const result = event.map((x) => x * 2);
  callback(null, { result: result });
};
serverless.yml(抜粋)
functions:
  first-function:
    handler: handler.firstFunction
  second-function:
    handler: handler.secondFunction

これを Step Functions で取り扱うには、serverless-step-functions というプラグインを導入する必要がありますので、$ yarn add --dev serverless-step-functions などとしておきましょう(このプラグインのGitHubレポジトリ)。

あとは、以下のように Serverless Framework の設定ファイルに記述し、$sls deploy すればOKです(この後に紹介する図の雰囲気で設定内容は理解できるので詳細は割愛)。

serverless.yml(抜粋)
plugins:
  - serverless-step-functions

stepFunctions:
  stateMachines:
    sampleStateMachine:
      name: sampleStateMachine
      definition:
        StartAt: FirstState
        States:
          FirstState:
            Type: Task
            Resource:
              Fn::GetAtt: [first-function, Arn]
            Next: SecondState
          SecondState:
            Type: Task
            Resource:
              Fn::GetAtt: [second-function, Arn]
            End: true

AWSコンソールから該当のステートマシンを見にいくと、このような図を見ることができます。順序よく実行されそうな雰囲気が漂っていますね。それぞれの State が各 Lambda に対応しています。
step2-workflow.png

実際に実行してみると、以下のように実行状態を確認できます(今回はすぐに完了してしまったので全て緑)
step2-workflow-result.png

復習ですが、今回はこのようなワークフローでした。

  1. 0 ~ 9 までの10個の要素を持つ配列を作成
  2. 1で作成した配列の各要素を2倍にした配列を作成

最終的な実行結果を見てみると、確かにうまくいっていることがわかります。
step2-workflow-result-output.png

ちなみにですが、各ステートの入出力を確認することもできます。
FirstState の返り値が次の SecondState に Step Functions により渡されているのが分かります)

step2-workflow-immediate-input-data.png

Step3: Mapステート (Inline モード)の適用

次に並列処理を実装してみましょう。並列処理の威力をわかりやすくするために10秒かかる処理を無理やり入れてみます。

  1. 0 ~ 9 までの10個の要素を持つ配列を作成
  2. (並列処理) 各要素の値を 10秒待った後 2倍する
  3. 2で実行された結果を表示

まずは例によって各関数を実装します。

handler.ts
/**
 * 0 ~ 9 までの10個の数値を持つ配列を作成
 */
export const firstFunction = (_event, _context, callback) => {
  const values = [...Array(10)].map((_, i) => i);
  callback(null, values);
};

/**
 * 受け取った値を10秒待った後に2倍する
 */
export const secondFunction = async (event, _context, callback) => {
  const sleep = async (ms: number) => {
    return new Promise((resolve) => setTimeout(resolve, ms));
  };
  await sleep(10000);

  callback(null, { secondFunctionResult: event * 2 });
};

/**
 * 並列処理された結果をどのように受け取るかを確認するための関数
 */
export const finalFunction = async (event, _context, callback) => {
  callback(null, { finalResult: event });
};

※ serverless framework の Lambda 関数の処理時間のデフォルトは6秒であり、10秒間待つ処理を行えないため、今回は適当にタイムアウトの時間を伸ばしておきます

serverless.yml(抜粋)
functions:
  first-function:
    handler: handler.firstFunction
  second-function:
    handler: handler.secondFunction
    timeout: 30
  final-function:
    handler: handler.finalFunction

設定は以下のように行います。ポイントは以下の2つですかね。

  • 並列処理を行いたいステートについて Type: Mapと設定
    • その中に Iterator という項目を作り、並列処理させたい Lambda について記載
  • MaxConcurrency で最大並列実行数を指定
serverless.yml(抜粋)
stepFunctions:
  stateMachines:
    sampleStateMachine:
      name: sampleStateMachine
      definition:
        StartAt: FirstState
        States:
          FirstState:
            Type: Task
            Resource:
              Fn::GetAtt: [first-function, Arn]
            Next: SecondState
          SecondState:
            Type: Map
            MaxConcurrency: 5
            Iterator:
              StartAt: MyMapTask
              States:
                MyMapTask:
                  Type: Task
                  Resource:
                    Fn::GetAtt: [second-function, Arn]
                  End: true
            Next: FinalState
          FinalState:
            Type: Task
            Resource:
              Fn::GetAtt: [final-function, Arn]
            End: true

デプロイしてみると以下の図を見ることができます。並列処理してくれそうな雰囲気が漂っていますね。
step3-workflow.png

実行し、結果を見てみましょう。

まずは各処理にかかった時間から見ていきます。
10秒かかるタスクが10個あるので、並列に実行した場合に1分40秒はかかるタスクが、5並列で実行したことで、合計20秒程度で完了したことが分かります。
また、最初の5個のタスクが完了するまで待ってから 残りの5つについても処理を行っている様子がよく分かるかと思います。この辺りも勝手に調整してくれるのは楽ですね。

step3-workflow-time.png

次に肝心の出力結果ですが、こちらもうまく動いてくれたことが分かります。
また、下が見切れていて少し分かりづらいですが、 各並列処理の返り値を配列にまとめた上で、次のステートに渡される ことが確認できますね。
step3-workflow-time.png

試しに1000個のタスクを用意し、MaxConcurrencyを500に設定してみたのですが、50並列相当の時間がかかりました。ドキュメントの通り、40を超えるような設定はあまり意味ないですね。

Step4: Distributed Map の適用

最後にメインの Distributed Map の設定方法を見ていきましょう。この記事を執筆した2022年12月8日現在では、日本語ドキュメントには項目自体が存在しないため、英語ドキュメントを確認する必要があります(東京リージョン対応してるんだから、項目くらい追加しておいてくれると嬉しいなと思わないではない)。

そのドキュメントはこちらですね。

これにより、ItemProcessor という項目の中で、ModeDistributed に、ExecutionType を適当に設定することで使用可能ということがわかります。
step3-workflow-time.png

Inline モードのMapステート(従来型)のドキュメントには記載があるのですが、 Iterator というオプションが非推奨となっており、その代わりとして新たに作られたもののようです。
(ちなみに、こちらを使いたい場合は ModeInline を設定)

step3-workflow-time.png

というわけで、Iterator の代わりに ItemProcessor と各種項目を設定してみたくなるのですが、これをやるとまだプラグイン側が対応していないために $ sls deploy 実行時にエラーがでてしまいます。

しかし!ここで諦めてはいけません。

試した結果 Iterator はそのままに、Mode: "Distributed"ExecutionType を設定することで何故か動くことが分かりました。

つまりは、基本的には、このように3行追加するだけで、新機能の恩恵に預かることができるのです!
(と言いつつ、実際には states:StartExecution をこのステートマシンに追加で許可しておかないと動かないので注意。もちろん lambda:InvokeFunction も忘れずに)

serverless.yml(抜粋)
         SecondState:
            Type: Map
            MaxConcurrency: 500
            Iterator:
              ProcessorConfig:           ← 追加
                Mode: DISTRIBUTED        ← 追加
                ExecutionType: STANDARD  ← 追加
              StartAt: MyMapTask

ということで、最後にこのワークフローを試してみましょう。
並列数は 500 で試してみます!

  1. 0 ~ 999 までの 1000個 の要素を持つ配列を作成
  2. (並列処理) 各要素の値を 10秒待った後 2倍する
  3. 2で実行された結果を表示

処理中のスクショがこちらです。500個動いてますねー

step3-workflow-time.png

処理時間も、並列処理でなければ3時間程度かかるところが、40秒程度で完了しています。すばらしい!
step3-workflow-time.png

1000個のタスクを実行する場合、20sec ~ 30sec 程度のオーバーヘッドが発生するようです。そのため、1000タスク × 10sec / 500並列 = 20sec という想定よりは時間がかかります

終わりに

AWSからのちょっと早い素敵なクリスマスプレゼントでしたね。
これからも良いサーバレス & 並列処理ライフを〜

8
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?