re:Invent 2016 で Step Functions というサービスがローンチされました!
幾つかのステップに分かれる処理を Lambda で構築するときに、処理のステート管理や処理間のコーディネートを行ってくれるサービスです。
これまでは、このような処理を組もうとすると「本来行いたい処理に関するコード」以外に「状態を管理するコード」を書く必要がありましたが、
Step Functions を用いることで、疎結合なメンテのしやすい Serverless バッチシステムを構築できるようになります。
個人的には今年の re:Invent で一番熱いリリースです!早速触ってみました!
(※2017/12 により具体的なサービスに使った記事を書いてみましたので、よろしければこちらもご覧ください。)
AWS Lambda と Step Functions で作るサーバレスなアービトラージ検知システム
Goal
- Step Functions の概要を紹介します
- なぜ Step Functions が必要なのか
- Step FUnctions とは何か
- Step Functions を実際に使って、簡単な Serverless バッチシステムを構築するハンズオンを用意しました
- 紹介が不要で、ハンズオンだけ行いたい方は こちら まで飛んで下さい
Why Step Functions?
例えば、下記のような処理を作るとします。
- ユーザが画像をアップロード
- 画像をS3に保存する
- 画像のサムネイルを作る
- サムネイルをS3に保存する
- 画像に関する情報(アップロードしたユーザ、各画像のパスなど)をDynamoDBに書き込む
- ...
作り方は幾つか候補がありますが、それぞれデメリットがあります。
1. ひとつの Lambda Function で全ての処理を書く
簡単に作れますが、全体の処理が長くメンテしにくいコードになってしまいます。
また、処理が途中でエラーになったときの再ランが難しくなります。
2. 複数の Lambda に分け、Lambda Function から Lambda function を呼ぶようにする
コードのメンテナンス性は多少上がるかもしれませんが、エラーが発生したときに「どこまで処理が完了したか」というステートを管理できておらず、再ラン性は低いままです。
また、それぞれの Lambda Function に「次に呼ぶ function の情報」を埋め込むことになるので、密結合なコードになってしまいます。
3. 複数 Lambda & DynamoDB に状態を持たせる
上の2つに比べるとかなりよいと思います。
実際、Step Functions が登場するまでは、この方式か後述する SQS を用いる設計が多かったと思います。
しかし、「DynamoDBに○○の情報を突っ込む」というような本来の処理とは関係のないコードが存在しているという意味では疎結合にはなっていません。
また、DynamoDBのRead/Writeのキャパシティを確保しておく必要があります。
このようなバッチシステムは「深夜2時間で全件終わらせる」といったような、1日通してみると大したことないが、短い時間に処理が集中することが多いです。
DynamoDB のキャパシティはもっとも高い負荷に合わせて確保する必要があるため、要件によっては費用がかさんでしまうことが考えられます。
4. 複数 Lambda を SQS で繋ぐ
この4つの方式の中では一番多く使われているのではないかと思います。
しかし、「SQSに○○の情報を突っ込む」といった本来の処理とは関係のないコードを書く必要があります。
DynamoDB と同じく、もう少し疎結合に作りたいところです。
What's Step Functions
複数の Lambda 間で引き渡す状態の管理を Lambda function の中で管理するのではなく、
コードの外側でコーディネートするサービスです。
JSON 形式で状態の遷移をステートマシーンとして定義します。
例えば、
- まず、「S3 に画像をアップロードする」Lambda ファンクションを実行する。
- アップロードが完了したら次に「その情報を RDS に書き込む」Lambda ファンクションを実行する。
というようステートマシーンを定義する場合は、下記のような JSON で定義できます。
{
"StartAt": "Save image to S3",
"States": {
"Save image to S3": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:*************:function:save_image_to_S3",
"Next": "Update record of RDS"
},
"Update record of RDS": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:*************:function:insert_image_info_to_RDS",
"End": true
}
}
}
この例では Task
という Type を使い、シーケンシャルに処理を実行しています。
それ以外にも、
-
"Type": "Choice"
前段の処理結果によって後続の処理を分岐する -
"Type": "Parallel"
複数の処理を並行して行わせ全てが完了したら後続のタスクを実施する -
"Type": "Wait"
指定した間隔、ないしは指定した日時まで処理を待つ
などの Task が用意されており、これらを組み合わせてステートマシーンを定義します。
ハンズオン:Step Functions を使って Serverless バッチ環境を構築する
例として、下記のような処理をしたいときに、Step Functions を構築してみたいと思います。
(Lambda Function のコードについては、この記事ではサボらせてもらいます。ただし、最初の API だけはその返り値によって処理を変えたりしたいので一部実装します。)
- 外部のAPIを呼び出す
- 処理が成功したら DynamoDB の課金日時を更新
- 処理が失敗したら下記の処理を並列で行う
- DynamoDB の課金フラグを false に
- ユーザに SNS でメール送信する
"Type": "Task"
、"Type": "Choice"
、"Type": "Parallel"
の3タイプを用いる例になります。
最終的には、下記のような Step Functions を作成します。
1. 外部APIを呼び出す部分を作成する
Lambda Function api_call
を作成します。
Input として user_id を受け取り、処理結果を response_cd
として返します。
(API 呼び出しの部分は実装せず、決め打ちで response_cd を返すだけのものです。)
import json
def lambda_handler(event, context):
# TODO: API Call
return {u'response_cd': u'1', u'user_id': event[u'user_id']}
次に、この Lambda を呼び出すだけの Step Functions を作成します。
まず、Step Functions の画面で「GET Started」をクリックします。
次に、ステートマシーンの名前を決めます。
その下に Blue Print がありますが、今回は使用しません。
次に、Code の部分に先ほど作成した Lambda Function を呼び出すタスクを定義した JSON を貼り付けます。
{
"StartAt": "1_APICall",
"States": {
"1_APICall": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxx:function: api_call",
"End": true
}
}
}
Preview を更新すると、ステートマシーンがプレビューされます。
下記のようなステートマシーンになっていることを確認し、Create Status ボタンを押して次に進みます。
次に Step Functions に IAM ロールを付与します。
デフォルトでロールが用意されているのでそれをそのまま使います。
OK を押して、そのまま進みましょう。
デフォルトロールの中身は、Lambda の実行権限だけを付与する形になっています。
おそらく Step Functions にこれ以外の Action を定義する必要はないので、強いてカスタマイズするなら Resource で実行を許す Lambda Function を限定すると言ったところでしょうか。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": "*"
}
]
}
Step Functions の作成がうまくいくと下記のような画面になります。
続いて、この時点で処理がうまく実行できるかを見たいので、Nex execution を押してください。
下記の画面で、実行時の Input を与えます。
今回は、user_id だけ付与して、Start Execution を押下してください。
{
"user_id": "12345"
}
処理がうまくいったら下記のような画面になるはずです。
また、右側の画面で想定する output が返ってきているかも確認できます。
2. 分岐処理を作成する
"Type": "Choice"
を用いて分岐処理を作ります。
まず、update_dynamoDB_payment_date
、update_dynamoDB_payment_flg
、send_email
という3つの Lambda Function を作成してください。中身は空で問題ありません。
次に、Step Functions を新たに作成します。
(後述しますが、今の時点では既存の Step Function を修正することができません。)
この時点では、失敗した処理は update_dynamoDB_payment_flg
だけで問題ありません。
のちほど並列実行するように step functions を修正するタイミングで、send_email
を追加します。
{
"Comment": "An example of Step Functions",
"StartAt": "1_APICall",
"States": {
"1_APICall": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxx:function: api_call",
"Next": "isAPICallSuccessed"
},
"isAPICallSuccessed": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.response_cd",
"StringEquals": "1",
"Next": "2_APICallSuccessed"
},
{
"Variable": "$.response_cd",
"StringEquals": "-1",
"Next": "3_APICallFailed"
}
],
"Default": "3_APICallFailed"
},
"2_APICallSuccessed": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxx:function: update_dynamoDB_payment_date",
"End": true
},
"3_APICallFailed": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxx:function: update_dynamoDB_payment_flg",
"End": true
}
}
}
簡単に説明すると、Choices
の中で前段のインプットを参照し、ステータスコードによってたどるフローを振り分けています。
また、該当する値がない場合は Default
のフローに遷移します。
下記のようなステートマシーンができあがったでしょうか。
試しに実行してみましょう。
今は Lambda Function api_call
の中で、responce_cd を 1 で返すようにハードコーディングしているので、成功したフローを辿ります。
Lambda Function の responce_cd を1以外にすると、右側の失敗フローを辿ります。
3. 並列処理を作成する
新たに Step Functions を作成しましょう。
{
"Comment": "An example of Step Functions",
"StartAt": "1_APICall",
"States": {
"1_APICall": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxx:function: api_call",
"Next": "isAPICallSuccessed"
},
"isAPICallSuccessed": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.response_cd",
"StringEquals": "1",
"Next": "2_APICallSuccessed"
},
{
"Variable": "$.response_cd",
"StringEquals": "2",
"Next": "3_APICallFailed"
}
],
"Default": "3_APICallFailed"
},
"2_APICallSuccessed": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxx:function: update_dynamoDB_payment_date",
"End": true
},
"3_APICallFailed": {
"Type": "Parallel",
"Next": "3_APICallFailed_End",
"Branches": [
{
"StartAt": "3_1_UpdatePaymentFlg",
"States": {
"3_1_UpdatePaymentFlg": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxx:function: update_dynamoDB_payment_flg",
"End": true
}
}
},
{
"StartAt": "3_1_SendEmail",
"States": {
"3_1_SendEmail": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxx:function: send_email",
"End": true
}
}
}
]
},
"3_APICallFailed_End": {
"Type": "Pass",
"End": true
}
}
}
長くなってきましたね。
もともと 3_APICallFailed
を "Type": "Task"
として定義していましたが、これを "Type": "Parallel"
としました。
Branches
の中で個別のフローを書くことができます。
今回は各 Branche に1つづつしか処理を書いていませんが、複数の処理をシーケンシャルに行うように定義することも可能です。
下記のようなステートマシーンができましたでしょうか。
試しに実行してみます。Labmda Function api_call
が返却する response_cd
を 1以外にしています。
すぐに処理が終わってしまうので、並行に実行しているかわかりませんが、できているはずです。
間に "Type": "Wait"
を挟むことでわかりやすくなります。
元々の Parallel の BluePrint が Wait を挟む処理になっているので、見てみてみると参考になります。
4. (おまけ)Lambda の実装
この記事では実装をサボってしまいましたが、下記の Lambda ファンクションを実装することで、当初やりたかったことができると思います。
- api_call
- update_dynamoDB_payment_date
- update_dynamoDB_payment_flg
- send_email
Step Functions を使った感想
もともと Simple Work Flow のような形で疎結合なバッチを Lambda でも作れないかなーと思っていたので、個人的にはとてもありがたいアップデートです。
それほど学習コストなく直感的に作れるのも非常にいいですね。
ただし、実際に使ってみた結果、幾つか課題や検討する必要があることがあると感じたのでそれらを記載して終わりにしたいと思います。
一度作ったステートマシーンを修正できない
動きを変えたい場合は、一からステートマシーンを作り直す必要があります。
おそらく、過去の実行履歴を残すためだとは思うのですが、都度都度名前を決めて、使う IAM ロールを決めて、、というのが少々煩わしいです。
例えば、編集するとこれまでの実行履歴が消える縛りを入れてよいので JSON だけを修正できるようになると開発段階ではありがたいです。
ただし、CloudFormation で構築すれば、削除→新規作成の流れが自動化できるかもしれないので、それほど問題にならないかもしれません。
別途試してみたいと思います。
JSON でステートマシーンを定義する難しさ
ハンズオンのような簡単な処理なら問題ないですが、複雑な処理を JSON で書くのは無理だと思いました。
特に "Type": "Parallel"
を複数使った処理を書いたりすると、階層がどんどん深くなってしまい、メンテするのが非常に大変だと感じました。
もう少し簡単に書けるようなライブラリを作る必要がありそうです。
せめて YAML でも書けるようにしてほしい。AWS様お願いします!(それでも辛そう。。)
また、JSON の形式が誤っていたときはエラーで弾かれるのですが、どこが間違っているかのヒントが全くないので、切り分けがとても大変でした。
処理のロールバックをどう行うか
例えば "Type": "Parallel"
で3並列に処理を行ったときに、そのうちの1つがエラーになったとします。
他の2つの処理が終わっている場合、全体として不整合な状態になってしまいます。
この不整合を取り除くにはどうすればいいかがイマイチわかっていません。
資料を詳細に読み込めばそのような機能があるかもしれないので、調べてみたいと思います。
function 分割のベストプラクティスは?
なんでもかんでも分割すればいいわけではないと思います。
上のハンズオンのような処理であれば、一つの Lambda Function で書いてしまってもいいかなーと思いました。
どこまで処理を細分化して再利用可能にするかはケースバイケースだと思いますが、ここらへんのベスプラを考えてみたいです。
見積もりの難しさ
Step Functions は、状態遷移ごとに 0.0000025 ドルの課金になります。(2.5¢/1,000 state transitions)
もともと Lambda もリクエスト回数や処理の長さによる課金なので、厳密に見積もるのは難しいのですが、それがさらに難しくなったと思います。
これらのリクエストベースなサービスを使うのに、見積もりをしようとがナンセンスなのかもしれませんが、ざっくりでも出しておきたいシーンはあると思います。
Lambda だけであればどれくらいのリクエスト数がくるかを仮定することでざっくりと見積もれましたが、Step Functions の場合は処理の分割を設計しないと見積もりができないので、提案段階で月々のランニングコストは全く見えないなと思いました。
また、function 分割の話にも関連しますが、処理を分割すればするほど、状態遷移が多くなり課金額が増えてしまいます。
設計上はなるべく疎結合にするのがあるべき姿だと思いますが、そうした結果、課金額が増えてしまうのがうーん、という感じました。
まとめ
- Step Functions の説明
- Step Functions のハンズオン
- Step Functions の問題点
について書いてみました。
ざっと検証しただけなので、間違っている部分があるかもしれません。
何かあればご指摘いただければと思います。
最後に悪い話をたくさん書いてしまいましたが、それでも非常にいいサービスだと思います。
実際に使って運用してみないと見えてこないことが多いと思うので、まずは個人で作っているサービスに導入してみたいと思っています。