11
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Step Functions: コールバックパターンを使ったサーバレスな並列分散処理①

Posted at

はじめに

ひさしく触っていなかったAWSを触ることになったので、せっかくだからサーバレスに並列分散処理が組めないか検討してみたというお話です。
構築まで含めると長くなるので、今回は構成の検討と実際の動作だけ紹介して、次回以降で構築をしていきます。

サーバレスな並列分散処理

Lambdaを使ったサーバレスなシステムでは、実行時間等の制約からひとつの関数ですべての処理を終わらせるのでなく、細かいプロセスに分けた複数の関数を組み合わせて実行することが多くなります。
img01-001.png
処理の中には並列化やホストを分散させても問題ないものもあるので、ワークフローに関数を複数同時起動させて分散処理を行い、単一のプロセスに収束させる仕組みを組み込んで処理時間短縮を行うことができます。
ただ、サーバレスの環境において単一のプロセスに収束させるのは意外と組み込むのが大変な処理になります。
分散処理は、S3 Putのようなトリガーで対処するなり、単に複数の関数を起動するスクリプトを前段プロセスの関数に記述するなりでできますが、プロセスの収束は分散したプロセスがすべて終了したうえで、後段プロセスを確実に一回だけ実行させられることが求められます。
img01-002.png
実装として真っ先に考え付くのは、分散したプロセスがDynamoDB等にログを書き出し、終了時に他のプロセスのログを確認して、最後に終了したプロセスが後段プロセスをトリガーするという構成です。
非常に稀なケースではあるものの、最後に終了するプロセスが2つ以上同時になることも考えられ、プロセスが同時に後段プロセスをトリガーしてしまう可能性や実装によっては後段プロセスをトリガーされない可能性があります。
img01-003.png
確実に一回だけ後段プロセスを実行させたければ、分散させたプロセスとは別の関数から定期的に状態をポーリングし、すべてのプロセスが終了したことを確認して後段プロセスをトリガーするという方法があります。
ただ、ポーリング間隔によっては処理を行っていない無駄な時間が発生し、なによりイベント駆動型のサービスを使いながらポーリングに頼るのはあまりナウい感じがしません。
img01-004.png

Step FunctionのParallelステート

Step Functionには、ワークフローの途中に並列処理を挿入する「Parallel」ステートがあります。JSON形式で並列処理のワークフローを記述して簡単に並列処理を実装できます。
img01-005.png
ただ、このParallelステートは、並列処理部分を決め打ちで設計しなければならず、動的に並列数を決定できません。
数十の並列実行を記述しようものなら、ワークフロー定義のJSONもVisual Workflowも長大で可読性に乏しくなるといった問題があります。
img01-006.png

Step Functionsのコールバックパターン

Step FunctionのTaskステートでは、Lambda、ECS/Fargate、SNS、SQSにおいてResourceに「.waitForTaskToken」をつけることでコールバック待機状態にすることができます。

{  
   "StartAt":"Run Workers",
   "States":{  
      "Run Workers":{  
         "Type":"Task",
         "Resource":"arn:aws:states:::lambda:invoke.waitForTaskToken",
         "Parameters":{  
            "FunctionName":"run-workers",
            "Payload":{  
               "Payload.$":"$",
               "TaskToken.$":"$$.Task.Token"
            }
         },
         "End":true
      }
   }
}

この待機状態は、ステートマシンで発行した「Task Token」を外部プロセスから「SendTaskSuccess」のAPIで受け取ることで解除することができ、複数のTask Tokenを受け取ったとしても、最初の1回以外は棄却されるようになっています。
これを応用すれば、分散プロセスから確実に単一プロセスに復帰させることができそうです。
img01-007.png

コールバックパターンを使った並列分散処理

コールバックパターンを使って並列分散処理を構成してみました。
構成としては、メインのステートマシンと分散処理のワーカーにあたるサブのステートマシンを用意して、メインのステートマシンのLambdaからサブのステートマシンを任意の数だけ起動して並列分散処理を実行します。
img01-008.png

実際に動かしてみた

上記の構成を実際に組んで実行してみました。
以下は、構成図の「create-task」で0~2のランダムな数値(float)を100個キューに投げて、10並列で起動させたサブステートマシンの「executor」で順次メッセージを受け取って数値の秒数Sleepするという簡単な実行例です。
メインステートマシンを実行するとサブステートマシンが10個実行され、順次成功してすべて完了するとメインステートマシンの「Start Workers」も「進行中」から「成功」に変化することが確認できました。
img01-009.gif

参考

11
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?