本記事はAWS LambdaとServerless Advent Calendar 2021、9日目の記事です。
感想
- かつてLambdaに初めて触れたときの感動を、ワークフローでも感じた
- Step Functionsはいいぞ
背景~今年あった大きな進化
・ワークフロースタジオ爆誕
・AWS SDK統合で200を超えるサービスサポート
ワークフロースタジオによって、ベースのJSONがGUIベースで生み出されるようになりました。これによりAmazon States Languageを理解していなくても、ステートマシンを動かすところまで先に行けることになり、ハードルが大幅に低くなっています。
また、何か処理をするときにひたすらLambda、、、ではなく連携サービスが増えたので、DynamoDBの更新などもワークフローベースで行うことができます。単に夢が広がりますし、考えただけでもわくわくします。
さすがにこれは使ってみるタイミングが来ておるな、と思い触ってみました。
色々な記事もありますが、1つの適用事例として残せればと思い投稿してみました。
対象とした処理
ちょうど以下のような要件でバッチ処理を作成するタイミングがあり、適用しました
- 毎週木曜日の9時頃に、週1回実行される
- 朝5時~10時頃に、機嫌次第のタイミングで更新されるデータを扱う
- データが更新されていることを確認する
- 更新されたデータを抽出する
- Slackに通知も欲しい
Step Functionsが無い世界の場合
EC2でPythonか何かで、それぞれの処理を書くのだと思います。厄介な考慮ポイントとして「データが更新されていることを確認する」の部分で、プログラムを動かし続けるか、定期的に起動しつつ完了ステータスを何かで管理する、みたいなことになるかなと思います。書けるけど、中々に面倒です。
Step Functionsでやってみた
作成したワークフロー
Start
毎週木曜日の9時頃に週1回だけ実行される、という点を解決します。
ステートマシンのアクションより「EventBridge (CloudWatch イベント) ルールを作成」を選択します。
EventBridgeの設定~パターンを定義
ラジオボタンから「スケジュール」を選択し、Cron式で「30 0 ? * THU *」が今回の要件を満たすものとなります。以後10回のトリガー日、としてローカルタイムゾーンでも確認可能なのが良いですね。式を誤っているとそれが表示されないのも便利です。
EventBridgeの設定~ターゲットを選択
これだけで完了です!
##Task:昨日のレコードの存在確認
色々と方法はあるのですが、今回は以下のようなLambdaを作成しました。
- SQLが格納されたS3のパスを受け取る
- SQLを実行した1行目の結果を返却する
以下のようにEnter Payloadを選択します。
{
"databaseInformation": {
"region": "ap-northeast-1",
"sqlBucket": "xxxxxxxxxxxxx",
"sqlTemplatePath": "xxxxx/1-line-selector.sql"
}
}
今回隠れ要件としてBigQueryへの接続となります(AWSのアドベントカレンダーで...)。
とはいえ連携サービス以外で「何らかの処理の結果の件数を返す」はあると思います。
try
client = bigquery.Client()
results = client.query(query).result()
return_value=''
for line in results:
return_value = line[0]
return {
'statusCode': 200,
'return_value': return_value
}
except:
return {
'statusCode': 500,
'return_value': json.dumps(str(sys.exc_info()))
}
##Choice: レコードの確認結果
Rule #1 次へ
上記のLambdaで return_value
として返しているものを、以下のような形でハンドリングします。
$.statusCode == 200 and $.return_value > 0
Then next state is:
データが見つかったのでINSERTを開始する通知をSlackへ送る
Rule #3 リトライ
上記と同様ですが
$.statusCode == 200 and $.return_value == 0
の場合に、
Then next state is:
再実行を待つ通知をSlackへ送る
としています。
なおRule#2を失敗と定義しています。
気持ち的に最後の数字を失敗としてつけたかったのですが、ワークフローの線が交差しないようにするために変な調整を入れています。図の可視性も大事だと思うので、、、!
ちなみに正常系として受け取り数値を確認するこの方法だと、データ更新がされないとおそらく無限に実行し続けます。
それを回避する場合は今回の場合ですとレコードが0件だった場合はLambdaでエラーを返すようにして、Lambdaのリトライによって制御する、という方法があるようです。
今回の要件においては、そもそもデータが更新されてないとそれはそれでとても困る話なのと、10分間隔なので、むしろリトライは入れないほうが良い、と判断しています。
##Task:再実行を待つ通知をSlackへ送る、ほか
Slack連携はSNS経由するとか何かいろいろある気もしましたが、ペイロード渡せる便利さを考え、汎用的に通知を送るLambdaを作成しました。
コードは割愛しますが、以下のようなペイロードを渡します。
{
"username": "データ更新を待つ猫",
"icon_emoji": ":cat2:",
"text": "まだデータが更新されていなかったようなので10分待って再実行します!",
"hooksurl": "https://hooks.slack.com/services/ZZZZZZZZZZZZZZZZ"
}
##Wait
ここは文字通りですね。600秒待ってみます。
##Task:1週間分のデータINSERT
こちらは今回ビジネスロジック的なものをLambdaで書いてしまったので内容は割愛します。
今回は処理の成功(200)、失敗(500)のどちらかで判断するようにしましたが、エラーの内容によって通知する担当を変える、みたいなこともできるでしょうし夢は広がります。
##Choice:1週間分のデータINSERT結果
ここでは
$.statusCode == 500
の時に「処理の失敗のお知らせ...」へと遷移するようにだけ設定しています。
##Task:処理の失敗のお知らせ...
これも上記のSlack通知用Lambdaを再び使っています。ここのChoieで失敗したよ、という内容を通知しています。
##Task:1週間分のデータ更新Lambda完了
こちらもSlack通知です。
正常系の完了通知は、見ると心が安らぐので、私は必須で入れる派です。
これにて、ワークフローが無事完成しました!
本記事で触れていないが適用時に注意が要る点
以下、運用時、適用時の注意点となります。
- 複数人で開発するときのステートマシンごとのIAMロールの考え方
- 多重実行まわりについて
前者については、ステートマシンごとにIAMロールを作るのが王道のようなのですが、とはいえ組織に応じたIAMロールの管理、メンテ運用面の話の考慮が必要になってくると考えます。
後者については、EventBridgeを経由した場合のStep Functions自体と、invokeによるLambdaの同期実行です。本記事投稿時点のドキュメントの解釈が、あくまで個人的にはですが難しく、また、様々な情報があるように見えまして調査をしております( 本記事に追記するかもしれません しました)。
追記:多重実行まわり
Step Functions自体の多重実行
今回のケースですと、されてしまいます。
参考:1 つのイベントに応じてルールが複数回トリガーされた
Lambdaの同期実行
InvocationTypeを指定しない場合は同期実行となり、その場合は重複実行されないようです。今回のケースですと大丈夫で、プログラムを動かす的な用途にするのであれば、あえてInvocationTypeをeventにすることはない(非同期実行にすることはない)と思うので、こちらは踏むことは少ない気はします。
使って良かった点、感想
- EC2からcrontabでバッチ処理をしようとしていたものが解消された
- EC2の障害やメンテナンスに影響を受けない
- サーバーレスのバッチ処理最高
サーバーレスなので当たり前なのですが、何かの処理を考えるときにEC2が間にどうしても来てしまうのが、ワークフローだけを直接意識するようになるのはとても良いです。特に今回のようなアドホックな定期実行に対してはかなり心理的ハードルも下がります。
サーバーレスって良いなぁ
その他
以下のイベントも最高に面白かったです。
[AWS EXpert Online for JAWS-UG 18] 見せてやるよ、Step Functions の本気ってやつをな