プロローグ ~こんなことをやりたかった~
業務で実装しているシステムで、定期的にこんな処理をする必要がありました。
- DynamoDBからデバイスのIDの一覧を取得する
- 各IDについて、DynamoDBにアクセスして生データを取得し、サマライズして別のテーブルに格納
こんなことをLambdaにやらせてたのですが、一つのLambdaファンクションに丸投げしてたのが災いして、デバイスが増えてきたりすると非常に辛い感じになってきました。
ですので、これを機に今まで真面目に触らなかったStepFunctionsに手を出して見ました。
ちなみにまだ実験がてらちょっといじってみたぐらいですので、記事中のソースソースコードはかなりしょぼいです。
出来上がりはこちら
Step Functionsって何?
って方のために手短に説明しますと、 Lambda同士の連携をいい感じに管理するツール と思えばほぼ間違いないです。
これはデフォルトで用意されているブループリントの図ですが、視覚的にワークフローを把握しながらLambdaファンクションを組み立てることができます。
また、あるLambdaファンクションの結果に応じて次の処理を切り替えるといったことも簡単にできるようになります。
実装編
大まかな流れ
今回はあくまでStepFunctionsの使用感を知るための実験が主目的なので、処理を簡略化します。
まずは以下のLambdaファンクションを用意します。
- LambdaA: 処理対象のIDの一覧を取得(実験なのでIDは適当な文字列をハードコーディング)
- LambdaB: 配列で受け取ったID一つ一つについて、LambdaCを非同期で並列にinvokeする
- LambdaC: 受け取ったIDをコンソールに出力
LambdaBからLambdaCへの受け渡しもStepFunctionsで行いたいところですが、同一のLambdaを不特定多数並列に立ち上げるのは辛そうなので、ここはLambda内でinvokeします。
よって処理の流れは以下のようになります。
- LambdaAでIDの一覧を取得(実験なのでIDは適当な文字列をハードコーディング)
- 特に意味はないが10秒ほどスリープする
- LambdaBはLambdaAから受け取ったIDの一覧を使い、一つのIDに対して一つのLambdaCを非同期でinvokeする
- LambdaCはLambdaBから受け取ったIDをコンソールに出力。処理が並列化されていることをわかりやすくするため、2秒ほどスリープしてからLambdaBに結果を返す
- 全てのLambdaCの処理が終わったらLambdaBも終了する
うーん、「Lambda」がゲシュタルト崩壊しそうですね
ではこれらをServerless Frameworkで実装しようかと思います
ここで注意事項
StepFunctionsで定義した一連の処理の流れをStateMachineと言いますが、 State Machineは一度作成すると編集できません。変更したいなら都度削除して作り直さなければいけません。
幸いにして2017年2月にCloudFormationがStepFunctionsをサポートしましたので、これを利用しましょう。
AWSコンソールのGUIで毎回入力し直すよりは楽かと思います。
各種設定ファイル作成
ではServerlessで実装するための設定ファイルを書いていきましょう。
serverless.yml
まずはおなじみのserverless.ymlです
service: step-test
provider:
name: aws
runtime: nodejs6.10
# you can overwrite defaults here
stage: dev
region: ap-northeast-1
memorySize: 256
timeOut: 30
iamRoleStatements:
- Effect: "Allow"
Action:
- "lambda:InvokeFunction"
Resource:
- arn:aws:lambda:${self:provider.region}:${self:custom.config.accountId}:function:${self:service}-${self:custom.config.stage}-parallel
custom:
config:
accountId: "your Account ID" # 自分のAWSアカウントのID
stage: ${opt:stage, self:provider.stage}
functions:
# LambdaA
first:
handler: handler.first
# LambdaB
second:
handler: handler.second
environment:
TARGET_LAMBDA_ARN: ${self:service}-${self:custom.config.stage}-parallel
# LambdaC
parallel:
handler: handler.parallel
resources: ${file(./resources/state_machine.yml)}
できるだけ使い回せるように色々と変数を使ってます。
変数の基本的な使い方はここを見ていただくとして、いくつかポイントになるところを解説します。
- ${file(./resources/state_machine.yml)}
- 別のファイルから読み込みます。これの中身はCloudFormationテンプレートです(後述)
- stage: ${opt:stage, self:provider.stage}
- コマンドラインオプションで --stageが与えられればそれを、なければprovidor.stageの値(この場合は"dev")をデフォルトで使用。Lambdaファンクションの名前に関わります。
あと大事な点として、LambdaBはinvokeするためにLambdaCの名前を知っておく必要があります。
今回は環境変数として設定するようにしてあります。
CloudFormationテンプレート
StateMachineをCloudFormationテンプレートで作成します。
こちらを参考に
AWSTemplateFormatVersion: "2010-09-09"
Description: "Create Step Function StateMachine"
Resources:
InvokeLambdaRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Sid: StepFunctionsAssumeRolePolicy
Effect: Allow
Principal:
Service:
Fn::Join: [ ".", [ states, Ref: "AWS::Region", amazonaws, com ] ]
Action: sts:AssumeRole
Path: /
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaRole
TestStateMachine:
Type: "AWS::StepFunctions::StateMachine"
Properties:
RoleArn:
Fn::GetAtt: [ InvokeLambdaRole, Arn ]
DefinitionString: |-
{
"Comment": "An example of the Amazon States Language using wait states",
"StartAt": "FirstState",
"States": {
"FirstState": {
"Type": "Task",
"Resource": "arn:aws:lambda:${self:provider.region}:${self:custom.config.accountId}:function:${self:service}-${self:custom.config.stage}-first",
"Next": "wait_using_seconds"
},
"wait_using_seconds": {
"Type": "Wait",
"Seconds": 10,
"Next": "FinalState"
},
"FinalState": {
"Type": "Task",
"Resource": "arn:aws:lambda:${self:provider.region}:${self:custom.config.accountId}:function:${self:service}-${self:custom.config.stage}-second",
"End": true
}
}
}
Outputs:
StateMachineArn:
Value:
Ref: TestStateMachine
StateMachineName:
Value:
Fn::GetAtt: [ TestStateMachine, Name ]
StateMachineの定義の部分を取り出すとこうなります
{
"Comment": "An example of the Amazon States Language using wait states",
"StartAt": "FirstState",
"States": {
"FirstState": {
"Type": "Task",
"Resource": "arn:aws:lambda:${self:provider.region}:${self:custom.config.accountId}:function:${self:service}-${self:custom.config.stage}-first",
"Next": "wait_using_seconds"
},
"wait_using_seconds": {
"Type": "Wait",
"Seconds": 10,
"Next": "FinalState"
},
"FinalState": {
"Type": "Task",
"Resource": "arn:aws:lambda:${self:provider.region}:${self:custom.config.accountId}:function:${self:service}-${self:custom.config.stage}-second",
"End": true
}
}
}
これに関しては用意されているWaitStateのブループリントをほぼそのまま流用しています。
Lambdaファンクション実装
さて、Lambdaファンクション本体を実装していきますが、上述の通り実験用なのでかなり簡素です。
LambdaA
こいつの役割はIDの一覧を返すことですが、実験用なのでハードコーディングしたIDの配列を返します。
実用化のさいはAPIなりDBなりにアクセスして取得するようになるでしょう
"use strict";
const firstFunction = (event, context, callback) => {
console.log("Call First Function");
callback(null, {
ids: ["a", "b", "c", "d", "e", "f"]
});
};
module.exports = firstFunction;
Node.jsで実装する場合は、callbackの第二引数に入れた値がStateMachineに受け渡されます。
他の言語の場合は異なる可能性がありますので、お使いの言語に合わせて実装しましょう。
LambdaB
LambdaBはIDの一覧を分解して、それぞれに対してLambdaCをinvokeします。
LambdaAの結果をどうやって受け取るのかが気になるところでしたが、StepFunctionsで連携させた場合、LambdaAでcallbackの第二引数に入れた値がそのままevent変数として渡されます。
"use strict";
const AWS = require("aws-sdk");
const secondFunction = (event, context, callback) => {
console.log("Call Second Function");
console.log(event);
const targetLambdaArn = process.env.TARGET_LAMBDA_ARN;
if (!targetLambdaArn) {
console.log("no target");
return callback(null);
}
console.log(targetLambdaArn);
const lambda = new AWS.Lambda();
const ids = event.ids;
Promise.all(ids.map((id) => {
return lambda.invoke({
FunctionName: targetLambdaArn,
Payload: JSON.stringify({id: id})
}).promise();
})).then(() => {
return callback(null);
}).catch((err) => {
return callback(err);
});
};
module.exports = secondFunction;
ついでにNode.jsでのlambda.invoke()
ですが、FunctionNameはARNでも関数名でもいいようです。基本的に関数名で問題ないでしょうが、より確実性を求めるならARNで指定するのもありでしょう。
LambdaC
LambdaCでは各IDに応じた処理を行います。
実際はそのIDを使ってDBにアクセスみたいな処理になると思いますが、今回は単純にコンソールに出力するだけです。
また、ちゃんと並列になってるかの確認もしたいので、現在のタイムスタンプを出力して2秒ほど待ってからcallbackします。
並列処理になっていれば各タイムスタンプはほぼ同じ時刻を示すはずです。
"use strict";
const moment = require("moment");
const parallelFunction = (event, context, callback) => {
console.log(event);
console.log(`now: ${moment().format("X")}`);
setTimeout(() => {
callback(null);
}, 2000)
};
module.exports = parallelFunction;
実行編
ではデプロイします。
sls deploy
うまくいっていればStepFunctionsのコンソールにStateMachineが追加されています。
「New execution」をクリックして、最初のファンクションに与えるインプットを入力するとStateMachineが動き出します。
実行中はこんな感じで今どの処理をやってるか確認できます。
この図ですとFirstState(LambdaA)の処理が正常に終了し、Waiting中です。
ログ確認
では懸案だったLambdaCの並列分散処理はうまくいっているのか、ログを見てみましょう。
ほぼ同時にLogStreamが複数できているので、どうやら分散化はできているようです。
次にコンソールに出力したタイムスタンプを見てみます。
配列内で先頭と末尾であったaとfを比べてみましょう。
タイムスタンプはmoment().format("X")
で出力しているので、秒単位のUNIXタイムスタンプです。
ログを見たところタイムスタンプは同じなので、1秒以内に両者は実行されていたことになります。2秒スリープを入れているのにほぼ同時に実行されているので、並列に実行されているとみなして良いかと思います。
エピローグ ~感想と今後の課題~
さて、初めてStepFunctionsを使ってみたわけですが、当初は「StepFunctionsの中にStateMachineがあって...え〜っと...よくわからん」な感じでしたが、いざ使ってみたらそんなに難しいものではありませんでした。
複雑な処理とかはできるだけLambdaを分割してStepFunctionsで連携させたいところですね。プロダクションにも取り入れるつもりです。
問題なのはStateMachineを実行する手段がコンソールから手動実行かAPIをコールするぐらいで、イベントをトリガーにできないこと。
イベントドリブンで色々やろうとしたら、まずはイベントをトリガーにLambdaを起動し、その中でStateMachineを動かすという手段を取らなければいけません。正直言ってこれはあまりイケてない。
今後の機能追加に期待ですね。
以上、これからStepFunctionsを使おうとしている方の参考になれば幸いです。